package api

import (
	"context"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/Fuwn/plutia/internal/checkpoint"
	"github.com/Fuwn/plutia/internal/config"
	"github.com/Fuwn/plutia/internal/ingest"
	"github.com/Fuwn/plutia/internal/merkle"
	"github.com/Fuwn/plutia/internal/storage"
	"github.com/Fuwn/plutia/internal/types"
	"github.com/Fuwn/plutia/pkg/proof"
)

// Server bundles the dependencies of the HTTP API: configuration, the backing
// store, the optional ingest service, the checkpoint manager, build metadata,
// the per-IP rate limiter and the metrics collector.
type Server struct {
	cfg         config.Config
	store       storage.Store
	ingestor    *ingest.Service
	checkpoints *checkpoint.Manager
	build       BuildInfo
	limiter     *ipRateLimiter
	metrics     *serverMetrics
}

// NewServer builds a Server from its dependencies and applies opts in order.
//
// Build information defaults to placeholder values ("dev"/"unknown") until an
// option overrides it. Metrics are created after the options have run; the
// checkpoint-sequence gauge is seeded from the latest stored checkpoint when
// one can be read, and a non-nil ingestor is given the metrics as its sink.
func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, checkpoints *checkpoint.Manager, opts ...serverOption) *Server {
	s := &Server{
		cfg:         cfg,
		store:       store,
		ingestor:    ingestor,
		checkpoints: checkpoints,
		build: BuildInfo{
			Version:   "dev",
			Commit:    "unknown",
			BuildDate: "unknown",
			GoVersion: "unknown",
		},
		limiter: newIPRateLimiter(cfg.RateLimit),
	}

	for _, opt := range opts {
		opt(s)
	}

	s.metrics = newServerMetrics(cfg, store, ingestor)

	// A read failure here is not fatal: the gauge just stays unseeded.
	if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
		s.metrics.checkpointSequence.Set(float64(cp.Sequence))
	}

	if ingestor != nil {
		ingestor.SetMetricsSink(s.metrics)
	}

	return s
}

// Handler returns the routing table for the API. Every route except /metrics
// is wrapped with the per-request timeout from withTimeout.
func (s *Server) Handler() http.Handler {
	mux := http.NewServeMux()

	mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth)))
	mux.Handle("/metrics", s.metrics.Handler())
	mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus)))
	mux.Handle("/checkpoints/latest", s.withTimeout(http.HandlerFunc(s.handleLatestCheckpoint)))
	mux.Handle("/checkpoints/", s.withTimeout(http.HandlerFunc(s.handleCheckpointBySequence)))
	mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID)))
	mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility)))
	mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility)))

	return mux
}

// handleHealth always answers 200 with {"status":"ok"}.
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
	writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
}

// handleStatus reports mode, verify policy, global sequence, ingest stats and
// build info. Corruption details are added when an ingestor is configured and
// the latest checkpoint is added when one exists.
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
	seq, err := s.store.GetGlobalSeq()
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}

	cp, ok, err := s.store.GetLatestCheckpoint()
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}

	// Without an ingestor the zero-value stats are reported.
	stats := ingest.Stats{}
	if s.ingestor != nil {
		stats = s.ingestor.Stats()
	}

	payload := map[string]any{
		"mode":          s.cfg.Mode,
		"verify_policy": s.cfg.VerifyPolicy,
		"global_seq":    seq,
		"stats":         stats,
		"build":         s.build,
	}

	if s.ingestor != nil {
		payload["corrupted"] = s.ingestor.IsCorrupted()
		if err := s.ingestor.CorruptionError(); err != nil {
			payload["corruption_error"] = err.Error()
		}
	}

	if ok {
		payload["latest_checkpoint"] = cp
	}

	writeJSON(w, http.StatusOK, payload)
}

