diff options
| author | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
| commit | 219b220b99b509e65cb1325ba67f00264e7cc25c (patch) | |
| tree | 746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0 | |
| parent | chore: improve user-facing API copy clarity and consistency (diff) | |
| download | plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip | |
fix: make mirror replay lossless with strict seq accounting and trace
| -rw-r--r-- | config.default.yaml | 2 | ||||
| -rw-r--r-- | internal/config/config.go | 17 | ||||
| -rw-r--r-- | internal/ingest/client.go | 202 | ||||
| -rw-r--r-- | internal/ingest/service.go | 304 | ||||
| -rw-r--r-- | internal/ingest/service_export_integrity_test.go | 357 | ||||
| -rw-r--r-- | internal/ingest/trace.go | 79 | ||||
| -rw-r--r-- | internal/types/state.go | 21 |
7 files changed, 947 insertions, 35 deletions
diff --git a/config.default.yaml b/config.default.yaml index 8e09923..3e71d75 100644 --- a/config.default.yaml +++ b/config.default.yaml @@ -7,6 +7,8 @@ block_size_mb: 8 checkpoint_interval: 100000 commit_batch_size: 128 verify_workers: 10 +export_page_size: 1000 +replay_trace: false listen_addr: :8080 mirror_private_key_path: ./mirror.key poll_interval: 5s diff --git a/internal/config/config.go b/internal/config/config.go index 11ac848..f777a21 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,6 +29,8 @@ type Config struct { CheckpointInterval uint64 `yaml:"checkpoint_interval"` CommitBatchSize int `yaml:"commit_batch_size"` VerifyWorkers int `yaml:"verify_workers"` + ExportPageSize int `yaml:"export_page_size"` + ReplayTrace bool `yaml:"replay_trace"` ListenAddr string `yaml:"listen_addr"` MirrorPrivateKeyPath string `yaml:"mirror_private_key_path"` PollInterval time.Duration `yaml:"poll_interval"` @@ -57,6 +59,8 @@ func Default() Config { CheckpointInterval: 100000, CommitBatchSize: 128, VerifyWorkers: runtime.NumCPU(), + ExportPageSize: 1000, + ReplayTrace: false, ListenAddr: ":8080", MirrorPrivateKeyPath: "./mirror.key", PollInterval: 5 * time.Second, @@ -131,6 +135,13 @@ func applyEnv(cfg *Config) { } } } + setBool := func(key string, dst *bool) { + if v := strings.TrimSpace(os.Getenv(key)); v != "" { + if b, err := strconv.ParseBool(v); err == nil { + *dst = b + } + } + } setString("PLUTIA_MODE", &cfg.Mode) setString("PLUTIA_DATA_DIR", &cfg.DataDir) @@ -140,6 +151,8 @@ func applyEnv(cfg *Config) { setInt("PLUTIA_BLOCK_SIZE_MB", &cfg.BlockSizeMB) setInt("PLUTIA_COMMIT_BATCH_SIZE", &cfg.CommitBatchSize) setInt("PLUTIA_VERIFY_WORKERS", &cfg.VerifyWorkers) + setInt("PLUTIA_EXPORT_PAGE_SIZE", &cfg.ExportPageSize) + setBool("PLUTIA_REPLAY_TRACE", &cfg.ReplayTrace) setUint64("PLUTIA_CHECKPOINT_INTERVAL", &cfg.CheckpointInterval) setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr) setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath) @@ -193,6 +206,10 @@ func (c Config) Validate() error { return fmt.Errorf("verify_workers must be between 1 and 1024, got %d", c.VerifyWorkers) } + if c.ExportPageSize <= 0 || c.ExportPageSize > 1000 { + return fmt.Errorf("export_page_size must be between 1 and 1000, got %d", c.ExportPageSize) + } + if c.ListenAddr == "" { return errors.New("listen_addr is required") } diff --git a/internal/ingest/client.go b/internal/ingest/client.go index ad145c3..eba6fd0 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -32,6 +32,44 @@ type ClientOptions struct { MaxDelay time.Duration } +type FetchAttemptTrace struct { + URL string + After uint64 + Count uint64 + Attempt int + StatusCode int + Retryable bool + RetryAfter time.Duration + ParsedLines int + ParsedRecords int + SkippedLines int + TruncatedTail bool + FirstCreated string + LastCreated string + Error string +} + +type FetchBatch struct { + Records []types.ExportRecord + RequestURL string + After uint64 + Count uint64 + Attempts int + ParsedLines int + ParsedRecords int + SkippedLines int + TruncatedTail bool + FirstCreated string + LastCreated string +} + +type decodeStats struct { + ParsedLines int + ParsedRecords int + SkippedLines int + TruncatedTail bool +} + func NewClient(source string, opts ...ClientOptions) *Client { cfg := ClientOptions{ MaxAttempts: 8, @@ -79,26 +117,69 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR } func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { + pageSize := uint64(1000) + batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil) + + if err != nil { + return nil, err + } + + return batch.Records, nil +} + +func (c *Client) FetchExportBatch(ctx context.Context, after uint64, limit uint64, pageSize uint64, traceHook func(FetchAttemptTrace)) (FetchBatch, error) { if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") { - return c.fetchFromFile(after, limit) + records, err := c.fetchFromFile(after, limit) + + if err != nil { + return FetchBatch{}, err + } + + batch := FetchBatch{ + Records: records, + RequestURL: c.source, + After: after, + Count: uint64(len(records)), + Attempts: 1, + ParsedLines: len(records), + ParsedRecords: len(records), + } + + if len(records) > 0 { + batch.FirstCreated = records[0].CreatedAt + batch.LastCreated = records[len(records)-1].CreatedAt + } + + return batch, nil } u, err := url.Parse(c.source) if err != nil { - return nil, fmt.Errorf("parse plc source: %w", err) + return FetchBatch{}, fmt.Errorf("parse plc source: %w", err) + } + + if pageSize == 0 { + pageSize = 1000 + } + + count := pageSize + + if limit > 0 && limit < count { + count = limit } u.Path = strings.TrimRight(u.Path, "/") + "/export" q := u.Query() q.Set("after", fmt.Sprintf("%d", after)) + q.Set("count", fmt.Sprintf("%d", count)) u.RawQuery = q.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { - return nil, fmt.Errorf("new request: %w", err) + return FetchBatch{}, fmt.Errorf("new request: %w", err) } maxAttempts := c.opts.MaxAttempts @@ -107,13 +188,61 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin maxAttempts = 1 } - var lastErr error + var ( + lastErr error + attempts int + ) for attempt := 1; attempt <= maxAttempts; attempt++ { - records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) + records, retryAfter, retryable, statusCode, decStats, err := c.fetchExportOnce(req, limit) + attempts = attempt + + if traceHook != nil { + trace := FetchAttemptTrace{ + URL: req.URL.String(), + After: after, + Count: count, + Attempt: attempt, + StatusCode: statusCode, + Retryable: retryable, + RetryAfter: retryAfter, + ParsedLines: decStats.ParsedLines, + ParsedRecords: decStats.ParsedRecords, + SkippedLines: decStats.SkippedLines, + TruncatedTail: decStats.TruncatedTail, + } + + if len(records) > 0 { + trace.FirstCreated = records[0].CreatedAt + trace.LastCreated = records[len(records)-1].CreatedAt + } + + if err != nil { + trace.Error = err.Error() + } + + traceHook(trace) + } if err == nil { - return records, nil + batch := FetchBatch{ + Records: records, + RequestURL: req.URL.String(), + After: after, + Count: count, + Attempts: attempt, + ParsedLines: decStats.ParsedLines, + ParsedRecords: decStats.ParsedRecords, + SkippedLines: decStats.SkippedLines, + TruncatedTail: decStats.TruncatedTail, + } + + if len(records) > 0 { + batch.FirstCreated = records[0].CreatedAt + batch.LastCreated = records[len(records)-1].CreatedAt + } + + return batch, nil } lastErr = err @@ -136,12 +265,17 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin case <-ctx.Done(): timer.Stop() - return nil, ctx.Err() + return FetchBatch{}, ctx.Err() case <-timer.C: } } - return nil, lastErr + return FetchBatch{ + RequestURL: req.URL.String(), + After: after, + Count: count, + Attempts: attempts, + }, lastErr } type httpStatusError struct { @@ -153,7 +287,7 @@ func (e httpStatusError) Error() string { return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body) } -func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { +func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, int, decodeStats, error) { reqClone := req.Clone(req.Context()) reqClone.Header.Set("Accept-Encoding", "gzip") @@ -161,7 +295,7 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor resp, err := c.http.Do(reqClone) if err != nil { - return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) + return nil, 0, isTransientNetworkErr(err), 0, decodeStats{}, fmt.Errorf("fetch export: %w", err) } defer resp.Body.Close() @@ -172,16 +306,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) err := httpStatusError{StatusCode: resp.StatusCode, Body: body} - return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err + return nil, retryDelay, shouldRetryStatus(resp.StatusCode), resp.StatusCode, decodeStats{}, err } - records, err := decodeExportBody(resp.Body, limit) + records, decStats, err := decodeExportBodyWithStats(resp.Body, limit) if err != nil { - return nil, 0, false, err + return nil, 0, false, resp.StatusCode, decStats, err } - return records, 0, false, nil + return records, 0, false, resp.StatusCode, decStats, nil } func (c *Client) backoffDelay(attempt int) time.Duration { @@ -262,22 +396,28 @@ func isTransientNetworkErr(err error) bool { } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { + records, _, err := decodeExportBodyWithStats(r, limit) + + return records, err +} + +func decodeExportBodyWithStats(r io.Reader, limit uint64) ([]types.ExportRecord, decodeStats, error) { br := bufio.NewReader(r) first, err := peekFirstNonSpace(br) if err != nil { if err == io.EOF { - return nil, nil + return nil, decodeStats{}, nil } - return nil, err + return nil, decodeStats{}, err } if first == '[' { b, err := io.ReadAll(br) if err != nil { - return nil, fmt.Errorf("read export body: %w", err) + return nil, decodeStats{}, fmt.Errorf("read export body: %w", err) } trimmed := bytes.TrimSpace(b) @@ -285,41 +425,53 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { var records []types.ExportRecord if err := json.Unmarshal(trimmed, &records); err != nil { - return nil, fmt.Errorf("decode export json array: %w", err) + return nil, decodeStats{}, fmt.Errorf("decode export json array: %w", err) } if limit > 0 && uint64(len(records)) > limit { records = records[:limit] } - return records, nil + return records, decodeStats{ + ParsedLines: len(records), + ParsedRecords: len(records), + }, nil } out := make([]types.ExportRecord, 0, 1024) + stats := decodeStats{} for { line, err := br.ReadBytes('\n') isEOF := errors.Is(err, io.EOF) if err != nil && !isEOF { - return nil, fmt.Errorf("read ndjson line: %w", err) + return nil, stats, fmt.Errorf("read ndjson line: %w", err) } if limit > 0 && uint64(len(out)) >= limit { - return out, nil + stats.ParsedRecords = len(out) + + return out, stats, nil } trimmed := bytes.TrimSpace(line) if len(trimmed) > 0 { + stats.ParsedLines++ + var rec types.ExportRecord if err := json.Unmarshal(trimmed, &rec); err != nil { if isEOF && isTrailingNDJSONPartial(err) { - return out, nil + stats.SkippedLines++ + stats.TruncatedTail = true + stats.ParsedRecords = len(out) + + return out, stats, nil } - return nil, fmt.Errorf("decode ndjson line: %w", err) + return nil, stats, fmt.Errorf("decode ndjson line: %w", err) } out = append(out, rec) @@ -330,7 +482,9 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { } } - return out, nil + stats.ParsedRecords = len(out) + + return out, stats, nil } func isTrailingNDJSONPartial(err error) bool { diff --git a/internal/ingest/service.go b/internal/ingest/service.go index e318422..1c95068 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -49,6 +49,8 @@ type Service struct { corruptErr atomic.Value startedAt time.Time metricsSink MetricsSink + tracer *replayTracer + startupErr error } type MetricsSink interface { @@ -71,6 +73,16 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog startedAt: time.Now(), } + if cfg.ReplayTrace { + tracer, err := openReplayTracer(cfg.DataDir) + + if err != nil { + s.startupErr = err + } else { + s.tracer = tracer + } + } + if didCount, err := countStates(store); err == nil { atomic.StoreUint64(&s.stats.DIDCount, didCount) } @@ -116,6 +128,10 @@ func (s *Service) Poll(ctx context.Context) error { for { if _, err := s.RunOnce(ctx); err != nil { atomic.AddUint64(&s.stats.Errors, 1) + + if isFatalIntegrityError(err) { + return err + } } select { @@ -127,6 +143,10 @@ func (s *Service) Poll(ctx context.Context) error { } func (s *Service) RunOnce(ctx context.Context) (bool, error) { + if s.startupErr != nil { + return false, s.startupErr + } + if err := s.CorruptionError(); err != nil { return false, err } @@ -153,7 +173,19 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) { limit = remaining } - records, err := s.client.FetchExportLimited(ctx, lastSeq, limit) + pageSize := uint64(s.cfg.ExportPageSize) + + if pageSize == 0 { + pageSize = 1000 + } + + batch, err := s.client.FetchExportBatch(ctx, lastSeq, limit, pageSize, s.traceFetchAttempt) + + if err != nil { + return false, err + } + + records, err := s.prepareRecordsForCommit(lastSeq, batch) if err != nil { return false, err @@ -177,6 +209,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) { lastCommitted := atomic.LoadUint64(&s.stats.LastSeq) + if lastCommitted != lastSeq+committed { + err := fatalIntegrityErrorf( + "sequence accounting mismatch: previous_global_seq=%d committed=%d new_global_seq=%d", + lastSeq, + committed, + lastCommitted, + ) + s.traceIntegrityError(err.Error(), map[string]any{ + "after": lastSeq, + "batch_count": len(records), + "last_committed": lastCommitted, + }) + + return committed > 0, err + } + if latestSourceSeq > lastCommitted { atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted) } else { @@ -199,6 +247,22 @@ type verifyResult struct { err error } +type fatalIntegrityError struct { + msg string +} + +func (e fatalIntegrityError) Error() string { return e.msg } + +func fatalIntegrityErrorf(format string, args ...any) error { + return fatalIntegrityError{msg: fmt.Sprintf(format, args...)} +} + +func isFatalIntegrityError(err error) bool { + var target fatalIntegrityError + + return errors.As(err, &target) +} + func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) { verified, err := s.verifyRecords(ctx, records) @@ -209,6 +273,98 @@ func (s *Service) processRecords(ctx context.Context, records []types.ExportReco return s.commitVerified(ctx, verified) } +func (s *Service) prepareRecordsForCommit(lastSeq uint64, batch FetchBatch) ([]types.ExportRecord, error) { + if len(batch.Records) == 0 { + return nil, nil + } + + sorted := append([]types.ExportRecord(nil), batch.Records...) + + sort.SliceStable(sorted, func(i, j int) bool { + if sorted[i].Seq == sorted[j].Seq { + if sorted[i].DID == sorted[j].DID { + return sorted[i].CID < sorted[j].CID + } + + return sorted[i].DID < sorted[j].DID + } + + return sorted[i].Seq < sorted[j].Seq + }) + + out := make([]types.ExportRecord, 0, len(sorted)) + seen := make(map[uint64]struct{}, len(sorted)) + expected := lastSeq + 1 + + for _, rec := range sorted { + if rec.Seq == 0 { + err := fatalIntegrityErrorf( + "export record missing seq: after=%d request=%s did=%s cid=%s created_at=%s", + lastSeq, + batch.RequestURL, + rec.DID, + rec.CID, + rec.CreatedAt, + ) + s.traceIntegrityError(err.Error(), map[string]any{ + "after": lastSeq, + "request_url": batch.RequestURL, + "did": rec.DID, + "cid": rec.CID, + "created_at": rec.CreatedAt, + "parsed_records": batch.ParsedRecords, + }) + + return nil, err + } + + if rec.Seq <= lastSeq { + s.traceDedupDecision(rec, "already_committed_seq", batch, lastSeq) + continue + } + + if _, ok := seen[rec.Seq]; ok { + s.traceDedupDecision(rec, "duplicate_seq_in_batch", batch, lastSeq) + continue + } + + if rec.Seq != expected { + err := fatalIntegrityErrorf( + "sequence gap detected: expected=%d got=%d previous_global_seq=%d request=%s first_created_at=%s last_created_at=%s parsed_records=%d skipped_lines=%d truncated_tail=%t", + expected, + rec.Seq, + lastSeq, + batch.RequestURL, + batch.FirstCreated, + batch.LastCreated, + batch.ParsedRecords, + batch.SkippedLines, + batch.TruncatedTail, + ) + s.traceIntegrityError(err.Error(), map[string]any{ + "expected_seq": expected, + "got_seq": rec.Seq, + "after": lastSeq, + "request_url": batch.RequestURL, + "count": batch.Count, + "first_created_at": batch.FirstCreated, + "last_created_at": batch.LastCreated, + "parsed_records": batch.ParsedRecords, + "skipped_lines": batch.SkippedLines, + "truncated_tail": batch.TruncatedTail, + }) + + return nil, err + } + + seen[rec.Seq] = struct{}{} + out = append(out, rec) + expected++ + } + + return out, nil +} + func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) { _ = ctx workers := s.cfg.VerifyWorkers @@ -252,6 +408,14 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor if err := s.verifier.VerifyOperation(op, existing); err != nil { atomic.AddUint64(&s.stats.VerifyFailures, 1) + if strings.Contains(err.Error(), "prev linkage mismatch") { + diag := s.buildPrevMismatchDiagnostic(op, existing) + s.tracePrevMismatch(diag) + results <- verifyResult{index: task.index, err: fatalIntegrityErrorf("%s", diag.Message)} + + continue + } + results <- verifyResult{index: task.index, err: err} continue @@ -383,6 +547,139 @@ func cloneState(in *types.StateV1) *types.StateV1 { return &out } +type prevMismatchDiagnostic struct { + Message string `json:"message"` + DID string `json:"did"` + TipCID string `json:"tip_cid"` + PrevCID string `json:"prev_cid"` + OpCID string `json:"op_cid"` + CreatedAt string `json:"created_at"` + LastOps []types.BlockRefV1 `json:"last_ops"` +} + +func (s *Service) buildPrevMismatchDiagnostic(op types.ParsedOperation, existing *types.StateV1) prevMismatchDiagnostic { + diag := prevMismatchDiagnostic{ + DID: op.DID, + PrevCID: op.Prev, + OpCID: op.CID, + CreatedAt: op.RawRecord.CreatedAt, + } + + if existing != nil { + diag.TipCID = existing.ChainTipHash + } + + const tail = 8 + + seqs, err := s.store.ListDIDSequences(op.DID) + + if err == nil && len(seqs) > 0 { + start := len(seqs) - tail + + if start < 0 { + start = 0 + } + + for _, seq := range seqs[start:] { + ref, ok, getErr := s.store.GetOpSeqRef(seq) + + if getErr != nil || !ok { + continue + } + + diag.LastOps = append(diag.LastOps, ref) + } + } + + diag.Message = fmt.Sprintf( + "prev linkage mismatch: did=%s tip=%s op_prev=%s op_cid=%s created_at=%s", + diag.DID, + diag.TipCID, + diag.PrevCID, + diag.OpCID, + diag.CreatedAt, + ) + + return diag +} + +func (s *Service) traceFetchAttempt(trace FetchAttemptTrace) { + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "export_fetch", + RequestURL: trace.URL, + After: trace.After, + Count: trace.Count, + Attempt: trace.Attempt, + StatusCode: trace.StatusCode, + Retryable: trace.Retryable, + RetryAfterMS: trace.RetryAfter.Milliseconds(), + ParsedLines: trace.ParsedLines, + ParsedRecords: trace.ParsedRecords, + SkippedLines: trace.SkippedLines, + TruncatedTail: trace.TruncatedTail, + FirstCreatedAt: trace.FirstCreated, + LastCreatedAt: trace.LastCreated, + Error: trace.Error, + }) +} + +func (s *Service) traceDedupDecision(rec types.ExportRecord, reason string, batch FetchBatch, after uint64) { + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "dedup_decision", + Reason: reason, + Sequence: rec.Seq, + DID: rec.DID, + CID: rec.CID, + CreatedAt: rec.CreatedAt, + After: after, + Count: batch.Count, + RequestURL: batch.RequestURL, + FirstCreatedAt: batch.FirstCreated, + LastCreatedAt: batch.LastCreated, + }) +} + +func (s *Service) tracePrevMismatch(diag prevMismatchDiagnostic) { + log.Printf("%s", diag.Message) + + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "prev_mismatch", + DID: diag.DID, + Tip: diag.TipCID, + Prev: diag.PrevCID, + CID: diag.OpCID, + CreatedAt: diag.CreatedAt, + Message: diag.Message, + Details: diag.LastOps, + }) +} + +func (s *Service) traceIntegrityError(message string, details map[string]any) { + log.Printf("%s", message) + + if s.tracer == nil { + return + } + + s.tracer.Write(replayTraceEvent{ + Kind: "integrity_error", + Message: message, + Details: details, + }) +} + func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) { _ = ctx batchSize := s.cfg.CommitBatchSize @@ -473,6 +770,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) ( } result.Ref.Received = time.Now().UTC().Format(time.RFC3339) + result.Ref.CreatedAt = v.op.RawRecord.CreatedAt ref = &result.Ref } @@ -1383,6 +1681,10 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st } func (s *Service) Close() { + if s.tracer != nil { + _ = s.tracer.Close() + } + if s.appender != nil { s.appender.Close() } diff --git a/internal/ingest/service_export_integrity_test.go b/internal/ingest/service_export_integrity_test.go new file mode 100644 index 0000000..92a1703 --- /dev/null +++ b/internal/ingest/service_export_integrity_test.go @@ -0,0 +1,357 @@ +package ingest + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "fmt" + "github.com/Fuwn/plutia/internal/checkpoint" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +func TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + records := buildSignedChainRecords(t, 6) + + var ( + mu sync.Mutex + attempts = map[uint64]int{} + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Helper() + + q := r.URL.Query() + afterRaw := q.Get("after") + after, err := strconv.ParseUint(afterRaw, 10, 64) + if err != nil { + http.Error(w, "invalid after", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/jsonlines") + + mu.Lock() + attempts[after]++ + attempt := attempts[after] + mu.Unlock() + + write := func(recs ...types.ExportRecord) { + for _, rec := range recs { + b, _ := json.Marshal(rec) + _, _ = fmt.Fprintln(w, string(b)) + } + } + + switch after { + case 0: + write(records[0], records[1]) + case 2: + // Truncate the tail on first pass: seq=5 is present but partial. + line3, _ := json.Marshal(records[2]) + line4, _ := json.Marshal(records[3]) + line5, _ := json.Marshal(records[4]) + _, _ = fmt.Fprintln(w, string(line3)) + _, _ = fmt.Fprintln(w, string(line4)) + partial := string(line5) + if len(partial) > 16 { + partial = partial[:len(partial)-16] + } + _, _ = fmt.Fprint(w, partial) + case 4: + // First response is rate-limited; retry overlaps with an old seq. + if attempt == 1 { + http.Error(w, "rate limited", http.StatusTooManyRequests) + return + } + + write(records[2], records[4], records[5]) + case 6: + // caught up + default: + http.Error(w, "unexpected after", http.StatusBadRequest) + } + })) + defer server.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: server.URL, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 100000, + CommitBatchSize: 4, + VerifyWorkers: 2, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: time.Second, + RequestTimeout: 5 * time.Second, + HTTPRetryMaxAttempts: 3, + HTTPRetryBaseDelay: 10 * time.Millisecond, + HTTPRetryMaxDelay: 20 * time.Millisecond, + RateLimit: config.RateLimit{ + ResolveRPS: 30, + ResolveBurst: 60, + ProofRPS: 10, + ProofBurst: 20, + }, + ReplayTrace: true, + } + + client := NewClient(server.URL, ClientOptions{ + MaxAttempts: cfg.HTTPRetryMaxAttempts, + BaseDelay: cfg.HTTPRetryBaseDelay, + MaxDelay: cfg.HTTPRetryMaxDelay, + }) + service := NewService(cfg, store, client, bl, checkpoint.NewManager(store, dataDir, keyPath)) + defer service.Close() + + if err := service.Replay(context.Background()); err != nil { + t.Fatalf("replay: %v", err) + } + + if err := service.Flush(context.Background()); err != nil { + t.Fatalf("flush: %v", err) + } + + seq, err := store.GetGlobalSeq() + if err != nil { + t.Fatalf("get global seq: %v", err) + } + if seq != 6 { + t.Fatalf("global seq mismatch: got %d want 6", seq) + } + + var refs int + var gaps uint64 + var prev uint64 + err = store.ForEachOpSeqRef(func(seq uint64, _ types.BlockRefV1) error { + refs++ + if prev > 0 && seq > prev+1 { + gaps += seq - prev - 1 + } + prev = seq + return nil + }) + if err != nil { + t.Fatalf("iterate op refs: %v", err) + } + if refs != 6 { + t.Fatalf("op ref count mismatch: got %d want 6", refs) + } + if gaps != 0 { + t.Fatalf("unexpected sequence gaps: %d", gaps) + } + + tracePath := filepath.Join(dataDir, "trace.jsonl") + traceBytes, err := os.ReadFile(tracePath) + if err != nil { + t.Fatalf("read trace file: %v", err) + } + + trace := string(traceBytes) + if !strings.Contains(trace, `"kind":"export_fetch"`) { + t.Fatalf("trace missing export_fetch event") + } + if !strings.Contains(trace, `"truncated_tail":true`) { + t.Fatalf("trace missing truncated tail marker") + } + if !strings.Contains(trace, `"kind":"dedup_decision"`) { + t.Fatalf("trace missing dedup decision event") + } + if !strings.Contains(trace, `"reason":"already_committed_seq"`) { + t.Fatalf("trace missing already_committed_seq reason") + } +} + +func TestReplayFailsFastOnSequenceGap(t *testing.T) { + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data dir: %v", err) + } + + keySeed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(keySeed); err != nil { + t.Fatalf("rand seed: %v", err) + } + + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil { + t.Fatalf("write mirror key: %v", err) + } + + records := buildSignedChainRecords(t, 3) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/jsonlines") + // Deliberately skip seq=2. + for _, rec := range []types.ExportRecord{records[0], records[2]} { + b, _ := json.Marshal(rec) + _, _ = fmt.Fprintln(w, string(b)) + } + })) + defer server.Close() + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: server.URL, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 100000, + CommitBatchSize: 2, + VerifyWorkers: 1, + ExportPageSize: 1000, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: time.Second, + RequestTimeout: 5 * time.Second, + HTTPRetryMaxAttempts: 1, + HTTPRetryBaseDelay: 10 * time.Millisecond, + HTTPRetryMaxDelay: 20 * time.Millisecond, + RateLimit: config.RateLimit{ + ResolveRPS: 30, + ResolveBurst: 60, + ProofRPS: 10, + ProofBurst: 20, + }, + ReplayTrace: true, + } + + service := NewService(cfg, store, NewClient(server.URL), bl, checkpoint.NewManager(store, dataDir, keyPath)) + defer service.Close() + + err = service.Replay(context.Background()) + if err == nil { + t.Fatalf("expected replay to fail on sequence gap") + } + if !strings.Contains(err.Error(), "sequence gap detected") { + t.Fatalf("unexpected error: %v", err) + } + if !strings.Contains(err.Error(), "expected=2 got=3") { + t.Fatalf("error missing sequence details: %v", err) + } +} + +func buildSignedChainRecords(t *testing.T, total uint64) []types.ExportRecord { + t.Helper() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("generate key: %v", err) + } + + out := make([]types.ExportRecord, 0, total) + did := "did:plc:chain" + createdAtBase := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC) + prev := "" + + for seq := uint64(1); seq <= total; seq++ { + payloadDoc := map[string]any{ + "did": did, + "didDoc": map[string]any{"id": did, "seq": seq}, + } + + if prev != "" { + payloadDoc["prev"] = prev + } + + payloadBytes, _ := json.Marshal(payloadDoc) + canon, _ := types.CanonicalizeJSON(payloadBytes) + sig := ed25519.Sign(priv, canon) + op := map[string]any{ + "did": did, + "didDoc": payloadDoc["didDoc"], + "publicKey": base64.RawURLEncoding.EncodeToString(pub), + "sigPayload": base64.RawURLEncoding.EncodeToString(canon), + "sig": base64.RawURLEncoding.EncodeToString(sig), + } + if prev != "" { + op["prev"] = prev + } + + opRaw, _ := json.Marshal(op) + opCanon, _ := types.CanonicalizeJSON(opRaw) + cid := types.ComputeDigestCID(opCanon) + + createdAt := createdAtBase.Add(time.Duration(seq-1) * time.Second).Format(time.RFC3339Nano) + if seq == 5 || seq == 6 { + // Identical timestamps to validate deterministic tie handling. + createdAt = createdAtBase.Add(4 * time.Second).Format(time.RFC3339Nano) + } + + out = append(out, types.ExportRecord{ + Seq: seq, + DID: did, + CreatedAt: createdAt, + CID: cid, + Operation: opRaw, + }) + prev = cid + } + + return out +} diff --git a/internal/ingest/trace.go b/internal/ingest/trace.go new file mode 100644 index 0000000..36f785b --- /dev/null +++ b/internal/ingest/trace.go @@ -0,0 +1,79 @@ +package ingest + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +type replayTraceEvent struct { + Timestamp string `json:"timestamp"` + Kind string `json:"kind"` + RequestURL string `json:"request_url,omitempty"` + After uint64 `json:"after,omitempty"` + Count uint64 `json:"count,omitempty"` + Attempt int `json:"attempt,omitempty"` + StatusCode int `json:"status_code,omitempty"` + Retryable bool `json:"retryable,omitempty"` + RetryAfterMS int64 `json:"retry_after_ms,omitempty"` + ParsedLines int `json:"parsed_lines,omitempty"` + ParsedRecords int `json:"parsed_records,omitempty"` + SkippedLines int `json:"skipped_lines,omitempty"` + TruncatedTail bool `json:"truncated_tail,omitempty"` + FirstCreatedAt string `json:"first_created_at,omitempty"` + LastCreatedAt string `json:"last_created_at,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` + DID string `json:"did,omitempty"` + Prev string `json:"prev,omitempty"` + Tip string `json:"tip,omitempty"` + CID string `json:"cid,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + Error string `json:"error,omitempty"` + Details any `json:"details,omitempty"` +} + +type replayTracer struct { + mu sync.Mutex + f *os.File + enc *json.Encoder +} + +func openReplayTracer(dataDir string) (*replayTracer, error) { + path := filepath.Join(dataDir, "trace.jsonl") + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + + if err != nil { + return nil, fmt.Errorf("open replay trace file: %w", err) + } + + return &replayTracer{ + f: file, + enc: json.NewEncoder(file), + }, nil +} + +func (t *replayTracer) Write(ev replayTraceEvent) { + if t == nil { + return + } + + t.mu.Lock() + defer t.mu.Unlock() + + ev.Timestamp = time.Now().UTC().Format(time.RFC3339Nano) + + _ = t.enc.Encode(ev) +} + +func (t *replayTracer) Close() error { + if t == nil || t.f == nil { + return nil + } + + return t.f.Close() +} diff --git a/internal/types/state.go b/internal/types/state.go index cba1235..31b9372 100644 --- a/internal/types/state.go +++ b/internal/types/state.go @@ -14,16 +14,17 @@ type StateV1 struct { } type BlockRefV1 struct { - Version uint8 `json:"v"` - BlockID uint64 `json:"block_id"` - Offset uint64 `json:"offset"` - Length uint64 `json:"length"` - OpSeq uint64 `json:"op_seq"` - DID string `json:"did"` - CID string `json:"cid"` - PrevCID string `json:"prev_cid"` - OpHash string `json:"op_hash"` - Received string `json:"received"` + Version uint8 `json:"v"` + BlockID uint64 `json:"block_id"` + Offset uint64 `json:"offset"` + Length uint64 `json:"length"` + OpSeq uint64 `json:"op_seq"` + DID string `json:"did"` + CID string `json:"cid"` + PrevCID string `json:"prev_cid"` + OpHash string `json:"op_hash"` + Received string `json:"received"` + CreatedAt string `json:"created_at,omitempty"` } type CheckpointReference struct { |