aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:19:05 -0800
committerFuwn <[email protected]>2026-02-26 15:19:05 -0800
commit4c13bd523d4deb36d8e7dfce6d446f701674e073 (patch)
treed189cbbf09742c6b9408a5450db34dba34fe3074
parentfeat: harden launch readiness with versioning, metrics, and resilience (diff)
downloadplutia-test-4c13bd523d4deb36d8e7dfce6d446f701674e073.tar.xz
plutia-test-4c13bd523d4deb36d8e7dfce6d446f701674e073.zip
feat: add read-only PLC API compatibility endpoints
-rw-r--r--README.md24
-rw-r--r--internal/api/plc_compatibility_test.go232
-rw-r--r--internal/api/server.go251
-rw-r--r--internal/ingest/service.go186
-rw-r--r--internal/storage/pebble_store.go27
-rw-r--r--internal/storage/store.go1
6 files changed, 721 insertions, 0 deletions
diff --git a/README.md b/README.md
index ef52544..42e703c 100644
--- a/README.md
+++ b/README.md
@@ -76,6 +76,30 @@ go build -trimpath \
- `GET /checkpoints/latest`
- `GET /checkpoints/{sequence}`
+## PLC API Compatibility
+
+Plutia includes read-only compatibility endpoints for `plc.directory` API consumers:
+
+- `GET /{did}` (returns `application/did+ld+json`)
+- `GET /{did}/log`
+- `GET /{did}/log/last`
+- `GET /{did}/log/audit`
+- `GET /{did}/data`
+- `GET /export` (NDJSON, `application/jsonlines`; supports a `count` query parameter capped at `1000` and an `after` RFC3339 filter applied to ingested operation timestamps)
+
+For audit and export compatibility responses, the `createdAt` field is sourced from the mirror's recorded ingest timestamp for each operation reference, not from the upstream directory.
+
+Write behavior is intentionally unsupported:
+
+- `POST /{did}` returns `405 Method Not Allowed` with `Allow: GET`
+
+Verification features are additive extensions and remain available under:
+
+- `GET /did/{did}`
+- `GET /did/{did}/proof`
+- `GET /checkpoints/latest`
+- `GET /checkpoints/{sequence}`
+
## Metrics and Observability
Prometheus series exposed at `/metrics` include:
diff --git a/internal/api/plc_compatibility_test.go b/internal/api/plc_compatibility_test.go
new file mode 100644
index 0000000..67cbafa
--- /dev/null
+++ b/internal/api/plc_compatibility_test.go
@@ -0,0 +1,232 @@
+package api
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/base64"
+ "encoding/json"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "path/filepath"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+func TestPLCCompatibilityGetDIDMatchesStoredDocument(t *testing.T) {
+ ts, store, _, cleanup := newCompatibilityServer(t)
+ defer cleanup()
+
+ state, ok, err := store.GetState("did:plc:alice")
+ if err != nil {
+ t.Fatalf("get state: %v", err)
+ }
+ if !ok {
+ t.Fatalf("state not found")
+ }
+
+ resp, err := http.Get(ts.URL + "/did:plc:alice")
+ if err != nil {
+ t.Fatalf("get did: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("status: got %d want 200", resp.StatusCode)
+ }
+ if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/did+ld+json") {
+ t.Fatalf("content-type mismatch: %s", got)
+ }
+ body, _ := io.ReadAll(resp.Body)
+ if strings.TrimSpace(string(body)) != strings.TrimSpace(string(state.DIDDocument)) {
+ t.Fatalf("did document mismatch\n got: %s\nwant: %s", string(body), string(state.DIDDocument))
+ }
+}
+
+func TestPLCCompatibilityGetLogOrdered(t *testing.T) {
+ ts, _, recs, cleanup := newCompatibilityServer(t)
+ defer cleanup()
+
+ resp, err := http.Get(ts.URL + "/did:plc:alice/log")
+ if err != nil {
+ t.Fatalf("get log: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("status: got %d want 200", resp.StatusCode)
+ }
+ var ops []map[string]any
+ if err := json.NewDecoder(resp.Body).Decode(&ops); err != nil {
+ t.Fatalf("decode log: %v", err)
+ }
+ if len(ops) != 2 {
+ t.Fatalf("log length mismatch: got %d want 2", len(ops))
+ }
+ if _, ok := ops[0]["prev"]; ok {
+ t.Fatalf("first op should be genesis without prev")
+ }
+ if prev, _ := ops[1]["prev"].(string); prev != recs[0].CID {
+ t.Fatalf("second op prev mismatch: got %q want %q", prev, recs[0].CID)
+ }
+}
+
+func TestPLCCompatibilityExportCount(t *testing.T) {
+ ts, _, _, cleanup := newCompatibilityServer(t)
+ defer cleanup()
+
+ resp, err := http.Get(ts.URL + "/export?count=2")
+ if err != nil {
+ t.Fatalf("get export: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ t.Fatalf("status: got %d want 200", resp.StatusCode)
+ }
+ if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/jsonlines") {
+ t.Fatalf("content-type mismatch: %s", got)
+ }
+ body, _ := io.ReadAll(resp.Body)
+ lines := strings.Split(strings.TrimSpace(string(body)), "\n")
+ if len(lines) != 2 {
+ t.Fatalf("line count mismatch: got %d want 2", len(lines))
+ }
+ for _, line := range lines {
+ var entry map[string]any
+ if err := json.Unmarshal([]byte(line), &entry); err != nil {
+ t.Fatalf("decode export line: %v", err)
+ }
+ for _, key := range []string{"did", "operation", "cid", "nullified", "createdAt"} {
+ if _, ok := entry[key]; !ok {
+ t.Fatalf("missing export key %q in %v", key, entry)
+ }
+ }
+ }
+}
+
+func TestPLCCompatibilityPostIsMethodNotAllowed(t *testing.T) {
+ ts, _, _, cleanup := newCompatibilityServer(t)
+ defer cleanup()
+
+ req, err := http.NewRequest(http.MethodPost, ts.URL+"/did:plc:alice", strings.NewReader(`{}`))
+ if err != nil {
+ t.Fatalf("new request: %v", err)
+ }
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatalf("post did: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusMethodNotAllowed {
+ t.Fatalf("status: got %d want 405", resp.StatusCode)
+ }
+ if allow := resp.Header.Get("Allow"); allow != http.MethodGet {
+ t.Fatalf("allow header mismatch: got %q want %q", allow, http.MethodGet)
+ }
+}
+
+func TestPLCCompatibilityNoVerificationMetadataLeak(t *testing.T) {
+ ts, _, _, cleanup := newCompatibilityServer(t)
+ defer cleanup()
+
+ resp, err := http.Get(ts.URL + "/did:plc:alice")
+ if err != nil {
+ t.Fatalf("get did: %v", err)
+ }
+ defer resp.Body.Close()
+ body, _ := io.ReadAll(resp.Body)
+ if strings.Contains(string(body), "checkpoint_reference") {
+ t.Fatalf("compatibility endpoint leaked verification metadata: %s", string(body))
+ }
+}
+
+func TestPLCCompatibilityProofEndpointStillWorks(t *testing.T) {
+ ts, _, _, cleanup := newCompatibilityServer(t)
+ defer cleanup()
+
+ resp, err := http.Get(ts.URL + "/did/did:plc:alice/proof")
+ if err != nil {
+ t.Fatalf("get proof: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ body, _ := io.ReadAll(resp.Body)
+ t.Fatalf("proof status: got %d want 200 body=%s", resp.StatusCode, string(body))
+ }
+}
+
+func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStore, []types.ExportRecord, func()) {
+ t.Helper()
+ tmp := t.TempDir()
+ dataDir := filepath.Join(tmp, "data")
+ if err := os.MkdirAll(dataDir, 0o755); err != nil {
+ t.Fatalf("mkdir data: %v", err)
+ }
+
+ seed := make([]byte, ed25519.SeedSize)
+ if _, err := rand.Read(seed); err != nil {
+ t.Fatalf("seed: %v", err)
+ }
+ keyPath := filepath.Join(tmp, "mirror.key")
+ if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil {
+ t.Fatalf("write key: %v", err)
+ }
+
+ recs := buildCheckpointScenarioRecords(t)
+ sourcePath := filepath.Join(tmp, "records.ndjson")
+ writeRecordsFile(t, sourcePath, recs)
+
+ store, err := storage.OpenPebble(dataDir)
+ if err != nil {
+ t.Fatalf("open pebble: %v", err)
+ }
+ if err := store.SetMode(config.ModeMirror); err != nil {
+ t.Fatalf("set mode: %v", err)
+ }
+ bl, err := storage.OpenBlockLog(dataDir, 3, 4)
+ if err != nil {
+ t.Fatalf("open block log: %v", err)
+ }
+
+ cfg := config.Config{
+ Mode: config.ModeMirror,
+ DataDir: dataDir,
+ PLCSource: sourcePath,
+ VerifyPolicy: config.VerifyFull,
+ ZstdLevel: 3,
+ BlockSizeMB: 4,
+ CheckpointInterval: 2,
+ CommitBatchSize: 2,
+ VerifyWorkers: 2,
+ ListenAddr: ":0",
+ MirrorPrivateKeyPath: keyPath,
+ PollInterval: 5 * time.Second,
+ RequestTimeout: 10 * time.Second,
+ }
+ cpMgr := checkpoint.NewManager(store, dataDir, keyPath)
+ svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr)
+ if err := svc.Replay(context.Background()); err != nil {
+ t.Fatalf("replay: %v", err)
+ }
+ if err := svc.Flush(context.Background()); err != nil {
+ t.Fatalf("flush: %v", err)
+ }
+ if _, err := svc.Snapshot(context.Background()); err != nil {
+ t.Fatalf("snapshot: %v", err)
+ }
+
+ ts := httptest.NewServer(NewServer(cfg, store, svc, cpMgr).Handler())
+ cleanup := func() {
+ ts.Close()
+ svc.Close()
+ _ = store.Close()
+ }
+ return ts, store, recs, cleanup
+}
diff --git a/internal/api/server.go b/internal/api/server.go
index f1ffc7c..f773145 100644
--- a/internal/api/server.go
+++ b/internal/api/server.go
@@ -65,6 +65,8 @@ func (s *Server) Handler() http.Handler {
mux.Handle("/checkpoints/latest", s.withTimeout(http.HandlerFunc(s.handleLatestCheckpoint)))
mux.Handle("/checkpoints/", s.withTimeout(http.HandlerFunc(s.handleCheckpointBySequence)))
mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID)))
+ mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility)))
+ mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility)))
return mux
}
@@ -275,6 +277,248 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri
})
}
+type plcAuditEntry struct {
+ DID string `json:"did"`
+ Operation json.RawMessage `json:"operation"`
+ CID string `json:"cid"`
+ Nullified bool `json:"nullified"`
+ CreatedAt string `json:"createdAt"`
+}
+
+func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) {
+ path := strings.TrimPrefix(r.URL.Path, "/")
+ if path == "" {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+ return
+ }
+ if path == "export" {
+ s.handleExportCompatibility(w, r)
+ return
+ }
+ parts := strings.Split(path, "/")
+ did := parts[0]
+ if !strings.HasPrefix(did, "did:") {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+ return
+ }
+ if r.Method == http.MethodPost && len(parts) == 1 {
+ w.Header().Set("Allow", http.MethodGet)
+ writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("write operations are not supported by this mirror"))
+ return
+ }
+ if r.Method != http.MethodGet {
+ w.Header().Set("Allow", http.MethodGet)
+ writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
+ return
+ }
+
+ switch {
+ case len(parts) == 1:
+ s.handleGetDIDCompatibility(w, did)
+ case len(parts) == 2 && parts[1] == "data":
+ s.handleGetDIDDataCompatibility(w, r, did)
+ case len(parts) == 2 && parts[1] == "log":
+ s.handleGetDIDLogCompatibility(w, r, did)
+ case len(parts) == 3 && parts[1] == "log" && parts[2] == "last":
+ s.handleGetDIDLogLastCompatibility(w, r, did)
+ case len(parts) == 3 && parts[1] == "log" && parts[2] == "audit":
+ s.handleGetDIDLogAuditCompatibility(w, r, did)
+ default:
+ writeErr(w, http.StatusNotFound, fmt.Errorf("not found"))
+ }
+}
+
+func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) {
+ state, ok, err := s.store.GetState(did)
+ if err != nil {
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ if !ok {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+ return
+ }
+ status := http.StatusOK
+ if isTombstonedDIDDocument(state.DIDDocument) {
+ status = http.StatusGone
+ }
+ w.Header().Set("Content-Type", "application/did+ld+json")
+ w.WriteHeader(status)
+ _, _ = w.Write(state.DIDDocument)
+}
+
+func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) {
+ if s.ingestor == nil {
+ writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+ return
+ }
+ logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
+ if err != nil {
+ if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+ return
+ }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
+ return
+ }
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ ops := make([]json.RawMessage, 0, len(logEntries))
+ for _, rec := range logEntries {
+ ops = append(ops, rec.Operation)
+ }
+ writeJSONWithContentType(w, http.StatusOK, "application/json", ops)
+}
+
+func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) {
+ if s.ingestor == nil {
+ writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+ return
+ }
+ rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did)
+ if err != nil {
+ if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+ return
+ }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
+ return
+ }
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write(rec.Operation)
+}
+
+func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) {
+ if s.ingestor == nil {
+ writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+ return
+ }
+ logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did)
+ if err != nil {
+ if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+ return
+ }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
+ return
+ }
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ audit := make([]plcAuditEntry, 0, len(logEntries))
+ for _, rec := range logEntries {
+ audit = append(audit, plcAuditEntry{
+ DID: did,
+ Operation: rec.Operation,
+ CID: rec.CID,
+ Nullified: rec.Nullified,
+ CreatedAt: rec.CreatedAt,
+ })
+ }
+ writeJSONWithContentType(w, http.StatusOK, "application/json", audit)
+}
+
+func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) {
+ if s.ingestor == nil {
+ writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+ return
+ }
+ data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did)
+ if err != nil {
+ if errors.Is(err, ingest.ErrDIDNotFound) {
+ writeErr(w, http.StatusNotFound, fmt.Errorf("did not found"))
+ return
+ }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ writeErr(w, http.StatusGatewayTimeout, err)
+ return
+ }
+ writeErr(w, http.StatusInternalServerError, err)
+ return
+ }
+ writeJSONWithContentType(w, http.StatusOK, "application/json", data)
+}
+
+func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Request) {
+ if r.Method != http.MethodGet {
+ w.Header().Set("Allow", http.MethodGet)
+ writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed"))
+ return
+ }
+ if s.ingestor == nil {
+ writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable"))
+ return
+ }
+
+ count := 1000
+ if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" {
+ n, err := strconv.Atoi(rawCount)
+ if err != nil || n < 1 {
+ writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid count query parameter"))
+ return
+ }
+ if n > 1000 {
+ n = 1000
+ }
+ count = n
+ }
+
+ var after time.Time
+ if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" {
+ parsed, err := time.Parse(time.RFC3339, rawAfter)
+ if err != nil {
+ writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid after query parameter"))
+ return
+ }
+ after = parsed
+ }
+
+ w.Header().Set("Content-Type", "application/jsonlines")
+ w.WriteHeader(http.StatusOK)
+ flusher, _ := w.(http.Flusher)
+ enc := json.NewEncoder(w)
+ err := s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error {
+ entry := plcAuditEntry{
+ DID: rec.DID,
+ Operation: rec.Operation,
+ CID: rec.CID,
+ Nullified: rec.Nullified,
+ CreatedAt: rec.CreatedAt,
+ }
+ if err := enc.Encode(entry); err != nil {
+ return err
+ }
+ if flusher != nil {
+ flusher.Flush()
+ }
+ return nil
+ })
+ if err != nil {
+ // Response has already started; best effort termination.
+ return
+ }
+}
+
+func isTombstonedDIDDocument(raw []byte) bool {
+ if len(raw) == 0 {
+ return false
+ }
+ var doc map[string]any
+ if err := json.Unmarshal(raw, &doc); err != nil {
+ return false
+ }
+ deactivated, _ := doc["deactivated"].(bool)
+ return deactivated
+}
+
func (s *Server) withTimeout(next http.Handler) http.Handler {
timeout := s.cfg.RequestTimeout
if timeout <= 0 {
@@ -343,7 +587,14 @@ func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1,
}
func writeJSON(w http.ResponseWriter, code int, v any) {
+ writeJSONWithContentType(w, code, "application/json", v)
+}
+
+func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) {
w.Header().Set("Content-Type", "application/json")
+ if strings.TrimSpace(contentType) != "" {
+ w.Header().Set("Content-Type", contentType)
+ }
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
diff --git a/internal/ingest/service.go b/internal/ingest/service.go
index 817a8bd..f6b2433 100644
--- a/internal/ingest/service.go
+++ b/internal/ingest/service.go
@@ -2,6 +2,7 @@ package ingest
import (
"context"
+ "encoding/json"
"errors"
"fmt"
"hash/fnv"
@@ -56,6 +57,11 @@ type MetricsSink interface {
ObserveCheckpoint(duration time.Duration, sequence uint64)
}
+var (
+ ErrDIDNotFound = errors.New("did not found")
+ ErrHistoryNotStored = errors.New("history not available in current mode")
+)
+
func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service {
s := &Service{
cfg: cfg,
@@ -631,6 +637,186 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen
return tip, filtered, nil
}
+func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) {
+ if err := s.CorruptionError(); err != nil {
+ return nil, err
+ }
+ if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
+ return nil, ErrHistoryNotStored
+ }
+ seqs, err := s.store.ListDIDSequences(did)
+ if err != nil {
+ return nil, err
+ }
+ if len(seqs) == 0 {
+ return nil, ErrDIDNotFound
+ }
+ out := make([]types.ExportRecord, 0, len(seqs))
+ for _, seq := range seqs {
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+ ref, ok, err := s.store.GetOpSeqRef(seq)
+ if err != nil {
+ return nil, err
+ }
+ if !ok {
+ return nil, fmt.Errorf("missing op reference for seq %d", seq)
+ }
+ payload, err := s.blockLog.ReadRecord(ref)
+ if err != nil {
+ return nil, err
+ }
+ out = append(out, types.ExportRecord{
+ Seq: seq,
+ DID: did,
+ CreatedAt: ref.Received,
+ CID: ref.CID,
+ Nullified: detectNullified(payload),
+ Operation: json.RawMessage(payload),
+ })
+ }
+ return out, nil
+}
+
+func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types.ExportRecord, error) {
+ if err := s.CorruptionError(); err != nil {
+ return types.ExportRecord{}, err
+ }
+ if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
+ return types.ExportRecord{}, ErrHistoryNotStored
+ }
+ seqs, err := s.store.ListDIDSequences(did)
+ if err != nil {
+ return types.ExportRecord{}, err
+ }
+ if len(seqs) == 0 {
+ return types.ExportRecord{}, ErrDIDNotFound
+ }
+ lastSeq := seqs[len(seqs)-1]
+ if err := ctx.Err(); err != nil {
+ return types.ExportRecord{}, err
+ }
+ ref, ok, err := s.store.GetOpSeqRef(lastSeq)
+ if err != nil {
+ return types.ExportRecord{}, err
+ }
+ if !ok {
+ return types.ExportRecord{}, fmt.Errorf("missing op reference for seq %d", lastSeq)
+ }
+ payload, err := s.blockLog.ReadRecord(ref)
+ if err != nil {
+ return types.ExportRecord{}, err
+ }
+ return types.ExportRecord{
+ Seq: lastSeq,
+ DID: did,
+ CreatedAt: ref.Received,
+ CID: ref.CID,
+ Nullified: detectNullified(payload),
+ Operation: json.RawMessage(payload),
+ }, nil
+}
+
+func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[string]any, error) {
+ if err := s.CorruptionError(); err != nil {
+ return nil, err
+ }
+ if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
+ state, ok, err := s.store.GetState(did)
+ if err != nil {
+ return nil, err
+ }
+ if !ok {
+ return nil, ErrDIDNotFound
+ }
+ var doc map[string]any
+ if err := json.Unmarshal(state.DIDDocument, &doc); err != nil {
+ return nil, err
+ }
+ return doc, nil
+ }
+ last, err := s.LoadLatestDIDOperation(ctx, did)
+ if err != nil {
+ return nil, err
+ }
+ var op map[string]any
+ if err := json.Unmarshal(last.Operation, &op); err != nil {
+ return nil, fmt.Errorf("decode latest operation: %w", err)
+ }
+ delete(op, "sig")
+ delete(op, "signature")
+ delete(op, "sigPayload")
+ delete(op, "signaturePayload")
+ return op, nil
+}
+
+func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int, emit func(types.ExportRecord) error) error {
+ if err := s.CorruptionError(); err != nil {
+ return err
+ }
+ if s.cfg.Mode != config.ModeMirror || s.blockLog == nil {
+ return nil
+ }
+ if limit <= 0 {
+ limit = 1000
+ }
+ const maxCount = 1000
+ if limit > maxCount {
+ limit = maxCount
+ }
+ afterSet := !after.IsZero()
+ emitted := 0
+ stop := errors.New("stop export iteration")
+ err := s.store.ForEachOpSeqRef(func(seq uint64, ref types.BlockRefV1) error {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ if emitted >= limit {
+ return stop
+ }
+ if afterSet {
+ if strings.TrimSpace(ref.Received) == "" {
+ return nil
+ }
+ ts, err := time.Parse(time.RFC3339, ref.Received)
+ if err != nil {
+ return nil
+ }
+ if !ts.After(after) {
+ return nil
+ }
+ }
+ payload, err := s.blockLog.ReadRecord(ref)
+ if err != nil {
+ return err
+ }
+ rec := types.ExportRecord{
+ Seq: seq,
+ DID: ref.DID,
+ CreatedAt: ref.Received,
+ CID: ref.CID,
+ Nullified: detectNullified(payload),
+ Operation: json.RawMessage(payload),
+ }
+ emitted++
+ return emit(rec)
+ })
+ if err == nil || errors.Is(err, stop) {
+ return nil
+ }
+ return err
+}
+
+func detectNullified(operation []byte) bool {
+ var payload map[string]any
+ if err := json.Unmarshal(operation, &payload); err != nil {
+ return false
+ }
+ v, _ := payload["nullified"].(bool)
+ return v
+}
+
func (s *Service) Flush(ctx context.Context) error {
_ = ctx
if err := s.CorruptionError(); err != nil {
diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go
index 5e15d7e..109310f 100644
--- a/internal/storage/pebble_store.go
+++ b/internal/storage/pebble_store.go
@@ -228,6 +228,33 @@ func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) {
return ref, true, nil
}
+func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error {
+ iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")})
+ if err != nil {
+ return fmt.Errorf("new iterator: %w", err)
+ }
+ defer iter.Close()
+
+ for iter.First(); iter.Valid(); iter.Next() {
+ key := iter.Key()
+ if len(key) < len("opseq:")+8 {
+ continue
+ }
+ seq := binary.BigEndian.Uint64(key[len("opseq:"):])
+ var ref types.BlockRefV1
+ if err := json.Unmarshal(iter.Value(), &ref); err != nil {
+ return fmt.Errorf("unmarshal opseq ref: %w", err)
+ }
+ if err := fn(seq, ref); err != nil {
+ return err
+ }
+ }
+ if err := iter.Error(); err != nil {
+ return fmt.Errorf("iterate opseq refs: %w", err)
+ }
+ return nil
+}
+
func (p *PebbleStore) PutBlockHash(blockID uint64, hash string) error {
return p.db.Set(blockKey(blockID), []byte(hash), pebble.Sync)
}
diff --git a/internal/storage/store.go b/internal/storage/store.go
index 59055b7..05ce5ae 100644
--- a/internal/storage/store.go
+++ b/internal/storage/store.go
@@ -34,6 +34,7 @@ type Store interface {
PutOpSeqRef(seq uint64, ref types.BlockRefV1) error
GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error)
+ ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error
PutBlockHash(blockID uint64, hash string) error
GetBlockHash(blockID uint64) (string, bool, error)