// handleLatestCheckpoint returns the most recent checkpoint, or 404 when the
// store holds none.
func (s *Server) handleLatestCheckpoint(w http.ResponseWriter, r *http.Request) {
	cp, ok, err := s.store.GetLatestCheckpoint()
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}
	if !ok {
		writeErr(w, http.StatusNotFound, errors.New("no checkpoint"))
		return
	}

	writeJSON(w, http.StatusOK, cp)
}

// handleCheckpointBySequence serves /checkpoints/{sequence}. The sequence must
// parse as an unsigned base-10 integer (400 otherwise); an empty or unknown
// sequence yields 404.
func (s *Server) handleCheckpointBySequence(w http.ResponseWriter, r *http.Request) {
	path := strings.TrimPrefix(r.URL.Path, "/checkpoints/")
	if path == "" {
		writeErr(w, http.StatusNotFound, errors.New("missing checkpoint sequence"))
		return
	}

	seq, err := strconv.ParseUint(path, 10, 64)
	if err != nil {
		writeErr(w, http.StatusBadRequest, errors.New("invalid checkpoint sequence"))
		return
	}

	cp, ok, err := s.store.GetCheckpoint(seq)
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}
	if !ok {
		writeErr(w, http.StatusNotFound, errors.New("checkpoint not found"))
		return
	}

	writeJSON(w, http.StatusOK, cp)
}

// handleDID dispatches /did/{did} to resolution and /did/{did}/proof to proof
// generation. Each kind of request is rate limited under its own limiter
// class and answered with 429 when the limit is exceeded.
func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) {
	path := strings.TrimPrefix(r.URL.Path, "/did/")
	if path == "" {
		writeErr(w, http.StatusBadRequest, errors.New("missing did"))
		return
	}

	if strings.HasSuffix(path, "/proof") {
		did := strings.TrimSuffix(path, "/proof")

		if !s.allowRequest(r, limiterProof) {
			writeErr(w, http.StatusTooManyRequests, errors.New("proof rate limit exceeded"))
			return
		}

		s.handleDIDProof(w, r, did)
		return
	}

	if !s.allowRequest(r, limiterResolve) {
		writeErr(w, http.StatusTooManyRequests, errors.New("resolve rate limit exceeded"))
		return
	}

	s.handleDIDResolve(w, r, path)
}

// handleDIDResolve returns the stored DID document and chain tip hash for did,
// plus a reference to the latest checkpoint when one exists.
func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) {
	state, ok, err := s.store.GetState(did)
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}
	if !ok {
		writeErr(w, http.StatusNotFound, errors.New("did not found"))
		return
	}

	cp, cpOK, err := s.store.GetLatestCheckpoint()
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}

	resp := map[string]any{
		"did":            did,
		"did_document":   json.RawMessage(state.DIDDocument),
		"chain_tip_hash": state.ChainTipHash,
	}

	if cpOK {
		resp["checkpoint_reference"] = map[string]any{
			"sequence":        cp.Sequence,
			"checkpoint_hash": cp.CheckpointHash,
		}
	}

	writeJSON(w, http.StatusOK, resp)
}

