diff options
| author | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 08:48:17 -0800 |
| commit | 219b220b99b509e65cb1325ba67f00264e7cc25c (patch) | |
| tree | 746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0 /internal/ingest/client.go | |
| parent | chore: improve user-facing API copy clarity and consistency (diff) | |
| download | plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip | |
fix: make mirror replay lossless with strict seq accounting and trace
Diffstat (limited to 'internal/ingest/client.go')
| -rw-r--r-- | internal/ingest/client.go | 202 |
1 file changed, 178 insertions, 24 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index ad145c3..eba6fd0 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -32,6 +32,44 @@ type ClientOptions struct { MaxDelay time.Duration } +type FetchAttemptTrace struct { + URL string + After uint64 + Count uint64 + Attempt int + StatusCode int + Retryable bool + RetryAfter time.Duration + ParsedLines int + ParsedRecords int + SkippedLines int + TruncatedTail bool + FirstCreated string + LastCreated string + Error string +} + +type FetchBatch struct { + Records []types.ExportRecord + RequestURL string + After uint64 + Count uint64 + Attempts int + ParsedLines int + ParsedRecords int + SkippedLines int + TruncatedTail bool + FirstCreated string + LastCreated string +} + +type decodeStats struct { + ParsedLines int + ParsedRecords int + SkippedLines int + TruncatedTail bool +} + func NewClient(source string, opts ...ClientOptions) *Client { cfg := ClientOptions{ MaxAttempts: 8, @@ -79,26 +117,69 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR } func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { + pageSize := uint64(1000) + batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil) + + if err != nil { + return nil, err + } + + return batch.Records, nil +} + +func (c *Client) FetchExportBatch(ctx context.Context, after uint64, limit uint64, pageSize uint64, traceHook func(FetchAttemptTrace)) (FetchBatch, error) { if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") { - return c.fetchFromFile(after, limit) + records, err := c.fetchFromFile(after, limit) + + if err != nil { + return FetchBatch{}, err + } + + batch := FetchBatch{ + Records: records, + RequestURL: c.source, + After: after, + Count: uint64(len(records)), + Attempts: 1, + ParsedLines: len(records), + ParsedRecords: len(records), + } + + 
if len(records) > 0 { + batch.FirstCreated = records[0].CreatedAt + batch.LastCreated = records[len(records)-1].CreatedAt + } + + return batch, nil } u, err := url.Parse(c.source) if err != nil { - return nil, fmt.Errorf("parse plc source: %w", err) + return FetchBatch{}, fmt.Errorf("parse plc source: %w", err) + } + + if pageSize == 0 { + pageSize = 1000 + } + + count := pageSize + + if limit > 0 && limit < count { + count = limit } u.Path = strings.TrimRight(u.Path, "/") + "/export" q := u.Query() q.Set("after", fmt.Sprintf("%d", after)) + q.Set("count", fmt.Sprintf("%d", count)) u.RawQuery = q.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { - return nil, fmt.Errorf("new request: %w", err) + return FetchBatch{}, fmt.Errorf("new request: %w", err) } maxAttempts := c.opts.MaxAttempts @@ -107,13 +188,61 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin maxAttempts = 1 } - var lastErr error + var ( + lastErr error + attempts int + ) for attempt := 1; attempt <= maxAttempts; attempt++ { - records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) + records, retryAfter, retryable, statusCode, decStats, err := c.fetchExportOnce(req, limit) + attempts = attempt + + if traceHook != nil { + trace := FetchAttemptTrace{ + URL: req.URL.String(), + After: after, + Count: count, + Attempt: attempt, + StatusCode: statusCode, + Retryable: retryable, + RetryAfter: retryAfter, + ParsedLines: decStats.ParsedLines, + ParsedRecords: decStats.ParsedRecords, + SkippedLines: decStats.SkippedLines, + TruncatedTail: decStats.TruncatedTail, + } + + if len(records) > 0 { + trace.FirstCreated = records[0].CreatedAt + trace.LastCreated = records[len(records)-1].CreatedAt + } + + if err != nil { + trace.Error = err.Error() + } + + traceHook(trace) + } if err == nil { - return records, nil + batch := FetchBatch{ + Records: records, + RequestURL: req.URL.String(), + After: after, + Count: count, + 
Attempts: attempt, + ParsedLines: decStats.ParsedLines, + ParsedRecords: decStats.ParsedRecords, + SkippedLines: decStats.SkippedLines, + TruncatedTail: decStats.TruncatedTail, + } + + if len(records) > 0 { + batch.FirstCreated = records[0].CreatedAt + batch.LastCreated = records[len(records)-1].CreatedAt + } + + return batch, nil } lastErr = err @@ -136,12 +265,17 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin case <-ctx.Done(): timer.Stop() - return nil, ctx.Err() + return FetchBatch{}, ctx.Err() case <-timer.C: } } - return nil, lastErr + return FetchBatch{ + RequestURL: req.URL.String(), + After: after, + Count: count, + Attempts: attempts, + }, lastErr } type httpStatusError struct { @@ -153,7 +287,7 @@ func (e httpStatusError) Error() string { return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body) } -func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { +func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, int, decodeStats, error) { reqClone := req.Clone(req.Context()) reqClone.Header.Set("Accept-Encoding", "gzip") @@ -161,7 +295,7 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor resp, err := c.http.Do(reqClone) if err != nil { - return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) + return nil, 0, isTransientNetworkErr(err), 0, decodeStats{}, fmt.Errorf("fetch export: %w", err) } defer resp.Body.Close() @@ -172,16 +306,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) err := httpStatusError{StatusCode: resp.StatusCode, Body: body} - return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err + return nil, retryDelay, shouldRetryStatus(resp.StatusCode), resp.StatusCode, decodeStats{}, err } - records, err := 
decodeExportBody(resp.Body, limit) + records, decStats, err := decodeExportBodyWithStats(resp.Body, limit) if err != nil { - return nil, 0, false, err + return nil, 0, false, resp.StatusCode, decStats, err } - return records, 0, false, nil + return records, 0, false, resp.StatusCode, decStats, nil } func (c *Client) backoffDelay(attempt int) time.Duration { @@ -262,22 +396,28 @@ func isTransientNetworkErr(err error) bool { } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { + records, _, err := decodeExportBodyWithStats(r, limit) + + return records, err +} + +func decodeExportBodyWithStats(r io.Reader, limit uint64) ([]types.ExportRecord, decodeStats, error) { br := bufio.NewReader(r) first, err := peekFirstNonSpace(br) if err != nil { if err == io.EOF { - return nil, nil + return nil, decodeStats{}, nil } - return nil, err + return nil, decodeStats{}, err } if first == '[' { b, err := io.ReadAll(br) if err != nil { - return nil, fmt.Errorf("read export body: %w", err) + return nil, decodeStats{}, fmt.Errorf("read export body: %w", err) } trimmed := bytes.TrimSpace(b) @@ -285,41 +425,53 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { var records []types.ExportRecord if err := json.Unmarshal(trimmed, &records); err != nil { - return nil, fmt.Errorf("decode export json array: %w", err) + return nil, decodeStats{}, fmt.Errorf("decode export json array: %w", err) } if limit > 0 && uint64(len(records)) > limit { records = records[:limit] } - return records, nil + return records, decodeStats{ + ParsedLines: len(records), + ParsedRecords: len(records), + }, nil } out := make([]types.ExportRecord, 0, 1024) + stats := decodeStats{} for { line, err := br.ReadBytes('\n') isEOF := errors.Is(err, io.EOF) if err != nil && !isEOF { - return nil, fmt.Errorf("read ndjson line: %w", err) + return nil, stats, fmt.Errorf("read ndjson line: %w", err) } if limit > 0 && uint64(len(out)) >= limit { - return out, nil + 
stats.ParsedRecords = len(out) + + return out, stats, nil } trimmed := bytes.TrimSpace(line) if len(trimmed) > 0 { + stats.ParsedLines++ + var rec types.ExportRecord if err := json.Unmarshal(trimmed, &rec); err != nil { if isEOF && isTrailingNDJSONPartial(err) { - return out, nil + stats.SkippedLines++ + stats.TruncatedTail = true + stats.ParsedRecords = len(out) + + return out, stats, nil } - return nil, fmt.Errorf("decode ndjson line: %w", err) + return nil, stats, fmt.Errorf("decode ndjson line: %w", err) } out = append(out, rec) @@ -330,7 +482,9 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { } } - return out, nil + stats.ParsedRecords = len(out) + + return out, stats, nil } func isTrailingNDJSONPartial(err error) bool { |