aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/service.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/ingest/service.go')
-rw-r--r--internal/ingest/service.go99
1 file changed, 85 insertions, 14 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 8451246..817a8bd 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -24,9 +24,14 @@ import (
)
type Stats struct {
- IngestedOps uint64 `json:"ingested_ops"`
- Errors uint64 `json:"errors"`
- LastSeq uint64 `json:"last_seq"`
+ IngestedOps uint64 `json:"ingested_ops"`
+ Errors uint64 `json:"errors"`
+ LastSeq uint64 `json:"last_seq"`
+ VerifyFailures uint64 `json:"verify_failures"`
+ LagOps uint64 `json:"lag_ops"`
+ DIDCount uint64 `json:"did_count"`
+ CheckpointSeq uint64 `json:"checkpoint_sequence"`
+ IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
}
type Service struct {
@@ -43,6 +48,12 @@ type Service struct {
runOps uint64
corrupted atomic.Bool
corruptErr atomic.Value
+ startedAt time.Time
+ metricsSink MetricsSink
+}
+
+type MetricsSink interface {
+ ObserveCheckpoint(duration time.Duration, sequence uint64)
}
func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
@@ -54,6 +65,13 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
engine: state.New(store, cfg.Mode),
checkpoints: checkpointMgr,
blockLog: blockLog,
+ startedAt: time.Now(),
+ }
+ if didCount, err := countStates(store); err == nil {
+ atomic.StoreUint64(&s.stats.DIDCount, didCount)
+ }
+ if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
}
if cfg.Mode == config.ModeMirror && blockLog != nil {
s.appender = newBlockAppender(blockLog, 1024)
@@ -119,14 +137,22 @@ func (s *Service) RunOnce(ctx context.Context) (bool, error) {
return false, err
}
if len(records) == 0 {
+ atomic.StoreUint64(&s.stats.LagOps, 0)
return false, nil
}
+ latestSourceSeq := records[len(records)-1].Seq
committed, err := s.processRecords(ctx, records)
s.runOps += committed
if err != nil {
atomic.AddUint64(&s.stats.Errors, 1)
return committed > 0, err
}
+ lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
+ if latestSourceSeq > lastCommitted {
+ atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
+ } else {
+ atomic.StoreUint64(&s.stats.LagOps, 0)
+ }
return true, nil
}
@@ -136,10 +162,11 @@ type verifyTask struct {
}
type verifyResult struct {
- index int
- op types.ParsedOperation
- state types.StateV1
- err error
+ index int
+ op types.ParsedOperation
+ state types.StateV1
+ newDID bool
+ err error
}
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
@@ -178,6 +205,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
continue
}
if err := s.verifier.VerifyOperation(op, existing); err != nil {
+ atomic.AddUint64(&s.stats.VerifyFailures, 1)
results <- verifyResult{index: task.index, err: err}
continue
}
@@ -187,7 +215,7 @@ func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecor
continue
}
cache[op.DID] = cloneState(&next)
- results <- verifyResult{index: task.index, op: op, state: next}
+ results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
}
}(queues[i])
}
@@ -287,6 +315,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps := make([]storage.OperationMutation, 0, batchSize)
pendingSeqs := make([]uint64, 0, batchSize)
pendingBlockHashes := map[uint64]string{}
+ pendingNewDIDs := map[string]struct{}{}
var committed uint64
commit := func() error {
@@ -314,6 +343,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
lastSeq := pendingSeqs[len(pendingSeqs)-1]
atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
+ atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
committed += uint64(len(pendingOps))
if s.checkpoints != nil {
@@ -329,6 +359,7 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
pendingOps = pendingOps[:0]
pendingSeqs = pendingSeqs[:0]
clear(pendingBlockHashes)
+ clear(pendingNewDIDs)
return nil
}
@@ -350,6 +381,9 @@ func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (
}
pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
pendingSeqs = append(pendingSeqs, v.op.Sequence)
+ if v.newDID {
+ pendingNewDIDs[v.op.DID] = struct{}{}
+ }
if len(pendingOps) >= batchSize {
if err := commit(); err != nil {
return committed, err
@@ -387,6 +421,10 @@ func (s *Service) createCheckpoint(sequence uint64) error {
float64(usage.RSSPeakKB)/1024.0,
time.Now().UnixMilli(),
)
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+ if s.metricsSink != nil {
+ s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+ }
return nil
}
@@ -411,10 +449,14 @@ func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
if err != nil {
return types.CheckpointV1{}, err
}
- cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
+ cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
if err != nil {
return types.CheckpointV1{}, err
}
+ atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
+ if s.metricsSink != nil {
+ s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
+ }
return cp, nil
}
@@ -473,7 +515,6 @@ func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
}
func (s *Service) VerifyDID(ctx context.Context, did string) error {
- _ = ctx
if err := s.CorruptionError(); err != nil {
return err
}
@@ -497,6 +538,9 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
var previous *types.StateV1
for _, seq := range seqs {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
ref, ok, err := s.store.GetOpSeqRef(seq)
if err != nil {
return err
@@ -523,7 +567,6 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
}
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
- _ = ctx
if err := s.CorruptionError(); err != nil {
return "", nil, err
}
@@ -551,6 +594,9 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
var previous *types.StateV1
var tip string
for _, seq := range filtered {
+ if err := ctx.Err(); err != nil {
+ return "", nil, err
+ }
ref, ok, err := s.store.GetOpSeqRef(seq)
if err != nil {
return "", nil, err
@@ -669,12 +715,37 @@ func (s *Service) Close() {
}
}
+func (s *Service) SetMetricsSink(sink MetricsSink) {
+ s.metricsSink = sink
+}
+
func (s *Service) Stats() Stats {
+ ingested := atomic.LoadUint64(&s.stats.IngestedOps)
+ opsPerSec := 0.0
+ if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
+ opsPerSec = float64(ingested) / elapsed
+ }
return Stats{
- IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
- Errors: atomic.LoadUint64(&s.stats.Errors),
- LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ IngestedOps: ingested,
+ Errors: atomic.LoadUint64(&s.stats.Errors),
+ LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ VerifyFailures: atomic.LoadUint64(&s.stats.VerifyFailures),
+ LagOps: atomic.LoadUint64(&s.stats.LagOps),
+ DIDCount: atomic.LoadUint64(&s.stats.DIDCount),
+ CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq),
+ IngestOpsPerSec: opsPerSec,
+ }
+}
+
+func countStates(store storage.Store) (uint64, error) {
+ var count uint64
+ if err := store.ForEachState(func(types.StateV1) error {
+ count++
+ return nil
+ }); err != nil {
+ return 0, err
}
+ return count, nil
}
type appendResult struct {