diff options
Diffstat (limited to 'internal/ingest/client.go')
| -rw-r--r-- | internal/ingest/client.go | 83 |
1 files changed, 81 insertions, 2 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index d25b73f..12d8bd6 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -17,7 +17,6 @@ import ( "strconv" "strings" "time" - "github.com/Fuwn/plutia/internal/types" ) @@ -39,17 +38,21 @@ func NewClient(source string, opts ...ClientOptions) *Client { BaseDelay: 250 * time.Millisecond, MaxDelay: 10 * time.Second, } + if len(opts) > 0 { if opts[0].MaxAttempts > 0 { cfg.MaxAttempts = opts[0].MaxAttempts } + if opts[0].BaseDelay > 0 { cfg.BaseDelay = opts[0].BaseDelay } + if opts[0].MaxDelay > 0 { cfg.MaxDelay = opts[0].MaxDelay } } + transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext, @@ -60,6 +63,7 @@ func NewClient(source string, opts ...ClientOptions) *Client { TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, } + return &Client{ source: strings.TrimRight(source, "/"), opts: cfg, @@ -80,45 +84,63 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin } u, err := url.Parse(c.source) + if err != nil { return nil, fmt.Errorf("parse plc source: %w", err) } + u.Path = strings.TrimRight(u.Path, "/") + "/export" q := u.Query() + q.Set("after", fmt.Sprintf("%d", after)) - u.RawQuery = q.Encode() + u.RawQuery = q.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { return nil, fmt.Errorf("new request: %w", err) } + maxAttempts := c.opts.MaxAttempts + if maxAttempts < 1 { maxAttempts = 1 } + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) + if err == nil { return records, nil } + lastErr = err + if !retryable || attempt == maxAttempts || ctx.Err() != nil { break } + delay := retryAfter + if delay <= 0 { delay = c.backoffDelay(attempt) } + log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err) + timer := time.NewTimer(delay) + select { case <-ctx.Done(): timer.Stop() + return nil, ctx.Err() case <-timer.C: } } + return nil, lastErr } @@ -133,11 +155,15 @@ func (e httpStatusError) Error() string { func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { reqClone := req.Clone(req.Context()) + reqClone.Header.Set("Accept-Encoding", "gzip") + resp, err := c.http.Do(reqClone) + if err != nil { return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { @@ -145,12 +171,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor body := strings.TrimSpace(string(b)) retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) err := httpStatusError{StatusCode: resp.StatusCode, Body: body} + return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err } + records, err := decodeExportBody(resp.Body, limit) + if err != nil { return nil, 0, false, err } + return records, 0, false, nil } @@ -158,23 +188,31 @@ func (c *Client) backoffDelay(attempt int) time.Duration { if attempt < 1 { attempt = 1 } + delay := c.opts.BaseDelay + if delay <= 0 { delay = 250 * time.Millisecond } + maxDelay := c.opts.MaxDelay + if maxDelay <= 0 { maxDelay = 10 * time.Second } + for i := 1; i < attempt; i++ { delay *= 2 + if delay >= maxDelay { return maxDelay } } + if delay > maxDelay { return maxDelay } + return delay } @@ -189,18 +227,23 @@ func shouldRetryStatus(status int) bool { func parseRetryAfter(v string) time.Duration { v = strings.TrimSpace(v) + if v == "" { return 0 } + if secs, err := strconv.Atoi(v); err == nil && secs > 0 { return time.Duration(secs) * time.Second } + if ts, err := http.ParseTime(v); err == nil { delay := time.Until(ts) + if delay > 0 { return delay } } + return 0 } @@ -208,63 +251,85 @@ func isTransientNetworkErr(err error) bool { if err == nil { return false } + var netErr net.Error + if errors.As(err, &netErr) { return true } + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { br := bufio.NewReader(r) first, err := peekFirstNonSpace(br) + if err != nil { if err == io.EOF { return nil, nil } + return nil, err } if first == '[' { b, err := io.ReadAll(br) + if err != nil { return nil, fmt.Errorf("read export body: %w", err) } + trimmed := bytes.TrimSpace(b) + var records []types.ExportRecord + if err := json.Unmarshal(trimmed, &records); err != nil { return nil, fmt.Errorf("decode export json array: %w", err) } + if limit > 0 && uint64(len(records)) > limit { records = records[:limit] } + return records, nil } + out := make([]types.ExportRecord, 0, 1024) + for { line, err := br.ReadBytes('\n') isEOF := errors.Is(err, io.EOF) + if err != nil && !isEOF { return nil, fmt.Errorf("read ndjson line: %w", err) } + if limit > 0 && uint64(len(out)) >= limit { return out, nil } + trimmed := bytes.TrimSpace(line) + if len(trimmed) > 0 { var rec types.ExportRecord + if err := json.Unmarshal(trimmed, &rec); err != nil { if isEOF && isTrailingNDJSONPartial(err) { return out, nil } + return nil, fmt.Errorf("decode ndjson line: %w", err) } + out = append(out, rec) } + if isEOF { break } } + return out, nil } @@ -272,19 +337,23 @@ func isTrailingNDJSONPartial(err error) bool { if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") { return false } + return true } func peekFirstNonSpace(br *bufio.Reader) (byte, error) { for { b, err := br.ReadByte() + if err != nil { return 0, err } + if !isSpace(b) { if err := br.UnreadByte(); err != nil { return 0, fmt.Errorf("unread byte: %w", err) } + return b, nil } } @@ -301,27 +370,37 @@ func isSpace(b byte) bool { func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) { path := c.source + if strings.HasPrefix(path, "file://") { path = strings.TrimPrefix(path, "file://") } + path = filepath.Clean(path) b, err := os.ReadFile(path) + if err != nil { return nil, fmt.Errorf("read source file: %w", err) } + recs, err := decodeExportBody(bytes.NewReader(b), 0) + if err != nil { return nil, err } + out := make([]types.ExportRecord, 0, len(recs)) + for _, r := range recs { if r.Seq <= after { continue } + out = append(out, r) + if limit > 0 && uint64(len(out)) >= limit { break } } + return out, nil } |