package ingest import ( "bufio" "bytes" "context" "encoding/json" "errors" "fmt" "github.com/Fuwn/plutia/internal/types" "io" "log" "net" "net/http" "net/url" "os" "path/filepath" "strconv" "strings" "time" ) type Client struct { source string http *http.Client opts ClientOptions } type ClientOptions struct { MaxAttempts int BaseDelay time.Duration MaxDelay time.Duration } func NewClient(source string, opts ...ClientOptions) *Client { cfg := ClientOptions{ MaxAttempts: 8, BaseDelay: 250 * time.Millisecond, MaxDelay: 10 * time.Second, } if len(opts) > 0 { if opts[0].MaxAttempts > 0 { cfg.MaxAttempts = opts[0].MaxAttempts } if opts[0].BaseDelay > 0 { cfg.BaseDelay = opts[0].BaseDelay } if opts[0].MaxDelay > 0 { cfg.MaxDelay = opts[0].MaxDelay } } transport := &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext, ForceAttemptHTTP2: true, MaxIdleConns: 256, MaxIdleConnsPerHost: 64, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, } return &Client{ source: strings.TrimRight(source, "/"), opts: cfg, http: &http.Client{ Timeout: 60 * time.Second, Transport: transport, }, } } func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportRecord, error) { return c.FetchExportLimited(ctx, after, 0) } func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") { return c.fetchFromFile(after, limit) } u, err := url.Parse(c.source) if err != nil { return nil, fmt.Errorf("parse plc source: %w", err) } u.Path = strings.TrimRight(u.Path, "/") + "/export" q := u.Query() q.Set("after", fmt.Sprintf("%d", after)) u.RawQuery = q.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) if err != nil { return nil, fmt.Errorf("new request: %w", err) } maxAttempts := c.opts.MaxAttempts if maxAttempts < 1 { maxAttempts = 1 } var lastErr error for attempt := 1; attempt <= maxAttempts; attempt++ { records, retryAfter, retryable, err := c.fetchExportOnce(req, limit) if err == nil { return records, nil } lastErr = err if !retryable || attempt == maxAttempts || ctx.Err() != nil { break } delay := retryAfter if delay <= 0 { delay = c.backoffDelay(attempt) } log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err) timer := time.NewTimer(delay) select { case <-ctx.Done(): timer.Stop() return nil, ctx.Err() case <-timer.C: } } return nil, lastErr } type httpStatusError struct { StatusCode int Body string } func (e httpStatusError) Error() string { return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body) } func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) { reqClone := req.Clone(req.Context()) reqClone.Header.Set("Accept-Encoding", "gzip") resp, err := c.http.Do(reqClone) if err != nil { return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) body := strings.TrimSpace(string(b)) retryDelay := parseRetryAfter(resp.Header.Get("Retry-After")) err := httpStatusError{StatusCode: resp.StatusCode, Body: body} return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err } records, err := decodeExportBody(resp.Body, limit) if err != nil { return nil, 0, false, err } return records, 0, false, nil } func (c *Client) backoffDelay(attempt int) time.Duration { if attempt < 1 { attempt = 1 } delay := c.opts.BaseDelay if delay <= 0 { delay = 250 * time.Millisecond } maxDelay := c.opts.MaxDelay if maxDelay <= 0 { maxDelay = 10 * time.Second } for i := 1; i < attempt; i++ { delay *= 2 if delay >= maxDelay { return maxDelay } } if delay > maxDelay { return maxDelay } return delay } func shouldRetryStatus(status int) bool { switch status { case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout: return true default: return false } } func parseRetryAfter(v string) time.Duration { v = strings.TrimSpace(v) if v == "" { return 0 } if secs, err := strconv.Atoi(v); err == nil && secs > 0 { return time.Duration(secs) * time.Second } if ts, err := http.ParseTime(v); err == nil { delay := time.Until(ts) if delay > 0 { return delay } } return 0 } func isTransientNetworkErr(err error) bool { if err == nil { return false } var netErr net.Error if errors.As(err, &netErr) { return true } return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF) } func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) { br := bufio.NewReader(r) first, err := peekFirstNonSpace(br) if err != nil { if err == io.EOF { return nil, nil } return nil, err } if first == '[' { b, err := io.ReadAll(br) if err != nil { return nil, fmt.Errorf("read export body: %w", err) } trimmed := bytes.TrimSpace(b) var records []types.ExportRecord if err := json.Unmarshal(trimmed, &records); err != nil { return nil, fmt.Errorf("decode export json array: %w", err) } if limit > 0 && uint64(len(records)) > limit { records = records[:limit] } return records, nil } out := make([]types.ExportRecord, 0, 1024) for { line, err := br.ReadBytes('\n') isEOF := errors.Is(err, io.EOF) if err != nil && !isEOF { return nil, fmt.Errorf("read ndjson line: %w", err) } if limit > 0 && uint64(len(out)) >= limit { return out, nil } trimmed := bytes.TrimSpace(line) if len(trimmed) > 0 { var rec types.ExportRecord if err := json.Unmarshal(trimmed, &rec); err != nil { if isEOF && isTrailingNDJSONPartial(err) { return out, nil } return nil, fmt.Errorf("decode ndjson line: %w", err) } out = append(out, rec) } if isEOF { break } } return out, nil } func isTrailingNDJSONPartial(err error) bool { if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") { return false } return true } func peekFirstNonSpace(br *bufio.Reader) (byte, error) { for { b, err := br.ReadByte() if err != nil { return 0, err } if !isSpace(b) { if err := br.UnreadByte(); err != nil { return 0, fmt.Errorf("unread byte: %w", err) } return b, nil } } } func isSpace(b byte) bool { switch b { case ' ', '\n', '\t', '\r': return true default: return false } } func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) { path := strings.TrimPrefix(c.source, "file://") path = filepath.Clean(path) b, err := os.ReadFile(path) if err != nil { return nil, fmt.Errorf("read source file: %w", err) } recs, err := decodeExportBody(bytes.NewReader(b), 0) if err != nil { return nil, err } out := make([]types.ExportRecord, 0, len(recs)) for _, r := range recs { if r.Seq <= after { continue } out = append(out, r) if limit > 0 && uint64(len(out)) >= limit { break } } return out, nil }