// handleDIDProof builds a Merkle inclusion proof for did against either the
// latest checkpoint or the one named by the "checkpoint" query parameter.
//
// The proof is verified locally against the checkpoint's Merkle root before it
// is returned, and the checkpoint is re-read afterwards so that a checkpoint
// that changed mid-request produces 409 instead of a stale proof.
//
// Status codes: 503 when the ingestor is missing or reports corruption, 400
// for request-caused checkpoint selection failures, 404 when the chain tip
// cannot be recomputed or the DID is absent from the checkpoint, 504 when the
// request context is canceled or times out, 409 when the checkpoint changed,
// and 500 for storage or consistency failures.
func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) {
	if s.ingestor == nil {
		writeErr(w, http.StatusServiceUnavailable, errors.New("ingestor unavailable"))
		return
	}

	if err := s.ingestor.CorruptionError(); err != nil {
		writeErr(w, http.StatusServiceUnavailable, err)
		return
	}

	cp, verifyCheckpointUnchanged, err := s.selectCheckpointForProof(r)
	if err != nil {
		// Only request-caused failures are client errors; storage failures
		// fall through to 500.
		writeErr(w, proofCheckpointStatus(err), err)
		return
	}

	tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence)
	if err != nil {
		if isContextDone(err) {
			writeErr(w, http.StatusGatewayTimeout, err)
			return
		}
		writeErr(w, http.StatusNotFound, err)
		return
	}

	siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence)
	if err != nil {
		if isContextDone(err) {
			writeErr(w, http.StatusGatewayTimeout, err)
			return
		}
		writeErr(w, http.StatusInternalServerError, err)
		return
	}
	if !found {
		writeErr(w, http.StatusNotFound, errors.New("did not present in checkpoint state"))
		return
	}

	leafBytes, err := hex.DecodeString(leafHash)
	if err != nil {
		writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid leaf hash: %w", err))
		return
	}

	root, err := hex.DecodeString(cp.DIDMerkleRoot)
	if err != nil {
		writeErr(w, http.StatusInternalServerError, errors.New("invalid checkpoint root"))
		return
	}

	// Never hand out a proof that does not verify against the checkpoint root.
	if !merkle.VerifyProof(leafBytes, siblings, root) {
		writeErr(w, http.StatusInternalServerError, errors.New("inclusion proof failed consistency check"))
		return
	}

	if err := verifyCheckpointUnchanged(); err != nil {
		writeErr(w, proofCheckpointStatus(err), err)
		return
	}

	inclusion := proof.DIDInclusionProof{
		DID:             did,
		ChainTipHash:    tipHash,
		LeafHash:        leafHash,
		MerkleRoot:      cp.DIDMerkleRoot,
		Siblings:        siblings,
		CheckpointSeq:   cp.Sequence,
		CheckpointHash:  cp.CheckpointHash,
		CheckpointSig:   cp.Signature,
		CheckpointKeyID: cp.KeyID,
	}

	writeJSON(w, http.StatusOK, map[string]any{
		"did":                     did,
		"checkpoint_sequence":     cp.Sequence,
		"checkpoint_hash":         cp.CheckpointHash,
		"checkpoint_signature":    cp.Signature,
		"merkle_root":             cp.DIDMerkleRoot,
		"chain_tip_reference":     tipHash,
		"inclusion_proof":         inclusion,
		"chain_operation_indices": seqs,
	})
}

// plcAuditEntry is the JSON shape of one operation record as emitted by the
// audit log and export endpoints.
type plcAuditEntry struct {
	DID       string          `json:"did"`
	Operation json.RawMessage `json:"operation"`
	CID       string          `json:"cid"`
	Nullified bool            `json:"nullified"`
	CreatedAt string          `json:"createdAt"`
}

// handlePLCCompatibility is the catch-all route. It serves the read-only
// endpoints rooted at /{did}: the document itself, /data, /log, /log/last and
// /log/audit. Paths whose first segment does not start with "did:" are 404;
// any method other than GET is rejected with 405 and an Allow: GET header.
func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) {
	path := strings.TrimPrefix(r.URL.Path, "/")
	if path == "" {
		writeErr(w, http.StatusNotFound, errors.New("not found"))
		return
	}

	if path == "export" {
		s.handleExportCompatibility(w, r)
		return
	}

	parts := strings.Split(path, "/")
	did := parts[0]

	if !strings.HasPrefix(did, "did:") {
		writeErr(w, http.StatusNotFound, errors.New("not found"))
		return
	}

	// A POST to the bare DID path gets a dedicated message explaining that
	// writes are unsupported.
	if r.Method == http.MethodPost && len(parts) == 1 {
		w.Header().Set("Allow", http.MethodGet)
		writeErr(w, http.StatusMethodNotAllowed, errors.New("write operations are not supported by this mirror"))
		return
	}

	if r.Method != http.MethodGet {
		w.Header().Set("Allow", http.MethodGet)
		writeErr(w, http.StatusMethodNotAllowed, errors.New("method not allowed"))
		return
	}

	switch {
	case len(parts) == 1:
		s.handleGetDIDCompatibility(w, did)
	case len(parts) == 2 && parts[1] == "data":
		s.handleGetDIDDataCompatibility(w, r, did)
	case len(parts) == 2 && parts[1] == "log":
		s.handleGetDIDLogCompatibility(w, r, did)
	case len(parts) == 3 && parts[1] == "log" && parts[2] == "last":
		s.handleGetDIDLogLastCompatibility(w, r, did)
	case len(parts) == 3 && parts[1] == "log" && parts[2] == "audit":
		s.handleGetDIDLogAuditCompatibility(w, r, did)
	default:
		writeErr(w, http.StatusNotFound, errors.New("not found"))
	}
}

