// Package ingest drives the PLC export ingestion pipeline: it fetches
// operation batches from an upstream export endpoint, verifies each
// operation against per-DID chain state, commits results to storage,
// and serves resolution/history queries in mirror and thin modes.
package ingest

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"hash/fnv"
	"log"
	"os"
	"os/exec"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Fuwn/plutia/internal/checkpoint"
	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/state"
	"github.com/Fuwn/plutia/internal/storage"
	"github.com/Fuwn/plutia/internal/types"
	"github.com/Fuwn/plutia/internal/verify"
)

// Stats is a snapshot of ingestion counters. All fields are mutated via
// sync/atomic, so readers must use atomic loads for individual fields.
type Stats struct {
	IngestedOps     uint64  `json:"ingested_ops"`
	Errors          uint64  `json:"errors"`
	LastSeq         uint64  `json:"last_seq"`
	VerifyFailures  uint64  `json:"verify_failures"`
	LagOps          uint64  `json:"lag_ops"`
	DIDCount        uint64  `json:"did_count"`
	CheckpointSeq   uint64  `json:"checkpoint_sequence"`
	IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
	ThinCacheHits   uint64  `json:"thin_cache_hits"`
	ThinCacheMisses uint64  `json:"thin_cache_misses"`
	ThinCacheCount  uint64  `json:"thin_cache_entries"`
	ThinEvictions   uint64  `json:"thin_cache_evictions"`
}

// Service coordinates fetching, verification, commit, checkpointing, and
// query serving. A Service is constructed with NewService; the zero value
// is not usable.
type Service struct {
	cfg         config.Config
	store       storage.Store
	client      *Client
	verifier    *verify.Verifier
	checkpoints *checkpoint.Manager
	blockLog    *storage.BlockLog
	appender    *blockAppender // non-nil only in mirror mode with a block log
	stats       Stats
	maxOps      uint64 // optional cap on total ops ingested by this process (0 = unlimited)
	runOps      uint64 // ops committed so far toward maxOps
	corrupted   atomic.Bool
	corruptErr  atomic.Value
	startedAt   time.Time
	metricsSink MetricsSink
	tracer      *replayTracer // optional replay trace writer, enabled by cfg.ReplayTrace
	startupErr  error         // deferred constructor failure, surfaced on first RunOnce
}

// MetricsSink receives checkpoint observations for external metrics export.
type MetricsSink interface {
	ObserveCheckpoint(duration time.Duration, sequence uint64)
}

var (
	// ErrDIDNotFound is returned when a DID has no stored or resolvable state.
	ErrDIDNotFound = errors.New("DID not found")
	// ErrHistoryNotStored is returned when op history is requested in a mode
	// that does not retain it.
	ErrHistoryNotStored = errors.New("history not available in current mode")
)

// NewService builds a Service, seeding stats from the store and, in mirror
// mode, validating/recovering the block log. Constructor-time failures of
// the replay tracer are deferred into startupErr rather than returned.
func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
	s := &Service{
		cfg:         cfg,
		store:       store,
		client:      client,
		verifier:    verify.New(cfg.VerifyPolicy),
		checkpoints: checkpointMgr,
		blockLog:    blockLog,
		startedAt:   time.Now(),
	}
	if cfg.ReplayTrace {
		tracer, err := openReplayTracer(cfg.DataDir)
		if err != nil {
			s.startupErr = err
		} else {
			s.tracer = tracer
		}
	}
	// Seed DID counters; errors here are non-fatal (counters start at zero).
	if didCount, err := countStates(store); err == nil {
		atomic.StoreUint64(&s.stats.DIDCount, didCount)
		if cfg.Mode == config.ModeThin {
			atomic.StoreUint64(&s.stats.ThinCacheCount, didCount)
		}
	}
	if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
		atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
	}
	if cfg.Mode == config.ModeMirror && blockLog != nil {
		s.appender = newBlockAppender(blockLog, 1024)
		if _, err := blockLog.ValidateAndRecover(store); err != nil {
			s.MarkCorrupted(err)
		}
	}
	return s
}

// SetMaxOps caps the total number of operations this process will ingest.
func (s *Service) SetMaxOps(max uint64) { s.maxOps = max }

// Replay runs RunOnce in a loop until no progress is made or an error occurs.
func (s *Service) Replay(ctx context.Context) error {
	for {
		changed, err := s.RunOnce(ctx)
		if err != nil {
			return err
		}
		if !changed {
			return nil
		}
	}
}

