diff options
Diffstat (limited to 'internal/ingest/client.go')
| -rw-r--r-- | internal/ingest/client.go | 148 |
1 files changed, 143 insertions, 5 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index 0f9ad43..d25b73f 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -8,11 +8,13 @@ import ( "errors" "fmt" "io" + "log" "net" "net/http" "net/url" "os" "path/filepath" + "strconv" "strings" "time" @@ -22,9 +24,32 @@ import ( type Client struct { source string http *http.Client + opts ClientOptions } -func NewClient(source string) *Client { +type ClientOptions struct { + MaxAttempts int + BaseDelay time.Duration + MaxDelay time.Duration +} + +func NewClient(source string, opts ...ClientOptions) *Client { + cfg := ClientOptions{ + MaxAttempts: 8, + BaseDelay: 250 * time.Millisecond, + MaxDelay: 10 * time.Second, + } + if len(opts) > 0 { + if opts[0].MaxAttempts > 0 { + cfg.MaxAttempts = opts[0].MaxAttempts + } + if opts[0].BaseDelay > 0 { + cfg.BaseDelay = opts[0].BaseDelay + } + if opts[0].MaxDelay > 0 { + cfg.MaxDelay = opts[0].MaxDelay + } + } transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext, @@ -37,6 +62,7 @@ func NewClient(source string) *Client { } return &Client{ source: strings.TrimRight(source, "/"), + opts: cfg, http: &http.Client{ Timeout: 60 * time.Second, Transport: transport, @@ -52,6 +78,7 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") { return c.fetchFromFile(after, limit) } + u, err := url.Parse(c.source) if err != nil { return nil, fmt.Errorf("parse plc source: %w", err) @@ -65,16 +92,127 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin if err != nil { return nil, fmt.Errorf("new request: %w", err) } - resp, err := c.http.Do(req) + maxAttempts := c.opts.MaxAttempts + if maxAttempts < 1 { + maxAttempts = 1 + } + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { + records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) + if err == nil { + return records, nil + } + lastErr = err + if !retryable || attempt == maxAttempts || ctx.Err() != nil { + break + } + delay := retryAfter + if delay <= 0 { + delay = c.backoffDelay(attempt) + } + log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err) + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + timer.Stop() + return nil, ctx.Err() + case <-timer.C: + } + } + return nil, lastErr +} + +type httpStatusError struct { + StatusCode int + Body string +} + +func (e httpStatusError) Error() string { + return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body) +} + +func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { + reqClone := req.Clone(req.Context()) + reqClone.Header.Set("Accept-Encoding", "gzip") + resp, err := c.http.Do(reqClone) if err != nil { - return nil, fmt.Errorf("fetch export: %w", err) + return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) - return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b))) + body := strings.TrimSpace(string(b)) + retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) + err := httpStatusError{StatusCode: resp.StatusCode, Body: body} + return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err + } + records, err := decodeExportBody(resp.Body, limit) + if err != nil { + return nil, 0, false, err + } + return records, 0, false, nil +} + +func (c *Client) backoffDelay(attempt int) time.Duration { + if attempt < 1 { + attempt = 1 + } + delay := c.opts.BaseDelay + if delay <= 0 { + delay = 250 * time.Millisecond + } + maxDelay := c.opts.MaxDelay + if maxDelay <= 0 { + maxDelay = 10 * time.Second + } + for i := 1; i < attempt; i++ { + delay *= 2 + if delay >= maxDelay { + return maxDelay + } + } + if delay > maxDelay { + return maxDelay + } + return delay +} + +func shouldRetryStatus(status int) bool { + switch status { + case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: + return true + default: + return false + } +} + +func parseRetryAfter(v string) time.Duration { + v = strings.TrimSpace(v) + if v == "" { + return 0 + } + if secs, err := strconv.Atoi(v); err == nil && secs > 0 { + return time.Duration(secs) * time.Second + } + if ts, err := http.ParseTime(v); err == nil { + delay := time.Until(ts) + if delay > 0 { + return delay + } + } + return 0 +} + +func isTransientNetworkErr(err error) bool { + if err == nil { + return false + } + var netErr net.Error + if errors.As(err, &netErr) { + return true } - return decodeExportBody(resp.Body, limit) + return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { |