aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/client.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 15:49:45 -0800
committerFuwn <[email protected]>2026-02-27 15:49:45 -0800
commitcace626823320be8702d9015e7a7fe29b8f206e6 (patch)
treeee5c1fc21a9f8faa54f1daebad6e3dcead136b6e /internal/ingest/client.go
parentdocs: clarify mode comparison table wording (diff)
downloadplutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.tar.xz
plutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.zip
fix(thin): retry DID fetches on 429 and align DID doc shape
Diffstat (limited to 'internal/ingest/client.go')
-rw-r--r--internal/ingest/client.go73
1 file changed, 69 insertions, 4 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index bdef942..0ffa35c 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -140,9 +140,9 @@ func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRec
}
req.Header.Set("Accept", "application/json")
- resp, err := c.http.Do(req)
+ resp, err := c.doRequestWithRetries(ctx, req, "did log fetch")
if err != nil {
- return nil, fmt.Errorf("fetch did log: %w", err)
+ return nil, err
}
defer resp.Body.Close()
@@ -225,9 +225,9 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry,
return nil, fmt.Errorf("new request: %w", err)
}
- resp, err := c.http.Do(req)
+ resp, err := c.doRequestWithRetries(ctx, req, "did audit fetch")
if err != nil {
- return nil, fmt.Errorf("fetch did audit log: %w", err)
+ return nil, err
}
defer resp.Body.Close()
@@ -248,6 +248,71 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry,
return entries, nil
}
+// doRequestWithRetries issues req via c.http, retrying up to
+// c.opts.MaxAttempts times (floored at 1) on transient network errors
+// (as classified by isTransientNetworkErr) and on retryable HTTP status
+// codes (as classified by shouldRetryStatus). operation labels log lines
+// and error wrapping. Status-based retries honor the Retry-After header
+// when it parses to a positive delay; otherwise c.backoffDelay(attempt)
+// is used. On success the response is returned with its body open —
+// the caller is responsible for closing it.
+// NOTE(review): req.Clone copies the Body reference, so retries are only
+// safe for body-less requests (the callers here issue GETs) — confirm if
+// any caller ever attaches a request body.
+func (c *Client) doRequestWithRetries(ctx context.Context, req *http.Request, operation string) (*http.Response, error) {
+ maxAttempts := c.opts.MaxAttempts
+ if maxAttempts < 1 {
+ maxAttempts = 1
+ }
+
+ var lastErr error
+
+ for attempt := 1; attempt <= maxAttempts; attempt++ {
+ // Clone per attempt so each try carries a fresh request value bound to ctx.
+ reqClone := req.Clone(ctx)
+ resp, err := c.http.Do(reqClone)
+ if err != nil {
+ lastErr = fmt.Errorf("%s: %w", operation, err)
+ // Give up on non-transient errors, on the final attempt, or once
+ // the context has been canceled.
+ if !isTransientNetworkErr(err) || attempt == maxAttempts || ctx.Err() != nil {
+ return nil, lastErr
+ }
+
+ delay := c.backoffDelay(attempt)
+ log.Printf("retrying %s attempt=%d url=%s delay=%s reason=%v", operation, attempt, req.URL.String(), delay, err)
+
+ if err := waitForRetry(ctx, delay); err != nil {
+ return nil, err
+ }
+
+ continue
+ }
+
+ // Got a response; retry on retryable status codes while attempts remain.
+ if shouldRetryStatus(resp.StatusCode) && attempt < maxAttempts {
+ // Prefer the server-provided Retry-After delay over local backoff.
+ delay := parseRetryAfter(resp.Header.Get("Retry-After"))
+ if delay <= 0 {
+ delay = c.backoffDelay(attempt)
+ }
+
+ log.Printf("retrying %s attempt=%d url=%s status=%d delay=%s", operation, attempt, req.URL.String(), resp.StatusCode, delay)
+ // NOTE(review): body is closed without draining; draining first
+ // (io.Copy to io.Discard) would let the transport reuse the
+ // connection — confirm whether that matters here.
+ _ = resp.Body.Close()
+
+ if err := waitForRetry(ctx, delay); err != nil {
+ return nil, err
+ }
+
+ continue
+ }
+
+ // Non-retryable status (or last attempt): hand the response to the caller.
+ return resp, nil
+ }
+
+ // Defensive fallthrough: the loop always returns when maxAttempts >= 1,
+ // so these paths should be unreachable in practice.
+ if lastErr != nil {
+ return nil, lastErr
+ }
+
+ return nil, fmt.Errorf("%s: request attempts exhausted", operation)
+}
+
+// waitForRetry blocks for delay or until ctx is done, whichever comes
+// first. It returns nil after a full wait and ctx.Err() on cancellation.
+// The timer is always stopped so no timer resources are left pending.
+func waitForRetry(ctx context.Context, delay time.Duration) error {
+ timer := time.NewTimer(delay)
+ defer timer.Stop()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-timer.C:
+ return nil
+ }
+}
+
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
pageSize := uint64(1000)
batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)