diff options
| author | Fuwn <[email protected]> | 2026-02-27 09:26:06 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-27 09:26:06 -0800 |
| commit | 8980607ef8e7426b601f942f26ae2cd4c4f3edff (patch) | |
| tree | 60bdb4bbbe5755223c3387179ee7406432d084ab /internal | |
| parent | fix: make mirror replay lossless with strict seq accounting and trace (diff) | |
| download | plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.tar.xz plutia-test-8980607ef8e7426b601f942f26ae2cd4c4f3edff.zip | |
feat: add thin mode for on-demand verified PLC resolution
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/api/observability.go | 52 | ||||
| -rw-r--r-- | internal/api/server.go | 90 | ||||
| -rw-r--r-- | internal/api/server_hardening_test.go | 4 | ||||
| -rw-r--r-- | internal/api/server_thin_test.go | 172 | ||||
| -rw-r--r-- | internal/config/config.go | 17 | ||||
| -rw-r--r-- | internal/config/config_thin_test.go | 35 | ||||
| -rw-r--r-- | internal/ingest/client.go | 132 | ||||
| -rw-r--r-- | internal/ingest/client_test.go | 63 | ||||
| -rw-r--r-- | internal/ingest/service.go | 264 | ||||
| -rw-r--r-- | internal/ingest/service_thin_test.go | 323 | ||||
| -rw-r--r-- | internal/ingest/thin_disk_test.go | 181 | ||||
| -rw-r--r-- | internal/storage/pebble_store.go | 74 | ||||
| -rw-r--r-- | internal/storage/store.go | 5 | ||||
| -rw-r--r-- | internal/types/state.go | 8 |
14 files changed, 1399 insertions, 21 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go index 008f5f2..ad67bc9 100644 --- a/internal/api/observability.go +++ b/internal/api/observability.go @@ -146,6 +146,58 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S return float64(count) }, )) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "thin_cache_hits_total", + Help: "Total thin-mode cache hits.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + + return float64(ingestor.Stats().ThinCacheHits) + }, + )) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "thin_cache_misses_total", + Help: "Total thin-mode cache misses.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + + return float64(ingestor.Stats().ThinCacheMisses) + }, + )) + reg.MustRegister(prometheus.NewGaugeFunc( + prometheus.GaugeOpts{ + Name: "thin_cache_entries", + Help: "Current number of entries in the thin-mode DID cache.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + + return float64(ingestor.Stats().ThinCacheCount) + }, + )) + reg.MustRegister(prometheus.NewCounterFunc( + prometheus.CounterOpts{ + Name: "thin_cache_evictions_total", + Help: "Total entries evicted from thin-mode DID cache.", + }, + func() float64 { + if ingestor == nil { + return 0 + } + + return float64(ingestor.Stats().ThinEvictions) + }, + )) return m } diff --git a/internal/api/server.go b/internal/api/server.go index c66e5af..9895a65 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -212,18 +212,43 @@ func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) { } func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) { - state, ok, err := s.store.GetState(did) + var state types.StateV1 + if s.ingestor != nil { + resolvedState, err := s.ingestor.ResolveState(r.Context(), did) + if err != nil { + if errors.Is(err, ingest.ErrDIDNotFound) { + writeErr(w, http.StatusNotFound, fmt.Errorf("DID not found")) - if err != nil { - writeErr(w, http.StatusInternalServerError, err) + return + } - return - } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) - if !ok { - writeErr(w, http.StatusNotFound, fmt.Errorf("DID not found")) + return + } - return + writeErr(w, http.StatusInternalServerError, err) + + return + } + + state = resolvedState + } else { + stateVal, ok, err := s.store.GetState(did) + if err != nil { + writeErr(w, http.StatusInternalServerError, err) + + return + } + + if !ok { + writeErr(w, http.StatusNotFound, fmt.Errorf("DID not found")) + + return + } + + state = stateVal } cp, cpOK, err := s.store.GetLatestCheckpoint() @@ -251,6 +276,12 @@ func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did st } func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) { + if s.cfg.Mode == config.ModeThin { + writeErr(w, http.StatusNotImplemented, fmt.Errorf("thin mode does not support checkpoint proofs")) + + return + } + if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("service unavailable")) @@ -448,24 +479,43 @@ func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) } func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, r *http.Request, did string) { - state, ok, err := s.store.GetState(did) + var state types.StateV1 + if s.ingestor != nil { + resolvedState, err := s.ingestor.ResolveState(r.Context(), did) + if err != nil { + if errors.Is(err, ingest.ErrDIDNotFound) { + writeCompatibilityErr(w, http.StatusNotFound, "DID not registered: "+did) - if err != nil { - writeCompatibilityErr(w, http.StatusInternalServerError, err.Error()) + return + } - return - } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeCompatibilityErr(w, http.StatusGatewayTimeout, err.Error()) - if !ok { - writeCompatibilityErr(w, http.StatusNotFound, "DID not registered: "+did) + return + } - return - } + writeCompatibilityErr(w, http.StatusInternalServerError, err.Error()) - if s.ingestor == nil { - writeCompatibilityErr(w, http.StatusServiceUnavailable, "service unavailable") + return + } - return + state = resolvedState + } else { + stateVal, ok, err := s.store.GetState(did) + if err != nil { + writeCompatibilityErr(w, http.StatusInternalServerError, err.Error()) + + return + } + + if !ok { + writeCompatibilityErr(w, http.StatusNotFound, "DID not registered: "+did) + + return + } + + state = stateVal } data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did) diff --git a/internal/api/server_hardening_test.go b/internal/api/server_hardening_test.go index 065e8a4..97eae0e 100644 --- a/internal/api/server_hardening_test.go +++ b/internal/api/server_hardening_test.go @@ -130,6 +130,10 @@ func TestMetricsExposeRequiredSeries(t *testing.T) { "checkpoint_sequence", "disk_bytes_total", "did_count", + "thin_cache_hits_total", + "thin_cache_misses_total", + "thin_cache_entries", + "thin_cache_evictions_total", } { if !strings.Contains(text, metric) { t.Fatalf("metrics output missing %q", metric) diff --git a/internal/api/server_thin_test.go b/internal/api/server_thin_test.go new file mode 100644 index 0000000..8b9dfac --- /dev/null +++ b/internal/api/server_thin_test.go @@ -0,0 +1,172 @@ +package api + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/ingest" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" +) + +func TestThinModeResolveCompatibilityAndProofNotImplemented(t *testing.T) { + did := "did:plc:thin-http" + upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/"+did+"/log" { + w.WriteHeader(http.StatusNotFound) + return + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(thinHTTPOperationLog(t, did)) + })) + defer upstream.Close() + + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + defer store.Close() + + if err := store.SetMode(config.ModeThin); err != nil { + t.Fatalf("set mode: %v", err) + } + + cfg := config.Default() + cfg.Mode = config.ModeThin + cfg.DataDir = dataDir + cfg.PLCSource = upstream.URL + cfg.ThinCacheTTL = 24 * time.Hour + cfg.ThinCacheMaxEntries = 1000 + + svc := ingest.NewService(cfg, store, ingest.NewClient(upstream.URL), nil, nil) + defer svc.Close() + + ts := httptest.NewServer(NewServer(cfg, store, svc, nil).Handler()) + defer ts.Close() + + resp, err := http.Get(ts.URL + "/" + did) + if err != nil { + t.Fatalf("compat resolve: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("compat resolve status: got %d want 200", resp.StatusCode) + } + + if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/did+ld+json") { + t.Fatalf("compat resolve content-type mismatch: %s", got) + } + + var doc map[string]any + if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil { + t.Fatalf("decode did doc: %v", err) + } + + if got, _ := doc["id"].(string); got != did { + t.Fatalf("did doc id mismatch: got %q want %q", got, did) + } + + proofResp, err := http.Get(ts.URL + "/did/" + did + "/proof") + if err != nil { + t.Fatalf("thin proof request: %v", err) + } + defer proofResp.Body.Close() + + if proofResp.StatusCode != http.StatusNotImplemented { + t.Fatalf("thin proof status: got %d want 501", proofResp.StatusCode) + } + + var statusBody map[string]any + statusResp, err := http.Get(ts.URL + "/status") + if err != nil { + t.Fatalf("status request: %v", err) + } + defer statusResp.Body.Close() + + statusBytes, err := io.ReadAll(statusResp.Body) + if err != nil { + t.Fatalf("read status body: %v", err) + } + + t.Logf("thin_status=%s", strings.TrimSpace(string(statusBytes))) + + if err := json.Unmarshal(statusBytes, &statusBody); err != nil { + t.Fatalf("decode status: %v", err) + } + + if mode, _ := statusBody["mode"].(string); mode != config.ModeThin { + t.Fatalf("status mode mismatch: got %q want %q", mode, config.ModeThin) + } + + if err := svc.VerifyDID(context.Background(), did); err != nil { + t.Fatalf("verify thin did: %v", err) + } +} + +func thinHTTPOperationLog(t *testing.T, did string) []map[string]any { + t.Helper() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("generate key: %v", err) + } + + ops := make([]map[string]any, 0, 2) + prev := "" + for i := 0; i < 2; i++ { + unsigned := map[string]any{ + "did": did, + "didDoc": map[string]any{"id": did, "seq": i + 1}, + "publicKey": base64.RawURLEncoding.EncodeToString(pub), + } + if prev != "" { + unsigned["prev"] = prev + } + + payload, _ := json.Marshal(unsigned) + canon, _ := types.CanonicalizeJSON(payload) + sig := ed25519.Sign(priv, canon) + + op := map[string]any{} + for k, v := range unsigned { + op[k] = v + } + op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon) + op["sig"] = base64.RawURLEncoding.EncodeToString(sig) + + raw, _ := json.Marshal(op) + parsed, err := types.ParseOperation(types.ExportRecord{Seq: uint64(i + 1), DID: did, Operation: raw}) + if err != nil { + t.Fatalf("parse operation: %v", err) + } + + prev = parsed.CID + ops = append(ops, map[string]any{ + "operation": op, + "cid": parsed.CID, + "createdAt": time.Now().UTC().Add(time.Duration(i) * time.Second).Format(time.RFC3339), + "nullified": false, + }) + } + + return ops +} diff --git a/internal/config/config.go b/internal/config/config.go index f777a21..e94b5ea 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -14,6 +14,7 @@ import ( const ( ModeResolver = "resolver" ModeMirror = "mirror" + ModeThin = "thin" VerifyFull = "full" VerifyLazy = "lazy" VerifyStateOnly = "state-only" @@ -31,6 +32,8 @@ type Config struct { VerifyWorkers int `yaml:"verify_workers"` ExportPageSize int `yaml:"export_page_size"` ReplayTrace bool `yaml:"replay_trace"` + ThinCacheTTL time.Duration `yaml:"thin_cache_ttl"` + ThinCacheMaxEntries int `yaml:"thin_cache_max_entries"` ListenAddr string `yaml:"listen_addr"` MirrorPrivateKeyPath string `yaml:"mirror_private_key_path"` PollInterval time.Duration `yaml:"poll_interval"` @@ -61,6 +64,8 @@ func Default() Config { VerifyWorkers: runtime.NumCPU(), ExportPageSize: 1000, ReplayTrace: false, + ThinCacheTTL: 24 * time.Hour, + ThinCacheMaxEntries: 100000, ListenAddr: ":8080", MirrorPrivateKeyPath: "./mirror.key", PollInterval: 5 * time.Second, @@ -153,6 +158,8 @@ func applyEnv(cfg *Config) { setInt("PLUTIA_VERIFY_WORKERS", &cfg.VerifyWorkers) setInt("PLUTIA_EXPORT_PAGE_SIZE", &cfg.ExportPageSize) setBool("PLUTIA_REPLAY_TRACE", &cfg.ReplayTrace) + setDuration("PLUTIA_THIN_CACHE_TTL", &cfg.ThinCacheTTL) + setInt("PLUTIA_THIN_CACHE_MAX_ENTRIES", &cfg.ThinCacheMaxEntries) setUint64("PLUTIA_CHECKPOINT_INTERVAL", &cfg.CheckpointInterval) setString("PLUTIA_LISTEN_ADDR", &cfg.ListenAddr) setString("PLUTIA_MIRROR_PRIVATE_KEY_PATH", &cfg.MirrorPrivateKeyPath) @@ -168,7 +175,7 @@ func applyEnv(cfg *Config) { } func (c Config) Validate() error { - if c.Mode != ModeResolver && c.Mode != ModeMirror { + if c.Mode != ModeResolver && c.Mode != ModeMirror && c.Mode != ModeThin { return fmt.Errorf("invalid mode %q", c.Mode) } @@ -210,6 +217,14 @@ func (c Config) Validate() error { return fmt.Errorf("export_page_size must be between 1 and 1000, got %d", c.ExportPageSize) } + if c.ThinCacheTTL <= 0 { + return errors.New("thin_cache_ttl must be > 0") + } + + if c.ThinCacheMaxEntries <= 0 { + return errors.New("thin_cache_max_entries must be > 0") + } + if c.ListenAddr == "" { return errors.New("listen_addr is required") } diff --git a/internal/config/config_thin_test.go b/internal/config/config_thin_test.go new file mode 100644 index 0000000..ff549e1 --- /dev/null +++ b/internal/config/config_thin_test.go @@ -0,0 +1,35 @@ +package config + +import ( + "testing" + "time" +) + +func TestValidateAcceptsThinMode(t *testing.T) { + cfg := Default() + cfg.Mode = ModeThin + cfg.ThinCacheTTL = 24 * time.Hour + cfg.ThinCacheMaxEntries = 10 + + if err := cfg.Validate(); err != nil { + t.Fatalf("validate thin mode: %v", err) + } +} + +func TestValidateRejectsInvalidThinCacheConfig(t *testing.T) { + cfg := Default() + cfg.Mode = ModeThin + cfg.ThinCacheTTL = 0 + + if err := cfg.Validate(); err == nil { + t.Fatalf("expected thin_cache_ttl validation error") + } + + cfg = Default() + cfg.Mode = ModeThin + cfg.ThinCacheMaxEntries = 0 + + if err := cfg.Validate(); err == nil { + t.Fatalf("expected thin_cache_max_entries validation error") + } +} diff --git a/internal/ingest/client.go b/internal/ingest/client.go index eba6fd0..bdef942 100644 --- a/internal/ingest/client.go +++ b/internal/ingest/client.go @@ -26,6 +26,13 @@ type Client struct { opts ClientOptions } +type didLogEntry struct { + Operation json.RawMessage `json:"operation"` + CID string `json:"cid,omitempty"` + CreatedAt string `json:"createdAt,omitempty"` + Nullified bool `json:"nullified,omitempty"` +} + type ClientOptions struct { MaxAttempts int BaseDelay time.Duration @@ -116,6 +123,131 @@ func (c *Client) FetchExport(ctx context.Context, after uint64) ([]types.ExportR return c.FetchExportLimited(ctx, after, 0) } +func (c *Client) FetchDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) { + if strings.TrimSpace(did) == "" { + return nil, fmt.Errorf("did is required") + } + + u, err := url.Parse(c.source) + if err != nil { + return nil, fmt.Errorf("parse plc source: %w", err) + } + + u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + req.Header.Set("Accept", "application/json") + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("fetch did log: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone { + return nil, ErrDIDNotFound + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) + return nil, fmt.Errorf("did log response %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + + rawBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read did log body: %w", err) + } + + var wrapped []didLogEntry + if err := json.Unmarshal(rawBody, &wrapped); err == nil && len(wrapped) > 0 && len(bytes.TrimSpace(wrapped[0].Operation)) > 0 { + records := make([]types.ExportRecord, 0, len(wrapped)) + for i, entry := range wrapped { + records = append(records, types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + CID: entry.CID, + CreatedAt: entry.CreatedAt, + Nullified: entry.Nullified, + Operation: entry.Operation, + }) + } + + return records, nil + } + + var ops []json.RawMessage + if err := json.Unmarshal(rawBody, &ops); err != nil { + return nil, fmt.Errorf("decode did log: %w", err) + } + + records := make([]types.ExportRecord, 0, len(ops)) + for i, op := range ops { + records = append(records, types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + Operation: op, + }) + } + + auditRecords, err := c.fetchDIDAudit(ctx, did) + if err != nil { + return nil, fmt.Errorf("did log missing CIDs and audit fallback failed: %w", err) + } + + if len(auditRecords) != len(records) { + return nil, fmt.Errorf("did log/audit length mismatch: log=%d audit=%d", len(records), len(auditRecords)) + } + + for i := range records { + records[i].CID = strings.TrimSpace(auditRecords[i].CID) + if records[i].CID == "" { + return nil, fmt.Errorf("missing cid in did audit entry index %d", i) + } + + records[i].CreatedAt = auditRecords[i].CreatedAt + records[i].Nullified = auditRecords[i].Nullified + } + + return records, nil +} + +func (c *Client) fetchDIDAudit(ctx context.Context, did string) ([]didLogEntry, error) { + u, err := url.Parse(c.source) + if err != nil { + return nil, fmt.Errorf("parse plc source: %w", err) + } + + u.Path = strings.TrimRight(u.Path, "/") + "/" + did + "/log/audit" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, fmt.Errorf("new request: %w", err) + } + + resp, err := c.http.Do(req) + if err != nil { + return nil, fmt.Errorf("fetch did audit log: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone { + return nil, ErrDIDNotFound + } + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4*1024)) + return nil, fmt.Errorf("did audit response %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) + } + + var entries []didLogEntry + if err := json.NewDecoder(resp.Body).Decode(&entries); err != nil { + return nil, fmt.Errorf("decode did audit log: %w", err) + } + + return entries, nil +} + func (c *Client) FetchExportLimited(ctx context.Context, after uint64, limit uint64) ([]types.ExportRecord, error) { pageSize := uint64(1000) batch, err := c.FetchExportBatch(ctx, after, limit, pageSize, nil) diff --git a/internal/ingest/client_test.go b/internal/ingest/client_test.go index 50595ec..2552855 100644 --- a/internal/ingest/client_test.go +++ b/internal/ingest/client_test.go @@ -1,6 +1,10 @@ package ingest import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" "strings" "testing" ) @@ -38,3 +42,62 @@ func TestDecodeExportBody_FailsOnMalformedNonTrailingNDJSONLine(t *testing.T) { t.Fatalf("expected malformed middle line to fail") } } + +func TestFetchDIDLog_UsesAuditFallbackForCIDs(t *testing.T) { + const did = "did:plc:alice" + op1 := json.RawMessage(`{"did":"did:plc:alice","sig":"x","sigPayload":"e30","publicKey":"a"}`) + op2 := json.RawMessage(`{"did":"did:plc:alice","prev":"bafy-one","sig":"x","sigPayload":"e30","publicKey":"a"}`) + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/" + did + "/log": + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode([]json.RawMessage{op1, op2}) + case "/" + did + "/log/audit": + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode([]map[string]any{ + {"did": did, "operation": op1, "cid": "bafy-one", "createdAt": "2026-02-27T00:00:00Z"}, + {"did": did, "operation": op2, "cid": "bafy-two", "createdAt": "2026-02-27T00:00:01Z"}, + }) + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + client := NewClient(srv.URL) + records, err := client.FetchDIDLog(context.Background(), did) + if err != nil { + t.Fatalf("fetch did log: %v", err) + } + + if len(records) != 2 { + t.Fatalf("record count mismatch: got %d want 2", len(records)) + } + + if records[0].CID != "bafy-one" || records[1].CID != "bafy-two" { + t.Fatalf("cid fallback mismatch: got [%s %s]", records[0].CID, records[1].CID) + } +} + +func TestFetchDIDLog_FailsWhenCIDsUnavailable(t *testing.T) { + const did = "did:plc:bob" + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/" + did + "/log": + _ = json.NewEncoder(w).Encode([]json.RawMessage{json.RawMessage(`{"did":"did:plc:bob"}`)}) + case "/" + did + "/log/audit": + http.NotFound(w, r) + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + client := NewClient(srv.URL) + _, err := client.FetchDIDLog(context.Background(), did) + if err == nil { + t.Fatalf("expected cid fallback error") + } +} diff --git a/internal/ingest/service.go b/internal/ingest/service.go index 1c95068..873204f 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -32,6 +32,10 @@ type Stats struct { DIDCount uint64 `json:"did_count"` CheckpointSeq uint64 `json:"checkpoint_sequence"` IngestOpsPerSec float64 `json:"ingest_ops_per_second"` + ThinCacheHits uint64 `json:"thin_cache_hits"` + ThinCacheMisses uint64 `json:"thin_cache_misses"` + ThinCacheCount uint64 `json:"thin_cache_entries"` + ThinEvictions uint64 `json:"thin_cache_evictions"` } type Service struct { @@ -85,6 +89,9 @@ func NewService(cfg config.Config, store storage.Store, client *Client, blockLog if didCount, err := countStates(store); err == nil { atomic.StoreUint64(&s.stats.DIDCount, didCount) + if cfg.Mode == config.ModeThin { + atomic.StoreUint64(&s.stats.ThinCacheCount, didCount) + } } if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { @@ -121,6 +128,12 @@ func (s *Service) Replay(ctx context.Context) error { } func (s *Service) Poll(ctx context.Context) error { + if s.cfg.Mode == config.ModeThin { + <-ctx.Done() + + return ctx.Err() + } + ticker := time.NewTicker(s.cfg.PollInterval) defer ticker.Stop() @@ -143,6 +156,10 @@ func (s *Service) Poll(ctx context.Context) error { } func (s *Service) RunOnce(ctx context.Context) (bool, error) { + if s.cfg.Mode == config.ModeThin { + return false, nil + } + if s.startupErr != nil { return false, s.startupErr } @@ -956,6 +973,12 @@ func (s *Service) VerifyDID(ctx context.Context, did string) error { return err } + if s.cfg.Mode == config.ModeThin { + _, _, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC()) + + return err + } + if s.cfg.Mode != config.ModeMirror { _, ok, err := s.store.GetState(did) @@ -1107,11 +1130,227 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen return tip, filtered, nil } +func (s *Service) ResolveState(ctx context.Context, did string) (types.StateV1, error) { + if err := s.CorruptionError(); err != nil { + return types.StateV1{}, err + } + + if s.cfg.Mode != config.ModeThin { + stateVal, ok, err := s.store.GetState(did) + if err != nil { + return types.StateV1{}, err + } + + if !ok { + return types.StateV1{}, ErrDIDNotFound + } + + return stateVal, nil + } + + return s.resolveThinState(ctx, did) +} + +func (s *Service) resolveThinState(ctx context.Context, did string) (types.StateV1, error) { + now := time.Now().UTC() + stateVal, stateOK, err := s.store.GetState(did) + if err != nil { + return types.StateV1{}, err + } + + meta, metaOK, err := s.store.GetThinCacheMeta(did) + if err != nil { + return types.StateV1{}, err + } + + if stateOK && metaOK && isThinCacheFresh(meta, now, s.cfg.ThinCacheTTL) { + atomic.AddUint64(&s.stats.ThinCacheHits, 1) + + meta.LastAccess = now + + if err := s.store.PutThinCacheMeta(meta); err != nil { + return types.StateV1{}, err + } + + return stateVal, nil + } + + atomic.AddUint64(&s.stats.ThinCacheMisses, 1) + + verifiedState, _, err := s.refreshThinStateFromUpstream(ctx, did, now) + if err != nil { + return types.StateV1{}, err + } + + return verifiedState, nil +} + +func (s *Service) refreshThinStateFromUpstream(ctx context.Context, did string, now time.Time) (types.StateV1, []types.ExportRecord, error) { + records, err := s.client.FetchDIDLog(ctx, did) + if err != nil { + return types.StateV1{}, nil, err + } + + stateVal, normalized, err := s.verifyThinChain(ctx, did, records) + if err != nil { + return types.StateV1{}, nil, err + } + + stateVal.UpdatedAt = now + + if err := s.store.PutState(stateVal); err != nil { + return types.StateV1{}, nil, err + } + + meta := types.ThinCacheMetaV1{ + Version: 1, + DID: did, + LastVerified: now, + LastAccess: now, + } + if err := s.store.PutThinCacheMeta(meta); err != nil { + return types.StateV1{}, nil, err + } + + evicted, entries, err := s.enforceThinCacheLimit() + if err != nil { + return types.StateV1{}, nil, err + } + + if evicted > 0 { + atomic.AddUint64(&s.stats.ThinEvictions, evicted) + } + + atomic.StoreUint64(&s.stats.ThinCacheCount, entries) + atomic.StoreUint64(&s.stats.DIDCount, entries) + + return stateVal, normalized, nil +} + +func (s *Service) verifyThinChain(ctx context.Context, did string, records []types.ExportRecord) (types.StateV1, []types.ExportRecord, error) { + if len(records) == 0 { + return types.StateV1{}, nil, ErrDIDNotFound + } + + verifier := verify.New(config.VerifyFull) + var previous *types.StateV1 + normalized := make([]types.ExportRecord, 0, len(records)) + + for i, rec := range records { + if err := ctx.Err(); err != nil { + return types.StateV1{}, nil, err + } + + rec.Seq = uint64(i + 1) + + if rec.DID == "" { + rec.DID = did + } + + if rec.DID != did { + return types.StateV1{}, nil, fmt.Errorf("unexpected DID in log: got %s want %s", rec.DID, did) + } + + op, err := types.ParseOperation(rec) + if err != nil { + return types.StateV1{}, nil, err + } + + if err := verifier.VerifyOperation(op, previous); err != nil { + return types.StateV1{}, nil, err + } + + next, err := state.ComputeNextState(op, previous) + if err != nil { + return types.StateV1{}, nil, err + } + + normalized = append(normalized, types.ExportRecord{ + Seq: rec.Seq, + DID: did, + CreatedAt: rec.CreatedAt, + CID: op.CID, + Nullified: rec.Nullified, + Operation: op.CanonicalBytes, + }) + previous = &next + } + + if previous == nil { + return types.StateV1{}, nil, ErrDIDNotFound + } + + return *previous, normalized, nil +} + +func isThinCacheFresh(meta types.ThinCacheMetaV1, now time.Time, ttl time.Duration) bool { + if meta.LastVerified.IsZero() { + return false + } + + return now.Sub(meta.LastVerified) <= ttl +} + +func (s *Service) enforceThinCacheLimit() (uint64, uint64, error) { + maxEntries := s.cfg.ThinCacheMaxEntries + if maxEntries <= 0 { + maxEntries = config.Default().ThinCacheMaxEntries + } + + entries, err := s.store.ListThinCacheMeta() + if err != nil { + return 0, 0, err + } + + if len(entries) <= maxEntries { + return 0, uint64(len(entries)), nil + } + + sort.Slice(entries, func(i, j int) bool { + left := entries[i] + right := entries[j] + + if !left.LastAccess.Equal(right.LastAccess) { + return left.LastAccess.Before(right.LastAccess) + } + + if !left.LastVerified.Equal(right.LastVerified) { + return left.LastVerified.Before(right.LastVerified) + } + + return left.DID < right.DID + }) + + var evicted uint64 + for idx := 0; idx < len(entries)-maxEntries; idx++ { + did := entries[idx].DID + + if err := s.store.DeleteState(did); err != nil { + return evicted, 0, err + } + + if err := s.store.DeleteThinCacheMeta(did); err != nil { + return evicted, 0, err + } + + evicted++ + } + + return evicted, uint64(len(entries)) - evicted, nil +} + func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) { if err := s.CorruptionError(); err != nil { return nil, err } + if s.cfg.Mode == config.ModeThin { + atomic.AddUint64(&s.stats.ThinCacheMisses, 1) + _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC()) + + return records, err + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return nil, ErrHistoryNotStored } @@ -1167,6 +1406,21 @@ func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types return types.ExportRecord{}, err } + if s.cfg.Mode == config.ModeThin { + atomic.AddUint64(&s.stats.ThinCacheMisses, 1) + + _, records, err := s.refreshThinStateFromUpstream(ctx, did, time.Now().UTC()) + if err != nil { + return types.ExportRecord{}, err + } + + if len(records) == 0 { + return types.ExportRecord{}, ErrDIDNotFound + } + + return records[len(records)-1], nil + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { return types.ExportRecord{}, ErrHistoryNotStored } @@ -1218,6 +1472,12 @@ func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[strin return nil, err } + if s.cfg.Mode == config.ModeThin { + if _, err := s.ResolveState(ctx, did); err != nil { + return nil, err + } + } + stateVal, stateOK, err := s.store.GetState(did) if err != nil { @@ -1711,6 +1971,10 @@ func (s *Service) Stats() Stats { DIDCount: atomic.LoadUint64(&s.stats.DIDCount), CheckpointSeq: atomic.LoadUint64(&s.stats.CheckpointSeq), IngestOpsPerSec: opsPerSec, + ThinCacheHits: atomic.LoadUint64(&s.stats.ThinCacheHits), + ThinCacheMisses: atomic.LoadUint64(&s.stats.ThinCacheMisses), + ThinCacheCount: atomic.LoadUint64(&s.stats.ThinCacheCount), + ThinEvictions: atomic.LoadUint64(&s.stats.ThinEvictions), } } diff --git a/internal/ingest/service_thin_test.go b/internal/ingest/service_thin_test.go new file mode 100644 index 0000000..850433f --- /dev/null +++ b/internal/ingest/service_thin_test.go @@ -0,0 +1,323 @@ +package ingest + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" +) + +func TestThinResolveStateCacheTTL(t *testing.T) { + did := "did:plc:thin-a" + logRecords := signedDIDLog(t, did, 2) + upstream := newThinUpstream(t, map[string][]types.ExportRecord{did: logRecords}) + defer upstream.close() + + store, svc, cleanup := newThinService(t, upstream.url, time.Hour, 10) + defer cleanup() + + if _, err := svc.ResolveState(context.Background(), did); err != nil { + t.Fatalf("first resolve: %v", err) + } + + stats := svc.Stats() + if stats.ThinCacheMisses != 1 || stats.ThinCacheHits != 0 { + t.Fatalf("unexpected stats after first resolve: %+v", stats) + } + + if _, err := svc.ResolveState(context.Background(), did); err != nil { + t.Fatalf("second resolve: %v", err) + } + + stats = svc.Stats() + if stats.ThinCacheMisses != 1 || stats.ThinCacheHits != 1 { + t.Fatalf("unexpected stats after second resolve: %+v", stats) + } + + if got := upstream.hitCount(did); got != 1 { + t.Fatalf("upstream calls mismatch after cache hit: got %d want 1", got) + } + + meta, ok, err := store.GetThinCacheMeta(did) + if err != nil || !ok { + t.Fatalf("load thin meta: ok=%v err=%v", ok, err) + } + + meta.LastVerified = time.Now().UTC().Add(-2 * time.Hour) + if err := store.PutThinCacheMeta(meta); err != nil { + t.Fatalf("expire meta: %v", err) + } + + if _, err := svc.ResolveState(context.Background(), did); err != nil { + t.Fatalf("third resolve: %v", err) + } + + stats = svc.Stats() + if stats.ThinCacheMisses != 2 || stats.ThinCacheHits != 1 { + t.Fatalf("unexpected stats after ttl refresh: %+v", stats) + } + + if got := upstream.hitCount(did); got != 2 { + t.Fatalf("upstream calls mismatch after ttl refresh: got %d want 2", got) + } +} + +func TestThinResolveStateEvictsLRU(t *testing.T) { + didA := "did:plc:thin-1" + didB := "did:plc:thin-2" + didC := "did:plc:thin-3" + upstream := newThinUpstream(t, map[string][]types.ExportRecord{ + didA: signedDIDLog(t, didA, 1), + didB: signedDIDLog(t, didB, 1), + didC: signedDIDLog(t, didC, 1), + }) + defer upstream.close() + + store, svc, cleanup := newThinService(t, upstream.url, 24*time.Hour, 2) + defer cleanup() + + for _, did := range []string{didA, didB} { + if _, err := svc.ResolveState(context.Background(), did); err != nil { + t.Fatalf("resolve %s: %v", did, err) + } + } + + if _, err := svc.ResolveState(context.Background(), didA); err != nil { + t.Fatalf("refresh didA access: %v", err) + } + + if _, err := svc.ResolveState(context.Background(), didC); err != nil { + t.Fatalf("resolve didC: %v", err) + } + + if _, ok, err := store.GetState(didB); err != nil || ok { + t.Fatalf("didB expected eviction: ok=%v err=%v", ok, err) + } + + for _, did := range []string{didA, didC} { + if _, ok, err := store.GetState(did); err != nil || !ok { + t.Fatalf("did %s expected cached: ok=%v err=%v", did, ok, err) + } + } + + stats := svc.Stats() + if stats.ThinEvictions != 1 { + t.Fatalf("evictions mismatch: got %d want 1", stats.ThinEvictions) + } + + if stats.ThinCacheCount != 2 || stats.DIDCount != 2 { + t.Fatalf("cache count mismatch: %+v", stats) + } +} + +func TestThinResolveStateRejectsInvalidChain(t *testing.T) { + did := "did:plc:thin-invalid" + ops := signedDIDLog(t, did, 2) + + var second map[string]any + if err := json.Unmarshal(ops[1].Operation, &second); err != nil { + t.Fatalf("decode op: %v", err) + } + + second["prev"] = "sha256:invalid" + rawSecond, _ := json.Marshal(second) + ops[1].Operation = rawSecond + ops[1].CID = "" + + upstream := newThinUpstream(t, map[string][]types.ExportRecord{did: ops}) + defer upstream.close() + + store, svc, cleanup := newThinService(t, upstream.url, time.Hour, 10) + defer cleanup() + + if _, err := svc.ResolveState(context.Background(), did); err == nil { + t.Fatalf("expected invalid chain error") + } + + if _, ok, err := store.GetState(did); err != nil || ok { + t.Fatalf("invalid chain should not be cached: ok=%v err=%v", ok, err) + } +} + +func newThinService(t *testing.T, source string, ttl time.Duration, maxEntries int) (*storage.PebbleStore, *Service, func()) { + t.Helper() + + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + + if err := store.SetMode(config.ModeThin); err != nil { + t.Fatalf("set mode: %v", err) + } + + cfg := config.Default() + cfg.Mode = config.ModeThin + cfg.DataDir = dataDir + cfg.PLCSource = source + cfg.ThinCacheTTL = ttl + cfg.ThinCacheMaxEntries = maxEntries + cfg.VerifyPolicy = config.VerifyFull + + svc := NewService(cfg, store, NewClient(source), nil, nil) + cleanup := func() { + svc.Close() + _ = store.Close() + } + + return store, svc, cleanup +} + +type thinUpstream struct { + t *testing.T + url string + srv *httptest.Server + mu sync.Mutex + logs map[string][]types.ExportRecord + hits map[string]int +} + +func newThinUpstream(t *testing.T, logs map[string][]types.ExportRecord) *thinUpstream { + t.Helper() + + u := &thinUpstream{ + t: t, + logs: logs, + hits: map[string]int{}, + } + + u.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + path := strings.TrimPrefix(r.URL.Path, "/") + if !strings.HasSuffix(path, "/log") { + w.WriteHeader(http.StatusNotFound) + return + } + + did := strings.TrimSuffix(path, "/log") + did = strings.TrimSpace(did) + if did == "" { + w.WriteHeader(http.StatusNotFound) + return + } + + u.mu.Lock() + u.hits[did]++ + records, ok := u.logs[did] + u.mu.Unlock() + if !ok { + w.WriteHeader(http.StatusNotFound) + return + } + + type entry struct { + Operation json.RawMessage `json:"operation"` + CID string `json:"cid"` + CreatedAt string `json:"createdAt"` + Nullified bool `json:"nullified"` + } + out := make([]entry, 0, len(records)) + for _, rec := range records { + out = append(out, entry{ + Operation: rec.Operation, + CID: rec.CID, + CreatedAt: rec.CreatedAt, + Nullified: rec.Nullified, + }) + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(out) + })) + u.url = u.srv.URL + + return u +} + +func (u *thinUpstream) hitCount(did string) int { + u.mu.Lock() + defer u.mu.Unlock() + + return u.hits[did] +} + +func (u *thinUpstream) close() { + u.srv.Close() +} + +func signedDIDLog(t *testing.T, did string, length int) []types.ExportRecord { + t.Helper() + + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + t.Fatalf("generate key: %v", err) + } + + records := make([]types.ExportRecord, 0, length) + prev := "" + + for i := 0; i < length; i++ { + unsigned := map[string]any{ + "did": did, + "didDoc": map[string]any{"id": did, "seq": i + 1}, + "publicKey": base64.RawURLEncoding.EncodeToString(pub), + } + if prev != "" { + unsigned["prev"] = prev + } + + payload, _ := json.Marshal(unsigned) + canon, _ := types.CanonicalizeJSON(payload) + sig := ed25519.Sign(priv, canon) + + op := map[string]any{} + for k, v := range unsigned { + op[k] = v + } + op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon) + op["sig"] = base64.RawURLEncoding.EncodeToString(sig) + + raw, _ := json.Marshal(op) + parsed, err := types.ParseOperation(types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + Operation: raw, + }) + if err != nil { + t.Fatalf("parse operation: %v", err) + } + + prev = parsed.CID + records = append(records, types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + CID: parsed.CID, + CreatedAt: time.Now().UTC().Add(time.Duration(i) * time.Second).Format(time.RFC3339), + Operation: raw, + }) + } + + return records +} diff --git a/internal/ingest/thin_disk_test.go b/internal/ingest/thin_disk_test.go new file mode 100644 index 0000000..caa1a04 --- /dev/null +++ b/internal/ingest/thin_disk_test.go @@ -0,0 +1,181 @@ +package ingest + +import ( + "context" + "encoding/json" + "fmt" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" + "math/rand" + "os" + "path/filepath" + "testing" + "time" +) + +func TestThinDiskFootprintFor1000DIDs(t *testing.T) { + const didCount = 1000 + + dids := make([]string, 0, didCount) + recordsByDID := make(map[string][]types.ExportRecord, didCount) + mirrorRecords := make([]types.ExportRecord, 0, didCount) + + for i := 0; i < didCount; i++ { + did := fmt.Sprintf("did:plc:thin-%04d", i) + dids = append(dids, did) + + logRecords := signedDIDLog(t, did, 1) + recordsByDID[did] = logRecords + mirrorRecords = append(mirrorRecords, types.ExportRecord{ + Seq: uint64(i + 1), + DID: did, + CreatedAt: logRecords[0].CreatedAt, + CID: logRecords[0].CID, + Operation: logRecords[0].Operation, + }) + } + + rng := rand.New(rand.NewSource(42)) + rng.Shuffle(len(dids), func(i, j int) { dids[i], dids[j] = dids[j], dids[i] }) + + upstream := newThinUpstream(t, recordsByDID) + defer upstream.close() + + thinDir := filepath.Join(t.TempDir(), "thin-data") + if err := os.MkdirAll(thinDir, 0o755); err != nil { + t.Fatalf("mkdir thin dir: %v", err) + } + + thinStore, err := storage.OpenPebble(thinDir) + if err != nil { + t.Fatalf("open thin pebble: %v", err) + } + defer thinStore.Close() + + if err := thinStore.SetMode(config.ModeThin); err != nil { + t.Fatalf("set thin mode: %v", err) + } + + thinCfg := config.Default() + thinCfg.Mode = config.ModeThin + thinCfg.DataDir = thinDir + thinCfg.PLCSource = upstream.url + thinCfg.VerifyPolicy = config.VerifyFull + thinCfg.ThinCacheTTL = 24 * time.Hour + thinCfg.ThinCacheMaxEntries = didCount + 10 + + thinSvc := NewService(thinCfg, thinStore, NewClient(upstream.url), nil, nil) + defer thinSvc.Close() + + for _, did := range dids { + if _, err := thinSvc.ResolveState(context.Background(), did); err != nil { + t.Fatalf("thin resolve %s: %v", did, err) + } + } + + thinSize, err := dirSizeForTest(thinDir) + if err != nil { + t.Fatalf("thin dir size: %v", err) + } + + mirrorDir := filepath.Join(t.TempDir(), "mirror-data") + if err := os.MkdirAll(mirrorDir, 0o755); err != nil { + t.Fatalf("mkdir mirror dir: %v", err) + } + + mirrorStore, err := storage.OpenPebble(mirrorDir) + if err != nil { + t.Fatalf("open mirror pebble: %v", err) + } + defer mirrorStore.Close() + + if err := mirrorStore.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mirror mode: %v", err) + } + + mirrorLog, err := storage.OpenBlockLog(mirrorDir, 9, 4) + if err != nil { + t.Fatalf("open mirror blocklog: %v", err) + } + + exportPath := filepath.Join(mirrorDir, "mirror-source.ndjson") + f, err := os.Create(exportPath) + if err != nil { + t.Fatalf("create mirror source: %v", err) + } + + enc := json.NewEncoder(f) + for _, rec := range mirrorRecords { + if err := enc.Encode(rec); err != nil { + _ = f.Close() + t.Fatalf("encode mirror source: %v", err) + } + } + + if err := f.Close(); err != nil { + t.Fatalf("close mirror source: %v", err) + } + + mirrorCfg := config.Default() + mirrorCfg.Mode = config.ModeMirror + mirrorCfg.DataDir = mirrorDir + mirrorCfg.PLCSource = exportPath + mirrorCfg.VerifyPolicy = config.VerifyFull + mirrorCfg.ZstdLevel = 9 + mirrorCfg.BlockSizeMB = 4 + mirrorCfg.CommitBatchSize = 128 + mirrorCfg.VerifyWorkers = 8 + mirrorCfg.CheckpointInterval = 1000000 + + mirrorSvc := NewService(mirrorCfg, mirrorStore, NewClient(exportPath), mirrorLog, nil) + defer mirrorSvc.Close() + + if err := mirrorSvc.Replay(context.Background()); err != nil { + t.Fatalf("mirror replay: %v", err) + } + + if err := mirrorSvc.Flush(context.Background()); err != nil { + t.Fatalf("mirror flush: %v", err) + } + + mirrorSize, err := dirSizeForTest(mirrorDir) + if err != nil { + t.Fatalf("mirror dir size: %v", err) + } + + if thinSize >= mirrorSize { + t.Fatalf("expected thin footprint smaller than mirror footprint: thin=%d mirror=%d", thinSize, mirrorSize) + } + + t.Logf("thin_cache_total_bytes=%d did_count=%d bytes_per_did=%.2f", thinSize, didCount, float64(thinSize)/didCount) + t.Logf("mirror_total_bytes=%d op_count=%d bytes_per_op=%.2f", mirrorSize, didCount, float64(mirrorSize)/didCount) +} + +func dirSizeForTest(path string) (int64, error) { + var total int64 + + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + + info, err := d.Info() + if err != nil { + return err + } + + total += info.Size() + + return nil + }) + if err != nil { + return 0, err + } + + return total, nil +} diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go index b0a8e01..11e2dad 100644 --- a/internal/storage/pebble_store.go +++ b/internal/storage/pebble_store.go @@ -71,6 +71,27 @@ func (p *PebbleStore) PutState(state types.StateV1) error { return p.db.Set(didKey(state.DID), b, pebble.Sync) } +func (p *PebbleStore) DeleteState(did string) error { + batch := p.db.NewBatch() + defer batch.Close() + + if err := batch.Delete(didKey(did), nil); err != nil { + return err + } + + if err := batch.Delete(chainKey(did), nil); err != nil { + return err + } + + prefix := []byte("didop:" + did + ":") + upper := append(append([]byte(nil), prefix...), 0xFF) + if err := batch.DeleteRange(prefix, upper, nil); err != nil { + return err + } + + return batch.Commit(pebble.Sync) +} + func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error { var opRef *types.BlockRefV1 @@ -141,6 +162,57 @@ func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error { return nil } +func (p *PebbleStore) PutThinCacheMeta(meta types.ThinCacheMetaV1) error { + b, err := json.Marshal(meta) + if err != nil { + return fmt.Errorf("marshal thin cache meta: %w", err) + } + + return p.db.Set(thinMetaKey(meta.DID), b, pebble.Sync) +} + +func (p *PebbleStore) GetThinCacheMeta(did string) (types.ThinCacheMetaV1, bool, error) { + b, ok, err := p.getBytes(thinMetaKey(did)) + if err != nil || !ok { + return types.ThinCacheMetaV1{}, ok, err + } + + var meta types.ThinCacheMetaV1 + if err := json.Unmarshal(b, &meta); err != nil { + return types.ThinCacheMetaV1{}, false, fmt.Errorf("unmarshal thin cache meta: %w", err) + } + + return meta, true, nil +} + +func (p *PebbleStore) ListThinCacheMeta() ([]types.ThinCacheMetaV1, error) { + iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("thinmeta:"), UpperBound: []byte("thinmeta;")}) + if err != nil { + return nil, fmt.Errorf("new iterator: %w", err) + } + defer iter.Close() + + out := make([]types.ThinCacheMetaV1, 0) + for iter.First(); iter.Valid(); iter.Next() { + var meta types.ThinCacheMetaV1 + if err := json.Unmarshal(iter.Value(), &meta); err != nil { + return nil, fmt.Errorf("unmarshal thin cache meta: %w", err) + } + + out = append(out, meta) + } + + if err := iter.Error(); err != nil { + return nil, fmt.Errorf("iterate thin cache meta: %w", err) + } + + return out, nil +} + +func (p *PebbleStore) DeleteThinCacheMeta(did string) error { + return p.db.Delete(thinMetaKey(did), pebble.Sync) +} + func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error { if len(ops) == 0 && len(blockHashes) == 0 { return nil @@ -484,6 +556,8 @@ func didOpKey(did string, seq uint64) []byte { return append([]byte("didop:"+did+":"), u64bytes(seq)...) } +func thinMetaKey(did string) []byte { return []byte("thinmeta:" + did) } + func u64bytes(v uint64) []byte { b := make([]byte, 8) diff --git a/internal/storage/store.go b/internal/storage/store.go index 5267386..456e758 100644 --- a/internal/storage/store.go +++ b/internal/storage/store.go @@ -19,9 +19,14 @@ type Store interface { GetGlobalSeq() (uint64, error) SetGlobalSeq(seq uint64) error PutState(state types.StateV1) error + DeleteState(did string) error GetState(did string) (types.StateV1, bool, error) ListStates() ([]types.StateV1, error) ForEachState(fn func(types.StateV1) error) error + PutThinCacheMeta(meta types.ThinCacheMetaV1) error + GetThinCacheMeta(did string) (types.ThinCacheMetaV1, bool, error) + ListThinCacheMeta() ([]types.ThinCacheMetaV1, error) + DeleteThinCacheMeta(did string) error ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error SetChainHead(did string, seq uint64) error GetChainHead(did string) (uint64, bool, error) diff --git a/internal/types/state.go b/internal/types/state.go index 31b9372..3f97c34 100644 --- a/internal/types/state.go +++ b/internal/types/state.go @@ -44,3 +44,11 @@ type CheckpointV1 struct { Signature string `json:"signature"` CheckpointHash string `json:"checkpoint_hash"` } + +// ThinCacheMetaV1 tracks thin-mode cache freshness and eviction order. +type ThinCacheMetaV1 struct { + Version uint8 `json:"v"` + DID string `json:"did"` + LastVerified time.Time `json:"last_verified"` + LastAccess time.Time `json:"last_access"` +} |