aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest')
-rw-r--r--internal/ingest/client.go148
-rw-r--r--internal/ingest/client_retry_test.go63
-rw-r--r--internal/ingest/service.go99
-rw-r--r--internal/ingest/service_integration_test.go69
4 files changed, 360 insertions, 19 deletions
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index 0f9ad43..d25b73f 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -8,11 +8,13 @@ import (
"errors"
"fmt"
"io"
+ "log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
+ "strconv"
"strings"
"time"
@@ -22,9 +24,32 @@ import (
type Client struct {
source string
http *http.Client
+ opts ClientOptions
}
-func NewClient(source string) *Client {
+type ClientOptions struct {
+ MaxAttempts int
+ BaseDelay time.Duration
+ MaxDelay time.Duration
+}
+
+func NewClient(source string, opts ...ClientOptions) *Client {
+ cfg := ClientOptions{
+ MaxAttempts: 8,
+ BaseDelay: 250 * time.Millisecond,
+ MaxDelay: 10 * time.Second,
+ }
+ if len(opts) > 0 {
+ if opts[0].MaxAttempts > 0 {
+ cfg.MaxAttempts = opts[0].MaxAttempts
+ }
+ if opts[0].BaseDelay > 0 {
+ cfg.BaseDelay = opts[0].BaseDelay
+ }
+ if opts[0].MaxDelay > 0 {
+ cfg.MaxDelay = opts[0].MaxDelay
+ }
+ }
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
@@ -37,6 +62,7 @@ func NewClient(source string) *Client {
}
return &Client{
source: strings.TrimRight(source, "/"),
+ opts: cfg,
http: &http.Client{
Timeout: 60 * time.Second,
Transport: transport,
@@ -52,6 +78,7 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
if strings.HasPrefix(c.source, "file://") || strings.HasSuffix(c.source, ".ndjson") || strings.HasSuffix(c.source, ".json") {
return c.fetchFromFile(after, limit)
}
+
u, err := url.Parse(c.source)
if err != nil {
return nil, fmt.Errorf("parse plc source: %w", err)
@@ -65,16 +92,127 @@ func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uin
if err != nil {
return nil, fmt.Errorf("new request: %w", err)
}
- resp, err := c.http.Do(req)
+ maxAttempts := c.opts.MaxAttempts
+ if maxAttempts < 1 {
+ maxAttempts = 1
+ }
+ var lastErr error
+ for attempt := 1; attempt <= maxAttempts; attempt++ {
+ records, retryAfter, retryable, err := c.fetchExportOnce(req, limit)
+ if err == nil {
+ return records, nil
+ }
+ lastErr = err
+ if !retryable || attempt == maxAttempts || ctx.Err() != nil {
+ break
+ }
+ delay := retryAfter
+ if delay <= 0 {
+ delay = c.backoffDelay(attempt)
+ }
+ log.Printf("retrying plc export fetch attempt=%d after_seq=%d delay=%s reason=%v", attempt, after, delay, err)
+ timer := time.NewTimer(delay)
+ select {
+ case <-ctx.Done():
+ timer.Stop()
+ return nil, ctx.Err()
+ case <-timer.C:
+ }
+ }
+ return nil, lastErr
+}
+
+type httpStatusError struct {
+ StatusCode int
+ Body string
+}
+
+func (e httpStatusError) Error() string {
+ return fmt.Sprintf("export response %d: %s", e.StatusCode, e.Body)
+}
+
+func (c *Client) fetchExportOnce(req *http.Request, limit uint64) ([]types.ExportRecord, time.Duration, bool, error) {
+ reqClone := req.Clone(req.Context())
+	// NOTE(review): do not set Accept-Encoding manually — Go's Transport only gzip-decompresses transparently when it adds the header itself; setting it here would hand decodeExportBody raw gzip bytes.
+ resp, err := c.http.Do(reqClone)
if err != nil {
- return nil, fmt.Errorf("fetch export: %w", err)
+ return nil, 0, isTransientNetworkErr(err), fmt.Errorf("fetch export: %w", err)
}
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
b, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
- return nil, fmt.Errorf("export response %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
+ body := strings.TrimSpace(string(b))
+ retryDelay := parseRetryAfter(resp.Header.Get("Retry-After"))
+ err := httpStatusError{StatusCode: resp.StatusCode, Body: body}
+ return nil, retryDelay, shouldRetryStatus(resp.StatusCode), err
+ }
+ records, err := decodeExportBody(resp.Body, limit)
+ if err != nil {
+ return nil, 0, false, err
+ }
+ return records, 0, false, nil
+}
+
+func (c *Client) backoffDelay(attempt int) time.Duration {
+ if attempt < 1 {
+ attempt = 1
+ }
+ delay := c.opts.BaseDelay
+ if delay <= 0 {
+ delay = 250 * time.Millisecond
+ }
+ maxDelay := c.opts.MaxDelay
+ if maxDelay <= 0 {
+ maxDelay = 10 * time.Second
+ }
+ for i := 1; i < attempt; i++ {
+ delay *= 2
+ if delay >= maxDelay {
+ return maxDelay
+ }
+ }
+ if delay > maxDelay {
+ return maxDelay
+ }
+ return delay
+}
+
+func shouldRetryStatus(status int) bool {
+ switch status {
+ case http.StatusTooManyRequests, http.StatusInternalServerError, http.StatusBadGateway, http.StatusServiceUnavailable, http.StatusGatewayTimeout:
+ return true
+ default:
+ return false
+ }
+}
+
+func parseRetryAfter(v string) time.Duration {
+ v = strings.TrimSpace(v)
+ if v == "" {
+ return 0
+ }
+ if secs, err := strconv.Atoi(v); err == nil && secs > 0 {
+ return time.Duration(secs) * time.Second
+ }
+ if ts, err := http.ParseTime(v); err == nil {
+ delay := time.Until(ts)
+ if delay > 0 {
+ return delay
+ }
+ }
+ return 0
+}
+
+func isTransientNetworkErr(err error) bool {
+ if err == nil {
+ return false
+ }
+ var netErr net.Error
+ if errors.As(err, &netErr) {
+ return true
}
- return decodeExportBody(resp.Body, limit)
+ return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
func decodeExportBody(r io.Reader, limit uint64) ([]types.ExportRecord, error) {
diff --git a/internal/ingest/client_retry_test.go b/internal/ingest/client_retry_test.go
new file mode 100644
index 0000000..a23fceb
--- /dev/null
+++ b/internal/ingest/client_retry_test.go
@@ -0,0 +1,63 @@
+package ingest
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func TestFetchExportLimitedRetries429ThenSucceeds(t *testing.T) {
+ var attempts atomic.Int32
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ n := attempts.Add(1)
+ if n <= 2 {
+ w.Header().Set("Retry-After", "0")
+ http.Error(w, "rate limited", http.StatusTooManyRequests)
+ return
+ }
+ _, _ = fmt.Fprintln(w, `{"seq":1,"did":"did:plc:alice","cid":"cid1","operation":{"x":1}}`)
+ }))
+ defer ts.Close()
+
+ client := NewClient(ts.URL, ClientOptions{
+ MaxAttempts: 5,
+ BaseDelay: time.Millisecond,
+ MaxDelay: 2 * time.Millisecond,
+ })
+ records, err := client.FetchExportLimited(context.Background(), 0, 0)
+ if err != nil {
+ t.Fatalf("fetch export: %v", err)
+ }
+ if len(records) != 1 {
+ t.Fatalf("record count mismatch: got %d want 1", len(records))
+ }
+ if got := attempts.Load(); got != 3 {
+ t.Fatalf("attempt count mismatch: got %d want 3", got)
+ }
+}
+
+func TestFetchExportLimitedDoesNotRetry400(t *testing.T) {
+ var attempts atomic.Int32
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ attempts.Add(1)
+ http.Error(w, "bad request", http.StatusBadRequest)
+ }))
+ defer ts.Close()
+
+ client := NewClient(ts.URL, ClientOptions{
+ MaxAttempts: 5,
+ BaseDelay: time.Millisecond,
+ MaxDelay: 2 * time.Millisecond,
+ })
+ _, err := client.FetchExportLimited(context.Background(), 0, 0)
+ if err == nil {
+ t.Fatalf("expected error for 400 response")
+ }
+ if got := attempts.Load(); got != 1 {
+ t.Fatalf("unexpected retries on 400: got attempts=%d want 1", got)
+ }
+}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 8451246..817a8bd 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -24,9 +24,14 @@ import (
)
type Stats struct {
- IngestedOps uint64 `json:"ingested_ops"`
- Errors uint64 `json:"errors"`
- LastSeq uint64 `json:"last_seq"`
+ IngestedOps uint64 `json:"ingested_ops"`
+ Errors uint64 `json:"errors"`
+ LastSeq uint64 `json:"last_seq"`
+ VerifyFailures uint64 `json:"verify_failures"`
+ LagOps uint64 `json:"lag_ops"`
+ DIDCount uint64 `json:"did_count"`
+ CheckpointSeq uint64 `json:"checkpoint_sequence"`
+ IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
}
type Service struct {
@@ -43,6 +48,12 @@ type Service struct {
runOps uint64
corrupted atomic.Bool
corruptErr atomic.Value
+ startedAt time.Time
+ metricsSink MetricsSink
+}
+
+type MetricsSink interface {
+ ObserveCheckpoint(duration time.Duration, sequence uint64)
}
func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
@@ -54,6 +65,13 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
engine: state.New(store, cfg.Mode),
checkpoints: checkpointMgr,
blockLog: blockLog,
+ startedAt: time.Now(),
+ }
+ if didCount, err := countStates(store); err == nil {
+ atomic.StoreUint64(&s.stats.DIDCount, didCount)
+ }
+ if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
}
if cfg.Mode == config.ModeMirror && blockLog != nil {
s.appender = newBlockAppender(blockLog, 1024)
@@ -119,14 +137,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
return false, err
}
if len(records) == 0 {
+ atomic.StoreUint64(&s.stats.LagOps, 0)
return false, nil
}
+ latestSourceSeq := records[len(records)-1].Seq
committed, err := s.processRecords(ctx, records)
s.runOps += committed
if err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
return committed > 0, err
}
+ lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+ if latestSourceSeq > lastCommitted {
+ atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
+ } else {
+ atomic.StoreUint64(&s.stats.LagOps, 0)
+ }
return true, nil
}
@@ -136,10 +162,11 @@ type verifyTask struct {
}
type verifyResult struct {
- index int
- op types.ParsedOperation
- state types.StateV1
- err error
+ index int
+ op types.ParsedOperation
+ state types.StateV1
+ newDID bool
+ err error
}
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
@@ -178,6 +205,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
continue
}
if err := s.verifier.VerifyOperation(op, existing); err != nil {
+ atomic.AddUint64(&s.stats.VerifyFailures, 1)
results <- verifyResult{index: task.index, err: err}
continue
}
@@ -187,7 +215,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
continue
}
cache[op.DID] = cloneState(&next)
- results <- verifyResult{index: task.index, op: op, state: next}
+ results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
}
}(queues[i])
}
@@ -287,6 +315,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps := make([]storage.OperationMutation, 0, batchSize)
pendingSeqs := make([]uint64, 0, batchSize)
pendingBlockHashes := map[uint64]string{}
+ pendingNewDIDs := map[string]struct{}{}
var committed uint64
commit := func() error {
@@ -314,6 +343,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
lastSeq := pendingSeqs[len(pendingSeqs)-1]
atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
+ atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
committed += uint64(len(pendingOps))
if s.checkpoints != nil {
@@ -329,6 +359,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps = pendingOps[:0]
pendingSeqs = pendingSeqs[:0]
clear(pendingBlockHashes)
+ clear(pendingNewDIDs)
return nil
}
@@ -350,6 +381,9 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
}
pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
pendingSeqs = append(pendingSeqs, v.op.Sequence)
+ if v.newDID {
+ pendingNewDIDs[v.op.DID] = struct{}{}
+ }
if len(pendingOps) >= batchSize {
if err := commit(); err != nil {
return committed, err
@@ -387,6 +421,10 @@ func (s *Service) createCheckpoint(sequence uint64) error {
float64(usage.RSSPeakKB)/1024.0,
time.Now().UnixMilli(),
)
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+ if s.metricsSink != nil {
+ s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+ }
return nil
}
@@ -411,10 +449,14 @@ func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
if err != nil {
return types.CheckpointV1{}, err
}
- cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
+ cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
if err != nil {
return types.CheckpointV1{}, err
}
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+ if s.metricsSink != nil {
+ s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+ }
return cp, nil
}
@@ -473,7 +515,6 @@ func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
}
func (s *Service) VerifyDID(ctx context.Context, did string) error {
- _ = ctx
if err := s.CorruptionError(); err != nil {
return err
}
@@ -497,6 +538,9 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
var previous *types.StateV1
for _, seq := range seqs {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
ref, ok, err := s.store.GetOpSeqRef(seq)
if err != nil {
return err
@@ -523,7 +567,6 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
}
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
- _ = ctx
if err := s.CorruptionError(); err != nil {
return "", nil, err
}
@@ -551,6 +594,9 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
var previous *types.StateV1
var tip string
for _, seq := range filtered {
+ if err := ctx.Err(); err != nil {
+ return "", nil, err
+ }
ref, ok, err := s.store.GetOpSeqRef(seq)
if err != nil {
return "", nil, err
@@ -669,12 +715,37 @@ func (s *Service) Close() {
}
}
+func (s *Service) SetMetricsSink(sink MetricsSink) {
+ s.metricsSink = sink
+}
+
func (s *Service) Stats() Stats {
+ ingested := atomic.LoadUint64(&s.stats.IngestedOps)
+ opsPerSec := 0.0
+ if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
+ opsPerSec = float64(ingested) / elapsed
+ }
return Stats{
- IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
- Errors: atomic.LoadUint64(&s.stats.Errors),
- LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ IngestedOps: ingested,
+ Errors: atomic.LoadUint64(&s.stats.Errors),
+ LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ VerifyFailures: atomic.LoadUint64(&s.stats.VerifyFailures),
+ LagOps: atomic.LoadUint64(&s.stats.LagOps),
+ DIDCount: atomic.LoadUint64(&s.stats.DIDCount),
+ CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq),
+ IngestOpsPerSec: opsPerSec,
+ }
+}
+
+func countStates(store storage.Store) (uint64, error) {
+ var count uint64
+ if err := store.ForEachState(func(types.StateV1) error {
+ count++
+ return nil
+ }); err != nil {
+ return 0, err
}
+ return count, nil
}
type appendResult struct {
diff --git a/internal/ingest/service_integration_test.go b/internal/ingest/service_integration_test.go
index d7bd1b6..e01dd4e 100644
--- a/internal/ingest/service_integration_test.go
+++ b/internal/ingest/service_integration_test.go
@@ -155,3 +155,72 @@ func buildSignedRecords(t *testing.T) []types.ExportRecord {
out = append(out, rec1, rec2, rec3)
return out
}
+
+func TestRecomputeTipAtOrBeforeHonorsContextCancellation(t *testing.T) {
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data dir: %v", err)
+ }
+
+ keySeed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(keySeed); err != nil {
+ t.Fatalf("rand seed: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(keySeed)), 0o600); err != nil {
+ t.Fatalf("write mirror key: %v", err)
+ }
+
+ records := buildSignedRecords(t)
+ sourcePath := filepath.Join(tmp, "sample.ndjson")
+ file, err := os.Create(sourcePath)
+ if err != nil {
+ t.Fatalf("create source: %v", err)
+ }
+ for _, rec := range records {
+ b, _ := json.Marshal(rec)
+ if _, err := fmt.Fprintln(file, string(b)); err != nil {
+ t.Fatalf("write source: %v", err)
+ }
+ }
+ file.Close()
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: 5,
+ }
+ service := NewService(cfg, store, NewClient(sourcePath), bl, checkpoint.NewManager(store, dataDir, keyPath))
+ if err := service.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if err := service.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+ if _, _, err := service.RecomputeTipAtOrBefore(ctx, "did:plc:alice", 2); err == nil {
+ t.Fatalf("expected cancellation error")
+ }
+}