aboutsummaryrefslogtreecommitdiff
path: root/internal/storage/pebble_store.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/storage/pebble_store.go')
-rw-r--r--internal/storage/pebble_store.go107
1 files changed, 106 insertions, 1 deletions
diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go
index 109310f..55c04fb 100644
--- a/internal/storage/pebble_store.go
+++ b/internal/storage/pebble_store.go
@@ -6,7 +6,6 @@ import (
"fmt"
"path/filepath"
"sort"
-
"github.com/Fuwn/plutia/internal/types"
"github.com/cockroachdb/pebble"
)
@@ -20,9 +19,11 @@ type PebbleStore struct {
func OpenPebble(dataDir string) (*PebbleStore, error) {
indexDir := filepath.Join(dataDir, "index")
db, err := pebble.Open(indexDir, &pebble.Options{})
+
if err != nil {
return nil, fmt.Errorf("open pebble: %w", err)
}
+
return &PebbleStore{db: db}, nil
}
@@ -30,6 +31,7 @@ func (p *PebbleStore) Close() error { return p.db.Close() }
func (p *PebbleStore) GetMode() (string, error) {
v, ok, err := p.getString(metaKey("mode"))
+
return v, okOrErr(ok, err)
}
@@ -39,80 +41,103 @@ func (p *PebbleStore) SetMode(mode string) error {
func (p *PebbleStore) GetGlobalSeq() (uint64, error) {
v, ok, err := p.getUint64(metaKey("global_seq"))
+
if err != nil {
return 0, err
}
+
if !ok {
return 0, nil
}
+
return v, nil
}
func (p *PebbleStore) SetGlobalSeq(seq uint64) error {
b := make([]byte, 8)
+
binary.BigEndian.PutUint64(b, seq)
+
return p.db.Set(metaKey("global_seq"), b, pebble.Sync)
}
func (p *PebbleStore) PutState(state types.StateV1) error {
b, err := json.Marshal(state)
+
if err != nil {
return fmt.Errorf("marshal state: %w", err)
}
+
return p.db.Set(didKey(state.DID), b, pebble.Sync)
}
func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error {
var opRef *types.BlockRefV1
+
if includeOpRef {
opRef = ref
}
+
return p.ApplyOperationsBatch([]OperationMutation{{State: state, Ref: opRef}}, nil)
}
func (p *PebbleStore) GetState(did string) (types.StateV1, bool, error) {
b, ok, err := p.getBytes(didKey(did))
+
if err != nil || !ok {
return types.StateV1{}, ok, err
}
+
var s types.StateV1
+
if err := json.Unmarshal(b, &s); err != nil {
return types.StateV1{}, false, fmt.Errorf("unmarshal state: %w", err)
}
+
return s, true, nil
}
func (p *PebbleStore) ListStates() ([]types.StateV1, error) {
states := make([]types.StateV1, 0)
+
if err := p.ForEachState(func(s types.StateV1) error {
states = append(states, s)
+
return nil
}); err != nil {
return nil, err
}
+
sort.Slice(states, func(i, j int) bool { return states[i].DID < states[j].DID })
+
return states, nil
}
func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("did:"), UpperBound: []byte("did;")})
+
if err != nil {
return fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
var s types.StateV1
+
if err := json.Unmarshal(iter.Value(), &s); err != nil {
return fmt.Errorf("unmarshal state: %w", err)
}
+
if err := fn(s); err != nil {
return err
}
}
+
if err := iter.Error(); err != nil {
return fmt.Errorf("iterate states: %w", err)
}
+
return nil
}
@@ -120,31 +145,41 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes
if len(ops) == 0 && len(blockHashes) == 0 {
return nil
}
+
batch := p.db.NewBatch()
+
defer batch.Close()
for _, op := range ops {
stateBytes, err := json.Marshal(op.State)
+
if err != nil {
return fmt.Errorf("marshal state: %w", err)
}
+
seqBytes := make([]byte, 8)
+
binary.BigEndian.PutUint64(seqBytes, op.State.LatestOpSeq)
if err := batch.Set(didKey(op.State.DID), stateBytes, nil); err != nil {
return err
}
+
if err := batch.Set(chainKey(op.State.DID), seqBytes, nil); err != nil {
return err
}
+
if err := batch.Set(didOpKey(op.State.DID, op.State.LatestOpSeq), []byte{1}, nil); err != nil {
return err
}
+
if op.Ref != nil {
refBytes, err := json.Marshal(op.Ref)
+
if err != nil {
return fmt.Errorf("marshal opseq ref: %w", err)
}
+
if err := batch.Set(opSeqKey(op.State.LatestOpSeq), refBytes, nil); err != nil {
return err
}
@@ -160,7 +195,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes
if len(ops) > 0 {
lastSeq := ops[len(ops)-1].State.LatestOpSeq
seqBytes := make([]byte, 8)
+
binary.BigEndian.PutUint64(seqBytes, lastSeq)
+
if err := batch.Set(metaKey("global_seq"), seqBytes, nil); err != nil {
return err
}
@@ -171,7 +208,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes
func (p *PebbleStore) SetChainHead(did string, seq uint64) error {
b := make([]byte, 8)
+
binary.BigEndian.PutUint64(b, seq)
+
return p.db.Set(chainKey(did), b, pebble.Sync)
}
@@ -187,71 +226,94 @@ func (p *PebbleStore) ListDIDSequences(did string) ([]uint64, error) {
prefix := []byte("didop:" + did + ":")
upper := append(append([]byte(nil), prefix...), 0xFF)
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: prefix, UpperBound: upper})
+
if err != nil {
return nil, fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
seqs := make([]uint64, 0)
+
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
+
if len(key) < len(prefix)+8 {
continue
}
+
seq := binary.BigEndian.Uint64(key[len(prefix):])
seqs = append(seqs, seq)
}
+
if err := iter.Error(); err != nil {
return nil, fmt.Errorf("iterate did op sequences: %w", err)
}
+
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
+
return seqs, nil
}
func (p *PebbleStore) PutOpSeqRef(seq uint64, ref types.BlockRefV1) error {
b, err := json.Marshal(ref)
+
if err != nil {
return fmt.Errorf("marshal opseq ref: %w", err)
}
+
return p.db.Set(opSeqKey(seq), b, pebble.Sync)
}
func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) {
b, ok, err := p.getBytes(opSeqKey(seq))
+
if err != nil || !ok {
return types.BlockRefV1{}, ok, err
}
+
var ref types.BlockRefV1
+
if err := json.Unmarshal(b, &ref); err != nil {
return types.BlockRefV1{}, false, fmt.Errorf("unmarshal opseq ref: %w", err)
}
+
return ref, true, nil
}
func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")})
+
if err != nil {
return fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
+
if len(key) < len("opseq:")+8 {
continue
}
+
seq := binary.BigEndian.Uint64(key[len("opseq:"):])
+
var ref types.BlockRefV1
+
if err := json.Unmarshal(iter.Value(), &ref); err != nil {
return fmt.Errorf("unmarshal opseq ref: %w", err)
}
+
if err := fn(seq, ref); err != nil {
return err
}
}
+
if err := iter.Error(); err != nil {
return fmt.Errorf("iterate opseq refs: %w", err)
}
+
return nil
}
@@ -265,127 +327,168 @@ func (p *PebbleStore) GetBlockHash(blockID uint64) (string, bool, error) {
func (p *PebbleStore) ListBlockHashes() ([]string, error) {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")})
+
if err != nil {
return nil, fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
hashes := make([]string, 0)
+
for iter.First(); iter.Valid(); iter.Next() {
hashes = append(hashes, string(iter.Value()))
}
+
if err := iter.Error(); err != nil {
return nil, fmt.Errorf("iterate blocks: %w", err)
}
+
return hashes, nil
}
func (p *PebbleStore) ListBlockHashEntries() ([]BlockHashEntry, error) {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")})
+
if err != nil {
return nil, fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
out := make([]BlockHashEntry, 0)
+
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
+
if len(key) < len("block:")+8 {
continue
}
+
id := binary.BigEndian.Uint64(key[len("block:"):])
out = append(out, BlockHashEntry{BlockID: id, Hash: string(iter.Value())})
}
+
if err := iter.Error(); err != nil {
return nil, fmt.Errorf("iterate block entries: %w", err)
}
+
sort.Slice(out, func(i, j int) bool { return out[i].BlockID < out[j].BlockID })
+
return out, nil
}
func (p *PebbleStore) PutCheckpoint(cp types.CheckpointV1) error {
b, err := json.Marshal(cp)
+
if err != nil {
return fmt.Errorf("marshal checkpoint: %w", err)
}
+
if err := p.db.Set(checkpointKey(cp.Sequence), b, pebble.Sync); err != nil {
return err
}
+
latest := make([]byte, 8)
+
binary.BigEndian.PutUint64(latest, cp.Sequence)
+
return p.db.Set(metaKey("latest_checkpoint"), latest, pebble.Sync)
}
func (p *PebbleStore) GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) {
b, ok, err := p.getBytes(checkpointKey(sequence))
+
if err != nil || !ok {
return types.CheckpointV1{}, ok, err
}
+
var cp types.CheckpointV1
+
if err := json.Unmarshal(b, &cp); err != nil {
return types.CheckpointV1{}, false, fmt.Errorf("unmarshal checkpoint: %w", err)
}
+
return cp, true, nil
}
func (p *PebbleStore) GetLatestCheckpoint() (types.CheckpointV1, bool, error) {
seq, ok, err := p.getUint64(metaKey("latest_checkpoint"))
+
if err != nil || !ok {
return types.CheckpointV1{}, ok, err
}
+
return p.GetCheckpoint(seq)
}
func (p *PebbleStore) getBytes(key []byte) ([]byte, bool, error) {
v, closer, err := p.db.Get(key)
+
if err != nil {
if err == pebble.ErrNotFound {
return nil, false, nil
}
+
return nil, false, err
}
+
defer closer.Close()
+
return append([]byte(nil), v...), true, nil
}
func (p *PebbleStore) getString(key []byte) (string, bool, error) {
b, ok, err := p.getBytes(key)
+
if err != nil || !ok {
return "", ok, err
}
+
return string(b), true, nil
}
func (p *PebbleStore) getUint64(key []byte) (uint64, bool, error) {
b, ok, err := p.getBytes(key)
+
if err != nil || !ok {
return 0, ok, err
}
+
if len(b) != 8 {
return 0, false, fmt.Errorf("invalid uint64 value length for %q", key)
}
+
return binary.BigEndian.Uint64(b), true, nil
}
func didKey(did string) []byte { return []byte("did:" + did) }
+
func chainKey(did string) []byte { return []byte("chain:" + did) }
+
func metaKey(k string) []byte { return []byte("meta:" + k) }
+
func checkpointKey(s uint64) []byte {
return append([]byte("checkpoint:"), u64bytes(s)...)
}
+
func opSeqKey(s uint64) []byte {
return append([]byte("opseq:"), u64bytes(s)...)
}
+
func blockKey(id uint64) []byte {
return append([]byte("block:"), u64bytes(id)...)
}
+
func didOpKey(did string, seq uint64) []byte {
return append([]byte("didop:"+did+":"), u64bytes(seq)...)
}
func u64bytes(v uint64) []byte {
b := make([]byte, 8)
+
binary.BigEndian.PutUint64(b, v)
+
return b
}
@@ -393,8 +496,10 @@ func okOrErr(ok bool, err error) error {
if err != nil {
return err
}
+
if !ok {
return nil
}
+
return nil
}