// Poll runs the ingest loop on cfg.PollInterval until ctx is cancelled.
// In thin mode there is nothing to poll, so it just blocks on ctx.
// Non-fatal errors are counted and retried; fatal integrity errors abort.
func (s *Service) Poll(ctx context.Context) error {
	if s.cfg.Mode == config.ModeThin {
		<-ctx.Done()
		return ctx.Err()
	}
	ticker := time.NewTicker(s.cfg.PollInterval)
	defer ticker.Stop()
	for {
		if _, err := s.RunOnce(ctx); err != nil {
			// NOTE(review): RunOnce already bumps stats.Errors for
			// processRecords failures, so those are counted twice here —
			// confirm whether that is intentional.
			atomic.AddUint64(&s.stats.Errors, 1)
			if isFatalIntegrityError(err) {
				return err
			}
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// RunOnce fetches, verifies, and commits one batch of export records.
// It reports whether any progress was made and enforces the sequence
// accounting invariant new_global_seq == previous_global_seq + committed.
func (s *Service) RunOnce(ctx context.Context) (bool, error) {
	if s.cfg.Mode == config.ModeThin {
		return false, nil
	}
	if s.startupErr != nil {
		return false, s.startupErr
	}
	if err := s.CorruptionError(); err != nil {
		return false, err
	}
	if s.maxOps > 0 && s.runOps >= s.maxOps {
		return false, nil
	}
	lastSeq, err := s.store.GetGlobalSeq()
	if err != nil {
		return false, fmt.Errorf("load global sequence: %w", err)
	}
	limit := uint64(0)
	if s.maxOps > 0 {
		remaining := s.maxOps - s.runOps
		if remaining == 0 {
			return false, nil
		}
		limit = remaining
	}
	pageSize := uint64(s.cfg.ExportPageSize)
	if pageSize == 0 {
		pageSize = 1000
	}
	batch, err := s.client.FetchExportBatch(ctx, lastSeq, limit, pageSize, s.traceFetchAttempt)
	if err != nil {
		return false, err
	}
	records, err := s.prepareRecordsForCommit(lastSeq, batch)
	if err != nil {
		return false, err
	}
	if len(records) == 0 {
		atomic.StoreUint64(&s.stats.LagOps, 0)
		return false, nil
	}
	latestSourceSeq := records[len(records)-1].Seq
	committed, err := s.processRecords(ctx, records)
	s.runOps += committed
	if err != nil {
		atomic.AddUint64(&s.stats.Errors, 1)
		return committed > 0, err
	}
	lastCommitted := atomic.LoadUint64(&s.stats.LastSeq)
	if lastCommitted != lastSeq+committed {
		err := fatalIntegrityErrorf(
			"sequence accounting mismatch: previous_global_seq=%d committed=%d new_global_seq=%d",
			lastSeq, committed, lastCommitted,
		)
		s.traceIntegrityError(err.Error(), map[string]any{
			"after":          lastSeq,
			"batch_count":    len(records),
			"last_committed": lastCommitted,
		})
		return committed > 0, err
	}
	// Lag is how far the source tip is ahead of what we have committed.
	if latestSourceSeq > lastCommitted {
		atomic.StoreUint64(&s.stats.LagOps, latestSourceSeq-lastCommitted)
	} else {
		atomic.StoreUint64(&s.stats.LagOps, 0)
	}
	return true, nil
}

// verifyTask is one record queued to a verify worker, tagged with its batch index.
type verifyTask struct {
	index int
	rec   types.ExportRecord
}

// verifyResult is the outcome of verifying one record; err is set on failure.
type verifyResult struct {
	index  int
	op     types.ParsedOperation
	state  types.StateV1
	newDID bool
	err    error
}

// fatalIntegrityError marks errors that must abort ingestion rather than be retried.
type fatalIntegrityError struct{ msg string }

func (e fatalIntegrityError) Error() string { return e.msg }

func fatalIntegrityErrorf(format string, args ...any) error {
	return fatalIntegrityError{msg: fmt.Sprintf(format, args...)}
}

func isFatalIntegrityError(err error) bool {
	var target fatalIntegrityError
	return errors.As(err, &target)
}

// processRecords verifies a prepared batch and commits the verified results,
// returning how many operations were committed.
func (s *Service) processRecords(ctx context.Context, records []types.ExportRecord) (uint64, error) {
	verified, err := s.verifyRecords(ctx, records)
	if err != nil {
		return 0, err
	}
	return s.commitVerified(ctx, verified)
}

// prepareRecordsForCommit sorts and de-duplicates a fetched batch and checks
// that the remaining sequences form a contiguous run starting at lastSeq+1.
// Missing-seq and gap conditions are fatal integrity errors.
func (s *Service) prepareRecordsForCommit(lastSeq uint64, batch FetchBatch) ([]types.ExportRecord, error) {
	if len(batch.Records) == 0 {
		return nil, nil
	}
	sorted := append([]types.ExportRecord(nil), batch.Records...)
	// Stable order: seq, then DID, then CID, so duplicates collapse deterministically.
	sort.SliceStable(sorted, func(i, j int) bool {
		if sorted[i].Seq == sorted[j].Seq {
			if sorted[i].DID == sorted[j].DID {
				return sorted[i].CID < sorted[j].CID
			}
			return sorted[i].DID < sorted[j].DID
		}
		return sorted[i].Seq < sorted[j].Seq
	})
	out := make([]types.ExportRecord, 0, len(sorted))
	seen := make(map[uint64]struct{}, len(sorted))
	expected := lastSeq + 1
	for _, rec := range sorted {
		if rec.Seq == 0 {
			err := fatalIntegrityErrorf(
				"export record missing seq: after=%d request=%s did=%s cid=%s created_at=%s",
				lastSeq, batch.RequestURL, rec.DID, rec.CID, rec.CreatedAt,
			)
			s.traceIntegrityError(err.Error(), map[string]any{
				"after":          lastSeq,
				"request_url":    batch.RequestURL,
				"did":            rec.DID,
				"cid":            rec.CID,
				"created_at":     rec.CreatedAt,
				"parsed_records": batch.ParsedRecords,
			})
			return nil, err
		}
		if rec.Seq <= lastSeq {
			s.traceDedupDecision(rec, "already_committed_seq", batch, lastSeq)
			continue
		}
		if _, ok := seen[rec.Seq]; ok {
			s.traceDedupDecision(rec, "duplicate_seq_in_batch", batch, lastSeq)
			continue
		}
		if rec.Seq != expected {
			err := fatalIntegrityErrorf(
				"sequence gap detected: expected=%d got=%d previous_global_seq=%d request=%s first_created_at=%s last_created_at=%s parsed_records=%d skipped_lines=%d truncated_tail=%t",
				expected, rec.Seq, lastSeq, batch.RequestURL, batch.FirstCreated, batch.LastCreated,
				batch.ParsedRecords, batch.SkippedLines, batch.TruncatedTail,
			)
			s.traceIntegrityError(err.Error(), map[string]any{
				"expected_seq":     expected,
				"got_seq":          rec.Seq,
				"after":            lastSeq,
				"request_url":      batch.RequestURL,
				"count":            batch.Count,
				"first_created_at": batch.FirstCreated,
				"last_created_at":  batch.LastCreated,
				"parsed_records":   batch.ParsedRecords,
				"skipped_lines":    batch.SkippedLines,
				"truncated_tail":   batch.TruncatedTail,
			})
			return nil, err
		}
		seen[rec.Seq] = struct{}{}
		out = append(out, rec)
		expected++
	}
	return out, nil
}

// verifyRecords verifies a batch in parallel. Records are sharded to workers
// by DID hash so all operations for one DID are verified in order by one
// goroutine, each worker keeping a per-DID state cache.
func (s *Service) verifyRecords(ctx context.Context, records []types.ExportRecord) ([]verifyResult, error) {
	_ = ctx
	workers := s.cfg.VerifyWorkers
	if workers < 1 {
		workers = 1
	}
	queues := make([]chan verifyTask, workers)
	// Buffered to len(records) so workers never block sending results.
	results := make(chan verifyResult, len(records))
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		queues[i] = make(chan verifyTask, 64)
		wg.Add(1)
		go func(queue <-chan verifyTask) {
			defer wg.Done()
			cache := map[string]*types.StateV1{}
			for task := range queue {
				op, err := types.ParseOperation(task.rec)
				if err != nil {
					results <- verifyResult{index: task.index, err: err}
					continue
				}
				existing, err := s.loadExistingState(cache, op.DID)
				if err != nil {
					results <- verifyResult{index: task.index, err: err}
					continue
				}
				if err := s.verifier.VerifyOperation(op, existing); err != nil {
					atomic.AddUint64(&s.stats.VerifyFailures, 1)
					// Prev-linkage mismatches are escalated to fatal with a diagnostic.
					if strings.Contains(err.Error(), "prev linkage mismatch") {
						diag := s.buildPrevMismatchDiagnostic(op, existing)
						s.tracePrevMismatch(diag)
						results <- verifyResult{index: task.index, err: fatalIntegrityErrorf("%s", diag.Message)}
						continue
					}
					results <- verifyResult{index: task.index, err: err}
					continue
				}
				next, err := state.ComputeNextState(op, existing)
				if err != nil {
					results <- verifyResult{index: task.index, err: err}
					continue
				}
				cache[op.DID] = cloneState(&next)
				results <- verifyResult{index: task.index, op: op, state: next, newDID: existing == nil}
			}
		}(queues[i])
	}
	for idx, rec := range records {
		worker := didWorker(rec.DID, workers)
		queues[worker] <- verifyTask{index: idx, rec: rec}
	}
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
	close(results)
	return collectVerifiedInOrder(len(records), results)
}

// loadExistingState returns a defensive clone of a DID's current state,
// consulting the worker-local cache before the store. A cached nil means
// "known absent".
func (s *Service) loadExistingState(cache map[string]*types.StateV1, did string) (*types.StateV1, error) {
	if existing, ok := cache[did]; ok {
		return cloneState(existing), nil
	}
	stateVal, ok, err := s.store.GetState(did)
	if err != nil {
		return nil, err
	}
	if !ok {
		cache[did] = nil
		return nil, nil
	}
	cache[did] = cloneState(&stateVal)
	return cloneState(&stateVal), nil
}

// collectVerifiedInOrder drains the results channel and re-orders results by
// batch index, failing on missing, duplicate, or out-of-range indices, and
// surfacing the first per-record verification error.
func collectVerifiedInOrder(total int, results <-chan verifyResult) ([]verifyResult, error) {
	ordered := make([]verifyResult, total)
	seen := make([]bool, total)
	firstErr := error(nil)
	received := 0
	for r := range results {
		received++
		if r.index < 0 || r.index >= total {
			if firstErr == nil {
				firstErr = fmt.Errorf("verify worker returned out-of-range index %d", r.index)
			}
			continue
		}
		if seen[r.index] {
			if firstErr == nil {
				firstErr = fmt.Errorf("duplicate verify result for index %d", r.index)
			}
			continue
		}
		seen[r.index] = true
		ordered[r.index] = r
		if r.err != nil && firstErr == nil {
			firstErr = r.err
		}
	}
	if received != total && firstErr == nil {
		firstErr = fmt.Errorf("verify result count mismatch: got %d want %d", received, total)
	}
	if firstErr != nil {
		return nil, firstErr
	}
	for i := range seen {
		if !seen[i] {
			return nil, fmt.Errorf("missing verify result index %d", i)
		}
	}
	return ordered, nil
}

// didWorker maps a DID to a worker index via FNV-1a so per-DID ordering holds.
func didWorker(did string, workers int) int {
	if workers <= 1 {
		return 0
	}
	hasher := fnv.New32a()
	_, _ = hasher.Write([]byte(did))
	return int(hasher.Sum32() % uint32(workers))
}

// cloneState deep-copies a state so callers cannot alias cached slices.
func cloneState(in *types.StateV1) *types.StateV1 {
	if in == nil {
		return nil
	}
	out := *in
	out.DIDDocument = append([]byte(nil), in.DIDDocument...)
	out.RotationKeys = append([]string(nil), in.RotationKeys...)
	return &out
}

// prevMismatchDiagnostic captures context around a prev-linkage failure,
// including the tail of the DID's stored operation refs.
type prevMismatchDiagnostic struct {
	Message   string             `json:"message"`
	DID       string             `json:"did"`
	TipCID    string             `json:"tip_cid"`
	PrevCID   string             `json:"prev_cid"`
	OpCID     string             `json:"op_cid"`
	CreatedAt string             `json:"created_at"`
	LastOps   []types.BlockRefV1 `json:"last_ops"`
}

// buildPrevMismatchDiagnostic assembles a diagnostic for a prev-linkage
// mismatch, best-effort attaching up to the last 8 stored op refs for the DID.
func (s *Service) buildPrevMismatchDiagnostic(op types.ParsedOperation, existing *types.StateV1) prevMismatchDiagnostic {
	diag := prevMismatchDiagnostic{
		DID:       op.DID,
		PrevCID:   op.Prev,
		OpCID:     op.CID,
		CreatedAt: op.RawRecord.CreatedAt,
	}
	if existing != nil {
		diag.TipCID = existing.ChainTipHash
	}
	const tail = 8
	seqs, err := s.store.ListDIDSequences(op.DID)
	if err == nil && len(seqs) > 0 {
		start := len(seqs) - tail
		if start < 0 {
			start = 0
		}
		for _, seq := range seqs[start:] {
			ref, ok, getErr := s.store.GetOpSeqRef(seq)
			if getErr != nil || !ok {
				continue // best-effort; skip unreadable refs
			}
			diag.LastOps = append(diag.LastOps, ref)
		}
	}
	diag.Message = fmt.Sprintf(
		"prev linkage mismatch: did=%s tip=%s op_prev=%s op_cid=%s created_at=%s",
		diag.DID, diag.TipCID, diag.PrevCID, diag.OpCID, diag.CreatedAt,
	)
	return diag
}

// traceFetchAttempt records one export fetch attempt to the replay tracer, if enabled.
func (s *Service) traceFetchAttempt(trace FetchAttemptTrace) {
	if s.tracer == nil {
		return
	}
	s.tracer.Write(replayTraceEvent{
		Kind:           "export_fetch",
		RequestURL:     trace.URL,
		After:          trace.After,
		Count:          trace.Count,
		Attempt:        trace.Attempt,
		StatusCode:     trace.StatusCode,
		Retryable:      trace.Retryable,
		RetryAfterMS:   trace.RetryAfter.Milliseconds(),
		ParsedLines:    trace.ParsedLines,
		ParsedRecords:  trace.ParsedRecords,
		SkippedLines:   trace.SkippedLines,
		TruncatedTail:  trace.TruncatedTail,
		FirstCreatedAt: trace.FirstCreated,
		LastCreatedAt:  trace.LastCreated,
		Error:          trace.Error,
	})
}

// traceDedupDecision records why a fetched record was skipped during preparation.
func (s *Service) traceDedupDecision(rec types.ExportRecord, reason string, batch FetchBatch, after uint64) {
	if s.tracer == nil {
		return
	}
	s.tracer.Write(replayTraceEvent{
		Kind:           "dedup_decision",
		Reason:         reason,
		Sequence:       rec.Seq,
		DID:            rec.DID,
		CID:            rec.CID,
		CreatedAt:      rec.CreatedAt,
		After:          after,
		Count:          batch.Count,
		RequestURL:     batch.RequestURL,
		FirstCreatedAt: batch.FirstCreated,
		LastCreatedAt:  batch.LastCreated,
	})
}

// tracePrevMismatch logs and (if enabled) traces a prev-linkage diagnostic.
func (s *Service) tracePrevMismatch(diag prevMismatchDiagnostic) {
	log.Printf("%s", diag.Message)
	if s.tracer == nil {
		return
	}
	s.tracer.Write(replayTraceEvent{
		Kind:      "prev_mismatch",
		DID:       diag.DID,
		Tip:       diag.TipCID,
		Prev:      diag.PrevCID,
		CID:       diag.OpCID,
		CreatedAt: diag.CreatedAt,
		Message:   diag.Message,
		Details:   diag.LastOps,
	})
}

// traceIntegrityError logs and (if enabled) traces a fatal integrity condition.
func (s *Service) traceIntegrityError(message string, details map[string]any) {
	log.Printf("%s", message)
	if s.tracer == nil {
		return
	}
	s.tracer.Write(replayTraceEvent{
		Kind:    "integrity_error",
		Message: message,
		Details: details,
	})
}

// commitVerified writes verified operations to the store in batches of
// cfg.CommitBatchSize, flushing the mirror block appender per batch and
// creating checkpoints on the configured interval. It returns the number of
// operations durably committed.
func (s *Service) commitVerified(ctx context.Context, verified []verifyResult) (uint64, error) {
	_ = ctx
	batchSize := s.cfg.CommitBatchSize
	if batchSize < 1 {
		batchSize = 1
	}
	pendingOps := make([]storage.OperationMutation, 0, batchSize)
	pendingSeqs := make([]uint64, 0, batchSize)
	pendingBlockHashes := map[uint64]string{}
	pendingNewDIDs := map[string]struct{}{}
	var committed uint64
	commit := func() error {
		if len(pendingOps) == 0 {
			return nil
		}
		if s.cfg.Mode == config.ModeMirror {
			flush, err := s.appender.Flush()
			if err != nil {
				return err
			}
			if flush != nil {
				pendingBlockHashes[flush.BlockID] = flush.Hash
			}
		}
		blockEntries := make([]storage.BlockHashEntry, 0, len(pendingBlockHashes))
		for id, hash := range pendingBlockHashes {
			blockEntries = append(blockEntries, storage.BlockHashEntry{BlockID: id, Hash: hash})
		}
		// Map iteration order is random; sort for deterministic writes.
		sort.Slice(blockEntries, func(i, j int) bool { return blockEntries[i].BlockID < blockEntries[j].BlockID })
		if err := s.store.ApplyOperationsBatch(pendingOps, blockEntries); err != nil {
			return err
		}
		lastSeq := pendingSeqs[len(pendingSeqs)-1]
		atomic.StoreUint64(&s.stats.LastSeq, lastSeq)
		atomic.AddUint64(&s.stats.IngestedOps, uint64(len(pendingOps)))
		atomic.AddUint64(&s.stats.DIDCount, uint64(len(pendingNewDIDs)))
		committed += uint64(len(pendingOps))
		// FIX: guard against CheckpointInterval == 0, which would panic with
		// a divide-by-zero in the modulo below.
		if s.checkpoints != nil && s.cfg.CheckpointInterval > 0 {
			for _, seq := range pendingSeqs {
				if seq%s.cfg.CheckpointInterval == 0 {
					if err := s.createCheckpoint(seq); err != nil {
						return err
					}
				}
			}
		}
		pendingOps = pendingOps[:0]
		pendingSeqs = pendingSeqs[:0]
		clear(pendingBlockHashes)
		clear(pendingNewDIDs)
		return nil
	}
	for _, v := range verified {
		var ref *types.BlockRefV1
		if s.cfg.Mode == config.ModeMirror {
			if s.appender == nil {
				return committed, fmt.Errorf("mirror mode requires block appender")
			}
			result, err := s.appender.Append(v.op)
			if err != nil {
				return committed, err
			}
			if result.Flush != nil {
				pendingBlockHashes[result.Flush.BlockID] = result.Flush.Hash
			}
			result.Ref.Received = time.Now().UTC().Format(time.RFC3339)
			result.Ref.CreatedAt = v.op.RawRecord.CreatedAt
			ref = &result.Ref
		}
		pendingOps = append(pendingOps, storage.OperationMutation{State: v.state, Ref: ref})
		pendingSeqs = append(pendingSeqs, v.op.Sequence)
		if v.newDID {
			pendingNewDIDs[v.op.DID] = struct{}{}
		}
		if len(pendingOps) >= batchSize {
			if err := commit(); err != nil {
				return committed, err
			}
		}
	}
	if err := commit(); err != nil {
		return committed, err
	}
	return committed, nil
}

// createCheckpoint builds and stores a checkpoint at sequence, sampling this
// process's CPU/RSS while the build runs and logging the resulting metrics.
func (s *Service) createCheckpoint(sequence uint64) error {
	blocks, err := s.store.ListBlockHashes()
	if err != nil {
		return fmt.Errorf("list blocks for checkpoint: %w", err)
	}
	done := make(chan struct{})
	usageCh := make(chan checkpointProcessUsage, 1)
	go sampleCheckpointProcessUsage(done, usageCh)
	cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(sequence, blocks)
	close(done)
	usage := <-usageCh
	if err != nil {
		return fmt.Errorf("build checkpoint: %w", err)
	}
	log.Printf(
		"checkpoint_metrics seq=%d did_count=%d merkle_compute_ms=%d total_checkpoint_ms=%d cpu_pct=%.2f rss_peak_mb=%.2f completed_unix_ms=%d",
		cp.Sequence, metrics.DIDCount, metrics.MerkleCompute.Milliseconds(), metrics.Total.Milliseconds(),
		usage.CPUPercentAvg, float64(usage.RSSPeakKB)/1024.0, time.Now().UnixMilli(),
	)
	atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
	if s.metricsSink != nil {
		s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
	}
	return nil
}

// Snapshot forces a checkpoint at the current global sequence, first flushing
// any pending block-appender data so the checkpoint covers all committed ops.
func (s *Service) Snapshot(ctx context.Context) (types.CheckpointV1, error) {
	if s.appender != nil {
		flush, err := s.appender.Flush()
		if err != nil {
			return types.CheckpointV1{}, err
		}
		if flush != nil {
			if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil {
				return types.CheckpointV1{}, err
			}
		}
	}
	_ = ctx
	seq, err := s.store.GetGlobalSeq()
	if err != nil {
		return types.CheckpointV1{}, err
	}
	blocks, err := s.store.ListBlockHashes()
	if err != nil {
		return types.CheckpointV1{}, err
	}
	cp, metrics, err := s.checkpoints.BuildAndStoreFromStoreWithMetrics(seq, blocks)
	if err != nil {
		return types.CheckpointV1{}, err
	}
	atomic.StoreUint64(&s.stats.CheckpointSeq, cp.Sequence)
	if s.metricsSink != nil {
		s.metricsSink.ObserveCheckpoint(metrics.Total, cp.Sequence)
	}
	return cp, nil
}

// checkpointProcessUsage summarizes process resource usage over a sampling window.
type checkpointProcessUsage struct {
	CPUPercentAvg float64
	RSSPeakKB     int64
}

// sampleCheckpointProcessUsage polls this process's CPU%/RSS every 25ms until
// done is closed, then sends the averaged/peak usage on out (exactly once).
func sampleCheckpointProcessUsage(done <-chan struct{}, out chan<- checkpointProcessUsage) {
	pid := os.Getpid()
	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()
	var cpuSum float64
	var samples int
	var rssPeak int64
	sample := func() {
		cpu, rss, err := processMetrics(pid)
		if err != nil {
			return // best-effort sampling; skip failed reads
		}
		cpuSum += cpu
		samples++
		if rss > rssPeak {
			rssPeak = rss
		}
	}
	sample()
	for {
		select {
		case <-done:
			sample()
			avg := 0.0
			if samples > 0 {
				avg = cpuSum / float64(samples)
			}
			out <- checkpointProcessUsage{CPUPercentAvg: avg, RSSPeakKB: rssPeak}
			return
		case <-ticker.C:
			sample()
		}
	}
}

// processMetrics shells out to ps(1) for CPU% and RSS (KB) of pid.
// Parse failures are tolerated (zero values) since this feeds metrics only.
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
	out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
	if err != nil {
		return 0, 0, err
	}
	fields := strings.Fields(string(out))
	if len(fields) < 2 {
		return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
	}
	cpuPct, _ = strconv.ParseFloat(fields[0], 64)
	rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
	return cpuPct, rssKB, nil
}

// VerifyDID re-verifies a DID's full operation chain. Thin mode re-fetches
// from upstream; non-mirror modes can only confirm state presence; mirror
// mode replays every stored operation through the verifier.
func (s *Service) VerifyDID(ctx context.Context, did string) error {
	if err := s.CorruptionError(); err != nil {
		return err
	}
	if s.cfg.Mode == config.ModeThin {
		_, _, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
		return err
	}
	if s.cfg.Mode != config.ModeMirror {
		_, ok, err := s.store.GetState(did)
		if err != nil {
			return err
		}
		if !ok {
			// FIX: return the package sentinel (same message) so callers can
			// match with errors.Is, consistent with ResolveState.
			return ErrDIDNotFound
		}
		return nil
	}
	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return err
	}
	if len(seqs) == 0 {
		return fmt.Errorf("no operations for did %s", did)
	}
	var previous *types.StateV1
	for _, seq := range seqs {
		if err := ctx.Err(); err != nil {
			return err
		}
		ref, ok, err := s.store.GetOpSeqRef(seq)
		if err != nil {
			return err
		}
		if !ok {
			return fmt.Errorf("missing op reference for seq %d", seq)
		}
		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return err
		}
		rec := types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload}
		op, err := types.ParseOperation(rec)
		if err != nil {
			return err
		}
		if err := s.verifier.VerifyOperation(op, previous); err != nil {
			return fmt.Errorf("verify seq %d failed: %w", seq, err)
		}
		// FIX: renamed from `s`, which shadowed the method receiver.
		// NOTE(review): this minimal state carries no RotationKeys, unlike
		// RecomputeTipAtOrBefore — confirm the verifier does not need them here.
		st := types.StateV1{DID: did, ChainTipHash: op.CID, LatestOpSeq: seq}
		previous = &st
	}
	return nil
}

