diff options
Diffstat (limited to 'internal/storage')
| -rw-r--r-- | internal/storage/blocklog.go | 94 | ||||
| -rw-r--r-- | internal/storage/blocklog_test.go | 14 | ||||
| -rw-r--r-- | internal/storage/pebble_store.go | 107 | ||||
| -rw-r--r-- | internal/storage/pebble_store_batch_durability_test.go | 8 | ||||
| -rw-r--r-- | internal/storage/pebble_store_batch_test.go | 11 | ||||
| -rw-r--r-- | internal/storage/store.go | 7 |
6 files changed, 229 insertions, 12 deletions
diff --git a/internal/storage/blocklog.go b/internal/storage/blocklog.go index 879d3bd..e343b45 100644 --- a/internal/storage/blocklog.go +++ b/internal/storage/blocklog.go @@ -13,7 +13,6 @@ import ( "strings" "sync" "time" - "github.com/Fuwn/plutia/internal/types" "github.com/klauspost/compress/zstd" ) @@ -33,7 +32,6 @@ type BlockLog struct { dir string zstdLevel int targetSize int - mu sync.Mutex buf bytes.Buffer blockID uint64 @@ -41,13 +39,17 @@ type BlockLog struct { func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error) { dir := filepath.Join(dataDir, "ops") + if err := os.MkdirAll(dir, 0o755); err != nil { return nil, fmt.Errorf("mkdir ops: %w", err) } + nextID, err := detectNextBlockID(dir) + if err != nil { return nil, err } + return &BlockLog{ dir: dir, zstdLevel: zstdLevel, @@ -58,10 +60,12 @@ func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (types.BlockRefV1, *FlushInfo, error) { l.mu.Lock() + defer l.mu.Unlock() record := encodeRecord(canonical) offset := uint64(l.buf.Len()) + if _, err := l.buf.Write(record); err != nil { return types.BlockRefV1{}, nil, fmt.Errorf("buffer write: %w", err) } @@ -82,16 +86,21 @@ func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) ( if l.buf.Len() < l.targetSize { return ref, nil, nil } + flush, err := l.flushLocked() + if err != nil { return types.BlockRefV1{}, nil, err } + return ref, flush, nil } func (l *BlockLog) Flush() (*FlushInfo, error) { l.mu.Lock() + defer l.mu.Unlock() + return l.flushLocked() } @@ -99,12 +108,16 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) { if l.buf.Len() == 0 { return nil, nil } + encLevel := zstd.EncoderLevelFromZstd(l.zstdLevel) enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encLevel)) + if err != nil { return nil, fmt.Errorf("zstd encoder: %w", err) } + compressed := enc.EncodeAll(l.buf.Bytes(), nil) + if err := enc.Close(); err != nil { return nil, fmt.Errorf("close zstd encoder: %w", err) } @@ -112,36 +125,50 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) { path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", l.blockID)) tmpPath := path + ".tmp" f, err := os.Create(tmpPath) + if err != nil { return nil, fmt.Errorf("create temp block file: %w", err) } + half := len(compressed) / 2 + if _, err := f.Write(compressed[:half]); err != nil { _ = f.Close() + return nil, fmt.Errorf("write temp block first half: %w", err) } + if sleepMs, err := strconv.Atoi(strings.TrimSpace(os.Getenv("PLUTIA_TEST_SLOW_FLUSH_MS"))); err == nil && sleepMs > 0 { time.Sleep(time.Duration(sleepMs) * time.Millisecond) } + if _, err := f.Write(compressed[half:]); err != nil { _ = f.Close() + return nil, fmt.Errorf("write temp block second half: %w", err) } + if err := f.Sync(); err != nil { _ = f.Close() + return nil, fmt.Errorf("sync temp block file: %w", err) } + if err := f.Close(); err != nil { return nil, fmt.Errorf("close temp block file: %w", err) } + if err := os.Rename(tmpPath, path); err != nil { return nil, fmt.Errorf("rename temp block file: %w", err) } + sum := sha256.Sum256(compressed) hash := hex.EncodeToString(sum[:]) l.blockID++ + l.buf.Reset() + return &FlushInfo{BlockID: l.blockID - 1, Hash: hash}, nil } @@ -151,56 +178,77 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) { RemovedOrphans: make([]uint64, 0), } entries, err := os.ReadDir(l.dir) + if err != nil { return nil, fmt.Errorf("read ops dir: %w", err) } + storedEntries, err := store.ListBlockHashEntries() + if err != nil { return nil, fmt.Errorf("list stored block hashes: %w", err) } + stored := make(map[uint64]string, len(storedEntries)) + for _, e := range storedEntries { stored[e.BlockID] = e.Hash } files := make(map[uint64]string) + for _, e := range entries { name := e.Name() + if strings.HasSuffix(name, ".tmp") { path := filepath.Join(l.dir, name) + if err := os.Remove(path); err != nil { return nil, fmt.Errorf("remove temp block file %s: %w", name, err) } + report.RemovedTempFiles = append(report.RemovedTempFiles, name) + continue } + if !strings.HasSuffix(name, ".zst") { continue } + base := strings.TrimSuffix(name, ".zst") id, err := strconv.Atoi(base) + if err != nil { continue } + files[uint64(id)] = filepath.Join(l.dir, name) } for id, path := range files { expected, ok := stored[id] + if !ok { if err := os.Remove(path); err != nil { return nil, fmt.Errorf("remove orphan block %d: %w", id, err) } + report.RemovedOrphans = append(report.RemovedOrphans, id) + continue } + actual, err := fileHash(path) + if err != nil { return nil, fmt.Errorf("hash block %d: %w", id, err) } + if actual != expected { return nil, fmt.Errorf("block hash mismatch block=%d expected=%s got=%s", id, expected, actual) } + report.VerifiedBlocks++ } @@ -216,115 +264,157 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) { func (l *BlockLog) ReadRecord(ref types.BlockRefV1) ([]byte, error) { path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", ref.BlockID)) compressed, err := os.ReadFile(path) + if err != nil { return nil, fmt.Errorf("read block file: %w", err) } + dec, err := zstd.NewReader(nil) + if err != nil { return nil, fmt.Errorf("zstd reader: %w", err) } + decompressed, err := dec.DecodeAll(compressed, nil) + if err != nil { return nil, fmt.Errorf("decode block: %w", err) } + dec.Close() + if ref.Offset+ref.Length > uint64(len(decompressed)) { return nil, fmt.Errorf("record bounds out of range") } + record := decompressed[ref.Offset : ref.Offset+ref.Length] _, payload, err := decodeRecord(record) + if err != nil { return nil, err } + return payload, nil } func (l *BlockLog) IterateBlockRecords(blockID uint64, fn func(offset uint64, payload []byte) error) error { path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", blockID)) compressed, err := os.ReadFile(path) + if err != nil { return fmt.Errorf("read block file: %w", err) } + dec, err := zstd.NewReader(nil) + if err != nil { return fmt.Errorf("zstd reader: %w", err) } + decompressed, err := dec.DecodeAll(compressed, nil) + if err != nil { return fmt.Errorf("decode block: %w", err) } + dec.Close() var offset uint64 + for offset < uint64(len(decompressed)) { start := offset length, n := binary.Uvarint(decompressed[offset:]) + if n <= 0 { return fmt.Errorf("invalid varint at offset %d", offset) } + offset += uint64(n) + if offset+length > uint64(len(decompressed)) { return fmt.Errorf("record out of bounds at offset %d", start) } + payload := decompressed[offset : offset+length] offset += length + if err := fn(start, payload); err != nil { return err } } + return nil } func encodeRecord(payload []byte) []byte { var hdr [binary.MaxVarintLen64]byte + n := binary.PutUvarint(hdr[:], uint64(len(payload))) out := make([]byte, n+len(payload)) + copy(out, hdr[:n]) copy(out[n:], payload) + return out } func decodeRecord(b []byte) (uint64, []byte, error) { length, n := binary.Uvarint(b) + if n <= 0 { return 0, nil, fmt.Errorf("invalid varint record") } + if uint64(n)+length != uint64(len(b)) { return 0, nil, fmt.Errorf("record length mismatch") } + return length, b[n:], nil } func detectNextBlockID(dir string) (uint64, error) { entries, err := os.ReadDir(dir) + if err != nil { return 0, fmt.Errorf("read ops dir: %w", err) } + ids := make([]int, 0) + for _, e := range entries { name := e.Name() + if !strings.HasSuffix(name, ".zst") { continue } + base := strings.TrimSuffix(name, ".zst") n, err := strconv.Atoi(base) + if err != nil { continue } + ids = append(ids, n) } + if len(ids) == 0 { return 1, nil } + sort.Ints(ids) + return uint64(ids[len(ids)-1] + 1), nil } func fileHash(path string) (string, error) { b, err := os.ReadFile(path) + if err != nil { return "", err } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:]), nil } diff --git a/internal/storage/blocklog_test.go b/internal/storage/blocklog_test.go index 23045cc..a71febf 100644 --- a/internal/storage/blocklog_test.go +++ b/internal/storage/blocklog_test.go @@ -9,6 +9,7 @@ import ( func TestBlockLogAppendReadFlush(t *testing.T) { tmp := t.TempDir() log, err := OpenBlockLog(tmp, 3, 4) + if err != nil { t.Fatalf("OpenBlockLog: %v", err) } @@ -16,40 +17,53 @@ func TestBlockLogAppendReadFlush(t *testing.T) { payload1 := []byte(`{"op":1}`) payload2 := []byte(`{"op":2}`) ref1, flush, err := log.Append(1, "did:plc:a", "cid1", "", payload1) + if err != nil { t.Fatalf("append 1: %v", err) } + if flush != nil { t.Fatalf("unexpected automatic flush") } + ref2, _, err := log.Append(2, "did:plc:a", "cid2", "cid1", payload2) + if err != nil { t.Fatalf("append 2: %v", err) } flushed, err := log.Flush() + if err != nil { t.Fatalf("flush: %v", err) } + if flushed == nil { t.Fatalf("expected flushed block") } + blockFile := filepath.Join(tmp, "ops", "000001.zst") + if _, err := os.Stat(blockFile); err != nil { t.Fatalf("expected block file: %v", err) } got1, err := log.ReadRecord(ref1) + if err != nil { t.Fatalf("read record1: %v", err) } + if string(got1) != string(payload1) { t.Fatalf("payload1 mismatch: got %s want %s", got1, payload1) } + got2, err := log.ReadRecord(ref2) + if err != nil { t.Fatalf("read record2: %v", err) } + if string(got2) != string(payload2) { t.Fatalf("payload2 mismatch: got %s want %s", got2, payload2) } diff --git a/internal/storage/pebble_store.go b/internal/storage/pebble_store.go index 109310f..55c04fb 100644 --- a/internal/storage/pebble_store.go +++ b/internal/storage/pebble_store.go @@ -6,7 +6,6 @@ import ( "fmt" "path/filepath" "sort" - "github.com/Fuwn/plutia/internal/types" "github.com/cockroachdb/pebble" ) @@ -20,9 +19,11 @@ type PebbleStore struct { func OpenPebble(dataDir string) (*PebbleStore, error) { indexDir := filepath.Join(dataDir, "index") db, err := pebble.Open(indexDir, &pebble.Options{}) + if err != nil { return nil, fmt.Errorf("open pebble: %w", err) } + return &PebbleStore{db: db}, nil } @@ -30,6 +31,7 @@ func (p *PebbleStore) Close() error { return p.db.Close() } func (p *PebbleStore) GetMode() (string, error) { v, ok, err := p.getString(metaKey("mode")) + return v, okOrErr(ok, err) } @@ -39,80 +41,103 @@ func (p *PebbleStore) SetMode(mode string) error { func (p *PebbleStore) GetGlobalSeq() (uint64, error) { v, ok, err := p.getUint64(metaKey("global_seq")) + if err != nil { return 0, err } + if !ok { return 0, nil } + return v, nil } func (p *PebbleStore) SetGlobalSeq(seq uint64) error { b := make([]byte, 8) + binary.BigEndian.PutUint64(b, seq) + return p.db.Set(metaKey("global_seq"), b, pebble.Sync) } func (p *PebbleStore) PutState(state types.StateV1) error { b, err := json.Marshal(state) + if err != nil { return fmt.Errorf("marshal state: %w", err) } + return p.db.Set(didKey(state.DID), b, pebble.Sync) } func (p *PebbleStore) ApplyOperationBatch(state types.StateV1, ref *types.BlockRefV1, includeOpRef bool) error { var opRef *types.BlockRefV1 + if includeOpRef { opRef = ref } + return p.ApplyOperationsBatch([]OperationMutation{{State: state, Ref: opRef}}, nil) } func (p *PebbleStore) GetState(did string) (types.StateV1, bool, error) { b, ok, err := p.getBytes(didKey(did)) + if err != nil || !ok { return types.StateV1{}, ok, err } + var s types.StateV1 + if err := json.Unmarshal(b, &s); err != nil { return types.StateV1{}, false, fmt.Errorf("unmarshal state: %w", err) } + return s, true, nil } func (p *PebbleStore) ListStates() ([]types.StateV1, error) { states := make([]types.StateV1, 0) + if err := p.ForEachState(func(s types.StateV1) error { states = append(states, s) + return nil }); err != nil { return nil, err } + sort.Slice(states, func(i, j int) bool { return states[i].DID < states[j].DID }) + return states, nil } func (p *PebbleStore) ForEachState(fn func(types.StateV1) error) error { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("did:"), UpperBound: []byte("did;")}) + if err != nil { return fmt.Errorf("new iterator: %w", err) } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { var s types.StateV1 + if err := json.Unmarshal(iter.Value(), &s); err != nil { return fmt.Errorf("unmarshal state: %w", err) } + if err := fn(s); err != nil { return err } } + if err := iter.Error(); err != nil { return fmt.Errorf("iterate states: %w", err) } + return nil } @@ -120,31 +145,41 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes if len(ops) == 0 && len(blockHashes) == 0 { return nil } + batch := p.db.NewBatch() + defer batch.Close() for _, op := range ops { stateBytes, err := json.Marshal(op.State) + if err != nil { return fmt.Errorf("marshal state: %w", err) } + seqBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seqBytes, op.State.LatestOpSeq) if err := batch.Set(didKey(op.State.DID), stateBytes, nil); err != nil { return err } + if err := batch.Set(chainKey(op.State.DID), seqBytes, nil); err != nil { return err } + if err := batch.Set(didOpKey(op.State.DID, op.State.LatestOpSeq), []byte{1}, nil); err != nil { return err } + if op.Ref != nil { refBytes, err := json.Marshal(op.Ref) + if err != nil { return fmt.Errorf("marshal opseq ref: %w", err) } + if err := batch.Set(opSeqKey(op.State.LatestOpSeq), refBytes, nil); err != nil { return err } @@ -160,7 +195,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes if len(ops) > 0 { lastSeq := ops[len(ops)-1].State.LatestOpSeq seqBytes := make([]byte, 8) + binary.BigEndian.PutUint64(seqBytes, lastSeq) + if err := batch.Set(metaKey("global_seq"), seqBytes, nil); err != nil { return err } @@ -171,7 +208,9 @@ func (p *PebbleStore) ApplyOperationsBatch(ops []OperationMutation, blockHashes func (p *PebbleStore) SetChainHead(did string, seq uint64) error { b := make([]byte, 8) + binary.BigEndian.PutUint64(b, seq) + return p.db.Set(chainKey(did), b, pebble.Sync) } @@ -187,71 +226,94 @@ func (p *PebbleStore) ListDIDSequences(did string) ([]uint64, error) { prefix := []byte("didop:" + did + ":") upper := append(append([]byte(nil), prefix...), 0xFF) iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: prefix, UpperBound: upper}) + if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } + defer iter.Close() seqs := make([]uint64, 0) + for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() + if len(key) < len(prefix)+8 { continue } + seq := binary.BigEndian.Uint64(key[len(prefix):]) seqs = append(seqs, seq) } + if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate did op sequences: %w", err) } + sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) + return seqs, nil } func (p *PebbleStore) PutOpSeqRef(seq uint64, ref types.BlockRefV1) error { b, err := json.Marshal(ref) + if err != nil { return fmt.Errorf("marshal opseq ref: %w", err) } + return p.db.Set(opSeqKey(seq), b, pebble.Sync) } func (p *PebbleStore) GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) { b, ok, err := p.getBytes(opSeqKey(seq)) + if err != nil || !ok { return types.BlockRefV1{}, ok, err } + var ref types.BlockRefV1 + if err := json.Unmarshal(b, &ref); err != nil { return types.BlockRefV1{}, false, fmt.Errorf("unmarshal opseq ref: %w", err) } + return ref, true, nil } func (p *PebbleStore) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("opseq:"), UpperBound: []byte("opseq;")}) + if err != nil { return fmt.Errorf("new iterator: %w", err) } + defer iter.Close() for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() + if len(key) < len("opseq:")+8 { continue } + seq := binary.BigEndian.Uint64(key[len("opseq:"):]) + var ref types.BlockRefV1 + if err := json.Unmarshal(iter.Value(), &ref); err != nil { return fmt.Errorf("unmarshal opseq ref: %w", err) } + if err := fn(seq, ref); err != nil { return err } } + if err := iter.Error(); err != nil { return fmt.Errorf("iterate opseq refs: %w", err) } + return nil } @@ -265,127 +327,168 @@ func (p *PebbleStore) GetBlockHash(blockID uint64) (string, bool, error) { func (p *PebbleStore) ListBlockHashes() ([]string, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")}) + if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } + defer iter.Close() hashes := make([]string, 0) + for iter.First(); iter.Valid(); iter.Next() { hashes = append(hashes, string(iter.Value())) } + if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate blocks: %w", err) } + return hashes, nil } func (p *PebbleStore) ListBlockHashEntries() ([]BlockHashEntry, error) { iter, err := p.db.NewIter(&pebble.IterOptions{LowerBound: []byte("block:"), UpperBound: []byte("block;")}) + if err != nil { return nil, fmt.Errorf("new iterator: %w", err) } + defer iter.Close() out := make([]BlockHashEntry, 0) + for iter.First(); iter.Valid(); iter.Next() { key := iter.Key() + if len(key) < len("block:")+8 { continue } + id := binary.BigEndian.Uint64(key[len("block:"):]) out = append(out, BlockHashEntry{BlockID: id, Hash: string(iter.Value())}) } + if err := iter.Error(); err != nil { return nil, fmt.Errorf("iterate block entries: %w", err) } + sort.Slice(out, func(i, j int) bool { return out[i].BlockID < out[j].BlockID }) + return out, nil } func (p *PebbleStore) PutCheckpoint(cp types.CheckpointV1) error { b, err := json.Marshal(cp) + if err != nil { return fmt.Errorf("marshal checkpoint: %w", err) } + if err := p.db.Set(checkpointKey(cp.Sequence), b, pebble.Sync); err != nil { return err } + latest := make([]byte, 8) + binary.BigEndian.PutUint64(latest, cp.Sequence) + return p.db.Set(metaKey("latest_checkpoint"), latest, pebble.Sync) } func (p *PebbleStore) GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) { b, ok, err := p.getBytes(checkpointKey(sequence)) + if err != nil || !ok { return types.CheckpointV1{}, ok, err } + var cp types.CheckpointV1 + if err := json.Unmarshal(b, &cp); err != nil { return types.CheckpointV1{}, false, fmt.Errorf("unmarshal checkpoint: %w", err) } + return cp, true, nil } func (p *PebbleStore) GetLatestCheckpoint() (types.CheckpointV1, bool, error) { seq, ok, err := p.getUint64(metaKey("latest_checkpoint")) + if err != nil || !ok { return types.CheckpointV1{}, ok, err } + return p.GetCheckpoint(seq) } func (p *PebbleStore) getBytes(key []byte) ([]byte, bool, error) { v, closer, err := p.db.Get(key) + if err != nil { if err == pebble.ErrNotFound { return nil, false, nil } + return nil, false, err } + defer closer.Close() + return append([]byte(nil), v...), true, nil } func (p *PebbleStore) getString(key []byte) (string, bool, error) { b, ok, err := p.getBytes(key) + if err != nil || !ok { return "", ok, err } + return string(b), true, nil } func (p *PebbleStore) getUint64(key []byte) (uint64, bool, error) { b, ok, err := p.getBytes(key) + if err != nil || !ok { return 0, ok, err } + if len(b) != 8 { return 0, false, fmt.Errorf("invalid uint64 value length for %q", key) } + return binary.BigEndian.Uint64(b), true, nil } func didKey(did string) []byte { return []byte("did:" + did) } + func chainKey(did string) []byte { return []byte("chain:" + did) } + func metaKey(k string) []byte { return []byte("meta:" + k) } + func checkpointKey(s uint64) []byte { return append([]byte("checkpoint:"), u64bytes(s)...) } + func opSeqKey(s uint64) []byte { return append([]byte("opseq:"), u64bytes(s)...) } + func blockKey(id uint64) []byte { return append([]byte("block:"), u64bytes(id)...) } + func didOpKey(did string, seq uint64) []byte { return append([]byte("didop:"+did+":"), u64bytes(seq)...) } func u64bytes(v uint64) []byte { b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b } @@ -393,8 +496,10 @@ func okOrErr(ok bool, err error) error { if err != nil { return err } + if !ok { return nil } + return nil } diff --git a/internal/storage/pebble_store_batch_durability_test.go b/internal/storage/pebble_store_batch_durability_test.go index 9d881c3..256ddda 100644 --- a/internal/storage/pebble_store_batch_durability_test.go +++ b/internal/storage/pebble_store_batch_durability_test.go @@ -3,13 +3,13 @@ package storage import ( "testing" "time" - "github.com/Fuwn/plutia/internal/types" ) func TestApplyOperationsBatch_DurabilityBetweenBatches(t *testing.T) { tmp := t.TempDir() store, err := OpenPebble(tmp) + if err != nil { t.Fatalf("open pebble: %v", err) } @@ -40,24 +40,30 @@ func TestApplyOperationsBatch_DurabilityBetweenBatches(t *testing.T) { }, []BlockHashEntry{{BlockID: 1, Hash: "abc"}}); err != nil { t.Fatalf("apply first batch: %v", err) } + if err := store.Close(); err != nil { t.Fatalf("close store: %v", err) } // Simulate a crash before the second batch commit by reopening without applying it. reopened, err := OpenPebble(tmp) + if err != nil { t.Fatalf("reopen pebble: %v", err) } + defer reopened.Close() seq, err := reopened.GetGlobalSeq() + if err != nil { t.Fatalf("get global seq: %v", err) } + if seq != 2 { t.Fatalf("global seq mismatch after simulated crash: got %d want 2", seq) } + if _, ok, err := reopened.GetOpSeqRef(3); err != nil { t.Fatalf("get opseq 3: %v", err) } else if ok { diff --git a/internal/storage/pebble_store_batch_test.go b/internal/storage/pebble_store_batch_test.go index 85afc92..742e7bf 100644 --- a/internal/storage/pebble_store_batch_test.go +++ b/internal/storage/pebble_store_batch_test.go @@ -3,16 +3,17 @@ package storage import ( "testing" "time" - "github.com/Fuwn/plutia/internal/types" ) func TestPebbleStoreApplyOperationBatch(t *testing.T) { tmp := t.TempDir() store, err := OpenPebble(tmp) + if err != nil { t.Fatalf("open pebble: %v", err) } + defer store.Close() state := types.StateV1{ @@ -39,35 +40,43 @@ func TestPebbleStoreApplyOperationBatch(t *testing.T) { } gotState, ok, err := store.GetState(state.DID) + if err != nil || !ok { t.Fatalf("get state: ok=%v err=%v", ok, err) } + if gotState.ChainTipHash != state.ChainTipHash || gotState.LatestOpSeq != state.LatestOpSeq { t.Fatalf("state mismatch: got tip=%s seq=%d", gotState.ChainTipHash, gotState.LatestOpSeq) } head, ok, err := store.GetChainHead(state.DID) + if err != nil || !ok || head != state.LatestOpSeq { t.Fatalf("chain head mismatch: head=%d ok=%v err=%v", head, ok, err) } seqs, err := store.ListDIDSequences(state.DID) + if err != nil { t.Fatalf("list did sequences: %v", err) } + if len(seqs) != 1 || seqs[0] != state.LatestOpSeq { t.Fatalf("did sequence mismatch: %v", seqs) } gotRef, ok, err := store.GetOpSeqRef(state.LatestOpSeq) + if err != nil || !ok { t.Fatalf("get opseq ref: ok=%v err=%v", ok, err) } + if gotRef.BlockID != ref.BlockID || gotRef.CID != ref.CID { t.Fatalf("op ref mismatch: got block=%d cid=%s", gotRef.BlockID, gotRef.CID) } globalSeq, err := store.GetGlobalSeq() + if err != nil || globalSeq != state.LatestOpSeq { t.Fatalf("global seq mismatch: seq=%d err=%v", globalSeq, err) } diff --git a/internal/storage/store.go b/internal/storage/store.go index 05ce5ae..5267386 100644 --- a/internal/storage/store.go +++ b/internal/storage/store.go @@ -14,33 +14,26 @@ type OperationMutation struct { type Store interface { Close() error - GetMode() (string, error) SetMode(mode string) error - GetGlobalSeq() (uint64, error) SetGlobalSeq(seq uint64) error - PutState(state types.StateV1) error GetState(did string) (types.StateV1, bool, error) ListStates() ([]types.StateV1, error) ForEachState(fn func(types.StateV1) error) error ApplyOperationsBatch(ops []OperationMutation, blockHashes []BlockHashEntry) error - SetChainHead(did string, seq uint64) error GetChainHead(did string) (uint64, bool, error) AddDIDSequence(did string, seq uint64) error ListDIDSequences(did string) ([]uint64, error) - PutOpSeqRef(seq uint64, ref types.BlockRefV1) error GetOpSeqRef(seq uint64) (types.BlockRefV1, bool, error) ForEachOpSeqRef(fn func(seq uint64, ref types.BlockRefV1) error) error - PutBlockHash(blockID uint64, hash string) error GetBlockHash(blockID uint64) (string, bool, error) ListBlockHashes() ([]string, error) ListBlockHashEntries() ([]BlockHashEntry, error) - PutCheckpoint(cp types.CheckpointV1) error GetCheckpoint(sequence uint64) (types.CheckpointV1, bool, error) GetLatestCheckpoint() (types.CheckpointV1, bool, error) |