diff options
Diffstat (limited to 'internal/api/server.go')
| -rw-r--r-- | internal/api/server.go | 186 |
1 files changed, 185 insertions, 1 deletions
diff --git a/internal/api/server.go b/internal/api/server.go index f773145..2a3c589 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" "time" - "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" @@ -44,21 +43,27 @@ func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service, }, limiter: newIPRateLimiter(cfg.RateLimit), } + for _, opt := range opts { opt(s) } + s.metrics = newServerMetrics(cfg, store, ingestor) + if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok { s.metrics.checkpointSequence.Set(float64(cp.Sequence)) } + if ingestor != nil { ingestor.SetMetricsSink(s.metrics) } + return s } func (s *Server) Handler() http.Handler { mux := http.NewServeMux() + mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth))) mux.Handle("/metrics", s.metrics.Handler()) mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus))) @@ -67,6 +72,7 @@ func (s *Server) Handler() http.Handler { mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID))) mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility))) mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility))) + return mux } @@ -76,19 +82,27 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { seq, err := s.store.GetGlobalSeq() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + cp, ok, err := s.store.GetLatestCheckpoint() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + stats := ingest.Stats{} + if s.ingestor != nil { stats = s.ingestor.Stats() } + payload := map[string]any{ "mode": s.cfg.Mode, "verify_policy": s.cfg.VerifyPolicy, @@ -96,161 +110,225 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) { "stats": stats, "build": s.build, } + if s.ingestor != nil { payload["corrupted"] = s.ingestor.IsCorrupted() + if err := s.ingestor.CorruptionError(); err != nil { payload["corruption_error"] = err.Error() } } + if ok { payload["latest_checkpoint"] = cp } + writeJSON(w, http.StatusOK, payload) } func (s *Server) handleLatestCheckpoint(w http.ResponseWriter, r *http.Request) { cp, ok, err := s.store.GetLatestCheckpoint() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("no checkpoint")) + return } + writeJSON(w, http.StatusOK, cp) } func (s *Server) handleCheckpointBySequence(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/checkpoints/") + if path == "" { writeErr(w, http.StatusNotFound, fmt.Errorf("missing checkpoint sequence")) + return } + seq, err := strconv.ParseUint(path, 10, 64) + if err != nil { writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid checkpoint sequence")) + return } + cp, ok, err := s.store.GetCheckpoint(seq) + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("checkpoint not found")) + return } + writeJSON(w, http.StatusOK, cp) } func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/did/") + if path == "" { writeErr(w, http.StatusBadRequest, fmt.Errorf("missing did")) + return } + if strings.HasSuffix(path, "/proof") { did := strings.TrimSuffix(path, "/proof") + if !s.allowRequest(r, limiterProof) { writeErr(w, http.StatusTooManyRequests, fmt.Errorf("proof rate limit exceeded")) + return } + s.handleDIDProof(w, r, did) + return } + if !s.allowRequest(r, limiterResolve) { writeErr(w, http.StatusTooManyRequests, fmt.Errorf("resolve rate limit exceeded")) + return } + s.handleDIDResolve(w, r, path) } func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) { state, ok, err := s.store.GetState(did) + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + cp, cpOK, err := s.store.GetLatestCheckpoint() + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + resp := map[string]any{ "did": did, "did_document": json.RawMessage(state.DIDDocument), "chain_tip_hash": state.ChainTipHash, } + if cpOK { resp["checkpoint_reference"] = map[string]any{ "sequence": cp.Sequence, "checkpoint_hash": cp.CheckpointHash, } } + writeJSON(w, http.StatusOK, resp) } func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + if err := s.ingestor.CorruptionError(); err != nil { writeErr(w, http.StatusServiceUnavailable, err) + return } cp, verifyCheckpointUnchanged, err := s.selectCheckpointForProof(r) + if err != nil { writeErr(w, http.StatusBadRequest, err) + return } tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence) + if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusNotFound, err) + return } + siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence) + if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + if !found { writeErr(w, http.StatusNotFound, fmt.Errorf("did not present in checkpoint state")) + return } leafBytes, err := hex.DecodeString(leafHash) + if err != nil { writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid leaf hash: %w", err)) + return } + root, err := hex.DecodeString(cp.DIDMerkleRoot) + if err != nil { writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid checkpoint root")) + return } + if !merkle.VerifyProof(leafBytes, siblings, root) { writeErr(w, http.StatusInternalServerError, fmt.Errorf("inclusion proof failed consistency check")) + return } if err := verifyCheckpointUnchanged(); err != nil { writeErr(w, http.StatusConflict, err) + return } @@ -265,6 +343,7 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri CheckpointSig: cp.Signature, CheckpointKeyID: cp.KeyID, } + writeJSON(w, http.StatusOK, map[string]any{ "did": did, "checkpoint_sequence": cp.Sequence, @@ -287,28 +366,39 @@ type plcAuditEntry struct { func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) { path := strings.TrimPrefix(r.URL.Path, "/") + if path == "" { writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + return } + if path == "export" { s.handleExportCompatibility(w, r) + return } + parts := strings.Split(path, "/") did := parts[0] + if !strings.HasPrefix(did, "did:") { writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + return } + if r.Method == http.MethodPost && len(parts) == 1 { w.Header().Set("Allow", http.MethodGet) writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("write operations are not supported by this mirror")) + return } + if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + return } @@ -330,90 +420,129 @@ func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) { state, ok, err := s.store.GetState(did) + if err != nil { writeErr(w, http.StatusInternalServerError, err) + return } + if !ok { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + status := http.StatusOK + if isTombstonedDIDDocument(state.DIDDocument) { status = http.StatusGone } + w.Header().Set("Content-Type", "application/did+ld+json") w.WriteHeader(status) + _, _ = w.Write(state.DIDDocument) } func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + ops := make([]json.RawMessage, 0, len(logEntries)) + for _, rec := range logEntries { ops = append(ops, rec.Operation) } + writeJSONWithContentType(w, http.StatusOK, "application/json", ops) } func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + _, _ = w.Write(rec.Operation) } func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + audit := make([]plcAuditEntry, 0, len(logEntries)) + for _, rec := range logEntries { audit = append(audit, plcAuditEntry{ DID: did, @@ -423,27 +552,37 @@ func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *htt CreatedAt: rec.CreatedAt, }) } + writeJSONWithContentType(w, http.StatusOK, "application/json", audit) } func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) { if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } + data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did) + if err != nil { if errors.Is(err, ingest.ErrDIDNotFound) { writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { writeErr(w, http.StatusGatewayTimeout, err) + return } + writeErr(w, http.StatusInternalServerError, err) + return } + writeJSONWithContentType(w, http.StatusOK, "application/json", data) } @@ -451,38 +590,51 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques if r.Method != http.MethodGet { w.Header().Set("Allow", http.MethodGet) writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + return } + if s.ingestor == nil { writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return } count := 1000 + if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" { n, err := strconv.Atoi(rawCount) + if err != nil || n < 1 { writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid count query parameter")) + return } + if n > 1000 { n = 1000 } + count = n } var after time.Time + if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" { parsed, err := time.Parse(time.RFC3339, rawAfter) + if err != nil { writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid after query parameter")) + return } + after = parsed } w.Header().Set("Content-Type", "application/jsonlines") w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) enc := json.NewEncoder(w) err := s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error { @@ -493,14 +645,18 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques Nullified: rec.Nullified, CreatedAt: rec.CreatedAt, } + if err := enc.Encode(entry); err != nil { return err } + if flusher != nil { flusher.Flush() } + return nil }) + if err != nil { // Response has already started; best effort termination. return @@ -511,22 +667,30 @@ func isTombstonedDIDDocument(raw []byte) bool { if len(raw) == 0 { return false } + var doc map[string]any + if err := json.Unmarshal(raw, &doc); err != nil { return false } + deactivated, _ := doc["deactivated"].(bool) + return deactivated } func (s *Server) withTimeout(next http.Handler) http.Handler { timeout := s.cfg.RequestTimeout + if timeout <= 0 { timeout = 10 * time.Second } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), timeout) + defer cancel() + next.ServeHTTP(w, r.WithContext(ctx)) }) } @@ -535,53 +699,70 @@ func (s *Server) allowRequest(r *http.Request, class limiterClass) bool { if s.limiter == nil { return true } + return s.limiter.Allow(clientIP(r), class) } func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) { checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint")) + if checkpointParam == "" { cp, ok, err := s.store.GetLatestCheckpoint() + if err != nil { return types.CheckpointV1{}, nil, err } + if !ok { return types.CheckpointV1{}, nil, fmt.Errorf("no checkpoint available") } + return cp, func() error { now, ok, err := s.store.GetLatestCheckpoint() + if err != nil { return err } + if !ok { return fmt.Errorf("latest checkpoint disappeared during request") } + if now.CheckpointHash != cp.CheckpointHash { return fmt.Errorf("checkpoint advanced during proof generation") } + return nil }, nil } seq, err := strconv.ParseUint(checkpointParam, 10, 64) + if err != nil { return types.CheckpointV1{}, nil, fmt.Errorf("invalid checkpoint query parameter") } + cp, ok, err := s.store.GetCheckpoint(seq) + if err != nil { return types.CheckpointV1{}, nil, err } + if !ok { return types.CheckpointV1{}, nil, fmt.Errorf("checkpoint %d unavailable", seq) } + return cp, func() error { again, ok, err := s.store.GetCheckpoint(seq) + if err != nil { return err } + if !ok || again.CheckpointHash != cp.CheckpointHash { return fmt.Errorf("checkpoint %d changed during proof generation", seq) } + return nil }, nil } @@ -592,10 +773,13 @@ func writeJSON(w http.ResponseWriter, code int, v any) { func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) { w.Header().Set("Content-Type", "application/json") + if strings.TrimSpace(contentType) != "" { w.Header().Set("Content-Type", contentType) } + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) } |