// RecomputeTipAtOrBefore replays a DID's operations up to and including
// sequence, returning the resulting chain tip CID and the sequences used.
// Mirror mode only, since it needs the block log.
func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequence uint64) (string, []uint64, error) {
	if err := s.CorruptionError(); err != nil {
		return "", nil, err
	}
	if s.cfg.Mode != config.ModeMirror {
		return "", nil, errors.New("historical proof requires mirror mode")
	}
	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return "", nil, err
	}
	if len(seqs) == 0 {
		return "", nil, fmt.Errorf("no operations for did %s", did)
	}
	filtered := make([]uint64, 0, len(seqs))
	for _, seq := range seqs {
		if seq <= sequence {
			filtered = append(filtered, seq)
		}
	}
	if len(filtered) == 0 {
		return "", nil, fmt.Errorf("no operations for did %s at checkpoint %d", did, sequence)
	}
	sort.Slice(filtered, func(i, j int) bool { return filtered[i] < filtered[j] })
	var previous *types.StateV1
	var tip string
	for _, seq := range filtered {
		if err := ctx.Err(); err != nil {
			return "", nil, err
		}
		ref, ok, err := s.store.GetOpSeqRef(seq)
		if err != nil {
			return "", nil, err
		}
		if !ok {
			return "", nil, fmt.Errorf("missing op reference for seq %d", seq)
		}
		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return "", nil, err
		}
		op, err := types.ParseOperation(types.ExportRecord{Seq: seq, DID: did, CID: ref.CID, Operation: payload})
		if err != nil {
			return "", nil, err
		}
		if err := s.verifier.VerifyOperation(op, previous); err != nil {
			return "", nil, fmt.Errorf("verify seq %d failed: %w", seq, err)
		}
		tip = op.CID
		next := types.StateV1{
			DID:          did,
			ChainTipHash: op.CID,
			LatestOpSeq:  seq,
		}
		// Carry rotation keys forward so the next verification sees them.
		prior := []string(nil)
		if previous != nil {
			prior = append(prior, previous.RotationKeys...)
		}
		next.RotationKeys = extractRotationKeysFromPayload(op.Payload, prior)
		previous = &next
	}
	return tip, filtered, nil
}

