diff options
| author | Fuwn <[email protected]> | 2026-02-27 09:26:06 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 09:26:06 -0800 |
| commit | 8980607ef8e7426b601f942f26ae2cd4c4f3edff (patch) | |
| tree | 60bdb4bbbe5755223c3387179ee7406432d084ab /internal/ingest/client.go | |
| parent | fix: make mirror replay lossless with strict seq accounting and trace (diff) | |
| download | plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.tar.xz plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.zip | |
feat: add thin mode for on-demand verified PLC resolution
Diffstat (limited to 'internal/ingest/client.go')
| -rw-r--r-- | internal/ingest/client.go | 132 |
1 files changed, 132 insertions, 0 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go index eba6fd0..bdef942 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -26,6 +26,13 @@ type Client struct { opts ClientOptions } +type didLogEntry struct { + Operation json.RawMessage `json:"operation"` + CID string `json:"cid,omitempty"` + CreatedAt string `json:"createdAt,omitempty"` + Nullified bool `json:"nullified,omitempty"` +} + type ClientOptions struct { MaxAttempts int BaseDelay time.Duration @@ -116,6 +123,131 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR return c.FetchExportLimited(ctx, after, 0) } +func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) { + if strings.TrimSpace(did) == "" { + return nil, fmt.Errorf("did is required") + } + + u, err := url.Parse(c.source) + if err != nil { + return nil, fmt.Errorf("parse plc source: %w", err) + } + + u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + req.Header.Set("Accept", "application/json") + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("fetch did log: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone { + return nil, ErrDIDNotFound + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) + return nil, fmt.Errorf("did log response %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + + rawBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read did log body: %w", err) + } + + var wrapped []didLogEntry + if err := json.Unmarshal(rawBody, &wrapped); err == nil && len(wrapped) > 0 && len(bytes.TrimSpace(wrapped[0].Operation)) > 0 { + records := make([]types.ExportRecord, 0, len(wrapped)) + for i, entry := range wrapped { + records = append(records, types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + CID: entry.CID, + CreatedAt: entry.CreatedAt, + Nullified: entry.Nullified, + Operation: entry.Operation, + }) + } + + return records, nil + } + + var ops []json.RawMessage + if err := json.Unmarshal(rawBody, &ops); err != nil { + return nil, fmt.Errorf("decode did log: %w", err) + } + + records := make([]types.ExportRecord, 0, len(ops)) + for i, op := range ops { + records = append(records, types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + Operation: op, + }) + } + + auditRecords, err := c.fetchDIDAudit(ctx, did) + if err != nil { + return nil, fmt.Errorf("did log missing CIDs and audit fallback failed: %w", err) + } + + if len(auditRecords) != len(records) { + return nil, fmt.Errorf("did log/audit length mismatch: log=%d audit=%d", len(records), len(auditRecords)) + } + + for i := range records { + records[i].CID = strings.TrimSpace(auditRecords[i].CID) + if records[i].CID == "" { + return nil, fmt.Errorf("missing cid in did audit entry index %d", i) + } + + records[i].CreatedAt = auditRecords[i].CreatedAt + records[i].Nullified = auditRecords[i].Nullified + } + + return records, nil +} + +func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, error) { + u, err := url.Parse(c.source) + if err != nil { + return nil, fmt.Errorf("parse plc source: %w", err) + } + + u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log/audit" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("fetch did audit log: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone { + return nil, ErrDIDNotFound + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) + return nil, fmt.Errorf("did audit response %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + + var entries []didLogEntry + if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { + return nil, fmt.Errorf("decode did audit log: %w", err) + } + + return entries, nil +} + func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { pageSize := uint64(1000) batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil) |