aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/client.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:07:03 -0800
committerFuwn <[email protected]>2026-02-26 15:07:03 -0800
commitf1b26a68534e4299de7b5fe6b15d2b248fac40e1 (patch)
tree4003d041bc46866b7d4b7f0bfb6c6924d991cbb6 /internal/ingest/client.go
parentfeat: initial Plutia release — verifiable high-performance PLC mirror (mirr... (diff)
downloadplutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.tar.xz
plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.zip
feat: harden launch readiness with versioning, metrics, and resilience
Diffstat (limited to 'internal/ingest/client.go')
-rw-r--r--internal/ingest/client.go148
1 files changed, 143 insertions, 5 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index 0f9ad43..d25b73f 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -8,11 +8,13 @@ import (
"errors"
"fmt"
"io"
+ "log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
+ "strconv"
"strings"
"time"
@@ -22,9 +24,32 @@ import (
+// Client fetches export records from a single configured source.
+// source is stored with trailing slashes removed; http is the shared HTTP
+// client (60-second overall timeout); opts holds the retry settings that
+// NewClient resolved against its defaults.
type Client struct {
source string
http *http.Client
+ opts ClientOptions
}
-func NewClient(source string) *Client {
+// ClientOptions configures how Client retries a failed export fetch.
+// NewClient replaces any field that is zero or negative with its default.
+type ClientOptions struct {
+ // MaxAttempts is the total number of fetch attempts, counting the first
+ // one (default 8).
+ MaxAttempts int
+ // BaseDelay is the backoff delay used after the first failed attempt; it
+ // doubles on every further attempt (default 250ms).
+ BaseDelay time.Duration
+ // MaxDelay caps the computed backoff delay (default 10s). A delay taken
+ // from a Retry-After response header is not limited by this value.
+ MaxDelay time.Duration
+}
+
+func NewClient(source string, opts ...ClientOptions) *Client {
+ cfg := ClientOptions{
+ MaxAttempts: 8,
+ BaseDelay: 250 * time.Millisecond,
+ MaxDelay: 10 * time.Second,
+ }
+ if len(opts) > 0 {
+ if opts[0].MaxAttempts > 0 {
+ cfg.MaxAttempts = opts[0].MaxAttempts
+ }
+ if opts[0].BaseDelay > 0 {
+ cfg.BaseDelay = opts[0].BaseDelay
+ }
+ if opts[0].MaxDelay > 0 {
+ cfg.MaxDelay = opts[0].MaxDelay
+ }
+ }
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
@@ -37,6 +62,7 @@ func NewClient(source string) *Client {
}
return &Client{
source: strings.TrimRight(source, "/"),
+ opts: cfg,
http: &http.Client{
Timeout: 60 * time.Second,
Transport: transport,
@@ -52,6 +78,7 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
return c.fetchFromFile(after, limit)
}
+
u, err := url.Parse(c.source)
if err != nil {
return nil, fmt.Errorf("parse plc source: %w", err)
@@ -65,16 +92,127 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
- resp, err := c.http.Do(req)
+ maxAttempts := c.opts.MaxAttempts
+ if maxAttempts < 1 {
+ maxAttempts = 1
+ }
+ var lastErr error
+ for attempt := 1; attempt <= maxAttempts; attempt++ {
+ records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+ if err == nil {
+ return records, nil
+ }
+ lastErr = err
+ if !retryable || attempt == maxAttempts || ctx.Err() != nil {
+ break
+ }
+ delay := retryAfter
+ if delay <= 0 {
+ delay = c.backoffDelay(attempt)
+ }
+ log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)
+ timer := time.NewTimer(delay)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return nil, ctx.Err()
+ case <-timer.C:
+ }
+ }
+ return nil, lastErr
+}
+
+// httpStatusError describes an export response whose status was not 200 OK.
+type httpStatusError struct {
+ // StatusCode is the HTTP status code of the response.
+ StatusCode int
+ // Body is the whitespace-trimmed beginning of the response body
+ // (fetchExportOnce reads at most 4 KiB of it).
+ Body string
+}
+
+// Error implements the error interface, rendering the status code followed by
+// the captured body text.
+func (e httpStatusError) Error() string {
+ return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
+}
+
+// fetchExportOnce performs a single export request using a clone of req and
+// decodes at most limit records from a 200 response.
+//
+// It returns the decoded records, the delay requested by the server through
+// Retry-After (zero when absent or unusable), whether the failure is worth
+// retrying, and the error itself. On success the delay is zero and the
+// retryable flag is false.
+func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
+ // Clone so every attempt sends its own request value.
+ reqClone := req.Clone(req.Context())
+ // NOTE(review): net/http documents that setting Accept-Encoding by hand
+ // turns off the Transport's transparent gzip decoding, so resp.Body may
+ // be compressed. Presumably decodeExportBody handles that; confirm. The
+ // error-body snippet read below would be compressed bytes in that case.
+ reqClone.Header.Set("Accept-Encoding", "gzip")
+ resp, err := c.http.Do(reqClone)
if err != nil {
- return nil, fmt.Errorf("fetch export: %w", err)
+ return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
}
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
- return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
+ body := strings.TrimSpace(string(b))
+ retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
+ err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
+ return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
+ }
+ records, err := decodeExportBody(resp.Body, limit)
+ if err != nil {
+ // NOTE(review): decode failures are always reported as non-retryable,
+ // including ones caused by the body being cut off mid-stream.
+ return nil, 0, false, err
+ }
+ return records, 0, false, nil
+}
+
+// backoffDelay returns the wait to apply after the given 1-based attempt:
+// BaseDelay doubled (attempt-1) times and capped at MaxDelay. An attempt
+// below 1 is treated as 1. Non-positive BaseDelay or MaxDelay values fall
+// back to 250ms and 10s respectively. The result carries no random jitter.
+func (c *Client) backoffDelay(attempt int) time.Duration {
+ if attempt < 1 {
+ attempt = 1
+ }
+ delay := c.opts.BaseDelay
+ if delay <= 0 {
+ delay = 250 * time.Millisecond
+ }
+ maxDelay := c.opts.MaxDelay
+ if maxDelay <= 0 {
+ maxDelay = 10 * time.Second
+ }
+ for i := 1; i < attempt; i++ {
+ delay *= 2
+ // Stop as soon as the cap is reached so further doubling cannot
+ // grow the value any more.
+ if delay >= maxDelay {
+ return maxDelay
+ }
+ }
+ // Covers attempt == 1 with a BaseDelay that already exceeds MaxDelay.
+ if delay > maxDelay {
+ return maxDelay
+ }
+ return delay
+}
+
+// shouldRetryStatus reports whether an HTTP status code is treated as a
+// retryable failure: 429 Too Many Requests, 500, 502, 503 and 504. Every
+// other status is considered final.
+func shouldRetryStatus(status int) bool {
+ switch status {
+ case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
+ return true
+ default:
+ return false
+ }
+}
+
+// parseRetryAfter converts a Retry-After header value into a wait duration.
+//
+// Both header forms are accepted: a positive number of seconds, or an HTTP
+// date, in which case the time remaining until that date is used. The result
+// is zero when the header is empty, malformed, non-positive, or names a date
+// that has already passed; callers treat zero as "no hint".
+//
+// The result never exceeds one hour. Without that ceiling a large
+// delta-seconds value overflows time.Duration (wrapping to a negative or
+// arbitrary delay) and a far-future date would stall the retry loop for an
+// unbounded time.
+func parseRetryAfter(v string) time.Duration {
+	// Safety ceiling for any server-supplied delay.
+	const maxRetryAfter = time.Hour
+
+	v = strings.TrimSpace(v)
+	if v == "" {
+		return 0
+	}
+	// Parse as int64 so the comparison below is overflow-free on every
+	// platform. ErrRange still yields the saturated value, so a digit
+	// string too large for int64 is handled as "larger than the ceiling".
+	secs, err := strconv.ParseInt(v, 10, 64)
+	if (err == nil || errors.Is(err, strconv.ErrRange)) && secs > 0 {
+		if secs > int64(maxRetryAfter/time.Second) {
+			return maxRetryAfter
+		}
+		return time.Duration(secs) * time.Second
+	}
+	if ts, err := http.ParseTime(v); err == nil {
+		delay := time.Until(ts)
+		if delay > maxRetryAfter {
+			return maxRetryAfter
+		}
+		if delay > 0 {
+			return delay
+		}
+	}
+	return 0
+}
+
+// isTransientNetworkErr reports whether err looks like a temporary transport
+// failure that is worth retrying.
+//
+// http.Client.Do always returns a *url.Error, and that type itself satisfies
+// net.Error, so merely finding a net.Error in the chain says nothing: it
+// would also match permanent failures such as an unsupported URL scheme or a
+// certificate verification error. The classification is therefore based on
+// what the error actually reports:
+//   - a cancelled context is never retryable;
+//   - a timeout (as reported by net.Error.Timeout) is retryable;
+//   - a socket-level *net.OpError (dial, read, write, DNS lookup) is retryable;
+//   - io.EOF and io.ErrUnexpectedEOF (connection closed early) are retryable.
+//
+// Anything else, including a nil error, is treated as permanent.
+func isTransientNetworkErr(err error) bool {
+	if err == nil {
+		return false
+	}
+	if errors.Is(err, context.Canceled) {
+		return false
+	}
+	var netErr net.Error
+	if errors.As(err, &netErr) && netErr.Timeout() {
+		return true
+	}
+	var opErr *net.OpError
+	if errors.As(err, &opErr) {
+		return true
}
- return decodeExportBody(resp.Body, limit)
+	return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {