package ingest import ( "context" "crypto/ed25519" "crypto/rand" "encoding/base64" "encoding/json" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "sync" "testing" "time" ) func TestThinResolveStateCacheTTL(t *testing.T) { did := "did:plc:thin-a" logRecords := signedDIDLog(t, did, 2) upstream := newThinUpstream(t, map[string][]types.ExportRecord{did: logRecords}) defer upstream.close() store, svc, cleanup := newThinService(t, upstream.url, time.Hour, 10) defer cleanup() if _, err := svc.ResolveState(context.Background(), did); err != nil { t.Fatalf("first resolve: %v", err) } stats := svc.Stats() if stats.ThinCacheMisses != 1 || stats.ThinCacheHits != 0 { t.Fatalf("unexpected stats after first resolve: %+v", stats) } if _, err := svc.ResolveState(context.Background(), did); err != nil { t.Fatalf("second resolve: %v", err) } stats = svc.Stats() if stats.ThinCacheMisses != 1 || stats.ThinCacheHits != 1 { t.Fatalf("unexpected stats after second resolve: %+v", stats) } if got := upstream.hitCount(did); got != 1 { t.Fatalf("upstream calls mismatch after cache hit: got %d want 1", got) } meta, ok, err := store.GetThinCacheMeta(did) if err != nil || !ok { t.Fatalf("load thin meta: ok=%v err=%v", ok, err) } meta.LastVerified = time.Now().UTC().Add(-2 * time.Hour) if err := store.PutThinCacheMeta(meta); err != nil { t.Fatalf("expire meta: %v", err) } if _, err := svc.ResolveState(context.Background(), did); err != nil { t.Fatalf("third resolve: %v", err) } stats = svc.Stats() if stats.ThinCacheMisses != 2 || stats.ThinCacheHits != 1 { t.Fatalf("unexpected stats after ttl refresh: %+v", stats) } if got := upstream.hitCount(did); got != 2 { t.Fatalf("upstream calls mismatch after ttl refresh: got %d want 2", got) } } func TestThinResolveStateEvictsLRU(t *testing.T) { didA := "did:plc:thin-1" didB := "did:plc:thin-2" didC := "did:plc:thin-3" upstream := newThinUpstream(t, map[string][]types.ExportRecord{ didA: signedDIDLog(t, didA, 1), didB: signedDIDLog(t, didB, 1), didC: signedDIDLog(t, didC, 1), }) defer upstream.close() store, svc, cleanup := newThinService(t, upstream.url, 24*time.Hour, 2) defer cleanup() for _, did := range []string{didA, didB} { if _, err := svc.ResolveState(context.Background(), did); err != nil { t.Fatalf("resolve %s: %v", did, err) } } if _, err := svc.ResolveState(context.Background(), didA); err != nil { t.Fatalf("refresh didA access: %v", err) } if _, err := svc.ResolveState(context.Background(), didC); err != nil { t.Fatalf("resolve didC: %v", err) } if _, ok, err := store.GetState(didB); err != nil || ok { t.Fatalf("didB expected eviction: ok=%v err=%v", ok, err) } for _, did := range []string{didA, didC} { if _, ok, err := store.GetState(did); err != nil || !ok { t.Fatalf("did %s expected cached: ok=%v err=%v", did, ok, err) } } stats := svc.Stats() if stats.ThinEvictions != 1 { t.Fatalf("evictions mismatch: got %d want 1", stats.ThinEvictions) } if stats.ThinCacheCount != 2 || stats.DIDCount != 2 { t.Fatalf("cache count mismatch: %+v", stats) } } func TestThinResolveStateRejectsInvalidChain(t *testing.T) { did := "did:plc:thin-invalid" ops := signedDIDLog(t, did, 2) var second map[string]any if err := json.Unmarshal(ops[1].Operation, &second); err != nil { t.Fatalf("decode op: %v", err) } second["prev"] = "sha256:invalid" rawSecond, _ := json.Marshal(second) ops[1].Operation = rawSecond ops[1].CID = "" upstream := newThinUpstream(t, map[string][]types.ExportRecord{did: ops}) defer upstream.close() store, svc, cleanup := newThinService(t, upstream.url, time.Hour, 10) defer cleanup() if _, err := svc.ResolveState(context.Background(), did); err == nil { t.Fatalf("expected invalid chain error") } if _, ok, err := store.GetState(did); err != nil || ok { t.Fatalf("invalid chain should not be cached: ok=%v err=%v", ok, err) } } func newThinService(t *testing.T, source string, ttl time.Duration, maxEntries int) (*storage.PebbleStore, *Service, func()) { t.Helper() tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir: %v", err) } store, err := storage.OpenPebble(dataDir) if err != nil { t.Fatalf("open pebble: %v", err) } if err := store.SetMode(config.ModeThin); err != nil { t.Fatalf("set mode: %v", err) } cfg := config.Default() cfg.Mode = config.ModeThin cfg.DataDir = dataDir cfg.PLCSource = source cfg.ThinCacheTTL = ttl cfg.ThinCacheMaxEntries = maxEntries cfg.VerifyPolicy = config.VerifyFull svc := NewService(cfg, store, NewClient(source), nil, nil) cleanup := func() { svc.Close() _ = store.Close() } return store, svc, cleanup } type thinUpstream struct { t *testing.T url string srv *httptest.Server mu sync.Mutex logs map[string][]types.ExportRecord hits map[string]int } func newThinUpstream(t *testing.T, logs map[string][]types.ExportRecord) *thinUpstream { t.Helper() u := &thinUpstream{ t: t, logs: logs, hits: map[string]int{}, } u.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } path := strings.TrimPrefix(r.URL.Path, "/") if !strings.HasSuffix(path, "/log") { w.WriteHeader(http.StatusNotFound) return } did := strings.TrimSuffix(path, "/log") did = strings.TrimSpace(did) if did == "" { w.WriteHeader(http.StatusNotFound) return } u.mu.Lock() u.hits[did]++ records, ok := u.logs[did] u.mu.Unlock() if !ok { w.WriteHeader(http.StatusNotFound) return } type entry struct { Operation json.RawMessage `json:"operation"` CID string `json:"cid"` CreatedAt string `json:"createdAt"` Nullified bool `json:"nullified"` } out := make([]entry, 0, len(records)) for _, rec := range records { out = append(out, entry{ Operation: rec.Operation, CID: rec.CID, CreatedAt: rec.CreatedAt, Nullified: rec.Nullified, }) } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(out) })) u.url = u.srv.URL return u } func (u *thinUpstream) hitCount(did string) int { u.mu.Lock() defer u.mu.Unlock() return u.hits[did] } func (u *thinUpstream) close() { u.srv.Close() } func signedDIDLog(t *testing.T, did string, length int) []types.ExportRecord { t.Helper() pub, priv, err := ed25519.GenerateKey(rand.Reader) if err != nil { t.Fatalf("generate key: %v", err) } records := make([]types.ExportRecord, 0, length) prev := "" for i := 0; i < length; i++ { unsigned := map[string]any{ "did": did, "didDoc": map[string]any{"id": did, "seq": i + 1}, "publicKey": base64.RawURLEncoding.EncodeToString(pub), } if prev != "" { unsigned["prev"] = prev } payload, _ := json.Marshal(unsigned) canon, _ := types.CanonicalizeJSON(payload) sig := ed25519.Sign(priv, canon) op := map[string]any{} for k, v := range unsigned { op[k] = v } op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon) op["sig"] = base64.RawURLEncoding.EncodeToString(sig) raw, _ := json.Marshal(op) parsed, err := types.ParseOperation(types.ExportRecord{ Seq: uint64(i + 1), DID: did, Operation: raw, }) if err != nil { t.Fatalf("parse operation: %v", err) } prev = parsed.CID records = append(records, types.ExportRecord{ Seq: uint64(i + 1), DID: did, CID: parsed.CID, CreatedAt: time.Now().UTC().Add(time.Duration(i) * time.Second).Format(time.RFC3339), Operation: raw, }) } return records }