package ingest

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/Fuwn/plutia/internal/types"
)

// Client fetches PLC export records over HTTP, or from a local NDJSON/JSON
// file when the source looks like a file path or file:// URL.
type Client struct {
	source string
	http   *http.Client
	opts   ClientOptions
}

// didLogEntry mirrors one element of a PLC did log/audit response.
type didLogEntry struct {
	Operation json.RawMessage `json:"operation"`
	CID       string          `json:"cid,omitempty"`
	CreatedAt string          `json:"createdAt,omitempty"`
	Nullified bool            `json:"nullified,omitempty"`
}

// ClientOptions tunes the retry behavior of a Client. Zero-valued fields
// fall back to the defaults applied in NewClient.
type ClientOptions struct {
	MaxAttempts int
	BaseDelay   time.Duration
	MaxDelay    time.Duration
}

// FetchAttemptTrace describes a single export-fetch attempt; it is passed to
// the optional trace hook of FetchExportBatch for observability.
type FetchAttemptTrace struct {
	URL           string
	After         uint64
	Count         uint64
	Attempt       int
	StatusCode    int
	Retryable     bool
	RetryAfter    time.Duration
	ParsedLines   int
	ParsedRecords int
	SkippedLines  int
	TruncatedTail bool
	FirstCreated  string
	LastCreated   string
	Error         string
}

// FetchBatch is the result of one export fetch, including the records and
// metadata about how they were obtained and decoded.
type FetchBatch struct {
	Records       []types.ExportRecord
	RequestURL    string
	After         uint64
	Count         uint64
	Attempts      int
	ParsedLines   int
	ParsedRecords int
	SkippedLines  int
	TruncatedTail bool
	FirstCreated  string
	LastCreated   string
}

// decodeStats accumulates counters while decoding an export body.
type decodeStats struct {
	ParsedLines   int
	ParsedRecords int
	SkippedLines  int
	TruncatedTail bool
}

// NewClient builds a Client for the given PLC source (URL or file path).
// Optional ClientOptions override the default retry policy; only positive
// fields take effect.
func NewClient(source string, opts ...ClientOptions) *Client {
	cfg := ClientOptions{
		MaxAttempts: 8,
		BaseDelay:   250 * time.Millisecond,
		MaxDelay:    10 * time.Second,
	}

	if len(opts) > 0 {
		if opts[0].MaxAttempts > 0 {
			cfg.MaxAttempts = opts[0].MaxAttempts
		}

		if opts[0].BaseDelay > 0 {
			cfg.BaseDelay = opts[0].BaseDelay
		}

		if opts[0].MaxDelay > 0 {
			cfg.MaxDelay = opts[0].MaxDelay
		}
	}

	transport := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          256,
		MaxIdleConnsPerHost:   64,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}

	return &Client{
		source: strings.TrimRight(source, "/"),
		opts:   cfg,
		http: &http.Client{
			Timeout:   60 * time.Second,
			Transport: transport,
		},
	}
}

// FetchExport fetches export records after the given sequence number with no
// record limit.
func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportRecord, error) {
	return c.FetchExportLimited(ctx, after, 0)
}