// ResolveState returns a DID's current state: from the store in mirror/full
// modes, via the verified thin cache in thin mode.
func (s *Service) ResolveState(ctx context.Context, did string) (types.StateV1, error) {
	if err := s.CorruptionError(); err != nil {
		return types.StateV1{}, err
	}
	if s.cfg.Mode != config.ModeThin {
		stateVal, ok, err := s.store.GetState(did)
		if err != nil {
			return types.StateV1{}, err
		}
		if !ok {
			return types.StateV1{}, ErrDIDNotFound
		}
		return stateVal, nil
	}
	return s.resolveThinState(ctx, did)
}

// resolveThinState serves from the thin cache when fresh (bumping LastAccess),
// otherwise refreshes from upstream.
func (s *Service) resolveThinState(ctx context.Context, did string) (types.StateV1, error) {
	now := time.Now().UTC()
	stateVal, stateOK, err := s.store.GetState(did)
	if err != nil {
		return types.StateV1{}, err
	}
	meta, metaOK, err := s.store.GetThinCacheMeta(did)
	if err != nil {
		return types.StateV1{}, err
	}
	if stateOK && metaOK && isThinCacheFresh(meta, now, s.cfg.ThinCacheTTL) {
		atomic.AddUint64(&s.stats.ThinCacheHits, 1)
		meta.LastAccess = now
		if err := s.store.PutThinCacheMeta(meta); err != nil {
			return types.StateV1{}, err
		}
		return stateVal, nil
	}
	atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
	verifiedState, _, err := s.refreshThinStateFromUpstream(ctx, did, now)
	if err != nil {
		return types.StateV1{}, err
	}
	return verifiedState, nil
}