// handleGetDIDCompatibility writes the stored DID document verbatim as
// application/did+ld+json. The status is 410 Gone when the document is marked
// deactivated and 200 otherwise.
func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) {
	state, ok, err := s.store.GetState(did)
	if err != nil {
		writeErr(w, http.StatusInternalServerError, err)
		return
	}
	if !ok {
		writeErr(w, http.StatusNotFound, errors.New("did not found"))
		return
	}

	status := http.StatusOK
	if isTombstonedDIDDocument(state.DIDDocument) {
		status = http.StatusGone
	}

	w.Header().Set("Content-Type", "application/did+ld+json")
	w.WriteHeader(status)
	// The status line is already sent, so a failed write cannot be reported.
	_, _ = w.Write(state.DIDDocument)
}

// handleGetDIDLogCompatibility returns the DID's operation log as a JSON array
// of raw operations.
func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) {
	if s.ingestor == nil {
		writeErr(w, http.StatusServiceUnavailable, errors.New("ingestor unavailable"))
		return
	}

	logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
	if err != nil {
		writeLookupErr(w, err, ingest.ErrDIDNotFound, ingest.ErrHistoryNotStored)
		return
	}

	ops := make([]json.RawMessage, 0, len(logEntries))
	for _, rec := range logEntries {
		ops = append(ops, rec.Operation)
	}

	writeJSONWithContentType(w, http.StatusOK, "application/json", ops)
}

// handleGetDIDLogLastCompatibility writes the most recent operation for did
// verbatim as application/json.
func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) {
	if s.ingestor == nil {
		writeErr(w, http.StatusServiceUnavailable, errors.New("ingestor unavailable"))
		return
	}

	rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did)
	if err != nil {
		writeLookupErr(w, err, ingest.ErrDIDNotFound, ingest.ErrHistoryNotStored)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// The status line is already sent, so a failed write cannot be reported.
	_, _ = w.Write(rec.Operation)
}

// handleGetDIDLogAuditCompatibility returns the DID's operation log as a JSON
// array of audit entries (operation plus CID, nullified flag and creation
// time).
func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) {
	if s.ingestor == nil {
		writeErr(w, http.StatusServiceUnavailable, errors.New("ingestor unavailable"))
		return
	}

	logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
	if err != nil {
		writeLookupErr(w, err, ingest.ErrDIDNotFound, ingest.ErrHistoryNotStored)
		return
	}

	audit := make([]plcAuditEntry, 0, len(logEntries))
	for _, rec := range logEntries {
		audit = append(audit, plcAuditEntry{
			DID:       did,
			Operation: rec.Operation,
			CID:       rec.CID,
			Nullified: rec.Nullified,
			CreatedAt: rec.CreatedAt,
		})
	}

	writeJSONWithContentType(w, http.StatusOK, "application/json", audit)
}

// handleGetDIDDataCompatibility returns the value produced by the ingestor's
// LoadCurrentPLCData for did, encoded as JSON.
func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) {
	if s.ingestor == nil {
		writeErr(w, http.StatusServiceUnavailable, errors.New("ingestor unavailable"))
		return
	}

	data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did)
	if err != nil {
		// Only ErrDIDNotFound maps to 404 on this endpoint.
		writeLookupErr(w, err, ingest.ErrDIDNotFound)
		return
	}

	writeJSONWithContentType(w, http.StatusOK, "application/json", data)
}

