diff options
| author | Fuwn <[email protected]> | 2026-02-27 15:49:45 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 15:49:45 -0800 |
| commit | cace626823320be8702d9015e7a7fe29b8f206e6 (patch) | |
| tree | ee5c1fc21a9f8faa54f1daebad6e3dcead136b6e /internal/ingest/client.go | |
| parent | docs: clarify mode comparison table wording (diff) | |
| download | plutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.tar.xz plutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.zip | |
fix(thin): retry DID fetches on 429 and align DID doc shape
Diffstat (limited to 'internal/ingest/client.go')
| -rw-r--r-- | internal/ingest/client.go | 73 |
1 files changed, 69 insertions, 4 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index bdef942..0ffa35c 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -140,9 +140,9 @@ func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRec } req.Header.Set("Accept", "application/json") - resp, err := c.http.Do(req) + resp, err := c.doRequestWithRetries(ctx, req, "did log fetch") if err != nil { - return nil, fmt.Errorf("fetch did log: %w", err) + return nil, err } defer resp.Body.Close() @@ -225,9 +225,9 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, return nil, fmt.Errorf("new request: %w", err) } - resp, err := c.http.Do(req) + resp, err := c.doRequestWithRetries(ctx, req, "did audit fetch") if err != nil { - return nil, fmt.Errorf("fetch did audit log: %w", err) + return nil, err } defer resp.Body.Close() @@ -248,6 +248,71 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, return entries, nil } +func (c *Client) doRequestWithRetries(ctx context.Context, req *http.Request, operation string) (*http.Response, error) { + maxAttempts := c.opts.MaxAttempts + if maxAttempts < 1 { + maxAttempts = 1 + } + + var lastErr error + + for attempt := 1; attempt <= maxAttempts; attempt++ { + reqClone := req.Clone(ctx) + resp, err := c.http.Do(reqClone) + if err != nil { + lastErr = fmt.Errorf("%s: %w", operation, err) + if !isTransientNetworkErr(err) || attempt == maxAttempts || ctx.Err() != nil { + return nil, lastErr + } + + delay := c.backoffDelay(attempt) + log.Printf("retrying %s attempt=%d url=%s delay=%s reason=%v", operation, attempt, req.URL.String(), delay, err) + + if err := waitForRetry(ctx, delay); err != nil { + return nil, err + } + + continue + } + + if shouldRetryStatus(resp.StatusCode) && attempt < maxAttempts { + delay := parseRetryAfter(resp.Header.Get("Retry-After")) + if delay <= 0 { + delay = c.backoffDelay(attempt) + } + + log.Printf("retrying %s attempt=%d url=%s status=%d delay=%s", operation, attempt, req.URL.String(), resp.StatusCode, delay) + _ = resp.Body.Close() + + if err := waitForRetry(ctx, delay); err != nil { + return nil, err + } + + continue + } + + return resp, nil + } + + if lastErr != nil { + return nil, lastErr + } + + return nil, fmt.Errorf("%s: request attempts exhausted", operation) +} + +func waitForRetry(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { pageSize := uint64(1000) batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil) |