aboutsummaryrefslogtreecommitdiff
path: root/internal/api/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/api/server.go')
-rw-r--r--internal/api/server.go186
1 files changed, 185 insertions, 1 deletions
diff --git a/internal/api/server.go b/internal/api/server.go
index f773145..2a3c589 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -10,7 +10,6 @@ import (
"strconv"
"strings"
"time"
-
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
@@ -44,21 +43,27 @@ func NewServer(cfg config.Config, store storage.Store, ingestor *ingest.Service,
},
limiter: newIPRateLimiter(cfg.RateLimit),
}
+
for _, opt := range opts {
opt(s)
}
+
s.metrics = newServerMetrics(cfg, store, ingestor)
+
if cp, ok, err := store.GetLatestCheckpoint(); err == nil && ok {
s.metrics.checkpointSequence.Set(float64(cp.Sequence))
}
+
if ingestor != nil {
ingestor.SetMetricsSink(s.metrics)
}
+
return s
}
func (s *Server) Handler() http.Handler {
mux := http.NewServeMux()
+
mux.Handle("/health", s.withTimeout(http.HandlerFunc(s.handleHealth)))
mux.Handle("/metrics", s.metrics.Handler())
mux.Handle("/status", s.withTimeout(http.HandlerFunc(s.handleStatus)))
@@ -67,6 +72,7 @@ func (s *Server) Handler() http.Handler {
mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID)))
mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility)))
mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility)))
+
return mux
}
@@ -76,19 +82,27 @@ func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
seq, err := s.store.GetGlobalSeq()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
cp, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
stats := ingest.Stats{}
+
if s.ingestor != nil {
stats = s.ingestor.Stats()
}
+
payload := map[string]any{
"mode": s.cfg.Mode,
"verify_policy": s.cfg.VerifyPolicy,
@@ -96,161 +110,225 @@ func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
"stats": stats,
"build": s.build,
}
+
if s.ingestor != nil {
payload["corrupted"] = s.ingestor.IsCorrupted()
+
if err := s.ingestor.CorruptionError(); err != nil {
payload["corruption_error"] = err.Error()
}
}
+
if ok {
payload["latest_checkpoint"] = cp
}
+
writeJSON(w, http.StatusOK, payload)
}
func (s *Server) handleLatestCheckpoint(w http.ResponseWriter, r *http.Request) {
cp, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("no checkpoint"))
+
return
}
+
writeJSON(w, http.StatusOK, cp)
}
func (s *Server) handleCheckpointBySequence(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/checkpoints/")
+
if path == "" {
writeErr(w, http.StatusNotFound, fmt.Errorf("missing checkpoint sequence"))
+
return
}
+
seq, err := strconv.ParseUint(path, 10, 64)
+
if err != nil {
writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid checkpoint sequence"))
+
return
}
+
cp, ok, err := s.store.GetCheckpoint(seq)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("checkpoint not found"))
+
return
}
+
writeJSON(w, http.StatusOK, cp)
}
func (s *Server) handleDID(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/did/")
+
if path == "" {
writeErr(w, http.StatusBadRequest, fmt.Errorf("missing did"))
+
return
}
+
if strings.HasSuffix(path, "/proof") {
did := strings.TrimSuffix(path, "/proof")
+
if !s.allowRequest(r, limiterProof) {
writeErr(w, http.StatusTooManyRequests, fmt.Errorf("proof rate limit exceeded"))
+
return
}
+
s.handleDIDProof(w, r, did)
+
return
}
+
if !s.allowRequest(r, limiterResolve) {
writeErr(w, http.StatusTooManyRequests, fmt.Errorf("resolve rate limit exceeded"))
+
return
}
+
s.handleDIDResolve(w, r, path)
}
func (s *Server) handleDIDResolve(w http.ResponseWriter, r *http.Request, did string) {
state, ok, err := s.store.GetState(did)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
cp, cpOK, err := s.store.GetLatestCheckpoint()
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
resp := map[string]any{
"did": did,
"did_document": json.RawMessage(state.DIDDocument),
"chain_tip_hash": state.ChainTipHash,
}
+
if cpOK {
resp["checkpoint_reference"] = map[string]any{
"sequence": cp.Sequence,
"checkpoint_hash": cp.CheckpointHash,
}
}
+
writeJSON(w, http.StatusOK, resp)
}
func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
if err := s.ingestor.CorruptionError(); err != nil {
writeErr(w, http.StatusServiceUnavailable, err)
+
return
}
cp, verifyCheckpointUnchanged, err := s.selectCheckpointForProof(r)
+
if err != nil {
writeErr(w, http.StatusBadRequest, err)
+
return
}
tipHash, seqs, err := s.ingestor.RecomputeTipAtOrBefore(r.Context(), did, cp.Sequence)
+
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusNotFound, err)
+
return
}
+
siblings, leafHash, found, err := s.checkpoints.BuildDIDProofAtCheckpoint(r.Context(), did, tipHash, cp.Sequence)
+
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !found {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not present in checkpoint state"))
+
return
}
leafBytes, err := hex.DecodeString(leafHash)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid leaf hash: %w", err))
+
return
}
+
root, err := hex.DecodeString(cp.DIDMerkleRoot)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, fmt.Errorf("invalid checkpoint root"))
+
return
}
+
if !merkle.VerifyProof(leafBytes, siblings, root) {
writeErr(w, http.StatusInternalServerError, fmt.Errorf("inclusion proof failed consistency check"))
+
return
}
if err := verifyCheckpointUnchanged(); err != nil {
writeErr(w, http.StatusConflict, err)
+
return
}
@@ -265,6 +343,7 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri
CheckpointSig: cp.Signature,
CheckpointKeyID: cp.KeyID,
}
+
writeJSON(w, http.StatusOK, map[string]any{
"did": did,
"checkpoint_sequence": cp.Sequence,
@@ -287,28 +366,39 @@ type plcAuditEntry struct {
func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/")
+
if path == "" {
writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+
return
}
+
if path == "export" {
s.handleExportCompatibility(w, r)
+
return
}
+
parts := strings.Split(path, "/")
did := parts[0]
+
if !strings.HasPrefix(did, "did:") {
writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+
return
}
+
if r.Method == http.MethodPost && len(parts) == 1 {
w.Header().Set("Allow", http.MethodGet)
writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("write operations are not supported by this mirror"))
+
return
}
+
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
+
return
}
@@ -330,90 +420,129 @@ func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request)
func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) {
state, ok, err := s.store.GetState(did)
+
if err != nil {
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
if !ok {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
status := http.StatusOK
+
if isTombstonedDIDDocument(state.DIDDocument) {
status = http.StatusGone
}
+
w.Header().Set("Content-Type", "application/did+ld+json")
w.WriteHeader(status)
+
_, _ = w.Write(state.DIDDocument)
}
func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
ops := make([]json.RawMessage, 0, len(logEntries))
+
for _, rec := range logEntries {
ops = append(ops, rec.Operation)
}
+
writeJSONWithContentType(w, http.StatusOK, "application/json", ops)
}
func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
+
_, _ = w.Write(rec.Operation)
}
func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
audit := make([]plcAuditEntry, 0, len(logEntries))
+
for _, rec := range logEntries {
audit = append(audit, plcAuditEntry{
DID: did,
@@ -423,27 +552,37 @@ func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *htt
CreatedAt: rec.CreatedAt,
})
}
+
writeJSONWithContentType(w, http.StatusOK, "application/json", audit)
}
func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) {
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
+
data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did)
+
if err != nil {
if errors.Is(err, ingest.ErrDIDNotFound) {
writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+
return
}
+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
writeErr(w, http.StatusGatewayTimeout, err)
+
return
}
+
writeErr(w, http.StatusInternalServerError, err)
+
return
}
+
writeJSONWithContentType(w, http.StatusOK, "application/json", data)
}
@@ -451,38 +590,51 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques
if r.Method != http.MethodGet {
w.Header().Set("Allow", http.MethodGet)
writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
+
return
}
+
if s.ingestor == nil {
writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+
return
}
count := 1000
+
if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" {
n, err := strconv.Atoi(rawCount)
+
if err != nil || n < 1 {
writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid count query parameter"))
+
return
}
+
if n > 1000 {
n = 1000
}
+
count = n
}
var after time.Time
+
if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" {
parsed, err := time.Parse(time.RFC3339, rawAfter)
+
if err != nil {
writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid after query parameter"))
+
return
}
+
after = parsed
}
w.Header().Set("Content-Type", "application/jsonlines")
w.WriteHeader(http.StatusOK)
+
flusher, _ := w.(http.Flusher)
enc := json.NewEncoder(w)
err := s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error {
@@ -493,14 +645,18 @@ func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Reques
Nullified: rec.Nullified,
CreatedAt: rec.CreatedAt,
}
+
if err := enc.Encode(entry); err != nil {
return err
}
+
if flusher != nil {
flusher.Flush()
}
+
return nil
})
+
if err != nil {
// Response has already started; best effort termination.
return
@@ -511,22 +667,30 @@ func isTombstonedDIDDocument(raw []byte) bool {
if len(raw) == 0 {
return false
}
+
var doc map[string]any
+
if err := json.Unmarshal(raw, &doc); err != nil {
return false
}
+
deactivated, _ := doc["deactivated"].(bool)
+
return deactivated
}
func (s *Server) withTimeout(next http.Handler) http.Handler {
timeout := s.cfg.RequestTimeout
+
if timeout <= 0 {
timeout = 10 * time.Second
}
+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), timeout)
+
defer cancel()
+
next.ServeHTTP(w, r.WithContext(ctx))
})
}
@@ -535,53 +699,70 @@ func (s *Server) allowRequest(r *http.Request, class limiterClass) bool {
if s.limiter == nil {
return true
}
+
return s.limiter.Allow(clientIP(r), class)
}
func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, func() error, error) {
checkpointParam := strings.TrimSpace(r.URL.Query().Get("checkpoint"))
+
if checkpointParam == "" {
cp, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
return types.CheckpointV1{}, nil, err
}
+
if !ok {
return types.CheckpointV1{}, nil, fmt.Errorf("no checkpoint available")
}
+
return cp, func() error {
now, ok, err := s.store.GetLatestCheckpoint()
+
if err != nil {
return err
}
+
if !ok {
return fmt.Errorf("latest checkpoint disappeared during request")
}
+
if now.CheckpointHash != cp.CheckpointHash {
return fmt.Errorf("checkpoint advanced during proof generation")
}
+
return nil
}, nil
}
seq, err := strconv.ParseUint(checkpointParam, 10, 64)
+
if err != nil {
return types.CheckpointV1{}, nil, fmt.Errorf("invalid checkpoint query parameter")
}
+
cp, ok, err := s.store.GetCheckpoint(seq)
+
if err != nil {
return types.CheckpointV1{}, nil, err
}
+
if !ok {
return types.CheckpointV1{}, nil, fmt.Errorf("checkpoint %d unavailable", seq)
}
+
return cp, func() error {
again, ok, err := s.store.GetCheckpoint(seq)
+
if err != nil {
return err
}
+
if !ok || again.CheckpointHash != cp.CheckpointHash {
return fmt.Errorf("checkpoint %d changed during proof generation", seq)
}
+
return nil
}, nil
}
@@ -592,10 +773,13 @@ func writeJSON(w http.ResponseWriter, code int, v any) {
func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) {
w.Header().Set("Content-Type", "application/json")
+
if strings.TrimSpace(contentType) != "" {
w.Header().Set("Content-Type", contentType)
}
+
w.WriteHeader(code)
+
_ = json.NewEncoder(w).Encode(v)
}