aboutsummaryrefslogtreecommitdiff
path: root/internal/storage
diff options
context:
space:
mode:
Diffstat (limited to 'internal/storage')
-rw-r--r--internal/storage/blocklog.go94
-rw-r--r--internal/storage/blocklog_test.go14
-rw-r--r--internal/storage/pebble_store.go107
-rw-r--r--internal/storage/pebble_store_batch_durability_test.go8
-rw-r--r--internal/storage/pebble_store_batch_test.go11
-rw-r--r--internal/storage/store.go7
6 files changed, 229 insertions, 12 deletions
diff --git a/internal/storage/blocklog.go b/internal/storage/blocklog.go
index 879d3bd..e343b45 100644
--- a/internal/storage/blocklog.go
+++ b/internal/storage/blocklog.go
@@ -13,7 +13,6 @@ import (
"strings"
"sync"
"time"
-
"github.com/Fuwn/plutia/internal/types"
"github.com/klauspost/compress/zstd"
)
@@ -33,7 +32,6 @@ type BlockLog struct {
dir string
zstdLevel int
targetSize int
-
mu sync.Mutex
buf bytes.Buffer
blockID uint64
@@ -41,13 +39,17 @@ type BlockLog struct {
func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error) {
dir := filepath.Join(dataDir, "ops")
+
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("mkdir ops: %w", err)
}
+
nextID, err := detectNextBlockID(dir)
+
if err != nil {
return nil, err
}
+
return &BlockLog{
dir: dir,
zstdLevel: zstdLevel,
@@ -58,10 +60,12 @@ func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error
func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (types.BlockRefV1, *FlushInfo, error) {
l.mu.Lock()
+
defer l.mu.Unlock()
record := encodeRecord(canonical)
offset := uint64(l.buf.Len())
+
if _, err := l.buf.Write(record); err != nil {
return types.BlockRefV1{}, nil, fmt.Errorf("buffer write: %w", err)
}
@@ -82,16 +86,21 @@ func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (
if l.buf.Len() < l.targetSize {
return ref, nil, nil
}
+
flush, err := l.flushLocked()
+
if err != nil {
return types.BlockRefV1{}, nil, err
}
+
return ref, flush, nil
}
func (l *BlockLog) Flush() (*FlushInfo, error) {
l.mu.Lock()
+
defer l.mu.Unlock()
+
return l.flushLocked()
}
@@ -99,12 +108,16 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) {
if l.buf.Len() == 0 {
return nil, nil
}
+
encLevel := zstd.EncoderLevelFromZstd(l.zstdLevel)
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encLevel))
+
if err != nil {
return nil, fmt.Errorf("zstd encoder: %w", err)
}
+
compressed := enc.EncodeAll(l.buf.Bytes(), nil)
+
if err := enc.Close(); err != nil {
return nil, fmt.Errorf("close zstd encoder: %w", err)
}
@@ -112,36 +125,50 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) {
path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", l.blockID))
tmpPath := path + ".tmp"
f, err := os.Create(tmpPath)
+
if err != nil {
return nil, fmt.Errorf("create temp block file: %w", err)
}
+
half := len(compressed) / 2
+
if _, err := f.Write(compressed[:half]); err != nil {
_ = f.Close()
+
return nil, fmt.Errorf("write temp block first half: %w", err)
}
+
if sleepMs, err := strconv.Atoi(strings.TrimSpace(os.Getenv("PLUTIA_TEST_SLOW_FLUSH_MS"))); err == nil && sleepMs > 0 {
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
}
+
if _, err := f.Write(compressed[half:]); err != nil {
_ = f.Close()
+
return nil, fmt.Errorf("write temp block second half: %w", err)
}
+
if err := f.Sync(); err != nil {
_ = f.Close()
+
return nil, fmt.Errorf("sync temp block file: %w", err)
}
+
if err := f.Close(); err != nil {
return nil, fmt.Errorf("close temp block file: %w", err)
}
+
if err := os.Rename(tmpPath, path); err != nil {
return nil, fmt.Errorf("rename temp block file: %w", err)
}
+
sum := sha256.Sum256(compressed)
hash := hex.EncodeToString(sum[:])
l.blockID++
+
l.buf.Reset()
+
return &FlushInfo{BlockID: l.blockID - 1, Hash: hash}, nil
}
@@ -151,56 +178,77 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) {
RemovedOrphans: make([]uint64, 0),
}
entries, err := os.ReadDir(l.dir)
+
if err != nil {
return nil, fmt.Errorf("read ops dir: %w", err)
}
+
storedEntries, err := store.ListBlockHashEntries()
+
if err != nil {
return nil, fmt.Errorf("list stored block hashes: %w", err)
}
+
stored := make(map[uint64]string, len(storedEntries))
+
for _, e := range storedEntries {
stored[e.BlockID] = e.Hash
}
files := make(map[uint64]string)
+
for _, e := range entries {
name := e.Name()
+
if strings.HasSuffix(name, ".tmp") {
path := filepath.Join(l.dir, name)
+
if err := os.Remove(path); err != nil {
return nil, fmt.Errorf("remove temp block file %s: %w", name, err)
}
+
report.RemovedTempFiles = append(report.RemovedTempFiles, name)
+
continue
}
+
if !strings.HasSuffix(name, ".zst") {
continue
}
+
base := strings.TrimSuffix(name, ".zst")
id, err := strconv.Atoi(base)
+
if err != nil {
continue
}
+
files[uint64(id)] = filepath.Join(l.dir, name)
}
for id, path := range files {
expected, ok := stored[id]
+
if !ok {
if err := os.Remove(path); err != nil {
return nil, fmt.Errorf("remove orphan block %d: %w", id, err)
}
+
report.RemovedOrphans = append(report.RemovedOrphans, id)
+
continue
}
+
actual, err := fileHash(path)
+
if err != nil {
return nil, fmt.Errorf("hash block %d: %w", id, err)
}
+
if actual != expected {
return nil, fmt.Errorf("block hash mismatch block=%d expected=%s got=%s", id, expected, actual)
}
+
report.VerifiedBlocks++
}
@@ -216,115 +264,157 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) {
func (l *BlockLog) ReadRecord(ref types.BlockRefV1) ([]byte, error) {
path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", ref.BlockID))
compressed, err := os.ReadFile(path)
+
if err != nil {
return nil, fmt.Errorf("read block file: %w", err)
}
+
dec, err := zstd.NewReader(nil)
+
if err != nil {
return nil, fmt.Errorf("zstd reader: %w", err)
}
+
decompressed, err := dec.DecodeAll(compressed, nil)
+
if err != nil {
return nil, fmt.Errorf("decode block: %w", err)
}
+
dec.Close()
+
if ref.Offset+ref.Length > uint64(len(decompressed)) {
return nil, fmt.Errorf("record bounds out of range")
}
+
record := decompressed[ref.Offset : ref.Offset+ref.Length]
_, payload, err := decodeRecord(record)
+
if err != nil {
return nil, err
}
+
return payload, nil
}
func (l *BlockLog) IterateBlockRecords(blockID uint64, fn func(offset uint64, payload []byte) error) error {
path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", blockID))
compressed, err := os.ReadFile(path)
+
if err != nil {
return fmt.Errorf("read block file: %w", err)
}
+
dec, err := zstd.NewReader(nil)
+
if err != nil {
return fmt.Errorf("zstd reader: %w", err)
}
+
decompressed, err := dec.DecodeAll(compressed, nil)
+
if err != nil {
return fmt.Errorf("decode block: %w", err)
}
+
dec.Close()
var offset uint64
+
for offset < uint64(len(decompressed)) {
start := offset
length, n := binary.Uvarint(decompressed[offset:])
+
if n <= 0 {
return fmt.Errorf("invalid varint at offset %d", offset)
}
+
offset += uint64(n)
+
if offset+length > uint64(len(decompressed)) {
return fmt.Errorf("record out of bounds at offset %d", start)
}
+
payload := decompressed[offset : offset+length]
offset += length
+
if err := fn(start, payload); err != nil {
return err
}
}
+
return nil
}
func encodeRecord(payload []byte) []byte {
var hdr [binary.MaxVarintLen64]byte
+
n := binary.PutUvarint(hdr[:], uint64(len(payload)))
out := make([]byte, n+len(payload))
+
copy(out, hdr[:n])
copy(out[n:], payload)
+
return out
}
func decodeRecord(b []byte) (uint64, []byte, error) {
length, n := binary.Uvarint(b)
+
if n <= 0 {
return 0, nil, fmt.Errorf("invalid varint record")
}
+
if uint64(n)+length != uint64(len(b)) {
return 0, nil, fmt.Errorf("record length mismatch")
}
+
return length, b[n:], nil
}
func detectNextBlockID(dir string) (uint64, error) {
entries, err := os.ReadDir(dir)
+
if err != nil {
return 0, fmt.Errorf("read ops dir: %w", err)
}
+
ids := make([]int, 0)
+
for _, e := range entries {
name := e.Name()
+
if !strings.HasSuffix(name, ".zst") {
continue
}
+
base := strings.TrimSuffix(name, ".zst")
n, err := strconv.Atoi(base)
+
if err != nil {
continue
}
+
ids = append(ids, n)
}
+
if len(ids) == 0 {
return 1, nil
}
+
sort.Ints(ids)
+
return uint64(ids[len(ids)-1] + 1), nil
}
func fileHash(path string) (string, error) {
b, err := os.ReadFile(path)
+
if err != nil {
return "", err
}
+
sum := sha256.Sum256(b)
+
return hex.EncodeToString(sum[:]), nil
}
diff --git a/internal/storage/blocklog_test.go b/internal/storage/blocklog_test.go
index 23045cc..a71febf 100644
--- a/internal/storage/blocklog_test.go
+++ b/internal/storage/blocklog_test.go
@@ -9,6 +9,7 @@ import (
func TestBlockLogAppendReadFlush(t *testing.T) {
tmp := t.TempDir()
log, err := OpenBlockLog(tmp, 3, 4)
+
if err != nil {
t.Fatalf("OpenBlockLog: %v", err)
}
@@ -16,40 +17,53 @@ func TestBlockLogAppendReadFlush(t *testing.T) {
payload1 := []byte(`{"op":1}`)
payload2 := []byte(`{"op":2}`)
ref1, flush, err := log.Append(1, "did:plc:a", "cid1", "", payload1)
+
if err != nil {
t.Fatalf("append 1: %v", err)
}
+
if flush != nil {
t.Fatalf("unexpected automatic flush")
}
+
ref2, _, err := log.Append(2, "did:plc:a", "cid2", "cid1", payload2)
+
if err != nil {
t.Fatalf("append 2: %v", err)
}
flushed, err := log.Flush()
+
if err != nil {
t.Fatalf("flush: %v", err)
}
+
if flushed == nil {
t.Fatalf("expected flushed block")
}
+
blockFile := filepath.Join(tmp, "ops", "000001.zst")
+
if _, err := os.Stat(blockFile); err != nil {
t.Fatalf("expected block file: %v", err)
}
got1, err := log.ReadRecord(ref1)
+
if err != nil {
t.Fatalf("read record1: %v", err)
}
+
if string(got1) != string(payload1) {
t.Fatalf("payload1 mismatch: got %s want %s", got1, payload1)
}
+
got2, err := log.ReadRecord(ref2)
+
if err != nil {
t.Fatalf("read record2: %v", err)
}
+
if string(got2) != string(payload2) {
t.Fatalf("payload2 mismatch: got %s want %s", got2, payload2)
}
diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go
index 109310f..55c04fb 100644
--- a/internal/storage/pebble_store.go
+++ b/internal/storage/pebble_store.go
@@ -6,7 +6,6 @@ import (
"fmt"
"path/filepath"
"sort"
-
"github.com/Fuwn/plutia/internal/types"
"github.com/cockroachdb/pebble"
)
@@ -20,9 +19,11 @@ type PebbleStore struct {
func OpenPebble(dataDir string) (*PebbleStore, error) {
indexDir := filepath.Join(dataDir, "index")
db, err := pebble.Open(indexDir, &pebble.Options{})
+
if err != nil {
return nil, fmt.Errorf("open pebble: %w", err)
}
+
return &PebbleStore{db: db}, nil
}
@@ -30,6 +31,7 @@ func (p *PebbleStore) Close() error { return p.db.Close() }
func (p *PebbleStore) GetMode() (string, error) {
v, ok, err := p.getString(metaKey("mode"))
+
return v, okOrErr(ok, err)
}
@@ -39,80 +41,103 @@ func (p *PebbleStore) SetMode(mode string) error {
func (p *PebbleStore) GetGlobalSeq() (uint64, error) {
v, ok, err := p.getUint64(metaKey("global_seq"))
+
if err != nil {
return 0, err
}
+
if !ok {
return 0, nil
}
+
return v, nil
}
func (p *PebbleStore) SetGlobalSeq(seq uint64) error {
b := make([]byte, 8)
+
binary.BigEndian.PutUint64(b, seq)
+
return p.db.Set(metaKey("global_seq"), b, pebble.Sync)
}
func (p *PebbleStore) PutState(state types.StateV1) error {
b, err := json.Marshal(state)
+
if err != nil {
return fmt.Errorf("marshal state: %w", err)
}
+
return p.db.Set(didKey(state.DID), b, pebble.Sync)
}
func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error {
var opRef *types.BlockRefV1
+
if includeOpRef {
opRef = ref
}
+
return p.ApplyOperationsBatch([]OperationMutation{{State: state, Ref: opRef}}, nil)
}
func (p *PebbleStore) GetState(did string) (types.StateV1, bool, error) {
b, ok, err := p.getBytes(didKey(did))
+
if err != nil || !ok {
return types.StateV1{}, ok, err
}
+
var s types.StateV1
+
if err := json.Unmarshal(b, &s); err != nil {
return types.StateV1{}, false, fmt.Errorf("unmarshal state: %w", err)
}
+
return s, true, nil
}
func (p *PebbleStore) ListStates() ([]types.StateV1, error) {
states := make([]types.StateV1, 0)
+
if err := p.ForEachState(func(s types.StateV1) error {
states = append(states, s)
+
return nil
}); err != nil {
return nil, err
}
+
sort.Slice(states, func(i, j int) bool { return states[i].DID < states[j].DID })
+
return states, nil
}
func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("did:"), UpperBound: []byte("did;")})
+
if err != nil {
return fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
var s types.StateV1
+
if err := json.Unmarshal(iter.Value(), &s); err != nil {
return fmt.Errorf("unmarshal state: %w", err)
}
+
if err := fn(s); err != nil {
return err
}
}
+
if err := iter.Error(); err != nil {
return fmt.Errorf("iterate states: %w", err)
}
+
return nil
}
@@ -120,31 +145,41 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes
if len(ops) == 0 && len(blockHashes) == 0 {
return nil
}
+
batch := p.db.NewBatch()
+
defer batch.Close()
for _, op := range ops {
stateBytes, err := json.Marshal(op.State)
+
if err != nil {
return fmt.Errorf("marshal state: %w", err)
}
+
seqBytes := make([]byte, 8)
+
binary.BigEndian.PutUint64(seqBytes, op.State.LatestOpSeq)
if err := batch.Set(didKey(op.State.DID), stateBytes, nil); err != nil {
return err
}
+
if err := batch.Set(chainKey(op.State.DID), seqBytes, nil); err != nil {
return err
}
+
if err := batch.Set(didOpKey(op.State.DID, op.State.LatestOpSeq), []byte{1}, nil); err != nil {
return err
}
+
if op.Ref != nil {
refBytes, err := json.Marshal(op.Ref)
+
if err != nil {
return fmt.Errorf("marshal opseq ref: %w", err)
}
+
if err := batch.Set(opSeqKey(op.State.LatestOpSeq), refBytes, nil); err != nil {
return err
}
@@ -160,7 +195,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes
if len(ops) > 0 {
lastSeq := ops[len(ops)-1].State.LatestOpSeq
seqBytes := make([]byte, 8)
+
binary.BigEndian.PutUint64(seqBytes, lastSeq)
+
if err := batch.Set(metaKey("global_seq"), seqBytes, nil); err != nil {
return err
}
@@ -171,7 +208,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes
func (p *PebbleStore) SetChainHead(did string, seq uint64) error {
b := make([]byte, 8)
+
binary.BigEndian.PutUint64(b, seq)
+
return p.db.Set(chainKey(did), b, pebble.Sync)
}
@@ -187,71 +226,94 @@ func (p *PebbleStore) ListDIDSequences(did string) ([]uint64, error) {
prefix := []byte("didop:" + did + ":")
upper := append(append([]byte(nil), prefix...), 0xFF)
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: prefix, UpperBound: upper})
+
if err != nil {
return nil, fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
seqs := make([]uint64, 0)
+
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
+
if len(key) < len(prefix)+8 {
continue
}
+
seq := binary.BigEndian.Uint64(key[len(prefix):])
seqs = append(seqs, seq)
}
+
if err := iter.Error(); err != nil {
return nil, fmt.Errorf("iterate did op sequences: %w", err)
}
+
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })
+
return seqs, nil
}
func (p *PebbleStore) PutOpSeqRef(seq uint64, ref types.BlockRefV1) error {
b, err := json.Marshal(ref)
+
if err != nil {
return fmt.Errorf("marshal opseq ref: %w", err)
}
+
return p.db.Set(opSeqKey(seq), b, pebble.Sync)
}
func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) {
b, ok, err := p.getBytes(opSeqKey(seq))
+
if err != nil || !ok {
return types.BlockRefV1{}, ok, err
}
+
var ref types.BlockRefV1
+
if err := json.Unmarshal(b, &ref); err != nil {
return types.BlockRefV1{}, false, fmt.Errorf("unmarshal opseq ref: %w", err)
}
+
return ref, true, nil
}
func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")})
+
if err != nil {
return fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
+
if len(key) < len("opseq:")+8 {
continue
}
+
seq := binary.BigEndian.Uint64(key[len("opseq:"):])
+
var ref types.BlockRefV1
+
if err := json.Unmarshal(iter.Value(), &ref); err != nil {
return fmt.Errorf("unmarshal opseq ref: %w", err)
}
+
if err := fn(seq, ref); err != nil {
return err
}
}
+
if err := iter.Error(); err != nil {
return fmt.Errorf("iterate opseq refs: %w", err)
}
+
return nil
}
@@ -265,127 +327,168 @@ func (p *PebbleStore) GetBlockHash(blockID uint64) (string, bool, error) {
func (p *PebbleStore) ListBlockHashes() ([]string, error) {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")})
+
if err != nil {
return nil, fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
hashes := make([]string, 0)
+
for iter.First(); iter.Valid(); iter.Next() {
hashes = append(hashes, string(iter.Value()))
}
+
if err := iter.Error(); err != nil {
return nil, fmt.Errorf("iterate blocks: %w", err)
}
+
return hashes, nil
}
func (p *PebbleStore) ListBlockHashEntries() ([]BlockHashEntry, error) {
iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")})
+
if err != nil {
return nil, fmt.Errorf("new iterator: %w", err)
}
+
defer iter.Close()
out := make([]BlockHashEntry, 0)
+
for iter.First(); iter.Valid(); iter.Next() {
key := iter.Key()
+
if len(key) < len("block:")+8 {
continue
}
+
id := binary.BigEndian.Uint64(key[len("block:"):])
out = append(out, BlockHashEntry{BlockID: id, Hash: string(iter.Value())})
}
+
if err := iter.Error(); err != nil {
return nil, fmt.Errorf("iterate block entries: %w", err)
}
+
sort.Slice(out, func(i, j int) bool { return out[i].BlockID < out[j].BlockID })
+
return out, nil
}
func (p *PebbleStore) PutCheckpoint(cp types.CheckpointV1) error {
b, err := json.Marshal(cp)
+
if err != nil {
return fmt.Errorf("marshal checkpoint: %w", err)
}
+
if err := p.db.Set(checkpointKey(cp.Sequence), b, pebble.Sync); err != nil {
return err
}
+
latest := make([]byte, 8)
+
binary.BigEndian.PutUint64(latest, cp.Sequence)
+
return p.db.Set(metaKey("latest_checkpoint"), latest, pebble.Sync)
}
func (p *PebbleStore) GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) {
b, ok, err := p.getBytes(checkpointKey(sequence))
+
if err != nil || !ok {
return types.CheckpointV1{}, ok, err
}
+
var cp types.CheckpointV1
+
if err := json.Unmarshal(b, &cp); err != nil {
return types.CheckpointV1{}, false, fmt.Errorf("unmarshal checkpoint: %w", err)
}
+
return cp, true, nil
}
func (p *PebbleStore) GetLatestCheckpoint() (types.CheckpointV1, bool, error) {
seq, ok, err := p.getUint64(metaKey("latest_checkpoint"))
+
if err != nil || !ok {
return types.CheckpointV1{}, ok, err
}
+
return p.GetCheckpoint(seq)
}
func (p *PebbleStore) getBytes(key []byte) ([]byte, bool, error) {
v, closer, err := p.db.Get(key)
+
if err != nil {
if err == pebble.ErrNotFound {
return nil, false, nil
}
+
return nil, false, err
}
+
defer closer.Close()
+
return append([]byte(nil), v...), true, nil
}
func (p *PebbleStore) getString(key []byte) (string, bool, error) {
b, ok, err := p.getBytes(key)
+
if err != nil || !ok {
return "", ok, err
}
+
return string(b), true, nil
}
func (p *PebbleStore) getUint64(key []byte) (uint64, bool, error) {
b, ok, err := p.getBytes(key)
+
if err != nil || !ok {
return 0, ok, err
}
+
if len(b) != 8 {
return 0, false, fmt.Errorf("invalid uint64 value length for %q", key)
}
+
return binary.BigEndian.Uint64(b), true, nil
}
func didKey(did string) []byte { return []byte("did:" + did) }
+
func chainKey(did string) []byte { return []byte("chain:" + did) }
+
func metaKey(k string) []byte { return []byte("meta:" + k) }
+
func checkpointKey(s uint64) []byte {
return append([]byte("checkpoint:"), u64bytes(s)...)
}
+
func opSeqKey(s uint64) []byte {
return append([]byte("opseq:"), u64bytes(s)...)
}
+
func blockKey(id uint64) []byte {
return append([]byte("block:"), u64bytes(id)...)
}
+
func didOpKey(did string, seq uint64) []byte {
return append([]byte("didop:"+did+":"), u64bytes(seq)...)
}
func u64bytes(v uint64) []byte {
b := make([]byte, 8)
+
binary.BigEndian.PutUint64(b, v)
+
return b
}
@@ -393,8 +496,10 @@ func okOrErr(ok bool, err error) error {
if err != nil {
return err
}
+
if !ok {
return nil
}
+
return nil
}
diff --git a/internal/storage/pebble_store_batch_durability_test.go b/internal/storage/pebble_store_batch_durability_test.go
index 9d881c3..256ddda 100644
--- a/internal/storage/pebble_store_batch_durability_test.go
+++ b/internal/storage/pebble_store_batch_durability_test.go
@@ -3,13 +3,13 @@ package storage
import (
"testing"
"time"
-
"github.com/Fuwn/plutia/internal/types"
)
func TestApplyOperationsBatch_DurabilityBetweenBatches(t *testing.T) {
tmp := t.TempDir()
store, err := OpenPebble(tmp)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
@@ -40,24 +40,30 @@ func TestApplyOperationsBatch_DurabilityBetweenBatches(t *testing.T) {
}, []BlockHashEntry{{BlockID: 1, Hash: "abc"}}); err != nil {
t.Fatalf("apply first batch: %v", err)
}
+
if err := store.Close(); err != nil {
t.Fatalf("close store: %v", err)
}
// Simulate a crash before the second batch commit by reopening without applying it.
reopened, err := OpenPebble(tmp)
+
if err != nil {
t.Fatalf("reopen pebble: %v", err)
}
+
defer reopened.Close()
seq, err := reopened.GetGlobalSeq()
+
if err != nil {
t.Fatalf("get global seq: %v", err)
}
+
if seq != 2 {
t.Fatalf("global seq mismatch after simulated crash: got %d want 2", seq)
}
+
if _, ok, err := reopened.GetOpSeqRef(3); err != nil {
t.Fatalf("get opseq 3: %v", err)
} else if ok {
diff --git a/internal/storage/pebble_store_batch_test.go b/internal/storage/pebble_store_batch_test.go
index 85afc92..742e7bf 100644
--- a/internal/storage/pebble_store_batch_test.go
+++ b/internal/storage/pebble_store_batch_test.go
@@ -3,16 +3,17 @@ package storage
import (
"testing"
"time"
-
"github.com/Fuwn/plutia/internal/types"
)
func TestPebbleStoreApplyOperationBatch(t *testing.T) {
tmp := t.TempDir()
store, err := OpenPebble(tmp)
+
if err != nil {
t.Fatalf("open pebble: %v", err)
}
+
defer store.Close()
state := types.StateV1{
@@ -39,35 +40,43 @@ func TestPebbleStoreApplyOperationBatch(t *testing.T) {
}
gotState, ok, err := store.GetState(state.DID)
+
if err != nil || !ok {
t.Fatalf("get state: ok=%v err=%v", ok, err)
}
+
if gotState.ChainTipHash != state.ChainTipHash || gotState.LatestOpSeq != state.LatestOpSeq {
t.Fatalf("state mismatch: got tip=%s seq=%d", gotState.ChainTipHash, gotState.LatestOpSeq)
}
head, ok, err := store.GetChainHead(state.DID)
+
if err != nil || !ok || head != state.LatestOpSeq {
t.Fatalf("chain head mismatch: head=%d ok=%v err=%v", head, ok, err)
}
seqs, err := store.ListDIDSequences(state.DID)
+
if err != nil {
t.Fatalf("list did sequences: %v", err)
}
+
if len(seqs) != 1 || seqs[0] != state.LatestOpSeq {
t.Fatalf("did sequence mismatch: %v", seqs)
}
gotRef, ok, err := store.GetOpSeqRef(state.LatestOpSeq)
+
if err != nil || !ok {
t.Fatalf("get opseq ref: ok=%v err=%v", ok, err)
}
+
if gotRef.BlockID != ref.BlockID || gotRef.CID != ref.CID {
t.Fatalf("op ref mismatch: got block=%d cid=%s", gotRef.BlockID, gotRef.CID)
}
globalSeq, err := store.GetGlobalSeq()
+
if err != nil || globalSeq != state.LatestOpSeq {
t.Fatalf("global seq mismatch: seq=%d err=%v", globalSeq, err)
}
diff --git a/internal/storage/store.go b/internal/storage/store.go
index 05ce5ae..5267386 100644
--- a/internal/storage/store.go
+++ b/internal/storage/store.go
@@ -14,33 +14,26 @@ type OperationMutation struct {
type Store interface {
Close() error
-
GetMode() (string, error)
SetMode(mode string) error
-
GetGlobalSeq() (uint64, error)
SetGlobalSeq(seq uint64) error
-
PutState(state types.StateV1) error
GetState(did string) (types.StateV1, bool, error)
ListStates() ([]types.StateV1, error)
ForEachState(fn func(types.StateV1) error) error
ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error
-
SetChainHead(did string, seq uint64) error
GetChainHead(did string) (uint64, bool, error)
AddDIDSequence(did string, seq uint64) error
ListDIDSequences(did string) ([]uint64, error)
-
PutOpSeqRef(seq uint64, ref types.BlockRefV1) error
GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error)
ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error
-
PutBlockHash(blockID uint64, hash string) error
GetBlockHash(blockID uint64) (string, bool, error)
ListBlockHashes() ([]string, error)
ListBlockHashEntries() ([]BlockHashEntry, error)
-
PutCheckpoint(cp types.CheckpointV1) error
GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error)
GetLatestCheckpoint() (types.CheckpointV1, bool, error)