diff options
Diffstat (limited to 'internal/storage/pebble_store.go')
| -rw-r--r-- | internal/storage/pebble_store.go | 107 |
1 files changed, 106 insertions, 1 deletions
diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go index 109310f..55c04fb 100644 --- a/internal/storage/pebble_store.go +++ b/internal/storage/pebble_store.go @@ -6,7 +6,6 @@ import ( "fmt" "path/filepath" "sort" - "github.com/Fuwn/plutia/internal/types" "github.com/cockroachdb/pebble" ) @@ -20,9 +19,11 @@ type PebbleStore struct { func OpenPebble(dataDir string) (*PebbleStore, error) { indexDir := filepath.Join(dataDir, "index") db, err := pebble.Open(indexDir, &pebble.Options{}) + if err != nil { return nil, fmt.Errorf("open pebble: %w", err) } + return &PebbleStore{db: db}, nil } @@ -30,6 +31,7 @@ func (p *PebbleStore) Close() error { return p.db.Close() } func (p *PebbleStore) GetMode() (string, error) { v, ok, err := p.getString(metaKey("mode")) + return v, okOrErr(ok, err) } @@ -39,80 +41,103 @@ func (p *PebbleStore) SetMode(mode string) error { func (p *PebbleStore) GetGlobalSeq() (uint64, error) { v, ok, err := p.getUint64(metaKey("global_seq")) + if err != nil { return 0, err } + if !ok { return 0, nil } + return v, nil } func (p *PebbleStore) SetGlobalSeq(seq uint64) error { b := make([]byte, 8) + binary.BigEndian.PutUint64(b, seq) + return p.db.Set(metaKey("global_seq"), b, pebble.Sync) } func (p *PebbleStore) PutState(state types.StateV1) error { b, err := json.Marshal(state) + if err != nil { return fmt.Errorf("marshal state: %w", err) } + return p.db.Set(didKey(state.DID), b, pebble.Sync) } func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error { var opRef *types.BlockRefV1 + if includeOpRef { opRef = ref } + return p.ApplyOperationsBatch([]OperationMutation{{State: state, Ref: opRef}}, nil) } func (p *PebbleStore) GetState(did string) (types.StateV1, bool, error) { b, ok, err := p.getBytes(didKey(did)) + if err != nil || !ok { return types.StateV1{}, ok, err } + var s types.StateV1 + if err := json.Unmarshal(b, &s); err != nil { return types.StateV1{}, false, fmt.Errorf("unmarshal state: %w", err) } + return s, true, nil } func (p *PebbleStore) ListStates() ([]types.StateV1, error) { states := make([]types.StateV1, 0) + if err := p.ForEachState(func(s types.StateV1) error { states = append(states, s) + return nil }); err != nil { return nil, err } + sort.Slice(states, func(i, j int) bool { return states[i].DID < states[j].DID }) + return states, nil } func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("did:"), UpperBound: []byte("did;")}) + if err != nil { return fmt.Errorf("new iterator: %w", err) } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { var s types.StateV1 + if err := json.Unmarshal(iter.Value(), &s); err != nil { return fmt.Errorf("unmarshal state: %w", err) } + if err := fn(s); err != nil { return err } } + if err := iter.Error(); err != nil { return fmt.Errorf("iterate states: %w", err) } + return nil } @@ -120,31 +145,41 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes if len(ops) == 0 && len(blockHashes) == 0 { return nil } + batch := p.db.NewBatch() + defer batch.Close() for _, op := range ops { stateBytes, err := json.Marshal(op.State) + if err != nil { return fmt.Errorf("marshal state: %w", err) } + seqBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seqBytes, op.State.LatestOpSeq) if err := batch.Set(didKey(op.State.DID), stateBytes, nil); err != nil { return err } + if err := batch.Set(chainKey(op.State.DID), seqBytes, nil); err != nil { return err } + if err := batch.Set(didOpKey(op.State.DID, op.State.LatestOpSeq), []byte{1}, nil); err != nil { return err } + if op.Ref != nil { refBytes, err := json.Marshal(op.Ref) + if err != nil { return fmt.Errorf("marshal opseq ref: %w", err) } + if err := batch.Set(opSeqKey(op.State.LatestOpSeq), refBytes, nil); err != nil { return err } @@ -160,7 +195,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes if len(ops) > 0 { lastSeq := ops[len(ops)-1].State.LatestOpSeq seqBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seqBytes, lastSeq) + if err := batch.Set(metaKey("global_seq"), seqBytes, nil); err != nil { return err } @@ -171,7 +208,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes func (p *PebbleStore) SetChainHead(did string, seq uint64) error { b := make([]byte, 8) + binary.BigEndian.PutUint64(b, seq) + return p.db.Set(chainKey(did), b, pebble.Sync) } @@ -187,71 +226,94 @@ func (p *PebbleStore) ListDIDSequences(did string) ([]uint64, error) { prefix := []byte("didop:" + did + ":") upper := append(append([]byte(nil), prefix...), 0xFF) iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: prefix, UpperBound: upper}) + if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } + defer iter.Close() seqs := make([]uint64, 0) + for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() + if len(key) < len(prefix)+8 { continue } + seq := binary.BigEndian.Uint64(key[len(prefix):]) seqs = append(seqs, seq) } + if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate did op sequences: %w", err) } + sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) + return seqs, nil } func (p *PebbleStore) PutOpSeqRef(seq uint64, ref types.BlockRefV1) error { b, err := json.Marshal(ref) + if err != nil { return fmt.Errorf("marshal opseq ref: %w", err) } + return p.db.Set(opSeqKey(seq), b, pebble.Sync) } func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) { b, ok, err := p.getBytes(opSeqKey(seq)) + if err != nil || !ok { return types.BlockRefV1{}, ok, err } + var ref types.BlockRefV1 + if err := json.Unmarshal(b, &ref); err != nil { return types.BlockRefV1{}, false, fmt.Errorf("unmarshal opseq ref: %w", err) } + return ref, true, nil } func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")}) + if err != nil { return fmt.Errorf("new iterator: %w", err) } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() + if len(key) < len("opseq:")+8 { continue } + seq := binary.BigEndian.Uint64(key[len("opseq:"):]) + var ref types.BlockRefV1 + if err := json.Unmarshal(iter.Value(), &ref); err != nil { return fmt.Errorf("unmarshal opseq ref: %w", err) } + if err := fn(seq, ref); err != nil { return err } } + if err := iter.Error(); err != nil { return fmt.Errorf("iterate opseq refs: %w", err) } + return nil } @@ -265,127 +327,168 @@ func (p *PebbleStore) GetBlockHash(blockID uint64) (string, bool, error) { func (p *PebbleStore) ListBlockHashes() ([]string, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")}) + if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } + defer iter.Close() hashes := make([]string, 0) + for iter.First(); iter.Valid(); iter.Next() { hashes = append(hashes, string(iter.Value())) } + if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate blocks: %w", err) } + return hashes, nil } func (p *PebbleStore) ListBlockHashEntries() ([]BlockHashEntry, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")}) + if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } + defer iter.Close() out := make([]BlockHashEntry, 0) + for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() + if len(key) < len("block:")+8 { continue } + id := binary.BigEndian.Uint64(key[len("block:"):]) out = append(out, BlockHashEntry{BlockID: id, Hash: string(iter.Value())}) } + if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate block entries: %w", err) } + sort.Slice(out, func(i, j int) bool { return out[i].BlockID < out[j].BlockID }) + return out, nil } func (p *PebbleStore) PutCheckpoint(cp types.CheckpointV1) error { b, err := json.Marshal(cp) + if err != nil { return fmt.Errorf("marshal checkpoint: %w", err) } + if err := p.db.Set(checkpointKey(cp.Sequence), b, pebble.Sync); err != nil { return err } + latest := make([]byte, 8) + binary.BigEndian.PutUint64(latest, cp.Sequence) + return p.db.Set(metaKey("latest_checkpoint"), latest, pebble.Sync) } func (p *PebbleStore) GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) { b, ok, err := p.getBytes(checkpointKey(sequence)) + if err != nil || !ok { return types.CheckpointV1{}, ok, err } + var cp types.CheckpointV1 + if err := json.Unmarshal(b, &cp); err != nil { return types.CheckpointV1{}, false, fmt.Errorf("unmarshal checkpoint: %w", err) } + return cp, true, nil } func (p *PebbleStore) GetLatestCheckpoint() (types.CheckpointV1, bool, error) { seq, ok, err := p.getUint64(metaKey("latest_checkpoint")) + if err != nil || !ok { return types.CheckpointV1{}, ok, err } + return p.GetCheckpoint(seq) } func (p *PebbleStore) getBytes(key []byte) ([]byte, bool, error) { v, closer, err := p.db.Get(key) + if err != nil { if err == pebble.ErrNotFound { return nil, false, nil } + return nil, false, err } + defer closer.Close() + return append([]byte(nil), v...), true, nil } func (p *PebbleStore) getString(key []byte) (string, bool, error) { b, ok, err := p.getBytes(key) + if err != nil || !ok { return "", ok, err } + return string(b), true, nil } func (p *PebbleStore) getUint64(key []byte) (uint64, bool, error) { b, ok, err := p.getBytes(key) + if err != nil || !ok { return 0, ok, err } + if len(b) != 8 { return 0, false, fmt.Errorf("invalid uint64 value length for %q", key) } + return binary.BigEndian.Uint64(b), true, nil } func didKey(did string) []byte { return []byte("did:" + did) } + func chainKey(did string) []byte { return []byte("chain:" + did) } + func metaKey(k string) []byte { return []byte("meta:" + k) } + func checkpointKey(s uint64) []byte { return append([]byte("checkpoint:"), u64bytes(s)...) } + func opSeqKey(s uint64) []byte { return append([]byte("opseq:"), u64bytes(s)...) } + func blockKey(id uint64) []byte { return append([]byte("block:"), u64bytes(id)...) } + func didOpKey(did string, seq uint64) []byte { return append([]byte("didop:"+did+":"), u64bytes(seq)...) } func u64bytes(v uint64) []byte { b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b } @@ -393,8 +496,10 @@ func okOrErr(ok bool, err error) error { if err != nil { return err } + if !ok { return nil } + return nil } |