about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Fuwn <[email protected]> 2026-02-27 08:48:17 -0800
committer: Fuwn <[email protected]> 2026-02-27 08:48:17 -0800
commit: 219b220b99b509e65cb1325ba67f00264e7cc25c (patch)
tree: 746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0
parent: chore: improve user-facing API copy clarity and consistency (diff)
download: plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz
plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip
fix: make mirror replay lossless with strict seq accounting and trace
-rw-r--r-- config.default.yaml | 2
-rw-r--r-- internal/config/config.go | 17
-rw-r--r-- internal/ingest/client.go | 202
-rw-r--r-- internal/ingest/service.go | 304
-rw-r--r-- internal/ingest/service_export_integrity_test.go | 357
-rw-r--r-- internal/ingest/trace.go | 79
-rw-r--r-- internal/types/state.go | 21
7 files changed, 947 insertions, 35 deletions
diff --git a/config.default.yaml b/config.default.yaml
index 8e09923..3e71d75 100644
--- a/config.default.yaml
+++ b/config.default.yaml
@@ -7,6 +7,8 @@ block_size_mb: 8
checkpoint_interval: 100000
commit_batch_size: 128
verify_workers: 10
+export_page_size: 1000
+replay_trace: false
listen_addr: :8080
mirror_private_key_path: ./mirror.key
poll_interval: 5s
diff --git a/internal/config/config.go b/internal/config/config.go
index 11ac848..f777a21 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -29,6 +29,8 @@ type Config struct {
CheckpointInterval uint64 `yaml:"checkpoint_interval"`
CommitBatchSize int `yaml:"commit_batch_size"`
VerifyWorkers int `yaml:"verify_workers"`
+ ExportPageSize int `yaml:"export_page_size"`
+ ReplayTrace bool `yaml:"replay_trace"`
ListenAddr string `yaml:"listen_addr"`
MirrorPrivateKeyPath string `yaml:"mirror_private_key_path"`
PollInterval time.Duration `yaml:"poll_interval"`
@@ -57,6 +59,8 @@ func Default() Config {
CheckpointInterval: 100000,
CommitBatchSize: 128,
VerifyWorkers: runtime.NumCPU(),
+ ExportPageSize: 1000,
+ ReplayTrace: false,
ListenAddr: ":8080",
MirrorPrivateKeyPath: "./mirror.key",
PollInterval: 5 * time.Second,
@@ -131,6 +135,13 @@ func applyEnv(cfg *Config) {
}
}
}
+ setBool := func(key string, dst *bool) {
+ if v := strings.TrimSpace(os.Getenv(key)); v != "" {
+ if b, err := strconv.ParseBool(v); err == nil {
+ *dst = b
+ }
+ }
+ }
setString("PLUTIA_MODE", &cfg.Mode)
setString("PLUTIA_DATA_DIR", &cfg.DataDir)
@@ -140,6 +151,8 @@ func applyEnv(cfg *Config) {
setInt("PLUTIA_BLOCK_SIZE_MB", &cfg.BlockSizeMB)
setInt("PLUTIA_COMMIT_BATCH_SIZE", &cfg.CommitBatchSize)
setInt("PLUTIA_VERIFY_WORKERS", &cfg.VerifyWorkers)
+ setInt("PLUTIA_EXPORT_PAGE_SIZE", &cfg.ExportPageSize)
+ setBool("PLUTIA_REPLAY_TRACE", &cfg.ReplayTrace)
setUint64("PLUTIA_CHECKPOINT_INTERVAL", &cfg.CheckpointInterval)
setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr)
setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath)
@@ -193,6 +206,10 @@ func (c Config) Validate() error {
return fmt.Errorf("verify_workers must be between 1 and 1024, got %d", c.VerifyWorkers)
}
+ if c.ExportPageSize <= 0 || c.ExportPageSize > 1000 {
+ return fmt.Errorf("export_page_size must be between 1 and 1000, got %d", c.ExportPageSize)
+ }
+
if c.ListenAddr == "" {
return errors.New("listen_addr is required")
}
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index ad145c3..eba6fd0 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -32,6 +32,44 @@ type ClientOptions struct {
MaxDelay time.Duration
}
// FetchAttemptTrace describes one HTTP attempt made while fetching an export
// page. FetchExportBatch fills one in per attempt and hands it to the
// caller-supplied trace hook, whether the attempt succeeded or failed.
type FetchAttemptTrace struct {
	// Request identity.
	URL     string // full request URL, query string included
	After   uint64 // value sent as the "after" query parameter
	Count   uint64 // value sent as the "count" query parameter
	Attempt int    // 1-based attempt number within the retry loop

	// Response outcome.
	StatusCode int           // HTTP status; 0 when the request itself failed
	Retryable  bool          // whether the failure was classified as retryable
	RetryAfter time.Duration // delay parsed from the Retry-After header, if any

	// Body decoding statistics.
	ParsedLines   int  // non-empty lines (or array elements) seen
	ParsedRecords int  // records successfully decoded
	SkippedLines  int  // lines dropped, e.g. a partial trailing line
	TruncatedTail bool // true when the body ended in a partial NDJSON line

	// CreatedAt of the first and last decoded record; empty when none.
	FirstCreated string
	LastCreated  string

	// Error is the attempt's error text; empty on success.
	Error string
}
+
+type FetchBatch struct {
+ Records []types.ExportRecord
+ RequestURL string
+ After uint64
+ Count uint64
+ Attempts int
+ ParsedLines int
+ ParsedRecords int
+ SkippedLines int
+ TruncatedTail bool
+ FirstCreated string
+ LastCreated string
+}
+
// decodeStats summarises what the export body decoder saw while reading a
// single response body.
type decodeStats struct {
	ParsedLines   int  // non-empty lines (or JSON array elements) encountered
	ParsedRecords int  // records that decoded successfully
	SkippedLines  int  // lines dropped instead of decoded
	TruncatedTail bool // body ended with a partial NDJSON line
}
+
func NewClient(source string, opts ...ClientOptions) *Client {
cfg := ClientOptions{
MaxAttempts: 8,
@@ -79,26 +117,69 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR
}
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
+ pageSize := uint64(1000)
+ batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return batch.Records, nil
+}
+
+func (c *Client) FetchExportBatch(ctx context.Context, after uint64, limit uint64, pageSize uint64, traceHook func(FetchAttemptTrace)) (FetchBatch, error) {
if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
- return c.fetchFromFile(after, limit)
+ records, err := c.fetchFromFile(after, limit)
+
+ if err != nil {
+ return FetchBatch{}, err
+ }
+
+ batch := FetchBatch{
+ Records: records,
+ RequestURL: c.source,
+ After: after,
+ Count: uint64(len(records)),
+ Attempts: 1,
+ ParsedLines: len(records),
+ ParsedRecords: len(records),
+ }
+
+ if len(records) > 0 {
+ batch.FirstCreated = records[0].CreatedAt
+ batch.LastCreated = records[len(records)-1].CreatedAt
+ }
+
+ return batch, nil
}
u, err := url.Parse(c.source)
if err != nil {
- return nil, fmt.Errorf("parse plc source: %w", err)
+ return FetchBatch{}, fmt.Errorf("parse plc source: %w", err)
+ }
+
+ if pageSize == 0 {
+ pageSize = 1000
+ }
+
+ count := pageSize
+
+ if limit > 0 && limit < count {
+ count = limit
}
u.Path = strings.TrimRight(u.Path, "/") + "/export"
q := u.Query()
q.Set("after", fmt.Sprintf("%d", after))
+ q.Set("count", fmt.Sprintf("%d", count))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
- return nil, fmt.Errorf("new request: %w", err)
+ return FetchBatch{}, fmt.Errorf("new request: %w", err)
}
maxAttempts := c.opts.MaxAttempts
@@ -107,13 +188,61 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
maxAttempts = 1
}
- var lastErr error
+ var (
+ lastErr error
+ attempts int
+ )
for attempt := 1; attempt <= maxAttempts; attempt++ {
- records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+ records, retryAfter, retryable, statusCode, decStats, err := c.fetchExportOnce(req, limit)
+ attempts = attempt
+
+ if traceHook != nil {
+ trace := FetchAttemptTrace{
+ URL: req.URL.String(),
+ After: after,
+ Count: count,
+ Attempt: attempt,
+ StatusCode: statusCode,
+ Retryable: retryable,
+ RetryAfter: retryAfter,
+ ParsedLines: decStats.ParsedLines,
+ ParsedRecords: decStats.ParsedRecords,
+ SkippedLines: decStats.SkippedLines,
+ TruncatedTail: decStats.TruncatedTail,
+ }
+
+ if len(records) > 0 {
+ trace.FirstCreated = records[0].CreatedAt
+ trace.LastCreated = records[len(records)-1].CreatedAt
+ }
+
+ if err != nil {
+ trace.Error = err.Error()
+ }
+
+ traceHook(trace)
+ }
if err == nil {
- return records, nil
+ batch := FetchBatch{
+ Records: records,
+ RequestURL: req.URL.String(),
+ After: after,
+ Count: count,
+ Attempts: attempt,
+ ParsedLines: decStats.ParsedLines,
+ ParsedRecords: decStats.ParsedRecords,
+ SkippedLines: decStats.SkippedLines,
+ TruncatedTail: decStats.TruncatedTail,
+ }
+
+ if len(records) > 0 {
+ batch.FirstCreated = records[0].CreatedAt
+ batch.LastCreated = records[len(records)-1].CreatedAt
+ }
+
+ return batch, nil
}
lastErr = err
@@ -136,12 +265,17 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
case <-ctx.Done():
timer.Stop()
- return nil, ctx.Err()
+ return FetchBatch{}, ctx.Err()
case <-timer.C:
}
}
- return nil, lastErr
+ return FetchBatch{
+ RequestURL: req.URL.String(),
+ After: after,
+ Count: count,
+ Attempts: attempts,
+ }, lastErr
}
type httpStatusError struct {
@@ -153,7 +287,7 @@ func (e httpStatusError) Error() string {
return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
}
-func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
+func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, int, decodeStats, error) {
reqClone := req.Clone(req.Context())
reqClone.Header.Set("Accept-Encoding", "gzip")
@@ -161,7 +295,7 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor
resp, err := c.http.Do(reqClone)
if err != nil {
- return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
+ return nil, 0, isTransientNetworkErr(err), 0, decodeStats{}, fmt.Errorf("fetch export: %w", err)
}
defer resp.Body.Close()
@@ -172,16 +306,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor
retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
- return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
+ return nil, retryDelay, shouldRetryStatus(resp.StatusCode), resp.StatusCode, decodeStats{}, err
}
- records, err := decodeExportBody(resp.Body, limit)
+ records, decStats, err := decodeExportBodyWithStats(resp.Body, limit)
if err != nil {
- return nil, 0, false, err
+ return nil, 0, false, resp.StatusCode, decStats, err
}
- return records, 0, false, nil
+ return records, 0, false, resp.StatusCode, decStats, nil
}
func (c *Client) backoffDelay(attempt int) time.Duration {
@@ -262,22 +396,28 @@ func isTransientNetworkErr(err error) bool {
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
+ records, _, err := decodeExportBodyWithStats(r, limit)
+
+ return records, err
+}
+
+func decodeExportBodyWithStats(r io.Reader, limit uint64) ([]types.ExportRecord, decodeStats, error) {
br := bufio.NewReader(r)
first, err := peekFirstNonSpace(br)
if err != nil {
if err == io.EOF {
- return nil, nil
+ return nil, decodeStats{}, nil
}
- return nil, err
+ return nil, decodeStats{}, err
}
if first == '[' {
b, err := io.ReadAll(br)
if err != nil {
- return nil, fmt.Errorf("read export body: %w", err)
+ return nil, decodeStats{}, fmt.Errorf("read export body: %w", err)
}
trimmed := bytes.TrimSpace(b)
@@ -285,41 +425,53 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
var records []types.ExportRecord
if err := json.Unmarshal(trimmed, &records); err != nil {
- return nil, fmt.Errorf("decode export json array: %w", err)
+ return nil, decodeStats{}, fmt.Errorf("decode export json array: %w", err)
}
if limit > 0 && uint64(len(records)) > limit {
records = records[:limit]
}
- return records, nil
+ return records, decodeStats{
+ ParsedLines: len(records),
+ ParsedRecords: len(records),
+ }, nil
}
out := make([]types.ExportRecord, 0, 1024)
+ stats := decodeStats{}
for {
line, err := br.ReadBytes('\n')
isEOF := errors.Is(err, io.EOF)
if err != nil && !isEOF {
- return nil, fmt.Errorf("read ndjson line: %w", err)
+ return nil, stats, fmt.Errorf("read ndjson line: %w", err)
}
if limit > 0 && uint64(len(out)) >= limit {
- return out, nil
+ stats.ParsedRecords = len(out)
+
+ return out, stats, nil
}
trimmed := bytes.TrimSpace(line)
if len(trimmed) > 0 {
+ stats.ParsedLines++
+
var rec types.ExportRecord
if err := json.Unmarshal(trimmed, &rec); err != nil {
if isEOF && isTrailingNDJSONPartial(err) {
- return out, nil
+ stats.SkippedLines++
+ stats.TruncatedTail = true
+ stats.ParsedRecords = len(out)
+
+ return out, stats, nil
}
- return nil, fmt.Errorf("decode ndjson line: %w", err)
+ return nil, stats, fmt.Errorf("decode ndjson line: %w", err)
}
out = append(out, rec)
@@ -330,7 +482,9 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
}
}
- return out, nil
+ stats.ParsedRecords = len(out)
+
+ return out, stats, nil
}
func isTrailingNDJSONPartial(err error) bool {
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index e318422..1c95068 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -49,6 +49,8 @@ type Service struct {
corruptErr atomic.Value
startedAt time.Time
metricsSink MetricsSink
+ tracer *replayTracer
+ startupErr error
}
type MetricsSink interface {
@@ -71,6 +73,16 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
startedAt: time.Now(),
}
+ if cfg.ReplayTrace {
+ tracer, err := openReplayTracer(cfg.DataDir)
+
+ if err != nil {
+ s.startupErr = err
+ } else {
+ s.tracer = tracer
+ }
+ }
+
if didCount, err := countStates(store); err == nil {
atomic.StoreUint64(&s.stats.DIDCount, didCount)
}
@@ -116,6 +128,10 @@ func (s *Service) Poll(ctx context.Context) error {
for {
if _, err := s.RunOnce(ctx); err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
+
+ if isFatalIntegrityError(err) {
+ return err
+ }
}
select {
@@ -127,6 +143,10 @@ func (s *Service) Poll(ctx context.Context) error {
}
func (s *Service) RunOnce(ctx context.Context) (bool, error) {
+ if s.startupErr != nil {
+ return false, s.startupErr
+ }
+
if err := s.CorruptionError(); err != nil {
return false, err
}
@@ -153,7 +173,19 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
limit = remaining
}
- records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
+ pageSize := uint64(s.cfg.ExportPageSize)
+
+ if pageSize == 0 {
+ pageSize = 1000
+ }
+
+ batch, err := s.client.FetchExportBatch(ctx, lastSeq, limit, pageSize, s.traceFetchAttempt)
+
+ if err != nil {
+ return false, err
+ }
+
+ records, err := s.prepareRecordsForCommit(lastSeq, batch)
if err != nil {
return false, err
@@ -177,6 +209,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+ if lastCommitted != lastSeq+committed {
+ err := fatalIntegrityErrorf(
+ "sequence accounting mismatch: previous_global_seq=%d committed=%d new_global_seq=%d",
+ lastSeq,
+ committed,
+ lastCommitted,
+ )
+ s.traceIntegrityError(err.Error(), map[string]any{
+ "after": lastSeq,
+ "batch_count": len(records),
+ "last_committed": lastCommitted,
+ })
+
+ return committed > 0, err
+ }
+
if latestSourceSeq > lastCommitted {
atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
} else {
@@ -199,6 +247,22 @@ type verifyResult struct {
err error
}
// fatalIntegrityError marks an error as an unrecoverable integrity violation.
// Callers detect it with isFatalIntegrityError rather than by message text.
type fatalIntegrityError struct {
	msg string
}

// Error returns the formatted message the error was created with.
func (e fatalIntegrityError) Error() string {
	return e.msg
}

// fatalIntegrityErrorf builds a fatalIntegrityError whose message is formatted
// with fmt.Sprintf semantics.
func fatalIntegrityErrorf(format string, args ...any) error {
	message := fmt.Sprintf(format, args...)

	return fatalIntegrityError{msg: message}
}

// isFatalIntegrityError reports whether err, or any error it wraps, is a
// fatalIntegrityError. A nil err yields false.
func isFatalIntegrityError(err error) bool {
	fatal := fatalIntegrityError{}
	found := errors.As(err, &fatal)

	return found
}
+
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
verified, err := s.verifyRecords(ctx, records)
@@ -209,6 +273,98 @@ func (s *Service) processRecords(ctx context.Context, records []types.ExportReco
return s.commitVerified(ctx, verified)
}
+func (s *Service) prepareRecordsForCommit(lastSeq uint64, batch FetchBatch) ([]types.ExportRecord, error) {
+ if len(batch.Records) == 0 {
+ return nil, nil
+ }
+
+ sorted := append([]types.ExportRecord(nil), batch.Records...)
+
+ sort.SliceStable(sorted, func(i, j int) bool {
+ if sorted[i].Seq == sorted[j].Seq {
+ if sorted[i].DID == sorted[j].DID {
+ return sorted[i].CID < sorted[j].CID
+ }
+
+ return sorted[i].DID < sorted[j].DID
+ }
+
+ return sorted[i].Seq < sorted[j].Seq
+ })
+
+ out := make([]types.ExportRecord, 0, len(sorted))
+ seen := make(map[uint64]struct{}, len(sorted))
+ expected := lastSeq + 1
+
+ for _, rec := range sorted {
+ if rec.Seq == 0 {
+ err := fatalIntegrityErrorf(
+ "export record missing seq: after=%d request=%s did=%s cid=%s created_at=%s",
+ lastSeq,
+ batch.RequestURL,
+ rec.DID,
+ rec.CID,
+ rec.CreatedAt,
+ )
+ s.traceIntegrityError(err.Error(), map[string]any{
+ "after": lastSeq,
+ "request_url": batch.RequestURL,
+ "did": rec.DID,
+ "cid": rec.CID,
+ "created_at": rec.CreatedAt,
+ "parsed_records": batch.ParsedRecords,
+ })
+
+ return nil, err
+ }
+
+ if rec.Seq <= lastSeq {
+ s.traceDedupDecision(rec, "already_committed_seq", batch, lastSeq)
+ continue
+ }
+
+ if _, ok := seen[rec.Seq]; ok {
+ s.traceDedupDecision(rec, "duplicate_seq_in_batch", batch, lastSeq)
+ continue
+ }
+
+ if rec.Seq != expected {
+ err := fatalIntegrityErrorf(
+ "sequence gap detected: expected=%d got=%d previous_global_seq=%d request=%s first_created_at=%s last_created_at=%s parsed_records=%d skipped_lines=%d truncated_tail=%t",
+ expected,
+ rec.Seq,
+ lastSeq,
+ batch.RequestURL,
+ batch.FirstCreated,
+ batch.LastCreated,
+ batch.ParsedRecords,
+ batch.SkippedLines,
+ batch.TruncatedTail,
+ )
+ s.traceIntegrityError(err.Error(), map[string]any{
+ "expected_seq": expected,
+ "got_seq": rec.Seq,
+ "after": lastSeq,
+ "request_url": batch.RequestURL,
+ "count": batch.Count,
+ "first_created_at": batch.FirstCreated,
+ "last_created_at": batch.LastCreated,
+ "parsed_records": batch.ParsedRecords,
+ "skipped_lines": batch.SkippedLines,
+ "truncated_tail": batch.TruncatedTail,
+ })
+
+ return nil, err
+ }
+
+ seen[rec.Seq] = struct{}{}
+ out = append(out, rec)
+ expected++
+ }
+
+ return out, nil
+}
+
func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
_ = ctx
workers := s.cfg.VerifyWorkers
@@ -252,6 +408,14 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
if err := s.verifier.VerifyOperation(op, existing); err != nil {
atomic.AddUint64(&s.stats.VerifyFailures, 1)
+ if strings.Contains(err.Error(), "prev linkage mismatch") {
+ diag := s.buildPrevMismatchDiagnostic(op, existing)
+ s.tracePrevMismatch(diag)
+ results <- verifyResult{index: task.index, err: fatalIntegrityErrorf("%s", diag.Message)}
+
+ continue
+ }
+
results <- verifyResult{index: task.index, err: err}
continue
@@ -383,6 +547,139 @@ func cloneState(in *types.StateV1) *types.StateV1 {
return &out
}
+type prevMismatchDiagnostic struct {
+ Message string `json:"message"`
+ DID string `json:"did"`
+ TipCID string `json:"tip_cid"`
+ PrevCID string `json:"prev_cid"`
+ OpCID string `json:"op_cid"`
+ CreatedAt string `json:"created_at"`
+ LastOps []types.BlockRefV1 `json:"last_ops"`
+}
+
+func (s *Service) buildPrevMismatchDiagnostic(op types.ParsedOperation, existing *types.StateV1) prevMismatchDiagnostic {
+ diag := prevMismatchDiagnostic{
+ DID: op.DID,
+ PrevCID: op.Prev,
+ OpCID: op.CID,
+ CreatedAt: op.RawRecord.CreatedAt,
+ }
+
+ if existing != nil {
+ diag.TipCID = existing.ChainTipHash
+ }
+
+ const tail = 8
+
+ seqs, err := s.store.ListDIDSequences(op.DID)
+
+ if err == nil && len(seqs) > 0 {
+ start := len(seqs) - tail
+
+ if start < 0 {
+ start = 0
+ }
+
+ for _, seq := range seqs[start:] {
+ ref, ok, getErr := s.store.GetOpSeqRef(seq)
+
+ if getErr != nil || !ok {
+ continue
+ }
+
+ diag.LastOps = append(diag.LastOps, ref)
+ }
+ }
+
+ diag.Message = fmt.Sprintf(
+ "prev linkage mismatch: did=%s tip=%s op_prev=%s op_cid=%s created_at=%s",
+ diag.DID,
+ diag.TipCID,
+ diag.PrevCID,
+ diag.OpCID,
+ diag.CreatedAt,
+ )
+
+ return diag
+}
+
+func (s *Service) traceFetchAttempt(trace FetchAttemptTrace) {
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "export_fetch",
+ RequestURL: trace.URL,
+ After: trace.After,
+ Count: trace.Count,
+ Attempt: trace.Attempt,
+ StatusCode: trace.StatusCode,
+ Retryable: trace.Retryable,
+ RetryAfterMS: trace.RetryAfter.Milliseconds(),
+ ParsedLines: trace.ParsedLines,
+ ParsedRecords: trace.ParsedRecords,
+ SkippedLines: trace.SkippedLines,
+ TruncatedTail: trace.TruncatedTail,
+ FirstCreatedAt: trace.FirstCreated,
+ LastCreatedAt: trace.LastCreated,
+ Error: trace.Error,
+ })
+}
+
+func (s *Service) traceDedupDecision(rec types.ExportRecord, reason string, batch FetchBatch, after uint64) {
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "dedup_decision",
+ Reason: reason,
+ Sequence: rec.Seq,
+ DID: rec.DID,
+ CID: rec.CID,
+ CreatedAt: rec.CreatedAt,
+ After: after,
+ Count: batch.Count,
+ RequestURL: batch.RequestURL,
+ FirstCreatedAt: batch.FirstCreated,
+ LastCreatedAt: batch.LastCreated,
+ })
+}
+
+func (s *Service) tracePrevMismatch(diag prevMismatchDiagnostic) {
+ log.Printf("%s", diag.Message)
+
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "prev_mismatch",
+ DID: diag.DID,
+ Tip: diag.TipCID,
+ Prev: diag.PrevCID,
+ CID: diag.OpCID,
+ CreatedAt: diag.CreatedAt,
+ Message: diag.Message,
+ Details: diag.LastOps,
+ })
+}
+
+func (s *Service) traceIntegrityError(message string, details map[string]any) {
+ log.Printf("%s", message)
+
+ if s.tracer == nil {
+ return
+ }
+
+ s.tracer.Write(replayTraceEvent{
+ Kind: "integrity_error",
+ Message: message,
+ Details: details,
+ })
+}
+
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
_ = ctx
batchSize := s.cfg.CommitBatchSize
@@ -473,6 +770,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
}
result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
+ result.Ref.CreatedAt = v.op.RawRecord.CreatedAt
ref = &result.Ref
}
@@ -1383,6 +1681,10 @@ func extractRotationKeysFromPayload(payload map[string]any, prior []string) []st
}
func (s *Service) Close() {
+ if s.tracer != nil {
+ _ = s.tracer.Close()
+ }
+
if s.appender != nil {
s.appender.Close()
}
diff --git a/internal/ingest/service_export_integrity_test.go b/internal/ingest/service_export_integrity_test.go
new file mode 100644
index 0000000..92a1703
--- /dev/null
+++ b/internal/ingest/service_export_integrity_test.go
@@ -0,0 +1,357 @@
+package ingest
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestReplayHandlesTruncatedTailAndOverlappingRetryWithoutGaps(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedChainRecords(t, 6)
+
+ var (
+ mu sync.Mutex
+ attempts = map[uint64]int{}
+ )
+
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ t.Helper()
+
+ q := r.URL.Query()
+ afterRaw := q.Get("after")
+ after, err := strconv.ParseUint(afterRaw, 10, 64)
+ if err != nil {
+ http.Error(w, "invalid after", http.StatusBadRequest)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/jsonlines")
+
+ mu.Lock()
+ attempts[after]++
+ attempt := attempts[after]
+ mu.Unlock()
+
+ write := func(recs ...types.ExportRecord) {
+ for _, rec := range recs {
+ b, _ := json.Marshal(rec)
+ _, _ = fmt.Fprintln(w, string(b))
+ }
+ }
+
+ switch after {
+ case 0:
+ write(records[0], records[1])
+ case 2:
+ // Truncate the tail on first pass: seq=5 is present but partial.
+ line3, _ := json.Marshal(records[2])
+ line4, _ := json.Marshal(records[3])
+ line5, _ := json.Marshal(records[4])
+ _, _ = fmt.Fprintln(w, string(line3))
+ _, _ = fmt.Fprintln(w, string(line4))
+ partial := string(line5)
+ if len(partial) > 16 {
+ partial = partial[:len(partial)-16]
+ }
+ _, _ = fmt.Fprint(w, partial)
+ case 4:
+ // First response is rate-limited; retry overlaps with an old seq.
+ if attempt == 1 {
+ http.Error(w, "rate limited", http.StatusTooManyRequests)
+ return
+ }
+
+ write(records[2], records[4], records[5])
+ case 6:
+ // caught up
+ default:
+ http.Error(w, "unexpected after", http.StatusBadRequest)
+ }
+ }))
+ defer server.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: server.URL,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 100000,
+ CommitBatchSize: 4,
+ VerifyWorkers: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: time.Second,
+ RequestTimeout: 5 * time.Second,
+ HTTPRetryMaxAttempts: 3,
+ HTTPRetryBaseDelay: 10 * time.Millisecond,
+ HTTPRetryMaxDelay: 20 * time.Millisecond,
+ RateLimit: config.RateLimit{
+ ResolveRPS: 30,
+ ResolveBurst: 60,
+ ProofRPS: 10,
+ ProofBurst: 20,
+ },
+ ReplayTrace: true,
+ }
+
+ client := NewClient(server.URL, ClientOptions{
+ MaxAttempts: cfg.HTTPRetryMaxAttempts,
+ BaseDelay: cfg.HTTPRetryBaseDelay,
+ MaxDelay: cfg.HTTPRetryMaxDelay,
+ })
+ service := NewService(cfg, store, client, bl, checkpoint.NewManager(store, dataDir, keyPath))
+ defer service.Close()
+
+ if err := service.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+
+ if err := service.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ seq, err := store.GetGlobalSeq()
+ if err != nil {
+ t.Fatalf("get global seq: %v", err)
+ }
+ if seq != 6 {
+ t.Fatalf("global seq mismatch: got %d want 6", seq)
+ }
+
+ var refs int
+ var gaps uint64
+ var prev uint64
+ err = store.ForEachOpSeqRef(func(seq uint64, _ types.BlockRefV1) error {
+ refs++
+ if prev > 0 && seq > prev+1 {
+ gaps += seq - prev - 1
+ }
+ prev = seq
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("iterate op refs: %v", err)
+ }
+ if refs != 6 {
+ t.Fatalf("op ref count mismatch: got %d want 6", refs)
+ }
+ if gaps != 0 {
+ t.Fatalf("unexpected sequence gaps: %d", gaps)
+ }
+
+ tracePath := filepath.Join(dataDir, "trace.jsonl")
+ traceBytes, err := os.ReadFile(tracePath)
+ if err != nil {
+ t.Fatalf("read trace file: %v", err)
+ }
+
+ trace := string(traceBytes)
+ if !strings.Contains(trace, `"kind":"export_fetch"`) {
+ t.Fatalf("trace missing export_fetch event")
+ }
+ if !strings.Contains(trace, `"truncated_tail":true`) {
+ t.Fatalf("trace missing truncated tail marker")
+ }
+ if !strings.Contains(trace, `"kind":"dedup_decision"`) {
+ t.Fatalf("trace missing dedup decision event")
+ }
+ if !strings.Contains(trace, `"reason":"already_committed_seq"`) {
+ t.Fatalf("trace missing already_committed_seq reason")
+ }
+}
+
+func TestReplayFailsFastOnSequenceGap(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedChainRecords(t, 3)
+ server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/jsonlines")
+ // Deliberately skip seq=2.
+ for _, rec := range []types.ExportRecord{records[0], records[2]} {
+ b, _ := json.Marshal(rec)
+ _, _ = fmt.Fprintln(w, string(b))
+ }
+ }))
+ defer server.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: server.URL,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 100000,
+ CommitBatchSize: 2,
+ VerifyWorkers: 1,
+ ExportPageSize: 1000,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: time.Second,
+ RequestTimeout: 5 * time.Second,
+ HTTPRetryMaxAttempts: 1,
+ HTTPRetryBaseDelay: 10 * time.Millisecond,
+ HTTPRetryMaxDelay: 20 * time.Millisecond,
+ RateLimit: config.RateLimit{
+ ResolveRPS: 30,
+ ResolveBurst: 60,
+ ProofRPS: 10,
+ ProofBurst: 20,
+ },
+ ReplayTrace: true,
+ }
+
+ service := NewService(cfg, store, NewClient(server.URL), bl, checkpoint.NewManager(store, dataDir, keyPath))
+ defer service.Close()
+
+ err = service.Replay(context.Background())
+ if err == nil {
+ t.Fatalf("expected replay to fail on sequence gap")
+ }
+ if !strings.Contains(err.Error(), "sequence gap detected") {
+ t.Fatalf("unexpected error: %v", err)
+ }
+ if !strings.Contains(err.Error(), "expected=2 got=3") {
+ t.Fatalf("error missing sequence details: %v", err)
+ }
+}
+
+func buildSignedChainRecords(t *testing.T, total uint64) []types.ExportRecord {
+ t.Helper()
+
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key: %v", err)
+ }
+
+ out := make([]types.ExportRecord, 0, total)
+ did := "did:plc:chain"
+ createdAtBase := time.Date(2026, time.January, 1, 0, 0, 0, 0, time.UTC)
+ prev := ""
+
+ for seq := uint64(1); seq <= total; seq++ {
+ payloadDoc := map[string]any{
+ "did": did,
+ "didDoc": map[string]any{"id": did, "seq": seq},
+ }
+
+ if prev != "" {
+ payloadDoc["prev"] = prev
+ }
+
+ payloadBytes, _ := json.Marshal(payloadDoc)
+ canon, _ := types.CanonicalizeJSON(payloadBytes)
+ sig := ed25519.Sign(priv, canon)
+ op := map[string]any{
+ "did": did,
+ "didDoc": payloadDoc["didDoc"],
+ "publicKey": base64.RawURLEncoding.EncodeToString(pub),
+ "sigPayload": base64.RawURLEncoding.EncodeToString(canon),
+ "sig": base64.RawURLEncoding.EncodeToString(sig),
+ }
+ if prev != "" {
+ op["prev"] = prev
+ }
+
+ opRaw, _ := json.Marshal(op)
+ opCanon, _ := types.CanonicalizeJSON(opRaw)
+ cid := types.ComputeDigestCID(opCanon)
+
+ createdAt := createdAtBase.Add(time.Duration(seq-1) * time.Second).Format(time.RFC3339Nano)
+ if seq == 5 || seq == 6 {
+ // Identical timestamps to validate deterministic tie handling.
+ createdAt = createdAtBase.Add(4 * time.Second).Format(time.RFC3339Nano)
+ }
+
+ out = append(out, types.ExportRecord{
+ Seq: seq,
+ DID: did,
+ CreatedAt: createdAt,
+ CID: cid,
+ Operation: opRaw,
+ })
+ prev = cid
+ }
+
+ return out
+}
diff --git a/internal/ingest/trace.go b/internal/ingest/trace.go
new file mode 100644
index 0000000..36f785b
--- /dev/null
+++ b/internal/ingest/trace.go
@@ -0,0 +1,79 @@
+package ingest
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
// replayTraceEvent is one line of the replay trace file. Kind selects which of
// the optional fields are meaningful; unset optional fields are omitted from
// the JSON output.
type replayTraceEvent struct {
	Timestamp      string `json:"timestamp"` // set by replayTracer.Write
	Kind           string `json:"kind"`
	RequestURL     string `json:"request_url,omitempty"`
	After          uint64 `json:"after,omitempty"`
	Count          uint64 `json:"count,omitempty"`
	Attempt        int    `json:"attempt,omitempty"`
	StatusCode     int    `json:"status_code,omitempty"`
	Retryable      bool   `json:"retryable,omitempty"`
	RetryAfterMS   int64  `json:"retry_after_ms,omitempty"`
	ParsedLines    int    `json:"parsed_lines,omitempty"`
	ParsedRecords  int    `json:"parsed_records,omitempty"`
	SkippedLines   int    `json:"skipped_lines,omitempty"`
	TruncatedTail  bool   `json:"truncated_tail,omitempty"`
	FirstCreatedAt string `json:"first_created_at,omitempty"`
	LastCreatedAt  string `json:"last_created_at,omitempty"`
	Sequence       uint64 `json:"sequence,omitempty"`
	Reason         string `json:"reason,omitempty"`
	Message        string `json:"message,omitempty"`
	DID            string `json:"did,omitempty"`
	Prev           string `json:"prev,omitempty"`
	Tip            string `json:"tip,omitempty"`
	CID            string `json:"cid,omitempty"`
	CreatedAt      string `json:"created_at,omitempty"`
	Error          string `json:"error,omitempty"`
	Details        any    `json:"details,omitempty"`
}