// refreshThinStateFromUpstream fetches a DID's full log, verifies the chain,
// stores the resulting state + cache metadata, enforces the cache size limit,
// and returns the verified state with the normalized records.
func (s *Service) refreshThinStateFromUpstream(ctx context.Context, did string, now time.Time) (types.StateV1, []types.ExportRecord, error) {
	records, err := s.client.FetchDIDLog(ctx, did)
	if err != nil {
		return types.StateV1{}, nil, err
	}
	stateVal, normalized, err := s.verifyThinChain(ctx, did, records)
	if err != nil {
		return types.StateV1{}, nil, err
	}
	stateVal.UpdatedAt = now
	if err := s.store.PutState(stateVal); err != nil {
		return types.StateV1{}, nil, err
	}
	meta := types.ThinCacheMetaV1{
		Version:      1,
		DID:          did,
		LastVerified: now,
		LastAccess:   now,
	}
	if err := s.store.PutThinCacheMeta(meta); err != nil {
		return types.StateV1{}, nil, err
	}
	evicted, entries, err := s.enforceThinCacheLimit()
	if err != nil {
		return types.StateV1{}, nil, err
	}
	if evicted > 0 {
		atomic.AddUint64(&s.stats.ThinEvictions, evicted)
	}
	atomic.StoreUint64(&s.stats.ThinCacheCount, entries)
	atomic.StoreUint64(&s.stats.DIDCount, entries)
	return stateVal, normalized, nil
}

