diff options
Diffstat (limited to 'internal')
| -rw-r--r-- | internal/api/plc_compatibility_test.go | 232 | ||||
| -rw-r--r-- | internal/api/server.go | 251 | ||||
| -rw-r--r-- | internal/ingest/service.go | 186 | ||||
| -rw-r--r-- | internal/storage/pebble_store.go | 27 | ||||
| -rw-r--r-- | internal/storage/store.go | 1 |
5 files changed, 697 insertions, 0 deletions
diff --git a/internal/api/plc_compatibility_test.go b/internal/api/plc_compatibility_test.go new file mode 100644 index 0000000..67cbafa --- /dev/null +++ b/internal/api/plc_compatibility_test.go @@ -0,0 +1,232 @@ +package api + +import ( + "context" + "crypto/ed25519" + "crypto/rand" + "encoding/base64" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/Fuwn/plutia/internal/checkpoint" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/ingest" + "github.com/Fuwn/plutia/internal/storage" + "github.com/Fuwn/plutia/internal/types" +) + +func TestPLCCompatibilityGetDIDMatchesStoredDocument(t *testing.T) { + ts, store, _, cleanup := newCompatibilityServer(t) + defer cleanup() + + state, ok, err := store.GetState("did:plc:alice") + if err != nil { + t.Fatalf("get state: %v", err) + } + if !ok { + t.Fatalf("state not found") + } + + resp, err := http.Get(ts.URL + "/did:plc:alice") + if err != nil { + t.Fatalf("get did: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status: got %d want 200", resp.StatusCode) + } + if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/did+ld+json") { + t.Fatalf("content-type mismatch: %s", got) + } + body, _ := io.ReadAll(resp.Body) + if strings.TrimSpace(string(body)) != strings.TrimSpace(string(state.DIDDocument)) { + t.Fatalf("did document mismatch\n got: %s\nwant: %s", string(body), string(state.DIDDocument)) + } +} + +func TestPLCCompatibilityGetLogOrdered(t *testing.T) { + ts, _, recs, cleanup := newCompatibilityServer(t) + defer cleanup() + + resp, err := http.Get(ts.URL + "/did:plc:alice/log") + if err != nil { + t.Fatalf("get log: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status: got %d want 200", resp.StatusCode) + } + var ops []map[string]any + if err := json.NewDecoder(resp.Body).Decode(&ops); err != nil { + t.Fatalf("decode log: %v", err) + } + if len(ops) != 2 { + t.Fatalf("log length mismatch: got %d want 2", len(ops)) + } + if _, ok := ops[0]["prev"]; ok { + t.Fatalf("first op should be genesis without prev") + } + if prev, _ := ops[1]["prev"].(string); prev != recs[0].CID { + t.Fatalf("second op prev mismatch: got %q want %q", prev, recs[0].CID) + } +} + +func TestPLCCompatibilityExportCount(t *testing.T) { + ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() + + resp, err := http.Get(ts.URL + "/export?count=2") + if err != nil { + t.Fatalf("get export: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status: got %d want 200", resp.StatusCode) + } + if got := resp.Header.Get("Content-Type"); !strings.Contains(got, "application/jsonlines") { + t.Fatalf("content-type mismatch: %s", got) + } + body, _ := io.ReadAll(resp.Body) + lines := strings.Split(strings.TrimSpace(string(body)), "\n") + if len(lines) != 2 { + t.Fatalf("line count mismatch: got %d want 2", len(lines)) + } + for _, line := range lines { + var entry map[string]any + if err := json.Unmarshal([]byte(line), &entry); err != nil { + t.Fatalf("decode export line: %v", err) + } + for _, key := range []string{"did", "operation", "cid", "nullified", "createdAt"} { + if _, ok := entry[key]; !ok { + t.Fatalf("missing export key %q in %v", key, entry) + } + } + } +} + +func TestPLCCompatibilityPostIsMethodNotAllowed(t *testing.T) { + ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() + + req, err := http.NewRequest(http.MethodPost, ts.URL+"/did:plc:alice", strings.NewReader(`{}`)) + if err != nil { + t.Fatalf("new request: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("post did: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusMethodNotAllowed { + t.Fatalf("status: got %d want 405", resp.StatusCode) + } + if allow := resp.Header.Get("Allow"); allow != http.MethodGet { + t.Fatalf("allow header mismatch: got %q want %q", allow, http.MethodGet) + } +} + +func TestPLCCompatibilityNoVerificationMetadataLeak(t *testing.T) { + ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() + + resp, err := http.Get(ts.URL + "/did:plc:alice") + if err != nil { + t.Fatalf("get did: %v", err) + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if strings.Contains(string(body), "checkpoint_reference") { + t.Fatalf("compatibility endpoint leaked verification metadata: %s", string(body)) + } +} + +func TestPLCCompatibilityProofEndpointStillWorks(t *testing.T) { + ts, _, _, cleanup := newCompatibilityServer(t) + defer cleanup() + + resp, err := http.Get(ts.URL + "/did/did:plc:alice/proof") + if err != nil { + t.Fatalf("get proof: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("proof status: got %d want 200 body=%s", resp.StatusCode, string(body)) + } +} + +func newCompatibilityServer(t *testing.T) (*httptest.Server, *storage.PebbleStore, []types.ExportRecord, func()) { + t.Helper() + tmp := t.TempDir() + dataDir := filepath.Join(tmp, "data") + if err := os.MkdirAll(dataDir, 0o755); err != nil { + t.Fatalf("mkdir data: %v", err) + } + + seed := make([]byte, ed25519.SeedSize) + if _, err := rand.Read(seed); err != nil { + t.Fatalf("seed: %v", err) + } + keyPath := filepath.Join(tmp, "mirror.key") + if err := os.WriteFile(keyPath, []byte(base64.RawURLEncoding.EncodeToString(seed)), 0o600); err != nil { + t.Fatalf("write key: %v", err) + } + + recs := buildCheckpointScenarioRecords(t) + sourcePath := filepath.Join(tmp, "records.ndjson") + writeRecordsFile(t, sourcePath, recs) + + store, err := storage.OpenPebble(dataDir) + if err != nil { + t.Fatalf("open pebble: %v", err) + } + if err := store.SetMode(config.ModeMirror); err != nil { + t.Fatalf("set mode: %v", err) + } + bl, err := storage.OpenBlockLog(dataDir, 3, 4) + if err != nil { + t.Fatalf("open block log: %v", err) + } + + cfg := config.Config{ + Mode: config.ModeMirror, + DataDir: dataDir, + PLCSource: sourcePath, + VerifyPolicy: config.VerifyFull, + ZstdLevel: 3, + BlockSizeMB: 4, + CheckpointInterval: 2, + CommitBatchSize: 2, + VerifyWorkers: 2, + ListenAddr: ":0", + MirrorPrivateKeyPath: keyPath, + PollInterval: 5 * time.Second, + RequestTimeout: 10 * time.Second, + } + cpMgr := checkpoint.NewManager(store, dataDir, keyPath) + svc := ingest.NewService(cfg, store, ingest.NewClient(sourcePath), bl, cpMgr) + if err := svc.Replay(context.Background()); err != nil { + t.Fatalf("replay: %v", err) + } + if err := svc.Flush(context.Background()); err != nil { + t.Fatalf("flush: %v", err) + } + if _, err := svc.Snapshot(context.Background()); err != nil { + t.Fatalf("snapshot: %v", err) + } + + ts := httptest.NewServer(NewServer(cfg, store, svc, cpMgr).Handler()) + cleanup := func() { + ts.Close() + svc.Close() + _ = store.Close() + } + return ts, store, recs, cleanup +} diff --git a/internal/api/server.go b/internal/api/server.go index f1ffc7c..f773145 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -65,6 +65,8 @@ func (s *Server) Handler() http.Handler { mux.Handle("/checkpoints/latest", s.withTimeout(http.HandlerFunc(s.handleLatestCheckpoint))) mux.Handle("/checkpoints/", s.withTimeout(http.HandlerFunc(s.handleCheckpointBySequence))) mux.Handle("/did/", s.withTimeout(http.HandlerFunc(s.handleDID))) + mux.Handle("/export", s.withTimeout(http.HandlerFunc(s.handleExportCompatibility))) + mux.Handle("/", s.withTimeout(http.HandlerFunc(s.handlePLCCompatibility))) return mux } @@ -275,6 +277,248 @@ func (s *Server) handleDIDProof(w http.ResponseWriter, r *http.Request, did stri }) } +type plcAuditEntry struct { + DID string `json:"did"` + Operation json.RawMessage `json:"operation"` + CID string `json:"cid"` + Nullified bool `json:"nullified"` + CreatedAt string `json:"createdAt"` +} + +func (s *Server) handlePLCCompatibility(w http.ResponseWriter, r *http.Request) { + path := strings.TrimPrefix(r.URL.Path, "/") + if path == "" { + writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + return + } + if path == "export" { + s.handleExportCompatibility(w, r) + return + } + parts := strings.Split(path, "/") + did := parts[0] + if !strings.HasPrefix(did, "did:") { + writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + return + } + if r.Method == http.MethodPost && len(parts) == 1 { + w.Header().Set("Allow", http.MethodGet) + writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("write operations are not supported by this mirror")) + return + } + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + return + } + + switch { + case len(parts) == 1: + s.handleGetDIDCompatibility(w, did) + case len(parts) == 2 && parts[1] == "data": + s.handleGetDIDDataCompatibility(w, r, did) + case len(parts) == 2 && parts[1] == "log": + s.handleGetDIDLogCompatibility(w, r, did) + case len(parts) == 3 && parts[1] == "log" && parts[2] == "last": + s.handleGetDIDLogLastCompatibility(w, r, did) + case len(parts) == 3 && parts[1] == "log" && parts[2] == "audit": + s.handleGetDIDLogAuditCompatibility(w, r, did) + default: + writeErr(w, http.StatusNotFound, fmt.Errorf("not found")) + } +} + +func (s *Server) handleGetDIDCompatibility(w http.ResponseWriter, did string) { + state, ok, err := s.store.GetState(did) + if err != nil { + writeErr(w, http.StatusInternalServerError, err) + return + } + if !ok { + writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return + } + status := http.StatusOK + if isTombstonedDIDDocument(state.DIDDocument) { + status = http.StatusGone + } + w.Header().Set("Content-Type", "application/did+ld+json") + w.WriteHeader(status) + _, _ = w.Write(state.DIDDocument) +} + +func (s *Server) handleGetDIDLogCompatibility(w http.ResponseWriter, r *http.Request, did string) { + if s.ingestor == nil { + writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return + } + logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did) + if err != nil { + if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { + writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) + return + } + writeErr(w, http.StatusInternalServerError, err) + return + } + ops := make([]json.RawMessage, 0, len(logEntries)) + for _, rec := range logEntries { + ops = append(ops, rec.Operation) + } + writeJSONWithContentType(w, http.StatusOK, "application/json", ops) +} + +func (s *Server) handleGetDIDLogLastCompatibility(w http.ResponseWriter, r *http.Request, did string) { + if s.ingestor == nil { + writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return + } + rec, err := s.ingestor.LoadLatestDIDOperation(r.Context(), did) + if err != nil { + if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { + writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) + return + } + writeErr(w, http.StatusInternalServerError, err) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(rec.Operation) +} + +func (s *Server) handleGetDIDLogAuditCompatibility(w http.ResponseWriter, r *http.Request, did string) { + if s.ingestor == nil { + writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return + } + logEntries, err := s.ingestor.LoadDIDLog(r.Context(), did) + if err != nil { + if errors.Is(err, ingest.ErrDIDNotFound) || errors.Is(err, ingest.ErrHistoryNotStored) { + writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) + return + } + writeErr(w, http.StatusInternalServerError, err) + return + } + audit := make([]plcAuditEntry, 0, len(logEntries)) + for _, rec := range logEntries { + audit = append(audit, plcAuditEntry{ + DID: did, + Operation: rec.Operation, + CID: rec.CID, + Nullified: rec.Nullified, + CreatedAt: rec.CreatedAt, + }) + } + writeJSONWithContentType(w, http.StatusOK, "application/json", audit) +} + +func (s *Server) handleGetDIDDataCompatibility(w http.ResponseWriter, r *http.Request, did string) { + if s.ingestor == nil { + writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return + } + data, err := s.ingestor.LoadCurrentPLCData(r.Context(), did) + if err != nil { + if errors.Is(err, ingest.ErrDIDNotFound) { + writeErr(w, http.StatusNotFound, fmt.Errorf("did not found")) + return + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + writeErr(w, http.StatusGatewayTimeout, err) + return + } + writeErr(w, http.StatusInternalServerError, err) + return + } + writeJSONWithContentType(w, http.StatusOK, "application/json", data) +} + +func (s *Server) handleExportCompatibility(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.Header().Set("Allow", http.MethodGet) + writeErr(w, http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + return + } + if s.ingestor == nil { + writeErr(w, http.StatusServiceUnavailable, fmt.Errorf("ingestor unavailable")) + return + } + + count := 1000 + if rawCount := strings.TrimSpace(r.URL.Query().Get("count")); rawCount != "" { + n, err := strconv.Atoi(rawCount) + if err != nil || n < 1 { + writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid count query parameter")) + return + } + if n > 1000 { + n = 1000 + } + count = n + } + + var after time.Time + if rawAfter := strings.TrimSpace(r.URL.Query().Get("after")); rawAfter != "" { + parsed, err := time.Parse(time.RFC3339, rawAfter) + if err != nil { + writeErr(w, http.StatusBadRequest, fmt.Errorf("invalid after query parameter")) + return + } + after = parsed + } + + w.Header().Set("Content-Type", "application/jsonlines") + w.WriteHeader(http.StatusOK) + flusher, _ := w.(http.Flusher) + enc := json.NewEncoder(w) + err := s.ingestor.StreamExport(r.Context(), after, count, func(rec types.ExportRecord) error { + entry := plcAuditEntry{ + DID: rec.DID, + Operation: rec.Operation, + CID: rec.CID, + Nullified: rec.Nullified, + CreatedAt: rec.CreatedAt, + } + if err := enc.Encode(entry); err != nil { + return err + } + if flusher != nil { + flusher.Flush() + } + return nil + }) + if err != nil { + // Response has already started; best effort termination. + return + } +} + +func isTombstonedDIDDocument(raw []byte) bool { + if len(raw) == 0 { + return false + } + var doc map[string]any + if err := json.Unmarshal(raw, &doc); err != nil { + return false + } + deactivated, _ := doc["deactivated"].(bool) + return deactivated +} + func (s *Server) withTimeout(next http.Handler) http.Handler { timeout := s.cfg.RequestTimeout if timeout <= 0 { @@ -343,7 +587,14 @@ func (s *Server) selectCheckpointForProof(r *http.Request) (types.CheckpointV1, } func writeJSON(w http.ResponseWriter, code int, v any) { + writeJSONWithContentType(w, code, "application/json", v) +} + +func writeJSONWithContentType(w http.ResponseWriter, code int, contentType string, v any) { w.Header().Set("Content-Type", "application/json") + if strings.TrimSpace(contentType) != "" { + w.Header().Set("Content-Type", contentType) + } w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } diff --git a/internal/ingest/service.go b/internal/ingest/service.go index 817a8bd..f6b2433 100644 --- a/internal/ingest/service.go +++ b/internal/ingest/service.go @@ -2,6 +2,7 @@ package ingest import ( "context" + "encoding/json" "errors" "fmt" "hash/fnv" @@ -56,6 +57,11 @@ type MetricsSink interface { ObserveCheckpoint(duration time.Duration, sequence uint64) } +var ( + ErrDIDNotFound = errors.New("did not found") + ErrHistoryNotStored = errors.New("history not available in current mode") +) + func NewService(cfg config.Config, store storage.Store, client *Client, blockLog *storage.BlockLog, checkpointMgr *checkpoint.Manager) *Service { s := &Service{ cfg: cfg, @@ -631,6 +637,186 @@ func (s *Service) RecomputeTipAtOrBefore(ctx context.Context, did string, sequen return tip, filtered, nil } +func (s *Service) LoadDIDLog(ctx context.Context, did string) ([]types.ExportRecord, error) { + if err := s.CorruptionError(); err != nil { + return nil, err + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { + return nil, ErrHistoryNotStored + } + seqs, err := s.store.ListDIDSequences(did) + if err != nil { + return nil, err + } + if len(seqs) == 0 { + return nil, ErrDIDNotFound + } + out := make([]types.ExportRecord, 0, len(seqs)) + for _, seq := range seqs { + if err := ctx.Err(); err != nil { + return nil, err + } + ref, ok, err := s.store.GetOpSeqRef(seq) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("missing op reference for seq %d", seq) + } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { + return nil, err + } + out = append(out, types.ExportRecord{ + Seq: seq, + DID: did, + CreatedAt: ref.Received, + CID: ref.CID, + Nullified: detectNullified(payload), + Operation: json.RawMessage(payload), + }) + } + return out, nil +} + +func (s *Service) LoadLatestDIDOperation(ctx context.Context, did string) (types.ExportRecord, error) { + if err := s.CorruptionError(); err != nil { + return types.ExportRecord{}, err + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { + return types.ExportRecord{}, ErrHistoryNotStored + } + seqs, err := s.store.ListDIDSequences(did) + if err != nil { + return types.ExportRecord{}, err + } + if len(seqs) == 0 { + return types.ExportRecord{}, ErrDIDNotFound + } + lastSeq := seqs[len(seqs)-1] + if err := ctx.Err(); err != nil { + return types.ExportRecord{}, err + } + ref, ok, err := s.store.GetOpSeqRef(lastSeq) + if err != nil { + return types.ExportRecord{}, err + } + if !ok { + return types.ExportRecord{}, fmt.Errorf("missing op reference for seq %d", lastSeq) + } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { + return types.ExportRecord{}, err + } + return types.ExportRecord{ + Seq: lastSeq, + DID: did, + CreatedAt: ref.Received, + CID: ref.CID, + Nullified: detectNullified(payload), + Operation: json.RawMessage(payload), + }, nil +} + +func (s *Service) LoadCurrentPLCData(ctx context.Context, did string) (map[string]any, error) { + if err := s.CorruptionError(); err != nil { + return nil, err + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { + state, ok, err := s.store.GetState(did) + if err != nil { + return nil, err + } + if !ok { + return nil, ErrDIDNotFound + } + var doc map[string]any + if err := json.Unmarshal(state.DIDDocument, &doc); err != nil { + return nil, err + } + return doc, nil + } + last, err := s.LoadLatestDIDOperation(ctx, did) + if err != nil { + return nil, err + } + var op map[string]any + if err := json.Unmarshal(last.Operation, &op); err != nil { + return nil, fmt.Errorf("decode latest operation: %w", err) + } + delete(op, "sig") + delete(op, "signature") + delete(op, "sigPayload") + delete(op, "signaturePayload") + return op, nil +} + +func (s *Service) StreamExport(ctx context.Context, after time.Time, limit int, emit func(types.ExportRecord) error) error { + if err := s.CorruptionError(); err != nil { + return err + } + if s.cfg.Mode != config.ModeMirror || s.blockLog == nil { + return nil + } + if limit <= 0 { + limit = 1000 + } + const maxCount = 1000 + if limit > maxCount { + limit = maxCount + } + afterSet := !after.IsZero() + emitted := 0 + stop := errors.New("stop export iteration") + err := s.store.ForEachOpSeqRef(func(seq uint64, ref types.BlockRefV1) error { + if err := ctx.Err(); err != nil { + return err + } + if emitted >= limit { + return stop + } + if afterSet { + if strings.TrimSpace(ref.Received) == "" { + return nil + } + ts, err := time.Parse(time.RFC3339, ref.Received) + if err != nil { + return nil + } + if !ts.After(after) { + return nil + } + } + payload, err := s.blockLog.ReadRecord(ref) + if err != nil { + return err + } + rec := types.ExportRecord{ + Seq: seq, + DID: ref.DID, + CreatedAt: ref.Received, + CID: ref.CID, + Nullified: detectNullified(payload), + Operation: json.RawMessage(payload), + } + emitted++ + return emit(rec) + }) + if err == nil || errors.Is(err, stop) { + return nil + } + return err +} + +func detectNullified(operation []byte) bool { + var payload map[string]any + if err := json.Unmarshal(operation, &payload); err != nil { + return false + } + v, _ := payload["nullified"].(bool) + return v +} + func (s *Service) Flush(ctx context.Context) error { _ = ctx if err := s.CorruptionError(); err != nil { diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go index 5e15d7e..109310f 100644 --- a/internal/storage/pebble_store.go +++ b/internal/storage/pebble_store.go @@ -228,6 +228,33 @@ func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) { return ref, true, nil } +func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error { + iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")}) + if err != nil { + return fmt.Errorf("new iterator: %w", err) + } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + key := iter.Key() + if len(key) < len("opseq:")+8 { + continue + } + seq := binary.BigEndian.Uint64(key[len("opseq:"):]) + var ref types.BlockRefV1 + if err := json.Unmarshal(iter.Value(), &ref); err != nil { + return fmt.Errorf("unmarshal opseq ref: %w", err) + } + if err := fn(seq, ref); err != nil { + return err + } + } + if err := iter.Error(); err != nil { + return fmt.Errorf("iterate opseq refs: %w", err) + } + return nil +} + func (p *PebbleStore) PutBlockHash(blockID uint64, hash string) error { return p.db.Set(blockKey(blockID), []byte(hash), pebble.Sync) } diff --git a/internal/storage/store.go b/internal/storage/store.go index 59055b7..05ce5ae 100644 --- a/internal/storage/store.go +++ b/internal/storage/store.go @@ -34,6 +34,7 @@ type Store interface { PutOpSeqRef(seq uint64, ref types.BlockRefV1) error GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) + ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error PutBlockHash(blockID uint64, hash string) error GetBlockHash(blockID uint64) (string, bool, error) |