// handleExportCompatibility streams export records as JSON lines.
//
// Query parameters:
//   - count: number of records, at least 1; values above 1000 are clamped to
//     1000, which is also the default.
//   - after: RFC 3339 timestamp passed to the ingestor as the lower bound;
//     defaults to the zero time.
//
// Only GET is accepted. Each record is flushed as soon as it is encoded when
// the ResponseWriter supports flushing.
func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		w.Header().Set("Allow", http.MethodGet)
		writeErr(w, http.StatusMethodNotAllowed, errors.New("method not allowed"))
		return
	}

	if s.ingestor == nil {
		writeErr(w, http.StatusServiceUnavailable, errors.New("ingestor unavailable"))
		return
	}

	count := 1000
	if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" {
		n, err := strconv.Atoi(rawCount)
		if err != nil || n < 1 {
			writeErr(w, http.StatusBadRequest, errors.New("invalid count query parameter"))
			return
		}
		if n > 1000 {
			n = 1000
		}
		count = n
	}

	var after time.Time
	if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" {
		parsed, err := time.Parse(time.RFC3339, rawAfter)
		if err != nil {
			writeErr(w, http.StatusBadRequest, errors.New("invalid after query parameter"))
			return
		}
		after = parsed
	}

	w.Header().Set("Content-Type", "application/jsonlines")
	w.WriteHeader(http.StatusOK)

	flusher, _ := w.(http.Flusher)
	enc := json.NewEncoder(w)

	// The response has already started, so a streaming failure cannot be
	// turned into an error status; the stream simply ends early (best-effort
	// termination).
	_ = s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error {
		entry := plcAuditEntry{
			DID:       rec.DID,
			Operation: rec.Operation,
			CID:       rec.CID,
			Nullified: rec.Nullified,
			CreatedAt: rec.CreatedAt,
		}

		if err := enc.Encode(entry); err != nil {
			return err
		}

		if flusher != nil {
			flusher.Flush()
		}

		return nil
	})
}

// isTombstonedDIDDocument reports whether raw is a JSON object whose
// "deactivated" member is the boolean true. Empty input, invalid JSON and a
// missing or non-boolean member all yield false.
func isTombstonedDIDDocument(raw []byte) bool {
	if len(raw) == 0 {
		return false
	}

	var doc map[string]any
	if err := json.Unmarshal(raw, &doc); err != nil {
		return false
	}

	deactivated, _ := doc["deactivated"].(bool)

	return deactivated
}

// withTimeout wraps next so that each request's context is canceled after the
// configured RequestTimeout, or after 10 seconds when that value is not
// positive.
func (s *Server) withTimeout(next http.Handler) http.Handler {
	timeout := s.cfg.RequestTimeout
	if timeout <= 0 {
		timeout = 10 * time.Second
	}

	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), timeout)
		defer cancel()

		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

// allowRequest consults the rate limiter for the request's client IP and the
// given limiter class. Every request is allowed when no limiter is set.
func (s *Server) allowRequest(r *http.Request, class limiterClass) bool {
	if s.limiter == nil {
		return true
	}

	return s.limiter.Allow(clientIP(r), class)
}

// proofCheckpointError is a checkpoint selection or re-verification failure
// that should be reported with a specific HTTP status rather than as an
// internal error. Its message is what the client sees.
type proofCheckpointError struct {
	status int
	msg    string
}

// Error returns the client-facing message.
func (e *proofCheckpointError) Error() string { return e.msg }

// proofCheckpointStatus picks the HTTP status for an error produced by
// selectCheckpointForProof or by the verification closure it returns: the
// status carried by a proofCheckpointError, or 500 for anything else (in
// practice, storage failures).
func proofCheckpointStatus(err error) int {
	var cpErr *proofCheckpointError
	if errors.As(err, &cpErr) {
		return cpErr.status
	}

	return http.StatusInternalServerError
}

