aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest/client.go')
-rw-r--r--internal/ingest/client.go83
1 files changed, 81 insertions, 2 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index d25b73f..12d8bd6 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -17,7 +17,6 @@ import (
"strconv"
"strings"
"time"
-
"github.com/Fuwn/plutia/internal/types"
)
@@ -39,17 +38,21 @@ func NewClient(source string, opts ...ClientOptions) *Client {
BaseDelay: 250 * time.Millisecond,
MaxDelay: 10 * time.Second,
}
+
if len(opts) > 0 {
if opts[0].MaxAttempts > 0 {
cfg.MaxAttempts = opts[0].MaxAttempts
}
+
if opts[0].BaseDelay > 0 {
cfg.BaseDelay = opts[0].BaseDelay
}
+
if opts[0].MaxDelay > 0 {
cfg.MaxDelay = opts[0].MaxDelay
}
}
+
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
@@ -60,6 +63,7 @@ func NewClient(source string, opts ...ClientOptions) *Client {
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
+
return &Client{
source: strings.TrimRight(source, "/"),
opts: cfg,
@@ -80,45 +84,63 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
}
u, err := url.Parse(c.source)
+
if err != nil {
return nil, fmt.Errorf("parse plc source: %w", err)
}
+
u.Path = strings.TrimRight(u.Path, "/") + "/export"
q := u.Query()
+
q.Set("after", fmt.Sprintf("%d", after))
- u.RawQuery = q.Encode()
+ u.RawQuery = q.Encode()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
+
maxAttempts := c.opts.MaxAttempts
+
if maxAttempts < 1 {
maxAttempts = 1
}
+
var lastErr error
+
for attempt := 1; attempt <= maxAttempts; attempt++ {
records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+
if err == nil {
return records, nil
}
+
lastErr = err
+
if !retryable || attempt == maxAttempts || ctx.Err() != nil {
break
}
+
delay := retryAfter
+
if delay <= 0 {
delay = c.backoffDelay(attempt)
}
+
log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)
+
timer := time.NewTimer(delay)
+
select {
case <-ctx.Done():
timer.Stop()
+
return nil, ctx.Err()
case <-timer.C:
}
}
+
return nil, lastErr
}
@@ -133,11 +155,15 @@ func (e httpStatusError) Error() string {
func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
reqClone := req.Clone(req.Context())
+
reqClone.Header.Set("Accept-Encoding", "gzip")
+
resp, err := c.http.Do(reqClone)
+
if err != nil {
return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
}
+
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
@@ -145,12 +171,16 @@ func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.Expor
body := strings.TrimSpace(string(b))
retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
+
return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
}
+
records, err := decodeExportBody(resp.Body, limit)
+
if err != nil {
return nil, 0, false, err
}
+
return records, 0, false, nil
}
@@ -158,23 +188,31 @@ func (c *Client) backoffDelay(attempt int) time.Duration {
if attempt < 1 {
attempt = 1
}
+
delay := c.opts.BaseDelay
+
if delay <= 0 {
delay = 250 * time.Millisecond
}
+
maxDelay := c.opts.MaxDelay
+
if maxDelay <= 0 {
maxDelay = 10 * time.Second
}
+
for i := 1; i < attempt; i++ {
delay *= 2
+
if delay >= maxDelay {
return maxDelay
}
}
+
if delay > maxDelay {
return maxDelay
}
+
return delay
}
@@ -189,18 +227,23 @@ func shouldRetryStatus(status int) bool {
func parseRetryAfter(v string) time.Duration {
v = strings.TrimSpace(v)
+
if v == "" {
return 0
}
+
if secs, err := strconv.Atoi(v); err == nil && secs > 0 {
return time.Duration(secs) * time.Second
}
+
if ts, err := http.ParseTime(v); err == nil {
delay := time.Until(ts)
+
if delay > 0 {
return delay
}
}
+
return 0
}
@@ -208,63 +251,85 @@ func isTransientNetworkErr(err error) bool {
if err == nil {
return false
}
+
var netErr net.Error
+
if errors.As(err, &netErr) {
return true
}
+
return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
br := bufio.NewReader(r)
first, err := peekFirstNonSpace(br)
+
if err != nil {
if err == io.EOF {
return nil, nil
}
+
return nil, err
}
if first == '[' {
b, err := io.ReadAll(br)
+
if err != nil {
return nil, fmt.Errorf("read export body: %w", err)
}
+
trimmed := bytes.TrimSpace(b)
+
var records []types.ExportRecord
+
if err := json.Unmarshal(trimmed, &records); err != nil {
return nil, fmt.Errorf("decode export json array: %w", err)
}
+
if limit > 0 && uint64(len(records)) > limit {
records = records[:limit]
}
+
return records, nil
}
+
out := make([]types.ExportRecord, 0, 1024)
+
for {
line, err := br.ReadBytes('\n')
isEOF := errors.Is(err, io.EOF)
+
if err != nil && !isEOF {
return nil, fmt.Errorf("read ndjson line: %w", err)
}
+
if limit > 0 && uint64(len(out)) >= limit {
return out, nil
}
+
trimmed := bytes.TrimSpace(line)
+
if len(trimmed) > 0 {
var rec types.ExportRecord
+
if err := json.Unmarshal(trimmed, &rec); err != nil {
if isEOF && isTrailingNDJSONPartial(err) {
return out, nil
}
+
return nil, fmt.Errorf("decode ndjson line: %w", err)
}
+
out = append(out, rec)
}
+
if isEOF {
break
}
}
+
return out, nil
}
@@ -272,19 +337,23 @@ func isTrailingNDJSONPartial(err error) bool {
if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") {
return false
}
+
return true
}
func peekFirstNonSpace(br *bufio.Reader) (byte, error) {
for {
b, err := br.ReadByte()
+
if err != nil {
return 0, err
}
+
if !isSpace(b) {
if err := br.UnreadByte(); err != nil {
return 0, fmt.Errorf("unread byte: %w", err)
}
+
return b, nil
}
}
@@ -301,27 +370,37 @@ func isSpace(b byte) bool {
func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) {
path := c.source
+
if strings.HasPrefix(path, "file://") {
path = strings.TrimPrefix(path, "file://")
}
+
path = filepath.Clean(path)
b, err := os.ReadFile(path)
+
if err != nil {
return nil, fmt.Errorf("read source file: %w", err)
}
+
recs, err := decodeExportBody(bytes.NewReader(b), 0)
+
if err != nil {
return nil, err
}
+
out := make([]types.ExportRecord, 0, len(recs))
+
for _, r := range recs {
if r.Seq <= after {
continue
}
+
out = append(out, r)
+
if limit > 0 && uint64(len(out)) >= limit {
break
}
}
+
return out, nil
}