aboutsummaryrefslogtreecommitdiff
path: root/internal/ingest/service.go
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 09:26:06 -0800
committerFuwn <[email protected]>2026-02-27 09:26:06 -0800
commit8980607ef8e7426b601f942f26ae2cd4c4f3edff (patch)
tree60bdb4bbbe5755223c3387179ee7406432d084ab /internal/ingest/service.go
parentfix: make mirror replay lossless with strict seq accounting and trace (diff)
downloadplutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.tar.xz
plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.zip
feat: add thin mode for on-demand verified PLC resolution
Diffstat (limited to 'internal/ingest/service.go')
-rw-r--r--internal/ingest/service.go264
1 files changed, 264 insertions, 0 deletions
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 1c95068..873204f 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -32,6 +32,10 @@ type Stats struct {
DIDCount uint64 `json:"did_count"`
CheckpointSeq uint64 `json:"checkpoint_sequence"`
IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
+ ThinCacheHits uint64 `json:"thin_cache_hits"`
+ ThinCacheMisses uint64 `json:"thin_cache_misses"`
+ ThinCacheCount uint64 `json:"thin_cache_entries"`
+ ThinEvictions uint64 `json:"thin_cache_evictions"`
}
type Service struct {
@@ -85,6 +89,9 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
if didCount, err := countStates(store); err == nil {
atomic.StoreUint64(&s.stats.DIDCount, didCount)
+ if cfg.Mode == config.ModeThin {
+ atomic.StoreUint64(&s.stats.ThinCacheCount, didCount)
+ }
}
if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
@@ -121,6 +128,12 @@ func (s *Service) Replay(ctx context.Context) error {
}
func (s *Service) Poll(ctx context.Context) error {
+ if s.cfg.Mode == config.ModeThin {
+ <-ctx.Done()
+
+ return ctx.Err()
+ }
+
ticker := time.NewTicker(s.cfg.PollInterval)
defer ticker.Stop()
@@ -143,6 +156,10 @@ func (s *Service) Poll(ctx context.Context) error {
}
func (s *Service) RunOnce(ctx context.Context) (bool, error) {
+ if s.cfg.Mode == config.ModeThin {
+ return false, nil
+ }
+
if s.startupErr != nil {
return false, s.startupErr
}
@@ -956,6 +973,12 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
return err
}
+ if s.cfg.Mode == config.ModeThin {
+ _, _, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
+
+ return err
+ }
+
if s.cfg.Mode != config.ModeMirror {
_, ok, err := s.store.GetState(did)
@@ -1107,11 +1130,227 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
return tip, filtered, nil
}
+func (s *Service) ResolveState(ctx context.Context, did string) (types.StateV1, error) {
+ if err := s.CorruptionError(); err != nil {
+ return types.StateV1{}, err
+ }
+
+ if s.cfg.Mode != config.ModeThin {
+ stateVal, ok, err := s.store.GetState(did)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ if !ok {
+ return types.StateV1{}, ErrDIDNotFound
+ }
+
+ return stateVal, nil
+ }
+
+ return s.resolveThinState(ctx, did)
+}
+
+func (s *Service) resolveThinState(ctx context.Context, did string) (types.StateV1, error) {
+ now := time.Now().UTC()
+ stateVal, stateOK, err := s.store.GetState(did)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ meta, metaOK, err := s.store.GetThinCacheMeta(did)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ if stateOK && metaOK && isThinCacheFresh(meta, now, s.cfg.ThinCacheTTL) {
+ atomic.AddUint64(&s.stats.ThinCacheHits, 1)
+
+ meta.LastAccess = now
+
+ if err := s.store.PutThinCacheMeta(meta); err != nil {
+ return types.StateV1{}, err
+ }
+
+ return stateVal, nil
+ }
+
+ atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
+
+ verifiedState, _, err := s.refreshThinStateFromUpstream(ctx, did, now)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ return verifiedState, nil
+}
+
+func (s *Service) refreshThinStateFromUpstream(ctx context.Context, did string, now time.Time) (types.StateV1, []types.ExportRecord, error) {
+ records, err := s.client.FetchDIDLog(ctx, did)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ stateVal, normalized, err := s.verifyThinChain(ctx, did, records)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ stateVal.UpdatedAt = now
+
+ if err := s.store.PutState(stateVal); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ meta := types.ThinCacheMetaV1{
+ Version: 1,
+ DID: did,
+ LastVerified: now,
+ LastAccess: now,
+ }
+ if err := s.store.PutThinCacheMeta(meta); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ evicted, entries, err := s.enforceThinCacheLimit()
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ if evicted > 0 {
+ atomic.AddUint64(&s.stats.ThinEvictions, evicted)
+ }
+
+ atomic.StoreUint64(&s.stats.ThinCacheCount, entries)
+ atomic.StoreUint64(&s.stats.DIDCount, entries)
+
+ return stateVal, normalized, nil
+}
+
+func (s *Service) verifyThinChain(ctx context.Context, did string, records []types.ExportRecord) (types.StateV1, []types.ExportRecord, error) {
+ if len(records) == 0 {
+ return types.StateV1{}, nil, ErrDIDNotFound
+ }
+
+ verifier := verify.New(config.VerifyFull)
+ var previous *types.StateV1
+ normalized := make([]types.ExportRecord, 0, len(records))
+
+ for i, rec := range records {
+ if err := ctx.Err(); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ rec.Seq = uint64(i + 1)
+
+ if rec.DID == "" {
+ rec.DID = did
+ }
+
+ if rec.DID != did {
+ return types.StateV1{}, nil, fmt.Errorf("unexpected DID in log: got %s want %s", rec.DID, did)
+ }
+
+ op, err := types.ParseOperation(rec)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ if err := verifier.VerifyOperation(op, previous); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ next, err := state.ComputeNextState(op, previous)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ normalized = append(normalized, types.ExportRecord{
+ Seq: rec.Seq,
+ DID: did,
+ CreatedAt: rec.CreatedAt,
+ CID: op.CID,
+ Nullified: rec.Nullified,
+ Operation: op.CanonicalBytes,
+ })
+ previous = &next
+ }
+
+ if previous == nil {
+ return types.StateV1{}, nil, ErrDIDNotFound
+ }
+
+ return *previous, normalized, nil
+}
+
+func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool {
+ if meta.LastVerified.IsZero() {
+ return false
+ }
+
+ return now.Sub(meta.LastVerified) <= ttl
+}
+
+func (s *Service) enforceThinCacheLimit() (uint64, uint64, error) {
+ maxEntries := s.cfg.ThinCacheMaxEntries
+ if maxEntries <= 0 {
+ maxEntries = config.Default().ThinCacheMaxEntries
+ }
+
+ entries, err := s.store.ListThinCacheMeta()
+ if err != nil {
+ return 0, 0, err
+ }
+
+ if len(entries) <= maxEntries {
+ return 0, uint64(len(entries)), nil
+ }
+
+ sort.Slice(entries, func(i, j int) bool {
+ left := entries[i]
+ right := entries[j]
+
+ if !left.LastAccess.Equal(right.LastAccess) {
+ return left.LastAccess.Before(right.LastAccess)
+ }
+
+ if !left.LastVerified.Equal(right.LastVerified) {
+ return left.LastVerified.Before(right.LastVerified)
+ }
+
+ return left.DID < right.DID
+ })
+
+ var evicted uint64
+ for idx := 0; idx < len(entries)-maxEntries; idx++ {
+ did := entries[idx].DID
+
+ if err := s.store.DeleteState(did); err != nil {
+ return evicted, 0, err
+ }
+
+ if err := s.store.DeleteThinCacheMeta(did); err != nil {
+ return evicted, 0, err
+ }
+
+ evicted++
+ }
+
+ return evicted, uint64(len(entries)) - evicted, nil
+}
+
func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
if err := s.CorruptionError(); err != nil {
return nil, err
}
+ if s.cfg.Mode == config.ModeThin {
+ atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
+ _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
+
+ return records, err
+ }
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return nil, ErrHistoryNotStored
}
@@ -1167,6 +1406,21 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types
return types.ExportRecord{}, err
}
+ if s.cfg.Mode == config.ModeThin {
+ atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
+
+ _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
+ if err != nil {
+ return types.ExportRecord{}, err
+ }
+
+ if len(records) == 0 {
+ return types.ExportRecord{}, ErrDIDNotFound
+ }
+
+ return records[len(records)-1], nil
+ }
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return types.ExportRecord{}, ErrHistoryNotStored
}
@@ -1218,6 +1472,12 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin
return nil, err
}
+ if s.cfg.Mode == config.ModeThin {
+ if _, err := s.ResolveState(ctx, did); err != nil {
+ return nil, err
+ }
+ }
+
stateVal, stateOK, err := s.store.GetState(did)
if err != nil {
@@ -1711,6 +1971,10 @@ func (s *Service) Stats() Stats {
DIDCount: atomic.LoadUint64(&s.stats.DIDCount),
CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq),
IngestOpsPerSec: opsPerSec,
+ ThinCacheHits: atomic.LoadUint64(&s.stats.ThinCacheHits),
+ ThinCacheMisses: atomic.LoadUint64(&s.stats.ThinCacheMisses),
+ ThinCacheCount: atomic.LoadUint64(&s.stats.ThinCacheCount),
+ ThinEvictions: atomic.LoadUint64(&s.stats.ThinEvictions),
}
}