// FetchDIDLog fetches the full operation log for a single DID. It accepts
// either the wrapped (objects with operation/cid/createdAt) or the bare
// (array of raw operations) response shape; in the bare case the audit log
// endpoint is consulted to recover CIDs and timestamps.
func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
	if strings.TrimSpace(did) == "" {
		return nil, fmt.Errorf("did is required")
	}

	u, err := url.Parse(c.source)
	if err != nil {
		return nil, fmt.Errorf("parse plc source: %w", err)
	}

	u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("new request: %w", err)
	}

	req.Header.Set("Accept", "application/json")

	resp, err := c.doRequestWithRetries(ctx, req, "did log fetch")
	if err != nil {
		return nil, err
	}

	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
		return nil, ErrDIDNotFound
	}

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))

		return nil, fmt.Errorf("did log response %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}

	rawBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read did log body: %w", err)
	}

	// Preferred shape: an array of wrapped entries that already carry CIDs.
	var wrapped []didLogEntry

	if err := json.Unmarshal(rawBody, &wrapped); err == nil && len(wrapped) > 0 && len(bytes.TrimSpace(wrapped[0].Operation)) > 0 {
		records := make([]types.ExportRecord, 0, len(wrapped))

		for i, entry := range wrapped {
			records = append(records, types.ExportRecord{
				Seq:       uint64(i + 1),
				DID:       did,
				CID:       entry.CID,
				CreatedAt: entry.CreatedAt,
				Nullified: entry.Nullified,
				Operation: entry.Operation,
			})
		}

		return records, nil
	}

	// Fallback shape: a bare array of raw operations without CIDs.
	var ops []json.RawMessage

	if err := json.Unmarshal(rawBody, &ops); err != nil {
		return nil, fmt.Errorf("decode did log: %w", err)
	}

	records := make([]types.ExportRecord, 0, len(ops))

	for i, op := range ops {
		records = append(records, types.ExportRecord{
			Seq:       uint64(i + 1),
			DID:       did,
			Operation: op,
		})
	}

	// The bare log lacks CIDs; recover them from the audit endpoint, which
	// must describe exactly the same sequence of operations.
	auditRecords, err := c.fetchDIDAudit(ctx, did)
	if err != nil {
		return nil, fmt.Errorf("did log missing CIDs and audit fallback failed: %w", err)
	}

	if len(auditRecords) != len(records) {
		return nil, fmt.Errorf("did log/audit length mismatch: log=%d audit=%d", len(records), len(auditRecords))
	}

	for i := range records {
		records[i].CID = strings.TrimSpace(auditRecords[i].CID)

		if records[i].CID == "" {
			return nil, fmt.Errorf("missing cid in did audit entry index %d", i)
		}

		records[i].CreatedAt = auditRecords[i].CreatedAt
		records[i].Nullified = auditRecords[i].Nullified
	}

	return records, nil
}

// fetchDIDAudit fetches the audit log for a DID, used to back-fill CIDs and
// timestamps when the plain log endpoint returns bare operations.
func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, error) {
	u, err := url.Parse(c.source)
	if err != nil {
		return nil, fmt.Errorf("parse plc source: %w", err)
	}

	u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log/audit"

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, fmt.Errorf("new request: %w", err)
	}

	resp, err := c.doRequestWithRetries(ctx, req, "did audit fetch")
	if err != nil {
		return nil, err
	}

	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
		return nil, ErrDIDNotFound
	}

	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))

		return nil, fmt.Errorf("did audit response %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
	}

	var entries []didLogEntry

	if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil {
		return nil, fmt.Errorf("decode did audit log: %w", err)
	}

	return entries, nil
}

// doRequestWithRetries performs req, retrying transient network errors and
// retryable HTTP statuses with exponential backoff (honoring Retry-After).
// On success the caller owns resp.Body.
func (c *Client) doRequestWithRetries(ctx context.Context, req *http.Request, operation string) (*http.Response, error) {
	maxAttempts := c.opts.MaxAttempts
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	var lastErr error

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		// Clone per attempt so retries do not reuse a consumed request.
		reqClone := req.Clone(ctx)

		resp, err := c.http.Do(reqClone)
		if err != nil {
			lastErr = fmt.Errorf("%s: %w", operation, err)

			if !isTransientNetworkErr(err) || attempt == maxAttempts || ctx.Err() != nil {
				return nil, lastErr
			}

			delay := c.backoffDelay(attempt)

			log.Printf("retrying %s attempt=%d url=%s delay=%s reason=%v", operation, attempt, req.URL.String(), delay, err)

			if err := waitForRetry(ctx, delay); err != nil {
				return nil, err
			}

			continue
		}

		if shouldRetryStatus(resp.StatusCode) && attempt < maxAttempts {
			// Prefer the server-provided Retry-After over our own backoff.
			delay := parseRetryAfter(resp.Header.Get("Retry-After"))
			if delay <= 0 {
				delay = c.backoffDelay(attempt)
			}

			log.Printf("retrying %s attempt=%d url=%s status=%d delay=%s", operation, attempt, req.URL.String(), resp.StatusCode, delay)

			_ = resp.Body.Close()

			if err := waitForRetry(ctx, delay); err != nil {
				return nil, err
			}

			continue
		}

		return resp, nil
	}

	if lastErr != nil {
		return nil, lastErr
	}

	return nil, fmt.Errorf("%s: request attempts exhausted", operation)
}

