diff options
| author | Fuwn <[email protected]> | 2026-02-26 15:41:45 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 15:41:45 -0800 |
| commit | fec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch) | |
| tree | 0897394ccdfaf6633e1a4ca8eb02bff49bb93c00 /internal/api | |
| parent | feat: add read-only PLC API compatibility endpoints (diff) | |
| download | plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip | |
feat: Apply Iku formatting
Diffstat (limited to 'internal/api')
| -rw-r--r-- | internal/api/observability.go | 48 | ||||
| -rw-r--r-- | internal/api/plc_compatibility_test.go | 61 | ||||
| -rw-r--r-- | internal/api/server.go | 186 | ||||
| -rw-r--r-- | internal/api/server_checkpoint_test.go | 9 | ||||
| -rw-r--r-- | internal/api/server_hardening_test.go | 25 | ||||
| -rw-r--r-- | internal/api/server_integration_test.go | 65 |
6 files changed, 385 insertions, 9 deletions
diff --git a/internal/api/observability.go b/internal/api/observability.go index ebd7711..4654b28 100644 --- a/internal/api/observability.go +++ b/internal/api/observability.go @@ -8,7 +8,6 @@ import ( "strings" "sync" "time" - "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" "github.com/Fuwn/plutia/internal/storage" @@ -32,9 +31,11 @@ type serverMetrics struct { func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.Service) *serverMetrics { reg := prometheus.NewRegistry() + var diskMu sync.Mutex var diskCached int64 var diskCachedAt time.Time + m := &serverMetrics{ registry: reg, checkpointDuration: prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -47,6 +48,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S Help: "Latest checkpoint sequence generated by this mirror.", }), } + reg.MustRegister(m.checkpointDuration, m.checkpointSequence) reg.MustRegister(prometheus.NewCounterFunc( prometheus.CounterOpts{ @@ -55,9 +57,11 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S }, func() float64 { seq, err := store.GetGlobalSeq() + if err != nil { return 0 } + return float64(seq) }, )) @@ -70,6 +74,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor == nil { return 0 } + return ingestor.Stats().IngestOpsPerSec }, )) @@ -82,6 +87,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor == nil { return 0 } + return float64(ingestor.Stats().LagOps) }, )) @@ -94,6 +100,7 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor == nil { return 0 } + return float64(ingestor.Stats().VerifyFailures) }, )) @@ -104,14 +111,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S }, func() float64 { diskMu.Lock() + defer diskMu.Unlock() + if diskCachedAt.IsZero() || time.Since(diskCachedAt) > 5*time.Second { size, err := dirSize(cfg.DataDir) + if err == nil { diskCached = size diskCachedAt = time.Now() } } + return float64(diskCached) }, )) @@ -124,14 +135,18 @@ func newServerMetrics(cfg config.Config, store storage.Store, ingestor *ingest.S if ingestor != nil { return float64(ingestor.Stats().DIDCount) } + count := uint64(0) _ = store.ForEachState(func(_ types.StateV1) error { count++ + return nil }) + return float64(count) }, )) + return m } @@ -182,18 +197,23 @@ type ipRateLimiter struct { func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { def := config.Default().RateLimit + if cfg.ResolveRPS <= 0 { cfg.ResolveRPS = def.ResolveRPS } + if cfg.ResolveBurst <= 0 { cfg.ResolveBurst = def.ResolveBurst } + if cfg.ProofRPS <= 0 { cfg.ProofRPS = def.ProofRPS } + if cfg.ProofBurst <= 0 { cfg.ProofBurst = def.ProofBurst } + return &ipRateLimiter{ buckets: map[string]*tokenBucket{}, resolve: endpointPolicy{ @@ -210,7 +230,9 @@ func newIPRateLimiter(cfg config.RateLimit) *ipRateLimiter { func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { now := time.Now() + l.mu.Lock() + defer l.mu.Unlock() if now.Sub(l.lastSweep) > 2*time.Minute { @@ -219,75 +241,99 @@ func (l *ipRateLimiter) Allow(ip string, class limiterClass) bool { delete(l.buckets, key) } } + l.lastSweep = now } policy := l.resolve routeKey := "resolve" + if class == limiterProof { policy = l.proof routeKey = "proof" } + key := routeKey + "|" + ip b, ok := l.buckets[key] + if !ok { l.buckets[key] = &tokenBucket{ tokens: policy.burst - 1, last: now, lastSeen: now, } + return true } + elapsed := now.Sub(b.last).Seconds() + if elapsed > 0 { b.tokens += elapsed * policy.rps + if b.tokens > policy.burst { b.tokens = policy.burst } } + b.last = now b.lastSeen = now + if b.tokens < 1 { return false } + b.tokens-- + return true } func clientIP(r *http.Request) string { if forwarded := strings.TrimSpace(r.Header.Get("X-Forwarded-For")); forwarded != "" { parts := strings.Split(forwarded, ",") + if len(parts) > 0 { if ip := strings.TrimSpace(parts[0]); ip != "" { return ip } } } + if realIP := strings.TrimSpace(r.Header.Get("X-Real-IP")); realIP != "" { return realIP } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil && host != "" { return host } + return r.RemoteAddr } func dirSize(path string) (int64, error) { var total int64 + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { if err != nil { return err } + if d.IsDir() { return nil } + info, err := d.Info() + if err != nil { return err } + total += info.Size() + return nil }) + return total, err } diff --git a/internal/api/plc_compatibility_test.go b/internal/api/plc_compatibility_test.go index 67cbafa..b914278 100644 --- a/internal/api/plc_compatibility_test.go +++ b/internal/api/plc_compatibility_test.go @@ -14,7 +14,6 @@ import ( "strings" "testing" "time" - "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" @@ -24,28 +23,37 @@ import ( func TestPLCCompatibilityGetDIDMatchesStoredDocument(t *testing.T) { ts, store, _, cleanup := newCompatibilityServer(t) + defer cleanup() state, ok, err := store.GetState("did:plc:alice") + if err != nil { t.Fatalf("get state: %v", err) } + if !ok { t.Fatalf("state not found") } resp, err := http.Get(ts.URL + "/did:plc:alice") + if err != nil { t.Fatalf("get did: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { t.Fatalf("status: got %d want 200", resp.StatusCode) } + if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/did+ld+json") { t.Fatalf("content-type mismatch: %s", got) } + body, _ := io.ReadAll(resp.Body) + if strings.TrimSpace(string(body)) != strings.TrimSpace(string(state.DIDDocument)) { t.Fatalf("did document mismatch\n got: %s\nwant: %s", string(body), string(state.DIDDocument)) } @@ -53,26 +61,35 @@ func TestPLCCompatibilityGetDIDMatchesStoredDocument(t *testing.T) { func TestPLCCompatibilityGetLogOrdered(t *testing.T) { ts, _, recs, cleanup := newCompatibilityServer(t) + defer cleanup() resp, err := http.Get(ts.URL + "/did:plc:alice/log") + if err != nil { t.Fatalf("get log: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { t.Fatalf("status: got %d want 200", resp.StatusCode) } + var ops []map[string]any + if err := json.NewDecoder(resp.Body).Decode(&ops); err != nil { t.Fatalf("decode log: %v", err) } + if len(ops) != 2 { t.Fatalf("log length mismatch: got %d want 2", len(ops)) } + if _, ok := ops[0]["prev"]; ok { t.Fatalf("first op should be genesis without prev") } + if prev, _ := ops[1]["prev"].(string); prev != recs[0].CID { t.Fatalf("second op prev mismatch: got %q want %q", prev, recs[0].CID) } @@ -80,29 +97,39 @@ func TestPLCCompatibilityGetLogOrdered(t *testing.T) { func TestPLCCompatibilityExportCount(t *testing.T) { ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() resp, err := http.Get(ts.URL + "/export?count=2") + if err != nil { t.Fatalf("get export: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { t.Fatalf("status: got %d want 200", resp.StatusCode) } + if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/jsonlines") { t.Fatalf("content-type mismatch: %s", got) } + body, _ := io.ReadAll(resp.Body) lines := strings.Split(strings.TrimSpace(string(body)), "\n") + if len(lines) != 2 { t.Fatalf("line count mismatch: got %d want 2", len(lines)) } + for _, line := range lines { var entry map[string]any + if err := json.Unmarshal([]byte(line), &entry); err != nil { t.Fatalf("decode export line: %v", err) } + for _, key := range []string{"did", "operation", "cid", "nullified", "createdAt"} { if _, ok := entry[key]; !ok { t.Fatalf("missing export key %q in %v", key, entry) @@ -113,20 +140,27 @@ func TestPLCCompatibilityExportCount(t *testing.T) { func TestPLCCompatibilityPostIsMethodNotAllowed(t *testing.T) { ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() req, err := http.NewRequest(http.MethodPost, ts.URL+"/did:plc:alice", strings.NewReader(`{}`)) + if err != nil { t.Fatalf("new request: %v", err) } + resp, err := http.DefaultClient.Do(req) + if err != nil { t.Fatalf("post did: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusMethodNotAllowed { t.Fatalf("status: got %d want 405", resp.StatusCode) } + if allow := resp.Header.Get("Allow"); allow != http.MethodGet { t.Fatalf("allow header mismatch: got %q want %q", allow, http.MethodGet) } @@ -134,14 +168,19 @@ func TestPLCCompatibilityPostIsMethodNotAllowed(t *testing.T) { func TestPLCCompatibilityNoVerificationMetadataLeak(t *testing.T) { ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() resp, err := http.Get(ts.URL + "/did:plc:alice") + if err != nil { t.Fatalf("get did: %v", err) } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if strings.Contains(string(body), "checkpoint_reference") { t.Fatalf("compatibility endpoint leaked verification metadata: %s", string(body)) } @@ -149,48 +188,63 @@ func TestPLCCompatibilityNoVerificationMetadataLeak(t *testing.T) { func TestPLCCompatibilityProofEndpointStillWorks(t *testing.T) { ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() resp, err := http.Get(ts.URL + "/did/did:plc:alice/proof") + if err != nil { t.Fatalf("get proof: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) + t.Fatalf("proof status: got %d want 200 body=%s", resp.StatusCode, string(body)) } } func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStore, []types.ExportRecord, func()) { t.Helper() + tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data: %v", err) } seed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(seed); err != nil { t.Fatalf("seed: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil { t.Fatalf("write key: %v", err) } recs := buildCheckpointScenarioRecords(t) sourcePath := filepath.Join(tmp, "records.ndjson") + writeRecordsFile(t, sourcePath, recs) store, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open pebble: %v", err) } + if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { t.Fatalf("open block log: %v", err) } @@ -212,12 +266,15 @@ func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStor } cpMgr := checkpoint.NewManager(store, dataDir, keyPath) svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr) + if err := svc.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } + if err := svc.Flush(context.Background()); err != nil { t.Fatalf("flush: %v", err) } + if _, err := svc.Snapshot(context.Background()); err != nil { t.Fatalf("snapshot: %v", err) } @@ -226,7 +283,9 @@ func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStor cleanup := func() { ts.Close() svc.Close() + _ = store.Close() } + return ts, store, recs, cleanup } diff --git a/internal/api/server.go b/internal/api/server.go index f773145..2a3c589 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "time" - "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" @@ -44,21 +43,27 @@ func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, }, limiter: newIPRateLimiter(cfg.RateLimit), } + for _, opt := range opts { opt(s) } + s.metrics = newServerMetrics(cfg, store, ingestor) + if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { s.metrics.checkpointSequence.Set(float64(cp.Sequence)) } + if ingestor != nil { ingestor.SetMetricsSink(s.metrics) } + return s } func (s *Server) Handler() http.Handler { mux := http.NewServeMux() + mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth))) mux.Handle("/metrics", s.metrics.Handler()) mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus))) @@ -67,6 +72,7 @@ func (s *Server) Handler() http.Handler { mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID))) mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility))) mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility))) + return mux } @@ -76,19 +82,27 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { seq, err := s.store.GetGlobalSeq() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + cp, ok, err := s.store.GetLatestCheckpoint() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + stats := ingest.Stats{} + if s.ingestor != nil { stats = s.ingestor.Stats() } + payload := map[string]any{ "mode": s.cfg.Mode, "verify_policy": s.cfg.VerifyPolicy, @@ -96,161 +110,225 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { "stats": stats, "build": s.build, } + if s.ingestor != nil { payload["corrupted"] = s.ingestor.IsCorrupted() + if err := s.ingestor.CorruptionError(); err != nil { payload["corruption_error"] = err.Error() } } + if ok { payload["latest_checkpoint"] = cp } + writeJSON(w, http.StatusOK, payload) } func (s *Server) handleLatestCheckpoint(w http.ResponseWriter, r *http.Request) { cp, ok, err := s.store.GetLatestCheckpoint() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("no checkpoint")) + return } + writeJSON(w, http.StatusOK, cp) } func (s *Server) handleCheckpointBySequence(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/checkpoints/") + if path == "" { writeErr(w, http.StatusNotFound, fmt.Errorf("missing checkpoint sequence")) + return } + seq, err := strconv.ParseUint(path, 10, 64) + if err != nil { writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid checkpoint sequence")) + return } + cp, ok, err := s.store.GetCheckpoint(seq) + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("checkpoint not found")) + return } + writeJSON(w, http.StatusOK, cp) } func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/did/") + if path == "" { writeErr(w, http.StatusBadRequest, fmt.Errorf("missing did")) + return } + if strings.HasSuffix(path, "/proof") { did := strings.TrimSuffix(path, "/proof") + if !s.allowRequest(r, limiterProof) { writeErr(w, http.StatusTooManyRequests, fmt.Errorf("proof rate limit exceeded")) + return } + s.handleDIDProof(w, r, did) + return } + if !s.allowRequest(r, limiterResolve) { writeErr(w, http.StatusTooManyRequests, fmt.Errorf("resolve rate limit exceeded")) + return } + s.handleDIDResolve(w, r, path) } func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) { state, ok, err := s.store.GetState(did) + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + cp, cpOK, err := s.store.GetLatestCheckpoint() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + resp := map[string]any{ "did": did, "did_document": json.RawMessage(state.DIDDocument), "chain_tip_hash": state.ChainTipHash, } + if cpOK { resp["checkpoint_reference"] = map[string]any{ "sequence": cp.Sequence, "checkpoint_hash": cp.CheckpointHash, } } + writeJSON(w, http.StatusOK, resp) } func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + if err := s.ingestor.CorruptionError(); err != nil { writeErr(w, http.StatusServiceUnavailable, err) + return } cp, verifyCheckpointUnchanged, err := s.selectCheckpointForProof(r) + if err != nil { writeErr(w, http.StatusBadRequest, err) + return } tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence) + if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusNotFound, err) + return } + siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence) + if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + if !found { writeErr(w, http.StatusNotFound, fmt.Errorf("did not present in checkpoint state")) + return } leafBytes, err := hex.DecodeString(leafHash) + if err != nil { writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid leaf hash: %w", err)) + return } + root, err := hex.DecodeString(cp.DIDMerkleRoot) + if err != nil { writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid checkpoint root")) + return } + if !merkle.VerifyProof(leafBytes, siblings, root) { writeErr(w, http.StatusInternalServerError, fmt.Errorf("inclusion proof failed consistency check")) + return } if err := verifyCheckpointUnchanged(); err != nil { writeErr(w, http.StatusConflict, err) + return } @@ -265,6 +343,7 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri CheckpointSig: cp.Signature, CheckpointKeyID: cp.KeyID, } + writeJSON(w, http.StatusOK, map[string]any{ "did": did, "checkpoint_sequence": cp.Sequence, @@ -287,28 +366,39 @@ type plcAuditEntry struct { func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/") + if path == "" { writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + return } + if path == "export" { s.handleExportCompatibility(w, r) + return } + parts := strings.Split(path, "/") did := parts[0] + if !strings.HasPrefix(did, "did:") { writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + return } + if r.Method == http.MethodPost && len(parts) == 1 { w.Header().Set("Allow", http.MethodGet) writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("write operations are not supported by this mirror")) + return } + if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + return } @@ -330,90 +420,129 @@ func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) { state, ok, err := s.store.GetState(did) + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + status := http.StatusOK + if isTombstonedDIDDocument(state.DIDDocument) { status = http.StatusGone } + w.Header().Set("Content-Type", "application/did+ld+json") w.WriteHeader(status) + _, _ = w.Write(state.DIDDocument) } func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + ops := make([]json.RawMessage, 0, len(logEntries)) + for _, rec := range logEntries { ops = append(ops, rec.Operation) } + writeJSONWithContentType(w, http.StatusOK, "application/json", ops) } func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + _, _ = w.Write(rec.Operation) } func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + audit := make([]plcAuditEntry, 0, len(logEntries)) + for _, rec := range logEntries { audit = append(audit, plcAuditEntry{ DID: did, @@ -423,27 +552,37 @@ func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *htt CreatedAt: rec.CreatedAt, }) } + writeJSONWithContentType(w, http.StatusOK, "application/json", audit) } func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + writeJSONWithContentType(w, http.StatusOK, "application/json", data) } @@ -451,38 +590,51 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + return } + if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } count := 1000 + if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" { n, err := strconv.Atoi(rawCount) + if err != nil || n < 1 { writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid count query parameter")) + return } + if n > 1000 { n = 1000 } + count = n } var after time.Time + if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" { parsed, err := time.Parse(time.RFC3339, rawAfter) + if err != nil { writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid after query parameter")) + return } + after = parsed } w.Header().Set("Content-Type", "application/jsonlines") w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) enc := json.NewEncoder(w) err := s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error { @@ -493,14 +645,18 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques Nullified: rec.Nullified, CreatedAt: rec.CreatedAt, } + if err := enc.Encode(entry); err != nil { return err } + if flusher != nil { flusher.Flush() } + return nil }) + if err != nil { // Response has already started; best effort termination. return @@ -511,22 +667,30 @@ func isTombstonedDIDDocument(raw []byte) bool { if len(raw) == 0 { return false } + var doc map[string]any + if err := json.Unmarshal(raw, &doc); err != nil { return false } + deactivated, _ := doc["deactivated"].(bool) + return deactivated } func (s *Server) withTimeout(next http.Handler) http.Handler { timeout := s.cfg.RequestTimeout + if timeout <= 0 { timeout = 10 * time.Second } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + next.ServeHTTP(w, r.WithContext(ctx)) }) } @@ -535,53 +699,70 @@ func (s *Server) allowRequest(r *http.Request, class limiterClass) bool { if s.limiter == nil { return true } + return s.limiter.Allow(clientIP(r), class) } func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) { checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint")) + if checkpointParam == "" { cp, ok, err := s.store.GetLatestCheckpoint() + if err != nil { return types.CheckpointV1{}, nil, err } + if !ok { return types.CheckpointV1{}, nil, fmt.Errorf("no checkpoint available") } + return cp, func() error { now, ok, err := s.store.GetLatestCheckpoint() + if err != nil { return err } + if !ok { return fmt.Errorf("latest checkpoint disappeared during request") } + if now.CheckpointHash != cp.CheckpointHash { return fmt.Errorf("checkpoint advanced during proof generation") } + return nil }, nil } seq, err := strconv.ParseUint(checkpointParam, 10, 64) + if err != nil { return types.CheckpointV1{}, nil, fmt.Errorf("invalid checkpoint query parameter") } + cp, ok, err := s.store.GetCheckpoint(seq) + if err != nil { return types.CheckpointV1{}, nil, err } + if !ok { return types.CheckpointV1{}, nil, fmt.Errorf("checkpoint %d unavailable", seq) } + return cp, func() error { again, ok, err := s.store.GetCheckpoint(seq) + if err != nil { return err } + if !ok || again.CheckpointHash != cp.CheckpointHash { return fmt.Errorf("checkpoint %d changed during proof generation", seq) } + return nil }, nil } @@ -592,10 +773,13 @@ func writeJSON(w http.ResponseWriter, code int, v any) { func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) { w.Header().Set("Content-Type", "application/json") + if strings.TrimSpace(contentType) != "" { w.Header().Set("Content-Type", contentType) } + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) } diff --git a/internal/api/server_checkpoint_test.go b/internal/api/server_checkpoint_test.go index 9a12f61..531f24d 100644 --- a/internal/api/server_checkpoint_test.go +++ b/internal/api/server_checkpoint_test.go @@ -5,7 +5,6 @@ import ( "path/filepath" "strings" "testing" - "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" @@ -14,9 +13,11 @@ import ( func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) { dataDir := t.TempDir() store, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() if err := store.PutCheckpoint(types.CheckpointV1{ @@ -32,9 +33,11 @@ func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) { srv := NewServer(config.Default(), store, nil, nil) req := httptest.NewRequest("GET", "/did/did:plc:alice/proof", nil) cp, verifyUnchanged, err := srv.selectCheckpointForProof(req) + if err != nil { t.Fatalf("select checkpoint: %v", err) } + if cp.Sequence != 10 { t.Fatalf("selected unexpected checkpoint sequence: got %d want 10", cp.Sequence) } @@ -57,9 +60,11 @@ func TestSelectCheckpointForProofDetectsLatestAdvance(t *testing.T) { func TestSelectCheckpointForProofDetectsHistoricalMutation(t *testing.T) { dataDir := t.TempDir() store, err := storage.OpenPebble(filepath.Clean(dataDir)) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() if err := store.PutCheckpoint(types.CheckpointV1{ @@ -75,9 +80,11 @@ func TestSelectCheckpointForProofDetectsHistoricalMutation(t *testing.T) { srv := NewServer(config.Default(), store, nil, nil) req := httptest.NewRequest("GET", "/did/did:plc:alice/proof?checkpoint=20", nil) cp, verifyUnchanged, err := srv.selectCheckpointForProof(req) + if err != nil { t.Fatalf("select historical checkpoint: %v", err) } + if cp.CheckpointHash != "cp-20-a" { t.Fatalf("selected unexpected checkpoint hash: got %s", cp.CheckpointHash) } diff --git a/internal/api/server_hardening_test.go b/internal/api/server_hardening_test.go index bb9f24c..fcdf87b 100644 --- a/internal/api/server_hardening_test.go +++ b/internal/api/server_hardening_test.go @@ -8,7 +8,6 @@ import ( "strings" "testing" "time" - "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" @@ -16,10 +15,13 @@ import ( func TestResolveRateLimitPerIP(t *testing.T) { store, err := storage.OpenPebble(t.TempDir()) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() + if err := store.PutState(types.StateV1{Version: 1, DID: "did:plc:alice", DIDDocument: []byte(`{"id":"did:plc:alice"}`), ChainTipHash: "tip", LatestOpSeq: 1, UpdatedAt: time.Now().UTC()}); err != nil { t.Fatalf("put state: %v", err) } @@ -31,11 +33,12 @@ func TestResolveRateLimitPerIP(t *testing.T) { cfg.RateLimit.ProofRPS = 1 cfg.RateLimit.ProofBurst = 1 h := NewServer(cfg, store, nil, nil).Handler() - req1 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil) req1.RemoteAddr = "203.0.113.7:12345" rr1 := httptest.NewRecorder() + h.ServeHTTP(rr1, req1) + if rr1.Code != http.StatusOK { t.Fatalf("first request status: got %d want %d", rr1.Code, http.StatusOK) } @@ -43,7 +46,9 @@ func TestResolveRateLimitPerIP(t *testing.T) { req2 := httptest.NewRequest(http.MethodGet, "/did/did:plc:alice", nil) req2.RemoteAddr = "203.0.113.7:12345" rr2 := httptest.NewRecorder() + h.ServeHTTP(rr2, req2) + if rr2.Code != http.StatusTooManyRequests { t.Fatalf("second request status: got %d want %d", rr2.Code, http.StatusTooManyRequests) } @@ -51,9 +56,11 @@ func TestResolveRateLimitPerIP(t *testing.T) { func TestStatusIncludesBuildInfo(t *testing.T) { store, err := storage.OpenPebble(t.TempDir()) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() h := NewServer(config.Default(), store, nil, nil, WithBuildInfo(BuildInfo{ @@ -62,25 +69,31 @@ func TestStatusIncludesBuildInfo(t *testing.T) { BuildDate: "2026-02-26T00:00:00Z", GoVersion: "go1.test", })).Handler() - req := httptest.NewRequest(http.MethodGet, "/status", nil) rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { t.Fatalf("status code: got %d want %d", rr.Code, http.StatusOK) } var payload map[string]any + if err := json.Unmarshal(rr.Body.Bytes(), &payload); err != nil { t.Fatalf("decode status: %v", err) } + build, ok := payload["build"].(map[string]any) + if !ok { t.Fatalf("missing build section: %v", payload) } + if got := build["version"]; got != "v0.1.0" { t.Fatalf("unexpected build version: %v", got) } + if got := build["commit"]; got != "abc123" { t.Fatalf("unexpected build commit: %v", got) } @@ -88,20 +101,26 @@ func TestStatusIncludesBuildInfo(t *testing.T) { func TestMetricsExposeRequiredSeries(t *testing.T) { store, err := storage.OpenPebble(t.TempDir()) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() h := NewServer(config.Default(), store, nil, nil).Handler() req := httptest.NewRequest(http.MethodGet, "/metrics", nil) rr := httptest.NewRecorder() + h.ServeHTTP(rr, req) + if rr.Code != http.StatusOK { t.Fatalf("metrics status: got %d want %d", rr.Code, http.StatusOK) } + body, _ := io.ReadAll(rr.Body) text := string(body) + for _, metric := range []string{ "ingest_ops_total", "ingest_ops_per_second", diff --git a/internal/api/server_integration_test.go b/internal/api/server_integration_test.go index 1736750..edb64bd 100644 --- a/internal/api/server_integration_test.go +++ b/internal/api/server_integration_test.go @@ -14,7 +14,6 @@ import ( "path/filepath" "strings" "testing" - "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" @@ -25,32 +24,41 @@ import ( func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) { tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data: %v", err) } keyPath := filepath.Join(tmp, "mirror.key") seed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(seed); err != nil { t.Fatalf("seed: %v", err) } + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil { t.Fatalf("write key: %v", err) } recs := buildCheckpointScenarioRecords(t) sourcePath := filepath.Join(tmp, "records.ndjson") + writeRecordsFile(t, sourcePath, recs) store, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() + if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { t.Fatalf("open block log: %v", err) } @@ -69,29 +77,37 @@ func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) { } cpMgr := checkpoint.NewManager(store, dataDir, keyPath) svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr) + if err := svc.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } + if err := svc.Flush(context.Background()); err != nil { t.Fatalf("flush: %v", err) } cp2, ok, err := store.GetCheckpoint(2) + if err != nil || !ok { t.Fatalf("checkpoint 2 missing: ok=%v err=%v", ok, err) } ts := httptest.NewServer(NewServer(cfg, store, svc, cpMgr).Handler()) + defer ts.Close() url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof?checkpoint=2" resp, err := http.Get(url) + if err != nil { t.Fatalf("get proof: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { bodyBytes, _ := io.ReadAll(resp.Body) + t.Fatalf("proof status: %d body=%s", resp.StatusCode, string(bodyBytes)) } @@ -102,15 +118,19 @@ func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) { MerkleRoot string `json:"merkle_root"` } `json:"inclusion_proof"` } + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { t.Fatalf("decode response: %v", err) } + if body.CheckpointSequence != 2 { t.Fatalf("unexpected checkpoint sequence: got %d want 2", body.CheckpointSequence) } + if body.ChainTipReference != recs[0].CID { t.Fatalf("expected old tip at checkpoint=2: got %s want %s", body.ChainTipReference, recs[0].CID) } + if body.InclusionProof.MerkleRoot != cp2.DIDMerkleRoot { t.Fatalf("merkle root mismatch: got %s want %s", body.InclusionProof.MerkleRoot, cp2.DIDMerkleRoot) } @@ -119,34 +139,44 @@ func TestProofAgainstOlderCheckpointAfterFurtherIngest(t *testing.T) { func TestCorruptedBlockRefusesProof(t *testing.T) { tmp := t.TempDir() dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { t.Fatalf("mkdir data: %v", err) } seed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(seed); err != nil { t.Fatalf("seed: %v", err) } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil { t.Fatalf("write key: %v", err) } recs := buildCheckpointScenarioRecords(t) sourcePath := filepath.Join(tmp, "records.ndjson") + writeRecordsFile(t, sourcePath, recs) store, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open pebble: %v", err) } + if err := store.SetMode(config.ModeMirror); err != nil { t.Fatalf("set mode: %v", err) } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { t.Fatalf("open block log: %v", err) } + cfg := config.Config{ Mode: config.ModeMirror, DataDir: dataDir, @@ -161,49 +191,67 @@ func TestCorruptedBlockRefusesProof(t *testing.T) { } cpMgr := checkpoint.NewManager(store, dataDir, keyPath) svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr) + if err := svc.Replay(context.Background()); err != nil { t.Fatalf("replay: %v", err) } + if _, err := svc.Snapshot(context.Background()); err != nil { t.Fatalf("snapshot: %v", err) } + svc.Close() + if err := store.Close(); err != nil { t.Fatalf("close store: %v", err) } blockPath := filepath.Join(dataDir, "ops", "000001.zst") b, err := os.ReadFile(blockPath) + if err != nil { t.Fatalf("read block: %v", err) } + b[len(b)/2] ^= 0xFF + if err := os.WriteFile(blockPath, b, 0o644); err != nil { t.Fatalf("write corrupted block: %v", err) } store2, err := storage.OpenPebble(dataDir) + if err != nil { t.Fatalf("open store2: %v", err) } + defer store2.Close() + bl2, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { t.Fatalf("open blocklog2: %v", err) } + svc2 := ingest.NewService(cfg, store2, ingest.NewClient("file:///nonexistent"), bl2, checkpoint.NewManager(store2, dataDir, keyPath)) + if !svc2.IsCorrupted() { t.Fatalf("expected service to detect corruption on restart") } ts := httptest.NewServer(NewServer(cfg, store2, svc2, checkpoint.NewManager(store2, dataDir, keyPath)).Handler()) + defer ts.Close() + url := ts.URL + "/did/" + strings.ReplaceAll("did:plc:alice", ":", "%3A") + "/proof" resp, err := http.Get(url) + if err != nil { t.Fatalf("request proof: %v", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusServiceUnavailable { t.Fatalf("expected 503 for corrupted proof, got %d", resp.StatusCode) } @@ -211,13 +259,18 @@ func TestCorruptedBlockRefusesProof(t *testing.T) { func writeRecordsFile(t *testing.T, path string, recs []types.ExportRecord) { t.Helper() + f, err := os.Create(path) + if err != nil { t.Fatalf("create records file: %v", err) } + defer f.Close() + for _, rec := range recs { b, _ := json.Marshal(rec) + if _, err := fmt.Fprintln(f, string(b)); err != nil { t.Fatalf("write record: %v", err) } @@ -226,36 +279,44 @@ func writeRecordsFile(t *testing.T, path string, recs []types.ExportRecord) { func buildCheckpointScenarioRecords(t *testing.T) []types.ExportRecord { t.Helper() + pub, priv, err := ed25519.GenerateKey(rand.Reader) + if err != nil { t.Fatalf("generate key: %v", err) } + mk := func(seq uint64, did, prev string) types.ExportRecord { unsigned := map[string]any{ "did": did, "didDoc": map[string]any{"id": did, "seq": seq}, "publicKey": base64.RawURLEncoding.EncodeToString(pub), } + if prev != "" { unsigned["prev"] = prev } + payload, _ := json.Marshal(unsigned) canon, _ := types.CanonicalizeJSON(payload) sig := ed25519.Sign(priv, canon) op := map[string]any{} + for k, v := range unsigned { op[k] = v } + op["sigPayload"] = base64.RawURLEncoding.EncodeToString(canon) op["sig"] = base64.RawURLEncoding.EncodeToString(sig) raw, _ := json.Marshal(op) opCanon, _ := types.CanonicalizeJSON(raw) cid := types.ComputeDigestCID(opCanon) + return types.ExportRecord{Seq: seq, DID: did, CID: cid, Operation: raw} } - r1 := mk(1, "did:plc:alice", "") r2 := mk(2, "did:plc:bob", "") r3 := mk(3, "did:plc:alice", r1.CID) + return []types.ExportRecord{r1, r2, r3} } |