// verifyThinChain fully verifies an upstream DID log (always with the full
// verify policy), renumbering records 1..n, and returns the final state plus
// the normalized (canonicalized) records.
func (s *Service) verifyThinChain(ctx context.Context, did string, records []types.ExportRecord) (types.StateV1, []types.ExportRecord, error) {
	if len(records) == 0 {
		return types.StateV1{}, nil, ErrDIDNotFound
	}
	verifier := verify.New(config.VerifyFull)
	var previous *types.StateV1
	var lastPayload map[string]any
	normalized := make([]types.ExportRecord, 0, len(records))
	for i, rec := range records {
		if err := ctx.Err(); err != nil {
			return types.StateV1{}, nil, err
		}
		rec.Seq = uint64(i + 1)
		if rec.DID == "" {
			rec.DID = did
		}
		if rec.DID != did {
			return types.StateV1{}, nil, fmt.Errorf("unexpected DID in log: got %s want %s", rec.DID, did)
		}
		op, err := types.ParseOperation(rec)
		if err != nil {
			return types.StateV1{}, nil, err
		}
		if err := verifier.VerifyOperation(op, previous); err != nil {
			return types.StateV1{}, nil, err
		}
		next, err := state.ComputeNextState(op, previous)
		if err != nil {
			return types.StateV1{}, nil, err
		}
		lastPayload = op.Payload
		normalized = append(normalized, types.ExportRecord{
			Seq:       rec.Seq,
			DID:       did,
			CreatedAt: rec.CreatedAt,
			CID:       op.CID,
			Nullified: rec.Nullified,
			Operation: op.CanonicalBytes,
		})
		previous = &next
	}
	if previous == nil {
		return types.StateV1{}, nil, ErrDIDNotFound
	}
	if lastPayload != nil {
		doc, err := buildThinDIDDocumentFromPayload(did, lastPayload, previous.RotationKeys)
		if err != nil {
			return types.StateV1{}, nil, err
		}
		previous.DIDDocument = doc
	}
	return *previous, normalized, nil
}

// buildThinDIDDocumentFromPayload renders a canonical DID document from the
// latest operation payload, with deterministic (sorted) method/service order.
func buildThinDIDDocumentFromPayload(did string, payload map[string]any, rotationKeys []string) ([]byte, error) {
	data := normalizePLCDataFromOperation(did, payload, rotationKeys)
	doc := map[string]any{
		"id": did,
	}
	if aka, ok := data["alsoKnownAs"].([]string); ok && len(aka) > 0 {
		doc["alsoKnownAs"] = aka
	}
	if methods, ok := data["verificationMethods"].(map[string]string); ok && len(methods) > 0 {
		names := make([]string, 0, len(methods))
		for name := range methods {
			names = append(names, name)
		}
		sort.Strings(names)
		entries := make([]map[string]any, 0, len(names))
		for _, name := range names {
			key := strings.TrimSpace(methods[name])
			if key == "" {
				continue
			}
			entries = append(entries, map[string]any{
				"id":                 did + "#" + name,
				"type":               "Multikey",
				"controller":         did,
				"publicKeyMultibase": key,
			})
		}
		if len(entries) > 0 {
			doc["verificationMethod"] = entries
		}
	}
	if services, ok := data["services"].(map[string]map[string]string); ok && len(services) > 0 {
		names := make([]string, 0, len(services))
		for name := range services {
			names = append(names, name)
		}
		sort.Strings(names)
		entries := make([]map[string]any, 0, len(names))
		for _, name := range names {
			entry := services[name]
			endpoint := strings.TrimSpace(entry["endpoint"])
			if endpoint == "" {
				continue
			}
			entries = append(entries, map[string]any{
				"id":              "#" + name,
				"type":            entry["type"],
				"serviceEndpoint": endpoint,
			})
		}
		if len(entries) > 0 {
			doc["service"] = entries
		}
	}
	if typ, _ := payload["type"].(string); typ == "plc_tombstone" || typ == "tombstone" {
		doc["deactivated"] = true
	}
	raw, err := json.Marshal(doc)
	if err != nil {
		return nil, err
	}
	return types.CanonicalizeJSON(raw)
}

// isThinCacheFresh reports whether a cache entry was verified within ttl.
func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool {
	if meta.LastVerified.IsZero() {
		return false
	}
	return now.Sub(meta.LastVerified) <= ttl
}

// enforceThinCacheLimit evicts least-recently-accessed thin cache entries
// until at most cfg.ThinCacheMaxEntries remain. Ties break on LastVerified,
// then DID, for determinism. Returns (evicted, remaining).
func (s *Service) enforceThinCacheLimit() (uint64, uint64, error) {
	maxEntries := s.cfg.ThinCacheMaxEntries
	if maxEntries <= 0 {
		maxEntries = config.Default().ThinCacheMaxEntries
	}
	entries, err := s.store.ListThinCacheMeta()
	if err != nil {
		return 0, 0, err
	}
	if len(entries) <= maxEntries {
		return 0, uint64(len(entries)), nil
	}
	sort.Slice(entries, func(i, j int) bool {
		left := entries[i]
		right := entries[j]
		if !left.LastAccess.Equal(right.LastAccess) {
			return left.LastAccess.Before(right.LastAccess)
		}
		if !left.LastVerified.Equal(right.LastVerified) {
			return left.LastVerified.Before(right.LastVerified)
		}
		return left.DID < right.DID
	})
	var evicted uint64
	for idx := 0; idx < len(entries)-maxEntries; idx++ {
		did := entries[idx].DID
		if err := s.store.DeleteState(did); err != nil {
			return evicted, 0, err
		}
		if err := s.store.DeleteThinCacheMeta(did); err != nil {
			return evicted, 0, err
		}
		evicted++
	}
	return evicted, uint64(len(entries)) - evicted, nil
}

// LoadDIDLog returns all stored operations for a DID. Thin mode re-fetches
// and re-verifies from upstream; mirror mode reads from the block log.
func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
	if err := s.CorruptionError(); err != nil {
		return nil, err
	}
	if s.cfg.Mode == config.ModeThin {
		atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
		_, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
		return records, err
	}
	if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
		return nil, ErrHistoryNotStored
	}
	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return nil, err
	}
	if len(seqs) == 0 {
		return nil, ErrDIDNotFound
	}
	out := make([]types.ExportRecord, 0, len(seqs))
	for _, seq := range seqs {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		ref, ok, err := s.store.GetOpSeqRef(seq)
		if err != nil {
			return nil, err
		}
		if !ok {
			return nil, fmt.Errorf("missing op reference for seq %d", seq)
		}
		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return nil, err
		}
		out = append(out, types.ExportRecord{
			Seq:       seq,
			DID:       did,
			CreatedAt: ref.Received,
			CID:       ref.CID,
			Nullified: detectNullified(payload),
			Operation: json.RawMessage(payload),
		})
	}
	return out, nil
}

