package ingest

import (
	"context"
	"errors"
	"fmt"
	"hash/fnv"
	"log"
	"os"
	"os/exec"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Fuwn/plutia/internal/checkpoint"
	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/state"
	"github.com/Fuwn/plutia/internal/storage"
	"github.com/Fuwn/plutia/internal/types"
	"github.com/Fuwn/plutia/internal/verify"
)

type Stats struct {
	IngestedOps     uint64  `json:"ingested_ops"`
	Errors          uint64  `json:"errors"`
	LastSeq         uint64  `json:"last_seq"`
	VerifyFailures  uint64  `json:"verify_failures"`
	LagOps          uint64  `json:"lag_ops"`
	DIDCount        uint64  `json:"did_count"`
	CheckpointSeq   uint64  `json:"checkpoint_sequence"`
	IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
}

type Service struct {
	cfg         config.Config
	store       storage.Store
	client      *Client
	verifier    *verify.Verifier
	engine      *state.Engine
	checkpoints *checkpoint.Manager
	blockLog    *storage.BlockLog
	appender    *blockAppender
	stats       Stats
	maxOps      uint64
	runOps      uint64
	corrupted   atomic.Bool
	corruptErr  atomic.Value
	startedAt   time.Time
	metricsSink MetricsSink
}

type MetricsSink interface {
	ObserveCheckpoint(duration time.Duration, sequence uint64)
}

func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
	s := &Service{
		cfg:         cfg,
		store:       store,
		client:      client,
		verifier:    verify.New(cfg.VerifyPolicy),
		engine:      state.New(store, cfg.Mode),
		checkpoints: checkpointMgr,
		blockLog:    blockLog,
		startedAt:   time.Now(),
	}

	if didCount, err := countStates(store); err == nil {
		atomic.StoreUint64(&s.stats.DIDCount, didCount)
	}

	if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
		atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
	}

	if cfg.Mode == config.ModeMirror && blockLog != nil {
		s.appender = newBlockAppender(blockLog, 1024)

		if _, err := blockLog.ValidateAndRecover(store); err != nil {
			s.MarkCorrupted(err)
		}
	}

	return s
}

func (s *Service) SetMaxOps(max uint64) {
	s.maxOps = max
}

func (s *Service) Replay(ctx context.Context) error {
	for {
		changed, err := s.RunOnce(ctx)
		if err != nil {
			return err
		}

		if !changed {
			return nil
		}
	}
}

func (s *Service) Poll(ctx context.Context) error {
	ticker := time.NewTicker(s.cfg.PollInterval)
	defer ticker.Stop()

	for {
		if _, err := s.RunOnce(ctx); err != nil {
			atomic.AddUint64(&s.stats.Errors, 1)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

func (s *Service) RunOnce(ctx context.Context) (bool, error) {
	if err := s.CorruptionError(); err != nil {
		return false, err
	}

	if s.maxOps > 0 && s.runOps >= s.maxOps {
		return false, nil
	}

	lastSeq, err := s.store.GetGlobalSeq()
	if err != nil {
		return false, fmt.Errorf("load global sequence: %w", err)
	}

	limit := uint64(0)

	if s.maxOps > 0 {
		remaining := s.maxOps - s.runOps
		if remaining == 0 {
			return false, nil
		}

		limit = remaining
	}

	records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
	if err != nil {
		return false, err
	}

	if len(records) == 0 {
		atomic.StoreUint64(&s.stats.LagOps, 0)
		return false, nil
	}

	latestSourceSeq := records[len(records)-1].Seq
	committed, err := s.processRecords(ctx, records)
	s.runOps += committed

	if err != nil {
		atomic.AddUint64(&s.stats.Errors, 1)
		return committed > 0, err
	}

	lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
	if latestSourceSeq > lastCommitted {
		atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
	} else {
		atomic.StoreUint64(&s.stats.LagOps, 0)
	}

	return true, nil
}
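// Record verification fans out to a fixed pool of workers. Each record is
// routed by an FNV-1a hash of its DID (see didWorker), so all operations for
// one DID are verified sequentially by the same goroutine while unrelated
// DIDs proceed in parallel. Each worker keeps a private per-DID state cache
// (always handing out clones, never aliases), and collectVerifiedInOrder
// restores the original export order before anything is committed.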
type verifyTask struct {
	index int
	rec   types.ExportRecord
}

type verifyResult struct {
	index  int
	op     types.ParsedOperation
	state  types.StateV1
	newDID bool
	err    error
}

func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
	verified, err := s.verifyRecords(ctx, records)
	if err != nil {
		return 0, err
	}

	return s.commitVerified(ctx, verified)
}

func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
	_ = ctx

	workers := s.cfg.VerifyWorkers
	if workers < 1 {
		workers = 1
	}

	queues := make([]chan verifyTask, workers)
	results := make(chan verifyResult, len(records))

	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		queues[i] = make(chan verifyTask, 64)

		wg.Add(1)

		go func(queue <-chan verifyTask) {
			defer wg.Done()

			cache := map[string]*types.StateV1{}

			for task := range queue {
				op, err := types.ParseOperation(task.rec)
				if err != nil {
					results <- verifyResult{index: task.index, err: err}
					continue
				}

				existing, err := s.loadExistingState(cache, op.DID)
				if err != nil {
					results <- verifyResult{index: task.index, err: err}
					continue
				}

				if err := s.verifier.VerifyOperation(op, existing); err != nil {
					atomic.AddUint64(&s.stats.VerifyFailures, 1)
					results <- verifyResult{index: task.index, err: err}

					continue
				}

				next, err := state.ComputeNextState(op, existing)
				if err != nil {
					results <- verifyResult{index: task.index, err: err}
					continue
				}

				cache[op.DID] = cloneState(&next)
				results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
			}
		}(queues[i])
	}

	for idx, rec := range records {
		worker := didWorker(rec.DID, workers)
		queues[worker] <- verifyTask{index: idx, rec: rec}
	}

	for _, q := range queues {
		close(q)
	}

	wg.Wait()
	close(results)

	return collectVerifiedInOrder(len(records), results)
}

func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string) (*types.StateV1, error) {
	if existing, ok := cache[did]; ok {
		return cloneState(existing), nil
	}

	stateVal, ok, err := s.store.GetState(did)
	if err != nil {
		return nil, err
	}

	if !ok {
		cache[did] = nil
		return nil, nil
	}

	cache[did] = cloneState(&stateVal)

	return cloneState(&stateVal), nil
}

func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyResult, error) {
	ordered := make([]verifyResult, total)
	seen := make([]bool, total)

	var firstErr error
	received := 0

	for r := range results {
		received++

		if r.index < 0 || r.index >= total {
			if firstErr == nil {
				firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
			}

			continue
		}

		if seen[r.index] {
			if firstErr == nil {
				firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
			}

			continue
		}

		seen[r.index] = true
		ordered[r.index] = r

		if r.err != nil && firstErr == nil {
			firstErr = r.err
		}
	}

	if received != total && firstErr == nil {
		firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
	}

	if firstErr != nil {
		return nil, firstErr
	}

	for i := range seen {
		if !seen[i] {
			return nil, fmt.Errorf("missing verify result index %d", i)
		}
	}

	return ordered, nil
}

// didWorker maps a DID to a worker index via an FNV-1a hash so every
// operation for a given DID lands on the same goroutine.
func didWorker(did string, workers int) int {
	if workers <= 1 {
		return 0
	}

	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(did))

	return int(hasher.Sum32() % uint32(workers))
}

// cloneState deep-copies the slice fields of a StateV1 so cached states can
// never be mutated through shared backing arrays.
func cloneState(in *types.StateV1) *types.StateV1 {
	if in == nil {
		return nil
	}

	out := *in
	out.DIDDocument = append([]byte(nil), in.DIDDocument...)
	out.RotationKeys = append([]string(nil), in.RotationKeys...)

	return &out
}
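// commitVerified applies verified operations to the store in batches of
// cfg.CommitBatchSize. In mirror mode each operation is first appended to the
// block log, and any block sealed as a side effect has its hash persisted in
// the same store batch, keeping the store and the block log consistent.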
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
	_ = ctx

	batchSize := s.cfg.CommitBatchSize
	if batchSize < 1 {
		batchSize = 1
	}

	pendingOps := make([]storage.OperationMutation, 0, batchSize)
	pendingSeqs := make([]uint64, 0, batchSize)
	pendingBlockHashes := map[uint64]string{}
	pendingNewDIDs := map[string]struct{}{}

	var committed uint64

	commit := func() error {
		if len(pendingOps) == 0 {
			return nil
		}

		if s.cfg.Mode == config.ModeMirror {
			flush, err := s.appender.Flush()
			if err != nil {
				return err
			}

			if flush != nil {
				pendingBlockHashes[flush.BlockID] = flush.Hash
			}
		}

		blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))

		for id, hash := range pendingBlockHashes {
			blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
		}

		sort.Slice(blockEntries, func(i, j int) bool {
			return blockEntries[i].BlockID < blockEntries[j].BlockID
		})

		if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
			return err
		}

		lastSeq := pendingSeqs[len(pendingSeqs)-1]
		atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
		atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
		atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
		committed += uint64(len(pendingOps))

		// Guard against a zero interval, which would panic on the modulo
		// below when the config leaves CheckpointInterval unset.
		if s.checkpoints != nil && s.cfg.CheckpointInterval > 0 {
			for _, seq := range pendingSeqs {
				if seq%s.cfg.CheckpointInterval == 0 {
					if err := s.createCheckpoint(seq); err != nil {
						return err
					}
				}
			}
		}

		pendingOps = pendingOps[:0]
		pendingSeqs = pendingSeqs[:0]
		clear(pendingBlockHashes)
		clear(pendingNewDIDs)

		return nil
	}

	for _, v := range verified {
		var ref *types.BlockRefV1

		if s.cfg.Mode == config.ModeMirror {
			if s.appender == nil {
				return committed, fmt.Errorf("mirror mode requires block appender")
			}

			result, err := s.appender.Append(v.op)
			if err != nil {
				return committed, err
			}

			if result.Flush != nil {
				pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
			}

			result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
			ref = &result.Ref
		}

		pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
		pendingSeqs = append(pendingSeqs, v.op.Sequence)

		if v.newDID {
			pendingNewDIDs[v.op.DID] = struct{}{}
		}

		if len(pendingOps) >= batchSize {
			if err := commit(); err != nil {
				return committed, err
			}
		}
	}

	if err := commit(); err != nil {
		return committed, err
	}

	return committed, nil
}

// createCheckpoint builds and stores a checkpoint at the given sequence while
// a background goroutine samples the process's CPU and peak RSS for the
// duration of the build.
func (s *Service) createCheckpoint(sequence uint64) error {
	blocks, err := s.store.ListBlockHashes()
	if err != nil {
		return fmt.Errorf("list blocks for checkpoint: %w", err)
	}

	done := make(chan struct{})
	usageCh := make(chan checkpointProcessUsage, 1)

	go sampleCheckpointProcessUsage(done, usageCh)

	cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)

	close(done)

	usage := <-usageCh

	if err != nil {
		return fmt.Errorf("build checkpoint: %w", err)
	}

	log.Printf(
		"checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
		cp.Sequence,
		metrics.DIDCount,
		metrics.MerkleCompute.Milliseconds(),
		metrics.Total.Milliseconds(),
		usage.CPUPercentAvg,
		float64(usage.RSSPeakKB)/1024.0,
		time.Now().UnixMilli(),
	)

	atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)

	if s.metricsSink != nil {
		s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
	}

	return nil
}
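// Snapshot flushes any buffered block-log writes (the appender is only set in
// mirror mode), persists the resulting block hash, then builds and stores a
// checkpoint at the current global sequence.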
func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
	if s.appender != nil {
		flush, err := s.appender.Flush()
		if err != nil {
			return types.CheckpointV1{}, err
		}

		if flush != nil {
			if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
				return types.CheckpointV1{}, err
			}
		}
	}

	_ = ctx

	seq, err := s.store.GetGlobalSeq()
	if err != nil {
		return types.CheckpointV1{}, err
	}

	blocks, err := s.store.ListBlockHashes()
	if err != nil {
		return types.CheckpointV1{}, err
	}

	cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
	if err != nil {
		return types.CheckpointV1{}, err
	}

	atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)

	if s.metricsSink != nil {
		s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
	}

	return cp, nil
}

type checkpointProcessUsage struct {
	CPUPercentAvg float64
	RSSPeakKB     int64
}

// sampleCheckpointProcessUsage polls this process's CPU and RSS every 25ms
// until done is closed, then sends the averaged CPU percentage and peak RSS
// on out. It always takes one final sample before reporting.
func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
	pid := os.Getpid()

	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()

	var cpuSum float64
	var samples int
	var rssPeak int64

	sample := func() {
		cpu, rss, err := processMetrics(pid)
		if err != nil {
			return
		}

		cpuSum += cpu
		samples++

		if rss > rssPeak {
			rssPeak = rss
		}
	}

	sample()

	for {
		select {
		case <-done:
			sample()

			avg := 0.0
			if samples > 0 {
				avg = cpuSum / float64(samples)
			}

			out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}

			return
		case <-ticker.C:
			sample()
		}
	}
}

// processMetrics shells out to ps(1) for the process's CPU percentage and
// resident set size in KB; this is portable across Unix-likes but not
// Windows.
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
	out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
	if err != nil {
		return 0, 0, err
	}

	fields := strings.Fields(string(out))
	if len(fields) < 2 {
		return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
	}

	cpuPct, _ = strconv.ParseFloat(fields[0], 64)
	rssKB, _ = strconv.ParseInt(fields[1], 10, 64)

	return cpuPct, rssKB, nil
}

// VerifyDID re-verifies a DID's stored history. Outside mirror mode it only
// checks that a state exists; in mirror mode it replays every stored
// operation from the block log through the verifier.
func (s *Service) VerifyDID(ctx context.Context, did string) error {
	if err := s.CorruptionError(); err != nil {
		return err
	}

	if s.cfg.Mode != config.ModeMirror {
		_, ok, err := s.store.GetState(did)
		if err != nil {
			return err
		}

		if !ok {
			return fmt.Errorf("did %s not found", did)
		}

		return nil
	}

	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return err
	}

	if len(seqs) == 0 {
		return fmt.Errorf("no operations for did %s", did)
	}

	var previous *types.StateV1

	for _, seq := range seqs {
		if err := ctx.Err(); err != nil {
			return err
		}

		ref, ok, err := s.store.GetOpSeqRef(seq)
		if err != nil {
			return err
		}

		if !ok {
			return fmt.Errorf("missing op reference for seq %d", seq)
		}

		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return err
		}

		rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}

		op, err := types.ParseOperation(rec)
		if err != nil {
			return err
		}

		if err := s.verifier.VerifyOperation(op, previous); err != nil {
			return fmt.Errorf("verify seq %d failed: %w", seq, err)
		}

		// Use a fresh variable rather than shadowing the receiver s.
		next := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
		previous = &next
	}

	return nil
}
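// RecomputeTipAtOrBefore replays a DID's operations up to and including the
// given sequence, returning the chain-tip CID at that point along with the
// sequences replayed. It requires mirror mode, since it reads raw operations
// back out of the block log.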
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
	if err := s.CorruptionError(); err != nil {
		return "", nil, err
	}

	if s.cfg.Mode != config.ModeMirror {
		return "", nil, errors.New("historical proof requires mirror mode")
	}

	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return "", nil, err
	}

	if len(seqs) == 0 {
		return "", nil, fmt.Errorf("no operations for did %s", did)
	}

	filtered := make([]uint64, 0, len(seqs))

	for _, seq := range seqs {
		if seq <= sequence {
			filtered = append(filtered, seq)
		}
	}

	if len(filtered) == 0 {
		return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
	}

	sort.Slice(filtered, func(i, j int) bool {
		return filtered[i] < filtered[j]
	})

	var previous *types.StateV1
	var tip string

	for _, seq := range filtered {
		if err := ctx.Err(); err != nil {
			return "", nil, err
		}

		ref, ok, err := s.store.GetOpSeqRef(seq)
		if err != nil {
			return "", nil, err
		}

		if !ok {
			return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
		}

		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return "", nil, err
		}

		op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
		if err != nil {
			return "", nil, err
		}

		if err := s.verifier.VerifyOperation(op, previous); err != nil {
			return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
		}

		tip = op.CID

		next := types.StateV1{
			DID:          did,
			ChainTipHash: op.CID,
			LatestOpSeq:  seq,
		}

		prior := []string(nil)
		if previous != nil {
			prior = append(prior, previous.RotationKeys...)
		}

		next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
		previous = &next
	}

	return tip, filtered, nil
}

// Flush drains any buffered block-log writes and persists the resulting
// block hash, if a block was sealed.
func (s *Service) Flush(ctx context.Context) error {
	_ = ctx

	if err := s.CorruptionError(); err != nil {
		return err
	}

	if s.appender == nil {
		return nil
	}

	flush, err := s.appender.Flush()
	if err != nil {
		return err
	}

	if flush != nil {
		if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
			return err
		}
	}

	return nil
}

// MarkCorrupted latches the service into a corrupted state; every subsequent
// ingest or proof call fails with CorruptionError until the process restarts.
func (s *Service) MarkCorrupted(err error) {
	if err == nil {
		return
	}

	s.corrupted.Store(true)
	s.corruptErr.Store(err.Error())
}

func (s *Service) IsCorrupted() bool {
	return s.corrupted.Load()
}

func (s *Service) CorruptionError() error {
	if !s.IsCorrupted() {
		return nil
	}

	msg, _ := s.corruptErr.Load().(string)
	if msg == "" {
		msg = "data corruption detected"
	}

	return fmt.Errorf("data corruption detected: %s", msg)
}

// extractRotationKeysFromPayload collects deduplicated keys from the
// payload's rotationKeys list plus any recoveryKey/signingKey fields; if the
// payload yields none, it carries the prior state's keys forward.
func extractRotationKeysFromPayload(payload map[string]any, prior []string) []string {
	out := make([]string, 0, len(prior)+4)
	seen := map[string]struct{}{}

	add := func(v string) {
		v = strings.TrimSpace(v)
		if v == "" {
			return
		}

		if _, ok := seen[v]; ok {
			return
		}

		seen[v] = struct{}{}
		out = append(out, v)
	}

	if arr, ok := payload["rotationKeys"].([]any); ok {
		for _, v := range arr {
			if s, ok := v.(string); ok {
				add(s)
			}
		}
	}

	if recovery, ok := payload["recoveryKey"].(string); ok {
		add(recovery)
	}

	if signing, ok := payload["signingKey"].(string); ok {
		add(signing)
	}

	if len(out) == 0 {
		for _, v := range prior {
			add(v)
		}
	}

	return out
}

func (s *Service) Close() {
	if s.appender != nil {
		s.appender.Close()
	}
}

func (s *Service) SetMetricsSink(sink MetricsSink) {
	s.metricsSink = sink
}

// Stats returns a snapshot of the atomic counters plus a derived
// ops-per-second rate over the service's lifetime.
func (s *Service) Stats() Stats {
	ingested := atomic.LoadUint64(&s.stats.IngestedOps)

	opsPerSec := 0.0
	if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 {
		opsPerSec = float64(ingested) / elapsed
	}

	return Stats{
		IngestedOps:     ingested,
		Errors:          atomic.LoadUint64(&s.stats.Errors),
		LastSeq:         atomic.LoadUint64(&s.stats.LastSeq),
		VerifyFailures:  atomic.LoadUint64(&s.stats.VerifyFailures),
		LagOps:          atomic.LoadUint64(&s.stats.LagOps),
		DIDCount:        atomic.LoadUint64(&s.stats.DIDCount),
		CheckpointSeq:   atomic.LoadUint64(&s.stats.CheckpointSeq),
		IngestOpsPerSec: opsPerSec,
	}
}

func countStates(store storage.Store) (uint64, error) {
	var count uint64

	if err := store.ForEachState(func(types.StateV1) error {
		count++
		return nil
	}); err != nil {
		return 0, err
	}

	return count, nil
}
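// The types below implement a single-writer facade over storage.BlockLog:
// appendRequest/appendResult carry work and replies across the queue, and
// blockAppender's run loop is the only goroutine that touches the log, so
// concurrent callers need no additional locking. Close is idempotent via
// sync.Once, but Append or Flush after Close panics on the closed queue, so
// producers must stop first.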
type appendResult struct {
	Ref   types.BlockRefV1
	Flush *storage.FlushInfo
	Err   error
}

type appendRequest struct {
	Operation types.ParsedOperation
	Reply     chan appendResult
	Flush     bool
}

type blockAppender struct {
	log    *storage.BlockLog
	queue  chan appendRequest
	once   sync.Once
	closed chan struct{}
}

func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender {
	ba := &blockAppender{
		log:    log,
		queue:  make(chan appendRequest, buffer),
		closed: make(chan struct{}),
	}

	go ba.run()

	return ba
}

func (b *blockAppender) run() {
	defer close(b.closed)

	for req := range b.queue {
		if req.Flush {
			flush, err := b.log.Flush()
			req.Reply <- appendResult{Flush: flush, Err: err}

			continue
		}

		ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes)
		req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err}
	}
}

func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) {
	reply := make(chan appendResult, 1)
	b.queue <- appendRequest{Operation: op, Reply: reply}
	res := <-reply

	return res, res.Err
}

func (b *blockAppender) Flush() (*storage.FlushInfo, error) {
	reply := make(chan appendResult, 1)
	b.queue <- appendRequest{Flush: true, Reply: reply}
	res := <-reply

	return res.Flush, res.Err
}

func (b *blockAppender) Close() {
	b.once.Do(func() {
		close(b.queue)
		<-b.closed
	})
}
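// A minimal usage sketch, assuming a concrete storage.Store implementation
// named store, a configured Client, and optional blockLog/checkpointMgr as
// NewService expects:
//
//	svc := NewService(cfg, store, client, blockLog, checkpointMgr)
//	defer svc.Close()
//
//	if err := svc.Replay(ctx); err != nil { // drain the export backlog
//		log.Fatal(err)
//	}
//
//	if err := svc.Poll(ctx); err != nil { // then follow the feed
//		log.Print(err)
//	}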