Diffstat (limited to 'internal/ingest')
-rw-r--r-- | internal/ingest/client.go                   | 148
-rw-r--r-- | internal/ingest/client_retry_test.go        |  63
-rw-r--r-- | internal/ingest/service.go                  |  99
-rw-r--r-- | internal/ingest/service_integration_test.go |  69
4 files changed, 360 insertions, 19 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index 0f9ad43..d25b73f 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -8,11 +8,13 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"log"
 	"net"
 	"net/http"
 	"net/url"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"time"
 
@@ -22,9 +24,32 @@ import (
 type Client struct {
 	source string
 	http   *http.Client
+	opts   ClientOptions
 }
 
-func NewClient(source string) *Client {
+type ClientOptions struct {
+	MaxAttempts int
+	BaseDelay   time.Duration
+	MaxDelay    time.Duration
+}
+
+func NewClient(source string, opts ...ClientOptions) *Client {
+	cfg := ClientOptions{
+		MaxAttempts: 8,
+		BaseDelay:   250 * time.Millisecond,
+		MaxDelay:    10 * time.Second,
+	}
+	if len(opts) > 0 {
+		if opts[0].MaxAttempts > 0 {
+			cfg.MaxAttempts = opts[0].MaxAttempts
+		}
+		if opts[0].BaseDelay > 0 {
+			cfg.BaseDelay = opts[0].BaseDelay
+		}
+		if opts[0].MaxDelay > 0 {
+			cfg.MaxDelay = opts[0].MaxDelay
+		}
+	}
 	transport := &http.Transport{
 		Proxy:       http.ProxyFromEnvironment,
 		DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
@@ -37,6 +62,7 @@ func NewClient(source string) *Client {
 	}
 	return &Client{
 		source: strings.TrimRight(source, "/"),
+		opts:   cfg,
 		http: &http.Client{
 			Timeout:   60 * time.Second,
 			Transport: transport,
@@ -52,6 +78,7 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
 	if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
 		return c.fetchFromFile(after, limit)
 	}
+
 	u, err := url.Parse(c.source)
 	if err != nil {
 		return nil, fmt.Errorf("parse plc source: %w", err)
@@ -65,16 +92,127 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
 	if err != nil {
 		return nil, fmt.Errorf("new request: %w", err)
 	}
-	resp, err := c.http.Do(req)
+	maxAttempts := c.opts.MaxAttempts
+	if maxAttempts < 1 {
+		maxAttempts = 1
+	}
+	var lastErr error
+	for attempt := 1; attempt <= maxAttempts; attempt++ {
+		records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+		if err == nil {
+			return records, nil
+		}
+		lastErr = err
+		if !retryable || attempt == maxAttempts || ctx.Err() != nil {
+			break
+		}
+		delay := retryAfter
+		if delay <= 0 {
+			delay = c.backoffDelay(attempt)
+		}
+		log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)
+		timer := time.NewTimer(delay)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return nil, ctx.Err()
+		case <-timer.C:
+		}
+	}
+	return nil, lastErr
+}
+
+type httpStatusError struct {
+	StatusCode int
+	Body       string
+}
+
+func (e httpStatusError) Error() string {
+	return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
+}
+
+func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
+	reqClone := req.Clone(req.Context())
+	reqClone.Header.Set("Accept-Encoding", "gzip")
+	resp, err := c.http.Do(reqClone)
 	if err != nil {
-		return nil, fmt.Errorf("fetch export: %w", err)
+		return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
 	}
 	defer resp.Body.Close()
+
 	if resp.StatusCode != http.StatusOK {
 		b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
-		return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
+		body := strings.TrimSpace(string(b))
+		retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
+		err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
+		return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
+	}
+	records, err := decodeExportBody(resp.Body, limit)
+	if err != nil {
+		return nil, 0, false, err
+	}
+	return records, 0, false, nil
+}
+
+func (c *Client) backoffDelay(attempt int) time.Duration {
+	if attempt < 1 {
+		attempt = 1
+	}
+	delay := c.opts.BaseDelay
+	if delay <= 0 {
+		delay = 250 * time.Millisecond
+	}
+	maxDelay := c.opts.MaxDelay
+	if maxDelay <= 0 {
+		maxDelay = 10 * time.Second
+	}
+	for i := 1; i < attempt; i++ {
+		delay *= 2
+		if delay >= maxDelay {
+			return maxDelay
+		}
+	}
+	if delay > maxDelay {
+		return maxDelay
+	}
+	return delay
+}
+
+func shouldRetryStatus(status int) bool {
+	switch status {
+	case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
+		return true
+	default:
+		return false
+	}
+}
+
+func parseRetryAfter(v string) time.Duration {
+	v = strings.TrimSpace(v)
+	if v == "" {
+		return 0
+	}
+	if secs, err := strconv.Atoi(v); err == nil && secs > 0 {
+		return time.Duration(secs) * time.Second
+	}
+	if ts, err := http.ParseTime(v); err == nil {
+		delay := time.Until(ts)
+		if delay > 0 {
+			return delay
+		}
+	}
+	return 0
+}
+
+func isTransientNetworkErr(err error) bool {
+	if err == nil {
+		return false
+	}
+	var netErr net.Error
+	if errors.As(err, &netErr) {
+		return true
 	}
-	return decodeExportBody(resp.Body, limit)
+	return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
 }
 
 func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go
new file mode 100644
index 0000000..a23fceb
--- /dev/null
+++ b/internal/ingest/client_retry_test.go
@@ -0,0 +1,63 @@
+package ingest
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"net/http/httptest"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) {
+	var attempts atomic.Int32
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		n := attempts.Add(1)
+		if n <= 2 {
+			w.Header().Set("Retry-After", "0")
+			http.Error(w, "rate limited", http.StatusTooManyRequests)
+			return
+		}
+		_, _ = fmt.Fprintln(w, `{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`)
+	}))
+	defer ts.Close()
+
+	client := NewClient(ts.URL, ClientOptions{
+		MaxAttempts: 5,
+		BaseDelay:   time.Millisecond,
+		MaxDelay:    2 * time.Millisecond,
+	})
+	records, err := client.FetchExportLimited(context.Background(), 0, 0)
+	if err != nil {
+		t.Fatalf("fetch export: %v", err)
+	}
+	if len(records) != 1 {
+		t.Fatalf("record count mismatch: got %d want 1", len(records))
+	}
+	if got := attempts.Load(); got != 3 {
+		t.Fatalf("attempt count mismatch: got %d want 3", got)
+	}
+}
+
+func TestFetchExportLimitedDoesNotRetry400(t *testing.T) {
+	var attempts atomic.Int32
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		attempts.Add(1)
+		http.Error(w, "bad request", http.StatusBadRequest)
+	}))
+	defer ts.Close()
+
+	client := NewClient(ts.URL, ClientOptions{
+		MaxAttempts: 5,
+		BaseDelay:   time.Millisecond,
+		MaxDelay:    2 * time.Millisecond,
+	})
+	_, err := client.FetchExportLimited(context.Background(), 0, 0)
+	if err == nil {
+		t.Fatalf("expected error for 400 response")
+	}
+	if got := attempts.Load(); got != 1 {
+		t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got)
+	}
+}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 8451246..817a8bd 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -24,9 +24,14 @@ import (
 )
 
 type Stats struct {
-	IngestedOps uint64 `json:"ingested_ops"`
-	Errors      uint64 `json:"errors"`
-	LastSeq     uint64 `json:"last_seq"`
+	IngestedOps     uint64  `json:"ingested_ops"`
+	Errors          uint64  `json:"errors"`
+	LastSeq         uint64  `json:"last_seq"`
+	VerifyFailures  uint64  `json:"verify_failures"`
+	LagOps          uint64  `json:"lag_ops"`
+	DIDCount        uint64  `json:"did_count"`
+	CheckpointSeq   uint64  `json:"checkpoint_sequence"`
+	IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
 }
 
 type Service struct {
@@ -43,6 +48,12 @@ type Service struct {
 	runOps      uint64
 	corrupted   atomic.Bool
 	corruptErr  atomic.Value
+	startedAt   time.Time
+	metricsSink MetricsSink
+}
+
+type MetricsSink interface {
+	ObserveCheckpoint(duration time.Duration, sequence uint64)
 }
 
 func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
@@ -54,6 +65,13 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
 		engine:      state.New(store, cfg.Mode),
 		checkpoints: checkpointMgr,
 		blockLog:    blockLog,
+		startedAt:   time.Now(),
+	}
+	if didCount, err := countStates(store); err == nil {
+		atomic.StoreUint64(&s.stats.DIDCount, didCount)
+	}
+	if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
+		atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
 	}
 	if cfg.Mode == config.ModeMirror && blockLog != nil {
 		s.appender = newBlockAppender(blockLog, 1024)
@@ -119,14 +137,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
 		return false, err
 	}
 	if len(records) == 0 {
+		atomic.StoreUint64(&s.stats.LagOps, 0)
 		return false, nil
 	}
+	latestSourceSeq := records[len(records)-1].Seq
 	committed, err := s.processRecords(ctx, records)
 	s.runOps += committed
 	if err != nil {
 		atomic.AddUint64(&s.stats.Errors, 1)
 		return committed > 0, err
 	}
+	lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+	if latestSourceSeq > lastCommitted {
+		atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
+	} else {
+		atomic.StoreUint64(&s.stats.LagOps, 0)
+	}
 	return true, nil
 }
 
@@ -136,10 +162,11 @@ type verifyTask struct {
 }
 
 type verifyResult struct {
-	index int
-	op    types.ParsedOperation
-	state types.StateV1
-	err   error
+	index  int
+	op     types.ParsedOperation
+	state  types.StateV1
+	newDID bool
+	err    error
 }
 
 func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
@@ -178,6 +205,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
 					continue
 				}
 				if err := s.verifier.VerifyOperation(op, existing); err != nil {
+					atomic.AddUint64(&s.stats.VerifyFailures, 1)
 					results <- verifyResult{index: task.index, err: err}
 					continue
 				}
@@ -187,7 +215,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
 					continue
 				}
 				cache[op.DID] = cloneState(&next)
-				results <- verifyResult{index: task.index, op: op, state: next}
+				results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
 			}
 		}(queues[i])
 	}
@@ -287,6 +315,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
 	pendingOps := make([]storage.OperationMutation, 0, batchSize)
 	pendingSeqs := make([]uint64, 0, batchSize)
 	pendingBlockHashes := map[uint64]string{}
+	pendingNewDIDs := map[string]struct{}{}
 	var committed uint64
 
 	commit := func() error {
@@ -314,6 +343,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
 		lastSeq := pendingSeqs[len(pendingSeqs)-1]
 		atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
 		atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
+		atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
 		committed += uint64(len(pendingOps))
 
 		if s.checkpoints != nil {
@@ -329,6 +359,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
 		pendingOps = pendingOps[:0]
 		pendingSeqs = pendingSeqs[:0]
 		clear(pendingBlockHashes)
+		clear(pendingNewDIDs)
 		return nil
 	}
 
@@ -350,6 +381,9 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
 		}
 		pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
 		pendingSeqs = append(pendingSeqs, v.op.Sequence)
+		if v.newDID {
+			pendingNewDIDs[v.op.DID] = struct{}{}
+		}
 		if len(pendingOps) >= batchSize {
 			if err := commit(); err != nil {
 				return committed, err
@@ -387,6 +421,10 @@ func (s *Service) createCheckpoint(sequence uint64) error {
 		float64(usage.RSSPeakKB)/1024.0,
 		time.Now().UnixMilli(),
 	)
+	atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+	if s.metricsSink != nil {
+		s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+	}
 	return nil
 }
 
@@ -411,10 +449,14 @@ func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
 	if err != nil {
 		return types.CheckpointV1{}, err
 	}
-	cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
+	cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
 	if err != nil {
 		return types.CheckpointV1{}, err
 	}
+	atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+	if s.metricsSink != nil {
+		s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+	}
 	return cp, nil
 }
 
@@ -473,7 +515,6 @@ func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
 }
 
 func (s *Service) VerifyDID(ctx context.Context, did string) error {
-	_ = ctx
 	if err := s.CorruptionError(); err != nil {
 		return err
 	}
@@ -497,6 +538,9 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
 	var previous *types.StateV1
 	for _, seq := range seqs {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
 		ref, ok, err := s.store.GetOpSeqRef(seq)
 		if err != nil {
 			return err
 		}
@@ -523,7 +567,6 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
 }
 
 func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
-	_ = ctx
 	if err := s.CorruptionError(); err != nil {
 		return "", nil, err
 	}
@@ -551,6 +594,9 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
 	var previous *types.StateV1
 	var tip string
 	for _, seq := range filtered {
+		if err := ctx.Err(); err != nil {
+			return "", nil, err
+		}
 		ref, ok, err := s.store.GetOpSeqRef(seq)
 		if err != nil {
 			return "", nil, err
@@ -669,12 +715,37 @@ func (s *Service) Close() {
 	}
 }
 
+func (s *Service) SetMetricsSink(sink MetricsSink) {
+	s.metricsSink = sink
+}
+
 func (s *Service) Stats() Stats {
+	ingested := atomic.LoadUint64(&s.stats.IngestedOps)
+	opsPerSec := 0.0
+	if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
+		opsPerSec = float64(ingested) / elapsed
+	}
 	return Stats{
-		IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
-		Errors:      atomic.LoadUint64(&s.stats.Errors),
-		LastSeq:     atomic.LoadUint64(&s.stats.LastSeq),
+		IngestedOps:     ingested,
+		Errors:          atomic.LoadUint64(&s.stats.Errors),
+		LastSeq:         atomic.LoadUint64(&s.stats.LastSeq),
+		VerifyFailures:  atomic.LoadUint64(&s.stats.VerifyFailures),
+		LagOps:          atomic.LoadUint64(&s.stats.LagOps),
+		DIDCount:        atomic.LoadUint64(&s.stats.DIDCount),
+		CheckpointSeq:   atomic.LoadUint64(&s.stats.CheckpointSeq),
+		IngestOpsPerSec: opsPerSec,
+	}
+}
+
+func countStates(store storage.Store) (uint64, error) {
+	var count uint64
+	if err := store.ForEachState(func(types.StateV1) error {
+		count++
+		return nil
+	}); err != nil {
+		return 0, err
 	}
+	return count, nil
 }
 
 type appendResult struct {
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
index d7bd1b6..e01dd4e 100644
--- a/internal/ingest/service_integration_test.go
+++ b/internal/ingest/service_integration_test.go
@@ -155,3 +155,72 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord {
 	out = append(out, rec1, rec2, rec3)
 	return out
 }
+
+func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
+	tmp := t.TempDir()
+	dataDir := filepath.Join(tmp, "data")
+	if err := os.MkdirAll(dataDir, 0o755); err != nil {
+		t.Fatalf("mkdir data dir: %v", err)
+	}
+
+	keySeed := make([]byte, ed25519.SeedSize)
+	if _, err := rand.Read(keySeed); err != nil {
+		t.Fatalf("rand seed: %v", err)
+	}
+	keyPath := filepath.Join(tmp, "mirror.key")
+	if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+		t.Fatalf("write mirror key: %v", err)
+	}
+
+	records := buildSignedRecords(t)
+	sourcePath := filepath.Join(tmp, "sample.ndjson")
+	file, err := os.Create(sourcePath)
+	if err != nil {
+		t.Fatalf("create source: %v", err)
+	}
+	for _, rec := range records {
+		b, _ := json.Marshal(rec)
+		if _, err := fmt.Fprintln(file, string(b)); err != nil {
+			t.Fatalf("write source: %v", err)
+		}
+	}
+	file.Close()
+
+	store, err := storage.OpenPebble(dataDir)
+	if err != nil {
+		t.Fatalf("open pebble: %v", err)
+	}
+	defer store.Close()
+	if err := store.SetMode(config.ModeMirror); err != nil {
+		t.Fatalf("set mode: %v", err)
+	}
+	bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+	if err != nil {
+		t.Fatalf("open block log: %v", err)
+	}
+	cfg := config.Config{
+		Mode:                 config.ModeMirror,
+		DataDir:              dataDir,
+		PLCSource:            sourcePath,
+		VerifyPolicy:         config.VerifyFull,
+		ZstdLevel:            3,
+		BlockSizeMB:          4,
+		CheckpointInterval:   2,
+		ListenAddr:           ":0",
+		MirrorPrivateKeyPath: keyPath,
+		PollInterval:         5,
+	}
+	service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+	if err := service.Replay(context.Background()); err != nil {
+		t.Fatalf("replay: %v", err)
+	}
+	if err := service.Flush(context.Background()); err != nil {
+		t.Fatalf("flush: %v", err)
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+	if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil {
+		t.Fatalf("expected cancellation error")
+	}
+}
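Usage note (illustrative sketch, not part of the commit): the new ClientOptions struct is the caller-facing knob for the retry loop added to FetchExportLimited. The snippet below uses only identifiers visible in this diff; the endpoint URL is a placeholder.

	package ingest

	import (
		"context"
		"time"
	)

	// exampleRetryOptions is a hypothetical caller. MaxAttempts counts the first
	// request plus retries, BaseDelay doubles per retry and is capped at MaxDelay,
	// and a positive Retry-After header from the server overrides the computed backoff.
	func exampleRetryOptions() {
		c := NewClient("https://plc.example.org", ClientOptions{
			MaxAttempts: 5,
			BaseDelay:   500 * time.Millisecond,
			MaxDelay:    5 * time.Second,
		})
		// 429/5xx responses and transient network errors are retried; other
		// statuses (for example 400) fail immediately, as the new tests assert.
		_, _ = c.FetchExportLimited(context.Background(), 0, 1000)
	}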