// selectCheckpointForProof picks the checkpoint a proof is built against: the
// one named by the "checkpoint" query parameter, or the latest when the
// parameter is absent or blank.
//
// Alongside the checkpoint it returns a closure that re-reads the same
// checkpoint and fails when its hash no longer matches, so callers can detect
// a checkpoint that changed while the proof was being generated.
//
// Failures caused by the request (no checkpoint, unparsable parameter, unknown
// sequence) and detected changes are returned as *proofCheckpointError with
// status 400 and 409 respectively; storage errors are returned unchanged.
func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) {
	checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint"))

	if checkpointParam == "" {
		cp, ok, err := s.store.GetLatestCheckpoint()
		if err != nil {
			return types.CheckpointV1{}, nil, err
		}
		if !ok {
			return types.CheckpointV1{}, nil, &proofCheckpointError{
				status: http.StatusBadRequest,
				msg:    "no checkpoint available",
			}
		}

		verify := func() error {
			now, ok, err := s.store.GetLatestCheckpoint()
			if err != nil {
				return err
			}
			if !ok {
				return &proofCheckpointError{
					status: http.StatusConflict,
					msg:    "latest checkpoint disappeared during request",
				}
			}
			if now.CheckpointHash != cp.CheckpointHash {
				return &proofCheckpointError{
					status: http.StatusConflict,
					msg:    "checkpoint advanced during proof generation",
				}
			}
			return nil
		}

		return cp, verify, nil
	}

	seq, err := strconv.ParseUint(checkpointParam, 10, 64)
	if err != nil {
		return types.CheckpointV1{}, nil, &proofCheckpointError{
			status: http.StatusBadRequest,
			msg:    "invalid checkpoint query parameter",
		}
	}

	cp, ok, err := s.store.GetCheckpoint(seq)
	if err != nil {
		return types.CheckpointV1{}, nil, err
	}
	if !ok {
		return types.CheckpointV1{}, nil, &proofCheckpointError{
			status: http.StatusBadRequest,
			msg:    fmt.Sprintf("checkpoint %d unavailable", seq),
		}
	}

	verify := func() error {
		again, ok, err := s.store.GetCheckpoint(seq)
		if err != nil {
			return err
		}
		if !ok || again.CheckpointHash != cp.CheckpointHash {
			return &proofCheckpointError{
				status: http.StatusConflict,
				msg:    fmt.Sprintf("checkpoint %d changed during proof generation", seq),
			}
		}
		return nil
	}

	return cp, verify, nil
}

// isContextDone reports whether err stems from a canceled or expired context.
func isContextDone(err error) bool {
	return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

// writeLookupErr translates an ingestor lookup failure into a response: 404
// "did not found" when err matches any of notFound, 504 when the request
// context was canceled or timed out, and 500 otherwise.
func writeLookupErr(w http.ResponseWriter, err error, notFound ...error) {
	for _, target := range notFound {
		if errors.Is(err, target) {
			writeErr(w, http.StatusNotFound, errors.New("did not found"))
			return
		}
	}

	if isContextDone(err) {
		writeErr(w, http.StatusGatewayTimeout, err)
		return
	}

	writeErr(w, http.StatusInternalServerError, err)
}

// writeJSON encodes v as application/json with the given status code.
func writeJSON(w http.ResponseWriter, code int, v any) {
	writeJSONWithContentType(w, code, "application/json", v)
}

// writeJSONWithContentType encodes v as JSON with the given status code and
// Content-Type. A blank contentType falls back to application/json.
func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) {
	if strings.TrimSpace(contentType) == "" {
		contentType = "application/json"
	}

	w.Header().Set("Content-Type", contentType)
	w.WriteHeader(code)

	// The status line is already written, so an encoding or write failure can
	// no longer be reported to the client.
	_ = json.NewEncoder(w).Encode(v)
}

// writeErr sends {"error": <message>} with the given status code. err must be
// non-nil.
func writeErr(w http.ResponseWriter, code int, err error) {
	writeJSON(w, code, map[string]any{"error": err.Error()})
}