diff options
| author | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
| commit | 219b220b99b509e65cb1325ba67f00264e7cc25c (patch) | |
| tree | 746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0 /internal/ingest/trace.go | |
| parent | chore: improve user-facing API copy clarity and consistency (diff) | |
| download | plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip | |
fix: make mirror replay lossless with strict seq accounting and trace
Diffstat (limited to 'internal/ingest/trace.go')
| -rw-r--r-- | internal/ingest/trace.go | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/internal/ingest/trace.go b/internal/ingest/trace.go new file mode 100644 index 0000000..36f785b --- /dev/null +++ b/internal/ingest/trace.go @@ -0,0 +1,79 @@ +package ingest + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +type replayTraceEvent struct { + Timestamp string `json:"timestamp"` + Kind string `json:"kind"` + RequestURL string `json:"request_url,omitempty"` + After uint64 `json:"after,omitempty"` + Count uint64 `json:"count,omitempty"` + Attempt int `json:"attempt,omitempty"` + StatusCode int `json:"status_code,omitempty"` + Retryable bool `json:"retryable,omitempty"` + RetryAfterMS int64 `json:"retry_after_ms,omitempty"` + ParsedLines int `json:"parsed_lines,omitempty"` + ParsedRecords int `json:"parsed_records,omitempty"` + SkippedLines int `json:"skipped_lines,omitempty"` + TruncatedTail bool `json:"truncated_tail,omitempty"` + FirstCreatedAt string `json:"first_created_at,omitempty"` + LastCreatedAt string `json:"last_created_at,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` + DID string `json:"did,omitempty"` + Prev string `json:"prev,omitempty"` + Tip string `json:"tip,omitempty"` + CID string `json:"cid,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + Error string `json:"error,omitempty"` + Details any `json:"details,omitempty"` +} + +type replayTracer struct { + mu sync.Mutex + f *os.File + enc *json.Encoder +} + +func openReplayTracer(dataDir string) (*replayTracer, error) { + path := filepath.Join(dataDir, "trace.jsonl") + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + + if err != nil { + return nil, fmt.Errorf("open replay trace file: %w", err) + } + + return &replayTracer{ + f: file, + enc: json.NewEncoder(file), + }, nil +} + +func (t *replayTracer) Write(ev replayTraceEvent) { + if t == nil { + return + } + + t.mu.Lock() + defer t.mu.Unlock() + + ev.Timestamp = time.Now().UTC().Format(time.RFC3339Nano) + + _ = t.enc.Encode(ev) +} + +func (t *replayTracer) Close() error { + if t == nil || t.f == nil { + return nil + } + + return t.f.Close() +} |