aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/trace.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 08:48:17 -0800
committerFuwn <[email protected]>2026-02-27 08:48:17 -0800
commit219b220b99b509e65cb1325ba67f00264e7cc25c (patch)
tree746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0 /internal/ingest/trace.go
parentchore: improve user-facing API copy clarity and consistency (diff)
downloadplutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz
plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip
fix: make mirror replay lossless with strict seq accounting and trace
Diffstat (limited to 'internal/ingest/trace.go')
-rw-r--r--internal/ingest/trace.go79
1 files changed, 79 insertions, 0 deletions
diff --git a/internal/ingest/trace.go b/internal/ingest/trace.go
new file mode 100644
index 0000000..36f785b
--- /dev/null
+++ b/internal/ingest/trace.go
@@ -0,0 +1,79 @@
+package ingest
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "sync"
+ "time"
+)
+
+type replayTraceEvent struct {
+ Timestamp string `json:"timestamp"`
+ Kind string `json:"kind"`
+ RequestURL string `json:"request_url,omitempty"`
+ After uint64 `json:"after,omitempty"`
+ Count uint64 `json:"count,omitempty"`
+ Attempt int `json:"attempt,omitempty"`
+ StatusCode int `json:"status_code,omitempty"`
+ Retryable bool `json:"retryable,omitempty"`
+ RetryAfterMS int64 `json:"retry_after_ms,omitempty"`
+ ParsedLines int `json:"parsed_lines,omitempty"`
+ ParsedRecords int `json:"parsed_records,omitempty"`
+ SkippedLines int `json:"skipped_lines,omitempty"`
+ TruncatedTail bool `json:"truncated_tail,omitempty"`
+ FirstCreatedAt string `json:"first_created_at,omitempty"`
+ LastCreatedAt string `json:"last_created_at,omitempty"`
+ Sequence uint64 `json:"sequence,omitempty"`
+ Reason string `json:"reason,omitempty"`
+ Message string `json:"message,omitempty"`
+ DID string `json:"did,omitempty"`
+ Prev string `json:"prev,omitempty"`
+ Tip string `json:"tip,omitempty"`
+ CID string `json:"cid,omitempty"`
+ CreatedAt string `json:"created_at,omitempty"`
+ Error string `json:"error,omitempty"`
+ Details any `json:"details,omitempty"`
+}
+
+type replayTracer struct {
+ mu sync.Mutex
+ f *os.File
+ enc *json.Encoder
+}
+
+func openReplayTracer(dataDir string) (*replayTracer, error) {
+ path := filepath.Join(dataDir, "trace.jsonl")
+ file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
+
+ if err != nil {
+ return nil, fmt.Errorf("open replay trace file: %w", err)
+ }
+
+ return &replayTracer{
+ f: file,
+ enc: json.NewEncoder(file),
+ }, nil
+}
+
+func (t *replayTracer) Write(ev replayTraceEvent) {
+ if t == nil {
+ return
+ }
+
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ ev.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
+
+ _ = t.enc.Encode(ev)
+}
+
+func (t *replayTracer) Close() error {
+ if t == nil || t.f == nil {
+ return nil
+ }
+
+ return t.f.Close()
+}