aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-27 09:26:06 -0800
committerFuwn <[email protected]>2026-02-27 09:26:06 -0800
commit8980607ef8e7426b601f942f26ae2cd4c4f3edff (patch)
tree60bdb4bbbe5755223c3387179ee7406432d084ab /internal
parentfix: make mirror replay lossless with strict seq accounting and trace (diff)
downloadplutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.tar.xz
plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.zip
feat: add thin mode for on-demand verified PLC resolution
Diffstat (limited to 'internal')
-rw-r--r--internal/api/observability.go52
-rw-r--r--internal/api/server.go90
-rw-r--r--internal/api/server_hardening_test.go4
-rw-r--r--internal/api/server_thin_test.go172
-rw-r--r--internal/config/config.go17
-rw-r--r--internal/config/config_thin_test.go35
-rw-r--r--internal/ingest/client.go132
-rw-r--r--internal/ingest/client_test.go63
-rw-r--r--internal/ingest/service.go264
-rw-r--r--internal/ingest/service_thin_test.go323
-rw-r--r--internal/ingest/thin_disk_test.go181
-rw-r--r--internal/storage/pebble_store.go74
-rw-r--r--internal/storage/store.go5
-rw-r--r--internal/types/state.go8
14 files changed, 1399 insertions, 21 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go
index 008f5f2..ad67bc9 100644
--- a/internal/api/observability.go
+++ b/internal/api/observability.go
@@ -146,6 +146,58 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S
return float64(count)
},
))
+ reg.MustRegister(prometheus.NewCounterFunc(
+ prometheus.CounterOpts{
+ Name: "thin_cache_hits_total",
+ Help: "Total thin-mode cache hits.",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+
+ return float64(ingestor.Stats().ThinCacheHits)
+ },
+ ))
+ reg.MustRegister(prometheus.NewCounterFunc(
+ prometheus.CounterOpts{
+ Name: "thin_cache_misses_total",
+ Help: "Total thin-mode cache misses.",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+
+ return float64(ingestor.Stats().ThinCacheMisses)
+ },
+ ))
+ reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Name: "thin_cache_entries",
+ Help: "Current number of entries in the thin-mode DID cache.",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+
+ return float64(ingestor.Stats().ThinCacheCount)
+ },
+ ))
+ reg.MustRegister(prometheus.NewCounterFunc(
+ prometheus.CounterOpts{
+ Name: "thin_cache_evictions_total",
+ Help: "Total entries evicted from thin-mode DID cache.",
+ },
+ func() float64 {
+ if ingestor == nil {
+ return 0
+ }
+
+ return float64(ingestor.Stats().ThinEvictions)
+ },
+ ))
return m
}
diff --git a/internal/api/server.go b/internal/api/server.go
index c66e5af..9895a65 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -212,18 +212,43 @@ func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) {
}
func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) {
- state, ok, err := s.store.GetState(did)
+ var state types.StateV1
+ if s.ingestor != nil {
+ resolvedState, err := s.ingestor.ResolveState(r.Context(), did)
+ if err != nil {
+ if errors.Is(err, ingest.ErrDIDNotFound) {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("DID not found"))
- if err != nil {
- writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
- return
- }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
- if !ok {
- writeErr(w, http.StatusNotFound, fmt.Errorf("DID not found"))
+ return
+ }
- return
+ writeErr(w, http.StatusInternalServerError, err)
+
+ return
+ }
+
+ state = resolvedState
+ } else {
+ stateVal, ok, err := s.store.GetState(did)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+
+ return
+ }
+
+ if !ok {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("DID not found"))
+
+ return
+ }
+
+ state = stateVal
}
cp, cpOK, err := s.store.GetLatestCheckpoint()
@@ -251,6 +276,12 @@ func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did st
}
func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) {
+ if s.cfg.Mode == config.ModeThin {
+ writeErr(w, http.StatusNotImplemented, fmt.Errorf("thin mode does not support checkpoint proofs"))
+
+ return
+ }
+
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("service unavailable"))
@@ -448,24 +479,43 @@ func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request)
}
func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, r *http.Request, did string) {
- state, ok, err := s.store.GetState(did)
+ var state types.StateV1
+ if s.ingestor != nil {
+ resolvedState, err := s.ingestor.ResolveState(r.Context(), did)
+ if err != nil {
+ if errors.Is(err, ingest.ErrDIDNotFound) {
+ writeCompatibilityErr(w, http.StatusNotFound, "DID not registered: "+did)
- if err != nil {
- writeCompatibilityErr(w, http.StatusInternalServerError, err.Error())
+ return
+ }
- return
- }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeCompatibilityErr(w, http.StatusGatewayTimeout, err.Error())
- if !ok {
- writeCompatibilityErr(w, http.StatusNotFound, "DID not registered: "+did)
+ return
+ }
- return
- }
+ writeCompatibilityErr(w, http.StatusInternalServerError, err.Error())
- if s.ingestor == nil {
- writeCompatibilityErr(w, http.StatusServiceUnavailable, "service unavailable")
+ return
+ }
- return
+ state = resolvedState
+ } else {
+ stateVal, ok, err := s.store.GetState(did)
+ if err != nil {
+ writeCompatibilityErr(w, http.StatusInternalServerError, err.Error())
+
+ return
+ }
+
+ if !ok {
+ writeCompatibilityErr(w, http.StatusNotFound, "DID not registered: "+did)
+
+ return
+ }
+
+ state = stateVal
}
data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did)
diff --git a/internal/api/server_hardening_test.go b/internal/api/server_hardening_test.go
index 065e8a4..97eae0e 100644
--- a/internal/api/server_hardening_test.go
+++ b/internal/api/server_hardening_test.go
@@ -130,6 +130,10 @@ func TestMetricsExposeRequiredSeries(t *testing.T) {
"checkpoint_sequence",
"disk_bytes_total",
"did_count",
+ "thin_cache_hits_total",
+ "thin_cache_misses_total",
+ "thin_cache_entries",
+ "thin_cache_evictions_total",
} {
if !strings.Contains(text, metric) {
t.Fatalf("metrics output missing %q", metric)
diff --git a/internal/api/server_thin_test.go b/internal/api/server_thin_test.go
new file mode 100644
index 0000000..8b9dfac
--- /dev/null
+++ b/internal/api/server_thin_test.go
@@ -0,0 +1,172 @@
+package api
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+)
+
+func TestThinModeResolveCompatibilityAndProofNotImplemented(t *testing.T) {
+ did := "did:plc:thin-http"
+ upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/"+did+"/log" {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(thinHTTPOperationLog(t, did))
+ }))
+ defer upstream.Close()
+
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir: %v", err)
+ }
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ defer store.Close()
+
+ if err := store.SetMode(config.ModeThin); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ cfg := config.Default()
+ cfg.Mode = config.ModeThin
+ cfg.DataDir = dataDir
+ cfg.PLCSource = upstream.URL
+ cfg.ThinCacheTTL = 24 * time.Hour
+ cfg.ThinCacheMaxEntries = 1000
+
+ svc := ingest.NewService(cfg, store, ingest.NewClient(upstream.URL), nil, nil)
+ defer svc.Close()
+
+ ts := httptest.NewServer(NewServer(cfg, store, svc, nil).Handler())
+ defer ts.Close()
+
+ resp, err := http.Get(ts.URL + "/" + did)
+ if err != nil {
+ t.Fatalf("compat resolve: %v", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("compat resolve status: got %d want 200", resp.StatusCode)
+ }
+
+ if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/did+ld+json") {
+ t.Fatalf("compat resolve content-type mismatch: %s", got)
+ }
+
+ var doc map[string]any
+ if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
+ t.Fatalf("decode did doc: %v", err)
+ }
+
+ if got, _ := doc["id"].(string); got != did {
+ t.Fatalf("did doc id mismatch: got %q want %q", got, did)
+ }
+
+ proofResp, err := http.Get(ts.URL + "/did/" + did + "/proof")
+ if err != nil {
+ t.Fatalf("thin proof request: %v", err)
+ }
+ defer proofResp.Body.Close()
+
+ if proofResp.StatusCode != http.StatusNotImplemented {
+ t.Fatalf("thin proof status: got %d want 501", proofResp.StatusCode)
+ }
+
+ var statusBody map[string]any
+ statusResp, err := http.Get(ts.URL + "/status")
+ if err != nil {
+ t.Fatalf("status request: %v", err)
+ }
+ defer statusResp.Body.Close()
+
+ statusBytes, err := io.ReadAll(statusResp.Body)
+ if err != nil {
+ t.Fatalf("read status body: %v", err)
+ }
+
+ t.Logf("thin_status=%s", strings.TrimSpace(string(statusBytes)))
+
+ if err := json.Unmarshal(statusBytes, &statusBody); err != nil {
+ t.Fatalf("decode status: %v", err)
+ }
+
+ if mode, _ := statusBody["mode"].(string); mode != config.ModeThin {
+ t.Fatalf("status mode mismatch: got %q want %q", mode, config.ModeThin)
+ }
+
+ if err := svc.VerifyDID(context.Background(), did); err != nil {
+ t.Fatalf("verify thin did: %v", err)
+ }
+}
+
+func thinHTTPOperationLog(t *testing.T, did string) []map[string]any {
+ t.Helper()
+
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key: %v", err)
+ }
+
+ ops := make([]map[string]any, 0, 2)
+ prev := ""
+ for i := 0; i < 2; i++ {
+ unsigned := map[string]any{
+ "did": did,
+ "didDoc": map[string]any{"id": did, "seq": i + 1},
+ "publicKey": base64.RawURLEncoding.EncodeToString(pub),
+ }
+ if prev != "" {
+ unsigned["prev"] = prev
+ }
+
+ payload, _ := json.Marshal(unsigned)
+ canon, _ := types.CanonicalizeJSON(payload)
+ sig := ed25519.Sign(priv, canon)
+
+ op := map[string]any{}
+ for k, v := range unsigned {
+ op[k] = v
+ }
+ op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon)
+ op["sig"] = base64.RawURLEncoding.EncodeToString(sig)
+
+ raw, _ := json.Marshal(op)
+ parsed, err := types.ParseOperation(types.ExportRecord{Seq: uint64(i + 1), DID: did, Operation: raw})
+ if err != nil {
+ t.Fatalf("parse operation: %v", err)
+ }
+
+ prev = parsed.CID
+ ops = append(ops, map[string]any{
+ "operation": op,
+ "cid": parsed.CID,
+ "createdAt": time.Now().UTC().Add(time.Duration(i) * time.Second).Format(time.RFC3339),
+ "nullified": false,
+ })
+ }
+
+ return ops
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index f777a21..e94b5ea 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -14,6 +14,7 @@ import (
const (
ModeResolver = "resolver"
ModeMirror = "mirror"
+ ModeThin = "thin"
VerifyFull = "full"
VerifyLazy = "lazy"
VerifyStateOnly = "state-only"
@@ -31,6 +32,8 @@ type Config struct {
VerifyWorkers int `yaml:"verify_workers"`
ExportPageSize int `yaml:"export_page_size"`
ReplayTrace bool `yaml:"replay_trace"`
+ ThinCacheTTL time.Duration `yaml:"thin_cache_ttl"`
+ ThinCacheMaxEntries int `yaml:"thin_cache_max_entries"`
ListenAddr string `yaml:"listen_addr"`
MirrorPrivateKeyPath string `yaml:"mirror_private_key_path"`
PollInterval time.Duration `yaml:"poll_interval"`
@@ -61,6 +64,8 @@ func Default() Config {
VerifyWorkers: runtime.NumCPU(),
ExportPageSize: 1000,
ReplayTrace: false,
+ ThinCacheTTL: 24 * time.Hour,
+ ThinCacheMaxEntries: 100000,
ListenAddr: ":8080",
MirrorPrivateKeyPath: "./mirror.key",
PollInterval: 5 * time.Second,
@@ -153,6 +158,8 @@ func applyEnv(cfg *Config) {
setInt("PLUTIA_VERIFY_WORKERS", &cfg.VerifyWorkers)
setInt("PLUTIA_EXPORT_PAGE_SIZE", &cfg.ExportPageSize)
setBool("PLUTIA_REPLAY_TRACE", &cfg.ReplayTrace)
+ setDuration("PLUTIA_THIN_CACHE_TTL", &cfg.ThinCacheTTL)
+ setInt("PLUTIA_THIN_CACHE_MAX_ENTRIES", &cfg.ThinCacheMaxEntries)
setUint64("PLUTIA_CHECKPOINT_INTERVAL", &cfg.CheckpointInterval)
setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr)
setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath)
@@ -168,7 +175,7 @@ func applyEnv(cfg *Config) {
}
func (c Config) Validate() error {
- if c.Mode != ModeResolver && c.Mode != ModeMirror {
+ if c.Mode != ModeResolver && c.Mode != ModeMirror && c.Mode != ModeThin {
return fmt.Errorf("invalid mode %q", c.Mode)
}
@@ -210,6 +217,14 @@ func (c Config) Validate() error {
return fmt.Errorf("export_page_size must be between 1 and 1000, got %d", c.ExportPageSize)
}
+ if c.ThinCacheTTL <= 0 {
+ return errors.New("thin_cache_ttl must be > 0")
+ }
+
+ if c.ThinCacheMaxEntries <= 0 {
+ return errors.New("thin_cache_max_entries must be > 0")
+ }
+
if c.ListenAddr == "" {
return errors.New("listen_addr is required")
}
diff --git a/internal/config/config_thin_test.go b/internal/config/config_thin_test.go
new file mode 100644
index 0000000..ff549e1
--- /dev/null
+++ b/internal/config/config_thin_test.go
@@ -0,0 +1,35 @@
+package config
+
+import (
+ "testing"
+ "time"
+)
+
+func TestValidateAcceptsThinMode(t *testing.T) {
+ cfg := Default()
+ cfg.Mode = ModeThin
+ cfg.ThinCacheTTL = 24 * time.Hour
+ cfg.ThinCacheMaxEntries = 10
+
+ if err := cfg.Validate(); err != nil {
+ t.Fatalf("validate thin mode: %v", err)
+ }
+}
+
+func TestValidateRejectsInvalidThinCacheConfig(t *testing.T) {
+ cfg := Default()
+ cfg.Mode = ModeThin
+ cfg.ThinCacheTTL = 0
+
+ if err := cfg.Validate(); err == nil {
+ t.Fatalf("expected thin_cache_ttl validation error")
+ }
+
+ cfg = Default()
+ cfg.Mode = ModeThin
+ cfg.ThinCacheMaxEntries = 0
+
+ if err := cfg.Validate(); err == nil {
+ t.Fatalf("expected thin_cache_max_entries validation error")
+ }
+}
diff --git a/internal/ingest/client.go b/internal/ingest/client.go
index eba6fd0..bdef942 100644
--- a/internal/ingest/client.go
+++ b/internal/ingest/client.go
@@ -26,6 +26,13 @@ type Client struct {
opts ClientOptions
}
+type didLogEntry struct {
+ Operation json.RawMessage `json:"operation"`
+ CID string `json:"cid,omitempty"`
+ CreatedAt string `json:"createdAt,omitempty"`
+ Nullified bool `json:"nullified,omitempty"`
+}
+
type ClientOptions struct {
MaxAttempts int
BaseDelay time.Duration
@@ -116,6 +123,131 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR
return c.FetchExportLimited(ctx, after, 0)
}
+func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
+ if strings.TrimSpace(did) == "" {
+ return nil, fmt.Errorf("did is required")
+ }
+
+ u, err := url.Parse(c.source)
+ if err != nil {
+ return nil, fmt.Errorf("parse plc source: %w", err)
+ }
+
+ u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log"
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+ if err != nil {
+ return nil, fmt.Errorf("new request: %w", err)
+ }
+
+ req.Header.Set("Accept", "application/json")
+ resp, err := c.http.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("fetch did log: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
+ return nil, ErrDIDNotFound
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
+ return nil, fmt.Errorf("did log response %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
+ }
+
+ rawBody, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, fmt.Errorf("read did log body: %w", err)
+ }
+
+ var wrapped []didLogEntry
+ if err := json.Unmarshal(rawBody, &wrapped); err == nil && len(wrapped) > 0 && len(bytes.TrimSpace(wrapped[0].Operation)) > 0 {
+ records := make([]types.ExportRecord, 0, len(wrapped))
+ for i, entry := range wrapped {
+ records = append(records, types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ CID: entry.CID,
+ CreatedAt: entry.CreatedAt,
+ Nullified: entry.Nullified,
+ Operation: entry.Operation,
+ })
+ }
+
+ return records, nil
+ }
+
+ var ops []json.RawMessage
+ if err := json.Unmarshal(rawBody, &ops); err != nil {
+ return nil, fmt.Errorf("decode did log: %w", err)
+ }
+
+ records := make([]types.ExportRecord, 0, len(ops))
+ for i, op := range ops {
+ records = append(records, types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ Operation: op,
+ })
+ }
+
+ auditRecords, err := c.fetchDIDAudit(ctx, did)
+ if err != nil {
+ return nil, fmt.Errorf("did log missing CIDs and audit fallback failed: %w", err)
+ }
+
+ if len(auditRecords) != len(records) {
+ return nil, fmt.Errorf("did log/audit length mismatch: log=%d audit=%d", len(records), len(auditRecords))
+ }
+
+ for i := range records {
+ records[i].CID = strings.TrimSpace(auditRecords[i].CID)
+ if records[i].CID == "" {
+ return nil, fmt.Errorf("missing cid in did audit entry index %d", i)
+ }
+
+ records[i].CreatedAt = auditRecords[i].CreatedAt
+ records[i].Nullified = auditRecords[i].Nullified
+ }
+
+ return records, nil
+}
+
+func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, error) {
+ u, err := url.Parse(c.source)
+ if err != nil {
+ return nil, fmt.Errorf("parse plc source: %w", err)
+ }
+
+ u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log/audit"
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
+ if err != nil {
+ return nil, fmt.Errorf("new request: %w", err)
+ }
+
+ resp, err := c.http.Do(req)
+ if err != nil {
+ return nil, fmt.Errorf("fetch did audit log: %w", err)
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
+ return nil, ErrDIDNotFound
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024))
+ return nil, fmt.Errorf("did audit response %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
+ }
+
+ var entries []didLogEntry
+ if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil {
+ return nil, fmt.Errorf("decode did audit log: %w", err)
+ }
+
+ return entries, nil
+}
+
func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) {
pageSize := uint64(1000)
batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil)
diff --git a/internal/ingest/client_test.go b/internal/ingest/client_test.go
index 50595ec..2552855 100644
--- a/internal/ingest/client_test.go
+++ b/internal/ingest/client_test.go
@@ -1,6 +1,10 @@
package ingest
import (
+ "context"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
"strings"
"testing"
)
@@ -38,3 +42,62 @@ func TestDecodeExportBody_FailsOnMalformedNonTrailingNDJSONLine(t *testing.T) {
t.Fatalf("expected malformed middle line to fail")
}
}
+
+func TestFetchDIDLog_UsesAuditFallbackForCIDs(t *testing.T) {
+ const did = "did:plc:alice"
+ op1 := json.RawMessage(`{"did":"did:plc:alice","sig":"x","sigPayload":"e30","publicKey":"a"}`)
+ op2 := json.RawMessage(`{"did":"did:plc:alice","prev":"bafy-one","sig":"x","sigPayload":"e30","publicKey":"a"}`)
+
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/" + did + "/log":
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode([]json.RawMessage{op1, op2})
+ case "/" + did + "/log/audit":
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode([]map[string]any{
+ {"did": did, "operation": op1, "cid": "bafy-one", "createdAt": "2026-02-27T00:00:00Z"},
+ {"did": did, "operation": op2, "cid": "bafy-two", "createdAt": "2026-02-27T00:00:01Z"},
+ })
+ default:
+ http.NotFound(w, r)
+ }
+ }))
+ defer srv.Close()
+
+ client := NewClient(srv.URL)
+ records, err := client.FetchDIDLog(context.Background(), did)
+ if err != nil {
+ t.Fatalf("fetch did log: %v", err)
+ }
+
+ if len(records) != 2 {
+ t.Fatalf("record count mismatch: got %d want 2", len(records))
+ }
+
+ if records[0].CID != "bafy-one" || records[1].CID != "bafy-two" {
+ t.Fatalf("cid fallback mismatch: got [%s %s]", records[0].CID, records[1].CID)
+ }
+}
+
+func TestFetchDIDLog_FailsWhenCIDsUnavailable(t *testing.T) {
+ const did = "did:plc:bob"
+
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/" + did + "/log":
+ _ = json.NewEncoder(w).Encode([]json.RawMessage{json.RawMessage(`{"did":"did:plc:bob"}`)})
+ case "/" + did + "/log/audit":
+ http.NotFound(w, r)
+ default:
+ http.NotFound(w, r)
+ }
+ }))
+ defer srv.Close()
+
+ client := NewClient(srv.URL)
+ _, err := client.FetchDIDLog(context.Background(), did)
+ if err == nil {
+ t.Fatalf("expected cid fallback error")
+ }
+}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 1c95068..873204f 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -32,6 +32,10 @@ type Stats struct {
DIDCount uint64 `json:"did_count"`
CheckpointSeq uint64 `json:"checkpoint_sequence"`
IngestOpsPerSec float64 `json:"ingest_ops_per_second"`
+ ThinCacheHits uint64 `json:"thin_cache_hits"`
+ ThinCacheMisses uint64 `json:"thin_cache_misses"`
+ ThinCacheCount uint64 `json:"thin_cache_entries"`
+ ThinEvictions uint64 `json:"thin_cache_evictions"`
}
type Service struct {
@@ -85,6 +89,9 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog
if didCount, err := countStates(store); err == nil {
atomic.StoreUint64(&s.stats.DIDCount, didCount)
+ if cfg.Mode == config.ModeThin {
+ atomic.StoreUint64(&s.stats.ThinCacheCount, didCount)
+ }
}
if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
@@ -121,6 +128,12 @@ func (s *Service) Replay(ctx context.Context) error {
}
func (s *Service) Poll(ctx context.Context) error {
+ if s.cfg.Mode == config.ModeThin {
+ <-ctx.Done()
+
+ return ctx.Err()
+ }
+
ticker := time.NewTicker(s.cfg.PollInterval)
defer ticker.Stop()
@@ -143,6 +156,10 @@ func (s *Service) Poll(ctx context.Context) error {
}
func (s *Service) RunOnce(ctx context.Context) (bool, error) {
+ if s.cfg.Mode == config.ModeThin {
+ return false, nil
+ }
+
if s.startupErr != nil {
return false, s.startupErr
}
@@ -956,6 +973,12 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error {
return err
}
+ if s.cfg.Mode == config.ModeThin {
+ _, _, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
+
+ return err
+ }
+
if s.cfg.Mode != config.ModeMirror {
_, ok, err := s.store.GetState(did)
@@ -1107,11 +1130,227 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
return tip, filtered, nil
}
+func (s *Service) ResolveState(ctx context.Context, did string) (types.StateV1, error) {
+ if err := s.CorruptionError(); err != nil {
+ return types.StateV1{}, err
+ }
+
+ if s.cfg.Mode != config.ModeThin {
+ stateVal, ok, err := s.store.GetState(did)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ if !ok {
+ return types.StateV1{}, ErrDIDNotFound
+ }
+
+ return stateVal, nil
+ }
+
+ return s.resolveThinState(ctx, did)
+}
+
+func (s *Service) resolveThinState(ctx context.Context, did string) (types.StateV1, error) {
+ now := time.Now().UTC()
+ stateVal, stateOK, err := s.store.GetState(did)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ meta, metaOK, err := s.store.GetThinCacheMeta(did)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ if stateOK && metaOK && isThinCacheFresh(meta, now, s.cfg.ThinCacheTTL) {
+ atomic.AddUint64(&s.stats.ThinCacheHits, 1)
+
+ meta.LastAccess = now
+
+ if err := s.store.PutThinCacheMeta(meta); err != nil {
+ return types.StateV1{}, err
+ }
+
+ return stateVal, nil
+ }
+
+ atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
+
+ verifiedState, _, err := s.refreshThinStateFromUpstream(ctx, did, now)
+ if err != nil {
+ return types.StateV1{}, err
+ }
+
+ return verifiedState, nil
+}
+
+func (s *Service) refreshThinStateFromUpstream(ctx context.Context, did string, now time.Time) (types.StateV1, []types.ExportRecord, error) {
+ records, err := s.client.FetchDIDLog(ctx, did)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ stateVal, normalized, err := s.verifyThinChain(ctx, did, records)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ stateVal.UpdatedAt = now
+
+ if err := s.store.PutState(stateVal); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ meta := types.ThinCacheMetaV1{
+ Version: 1,
+ DID: did,
+ LastVerified: now,
+ LastAccess: now,
+ }
+ if err := s.store.PutThinCacheMeta(meta); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ evicted, entries, err := s.enforceThinCacheLimit()
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ if evicted > 0 {
+ atomic.AddUint64(&s.stats.ThinEvictions, evicted)
+ }
+
+ atomic.StoreUint64(&s.stats.ThinCacheCount, entries)
+ atomic.StoreUint64(&s.stats.DIDCount, entries)
+
+ return stateVal, normalized, nil
+}
+
+func (s *Service) verifyThinChain(ctx context.Context, did string, records []types.ExportRecord) (types.StateV1, []types.ExportRecord, error) {
+ if len(records) == 0 {
+ return types.StateV1{}, nil, ErrDIDNotFound
+ }
+
+ verifier := verify.New(config.VerifyFull)
+ var previous *types.StateV1
+ normalized := make([]types.ExportRecord, 0, len(records))
+
+ for i, rec := range records {
+ if err := ctx.Err(); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ rec.Seq = uint64(i + 1)
+
+ if rec.DID == "" {
+ rec.DID = did
+ }
+
+ if rec.DID != did {
+ return types.StateV1{}, nil, fmt.Errorf("unexpected DID in log: got %s want %s", rec.DID, did)
+ }
+
+ op, err := types.ParseOperation(rec)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ if err := verifier.VerifyOperation(op, previous); err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ next, err := state.ComputeNextState(op, previous)
+ if err != nil {
+ return types.StateV1{}, nil, err
+ }
+
+ normalized = append(normalized, types.ExportRecord{
+ Seq: rec.Seq,
+ DID: did,
+ CreatedAt: rec.CreatedAt,
+ CID: op.CID,
+ Nullified: rec.Nullified,
+ Operation: op.CanonicalBytes,
+ })
+ previous = &next
+ }
+
+ if previous == nil {
+ return types.StateV1{}, nil, ErrDIDNotFound
+ }
+
+ return *previous, normalized, nil
+}
+
+func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool {
+ if meta.LastVerified.IsZero() {
+ return false
+ }
+
+ return now.Sub(meta.LastVerified) <= ttl
+}
+
+func (s *Service) enforceThinCacheLimit() (uint64, uint64, error) {
+ maxEntries := s.cfg.ThinCacheMaxEntries
+ if maxEntries <= 0 {
+ maxEntries = config.Default().ThinCacheMaxEntries
+ }
+
+ entries, err := s.store.ListThinCacheMeta()
+ if err != nil {
+ return 0, 0, err
+ }
+
+ if len(entries) <= maxEntries {
+ return 0, uint64(len(entries)), nil
+ }
+
+ sort.Slice(entries, func(i, j int) bool {
+ left := entries[i]
+ right := entries[j]
+
+ if !left.LastAccess.Equal(right.LastAccess) {
+ return left.LastAccess.Before(right.LastAccess)
+ }
+
+ if !left.LastVerified.Equal(right.LastVerified) {
+ return left.LastVerified.Before(right.LastVerified)
+ }
+
+ return left.DID < right.DID
+ })
+
+ var evicted uint64
+ for idx := 0; idx < len(entries)-maxEntries; idx++ {
+ did := entries[idx].DID
+
+ if err := s.store.DeleteState(did); err != nil {
+ return evicted, 0, err
+ }
+
+ if err := s.store.DeleteThinCacheMeta(did); err != nil {
+ return evicted, 0, err
+ }
+
+ evicted++
+ }
+
+ return evicted, uint64(len(entries)) - evicted, nil
+}
+
func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
if err := s.CorruptionError(); err != nil {
return nil, err
}
+ if s.cfg.Mode == config.ModeThin {
+ atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
+ _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
+
+ return records, err
+ }
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return nil, ErrHistoryNotStored
}
@@ -1167,6 +1406,21 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types
return types.ExportRecord{}, err
}
+ if s.cfg.Mode == config.ModeThin {
+ atomic.AddUint64(&s.stats.ThinCacheMisses, 1)
+
+ _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC())
+ if err != nil {
+ return types.ExportRecord{}, err
+ }
+
+ if len(records) == 0 {
+ return types.ExportRecord{}, ErrDIDNotFound
+ }
+
+ return records[len(records)-1], nil
+ }
+
if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
return types.ExportRecord{}, ErrHistoryNotStored
}
@@ -1218,6 +1472,12 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin
return nil, err
}
+ if s.cfg.Mode == config.ModeThin {
+ if _, err := s.ResolveState(ctx, did); err != nil {
+ return nil, err
+ }
+ }
+
stateVal, stateOK, err := s.store.GetState(did)
if err != nil {
@@ -1711,6 +1971,10 @@ func (s *Service) Stats() Stats {
DIDCount: atomic.LoadUint64(&s.stats.DIDCount),
CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq),
IngestOpsPerSec: opsPerSec,
+ ThinCacheHits: atomic.LoadUint64(&s.stats.ThinCacheHits),
+ ThinCacheMisses: atomic.LoadUint64(&s.stats.ThinCacheMisses),
+ ThinCacheCount: atomic.LoadUint64(&s.stats.ThinCacheCount),
+ ThinEvictions: atomic.LoadUint64(&s.stats.ThinEvictions),
}
}
diff --git a/internal/ingest/service_thin_test.go b/internal/ingest/service_thin_test.go
new file mode 100644
index 0000000..850433f
--- /dev/null
+++ b/internal/ingest/service_thin_test.go
@@ -0,0 +1,323 @@
+package ingest
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestThinResolveStateCacheTTL(t *testing.T) {
+ did := "did:plc:thin-a"
+ logRecords := signedDIDLog(t, did, 2)
+ upstream := newThinUpstream(t, map[string][]types.ExportRecord{did: logRecords})
+ defer upstream.close()
+
+ store, svc, cleanup := newThinService(t, upstream.url, time.Hour, 10)
+ defer cleanup()
+
+ if _, err := svc.ResolveState(context.Background(), did); err != nil {
+ t.Fatalf("first resolve: %v", err)
+ }
+
+ stats := svc.Stats()
+ if stats.ThinCacheMisses != 1 || stats.ThinCacheHits != 0 {
+ t.Fatalf("unexpected stats after first resolve: %+v", stats)
+ }
+
+ if _, err := svc.ResolveState(context.Background(), did); err != nil {
+ t.Fatalf("second resolve: %v", err)
+ }
+
+ stats = svc.Stats()
+ if stats.ThinCacheMisses != 1 || stats.ThinCacheHits != 1 {
+ t.Fatalf("unexpected stats after second resolve: %+v", stats)
+ }
+
+ if got := upstream.hitCount(did); got != 1 {
+ t.Fatalf("upstream calls mismatch after cache hit: got %d want 1", got)
+ }
+
+ meta, ok, err := store.GetThinCacheMeta(did)
+ if err != nil || !ok {
+ t.Fatalf("load thin meta: ok=%v err=%v", ok, err)
+ }
+
+ meta.LastVerified = time.Now().UTC().Add(-2 * time.Hour)
+ if err := store.PutThinCacheMeta(meta); err != nil {
+ t.Fatalf("expire meta: %v", err)
+ }
+
+ if _, err := svc.ResolveState(context.Background(), did); err != nil {
+ t.Fatalf("third resolve: %v", err)
+ }
+
+ stats = svc.Stats()
+ if stats.ThinCacheMisses != 2 || stats.ThinCacheHits != 1 {
+ t.Fatalf("unexpected stats after ttl refresh: %+v", stats)
+ }
+
+ if got := upstream.hitCount(did); got != 2 {
+ t.Fatalf("upstream calls mismatch after ttl refresh: got %d want 2", got)
+ }
+}
+
+func TestThinResolveStateEvictsLRU(t *testing.T) {
+ didA := "did:plc:thin-1"
+ didB := "did:plc:thin-2"
+ didC := "did:plc:thin-3"
+ upstream := newThinUpstream(t, map[string][]types.ExportRecord{
+ didA: signedDIDLog(t, didA, 1),
+ didB: signedDIDLog(t, didB, 1),
+ didC: signedDIDLog(t, didC, 1),
+ })
+ defer upstream.close()
+
+ store, svc, cleanup := newThinService(t, upstream.url, 24*time.Hour, 2)
+ defer cleanup()
+
+ for _, did := range []string{didA, didB} {
+ if _, err := svc.ResolveState(context.Background(), did); err != nil {
+ t.Fatalf("resolve %s: %v", did, err)
+ }
+ }
+
+ if _, err := svc.ResolveState(context.Background(), didA); err != nil {
+ t.Fatalf("refresh didA access: %v", err)
+ }
+
+ if _, err := svc.ResolveState(context.Background(), didC); err != nil {
+ t.Fatalf("resolve didC: %v", err)
+ }
+
+ if _, ok, err := store.GetState(didB); err != nil || ok {
+ t.Fatalf("didB expected eviction: ok=%v err=%v", ok, err)
+ }
+
+ for _, did := range []string{didA, didC} {
+ if _, ok, err := store.GetState(did); err != nil || !ok {
+ t.Fatalf("did %s expected cached: ok=%v err=%v", did, ok, err)
+ }
+ }
+
+ stats := svc.Stats()
+ if stats.ThinEvictions != 1 {
+ t.Fatalf("evictions mismatch: got %d want 1", stats.ThinEvictions)
+ }
+
+ if stats.ThinCacheCount != 2 || stats.DIDCount != 2 {
+ t.Fatalf("cache count mismatch: %+v", stats)
+ }
+}
+
+func TestThinResolveStateRejectsInvalidChain(t *testing.T) {
+ did := "did:plc:thin-invalid"
+ ops := signedDIDLog(t, did, 2)
+
+ var second map[string]any
+ if err := json.Unmarshal(ops[1].Operation, &second); err != nil {
+ t.Fatalf("decode op: %v", err)
+ }
+
+ second["prev"] = "sha256:invalid"
+ rawSecond, _ := json.Marshal(second)
+ ops[1].Operation = rawSecond
+ ops[1].CID = ""
+
+ upstream := newThinUpstream(t, map[string][]types.ExportRecord{did: ops})
+ defer upstream.close()
+
+ store, svc, cleanup := newThinService(t, upstream.url, time.Hour, 10)
+ defer cleanup()
+
+ if _, err := svc.ResolveState(context.Background(), did); err == nil {
+ t.Fatalf("expected invalid chain error")
+ }
+
+ if _, ok, err := store.GetState(did); err != nil || ok {
+ t.Fatalf("invalid chain should not be cached: ok=%v err=%v", ok, err)
+ }
+}
+
+func newThinService(t *testing.T, source string, ttl time.Duration, maxEntries int) (*storage.PebbleStore, *Service, func()) {
+ t.Helper()
+
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir: %v", err)
+ }
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+
+ if err := store.SetMode(config.ModeThin); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+
+ cfg := config.Default()
+ cfg.Mode = config.ModeThin
+ cfg.DataDir = dataDir
+ cfg.PLCSource = source
+ cfg.ThinCacheTTL = ttl
+ cfg.ThinCacheMaxEntries = maxEntries
+ cfg.VerifyPolicy = config.VerifyFull
+
+ svc := NewService(cfg, store, NewClient(source), nil, nil)
+ cleanup := func() {
+ svc.Close()
+ _ = store.Close()
+ }
+
+ return store, svc, cleanup
+}
+
+type thinUpstream struct {
+ t *testing.T
+ url string
+ srv *httptest.Server
+ mu sync.Mutex
+ logs map[string][]types.ExportRecord
+ hits map[string]int
+}
+
+func newThinUpstream(t *testing.T, logs map[string][]types.ExportRecord) *thinUpstream {
+ t.Helper()
+
+ u := &thinUpstream{
+ t: t,
+ logs: logs,
+ hits: map[string]int{},
+ }
+
+ u.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ return
+ }
+
+ path := strings.TrimPrefix(r.URL.Path, "/")
+ if !strings.HasSuffix(path, "/log") {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ did := strings.TrimSuffix(path, "/log")
+ did = strings.TrimSpace(did)
+ if did == "" {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ u.mu.Lock()
+ u.hits[did]++
+ records, ok := u.logs[did]
+ u.mu.Unlock()
+ if !ok {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ type entry struct {
+ Operation json.RawMessage `json:"operation"`
+ CID string `json:"cid"`
+ CreatedAt string `json:"createdAt"`
+ Nullified bool `json:"nullified"`
+ }
+ out := make([]entry, 0, len(records))
+ for _, rec := range records {
+ out = append(out, entry{
+ Operation: rec.Operation,
+ CID: rec.CID,
+ CreatedAt: rec.CreatedAt,
+ Nullified: rec.Nullified,
+ })
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ _ = json.NewEncoder(w).Encode(out)
+ }))
+ u.url = u.srv.URL
+
+ return u
+}
+
+func (u *thinUpstream) hitCount(did string) int {
+ u.mu.Lock()
+ defer u.mu.Unlock()
+
+ return u.hits[did]
+}
+
+func (u *thinUpstream) close() {
+ u.srv.Close()
+}
+
+func signedDIDLog(t *testing.T, did string, length int) []types.ExportRecord {
+ t.Helper()
+
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("generate key: %v", err)
+ }
+
+ records := make([]types.ExportRecord, 0, length)
+ prev := ""
+
+ for i := 0; i < length; i++ {
+ unsigned := map[string]any{
+ "did": did,
+ "didDoc": map[string]any{"id": did, "seq": i + 1},
+ "publicKey": base64.RawURLEncoding.EncodeToString(pub),
+ }
+ if prev != "" {
+ unsigned["prev"] = prev
+ }
+
+ payload, _ := json.Marshal(unsigned)
+ canon, _ := types.CanonicalizeJSON(payload)
+ sig := ed25519.Sign(priv, canon)
+
+ op := map[string]any{}
+ for k, v := range unsigned {
+ op[k] = v
+ }
+ op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon)
+ op["sig"] = base64.RawURLEncoding.EncodeToString(sig)
+
+ raw, _ := json.Marshal(op)
+ parsed, err := types.ParseOperation(types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ Operation: raw,
+ })
+ if err != nil {
+ t.Fatalf("parse operation: %v", err)
+ }
+
+ prev = parsed.CID
+ records = append(records, types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ CID: parsed.CID,
+ CreatedAt: time.Now().UTC().Add(time.Duration(i) * time.Second).Format(time.RFC3339),
+ Operation: raw,
+ })
+ }
+
+ return records
+}
diff --git a/internal/ingest/thin_disk_test.go b/internal/ingest/thin_disk_test.go
new file mode 100644
index 0000000..caa1a04
--- /dev/null
+++ b/internal/ingest/thin_disk_test.go
@@ -0,0 +1,181 @@
+package ingest
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "testing"
+ "time"
+)
+
+func TestThinDiskFootprintFor1000DIDs(t *testing.T) {
+ const didCount = 1000
+
+ dids := make([]string, 0, didCount)
+ recordsByDID := make(map[string][]types.ExportRecord, didCount)
+ mirrorRecords := make([]types.ExportRecord, 0, didCount)
+
+ for i := 0; i < didCount; i++ {
+ did := fmt.Sprintf("did:plc:thin-%04d", i)
+ dids = append(dids, did)
+
+ logRecords := signedDIDLog(t, did, 1)
+ recordsByDID[did] = logRecords
+ mirrorRecords = append(mirrorRecords, types.ExportRecord{
+ Seq: uint64(i + 1),
+ DID: did,
+ CreatedAt: logRecords[0].CreatedAt,
+ CID: logRecords[0].CID,
+ Operation: logRecords[0].Operation,
+ })
+ }
+
+ rng := rand.New(rand.NewSource(42))
+ rng.Shuffle(len(dids), func(i, j int) { dids[i], dids[j] = dids[j], dids[i] })
+
+ upstream := newThinUpstream(t, recordsByDID)
+ defer upstream.close()
+
+ thinDir := filepath.Join(t.TempDir(), "thin-data")
+ if err := os.MkdirAll(thinDir, 0o755); err != nil {
+ t.Fatalf("mkdir thin dir: %v", err)
+ }
+
+ thinStore, err := storage.OpenPebble(thinDir)
+ if err != nil {
+ t.Fatalf("open thin pebble: %v", err)
+ }
+ defer thinStore.Close()
+
+ if err := thinStore.SetMode(config.ModeThin); err != nil {
+ t.Fatalf("set thin mode: %v", err)
+ }
+
+ thinCfg := config.Default()
+ thinCfg.Mode = config.ModeThin
+ thinCfg.DataDir = thinDir
+ thinCfg.PLCSource = upstream.url
+ thinCfg.VerifyPolicy = config.VerifyFull
+ thinCfg.ThinCacheTTL = 24 * time.Hour
+ thinCfg.ThinCacheMaxEntries = didCount + 10
+
+ thinSvc := NewService(thinCfg, thinStore, NewClient(upstream.url), nil, nil)
+ defer thinSvc.Close()
+
+ for _, did := range dids {
+ if _, err := thinSvc.ResolveState(context.Background(), did); err != nil {
+ t.Fatalf("thin resolve %s: %v", did, err)
+ }
+ }
+
+ thinSize, err := dirSizeForTest(thinDir)
+ if err != nil {
+ t.Fatalf("thin dir size: %v", err)
+ }
+
+ mirrorDir := filepath.Join(t.TempDir(), "mirror-data")
+ if err := os.MkdirAll(mirrorDir, 0o755); err != nil {
+ t.Fatalf("mkdir mirror dir: %v", err)
+ }
+
+ mirrorStore, err := storage.OpenPebble(mirrorDir)
+ if err != nil {
+ t.Fatalf("open mirror pebble: %v", err)
+ }
+ defer mirrorStore.Close()
+
+ if err := mirrorStore.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mirror mode: %v", err)
+ }
+
+ mirrorLog, err := storage.OpenBlockLog(mirrorDir, 9, 4)
+ if err != nil {
+ t.Fatalf("open mirror blocklog: %v", err)
+ }
+
+ exportPath := filepath.Join(mirrorDir, "mirror-source.ndjson")
+ f, err := os.Create(exportPath)
+ if err != nil {
+ t.Fatalf("create mirror source: %v", err)
+ }
+
+ enc := json.NewEncoder(f)
+ for _, rec := range mirrorRecords {
+ if err := enc.Encode(rec); err != nil {
+ _ = f.Close()
+ t.Fatalf("encode mirror source: %v", err)
+ }
+ }
+
+ if err := f.Close(); err != nil {
+ t.Fatalf("close mirror source: %v", err)
+ }
+
+ mirrorCfg := config.Default()
+ mirrorCfg.Mode = config.ModeMirror
+ mirrorCfg.DataDir = mirrorDir
+ mirrorCfg.PLCSource = exportPath
+ mirrorCfg.VerifyPolicy = config.VerifyFull
+ mirrorCfg.ZstdLevel = 9
+ mirrorCfg.BlockSizeMB = 4
+ mirrorCfg.CommitBatchSize = 128
+ mirrorCfg.VerifyWorkers = 8
+ mirrorCfg.CheckpointInterval = 1000000
+
+ mirrorSvc := NewService(mirrorCfg, mirrorStore, NewClient(exportPath), mirrorLog, nil)
+ defer mirrorSvc.Close()
+
+ if err := mirrorSvc.Replay(context.Background()); err != nil {
+ t.Fatalf("mirror replay: %v", err)
+ }
+
+ if err := mirrorSvc.Flush(context.Background()); err != nil {
+ t.Fatalf("mirror flush: %v", err)
+ }
+
+ mirrorSize, err := dirSizeForTest(mirrorDir)
+ if err != nil {
+ t.Fatalf("mirror dir size: %v", err)
+ }
+
+ if thinSize >= mirrorSize {
+ t.Fatalf("expected thin footprint smaller than mirror footprint: thin=%d mirror=%d", thinSize, mirrorSize)
+ }
+
+ t.Logf("thin_cache_total_bytes=%d did_count=%d bytes_per_did=%.2f", thinSize, didCount, float64(thinSize)/didCount)
+ t.Logf("mirror_total_bytes=%d op_count=%d bytes_per_op=%.2f", mirrorSize, didCount, float64(mirrorSize)/didCount)
+}
+
+func dirSizeForTest(path string) (int64, error) {
+ var total int64
+
+ err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+
+ if d.IsDir() {
+ return nil
+ }
+
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+
+ total += info.Size()
+
+ return nil
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ return total, nil
+}
diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go
index b0a8e01..11e2dad 100644
--- a/internal/storage/pebble_store.go
+++ b/internal/storage/pebble_store.go
@@ -71,6 +71,27 @@ func (p *PebbleStore) PutState(state types.StateV1) error {
return p.db.Set(didKey(state.DID), b, pebble.Sync)
}
+func (p *PebbleStore) DeleteState(did string) error {
+ batch := p.db.NewBatch()
+ defer batch.Close()
+
+ if err := batch.Delete(didKey(did), nil); err != nil {
+ return err
+ }
+
+ if err := batch.Delete(chainKey(did), nil); err != nil {
+ return err
+ }
+
+ prefix := []byte("didop:" + did + ":")
+ upper := append(append([]byte(nil), prefix...), 0xFF)
+ if err := batch.DeleteRange(prefix, upper, nil); err != nil {
+ return err
+ }
+
+ return batch.Commit(pebble.Sync)
+}
+
func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error {
var opRef *types.BlockRefV1
@@ -141,6 +162,57 @@ func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error {
return nil
}
+func (p *PebbleStore) PutThinCacheMeta(meta types.ThinCacheMetaV1) error {
+ b, err := json.Marshal(meta)
+ if err != nil {
+ return fmt.Errorf("marshal thin cache meta: %w", err)
+ }
+
+ return p.db.Set(thinMetaKey(meta.DID), b, pebble.Sync)
+}
+
+func (p *PebbleStore) GetThinCacheMeta(did string) (types.ThinCacheMetaV1, bool, error) {
+ b, ok, err := p.getBytes(thinMetaKey(did))
+ if err != nil || !ok {
+ return types.ThinCacheMetaV1{}, ok, err
+ }
+
+ var meta types.ThinCacheMetaV1
+ if err := json.Unmarshal(b, &meta); err != nil {
+ return types.ThinCacheMetaV1{}, false, fmt.Errorf("unmarshal thin cache meta: %w", err)
+ }
+
+ return meta, true, nil
+}
+
+func (p *PebbleStore) ListThinCacheMeta() ([]types.ThinCacheMetaV1, error) {
+ iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("thinmeta:"), UpperBound: []byte("thinmeta;")})
+ if err != nil {
+ return nil, fmt.Errorf("new iterator: %w", err)
+ }
+ defer iter.Close()
+
+ out := make([]types.ThinCacheMetaV1, 0)
+ for iter.First(); iter.Valid(); iter.Next() {
+ var meta types.ThinCacheMetaV1
+ if err := json.Unmarshal(iter.Value(), &meta); err != nil {
+ return nil, fmt.Errorf("unmarshal thin cache meta: %w", err)
+ }
+
+ out = append(out, meta)
+ }
+
+ if err := iter.Error(); err != nil {
+ return nil, fmt.Errorf("iterate thin cache meta: %w", err)
+ }
+
+ return out, nil
+}
+
+func (p *PebbleStore) DeleteThinCacheMeta(did string) error {
+ return p.db.Delete(thinMetaKey(did), pebble.Sync)
+}
+
func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error {
if len(ops) == 0 && len(blockHashes) == 0 {
return nil
@@ -484,6 +556,8 @@ func didOpKey(did string, seq uint64) []byte {
return append([]byte("didop:"+did+":"), u64bytes(seq)...)
}
+func thinMetaKey(did string) []byte { return []byte("thinmeta:" + did) }
+
func u64bytes(v uint64) []byte {
b := make([]byte, 8)
diff --git a/internal/storage/store.go b/internal/storage/store.go
index 5267386..456e758 100644
--- a/internal/storage/store.go
+++ b/internal/storage/store.go
@@ -19,9 +19,14 @@ type Store interface {
GetGlobalSeq() (uint64, error)
SetGlobalSeq(seq uint64) error
PutState(state types.StateV1) error
+ DeleteState(did string) error
GetState(did string) (types.StateV1, bool, error)
ListStates() ([]types.StateV1, error)
ForEachState(fn func(types.StateV1) error) error
+ PutThinCacheMeta(meta types.ThinCacheMetaV1) error
+ GetThinCacheMeta(did string) (types.ThinCacheMetaV1, bool, error)
+ ListThinCacheMeta() ([]types.ThinCacheMetaV1, error)
+ DeleteThinCacheMeta(did string) error
ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error
SetChainHead(did string, seq uint64) error
GetChainHead(did string) (uint64, bool, error)
diff --git a/internal/types/state.go b/internal/types/state.go
index 31b9372..3f97c34 100644
--- a/internal/types/state.go
+++ b/internal/types/state.go
@@ -44,3 +44,11 @@ type CheckpointV1 struct {
Signature string `json:"signature"`
CheckpointHash string `json:"checkpoint_hash"`
}
+
+// ThinCacheMetaV1 tracks thin-mode cache freshness and eviction order.
+type ThinCacheMetaV1 struct {
+ Version uint8 `json:"v"`
+ DID string `json:"did"`
+ LastVerified time.Time `json:"last_verified"`
+ LastAccess time.Time `json:"last_access"`
+}