// LoadLatestDIDOperation returns only the most recent stored operation for a
// DID; same mode rules as LoadDIDLog.
func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types.ExportRecord, error) {
	if err := s.CorruptionError(); err != nil {
		return types.ExportRecord{}, err
	}
	if s.cfg.Mode == config.ModeThin {
		atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
		_, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
		if err != nil {
			return types.ExportRecord{}, err
		}
		if len(records) == 0 {
			return types.ExportRecord{}, ErrDIDNotFound
		}
		return records[len(records)-1], nil
	}
	if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
		return types.ExportRecord{}, ErrHistoryNotStored
	}
	seqs, err := s.store.ListDIDSequences(did)
	if err != nil {
		return types.ExportRecord{}, err
	}
	if len(seqs) == 0 {
		return types.ExportRecord{}, ErrDIDNotFound
	}
	lastSeq := seqs[len(seqs)-1]
	if err := ctx.Err(); err != nil {
		return types.ExportRecord{}, err
	}
	ref, ok, err := s.store.GetOpSeqRef(lastSeq)
	if err != nil {
		return types.ExportRecord{}, err
	}
	if !ok {
		return types.ExportRecord{}, fmt.Errorf("missing op reference for seq %d", lastSeq)
	}
	payload, err := s.blockLog.ReadRecord(ref)
	if err != nil {
		return types.ExportRecord{}, err
	}
	return types.ExportRecord{
		Seq:       lastSeq,
		DID:       did,
		CreatedAt: ref.Received,
		CID:       ref.CID,
		Nullified: detectNullified(payload),
		Operation: json.RawMessage(payload),
	}, nil
}

// LoadCurrentPLCData returns the DID's current data in normalized PLC form,
// preferring the latest raw operation in mirror mode and falling back to the
// stored DID document otherwise. Thin mode refreshes the cache first.
func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[string]any, error) {
	if err := s.CorruptionError(); err != nil {
		return nil, err
	}
	if s.cfg.Mode == config.ModeThin {
		if _, err := s.ResolveState(ctx, did); err != nil {
			return nil, err
		}
	}
	stateVal, stateOK, err := s.store.GetState(did)
	if err != nil {
		return nil, err
	}
	if !stateOK {
		return nil, ErrDIDNotFound
	}
	if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
		var doc map[string]any
		if err := json.Unmarshal(stateVal.DIDDocument, &doc); err != nil {
			return nil, err
		}
		return normalizePLCDataFromDIDDocument(did, doc, stateVal.RotationKeys), nil
	}
	last, err := s.LoadLatestDIDOperation(ctx, did)
	if err != nil {
		return nil, err
	}
	var op map[string]any
	if err := json.Unmarshal(last.Operation, &op); err != nil {
		return nil, fmt.Errorf("decode latest operation: %w", err)
	}
	return normalizePLCDataFromOperation(did, op, stateVal.RotationKeys), nil
}

// StreamExport emits up to limit (capped at 1000) stored operations received
// after the given time, in global sequence order. Non-mirror modes emit
// nothing. A sentinel error stops iteration once the limit is hit.
func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int, emit func(types.ExportRecord) error) error {
	if err := s.CorruptionError(); err != nil {
		return err
	}
	if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
		return nil
	}
	if limit <= 0 {
		limit = 1000
	}
	const maxCount = 1000
	if limit > maxCount {
		limit = maxCount
	}
	afterSet := !after.IsZero()
	emitted := 0
	stop := errors.New("stop export iteration")
	err := s.store.ForEachOpSeqRef(func(seq uint64, ref types.BlockRefV1) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		if emitted >= limit {
			return stop
		}
		if afterSet {
			// Records with missing/unparseable timestamps are skipped, not errors.
			if strings.TrimSpace(ref.Received) == "" {
				return nil
			}
			ts, err := time.Parse(time.RFC3339, ref.Received)
			if err != nil {
				return nil
			}
			if !ts.After(after) {
				return nil
			}
		}
		payload, err := s.blockLog.ReadRecord(ref)
		if err != nil {
			return err
		}
		rec := types.ExportRecord{
			Seq:       seq,
			DID:       ref.DID,
			CreatedAt: ref.Received,
			CID:       ref.CID,
			Nullified: detectNullified(payload),
			Operation: json.RawMessage(payload),
		}
		emitted++
		return emit(rec)
	})
	if err == nil || errors.Is(err, stop) {
		return nil
	}
	return err
}

// detectNullified best-effort reads the "nullified" flag from a raw operation;
// unparseable payloads report false.
func detectNullified(operation []byte) bool {
	var payload map[string]any
	if err := json.Unmarshal(operation, &payload); err != nil {
		return false
	}
	v, _ := payload["nullified"].(bool)
	return v
}

// normalizePLCDataFromOperation converts a raw (possibly legacy) operation
// payload into the normalized PLC data shape, falling back to legacy
// handle/recoveryKey fields and the supplied rotation keys when absent.
func normalizePLCDataFromOperation(did string, op map[string]any, fallbackRotation []string) map[string]any {
	data := map[string]any{
		"did":                 did,
		"verificationMethods": map[string]string{},
		"rotationKeys":        []string{},
		"alsoKnownAs":         []string{},
		"services":            map[string]map[string]string{},
	}
	if aka := extractStringSlice(op["alsoKnownAs"]); len(aka) > 0 {
		data["alsoKnownAs"] = aka
	} else if handle, _ := op["handle"].(string); strings.TrimSpace(handle) != "" {
		// Legacy operations carry a bare handle instead of alsoKnownAs.
		data["alsoKnownAs"] = []string{"at://" + handle}
	}
	services := normalizeServices(op)
	if len(services) > 0 {
		data["services"] = services
	}
	verificationMethods := normalizeVerificationMethods(op)
	if len(verificationMethods) > 0 {
		data["verificationMethods"] = verificationMethods
	}
	rotationKeys := extractStringSlice(op["rotationKeys"])
	if len(rotationKeys) == 0 {
		if recovery, _ := op["recoveryKey"].(string); strings.TrimSpace(recovery) != "" {
			rotationKeys = append(rotationKeys, recovery)
		}
	}
	if len(rotationKeys) == 0 && len(fallbackRotation) > 0 {
		rotationKeys = append(rotationKeys, fallbackRotation...)
	}
	data["rotationKeys"] = dedupeStrings(rotationKeys)
	return data
}

// normalizePLCDataFromDIDDocument converts a stored DID document back into
// the normalized PLC data shape, using fallbackRotation for rotation keys
// (documents do not carry them).
func normalizePLCDataFromDIDDocument(did string, doc map[string]any, fallbackRotation []string) map[string]any {
	data := map[string]any{
		"did":                 did,
		"verificationMethods": map[string]string{},
		"rotationKeys":        dedupeStrings(fallbackRotation),
		"alsoKnownAs":         []string{},
		"services":            map[string]map[string]string{},
	}
	if aka := extractStringSlice(doc["alsoKnownAs"]); len(aka) > 0 {
		data["alsoKnownAs"] = aka
	}
	if rawVMList, ok := doc["verificationMethod"].([]any); ok {
		verificationMethods := map[string]string{}
		for _, rawVM := range rawVMList {
			vm, ok := rawVM.(map[string]any)
			if !ok {
				continue
			}
			name := "atproto"
			if id, _ := vm["id"].(string); id != "" {
				if idx := strings.LastIndex(id, "#"); idx >= 0 && idx < len(id)-1 {
					name = id[idx+1:]
				}
			}
			if key, _ := vm["publicKeyMultibase"].(string); strings.TrimSpace(key) != "" {
				verificationMethods[name] = key
			}
		}
		if len(verificationMethods) > 0 {
			data["verificationMethods"] = verificationMethods
		}
	}
	if rawServiceList, ok := doc["service"].([]any); ok {
		services := map[string]map[string]string{}
		for _, rawService := range rawServiceList {
			service, ok := rawService.(map[string]any)
			if !ok {
				continue
			}
			name := "atproto_pds"
			if id, _ := service["id"].(string); id != "" {
				name = strings.TrimPrefix(id, "#")
				if strings.TrimSpace(name) == "" {
					name = "atproto_pds"
				}
			}
			typ, _ := service["type"].(string)
			endpoint, _ := service["serviceEndpoint"].(string)
			if endpoint == "" {
				endpoint, _ = service["endpoint"].(string)
			}
			if strings.TrimSpace(endpoint) == "" {
				continue
			}
			services[name] = map[string]string{
				"type":     typ,
				"endpoint": endpoint,
			}
		}
		if len(services) > 0 {
			data["services"] = services
		}
	}
	return data
}

