| author | Fuwn <[email protected]> | 2026-02-26 14:46:02 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 14:48:52 -0800 |
| commit | 0099d621e97b6048971fadb5c71918cc9f2b5a09 | |
| tree | a38ba31585200bacd61f453ef7158de7f0aaf7a3 /internal/ingest/service.go | |
| parent | Initial commit | |
feat: initial Plutia release — verifiable high-performance PLC mirror (mirror + resolver modes)
Diffstat (limited to 'internal/ingest/service.go')
| -rw-r--r-- | internal/ingest/service.go | 741 |
1 file changed, 741 insertions, 0 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
new file mode 100644
index 0000000..8451246
--- /dev/null
+++ b/internal/ingest/service.go
@@ -0,0 +1,741 @@

```go
package ingest

import (
    "context"
    "errors"
    "fmt"
    "hash/fnv"
    "log"
    "os"
    "os/exec"
    "sort"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/Fuwn/plutia/internal/checkpoint"
    "github.com/Fuwn/plutia/internal/config"
    "github.com/Fuwn/plutia/internal/state"
    "github.com/Fuwn/plutia/internal/storage"
    "github.com/Fuwn/plutia/internal/types"
    "github.com/Fuwn/plutia/internal/verify"
)

// Stats holds ingestion counters; the Service updates them atomically.
type Stats struct {
    IngestedOps uint64 `json:"ingested_ops"`
    Errors      uint64 `json:"errors"`
    LastSeq     uint64 `json:"last_seq"`
}

// Service drives PLC export ingestion: it fetches export records,
// verifies operation chains, and commits the resulting state. In
// mirror mode it also maintains the append-only block log and cuts
// checkpoints.
type Service struct {
    cfg         config.Config
    store       storage.Store
    client      *Client
    verifier    *verify.Verifier
    engine      *state.Engine
    checkpoints *checkpoint.Manager
    blockLog    *storage.BlockLog
    appender    *blockAppender
    stats       Stats
    maxOps      uint64
    runOps      uint64
    corrupted   atomic.Bool
    corruptErr  atomic.Value
}

// NewService wires up a Service. In mirror mode it starts the block
// appender and validates the block log, marking the service corrupted
// if recovery fails.
func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
    s := &Service{
        cfg:         cfg,
        store:       store,
        client:      client,
        verifier:    verify.New(cfg.VerifyPolicy),
        engine:      state.New(store, cfg.Mode),
        checkpoints: checkpointMgr,
        blockLog:    blockLog,
    }
    if cfg.Mode == config.ModeMirror && blockLog != nil {
        s.appender = newBlockAppender(blockLog, 1024)
        if _, err := blockLog.ValidateAndRecover(store); err != nil {
            s.MarkCorrupted(err)
        }
    }
    return s
}

// SetMaxOps caps the number of operations ingested across all runs;
// zero means unlimited.
func (s *Service) SetMaxOps(max uint64) {
    s.maxOps = max
}

// Replay runs ingestion rounds until a round makes no progress.
func (s *Service) Replay(ctx context.Context) error {
    for {
        changed, err := s.RunOnce(ctx)
        if err != nil {
            return err
        }
        if !changed {
            return nil
        }
    }
}

// Poll runs RunOnce on the configured interval until the context is
// cancelled; per-round errors are counted rather than fatal.
func (s *Service) Poll(ctx context.Context) error {
    ticker := time.NewTicker(s.cfg.PollInterval)
    defer ticker.Stop()
    for {
        if _, err := s.RunOnce(ctx); err != nil {
            atomic.AddUint64(&s.stats.Errors, 1)
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
        }
    }
}

// RunOnce fetches and commits one batch starting after the stored
// global sequence. It reports whether any new records were processed.
func (s *Service) RunOnce(ctx context.Context) (bool, error) {
    if err := s.CorruptionError(); err != nil {
        return false, err
    }
    if s.maxOps > 0 && s.runOps >= s.maxOps {
        return false, nil
    }
    lastSeq, err := s.store.GetGlobalSeq()
    if err != nil {
        return false, fmt.Errorf("load global sequence: %w", err)
    }
    limit := uint64(0)
    if s.maxOps > 0 {
        remaining := s.maxOps - s.runOps
        if remaining == 0 {
            return false, nil
        }
        limit = remaining
    }
    records, err := s.client.FetchExportLimited(ctx, lastSeq, limit)
    if err != nil {
        return false, err
    }
    if len(records) == 0 {
        return false, nil
    }
    committed, err := s.processRecords(ctx, records)
    s.runOps += committed
    if err != nil {
        atomic.AddUint64(&s.stats.Errors, 1)
        return committed > 0, err
    }
    return true, nil
}

// verifyTask and verifyResult carry a record and its verification
// outcome through the worker pool, keyed by position in the input
// batch so results can be reassembled in order.
type verifyTask struct {
    index int
    rec   types.ExportRecord
}

type verifyResult struct {
    index int
    op    types.ParsedOperation
    state types.StateV1
    err   error
}

// processRecords verifies a batch in parallel, then commits the
// results in input order.
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
    verified, err := s.verifyRecords(ctx, records)
    if err != nil {
        return 0, err
    }
    return s.commitVerified(ctx, verified)
}
// verifyRecords fans records out to a fixed pool of workers sharded by
// DID: all operations for one DID land on the same worker, so they are
// verified sequentially against that worker's state cache while
// distinct DIDs proceed in parallel. The results channel is buffered
// for the whole batch so workers never block while reporting.
func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
    _ = ctx
    workers := s.cfg.VerifyWorkers
    if workers < 1 {
        workers = 1
    }
    queues := make([]chan verifyTask, workers)
    results := make(chan verifyResult, len(records))
    var wg sync.WaitGroup

    for i := 0; i < workers; i++ {
        queues[i] = make(chan verifyTask, 64)
        wg.Add(1)
        go func(queue <-chan verifyTask) {
            defer wg.Done()
            cache := map[string]*types.StateV1{}
            for task := range queue {
                op, err := types.ParseOperation(task.rec)
                if err != nil {
                    results <- verifyResult{index: task.index, err: err}
                    continue
                }
                existing, err := s.loadExistingState(cache, op.DID)
                if err != nil {
                    results <- verifyResult{index: task.index, err: err}
                    continue
                }
                if err := s.verifier.VerifyOperation(op, existing); err != nil {
                    results <- verifyResult{index: task.index, err: err}
                    continue
                }
                next, err := state.ComputeNextState(op, existing)
                if err != nil {
                    results <- verifyResult{index: task.index, err: err}
                    continue
                }
                cache[op.DID] = cloneState(&next)
                results <- verifyResult{index: task.index, op: op, state: next}
            }
        }(queues[i])
    }

    for idx, rec := range records {
        worker := didWorker(rec.DID, workers)
        queues[worker] <- verifyTask{index: idx, rec: rec}
    }
    for _, q := range queues {
        close(q)
    }
    wg.Wait()
    close(results)
    return collectVerifiedInOrder(len(records), results)
}

// loadExistingState returns a copy of the DID's current state,
// consulting the per-worker cache before the store. A nil state with a
// nil error means the DID has no prior operations.
func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string) (*types.StateV1, error) {
    if existing, ok := cache[did]; ok {
        return cloneState(existing), nil
    }
    stateVal, ok, err := s.store.GetState(did)
    if err != nil {
        return nil, err
    }
    if !ok {
        cache[did] = nil
        return nil, nil
    }
    cache[did] = cloneState(&stateVal)
    return cloneState(&stateVal), nil
}

// collectVerifiedInOrder drains the results channel and reassembles
// results by input index, reporting the first verification error and
// guarding against out-of-range, duplicate, or missing indices.
func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyResult, error) {
    ordered := make([]verifyResult, total)
    seen := make([]bool, total)
    var firstErr error
    received := 0
    for r := range results {
        received++
        if r.index < 0 || r.index >= total {
            if firstErr == nil {
                firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
            }
            continue
        }
        if seen[r.index] {
            if firstErr == nil {
                firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
            }
            continue
        }
        seen[r.index] = true
        ordered[r.index] = r
        if r.err != nil && firstErr == nil {
            firstErr = r.err
        }
    }
    if received != total && firstErr == nil {
        firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
    }
    if firstErr != nil {
        return nil, firstErr
    }
    for i := range seen {
        if !seen[i] {
            return nil, fmt.Errorf("missing verify result index %d", i)
        }
    }
    return ordered, nil
}

// didWorker picks a worker shard by FNV-hashing the DID, keeping all
// operations for one DID on the same worker.
func didWorker(did string, workers int) int {
    if workers <= 1 {
        return 0
    }
    hasher := fnv.New32a()
    _, _ = hasher.Write([]byte(did))
    return int(hasher.Sum32() % uint32(workers))
}

// cloneState deep-copies the mutable slice fields so cached state
// never aliases state handed back to callers.
func cloneState(in *types.StateV1) *types.StateV1 {
    if in == nil {
        return nil
    }
    out := *in
    out.DIDDocument = append([]byte(nil), in.DIDDocument...)
    out.RotationKeys = append([]string(nil), in.RotationKeys...)
    return &out
}
// commitVerified appends verified operations to the block log (mirror
// mode) and persists them to the store in batches of CommitBatchSize.
// Each batch commit first flushes any partial block so the stored
// block hashes cover every referenced operation, then cuts checkpoints
// at CheckpointInterval boundaries; the interval is assumed non-zero
// whenever checkpoints are enabled.
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
    _ = ctx
    batchSize := s.cfg.CommitBatchSize
    if batchSize < 1 {
        batchSize = 1
    }
    pendingOps := make([]storage.OperationMutation, 0, batchSize)
    pendingSeqs := make([]uint64, 0, batchSize)
    pendingBlockHashes := map[uint64]string{}
    var committed uint64

    commit := func() error {
        if len(pendingOps) == 0 {
            return nil
        }
        if s.cfg.Mode == config.ModeMirror {
            flush, err := s.appender.Flush()
            if err != nil {
                return err
            }
            if flush != nil {
                pendingBlockHashes[flush.BlockID] = flush.Hash
            }
        }
        blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))
        for id, hash := range pendingBlockHashes {
            blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
        }
        sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID })

        if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
            return err
        }
        lastSeq := pendingSeqs[len(pendingSeqs)-1]
        atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
        atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
        committed += uint64(len(pendingOps))

        if s.checkpoints != nil {
            for _, seq := range pendingSeqs {
                if seq%s.cfg.CheckpointInterval == 0 {
                    if err := s.createCheckpoint(seq); err != nil {
                        return err
                    }
                }
            }
        }

        pendingOps = pendingOps[:0]
        pendingSeqs = pendingSeqs[:0]
        clear(pendingBlockHashes)
        return nil
    }

    for _, v := range verified {
        var ref *types.BlockRefV1
        if s.cfg.Mode == config.ModeMirror {
            if s.appender == nil {
                return committed, fmt.Errorf("mirror mode requires block appender")
            }
            result, err := s.appender.Append(v.op)
            if err != nil {
                return committed, err
            }
            if result.Flush != nil {
                pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
            }
            result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
            ref = &result.Ref
        }
        pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
        pendingSeqs = append(pendingSeqs, v.op.Sequence)
        if len(pendingOps) >= batchSize {
            if err := commit(); err != nil {
                return committed, err
            }
        }
    }
    if err := commit(); err != nil {
        return committed, err
    }
    return committed, nil
}

// createCheckpoint builds and stores a checkpoint at the given
// sequence while sampling this process's CPU and memory usage, then
// logs the combined metrics.
func (s *Service) createCheckpoint(sequence uint64) error {
    blocks, err := s.store.ListBlockHashes()
    if err != nil {
        return fmt.Errorf("list blocks for checkpoint: %w", err)
    }
    done := make(chan struct{})
    usageCh := make(chan checkpointProcessUsage, 1)
    go sampleCheckpointProcessUsage(done, usageCh)

    cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)
    close(done)
    usage := <-usageCh
    if err != nil {
        return fmt.Errorf("build checkpoint: %w", err)
    }
    log.Printf(
        "checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
        cp.Sequence,
        metrics.DIDCount,
        metrics.MerkleCompute.Milliseconds(),
        metrics.Total.Milliseconds(),
        usage.CPUPercentAvg,
        float64(usage.RSSPeakKB)/1024.0,
        time.Now().UnixMilli(),
    )
    return nil
}
// Snapshot flushes any partial block, records its hash, then builds
// and stores a checkpoint at the current global sequence.
func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
    if s.appender != nil {
        flush, err := s.appender.Flush()
        if err != nil {
            return types.CheckpointV1{}, err
        }
        if flush != nil {
            if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
                return types.CheckpointV1{}, err
            }
        }
    }
    _ = ctx
    seq, err := s.store.GetGlobalSeq()
    if err != nil {
        return types.CheckpointV1{}, err
    }
    blocks, err := s.store.ListBlockHashes()
    if err != nil {
        return types.CheckpointV1{}, err
    }
    cp, err := s.checkpoints.BuildAndStoreFromStore(seq, blocks)
    if err != nil {
        return types.CheckpointV1{}, err
    }
    return cp, nil
}

// checkpointProcessUsage aggregates process usage sampled while a
// checkpoint is being built.
type checkpointProcessUsage struct {
    CPUPercentAvg float64
    RSSPeakKB     int64
}

// sampleCheckpointProcessUsage polls process CPU and RSS every 25ms
// until done closes, then reports the average CPU percentage and the
// peak RSS observed.
func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
    pid := os.Getpid()
    ticker := time.NewTicker(25 * time.Millisecond)
    defer ticker.Stop()
    var cpuSum float64
    var samples int
    var rssPeak int64
    sample := func() {
        cpu, rss, err := processMetrics(pid)
        if err != nil {
            return
        }
        cpuSum += cpu
        samples++
        if rss > rssPeak {
            rssPeak = rss
        }
    }
    sample()
    for {
        select {
        case <-done:
            sample()
            avg := 0.0
            if samples > 0 {
                avg = cpuSum / float64(samples)
            }
            out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}
            return
        case <-ticker.C:
            sample()
        }
    }
}

// processMetrics reads the process's CPU percentage and resident set
// size (in KB) by shelling out to ps; it requires a POSIX ps and
// quietly falls back to zero values on parse failures.
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
    out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
    if err != nil {
        return 0, 0, err
    }
    fields := strings.Fields(string(out))
    if len(fields) < 2 {
        return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
    }
    cpuPct, _ = strconv.ParseFloat(fields[0], 64)
    rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
    return cpuPct, rssKB, nil
}

// VerifyDID re-checks a DID. In resolver mode it only confirms stored
// state exists; in mirror mode it replays and re-verifies the DID's
// entire operation chain from the block log.
func (s *Service) VerifyDID(ctx context.Context, did string) error {
    _ = ctx
    if err := s.CorruptionError(); err != nil {
        return err
    }
    if s.cfg.Mode != config.ModeMirror {
        _, ok, err := s.store.GetState(did)
        if err != nil {
            return err
        }
        if !ok {
            return fmt.Errorf("did %s not found", did)
        }
        return nil
    }
    seqs, err := s.store.ListDIDSequences(did)
    if err != nil {
        return err
    }
    if len(seqs) == 0 {
        return fmt.Errorf("no operations for did %s", did)
    }

    var previous *types.StateV1
    for _, seq := range seqs {
        ref, ok, err := s.store.GetOpSeqRef(seq)
        if err != nil {
            return err
        }
        if !ok {
            return fmt.Errorf("missing op reference for seq %d", seq)
        }
        payload, err := s.blockLog.ReadRecord(ref)
        if err != nil {
            return err
        }
        rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}
        op, err := types.ParseOperation(rec)
        if err != nil {
            return err
        }
        if err := s.verifier.VerifyOperation(op, previous); err != nil {
            return fmt.Errorf("verify seq %d failed: %w", seq, err)
        }
        st := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
        previous = &st
    }
    return nil
}
// RecomputeTipAtOrBefore replays a DID's verified operations up to and
// including the given sequence and returns the resulting chain tip CID
// along with the sequences replayed. It requires mirror mode, since it
// needs the raw operations from the block log.
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
    _ = ctx
    if err := s.CorruptionError(); err != nil {
        return "", nil, err
    }
    if s.cfg.Mode != config.ModeMirror {
        return "", nil, errors.New("historical proof requires mirror mode")
    }
    seqs, err := s.store.ListDIDSequences(did)
    if err != nil {
        return "", nil, err
    }
    if len(seqs) == 0 {
        return "", nil, fmt.Errorf("no operations for did %s", did)
    }
    filtered := make([]uint64, 0, len(seqs))
    for _, seq := range seqs {
        if seq <= sequence {
            filtered = append(filtered, seq)
        }
    }
    if len(filtered) == 0 {
        return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
    }
    sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] })

    var previous *types.StateV1
    var tip string
    for _, seq := range filtered {
        ref, ok, err := s.store.GetOpSeqRef(seq)
        if err != nil {
            return "", nil, err
        }
        if !ok {
            return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
        }
        payload, err := s.blockLog.ReadRecord(ref)
        if err != nil {
            return "", nil, err
        }
        op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
        if err != nil {
            return "", nil, err
        }
        if err := s.verifier.VerifyOperation(op, previous); err != nil {
            return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
        }
        tip = op.CID
        next := types.StateV1{
            DID:          did,
            ChainTipHash: op.CID,
            LatestOpSeq:  seq,
        }
        prior := []string(nil)
        if previous != nil {
            prior = append(prior, previous.RotationKeys...)
        }
        next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
        previous = &next
    }
    return tip, filtered, nil
}

// Flush forces any partial block to disk and records its hash in the
// store. It is a no-op in resolver mode.
func (s *Service) Flush(ctx context.Context) error {
    _ = ctx
    if err := s.CorruptionError(); err != nil {
        return err
    }
    if s.appender == nil {
        return nil
    }
    flush, err := s.appender.Flush()
    if err != nil {
        return err
    }
    if flush != nil {
        if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
            return err
        }
    }
    return nil
}

// MarkCorrupted latches the service into a corrupted state; subsequent
// ingestion and verification calls fail fast until restart.
func (s *Service) MarkCorrupted(err error) {
    if err == nil {
        return
    }
    s.corrupted.Store(true)
    s.corruptErr.Store(err.Error())
}

func (s *Service) IsCorrupted() bool {
    return s.corrupted.Load()
}

// CorruptionError returns a non-nil error while the service is marked
// corrupted, wrapping the original failure message.
func (s *Service) CorruptionError() error {
    if !s.IsCorrupted() {
        return nil
    }
    msg, _ := s.corruptErr.Load().(string)
    if msg == "" {
        msg = "data corruption detected"
    }
    return fmt.Errorf("data corruption detected: %s", msg)
}

// extractRotationKeysFromPayload pulls rotation-capable keys out of an
// operation payload, de-duplicating while preserving order. Payloads
// carrying only recoveryKey/signingKey are handled, and if the payload
// names no keys at all, the prior key set is carried forward.
func extractRotationKeysFromPayload(payload map[string]any, prior []string) []string {
    out := make([]string, 0, len(prior)+4)
    seen := map[string]struct{}{}
    add := func(v string) {
        v = strings.TrimSpace(v)
        if v == "" {
            return
        }
        if _, ok := seen[v]; ok {
            return
        }
        seen[v] = struct{}{}
        out = append(out, v)
    }
    if arr, ok := payload["rotationKeys"].([]any); ok {
        for _, v := range arr {
            if s, ok := v.(string); ok {
                add(s)
            }
        }
    }
    if recovery, ok := payload["recoveryKey"].(string); ok {
        add(recovery)
    }
    if signing, ok := payload["signingKey"].(string); ok {
        add(signing)
    }
    if len(out) == 0 {
        for _, v := range prior {
            add(v)
        }
    }
    return out
}

// Close shuts down the block appender, if one is running.
func (s *Service) Close() {
    if s.appender != nil {
        s.appender.Close()
    }
}

// Stats returns a snapshot of the counters via atomic loads.
func (s *Service) Stats() Stats {
    return Stats{
        IngestedOps: atomic.LoadUint64(&s.stats.IngestedOps),
        Errors:      atomic.LoadUint64(&s.stats.Errors),
        LastSeq:     atomic.LoadUint64(&s.stats.LastSeq),
    }
}

type appendResult struct {
    Ref   types.BlockRefV1
    Flush *storage.FlushInfo
    Err   error
}

// appendRequest is either one operation to append or a flush request;
// the Reply channel carries the appendResult back to the caller.
type appendRequest struct {
    Operation types.ParsedOperation
    Reply     chan appendResult
    Flush     bool
}

// blockAppender serializes all block-log writes through a single
// goroutine, so the log needs no locking and appends are totally
// ordered.
type blockAppender struct {
    log    *storage.BlockLog
    queue  chan appendRequest
    once   sync.Once
    closed chan struct{}
}

// newBlockAppender starts the appender goroutine with the given queue
// depth.
func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender {
    ba := &blockAppender{
        log:    log,
        queue:  make(chan appendRequest, buffer),
        closed: make(chan struct{}),
    }
    go ba.run()
    return ba
}
// run owns the block log: it services append and flush requests one at
// a time until the queue closes.
func (b *blockAppender) run() {
    defer close(b.closed)
    for req := range b.queue {
        if req.Flush {
            flush, err := b.log.Flush()
            req.Reply <- appendResult{Flush: flush, Err: err}
            continue
        }
        ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes)
        req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err}
    }
}

// Append blocks until the log goroutine has written the operation,
// returning its block reference and any block flush that occurred.
// Calling Append or Flush after Close panics on the closed queue.
func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) {
    reply := make(chan appendResult, 1)
    b.queue <- appendRequest{Operation: op, Reply: reply}
    res := <-reply
    return res, res.Err
}

// Flush forces the current partial block to disk.
func (b *blockAppender) Flush() (*storage.FlushInfo, error) {
    reply := make(chan appendResult, 1)
    b.queue <- appendRequest{Flush: true, Reply: reply}
    res := <-reply
    return res.Flush, res.Err
}

// Close shuts the appender down exactly once, waiting for the run
// goroutine to drain its queue before returning.
func (b *blockAppender) Close() {
    b.once.Do(func() {
        close(b.queue)
        <-b.closed
    })
}
```
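
Two concurrency patterns in this file are worth a standalone illustration. The first is the DID-sharded verification pool in verifyRecords: records fan out to a fixed set of workers by hashing the DID, which parallelizes verification across DIDs while preserving per-DID operation order, and results are reassembled by input index as in collectVerifiedInOrder. The sketch below is a minimal, self-contained version of that scheme; the task and result types and the "verified" output are illustrative stand-ins, not code from this repository.

```go
package main

import (
    "fmt"
    "hash/fnv"
    "sync"
)

type task struct {
    index int
    did   string
}

type result struct {
    index int
    out   string
}

// shard maps a DID onto a fixed worker, as didWorker does above: every
// operation for one DID lands on the same queue, so per-DID order is
// preserved while distinct DIDs verify in parallel.
func shard(did string, workers int) int {
    if workers <= 1 {
        return 0
    }
    h := fnv.New32a()
    _, _ = h.Write([]byte(did))
    return int(h.Sum32() % uint32(workers))
}

func main() {
    tasks := []task{{0, "did:plc:aaa"}, {1, "did:plc:bbb"}, {2, "did:plc:aaa"}, {3, "did:plc:ccc"}}
    workers := 2
    queues := make([]chan task, workers)
    results := make(chan result, len(tasks)) // buffered for the batch so workers never block
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        queues[i] = make(chan task, 4)
        wg.Add(1)
        go func(q <-chan task) {
            defer wg.Done()
            for t := range q {
                // Real verification would check signatures against prior state here.
                results <- result{index: t.index, out: "verified " + t.did}
            }
        }(queues[i])
    }
    for _, t := range tasks {
        queues[shard(t.did, workers)] <- t
    }
    for _, q := range queues {
        close(q)
    }
    wg.Wait()
    close(results)
    // Reassemble in input order by index, as collectVerifiedInOrder does.
    ordered := make([]string, len(tasks))
    for r := range results {
        ordered[r.index] = r.out
    }
    for i, s := range ordered {
        fmt.Println(i, s)
    }
}
```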
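The second is blockAppender, an instance of the single-owner (actor) pattern: one goroutine owns the block log, callers enqueue requests and wait on per-call reply channels, and writes end up totally ordered without a mutex. A minimal sketch of the same pattern, assuming a plain string payload in place of real operations:

```go
package main

import "fmt"

type appendReq struct {
    payload string
    reply   chan error
}

// appender owns its log exclusively: all writes funnel through one
// goroutine, so no locking is needed and appends are totally ordered.
type appender struct {
    queue  chan appendReq
    closed chan struct{}
}

func newAppender() *appender {
    a := &appender{queue: make(chan appendReq, 16), closed: make(chan struct{})}
    go a.run()
    return a
}

func (a *appender) run() {
    defer close(a.closed)
    for req := range a.queue {
        // Real code would write req.payload to the block log here.
        fmt.Println("appended:", req.payload)
        req.reply <- nil
    }
}

// Append blocks until the owning goroutine has handled the request.
func (a *appender) Append(payload string) error {
    reply := make(chan error, 1)
    a.queue <- appendReq{payload: payload, reply: reply}
    return <-reply
}

// Close stops accepting requests and waits for the queue to drain.
func (a *appender) Close() {
    close(a.queue)
    <-a.closed
}

func main() {
    a := newAppender()
    _ = a.Append("op-1")
    _ = a.Append("op-2")
    a.Close()
}
```

As in blockAppender.Close above, closing the queue and then waiting on the done channel guarantees every queued request receives a reply before Close returns.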
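Finally, processMetrics samples the process by shelling out to ps rather than reading /proc, trading precision for portability between Linux and macOS (it fails where no POSIX ps exists). The self-contained sketch below exercises the same ps invocation; the sampleSelf wrapper name is hypothetical.

```go
package main

import (
    "fmt"
    "os"
    "os/exec"
    "strconv"
    "strings"
)

// sampleSelf reads this process's CPU percentage and resident set size
// via ps, the same call processMetrics makes above. Parse failures
// fall back to zero values, matching the original's behavior.
func sampleSelf() (cpuPct float64, rssKB int64, err error) {
    out, err := exec.Command("ps", "-p", strconv.Itoa(os.Getpid()), "-o", "pcpu=,rss=").Output()
    if err != nil {
        return 0, 0, err
    }
    fields := strings.Fields(string(out))
    if len(fields) < 2 {
        return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
    }
    cpuPct, _ = strconv.ParseFloat(fields[0], 64)
    rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
    return cpuPct, rssKB, nil
}

func main() {
    cpu, rss, err := sampleSelf()
    if err != nil {
        fmt.Println("sample failed:", err)
        return
    }
    fmt.Printf("cpu=%.2f%% rss=%.2f MB\n", cpu, float64(rss)/1024.0)
}
```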