about summary refs log tree commit diff
path: root/internal/ingest/client.go
diff options
context:
space:
mode:
author  Fuwn <[email protected]>  2026-02-27 08:48:17 -0800
committer  Fuwn <[email protected]>  2026-02-27 08:48:17 -0800
commit  219b220b99b509e65cb1325ba67f00264e7cc25c (patch)
tree  746c71e7f8c1e7a253bfe4a6991dc1af6d839fe0  /internal/ingest/client.go
parent  chore: improve user-facing API copy clarity and consistency (diff)
download  plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.tar.xz
          plutia-test-219b220b99b509e65cb1325ba67f00264e7cc25c.zip
fix: make mirror replay lossless with strict seq accounting and trace
Diffstat (limited to 'internal/ingest/client.go')
-rw-r--r--  internal/ingest/client.go  202
1 file changed, 178 insertions(+), 24 deletions(-)
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index ad145c3..eba6fd0 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -32,6 +32,44 @@ type ClientOptions struct {
MaxDelay time.Duration
}
+type FetchAttemptTrace struct {
+ URL string
+ After uint64
+ Count uint64
+ Attempt int
+ StatusCode int
+ Retryable bool
+ RetryAfter time.Duration
+ ParsedLines int
+ ParsedRecords int
+ SkippedLines int
+ TruncatedTail bool
+ FirstCreated string
+ LastCreated string
+ Error string
+}
+
+type FetchBatch struct {
+ Records []types.ExportRecord
+ RequestURL string
+ After uint64
+ Count uint64
+ Attempts int
+ ParsedLines int
+ ParsedRecords int
+ SkippedLines int
+ TruncatedTail bool
+ FirstCreated string
+ LastCreated string
+}
+
+type decodeStats struct {
+ ParsedLines int
+ ParsedRecords int
+ SkippedLines int
+ TruncatedTail bool
+}
+
func NewClient(source string, opts ...ClientOptions) *Client {
cfg := ClientOptions{
MaxAttempts: 8,
@@ -79,26 +117,69 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR
}
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
+ pageSize := uint64(1000)
+ batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return batch.Records, nil
+}
+
+func (c *Client) FetchExportBatch(ctx context.Context, after uint64, limit uint64, pageSize uint64, traceHook func(FetchAttemptTrace)) (FetchBatch, error) {
if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
- return c.fetchFromFile(after, limit)
+ records, err := c.fetchFromFile(after, limit)
+
+ if err != nil {
+ return FetchBatch{}, err
+ }
+
+ batch := FetchBatch{
+ Records: records,
+ RequestURL: c.source,
+ After: after,
+ Count: uint64(len(records)),
+ Attempts: 1,
+ ParsedLines: len(records),
+ ParsedRecords: len(records),
+ }
+
+ if len(records) > 0 {
+ batch.FirstCreated = records[0].CreatedAt
+ batch.LastCreated = records[len(records)-1].CreatedAt
+ }
+
+ return batch, nil
}
u, err := url.Parse(c.source)
if err != nil {
- return nil, fmt.Errorf("parse plc source: %w", err)
+ return FetchBatch{}, fmt.Errorf("parse plc source: %w", err)
+ }
+
+ if pageSize == 0 {
+ pageSize = 1000
+ }
+
+ count := pageSize
+
+ if limit > 0 && limit < count {
+ count = limit
}
u.Path = strings.TrimRight(u.Path, "/") + "/export"
q := u.Query()
q.Set("after", fmt.Sprintf("%d", after))
+ q.Set("count", fmt.Sprintf("%d", count))
u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
- return nil, fmt.Errorf("new request: %w", err)
+ return FetchBatch{}, fmt.Errorf("new request: %w", err)
}
maxAttempts := c.opts.MaxAttempts
@@ -107,13 +188,61 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
maxAttempts = 1
}
- var lastErr error
+ var (
+ lastErr error
+ attempts int
+ )
for attempt := 1; attempt <= maxAttempts; attempt++ {
- records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+ records, retryAfter, retryable, statusCode, decStats, err := c.fetchExportOnce(req, limit)
+ attempts = attempt
+
+ if traceHook != nil {
+ trace := FetchAttemptTrace{
+ URL: req.URL.String(),
+ After: after,
+ Count: count,
+ Attempt: attempt,
+ StatusCode: statusCode,
+ Retryable: retryable,
+ RetryAfter: retryAfter,
+ ParsedLines: decStats.ParsedLines,
+ ParsedRecords: decStats.ParsedRecords,
+ SkippedLines: decStats.SkippedLines,
+ TruncatedTail: decStats.TruncatedTail,
+ }
+
+ if len(records) > 0 {
+ trace.FirstCreated = records[0].CreatedAt
+ trace.LastCreated = records[len(records)-1].CreatedAt
+ }
+
+ if err != nil {
+ trace.Error = err.Error()
+ }
+
+ traceHook(trace)
+ }
if err == nil {
- return records, nil
+ batch := FetchBatch{
+ Records: records,
+ RequestURL: req.URL.String(),
+ After: after,
+ Count: count,
+ Attempts: attempt,
+ ParsedLines: decStats.ParsedLines,
+ ParsedRecords: decStats.ParsedRecords,
+ SkippedLines: decStats.SkippedLines,
+ TruncatedTail: decStats.TruncatedTail,
+ }
+
+ if len(records) > 0 {
+ batch.FirstCreated = records[0].CreatedAt
+ batch.LastCreated = records[len(records)-1].CreatedAt
+ }
+
+ return batch, nil
}
lastErr = err
@@ -136,12 +265,17 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
case <-ctx.Done():
timer.Stop()
- return nil, ctx.Err()
+ return FetchBatch{}, ctx.Err()
case <-timer.C:
}
}
- return nil, lastErr
+ return FetchBatch{
+ RequestURL: req.URL.String(),
+ After: after,
+ Count: count,
+ Attempts: attempts,
+ }, lastErr
}
type httpStatusError struct {
@@ -153,7 +287,7 @@ func (e httpStatusError) Error() string {
return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
}
-func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
+func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, int, decodeStats, error) {
reqClone := req.Clone(req.Context())
reqClone.Header.Set("Accept-Encoding", "gzip")
@@ -161,7 +295,7 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor
resp, err := c.http.Do(reqClone)
if err != nil {
- return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
+ return nil, 0, isTransientNetworkErr(err), 0, decodeStats{}, fmt.Errorf("fetch export: %w", err)
}
defer resp.Body.Close()
@@ -172,16 +306,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor
retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
- return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
+ return nil, retryDelay, shouldRetryStatus(resp.StatusCode), resp.StatusCode, decodeStats{}, err
}
- records, err := decodeExportBody(resp.Body, limit)
+ records, decStats, err := decodeExportBodyWithStats(resp.Body, limit)
if err != nil {
- return nil, 0, false, err
+ return nil, 0, false, resp.StatusCode, decStats, err
}
- return records, 0, false, nil
+ return records, 0, false, resp.StatusCode, decStats, nil
}
func (c *Client) backoffDelay(attempt int) time.Duration {
@@ -262,22 +396,28 @@ func isTransientNetworkErr(err error) bool {
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
+ records, _, err := decodeExportBodyWithStats(r, limit)
+
+ return records, err
+}
+
+func decodeExportBodyWithStats(r io.Reader, limit uint64) ([]types.ExportRecord, decodeStats, error) {
br := bufio.NewReader(r)
first, err := peekFirstNonSpace(br)
if err != nil {
if err == io.EOF {
- return nil, nil
+ return nil, decodeStats{}, nil
}
- return nil, err
+ return nil, decodeStats{}, err
}
if first == '[' {
b, err := io.ReadAll(br)
if err != nil {
- return nil, fmt.Errorf("read export body: %w", err)
+ return nil, decodeStats{}, fmt.Errorf("read export body: %w", err)
}
trimmed := bytes.TrimSpace(b)
@@ -285,41 +425,53 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
var records []types.ExportRecord
if err := json.Unmarshal(trimmed, &records); err != nil {
- return nil, fmt.Errorf("decode export json array: %w", err)
+ return nil, decodeStats{}, fmt.Errorf("decode export json array: %w", err)
}
if limit > 0 && uint64(len(records)) > limit {
records = records[:limit]
}
- return records, nil
+ return records, decodeStats{
+ ParsedLines: len(records),
+ ParsedRecords: len(records),
+ }, nil
}
out := make([]types.ExportRecord, 0, 1024)
+ stats := decodeStats{}
for {
line, err := br.ReadBytes('\n')
isEOF := errors.Is(err, io.EOF)
if err != nil && !isEOF {
- return nil, fmt.Errorf("read ndjson line: %w", err)
+ return nil, stats, fmt.Errorf("read ndjson line: %w", err)
}
if limit > 0 && uint64(len(out)) >= limit {
- return out, nil
+ stats.ParsedRecords = len(out)
+
+ return out, stats, nil
}
trimmed := bytes.TrimSpace(line)
if len(trimmed) > 0 {
+ stats.ParsedLines++
+
var rec types.ExportRecord
if err := json.Unmarshal(trimmed, &rec); err != nil {
if isEOF && isTrailingNDJSONPartial(err) {
- return out, nil
+ stats.SkippedLines++
+ stats.TruncatedTail = true
+ stats.ParsedRecords = len(out)
+
+ return out, stats, nil
}
- return nil, fmt.Errorf("decode ndjson line: %w", err)
+ return nil, stats, fmt.Errorf("decode ndjson line: %w", err)
}
out = append(out, rec)
@@ -330,7 +482,9 @@ func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
}
}
- return out, nil
+ stats.ParsedRecords = len(out)
+
+ return out, stats, nil
}
func isTrailingNDJSONPartial(err error) bool {