// normalizeServices extracts a service map from a raw operation payload.
// NOTE(review): the remainder of this function's body lies past the end of
// the reviewed chunk and is reproduced only up to the visible boundary.
func normalizeServices(op map[string]any) map[string]map[string]string {
	services := map[string]map[string]string{}
	if rawServices, ok := op["services"].(map[string]any); ok {
		for name, rawEntry := range rawServices {
			entry, ok := rawEntry.(map[string]any)
			if !ok {
continue } typ, _ := entry["type"].(string) endpoint, _ := entry["endpoint"].(string) if endpoint == "" { endpoint, _ = entry["serviceEndpoint"].(string) } if strings.TrimSpace(endpoint) == "" { continue } services[name] = map[string]string{ "type": typ, "endpoint": endpoint, } } return services } if endpoint, _ := op["service"].(string); strings.TrimSpace(endpoint) != "" { services["atproto_pds"] = map[string]string{ "type": "AtprotoPersonalDataServer", "endpoint": endpoint, } } return services } func normalizeVerificationMethods(op map[string]any) map[string]string { methods := map[string]string{} if rawVM, ok := op["verificationMethods"].(map[string]any); ok { for name, rawValue := range rawVM { value, _ := rawValue.(string) if strings.TrimSpace(value) == "" { continue } methods[name] = value } if len(methods) > 0 { return methods } } if signingKey, _ := op["signingKey"].(string); strings.TrimSpace(signingKey) != "" { methods["atproto"] = signingKey } return methods } func extractStringSlice(v any) []string { raw, ok := v.([]any) if !ok { return nil } out := make([]string, 0, len(raw)) for _, item := range raw { s, _ := item.(string) if strings.TrimSpace(s) == "" { continue } out = append(out, s) } return out } func dedupeStrings(input []string) []string { if len(input) == 0 { return []string{} } out := make([]string, 0, len(input)) seen := make(map[string]struct{}, len(input)) for _, item := range input { item = strings.TrimSpace(item) if item == "" { continue } if _, ok := seen[item]; ok { continue } seen[item] = struct{}{} out = append(out, item) } return out } func (s *Service) Flush(ctx context.Context) error { _ = ctx if err := s.CorruptionError(); err != nil { return err } if s.appender == nil { return nil } flush, err := s.appender.Flush() if err != nil { return err } if flush != nil { if err := s.store.ApplyOperationsBatch(nil, []storage.BlockHashEntry{{BlockID: flush.BlockID, Hash: flush.Hash}}); err != nil { return err } } return nil } func (s 
*Service) MarkCorrupted(err error) { if err == nil { return } s.corrupted.Store(true) s.corruptErr.Store(err.Error()) } func (s *Service) IsCorrupted() bool { return s.corrupted.Load() } func (s *Service) CorruptionError() error { if !s.IsCorrupted() { return nil } msg, _ := s.corruptErr.Load().(string) if msg == "" { msg = "data corruption detected" } return fmt.Errorf("data corruption detected: %s", msg) } func extractRotationKeysFromPayload(payload map[string]any, prior []string) []string { out := make([]string, 0, len(prior)+4) seen := map[string]struct{}{} add := func(v string) { v = strings.TrimSpace(v) if v == "" { return } if _, ok := seen[v]; ok { return } seen[v] = struct{}{} out = append(out, v) } if arr, ok := payload["rotationKeys"].([]any); ok { for _, v := range arr { if s, ok := v.(string); ok { add(s) } } } if recovery, ok := payload["recoveryKey"].(string); ok { add(recovery) } if signing, ok := payload["signingKey"].(string); ok { add(signing) } if len(out) == 0 { for _, v := range prior { add(v) } } return out } func (s *Service) Close() { if s.tracer != nil { _ = s.tracer.Close() } if s.appender != nil { s.appender.Close() } } func (s *Service) SetMetricsSink(sink MetricsSink) { s.metricsSink = sink } func (s *Service) Stats() Stats { ingested := atomic.LoadUint64(&s.stats.IngestedOps) opsPerSec := 0.0 if elapsed := time.Since(s.startedAt).Seconds(); elapsed > 0 { opsPerSec = float64(ingested) / elapsed } return Stats{ IngestedOps: ingested, Errors: atomic.LoadUint64(&s.stats.Errors), LastSeq: atomic.LoadUint64(&s.stats.LastSeq), VerifyFailures: atomic.LoadUint64(&s.stats.VerifyFailures), LagOps: atomic.LoadUint64(&s.stats.LagOps), DIDCount: atomic.LoadUint64(&s.stats.DIDCount), CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq), IngestOpsPerSec: opsPerSec, ThinCacheHits: atomic.LoadUint64(&s.stats.ThinCacheHits), ThinCacheMisses: atomic.LoadUint64(&s.stats.ThinCacheMisses), ThinCacheCount: atomic.LoadUint64(&s.stats.ThinCacheCount), 
ThinEvictions: atomic.LoadUint64(&s.stats.ThinEvictions), } } func countStates(store storage.Store) (uint64, error) { var count uint64 if err := store.ForEachState(func(types.StateV1) error { count++ return nil }); err != nil { return 0, err } return count, nil } type appendResult struct { Ref types.BlockRefV1 Flush *storage.FlushInfo Err error } type appendRequest struct { Operation types.ParsedOperation Reply chan appendResult Flush bool } type blockAppender struct { log *storage.BlockLog queue chan appendRequest once sync.Once closed chan struct{} } func newBlockAppender(log *storage.BlockLog, buffer int) *blockAppender { ba := &blockAppender{ log: log, queue: make(chan appendRequest, buffer), closed: make(chan struct{}), } go ba.run() return ba } func (b *blockAppender) run() { defer close(b.closed) for req := range b.queue { if req.Flush { flush, err := b.log.Flush() req.Reply <- appendResult{Flush: flush, Err: err} continue } ref, flush, err := b.log.Append(req.Operation.Sequence, req.Operation.DID, req.Operation.CID, req.Operation.Prev, req.Operation.CanonicalBytes) req.Reply <- appendResult{Ref: ref, Flush: flush, Err: err} } } func (b *blockAppender) Append(op types.ParsedOperation) (appendResult, error) { reply := make(chan appendResult, 1) b.queue <- appendRequest{Operation: op, Reply: reply} res := <-reply return res, res.Err } func (b *blockAppender) Flush() (*storage.FlushInfo, error) { reply := make(chan appendResult, 1) b.queue <- appendRequest{Flush: true, Reply: reply} res := <-reply return res.Flush, res.Err } func (b *blockAppender) Close() { b.once.Do(func() { close(b.queue) <-b.closed }) }