aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 15:49:45 -0800
committerFuwn <[email protected]>2026-02-27 15:49:45 -0800
commitcace626823320be8702d9015e7a7fe29b8f206e6 (patch)
treeee5c1fc21a9f8faa54f1daebad6e3dcead136b6e
parentdocs: clarify mode comparison table wording (diff)
downloadplutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.tar.xz
plutia-test-cace626823320be8702d9015e7a7fe29b8f206e6.zip
fix(thin): retry DID fetches on 429 and align DID doc shape
-rw-r--r--internal/ingest/client.go73
-rw-r--r--internal/ingest/client_retry_test.go108
-rw-r--r--internal/ingest/service.go90
3 files changed, 267 insertions, 4 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index bdef942..0ffa35c 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -140,9 +140,9 @@ func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRec
}
req.Header.Set("Accept", "application/json")
- resp, err := c.http.Do(req)
+ resp, err := c.doRequestWithRetries(ctx, req, "did log fetch")
if err != nil {
- return nil, fmt.Errorf("fetch did log: %w", err)
+ return nil, err
}
defer resp.Body.Close()
@@ -225,9 +225,9 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry,
return nil, fmt.Errorf("new request: %w", err)
}
- resp, err := c.http.Do(req)
+ resp, err := c.doRequestWithRetries(ctx, req, "did audit fetch")
if err != nil {
- return nil, fmt.Errorf("fetch did audit log: %w", err)
+ return nil, err
}
defer resp.Body.Close()
@@ -248,6 +248,71 @@ func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry,
return entries, nil
}
+func (c *Client) doRequestWithRetries(ctx context.Context, req *http.Request, operation string) (*http.Response, error) {
+ maxAttempts := c.opts.MaxAttempts
+ if maxAttempts < 1 {
+ maxAttempts = 1
+ }
+
+ var lastErr error
+
+ for attempt := 1; attempt <= maxAttempts; attempt++ {
+ reqClone := req.Clone(ctx)
+ resp, err := c.http.Do(reqClone)
+ if err != nil {
+ lastErr = fmt.Errorf("%s: %w", operation, err)
+ if !isTransientNetworkErr(err) || attempt == maxAttempts || ctx.Err() != nil {
+ return nil, lastErr
+ }
+
+ delay := c.backoffDelay(attempt)
+ log.Printf("retrying %s attempt=%d url=%s delay=%s reason=%v", operation, attempt, req.URL.String(), delay, err)
+
+ if err := waitForRetry(ctx, delay); err != nil {
+ return nil, err
+ }
+
+ continue
+ }
+
+ if shouldRetryStatus(resp.StatusCode) && attempt < maxAttempts {
+ delay := parseRetryAfter(resp.Header.Get("Retry-After"))
+ if delay <= 0 {
+ delay = c.backoffDelay(attempt)
+ }
+
+ log.Printf("retrying %s attempt=%d url=%s status=%d delay=%s", operation, attempt, req.URL.String(), resp.StatusCode, delay)
+ _ = resp.Body.Close()
+
+ if err := waitForRetry(ctx, delay); err != nil {
+ return nil, err
+ }
+
+ continue
+ }
+
+ return resp, nil
+ }
+
+ if lastErr != nil {
+ return nil, lastErr
+ }
+
+ return nil, fmt.Errorf("%s: request attempts exhausted", operation)
+}
+
+func waitForRetry(ctx context.Context, delay time.Duration) error {
+ timer := time.NewTimer(delay)
+ defer timer.Stop()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-timer.C:
+ return nil
+ }
+}
+
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
pageSize := uint64(1000)
batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)
diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go
index b019ac6..cee3280 100644
--- a/internal/ingest/client_retry_test.go
+++ b/internal/ingest/client_retry_test.go
@@ -2,6 +2,7 @@ package ingest
import (
"context"
+ "encoding/json"
"fmt"
"net/http"
"net/http/httptest"
@@ -73,3 +74,110 @@ func TestFetchExportLimitedDoesNotRetry400(t *testing.T) {
t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got)
}
}
+
+func TestFetchDIDLogRetries429ThenSucceeds(t *testing.T) {
+ const did = "did:plc:retry429"
+ var attempts atomic.Int32
+
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/"+did+"/log" {
+ http.NotFound(w, r)
+
+ return
+ }
+
+ n := attempts.Add(1)
+ if n <= 2 {
+ w.Header().Set("Retry-After", "0")
+ http.Error(w, "rate limited", http.StatusTooManyRequests)
+
+ return
+ }
+
+ _ = json.NewEncoder(w).Encode([]map[string]any{
+ {
+ "did": did,
+ "cid": "bafy-retry",
+ "createdAt": "2026-02-27T00:00:00Z",
+ "operation": json.RawMessage(`{"did":"did:plc:retry429","sig":"x","sigPayload":"e30","rotationKeys":["did:key:z"],"verificationMethods":{"atproto":"did:key:z"}}`),
+ },
+ })
+ }))
+
+ defer srv.Close()
+
+ client := NewClient(srv.URL, ClientOptions{
+ MaxAttempts: 5,
+ BaseDelay: time.Millisecond,
+ MaxDelay: 2 * time.Millisecond,
+ })
+ records, err := client.FetchDIDLog(context.Background(), did)
+
+ if err != nil {
+ t.Fatalf("fetch did log: %v", err)
+ }
+
+ if len(records) != 1 {
+ t.Fatalf("record count mismatch: got %d want 1", len(records))
+ }
+
+ if got := attempts.Load(); got != 3 {
+ t.Fatalf("attempt count mismatch: got %d want 3", got)
+ }
+}
+
+func TestFetchDIDLogAuditFallbackRetries429ThenSucceeds(t *testing.T) {
+ const did = "did:plc:auditretry"
+ var auditAttempts atomic.Int32
+
+ op := json.RawMessage(`{"did":"did:plc:auditretry","sig":"x","sigPayload":"e30","rotationKeys":["did:key:z"],"verificationMethods":{"atproto":"did:key:z"}}`)
+
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/" + did + "/log":
+ _ = json.NewEncoder(w).Encode([]json.RawMessage{op})
+ case "/" + did + "/log/audit":
+ n := auditAttempts.Add(1)
+ if n == 1 {
+ w.Header().Set("Retry-After", "0")
+ http.Error(w, "rate limited", http.StatusTooManyRequests)
+
+ return
+ }
+
+ _ = json.NewEncoder(w).Encode([]map[string]any{{
+ "did": did,
+ "operation": op,
+ "cid": "bafy-audit",
+ "createdAt": "2026-02-27T00:00:00Z",
+ }})
+ default:
+ http.NotFound(w, r)
+ }
+ }))
+
+ defer srv.Close()
+
+ client := NewClient(srv.URL, ClientOptions{
+ MaxAttempts: 5,
+ BaseDelay: time.Millisecond,
+ MaxDelay: 2 * time.Millisecond,
+ })
+ records, err := client.FetchDIDLog(context.Background(), did)
+
+ if err != nil {
+ t.Fatalf("fetch did log: %v", err)
+ }
+
+ if len(records) != 1 {
+ t.Fatalf("record count mismatch: got %d want 1", len(records))
+ }
+
+ if records[0].CID != "bafy-audit" {
+ t.Fatalf("cid mismatch: got %q want %q", records[0].CID, "bafy-audit")
+ }
+
+ if got := auditAttempts.Load(); got != 2 {
+ t.Fatalf("audit attempt count mismatch: got %d want 2", got)
+ }
+}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 873204f..b0bd68e 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -1234,6 +1234,7 @@ func (s *Service) verifyThinChain(ctx context.Context, did string, records []typ
verifier := verify.New(config.VerifyFull)
var previous *types.StateV1
+ var lastPayload map[string]any
normalized := make([]types.ExportRecord, 0, len(records))
for i, rec := range records {
@@ -1265,6 +1266,8 @@ func (s *Service) verifyThinChain(ctx context.Context, did string, records []typ
return types.StateV1{}, nil, err
}
+ lastPayload = op.Payload
+
normalized = append(normalized, types.ExportRecord{
Seq: rec.Seq,
DID: did,
@@ -1280,9 +1283,96 @@ func (s *Service) verifyThinChain(ctx context.Context, did string, records []typ
return types.StateV1{}, nil, ErrDIDNotFound
}
+ if lastPayload != nil {
+ doc, err := buildThinDIDDocumentFromPayload(did, lastPayload, previous.RotationKeys)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ previous.DIDDocument = doc
+ }
+
return *previous, normalized, nil
}
+func buildThinDIDDocumentFromPayload(did string, payload map[string]any, rotationKeys []string) ([]byte, error) {
+ data := normalizePLCDataFromOperation(did, payload, rotationKeys)
+ doc := map[string]any{
+ "id": did,
+ }
+
+ if aka, ok := data["alsoKnownAs"].([]string); ok && len(aka) > 0 {
+ doc["alsoKnownAs"] = aka
+ }
+
+ if methods, ok := data["verificationMethods"].(map[string]string); ok && len(methods) > 0 {
+ names := make([]string, 0, len(methods))
+ for name := range methods {
+ names = append(names, name)
+ }
+
+ sort.Strings(names)
+
+ entries := make([]map[string]any, 0, len(names))
+ for _, name := range names {
+ key := strings.TrimSpace(methods[name])
+ if key == "" {
+ continue
+ }
+
+ entries = append(entries, map[string]any{
+ "id": did + "#" + name,
+ "type": "Multikey",
+ "controller": did,
+ "publicKeyMultibase": key,
+ })
+ }
+
+ if len(entries) > 0 {
+ doc["verificationMethod"] = entries
+ }
+ }
+
+ if services, ok := data["services"].(map[string]map[string]string); ok && len(services) > 0 {
+ names := make([]string, 0, len(services))
+ for name := range services {
+ names = append(names, name)
+ }
+
+ sort.Strings(names)
+
+ entries := make([]map[string]any, 0, len(names))
+ for _, name := range names {
+ entry := services[name]
+ endpoint := strings.TrimSpace(entry["endpoint"])
+ if endpoint == "" {
+ continue
+ }
+
+ entries = append(entries, map[string]any{
+ "id": "#" + name,
+ "type": entry["type"],
+ "serviceEndpoint": endpoint,
+ })
+ }
+
+ if len(entries) > 0 {
+ doc["service"] = entries
+ }
+ }
+
+ if typ, _ := payload["type"].(string); typ == "plc_tombstone" || typ == "tombstone" {
+ doc["deactivated"] = true
+ }
+
+ raw, err := json.Marshal(doc)
+ if err != nil {
+ return nil, err
+ }
+
+ return types.CanonicalizeJSON(raw)
+}
+
func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool {
if meta.LastVerified.IsZero() {
return false