diff options
| author | Fuwn <[email protected]> | 2026-02-27 15:49:45 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 15:49:45 -0800 |
| commit | cace626823320be8702d9015e7a7fe29b8f206e6 (patch) | |
| tree | ee5c1fc21a9f8faa54f1daebad6e3dcead136b6e | |
| parent | docs: clarify mode comparison table wording (diff) | |
| download | plutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.tar.xz plutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.zip | |
fix(thin): retry DID fetches on 429 and align DID doc shape
| -rw-r--r-- | internal/ingest/client.go | 73 | ||||
| -rw-r--r-- | internal/ingest/client_retry_test.go | 108 | ||||
| -rw-r--r-- | internal/ingest/service.go | 90 |
3 files changed, 267 insertions, 4 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index bdef942..0ffa35c 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -140,9 +140,9 @@ func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRec } req.Header.Set("Accept", "application/json") - resp, err := c.http.Do(req) + resp, err := c.doRequestWithRetries(ctx, req, "did log fetch") if err != nil { - return nil, fmt.Errorf("fetch did log: %w", err) + return nil, err } defer resp.Body.Close() @@ -225,9 +225,9 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, return nil, fmt.Errorf("new request: %w", err) } - resp, err := c.http.Do(req) + resp, err := c.doRequestWithRetries(ctx, req, "did audit fetch") if err != nil { - return nil, fmt.Errorf("fetch did audit log: %w", err) + return nil, err } defer resp.Body.Close() @@ -248,6 +248,71 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, return entries, nil } +func (c *Client) doRequestWithRetries(ctx context.Context, req *http.Request, operation string) (*http.Response, error) { + maxAttempts := c.opts.MaxAttempts + if maxAttempts < 1 { + maxAttempts = 1 + } + + var lastErr error + + for attempt := 1; attempt <= maxAttempts; attempt++ { + reqClone := req.Clone(ctx) + resp, err := c.http.Do(reqClone) + if err != nil { + lastErr = fmt.Errorf("%s: %w", operation, err) + if !isTransientNetworkErr(err) || attempt == maxAttempts || ctx.Err() != nil { + return nil, lastErr + } + + delay := c.backoffDelay(attempt) + log.Printf("retrying %s attempt=%d url=%s delay=%s reason=%v", operation, attempt, req.URL.String(), delay, err) + + if err := waitForRetry(ctx, delay); err != nil { + return nil, err + } + + continue + } + + if shouldRetryStatus(resp.StatusCode) && attempt < maxAttempts { + delay := parseRetryAfter(resp.Header.Get("Retry-After")) + if delay <= 0 { + delay = c.backoffDelay(attempt) + } + + log.Printf("retrying %s attempt=%d url=%s status=%d delay=%s", operation, attempt, req.URL.String(), resp.StatusCode, delay) + _ = resp.Body.Close() + + if err := waitForRetry(ctx, delay); err != nil { + return nil, err + } + + continue + } + + return resp, nil + } + + if lastErr != nil { + return nil, lastErr + } + + return nil, fmt.Errorf("%s: request attempts exhausted", operation) +} + +func waitForRetry(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { pageSize := uint64(1000) batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil) diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go index b019ac6..cee3280 100644 --- a/internal/ingest/client_retry_test.go +++ b/internal/ingest/client_retry_test.go @@ -2,6 +2,7 @@ package ingest import ( "context" + "encoding/json" "fmt" "net/http" "net/http/httptest" @@ -73,3 +74,110 @@ func TestFetchExportLimitedDoesNotRetry400(t *testing.T) { t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got) } } + +func TestFetchDIDLogRetries429ThenSucceeds(t *testing.T) { + const did = "did:plc:retry429" + var attempts atomic.Int32 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/"+did+"/log" { + http.NotFound(w, r) + + return + } + + n := attempts.Add(1) + if n <= 2 { + w.Header().Set("Retry-After", "0") + http.Error(w, "rate limited", http.StatusTooManyRequests) + + return + } + + _ = json.NewEncoder(w).Encode([]map[string]any{ + { + "did": did, + "cid": "bafy-retry", + "createdAt": "2026-02-27T00:00:00Z", + "operation": json.RawMessage(`{"did":"did:plc:retry429","sig":"x","sigPayload":"e30","rotationKeys":["did:key:z"],"verificationMethods":{"atproto":"did:key:z"}}`), + }, + }) + })) + + defer srv.Close() + + client := NewClient(srv.URL, ClientOptions{ + MaxAttempts: 5, + BaseDelay: time.Millisecond, + MaxDelay: 2 * time.Millisecond, + }) + records, err := client.FetchDIDLog(context.Background(), did) + + if err != nil { + t.Fatalf("fetch did log: %v", err) + } + + if len(records) != 1 { + t.Fatalf("record count mismatch: got %d want 1", len(records)) + } + + if got := attempts.Load(); got != 3 { + t.Fatalf("attempt count mismatch: got %d want 3", got) + } +} + +func TestFetchDIDLogAuditFallbackRetries429ThenSucceeds(t *testing.T) { + const did = "did:plc:auditretry" + var auditAttempts atomic.Int32 + + op := json.RawMessage(`{"did":"did:plc:auditretry","sig":"x","sigPayload":"e30","rotationKeys":["did:key:z"],"verificationMethods":{"atproto":"did:key:z"}}`) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/" + did + "/log": + _ = json.NewEncoder(w).Encode([]json.RawMessage{op}) + case "/" + did + "/log/audit": + n := auditAttempts.Add(1) + if n == 1 { + w.Header().Set("Retry-After", "0") + http.Error(w, "rate limited", http.StatusTooManyRequests) + + return + } + + _ = json.NewEncoder(w).Encode([]map[string]any{{ + "did": did, + "operation": op, + "cid": "bafy-audit", + "createdAt": "2026-02-27T00:00:00Z", + }}) + default: + http.NotFound(w, r) + } + })) + + defer srv.Close() + + client := NewClient(srv.URL, ClientOptions{ + MaxAttempts: 5, + BaseDelay: time.Millisecond, + MaxDelay: 2 * time.Millisecond, + }) + records, err := client.FetchDIDLog(context.Background(), did) + + if err != nil { + t.Fatalf("fetch did log: %v", err) + } + + if len(records) != 1 { + t.Fatalf("record count mismatch: got %d want 1", len(records)) + } + + if records[0].CID != "bafy-audit" { + t.Fatalf("cid mismatch: got %q want %q", records[0].CID, "bafy-audit") + } + + if got := auditAttempts.Load(); got != 2 { + t.Fatalf("audit attempt count mismatch: got %d want 2", got) + } +} diff --git a/internal/ingest/service.go b/internal/ingest/service.go index 873204f..b0bd68e 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -1234,6 +1234,7 @@ func (s *Service) verifyThinChain(ctx context.Context, did string, records []typ verifier := verify.New(config.VerifyFull) var previous *types.StateV1 + var lastPayload map[string]any normalized := make([]types.ExportRecord, 0, len(records)) for i, rec := range records { @@ -1265,6 +1266,8 @@ func (s *Service) verifyThinChain(ctx context.Context, did string, records []typ return types.StateV1{}, nil, err } + lastPayload = op.Payload + normalized = append(normalized, types.ExportRecord{ Seq: rec.Seq, DID: did, @@ -1280,9 +1283,96 @@ func (s *Service) verifyThinChain(ctx context.Context, did string, records []typ return types.StateV1{}, nil, ErrDIDNotFound } + if lastPayload != nil { + doc, err := buildThinDIDDocumentFromPayload(did, lastPayload, previous.RotationKeys) + if err != nil { + return types.StateV1{}, nil, err + } + + previous.DIDDocument = doc + } + return *previous, normalized, nil } +func buildThinDIDDocumentFromPayload(did string, payload map[string]any, rotationKeys []string) ([]byte, error) { + data := normalizePLCDataFromOperation(did, payload, rotationKeys) + doc := map[string]any{ + "id": did, + } + + if aka, ok := data["alsoKnownAs"].([]string); ok && len(aka) > 0 { + doc["alsoKnownAs"] = aka + } + + if methods, ok := data["verificationMethods"].(map[string]string); ok && len(methods) > 0 { + names := make([]string, 0, len(methods)) + for name := range methods { + names = append(names, name) + } + + sort.Strings(names) + + entries := make([]map[string]any, 0, len(names)) + for _, name := range names { + key := strings.TrimSpace(methods[name]) + if key == "" { + continue + } + + entries = append(entries, map[string]any{ + "id": did + "#" + name, + "type": "Multikey", + "controller": did, + "publicKeyMultibase": key, + }) + } + + if len(entries) > 0 { + doc["verificationMethod"] = entries + } + } + + if services, ok := data["services"].(map[string]map[string]string); ok && len(services) > 0 { + names := make([]string, 0, len(services)) + for name := range services { + names = append(names, name) + } + + sort.Strings(names) + + entries := make([]map[string]any, 0, len(names)) + for _, name := range names { + entry := services[name] + endpoint := strings.TrimSpace(entry["endpoint"]) + if endpoint == "" { + continue + } + + entries = append(entries, map[string]any{ + "id": "#" + name, + "type": entry["type"], + "serviceEndpoint": endpoint, + }) + } + + if len(entries) > 0 { + doc["service"] = entries + } + } + + if typ, _ := payload["type"].(string); typ == "plc_tombstone" || typ == "tombstone" { + doc["deactivated"] = true + } + + raw, err := json.Marshal(doc) + if err != nil { + return nil, err + } + + return types.CanonicalizeJSON(raw) +} + func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool { if meta.LastVerified.IsZero() { return false |