diff options
Diffstat (limited to 'internal/ingest/service.go')
| -rw-r--r-- | internal/ingest/service.go | 304 |
1 files changed, 303 insertions, 1 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go index e318422..1c95068 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -49,6 +49,8 @@ type Service struct { corruptErr atomic.Value startedAt time.Time metricsSink MetricsSink + tracer *replayTracer + startupErr error } type MetricsSink interface { @@ -71,6 +73,16 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog startedAt: time.Now(), } + if cfg.ReplayTrace { + tracer, err := openReplayTracer(cfg.DataDir) + + if err != nil { + s.startupErr = err + } else { + s.tracer = tracer + } + } + if didCount, err := countStates(store); err == nil { atomic.StoreUint64(&s.stats.DIDCount, didCount) } @@ -116,6 +128,10 @@ func (s *Service) Poll(ctx context.Context) error { for { if _, err := s.RunOnce(ctx); err != nil { atomic.AddUint64(&s.stats.Errors, 1) + + if isFatalIntegrityError(err) { + return err + } } select { @@ -127,6 +143,10 @@ func (s *Service) Poll(ctx context.Context) error { } func (s *Service) RunOnce(ctx context.Context) (bool, error) { + if s.startupErr != nil { + return false, s.startupErr + } + if err := s.CorruptionError(); err != nil { return false, err } @@ -153,7 +173,19 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) { limit = remaining } - records, err := s.client.FetchExportLimited(ctx, lastSeq, limit) + pageSize := uint64(s.cfg.ExportPageSize) + + if pageSize == 0 { + pageSize = 1000 + } + + batch, err := s.client.FetchExportBatch(ctx, lastSeq, limit, pageSize, s.traceFetchAttempt) + + if err != nil { + return false, err + } + + records, err := s.prepareRecordsForCommit(lastSeq, batch) if err != nil { return false, err @@ -177,6 +209,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) { lastCommitted := atomic.LoadUint64(&s.stats.LastSeq) + if lastCommitted != lastSeq+committed { + err := fatalIntegrityErrorf( + "sequence accounting mismatch: previous_global_seq=%d committed=%d new_global_seq=%d", + lastSeq, + committed, + lastCommitted, + ) + s.traceIntegrityError(err.Error(), map[string]any{ + "after": lastSeq, + "batch_count": len(records), + "last_committed": lastCommitted, + }) + + return committed > 0, err + } + if latestSourceSeq > lastCommitted { atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted) } else { @@ -199,6 +247,22 @@ type verifyResult struct { err error } +type fatalIntegrityError struct { + msg string +} + +func (e fatalIntegrityError) Error() string { return e.msg } + +func fatalIntegrityErrorf(format string, args ...any) error { + return fatalIntegrityError{msg: fmt.Sprintf(format, args...)} +} + +func isFatalIntegrityError(err error) bool { + var target fatalIntegrityError + + return errors.As(err, &target) +} + func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) { verified, err := s.verifyRecords(ctx, records) @@ -209,6 +273,98 @@ func (s *Service) processRecords(ctx context.Context, records []types.ExportReco return s.commitVerified(ctx, verified) } +func (s *Service) prepareRecordsForCommit(lastSeq uint64, batch FetchBatch) ([]types.ExportRecord, error) { + if len(batch.Records) == 0 { + return nil, nil + } + + sorted := append([]types.ExportRecord(nil), batch.Records...) + + sort.SliceStable(sorted, func(i, j int) bool { + if sorted[i].Seq == sorted[j].Seq { + if sorted[i].DID == sorted[j].DID { + return sorted[i].CID < sorted[j].CID + } + + return sorted[i].DID < sorted[j].DID + } + + return sorted[i].Seq < sorted[j].Seq + }) + + out := make([]types.ExportRecord, 0, len(sorted)) + seen := make(map[uint64]struct{}, len(sorted)) + expected := lastSeq + 1 + + for _, rec := range sorted { + if rec.Seq == 0 { + err := fatalIntegrityErrorf( + "export record missing seq: after=%d request=%s did=%s cid=%s created_at=%s", + lastSeq, + batch.RequestURL, + rec.DID, + rec.CID, + rec.CreatedAt, + ) + s.traceIntegrityError(err.Error(), map[string]any{ + "after": lastSeq, + "request_url": batch.RequestURL, + "did": rec.DID, + "cid": rec.CID, + "created_at": rec.CreatedAt, + "parsed_records": batch.ParsedRecords, + }) + + return nil, err + } + + if rec.Seq <= lastSeq { + s.traceDedupDecision(rec, "already_committed_seq", batch, lastSeq) + continue + } + + if _, ok := seen[rec.Seq]; ok { + s.traceDedupDecision(rec, "duplicate_seq_in_batch", batch, lastSeq) + continue + } + + if rec.Seq != expected { + err := fatalIntegrityErrorf( + "sequence gap detected: expected=%d got=%d previous_global_seq=%d request=%s first_created_at=%s last_created_at=%s parsed_records=%d skipped_lines=%d truncated_tail=%t", + expected, + rec.Seq, + lastSeq, + batch.RequestURL, + batch.FirstCreated, + batch.LastCreated, + batch.ParsedRecords, + batch.SkippedLines, + batch.TruncatedTail, + ) + s.traceIntegrityError(err.Error(), map[string]any{ + "expected_seq": expected, + "got_seq": rec.Seq, + "after": lastSeq, + "request_url": batch.RequestURL, + "count": batch.Count, + "first_created_at": batch.FirstCreated, + "last_created_at": batch.LastCreated, + "parsed_records": batch.ParsedRecords, + "skipped_lines": batch.SkippedLines, + "truncated_tail": batch.TruncatedTail, + }) + + return nil, err + } + + seen[rec.Seq] = struct{}{} + out = append(out, rec) + expected++ + } + + return out, nil +} + func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) { _ = ctx workers := s.cfg.VerifyWorkers @@ -252,6 +408,14 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor if err := s.verifier.VerifyOperation(op, existing); err != nil { atomic.AddUint64(&s.stats.VerifyFailures, 1) + if strings.Contains(err.Error(), "prev linkage mismatch") { + diag := s.buildPrevMismatchDiagnostic(op, existing) + s.tracePrevMismatch(diag) + results <- verifyResult{index: task.index, err: fatalIntegrityErrorf("%s", diag.Message)} + + continue + } + results <- verifyResult{index: task.index, err: err} continue @@ -383,6 +547,139 @@ func cloneState(in *types.StateV1) *types.StateV1 { return &out } +type prevMismatchDiagnostic struct { + Message string `json:"message"` + DID string `json:"did"` + TipCID string `json:"tip_cid"` + PrevCID string `json:"prev_cid"` + OpCID string `json:"op_cid"` + CreatedAt string `json:"created_at"` + LastOps []types.BlockRefV1 `json:"last_ops"` +} + +func (s *Service) buildPrevMismatchDiagnostic(op types.ParsedOperation, existing *types.StateV1) prevMismatchDiagnostic { + diag := prevMismatchDiagnostic{ + DID: op.DID, + PrevCID: op.Prev, + OpCID: op.CID, + CreatedAt: op.RawRecord.CreatedAt, + } + + if existing != nil { + diag.TipCID = existing.ChainTipHash + } + + const tail = 8 + + seqs, err := s.store.ListDIDSequences(op.DID) + + if err == nil && len(seqs) > 0 { + start := len(seqs) - tail + + if start < 0 { + start = 0 + } + + for _, seq := range seqs[start:] { + ref, ok, getErr := s.store.GetOpSeqRef(seq) + + if getErr != nil || !ok { + continue + } + + diag.LastOps = append(diag.LastOps, ref) + } + } + + diag.Message = fmt.Sprintf( + "prev linkage mismatch: did=%s tip=%s op_prev=%s op_cid=%s created_at=%s", + diag.DID, + diag.TipCID, + diag.PrevCID, + diag.OpCID, + diag.CreatedAt, + ) + + return diag +} + +func (s *Service) traceFetchAttempt(trace FetchAttemptTrace) { + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "export_fetch", + RequestURL: trace.URL, + After: trace.After, + Count: trace.Count, + Attempt: trace.Attempt, + StatusCode: trace.StatusCode, + Retryable: trace.Retryable, + RetryAfterMS: trace.RetryAfter.Milliseconds(), + ParsedLines: trace.ParsedLines, + ParsedRecords: trace.ParsedRecords, + SkippedLines: trace.SkippedLines, + TruncatedTail: trace.TruncatedTail, + FirstCreatedAt: trace.FirstCreated, + LastCreatedAt: trace.LastCreated, + Error: trace.Error, + }) +} + +func (s *Service) traceDedupDecision(rec types.ExportRecord, reason string, batch FetchBatch, after uint64) { + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "dedup_decision", + Reason: reason, + Sequence: rec.Seq, + DID: rec.DID, + CID: rec.CID, + CreatedAt: rec.CreatedAt, + After: after, + Count: batch.Count, + RequestURL: batch.RequestURL, + FirstCreatedAt: batch.FirstCreated, + LastCreatedAt: batch.LastCreated, + }) +} + +func (s *Service) tracePrevMismatch(diag prevMismatchDiagnostic) { + log.Printf("%s", diag.Message) + + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "prev_mismatch", + DID: diag.DID, + Tip: diag.TipCID, + Prev: diag.PrevCID, + CID: diag.OpCID, + CreatedAt: diag.CreatedAt, + Message: diag.Message, + Details: diag.LastOps, + }) +} + +func (s *Service) traceIntegrityError(message string, details map[string]any) { + log.Printf("%s", message) + + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "integrity_error", + Message: message, + Details: details, + }) +} + func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) { _ = ctx batchSize := s.cfg.CommitBatchSize @@ -473,6 +770,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( } result.Ref.Received = time.Now().UTC().Format(time.RFC3339) + result.Ref.CreatedAt = v.op.RawRecord.CreatedAt ref = &result.Ref } @@ -1383,6 +1681,10 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st } func (s *Service) Close() { + if s.tracer != nil { + _ = s.tracer.Close() + } + if s.appender != nil { s.appender.Close() } |