// waitForRetry sleeps for delay or returns early with ctx.Err() on
// cancellation.
func waitForRetry(ctx context.Context, delay time.Duration) error {
	timer := time.NewTimer(delay)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

// FetchExportLimited fetches up to limit records after the given sequence
// number (limit 0 means no limit), using the default page size.
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
	pageSize := uint64(1000)

	batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)
	if err != nil {
		return nil, err
	}

	return batch.Records, nil
}

// FetchExportBatch fetches one page of export records with retry, reporting
// per-attempt details to traceHook when non-nil. File-backed sources
// (file:// URLs or .ndjson/.json paths) are read locally without retries.
func (c *Client) FetchExportBatch(ctx context.Context, after uint64, limit uint64, pageSize uint64, traceHook func(FetchAttemptTrace)) (FetchBatch, error) {
	if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
		records, err := c.fetchFromFile(after, limit)
		if err != nil {
			return FetchBatch{}, err
		}

		batch := FetchBatch{
			Records:       records,
			RequestURL:    c.source,
			After:         after,
			Count:         uint64(len(records)),
			Attempts:      1,
			ParsedLines:   len(records),
			ParsedRecords: len(records),
		}

		if len(records) > 0 {
			batch.FirstCreated = records[0].CreatedAt
			batch.LastCreated = records[len(records)-1].CreatedAt
		}

		return batch, nil
	}

	u, err := url.Parse(c.source)
	if err != nil {
		return FetchBatch{}, fmt.Errorf("parse plc source: %w", err)
	}

	if pageSize == 0 {
		pageSize = 1000
	}

	count := pageSize
	if limit > 0 && limit < count {
		count = limit
	}

	u.Path = strings.TrimRight(u.Path, "/") + "/export"

	q := u.Query()
	q.Set("after", fmt.Sprintf("%d", after))
	q.Set("count", fmt.Sprintf("%d", count))
	u.RawQuery = q.Encode()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
	if err != nil {
		return FetchBatch{}, fmt.Errorf("new request: %w", err)
	}

	maxAttempts := c.opts.MaxAttempts
	if maxAttempts < 1 {
		maxAttempts = 1
	}

	var (
		lastErr  error
		attempts int
	)

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		records, retryAfter, retryable, statusCode, decStats, err := c.fetchExportOnce(req, limit)
		attempts = attempt

		if traceHook != nil {
			trace := FetchAttemptTrace{
				URL:           req.URL.String(),
				After:         after,
				Count:         count,
				Attempt:       attempt,
				StatusCode:    statusCode,
				Retryable:     retryable,
				RetryAfter:    retryAfter,
				ParsedLines:   decStats.ParsedLines,
				ParsedRecords: decStats.ParsedRecords,
				SkippedLines:  decStats.SkippedLines,
				TruncatedTail: decStats.TruncatedTail,
			}

			if len(records) > 0 {
				trace.FirstCreated = records[0].CreatedAt
				trace.LastCreated = records[len(records)-1].CreatedAt
			}

			if err != nil {
				trace.Error = err.Error()
			}

			traceHook(trace)
		}

		if err == nil {
			batch := FetchBatch{
				Records:       records,
				RequestURL:    req.URL.String(),
				After:         after,
				Count:         count,
				Attempts:      attempt,
				ParsedLines:   decStats.ParsedLines,
				ParsedRecords: decStats.ParsedRecords,
				SkippedLines:  decStats.SkippedLines,
				TruncatedTail: decStats.TruncatedTail,
			}

			if len(records) > 0 {
				batch.FirstCreated = records[0].CreatedAt
				batch.LastCreated = records[len(records)-1].CreatedAt
			}

			return batch, nil
		}

		lastErr = err

		if !retryable || attempt == maxAttempts || ctx.Err() != nil {
			break
		}

		delay := retryAfter
		if delay <= 0 {
			delay = c.backoffDelay(attempt)
		}

		log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)

		if err := waitForRetry(ctx, delay); err != nil {
			return FetchBatch{}, err
		}
	}

	return FetchBatch{
		RequestURL: req.URL.String(),
		After:      after,
		Count:      count,
		Attempts:   attempts,
	}, lastErr
}