// replayTracer appends replayTraceEvent values as JSON lines to a file. All
// methods are safe on a nil receiver; mu serialises writers and Close.
type replayTracer struct {
	mu  sync.Mutex
	f   *os.File
	enc *json.Encoder
}

// openReplayTracer opens (creating if needed) trace.jsonl inside dataDir in
// append mode and returns a tracer writing to it.
func openReplayTracer(dataDir string) (*replayTracer, error) {
	path := filepath.Join(dataDir, "trace.jsonl")

	file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
	if err != nil {
		return nil, fmt.Errorf("open replay trace file: %w", err)
	}

	return &replayTracer{
		f:   file,
		enc: json.NewEncoder(file),
	}, nil
}

// Write stamps ev with the current UTC time and appends it to the trace file.
// It is a no-op on a nil, zero-value, or closed tracer.
func (t *replayTracer) Write(ev replayTraceEvent) {
	if t == nil {
		return
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	// Closed, or never opened: nothing to write to.
	if t.enc == nil {
		return
	}

	ev.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)

	// Tracing is best-effort diagnostics; a failed trace write must not
	// interrupt ingestion, so the encode error is intentionally dropped.
	_ = t.enc.Encode(ev)
}

// Close closes the underlying file. It holds the same lock as Write, so it
// cannot race with an in-flight write, and it is idempotent: calls after the
// first return nil.
func (t *replayTracer) Close() error {
	if t == nil {
		return nil
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	if t.f == nil {
		return nil
	}

	err := t.f.Close()
	t.f = nil
	t.enc = nil

	return err
}
diff --git a/internal/types/state.go b/internal/types/state.go
index cba1235..31b9372 100644
--- a/internal/types/state.go
+++ b/internal/types/state.go
@@ -14,16 +14,17 @@ type StateV1 struct {
}
type BlockRefV1 struct {
- Version uint8 `json:"v"`
- BlockID uint64 `json:"block_id"`
- Offset uint64 `json:"offset"`
- Length uint64 `json:"length"`
- OpSeq uint64 `json:"op_seq"`
- DID string `json:"did"`
- CID string `json:"cid"`
- PrevCID string `json:"prev_cid"`
- OpHash string `json:"op_hash"`
- Received string `json:"received"`
+ Version uint8 `json:"v"`
+ BlockID uint64 `json:"block_id"`
+ Offset uint64 `json:"offset"`
+ Length uint64 `json:"length"`
+ OpSeq uint64 `json:"op_seq"`
+ DID string `json:"did"`
+ CID string `json:"cid"`
+ PrevCID string `json:"prev_cid"`
+ OpHash string `json:"op_hash"`
+ Received string `json:"received"`
+ CreatedAt string `json:"created_at,omitempty"`
}
type CheckpointReference struct {