path: root/internal/ingest/service.go
author    Fuwn <[email protected]>    2026-02-26 14:46:02 -0800
committer Fuwn <[email protected]>    2026-02-26 14:48:52 -0800
commit    0099d621e97b6048971fadb5c71918cc9f2b5a09 (patch)
tree      a38ba31585200bacd61f453ef7158de7f0aaf7a3 /internal/ingest/service.go
parent    Initial commit (diff)
download  plutia-test-0099d621e97b6048971fadb5c71918cc9f2b5a09.tar.xz
          plutia-test-0099d621e97b6048971fadb5c71918cc9f2b5a09.zip
feat: initial Plutia release — verifiable high-performance PLC mirror (mirror + resolver modes)
Diffstat (limited to 'internal/ingest/service.go')
-rw-r--r--    internal/ingest/service.go    741
1 file changed, 741 insertions, 0 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
new file mode 100644
index 0000000..8451246
--- /dev/null
+++ b/internal/ingest/service.go
@@ -0,0 +1,741 @@
+package ingest
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "hash/fnv"
+ "log"
+ "os"
+ "os/exec"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/state"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "github.com/Fuwn/plutia/internal/verify"
+)
+
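+// Stats holds cumulative ingestion counters. Fields are updated with
+// sync/atomic and should be read through Service.Stats.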
+type Stats struct {
+ IngestedOps uint64 `json:"ingested_ops"`
+ Errors uint64 `json:"errors"`
+ LastSeq uint64 `json:"last_seq"`
+}
+
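+// Service drives PLC export ingestion: it fetches export records, verifies
+// them in parallel, applies the resulting state mutations to the store, and
+// (in mirror mode) appends raw operations to the block log.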
+type Service struct {
+ cfg config.Config
+ store storage.Store
+ client *Client
+ verifier *verify.Verifier
+ engine *state.Engine
+ checkpoints *checkpoint.Manager
+ blockLog *storage.BlockLog
+ appender *blockAppender
+ stats Stats
+ maxOps uint64
+ runOps uint64
+ corrupted atomic.Bool
+ corruptErr atomic.Value
+}
+
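+// NewService wires together the store, export client, verifier, and state
+// engine. In mirror mode it also starts the block appender and validates the
+// block log, marking the service corrupted if recovery fails.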
+func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
+ s := &Service{
+ cfg: cfg,
+ store: store,
+ client: client,
+ verifier: verify.New(cfg.VerifyPolicy),
+ engine: state.New(store, cfg.Mode),
+ checkpoints: checkpointMgr,
+ blockLog: blockLog,
+ }
+ if cfg.Mode == config.ModeMirror && blockLog != nil {
+ s.appender = newBlockAppender(blockLog, 1024)
+ if _, err := blockLog.ValidateAndRecover(store); err != nil {
+ s.MarkCorrupted(err)
+ }
+ }
+ return s
+}
+
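+// SetMaxOps caps the total number of operations ingested across all runs;
+// zero means unlimited.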
+func (s *Service) SetMaxOps(max uint64) {
+ s.maxOps = max
+}
+
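+// Replay runs ingestion passes back to back until a pass makes no progress
+// or returns an error.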
+func (s *Service) Replay(ctx context.Context) error {
+ for {
+ changed, err := s.RunOnce(ctx)
+ if err != nil {
+ return err
+ }
+ if !changed {
+ return nil
+ }
+ }
+}
+
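+// Poll runs an ingestion pass every PollInterval until the context is
+// cancelled. Individual pass failures are counted in Stats but do not stop
+// polling.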
+func (s *Service) Poll(ctx context.Context) error {
+ ticker := time.NewTicker(s.cfg.PollInterval)
+ defer ticker.Stop()
+ for {
+ if _, err := s.RunOnce(ctx); err != nil {
+ atomic.AddUint64(&s.stats.Errors, 1)
+ }
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ }
+ }
+}
+
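+// RunOnce performs a single ingestion pass: it fetches export records after
+// the last committed sequence, verifies and commits them, and reports
+// whether any progress was made. It refuses to run once corruption has been
+// detected or the operation cap has been reached.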
+func (s *Service) RunOnce(ctx context.Context) (bool, error) {
+ if err := s.CorruptionError(); err != nil {
+ return false, err
+ }
+ if s.maxOps > 0 && s.runOps >= s.maxOps {
+ return false, nil
+ }
+ lastSeq, err := s.store.GetGlobalSeq()
+ if err != nil {
+ return false, fmt.Errorf("load global sequence: %w", err)
+ }
+ limit := uint64(0)
+ if s.maxOps > 0 {
+ // The guard above already returned when runOps >= maxOps, so the
+ // remainder here is always positive.
+ limit = s.maxOps - s.runOps
+ }
+ records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
+ if err != nil {
+ return false, err
+ }
+ if len(records) == 0 {
+ return false, nil
+ }
+ committed, err := s.processRecords(ctx, records)
+ s.runOps += committed
+ if err != nil {
+ atomic.AddUint64(&s.stats.Errors, 1)
+ return committed > 0, err
+ }
+ return true, nil
+}
+
+type verifyTask struct {
+ index int
+ rec types.ExportRecord
+}
+
+type verifyResult struct {
+ index int
+ op types.ParsedOperation
+ state types.StateV1
+ err error
+}
+
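+// processRecords verifies a batch of export records and commits the results,
+// returning the number of operations durably committed.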
+func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
+ verified, err := s.verifyRecords(ctx, records)
+ if err != nil {
+ return 0, err
+ }
+ return s.commitVerified(ctx, verified)
+}
+
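+// verifyRecords fans records out to per-DID worker goroutines so that
+// operations for the same DID are verified in order against a worker-local
+// state cache, then reassembles the results in input order.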
+func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
+ _ = ctx
+ workers := s.cfg.VerifyWorkers
+ if workers < 1 {
+ workers = 1
+ }
+ queues := make([]chan verifyTask, workers)
+ results := make(chan verifyResult, len(records))
+ var wg sync.WaitGroup
+
+ for i := 0; i < workers; i++ {
+ queues[i] = make(chan verifyTask, 64)
+ wg.Add(1)
+ go func(queue <-chan verifyTask) {
+ defer wg.Done()
+ cache := map[string]*types.StateV1{}
+ for task := range queue {
+ op, err := types.ParseOperation(task.rec)
+ if err != nil {
+ results <- verifyResult{index: task.index, err: err}
+ continue
+ }
+ existing, err := s.loadExistingState(cache, op.DID)
+ if err != nil {
+ results <- verifyResult{index: task.index, err: err}
+ continue
+ }
+ if err := s.verifier.VerifyOperation(op, existing); err != nil {
+ results <- verifyResult{index: task.index, err: err}
+ continue
+ }
+ next, err := state.ComputeNextState(op, existing)
+ if err != nil {
+ results <- verifyResult{index: task.index, err: err}
+ continue
+ }
+ cache[op.DID] = cloneState(&next)
+ results <- verifyResult{index: task.index, op: op, state: next}
+ }
+ }(queues[i])
+ }
+
+ for idx, rec := range records {
+ worker := didWorker(rec.DID, workers)
+ queues[worker] <- verifyTask{index: idx, rec: rec}
+ }
+ for _, q := range queues {
+ close(q)
+ }
+ wg.Wait()
+ close(results)
+ return collectVerifiedInOrder(len(records), results)
+}
+
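+// loadExistingState returns the current state for a DID, consulting the
+// worker-local cache before the store. Callers always receive a clone so
+// cached entries are never mutated.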
+func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string) (*types.StateV1, error) {
+ if existing, ok := cache[did]; ok {
+ return cloneState(existing), nil
+ }
+ stateVal, ok, err := s.store.GetState(did)
+ if err != nil {
+ return nil, err
+ }
+ if !ok {
+ cache[did] = nil
+ return nil, nil
+ }
+ cache[did] = cloneState(&stateVal)
+ return cloneState(&stateVal), nil
+}
+
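+// collectVerifiedInOrder drains the results channel into input order and
+// reports the first verification error, guarding against missing, duplicate,
+// and out-of-range indices.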
+func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyResult, error) {
+ ordered := make([]verifyResult, total)
+ seen := make([]bool, total)
+ var firstErr error
+ received := 0
+ for r := range results {
+ received++
+ if r.index < 0 || r.index >= total {
+ if firstErr == nil {
+ firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
+ }
+ continue
+ }
+ if seen[r.index] {
+ if firstErr == nil {
+ firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
+ }
+ continue
+ }
+ seen[r.index] = true
+ ordered[r.index] = r
+ if r.err != nil && firstErr == nil {
+ firstErr = r.err
+ }
+ }
+ if received != total && firstErr == nil {
+ firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
+ }
+ if firstErr != nil {
+ return nil, firstErr
+ }
+ for i := range seen {
+ if !seen[i] {
+ return nil, fmt.Errorf("missing verify result index %d", i)
+ }
+ }
+ return ordered, nil
+}
+
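+// didWorker maps a DID to a worker index with an FNV-1a hash so that all
+// operations for one DID land on the same goroutine.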
+func didWorker(did string, workers int) int {
+ if workers <= 1 {
+ return 0
+ }
+ hasher := fnv.New32a()
+ _, _ = hasher.Write([]byte(did))
+ return int(hasher.Sum32() % uint32(workers))
+}
+
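+// cloneState deep-copies the slices inside a state so callers can mutate the
+// copy freely.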
+func cloneState(in *types.StateV1) *types.StateV1 {
+ if in == nil {
+ return nil
+ }
+ out := *in
+ out.DIDDocument = append([]byte(nil), in.DIDDocument...)
+ out.RotationKeys = append([]string(nil), in.RotationKeys...)
+ return &out
+}
+
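+// commitVerified appends verified operations to the block log (mirror mode),
+// applies store mutations in batches, and cuts checkpoints at the configured
+// interval. It returns the number of operations committed before any error.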
+func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
+ _ = ctx
+ batchSize := s.cfg.CommitBatchSize
+ if batchSize < 1 {
+ batchSize = 1
+ }
+ pendingOps := make([]storage.OperationMutation, 0, batchSize)
+ pendingSeqs := make([]uint64, 0, batchSize)
+ pendingBlockHashes := map[uint64]string{}
+ var committed uint64
+
+ commit := func() error {
+ if len(pendingOps) == 0 {
+ return nil
+ }
+ if s.cfg.Mode == config.ModeMirror {
+ flush, err := s.appender.Flush()
+ if err != nil {
+ return err
+ }
+ if flush != nil {
+ pendingBlockHashes[flush.BlockID] = flush.Hash
+ }
+ }
+ blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))
+ for id, hash := range pendingBlockHashes {
+ blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
+ }
+ sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID })
+
+ if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
+ return err
+ }
+ lastSeq := pendingSeqs[len(pendingSeqs)-1]
+ atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
+ atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
+ committed += uint64(len(pendingOps))
+
+ if s.checkpoints != nil {
+ for _, seq := range pendingSeqs {
+ if s.cfg.CheckpointInterval > 0 && seq%s.cfg.CheckpointInterval == 0 {
+ if err := s.createCheckpoint(seq); err != nil {
+ return err
+ }
+ }
+ }
+ }
+
+ pendingOps = pendingOps[:0]
+ pendingSeqs = pendingSeqs[:0]
+ clear(pendingBlockHashes)
+ return nil
+ }
+
+ for _, v := range verified {
+ var ref *types.BlockRefV1
+ if s.cfg.Mode == config.ModeMirror {
+ if s.appender == nil {
+ return committed, fmt.Errorf("mirror mode requires block appender")
+ }
+ result, err := s.appender.Append(v.op)
+ if err != nil {
+ return committed, err
+ }
+ if result.Flush != nil {
+ pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
+ }
+ result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
+ ref = &result.Ref
+ }
+ pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
+ pendingSeqs = append(pendingSeqs, v.op.Sequence)
+ if len(pendingOps) >= batchSize {
+ if err := commit(); err != nil {
+ return committed, err
+ }
+ }
+ }
+ if err := commit(); err != nil {
+ return committed, err
+ }
+ return committed, nil
+}
+
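+// createCheckpoint builds and stores a checkpoint at the given sequence while
+// sampling process CPU and memory usage, then logs the collected metrics.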
+func (s *Service) createCheckpoint(sequence uint64) error {
+ blocks, err := s.store.ListBlockHashes()
+ if err != nil {
+ return fmt.Errorf("list blocks for checkpoint: %w", err)
+ }
+ done := make(chan struct{})
+ usageCh := make(chan checkpointProcessUsage, 1)
+ go sampleCheckpointProcessUsage(done, usageCh)
+
+ cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)
+ close(done)
+ usage := <-usageCh
+ if err != nil {
+ return fmt.Errorf("build checkpoint: %w", err)
+ }
+ log.Printf(
+ "checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
+ cp.Sequence,
+ metrics.DIDCount,
+ metrics.MerkleCompute.Milliseconds(),
+ metrics.Total.Milliseconds(),
+ usage.CPUPercentAvg,
+ float64(usage.RSSPeakKB)/1024.0,
+ time.Now().UnixMilli(),
+ )
+ return nil
+}
+
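+// Snapshot flushes any buffered block, persists its hash, and builds a
+// checkpoint at the current global sequence.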
+func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
+ if s.appender != nil {
+ flush, err := s.appender.Flush()
+ if err != nil {
+ return types.CheckpointV1{}, err
+ }
+ if flush != nil {
+ if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
+ return types.CheckpointV1{}, err
+ }
+ }
+ }
+ _ = ctx
+ seq, err := s.store.GetGlobalSeq()
+ if err != nil {
+ return types.CheckpointV1{}, err
+ }
+ blocks, err := s.store.ListBlockHashes()
+ if err != nil {
+ return types.CheckpointV1{}, err
+ }
+ cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
+ if err != nil {
+ return types.CheckpointV1{}, err
+ }
+ return cp, nil
+}
+
+type checkpointProcessUsage struct {
+ CPUPercentAvg float64
+ RSSPeakKB int64
+}
+
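+// sampleCheckpointProcessUsage polls process CPU and RSS every 25ms until
+// done is closed, then sends the average CPU percentage and peak RSS.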
+func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
+ pid := os.Getpid()
+ ticker := time.NewTicker(25 * time.Millisecond)
+ defer ticker.Stop()
+ var cpuSum float64
+ var samples int
+ var rssPeak int64
+ sample := func() {
+ cpu, rss, err := processMetrics(pid)
+ if err != nil {
+ return
+ }
+ cpuSum += cpu
+ samples++
+ if rss > rssPeak {
+ rssPeak = rss
+ }
+ }
+ sample()
+ for {
+ select {
+ case <-done:
+ sample()
+ avg := 0.0
+ if samples > 0 {
+ avg = cpuSum / float64(samples)
+ }
+ out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}
+ return
+ case <-ticker.C:
+ sample()
+ }
+ }
+}
+
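+// processMetrics shells out to ps(1) to read the process's CPU percentage
+// and resident set size in kilobytes.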
+func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
+ out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
+ if err != nil {
+ return 0, 0, err
+ }
+ fields := strings.Fields(string(out))
+ if len(fields) < 2 {
+ return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
+ }
+ cpuPct, err = strconv.ParseFloat(fields[0], 64)
+ if err == nil {
+ rssKB, err = strconv.ParseInt(fields[1], 10, 64)
+ }
+ return cpuPct, rssKB, err
+}
+
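+// VerifyDID re-verifies a DID's full operation chain. In mirror mode it
+// replays every stored operation from the block log; otherwise it only
+// checks that a state exists for the DID.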
+func (s *Service) VerifyDID(ctx context.Context, did string) error {
+ _ = ctx
+ if err := s.CorruptionError(); err != nil {
+ return err
+ }
+ if s.cfg.Mode != config.ModeMirror {
+ _, ok, err := s.store.GetState(did)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return fmt.Errorf("did not found")
+ }
+ return nil
+ }
+ seqs, err := s.store.ListDIDSequences(did)
+ if err != nil {
+ return err
+ }
+ if len(seqs) == 0 {
+ return fmt.Errorf("no operations for did %s", did)
+ }
+
+ var previous *types.StateV1
+ for _, seq := range seqs {
+ ref, ok, err := s.store.GetOpSeqRef(seq)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return fmt.Errorf("missing op reference for seq %d", seq)
+ }
+ payload, err := s.blockLog.ReadRecord(ref)
+ if err != nil {
+ return err
+ }
+ rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}
+ op, err := types.ParseOperation(rec)
+ if err != nil {
+ return err
+ }
+ if err := s.verifier.VerifyOperation(op, previous); err != nil {
+ return fmt.Errorf("verify seq %d failed: %w", seq, err)
+ }
+ next := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
+ previous = &next
+ }
+ return nil
+}
+
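+// RecomputeTipAtOrBefore replays a DID's operations up to the given sequence
+// and returns the resulting chain tip CID along with the sequences used. It
+// requires mirror mode, since it reads raw operations from the block log.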
+func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
+ _ = ctx
+ if err := s.CorruptionError(); err != nil {
+ return "", nil, err
+ }
+ if s.cfg.Mode != config.ModeMirror {
+ return "", nil, errors.New("historical proof requires mirror mode")
+ }
+ seqs, err := s.store.ListDIDSequences(did)
+ if err != nil {
+ return "", nil, err
+ }
+ if len(seqs) == 0 {
+ return "", nil, fmt.Errorf("no operations for did %s", did)
+ }
+ filtered := make([]uint64, 0, len(seqs))
+ for _, seq := range seqs {
+ if seq <= sequence {
+ filtered = append(filtered, seq)
+ }
+ }
+ if len(filtered) == 0 {
+ return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
+ }
+ sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] })
+
+ var previous *types.StateV1
+ var tip string
+ for _, seq := range filtered {
+ ref, ok, err := s.store.GetOpSeqRef(seq)
+ if err != nil {
+ return "", nil, err
+ }
+ if !ok {
+ return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
+ }
+ payload, err := s.blockLog.ReadRecord(ref)
+ if err != nil {
+ return "", nil, err
+ }
+ op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
+ if err != nil {
+ return "", nil, err
+ }
+ if err := s.verifier.VerifyOperation(op, previous); err != nil {
+ return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
+ }
+ tip = op.CID
+ next := types.StateV1{
+ DID: did,
+ ChainTipHash: op.CID,
+ LatestOpSeq: seq,
+ }
+ prior := []string(nil)
+ if previous != nil {
+ prior = append(prior, previous.RotationKeys...)
+ }
+ next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
+ previous = &next
+ }
+ return tip, filtered, nil
+}
+
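+// Flush forces the block appender to seal its current block and persists the
+// resulting block hash.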
+func (s *Service) Flush(ctx context.Context) error {
+ _ = ctx
+ if err := s.CorruptionError(); err != nil {
+ return err
+ }
+ if s.appender == nil {
+ return nil
+ }
+ flush, err := s.appender.Flush()
+ if err != nil {
+ return err
+ }
+ if flush != nil {
+ if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
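+// MarkCorrupted latches the service into a corrupted state; subsequent
+// ingestion and verification calls fail with CorruptionError.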
+func (s *Service) MarkCorrupted(err error) {
+ if err == nil {
+ return
+ }
+ s.corrupted.Store(true)
+ s.corruptErr.Store(err.Error())
+}
+
+func (s *Service) IsCorrupted() bool {
+ return s.corrupted.Load()
+}
+
+func (s *Service) CorruptionError() error {
+ if !s.IsCorrupted() {
+ return nil
+ }
+ msg, _ := s.corruptErr.Load().(string)
+ if msg == "" {
+ msg = "data corruption detected"
+ }
+ return fmt.Errorf("data corruption detected: %s", msg)
+}
+
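+// extractRotationKeysFromPayload gathers the deduplicated rotation keys from
+// an operation payload (rotationKeys plus legacy recoveryKey/signingKey),
+// falling back to the prior keys when the payload names none.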
+func extractRotationKeysFromPayload(payload map[string]any, prior []string) []string {
+ out := make([]string, 0, len(prior)+4)
+ seen := map[string]struct{}{}
+ add := func(v string) {
+ v = strings.TrimSpace(v)
+ if v == "" {
+ return
+ }
+ if _, ok := seen[v]; ok {
+ return
+ }
+ seen[v] = struct{}{}
+ out = append(out, v)
+ }
+ if arr, ok := payload["rotationKeys"].([]any); ok {
+ for _, v := range arr {
+ if s, ok := v.(string); ok {
+ add(s)
+ }
+ }
+ }
+ if recovery, ok := payload["recoveryKey"].(string); ok {
+ add(recovery)
+ }
+ if signing, ok := payload["signingKey"].(string); ok {
+ add(signing)
+ }
+ if len(out) == 0 {
+ for _, v := range prior {
+ add(v)
+ }
+ }
+ return out
+}
+
+func (s *Service) Close() {
+ if s.appender != nil {
+ s.appender.Close()
+ }
+}
+
+func (s *Service) Stats() Stats {
+ return Stats{
+ IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
+ Errors: atomic.LoadUint64(&s.stats.Errors),
+ LastSeq: atomic.LoadUint64(&s.stats.LastSeq),
+ }
+}
+
+type appendResult struct {
+ Ref types.BlockRefV1
+ Flush *storage.FlushInfo
+ Err error
+}
+
+type appendRequest struct {
+ Operation types.ParsedOperation
+ Reply chan appendResult
+ Flush bool
+}
+
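+// blockAppender serializes all block log appends and flushes through a
+// single goroutine so callers never race on the underlying log.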
+type blockAppender struct {
+ log *storage.BlockLog
+ queue chan appendRequest
+ once sync.Once
+ closed chan struct{}
+}
+
+func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender {
+ ba := &blockAppender{
+ log: log,
+ queue: make(chan appendRequest, buffer),
+ closed: make(chan struct{}),
+ }
+ go ba.run()
+ return ba
+}
+
+func (b *blockAppender) run() {
+ defer close(b.closed)
+ for req := range b.queue {
+ if req.Flush {
+ flush, err := b.log.Flush()
+ req.Reply <- appendResult{Flush: flush, Err: err}
+ continue
+ }
+ ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes)
+ req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err}
+ }
+}
+
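+// Append enqueues an operation for the appender goroutine and waits for the
+// resulting block reference. It must not be called after Close.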
+func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) {
+ reply := make(chan appendResult, 1)
+ b.queue <- appendRequest{Operation: op, Reply: reply}
+ res := <-reply
+ return res, res.Err
+}
+
+func (b *blockAppender) Flush() (*storage.FlushInfo, error) {
+ reply := make(chan appendResult, 1)
+ b.queue <- appendRequest{Flush: true, Reply: reply}
+ res := <-reply
+ return res.Flush, res.Err
+}
+
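+// Close shuts down the appender goroutine exactly once and waits for it to
+// exit.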
+func (b *blockAppender) Close() {
+ b.once.Do(func() {
+ close(b.queue)
+ <-b.closed
+ })
+}