// httpStatusError carries a non-200 export response status and a truncated
// body excerpt.
type httpStatusError struct {
	StatusCode int
	Body       string
}

func (e httpStatusError) Error() string {
	return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
}

// fetchExportOnce performs a single export request and decodes its body.
// It returns the records, a server-suggested retry delay (0 if none),
// whether the failure is retryable, the HTTP status, and decode stats.
func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, int, decodeStats, error) {
	reqClone := req.Clone(req.Context())

	// Do NOT set Accept-Encoding manually: when the caller sets it, net/http
	// disables transparent gzip decompression and the body would arrive as
	// raw gzip bytes, breaking the JSON/NDJSON decoder. Leaving the header
	// unset lets the Transport negotiate gzip and decode it for us.
	resp, err := c.http.Do(reqClone)
	if err != nil {
		return nil, 0, isTransientNetworkErr(err), 0, decodeStats{}, fmt.Errorf("fetch export: %w", err)
	}

	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
		body := strings.TrimSpace(string(b))
		retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
		err := httpStatusError{StatusCode: resp.StatusCode, Body: body}

		return nil, retryDelay, shouldRetryStatus(resp.StatusCode), resp.StatusCode, decodeStats{}, err
	}

	records, decStats, err := decodeExportBodyWithStats(resp.Body, limit)
	if err != nil {
		return nil, 0, false, resp.StatusCode, decStats, err
	}

	return records, 0, false, resp.StatusCode, decStats, nil
}

// backoffDelay returns the exponential backoff delay for the given attempt,
// doubling from BaseDelay and capped at MaxDelay.
func (c *Client) backoffDelay(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}

	delay := c.opts.BaseDelay
	if delay <= 0 {
		delay = 250 * time.Millisecond
	}

	maxDelay := c.opts.MaxDelay
	if maxDelay <= 0 {
		maxDelay = 10 * time.Second
	}

	for i := 1; i < attempt; i++ {
		delay *= 2

		if delay >= maxDelay {
			return maxDelay
		}
	}

	if delay > maxDelay {
		return maxDelay
	}

	return delay
}

// shouldRetryStatus reports whether an HTTP status is worth retrying.
func shouldRetryStatus(status int) bool {
	switch status {
	case http.StatusTooManyRequests,
		http.StatusInternalServerError,
		http.StatusBadGateway,
		http.StatusServiceUnavailable,
		http.StatusGatewayTimeout:
		return true
	default:
		return false
	}
}

// parseRetryAfter parses a Retry-After header value, which may be either a
// positive integer number of seconds or an HTTP-date; it returns 0 when the
// value is absent, malformed, or already in the past.
func parseRetryAfter(v string) time.Duration {
	v = strings.TrimSpace(v)
	if v == "" {
		return 0
	}

	if secs, err := strconv.Atoi(v); err == nil && secs > 0 {
		return time.Duration(secs) * time.Second
	}

	if ts, err := http.ParseTime(v); err == nil {
		delay := time.Until(ts)
		if delay > 0 {
			return delay
		}
	}

	return 0
}

// isTransientNetworkErr reports whether err looks like a transient network
// failure worth retrying (any net.Error, or a truncated-stream EOF).
func isTransientNetworkErr(err error) bool {
	if err == nil {
		return false
	}

	var netErr net.Error
	if errors.As(err, &netErr) {
		return true
	}

	return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}

// decodeExportBody decodes an export body, discarding the decode stats.
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
	records, _, err := decodeExportBodyWithStats(r, limit)

	return records, err
}

