aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/client.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 09:26:06 -0800
committerFuwn <[email protected]>2026-02-27 09:26:06 -0800
commit8980607ef8e7426b601f942f26ae2cd4c4f3edff (patch)
tree60bdb4bbbe5755223c3387179ee7406432d084ab /internal/ingest/client.go
parentfix: make mirror replay lossless with strict seq accounting and trace (diff)
downloadplutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.tar.xz
plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.zip
feat: add thin mode for on-demand verified PLC resolution
Diffstat (limited to 'internal/ingest/client.go')
-rw-r--r--internal/ingest/client.go132
1 files changed, 132 insertions, 0 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index eba6fd0..bdef942 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -26,6 +26,13 @@ type Client struct {
opts ClientOptions
}
+type didLogEntry struct {
+ Operation json.RawMessage `json:"operation"`
+ CID string `json:"cid,omitempty"`
+ CreatedAt string `json:"createdAt,omitempty"`
+ Nullified bool `json:"nullified,omitempty"`
+}
+
type ClientOptions struct {
MaxAttempts int
BaseDelay time.Duration
@@ -116,6 +123,131 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR
return c.FetchExportLimited(ctx, after, 0)
}
+func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
+ if strings.TrimSpace(did) == "" {
+ return nil, fmt.Errorf("did is required")
+ }
+
+ u, err := url.Parse(c.source)
+ if err != nil {
+ return nil, fmt.Errorf("parse plc source: %w", err)
+ }
+
+ u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log"
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+ if err != nil {
+ return nil, fmt.Errorf("new request: %w", err)
+ }
+
+ req.Header.Set("Accept", "application/json")
+ resp, err := c.http.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("fetch did log: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
+ return nil, ErrDIDNotFound
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
+ return nil, fmt.Errorf("did log response %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
+ }
+
+ rawBody, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("read did log body: %w", err)
+ }
+
+ var wrapped []didLogEntry
+ if err := json.Unmarshal(rawBody, &wrapped); err == nil && len(wrapped) > 0 && len(bytes.TrimSpace(wrapped[0].Operation)) > 0 {
+ records := make([]types.ExportRecord, 0, len(wrapped))
+ for i, entry := range wrapped {
+ records = append(records, types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ CID: entry.CID,
+ CreatedAt: entry.CreatedAt,
+ Nullified: entry.Nullified,
+ Operation: entry.Operation,
+ })
+ }
+
+ return records, nil
+ }
+
+ var ops []json.RawMessage
+ if err := json.Unmarshal(rawBody, &ops); err != nil {
+ return nil, fmt.Errorf("decode did log: %w", err)
+ }
+
+ records := make([]types.ExportRecord, 0, len(ops))
+ for i, op := range ops {
+ records = append(records, types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ Operation: op,
+ })
+ }
+
+ auditRecords, err := c.fetchDIDAudit(ctx, did)
+ if err != nil {
+ return nil, fmt.Errorf("did log missing CIDs and audit fallback failed: %w", err)
+ }
+
+ if len(auditRecords) != len(records) {
+ return nil, fmt.Errorf("did log/audit length mismatch: log=%d audit=%d", len(records), len(auditRecords))
+ }
+
+ for i := range records {
+ records[i].CID = strings.TrimSpace(auditRecords[i].CID)
+ if records[i].CID == "" {
+ return nil, fmt.Errorf("missing cid in did audit entry index %d", i)
+ }
+
+ records[i].CreatedAt = auditRecords[i].CreatedAt
+ records[i].Nullified = auditRecords[i].Nullified
+ }
+
+ return records, nil
+}
+
+func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, error) {
+ u, err := url.Parse(c.source)
+ if err != nil {
+ return nil, fmt.Errorf("parse plc source: %w", err)
+ }
+
+ u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log/audit"
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+ if err != nil {
+ return nil, fmt.Errorf("new request: %w", err)
+ }
+
+ resp, err := c.http.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("fetch did audit log: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
+ return nil, ErrDIDNotFound
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
+ return nil, fmt.Errorf("did audit response %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
+ }
+
+ var entries []didLogEntry
+ if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil {
+ return nil, fmt.Errorf("decode did audit log: %w", err)
+ }
+
+ return entries, nil
+}
+
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
pageSize := uint64(1000)
batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)