package storage import ( "encoding/binary" "encoding/json" "fmt" "github.com/Fuwn/plutia/internal/types" "github.com/cockroachdb/pebble" "path/filepath" "sort" ) var _ Store = (*PebbleStore)(nil) type PebbleStore struct { db *pebble.DB } func OpenPebble(dataDir string) (*PebbleStore, error) { indexDir := filepath.Join(dataDir, "index") db, err := pebble.Open(indexDir, &pebble.Options{}) if err != nil { return nil, fmt.Errorf("open pebble: %w", err) } return &PebbleStore{db: db}, nil } func (p *PebbleStore) Close() error { return p.db.Close() } func (p *PebbleStore) GetMode() (string, error) { v, _, err := p.getString(metaKey("mode")) return v, err } func (p *PebbleStore) SetMode(mode string) error { return p.db.Set(metaKey("mode"), []byte(mode), pebble.Sync) } func (p *PebbleStore) GetGlobalSeq() (uint64, error) { v, ok, err := p.getUint64(metaKey("global_seq")) if err != nil { return 0, err } if !ok { return 0, nil } return v, nil } func (p *PebbleStore) SetGlobalSeq(seq uint64) error { b := make([]byte, 8) binary.BigEndian.PutUint64(b, seq) return p.db.Set(metaKey("global_seq"), b, pebble.Sync) } func (p *PebbleStore) PutState(state types.StateV1) error { b, err := json.Marshal(state) if err != nil { return fmt.Errorf("marshal state: %w", err) } return p.db.Set(didKey(state.DID), b, pebble.Sync) } func (p *PebbleStore) DeleteState(did string) error { batch := p.db.NewBatch() defer batch.Close() if err := batch.Delete(didKey(did), nil); err != nil { return err } if err := batch.Delete(chainKey(did), nil); err != nil { return err } prefix := []byte("didop:" + did + ":") upper := append(append([]byte(nil), prefix...), 0xFF) if err := batch.DeleteRange(prefix, upper, nil); err != nil { return err } return batch.Commit(pebble.Sync) } func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error { var opRef *types.BlockRefV1 if includeOpRef { opRef = ref } return p.ApplyOperationsBatch([]OperationMutation{{State: state, Ref: opRef}}, nil) } func (p *PebbleStore) GetState(did string) (types.StateV1, bool, error) { b, ok, err := p.getBytes(didKey(did)) if err != nil || !ok { return types.StateV1{}, ok, err } var s types.StateV1 if err := json.Unmarshal(b, &s); err != nil { return types.StateV1{}, false, fmt.Errorf("unmarshal state: %w", err) } return s, true, nil } func (p *PebbleStore) ListStates() ([]types.StateV1, error) { states := make([]types.StateV1, 0) if err := p.ForEachState(func(s types.StateV1) error { states = append(states, s) return nil }); err != nil { return nil, err } sort.Slice(states, func(i, j int) bool { return states[i].DID < states[j].DID }) return states, nil } func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("did:"), UpperBound: []byte("did;")}) if err != nil { return fmt.Errorf("new iterator: %w", err) } defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { var s types.StateV1 if err := json.Unmarshal(iter.Value(), &s); err != nil { return fmt.Errorf("unmarshal state: %w", err) } if err := fn(s); err != nil { return err } } if err := iter.Error(); err != nil { return fmt.Errorf("iterate states: %w", err) } return nil } func (p *PebbleStore) PutThinCacheMeta(meta types.ThinCacheMetaV1) error { b, err := json.Marshal(meta) if err != nil { return fmt.Errorf("marshal thin cache meta: %w", err) } return p.db.Set(thinMetaKey(meta.DID), b, pebble.Sync) } func (p *PebbleStore) GetThinCacheMeta(did string) (types.ThinCacheMetaV1, bool, error) { b, ok, err := p.getBytes(thinMetaKey(did)) if err != nil || !ok { return types.ThinCacheMetaV1{}, ok, err } var meta types.ThinCacheMetaV1 if err := json.Unmarshal(b, &meta); err != nil { return types.ThinCacheMetaV1{}, false, fmt.Errorf("unmarshal thin cache meta: %w", err) } return meta, true, nil } func (p *PebbleStore) ListThinCacheMeta() ([]types.ThinCacheMetaV1, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("thinmeta:"), UpperBound: []byte("thinmeta;")}) if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } defer iter.Close() out := make([]types.ThinCacheMetaV1, 0) for iter.First(); iter.Valid(); iter.Next() { var meta types.ThinCacheMetaV1 if err := json.Unmarshal(iter.Value(), &meta); err != nil { return nil, fmt.Errorf("unmarshal thin cache meta: %w", err) } out = append(out, meta) } if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate thin cache meta: %w", err) } return out, nil } func (p *PebbleStore) DeleteThinCacheMeta(did string) error { return p.db.Delete(thinMetaKey(did), pebble.Sync) } func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error { if len(ops) == 0 && len(blockHashes) == 0 { return nil } batch := p.db.NewBatch() defer batch.Close() for _, op := range ops { stateBytes, err := json.Marshal(op.State) if err != nil { return fmt.Errorf("marshal state: %w", err) } seqBytes := make([]byte, 8) binary.BigEndian.PutUint64(seqBytes, op.State.LatestOpSeq) if err := batch.Set(didKey(op.State.DID), stateBytes, nil); err != nil { return err } if err := batch.Set(chainKey(op.State.DID), seqBytes, nil); err != nil { return err } if err := batch.Set(didOpKey(op.State.DID, op.State.LatestOpSeq), []byte{1}, nil); err != nil { return err } if op.Ref != nil { refBytes, err := json.Marshal(op.Ref) if err != nil { return fmt.Errorf("marshal opseq ref: %w", err) } if err := batch.Set(opSeqKey(op.State.LatestOpSeq), refBytes, nil); err != nil { return err } } } for _, bh := range blockHashes { if err := batch.Set(blockKey(bh.BlockID), []byte(bh.Hash), nil); err != nil { return err } } if len(ops) > 0 { lastSeq := ops[len(ops)-1].State.LatestOpSeq seqBytes := make([]byte, 8) binary.BigEndian.PutUint64(seqBytes, lastSeq) if err := batch.Set(metaKey("global_seq"), seqBytes, nil); err != nil { return err } } return batch.Commit(pebble.Sync) } func (p *PebbleStore) SetChainHead(did string, seq uint64) error { b := make([]byte, 8) binary.BigEndian.PutUint64(b, seq) return p.db.Set(chainKey(did), b, pebble.Sync) } func (p *PebbleStore) GetChainHead(did string) (uint64, bool, error) { return p.getUint64(chainKey(did)) } func (p *PebbleStore) AddDIDSequence(did string, seq uint64) error { return p.db.Set(didOpKey(did, seq), []byte{1}, pebble.Sync) } func (p *PebbleStore) ListDIDSequences(did string) ([]uint64, error) { prefix := []byte("didop:" + did + ":") upper := append(append([]byte(nil), prefix...), 0xFF) iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: prefix, UpperBound: upper}) if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } defer iter.Close() seqs := make([]uint64, 0) for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() if len(key) < len(prefix)+8 { continue } seq := binary.BigEndian.Uint64(key[len(prefix):]) seqs = append(seqs, seq) } if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate did op sequences: %w", err) } sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) return seqs, nil } func (p *PebbleStore) PutOpSeqRef(seq uint64, ref types.BlockRefV1) error { b, err := json.Marshal(ref) if err != nil { return fmt.Errorf("marshal opseq ref: %w", err) } return p.db.Set(opSeqKey(seq), b, pebble.Sync) } func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) { b, ok, err := p.getBytes(opSeqKey(seq)) if err != nil || !ok { return types.BlockRefV1{}, ok, err } var ref types.BlockRefV1 if err := json.Unmarshal(b, &ref); err != nil { return types.BlockRefV1{}, false, fmt.Errorf("unmarshal opseq ref: %w", err) } return ref, true, nil } func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")}) if err != nil { return fmt.Errorf("new iterator: %w", err) } defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() if len(key) < len("opseq:")+8 { continue } seq := binary.BigEndian.Uint64(key[len("opseq:"):]) var ref types.BlockRefV1 if err := json.Unmarshal(iter.Value(), &ref); err != nil { return fmt.Errorf("unmarshal opseq ref: %w", err) } if err := fn(seq, ref); err != nil { return err } } if err := iter.Error(); err != nil { return fmt.Errorf("iterate opseq refs: %w", err) } return nil } func (p *PebbleStore) PutBlockHash(blockID uint64, hash string) error { return p.db.Set(blockKey(blockID), []byte(hash), pebble.Sync) } func (p *PebbleStore) GetBlockHash(blockID uint64) (string, bool, error) { return p.getString(blockKey(blockID)) } func (p *PebbleStore) ListBlockHashes() ([]string, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")}) if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } defer iter.Close() hashes := make([]string, 0) for iter.First(); iter.Valid(); iter.Next() { hashes = append(hashes, string(iter.Value())) } if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate blocks: %w", err) } return hashes, nil } func (p *PebbleStore) ListBlockHashEntries() ([]BlockHashEntry, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")}) if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } defer iter.Close() out := make([]BlockHashEntry, 0) for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() if len(key) < len("block:")+8 { continue } id := binary.BigEndian.Uint64(key[len("block:"):]) out = append(out, BlockHashEntry{BlockID: id, Hash: string(iter.Value())}) } if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate block entries: %w", err) } sort.Slice(out, func(i, j int) bool { return out[i].BlockID < out[j].BlockID }) return out, nil } func (p *PebbleStore) PutCheckpoint(cp types.CheckpointV1) error { b, err := json.Marshal(cp) if err != nil { return fmt.Errorf("marshal checkpoint: %w", err) } if err := p.db.Set(checkpointKey(cp.Sequence), b, pebble.Sync); err != nil { return err } latest := make([]byte, 8) binary.BigEndian.PutUint64(latest, cp.Sequence) return p.db.Set(metaKey("latest_checkpoint"), latest, pebble.Sync) } func (p *PebbleStore) GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) { b, ok, err := p.getBytes(checkpointKey(sequence)) if err != nil || !ok { return types.CheckpointV1{}, ok, err } var cp types.CheckpointV1 if err := json.Unmarshal(b, &cp); err != nil { return types.CheckpointV1{}, false, fmt.Errorf("unmarshal checkpoint: %w", err) } return cp, true, nil } func (p *PebbleStore) GetLatestCheckpoint() (types.CheckpointV1, bool, error) { seq, ok, err := p.getUint64(metaKey("latest_checkpoint")) if err != nil || !ok { return types.CheckpointV1{}, ok, err } return p.GetCheckpoint(seq) } func (p *PebbleStore) getBytes(key []byte) ([]byte, bool, error) { v, closer, err := p.db.Get(key) if err != nil { if err == pebble.ErrNotFound { return nil, false, nil } return nil, false, err } defer closer.Close() return append([]byte(nil), v...), true, nil } func (p *PebbleStore) getString(key []byte) (string, bool, error) { b, ok, err := p.getBytes(key) if err != nil || !ok { return "", ok, err } return string(b), true, nil } func (p *PebbleStore) getUint64(key []byte) (uint64, bool, error) { b, ok, err := p.getBytes(key) if err != nil || !ok { return 0, ok, err } if len(b) != 8 { return 0, false, fmt.Errorf("invalid uint64 value length for %q", key) } return binary.BigEndian.Uint64(b), true, nil } func didKey(did string) []byte { return []byte("did:" + did) } func chainKey(did string) []byte { return []byte("chain:" + did) } func metaKey(k string) []byte { return []byte("meta:" + k) } func checkpointKey(s uint64) []byte { return append([]byte("checkpoint:"), u64bytes(s)...) } func opSeqKey(s uint64) []byte { return append([]byte("opseq:"), u64bytes(s)...) } func blockKey(id uint64) []byte { return append([]byte("block:"), u64bytes(id)...) } func didOpKey(did string, seq uint64) []byte { return append([]byte("didop:"+did+":"), u64bytes(seq)...) } func thinMetaKey(did string) []byte { return []byte("thinmeta:" + did) } func u64bytes(v uint64) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, v) return b }