// decodeExportBodyWithStats decodes an export body in either JSON-array or
// NDJSON form, stopping after limit records when limit > 0. A partial final
// NDJSON line (truncated stream) is tolerated and recorded in the stats
// rather than treated as an error.
func decodeExportBodyWithStats(r io.Reader, limit uint64) ([]types.ExportRecord, decodeStats, error) {
	br := bufio.NewReader(r)

	first, err := peekFirstNonSpace(br)
	if err != nil {
		if err == io.EOF {
			// An empty body decodes to zero records.
			return nil, decodeStats{}, nil
		}

		return nil, decodeStats{}, err
	}

	if first == '[' {
		b, err := io.ReadAll(br)
		if err != nil {
			return nil, decodeStats{}, fmt.Errorf("read export body: %w", err)
		}

		trimmed := bytes.TrimSpace(b)

		var records []types.ExportRecord

		if err := json.Unmarshal(trimmed, &records); err != nil {
			return nil, decodeStats{}, fmt.Errorf("decode export json array: %w", err)
		}

		if limit > 0 && uint64(len(records)) > limit {
			records = records[:limit]
		}

		return records, decodeStats{
			ParsedLines:   len(records),
			ParsedRecords: len(records),
		}, nil
	}

	out := make([]types.ExportRecord, 0, 1024)
	stats := decodeStats{}

	for {
		line, err := br.ReadBytes('\n')
		isEOF := errors.Is(err, io.EOF)

		if err != nil && !isEOF {
			return nil, stats, fmt.Errorf("read ndjson line: %w", err)
		}

		if limit > 0 && uint64(len(out)) >= limit {
			stats.ParsedRecords = len(out)

			return out, stats, nil
		}

		trimmed := bytes.TrimSpace(line)

		if len(trimmed) > 0 {
			stats.ParsedLines++

			var rec types.ExportRecord

			if err := json.Unmarshal(trimmed, &rec); err != nil {
				if isEOF && isTrailingNDJSONPartial(err) {
					// The stream was cut mid-record; keep what we have.
					stats.SkippedLines++
					stats.TruncatedTail = true
					stats.ParsedRecords = len(out)

					return out, stats, nil
				}

				return nil, stats, fmt.Errorf("decode ndjson line: %w", err)
			}

			out = append(out, rec)
		}

		if isEOF {
			break
		}
	}

	stats.ParsedRecords = len(out)

	return out, stats, nil
}

// isTrailingNDJSONPartial reports whether a JSON decode error looks like a
// record truncated mid-stream.
func isTrailingNDJSONPartial(err error) bool {
	if !errors.Is(err, io.ErrUnexpectedEOF) && !strings.Contains(err.Error(), "unexpected end of JSON input") {
		return false
	}

	return true
}

// peekFirstNonSpace consumes leading whitespace and returns the first
// non-space byte without consuming it.
func peekFirstNonSpace(br *bufio.Reader) (byte, error) {
	for {
		b, err := br.ReadByte()
		if err != nil {
			return 0, err
		}

		if !isSpace(b) {
			if err := br.UnreadByte(); err != nil {
				return 0, fmt.Errorf("unread byte: %w", err)
			}

			return b, nil
		}
	}
}

// isSpace reports whether b is ASCII whitespace.
func isSpace(b byte) bool {
	switch b {
	case ' ', '\n', '\t', '\r':
		return true
	default:
		return false
	}
}

// fetchFromFile reads export records from a local file source, returning
// those with Seq > after, up to limit when limit > 0.
func (c *Client) fetchFromFile(after uint64, limit uint64) ([]types.ExportRecord, error) {
	path := strings.TrimPrefix(c.source, "file://")
	path = filepath.Clean(path)

	b, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("read source file: %w", err)
	}

	recs, err := decodeExportBody(bytes.NewReader(b), 0)
	if err != nil {
		return nil, err
	}

	out := make([]types.ExportRecord, 0, len(recs))

	for _, r := range recs {
		if r.Seq <= after {
			continue
		}

		out = append(out, r)

		if limit > 0 && uint64(len(out)) >= limit {
			break
		}
	}

	return out, nil
}