diff options
| author | Fuwn <[email protected]> | 2026-02-27 09:26:06 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 09:26:06 -0800 |
| commit | 8980607ef8e7426b601f942f26ae2cd4c4f3edff (patch) | |
| tree | 60bdb4bbbe5755223c3387179ee7406432d084ab /internal/ingest/service.go | |
| parent | fix: make mirror replay lossless with strict seq accounting and trace (diff) | |
| download | plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.tar.xz plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.zip | |
feat: add thin mode for on-demand verified PLC resolution
Diffstat (limited to 'internal/ingest/service.go')
| -rw-r--r-- | internal/ingest/service.go | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go index 1c95068..873204f 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -32,6 +32,10 @@ type Stats struct { DIDCount uint64 `json:"did_count"` CheckpointSeq uint64 `json:"checkpoint_sequence"` IngestOpsPerSec float64 `json:"ingest_ops_per_second"` + ThinCacheHits uint64 `json:"thin_cache_hits"` + ThinCacheMisses uint64 `json:"thin_cache_misses"` + ThinCacheCount uint64 `json:"thin_cache_entries"` + ThinEvictions uint64 `json:"thin_cache_evictions"` } type Service struct { @@ -85,6 +89,9 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog if didCount, err := countStates(store); err == nil { atomic.StoreUint64(&s.stats.DIDCount, didCount) + if cfg.Mode == config.ModeThin { + atomic.StoreUint64(&s.stats.ThinCacheCount, didCount) + } } if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { @@ -121,6 +128,12 @@ func (s *Service) Replay(ctx context.Context) error { } func (s *Service) Poll(ctx context.Context) error { + if s.cfg.Mode == config.ModeThin { + <-ctx.Done() + + return ctx.Err() + } + ticker := time.NewTicker(s.cfg.PollInterval) defer ticker.Stop() @@ -143,6 +156,10 @@ func (s *Service) Poll(ctx context.Context) error { } func (s *Service) RunOnce(ctx context.Context) (bool, error) { + if s.cfg.Mode == config.ModeThin { + return false, nil + } + if s.startupErr != nil { return false, s.startupErr } @@ -956,6 +973,12 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error { return err } + if s.cfg.Mode == config.ModeThin { + _, _, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC()) + + return err + } + if s.cfg.Mode != config.ModeMirror { _, ok, err := s.store.GetState(did) @@ -1107,11 +1130,227 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen return tip, filtered, nil } +func (s *Service) ResolveState(ctx context.Context, did string) (types.StateV1, error) { + if err := s.CorruptionError(); err != nil { + return types.StateV1{}, err + } + + if s.cfg.Mode != config.ModeThin { + stateVal, ok, err := s.store.GetState(did) + if err != nil { + return types.StateV1{}, err + } + + if !ok { + return types.StateV1{}, ErrDIDNotFound + } + + return stateVal, nil + } + + return s.resolveThinState(ctx, did) +} + +func (s *Service) resolveThinState(ctx context.Context, did string) (types.StateV1, error) { + now := time.Now().UTC() + stateVal, stateOK, err := s.store.GetState(did) + if err != nil { + return types.StateV1{}, err + } + + meta, metaOK, err := s.store.GetThinCacheMeta(did) + if err != nil { + return types.StateV1{}, err + } + + if stateOK && metaOK && isThinCacheFresh(meta, now, s.cfg.ThinCacheTTL) { + atomic.AddUint64(&s.stats.ThinCacheHits, 1) + + meta.LastAccess = now + + if err := s.store.PutThinCacheMeta(meta); err != nil { + return types.StateV1{}, err + } + + return stateVal, nil + } + + atomic.AddUint64(&s.stats.ThinCacheMisses, 1) + + verifiedState, _, err := s.refreshThinStateFromUpstream(ctx, did, now) + if err != nil { + return types.StateV1{}, err + } + + return verifiedState, nil +} + +func (s *Service) refreshThinStateFromUpstream(ctx context.Context, did string, now time.Time) (types.StateV1, []types.ExportRecord, error) { + records, err := s.client.FetchDIDLog(ctx, did) + if err != nil { + return types.StateV1{}, nil, err + } + + stateVal, normalized, err := s.verifyThinChain(ctx, did, records) + if err != nil { + return types.StateV1{}, nil, err + } + + stateVal.UpdatedAt = now + + if err := s.store.PutState(stateVal); err != nil { + return types.StateV1{}, nil, err + } + + meta := types.ThinCacheMetaV1{ + Version: 1, + DID: did, + LastVerified: now, + LastAccess: now, + } + if err := s.store.PutThinCacheMeta(meta); err != nil { + return types.StateV1{}, nil, err + } + + evicted, entries, err := s.enforceThinCacheLimit() + if err != nil { + return types.StateV1{}, nil, err + } + + if evicted > 0 { + atomic.AddUint64(&s.stats.ThinEvictions, evicted) + } + + atomic.StoreUint64(&s.stats.ThinCacheCount, entries) + atomic.StoreUint64(&s.stats.DIDCount, entries) + + return stateVal, normalized, nil +} + +func (s *Service) verifyThinChain(ctx context.Context, did string, records []types.ExportRecord) (types.StateV1, []types.ExportRecord, error) { + if len(records) == 0 { + return types.StateV1{}, nil, ErrDIDNotFound + } + + verifier := verify.New(config.VerifyFull) + var previous *types.StateV1 + normalized := make([]types.ExportRecord, 0, len(records)) + + for i, rec := range records { + if err := ctx.Err(); err != nil { + return types.StateV1{}, nil, err + } + + rec.Seq = uint64(i + 1) + + if rec.DID == "" { + rec.DID = did + } + + if rec.DID != did { + return types.StateV1{}, nil, fmt.Errorf("unexpected DID in log: got %s want %s", rec.DID, did) + } + + op, err := types.ParseOperation(rec) + if err != nil { + return types.StateV1{}, nil, err + } + + if err := verifier.VerifyOperation(op, previous); err != nil { + return types.StateV1{}, nil, err + } + + next, err := state.ComputeNextState(op, previous) + if err != nil { + return types.StateV1{}, nil, err + } + + normalized = append(normalized, types.ExportRecord{ + Seq: rec.Seq, + DID: did, + CreatedAt: rec.CreatedAt, + CID: op.CID, + Nullified: rec.Nullified, + Operation: op.CanonicalBytes, + }) + previous = &next + } + + if previous == nil { + return types.StateV1{}, nil, ErrDIDNotFound + } + + return *previous, normalized, nil +} + +func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool { + if meta.LastVerified.IsZero() { + return false + } + + return now.Sub(meta.LastVerified) <= ttl +} + +func (s *Service) enforceThinCacheLimit() (uint64, uint64, error) { + maxEntries := s.cfg.ThinCacheMaxEntries + if maxEntries <= 0 { + maxEntries = config.Default().ThinCacheMaxEntries + } + + entries, err := s.store.ListThinCacheMeta() + if err != nil { + return 0, 0, err + } + + if len(entries) <= maxEntries { + return 0, uint64(len(entries)), nil + } + + sort.Slice(entries, func(i, j int) bool { + left := entries[i] + right := entries[j] + + if !left.LastAccess.Equal(right.LastAccess) { + return left.LastAccess.Before(right.LastAccess) + } + + if !left.LastVerified.Equal(right.LastVerified) { + return left.LastVerified.Before(right.LastVerified) + } + + return left.DID < right.DID + }) + + var evicted uint64 + for idx := 0; idx < len(entries)-maxEntries; idx++ { + did := entries[idx].DID + + if err := s.store.DeleteState(did); err != nil { + return evicted, 0, err + } + + if err := s.store.DeleteThinCacheMeta(did); err != nil { + return evicted, 0, err + } + + evicted++ + } + + return evicted, uint64(len(entries)) - evicted, nil +} + func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) { if err := s.CorruptionError(); err != nil { return nil, err } + if s.cfg.Mode == config.ModeThin { + atomic.AddUint64(&s.stats.ThinCacheMisses, 1) + _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC()) + + return records, err + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return nil, ErrHistoryNotStored } @@ -1167,6 +1406,21 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types return types.ExportRecord{}, err } + if s.cfg.Mode == config.ModeThin { + atomic.AddUint64(&s.stats.ThinCacheMisses, 1) + + _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC()) + if err != nil { + return types.ExportRecord{}, err + } + + if len(records) == 0 { + return types.ExportRecord{}, ErrDIDNotFound + } + + return records[len(records)-1], nil + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return types.ExportRecord{}, ErrHistoryNotStored } @@ -1218,6 +1472,12 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin return nil, err } + if s.cfg.Mode == config.ModeThin { + if _, err := s.ResolveState(ctx, did); err != nil { + return nil, err + } + } + stateVal, stateOK, err := s.store.GetState(did) if err != nil { @@ -1711,6 +1971,10 @@ func (s *Service) Stats() Stats { DIDCount: atomic.LoadUint64(&s.stats.DIDCount), CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq), IngestOpsPerSec: opsPerSec, + ThinCacheHits: atomic.LoadUint64(&s.stats.ThinCacheHits), + ThinCacheMisses: atomic.LoadUint64(&s.stats.ThinCacheMisses), + ThinCacheCount: atomic.LoadUint64(&s.stats.ThinCacheCount), + ThinEvictions: atomic.LoadUint64(&s.stats.ThinEvictions), } } |