aboutsummaryrefslogtreecommitdiff
path: root/internal/storage/blocklog.go
diff options
context:
space:
mode:
Diffstat (limited to 'internal/storage/blocklog.go')
-rw-r--r--internal/storage/blocklog.go94
1 file changed, 92 insertions, 2 deletions
diff --git a/internal/storage/blocklog.go b/internal/storage/blocklog.go
index 879d3bd..e343b45 100644
--- a/internal/storage/blocklog.go
+++ b/internal/storage/blocklog.go
@@ -13,7 +13,6 @@ import (
"strings"
"sync"
"time"
-
"github.com/Fuwn/plutia/internal/types"
"github.com/klauspost/compress/zstd"
)
@@ -33,7 +32,6 @@ type BlockLog struct {
dir string
zstdLevel int
targetSize int
-
mu sync.Mutex
buf bytes.Buffer
blockID uint64
@@ -41,13 +39,17 @@ type BlockLog struct {
func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error) {
dir := filepath.Join(dataDir, "ops")
+
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("mkdir ops: %w", err)
}
+
nextID, err := detectNextBlockID(dir)
+
if err != nil {
return nil, err
}
+
return &BlockLog{
dir: dir,
zstdLevel: zstdLevel,
@@ -58,10 +60,12 @@ func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error
func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (types.BlockRefV1, *FlushInfo, error) {
l.mu.Lock()
+
defer l.mu.Unlock()
record := encodeRecord(canonical)
offset := uint64(l.buf.Len())
+
if _, err := l.buf.Write(record); err != nil {
return types.BlockRefV1{}, nil, fmt.Errorf("buffer write: %w", err)
}
@@ -82,16 +86,21 @@ func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (
if l.buf.Len() < l.targetSize {
return ref, nil, nil
}
+
flush, err := l.flushLocked()
+
if err != nil {
return types.BlockRefV1{}, nil, err
}
+
return ref, flush, nil
}
func (l *BlockLog) Flush() (*FlushInfo, error) {
l.mu.Lock()
+
defer l.mu.Unlock()
+
return l.flushLocked()
}
@@ -99,12 +108,16 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) {
if l.buf.Len() == 0 {
return nil, nil
}
+
encLevel := zstd.EncoderLevelFromZstd(l.zstdLevel)
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encLevel))
+
if err != nil {
return nil, fmt.Errorf("zstd encoder: %w", err)
}
+
compressed := enc.EncodeAll(l.buf.Bytes(), nil)
+
if err := enc.Close(); err != nil {
return nil, fmt.Errorf("close zstd encoder: %w", err)
}
@@ -112,36 +125,50 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) {
path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", l.blockID))
tmpPath := path + ".tmp"
f, err := os.Create(tmpPath)
+
if err != nil {
return nil, fmt.Errorf("create temp block file: %w", err)
}
+
half := len(compressed) / 2
+
if _, err := f.Write(compressed[:half]); err != nil {
_ = f.Close()
+
return nil, fmt.Errorf("write temp block first half: %w", err)
}
+
if sleepMs, err := strconv.Atoi(strings.TrimSpace(os.Getenv("PLUTIA_TEST_SLOW_FLUSH_MS"))); err == nil && sleepMs > 0 {
time.Sleep(time.Duration(sleepMs) * time.Millisecond)
}
+
if _, err := f.Write(compressed[half:]); err != nil {
_ = f.Close()
+
return nil, fmt.Errorf("write temp block second half: %w", err)
}
+
if err := f.Sync(); err != nil {
_ = f.Close()
+
return nil, fmt.Errorf("sync temp block file: %w", err)
}
+
if err := f.Close(); err != nil {
return nil, fmt.Errorf("close temp block file: %w", err)
}
+
if err := os.Rename(tmpPath, path); err != nil {
return nil, fmt.Errorf("rename temp block file: %w", err)
}
+
sum := sha256.Sum256(compressed)
hash := hex.EncodeToString(sum[:])
l.blockID++
+
l.buf.Reset()
+
return &FlushInfo{BlockID: l.blockID - 1, Hash: hash}, nil
}
@@ -151,56 +178,77 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) {
RemovedOrphans: make([]uint64, 0),
}
entries, err := os.ReadDir(l.dir)
+
if err != nil {
return nil, fmt.Errorf("read ops dir: %w", err)
}
+
storedEntries, err := store.ListBlockHashEntries()
+
if err != nil {
return nil, fmt.Errorf("list stored block hashes: %w", err)
}
+
stored := make(map[uint64]string, len(storedEntries))
+
for _, e := range storedEntries {
stored[e.BlockID] = e.Hash
}
files := make(map[uint64]string)
+
for _, e := range entries {
name := e.Name()
+
if strings.HasSuffix(name, ".tmp") {
path := filepath.Join(l.dir, name)
+
if err := os.Remove(path); err != nil {
return nil, fmt.Errorf("remove temp block file %s: %w", name, err)
}
+
report.RemovedTempFiles = append(report.RemovedTempFiles, name)
+
continue
}
+
if !strings.HasSuffix(name, ".zst") {
continue
}
+
base := strings.TrimSuffix(name, ".zst")
id, err := strconv.Atoi(base)
+
if err != nil {
continue
}
+
files[uint64(id)] = filepath.Join(l.dir, name)
}
for id, path := range files {
expected, ok := stored[id]
+
if !ok {
if err := os.Remove(path); err != nil {
return nil, fmt.Errorf("remove orphan block %d: %w", id, err)
}
+
report.RemovedOrphans = append(report.RemovedOrphans, id)
+
continue
}
+
actual, err := fileHash(path)
+
if err != nil {
return nil, fmt.Errorf("hash block %d: %w", id, err)
}
+
if actual != expected {
return nil, fmt.Errorf("block hash mismatch block=%d expected=%s got=%s", id, expected, actual)
}
+
report.VerifiedBlocks++
}
@@ -216,115 +264,157 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) {
func (l *BlockLog) ReadRecord(ref types.BlockRefV1) ([]byte, error) {
path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", ref.BlockID))
compressed, err := os.ReadFile(path)
+
if err != nil {
return nil, fmt.Errorf("read block file: %w", err)
}
+
dec, err := zstd.NewReader(nil)
+
if err != nil {
return nil, fmt.Errorf("zstd reader: %w", err)
}
+
decompressed, err := dec.DecodeAll(compressed, nil)
+
if err != nil {
return nil, fmt.Errorf("decode block: %w", err)
}
+
dec.Close()
+
if ref.Offset+ref.Length > uint64(len(decompressed)) {
return nil, fmt.Errorf("record bounds out of range")
}
+
record := decompressed[ref.Offset : ref.Offset+ref.Length]
_, payload, err := decodeRecord(record)
+
if err != nil {
return nil, err
}
+
return payload, nil
}
func (l *BlockLog) IterateBlockRecords(blockID uint64, fn func(offset uint64, payload []byte) error) error {
path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", blockID))
compressed, err := os.ReadFile(path)
+
if err != nil {
return fmt.Errorf("read block file: %w", err)
}
+
dec, err := zstd.NewReader(nil)
+
if err != nil {
return fmt.Errorf("zstd reader: %w", err)
}
+
decompressed, err := dec.DecodeAll(compressed, nil)
+
if err != nil {
return fmt.Errorf("decode block: %w", err)
}
+
dec.Close()
var offset uint64
+
for offset < uint64(len(decompressed)) {
start := offset
length, n := binary.Uvarint(decompressed[offset:])
+
if n <= 0 {
return fmt.Errorf("invalid varint at offset %d", offset)
}
+
offset += uint64(n)
+
if offset+length > uint64(len(decompressed)) {
return fmt.Errorf("record out of bounds at offset %d", start)
}
+
payload := decompressed[offset : offset+length]
offset += length
+
if err := fn(start, payload); err != nil {
return err
}
}
+
return nil
}
func encodeRecord(payload []byte) []byte {
var hdr [binary.MaxVarintLen64]byte
+
n := binary.PutUvarint(hdr[:], uint64(len(payload)))
out := make([]byte, n+len(payload))
+
copy(out, hdr[:n])
copy(out[n:], payload)
+
return out
}
func decodeRecord(b []byte) (uint64, []byte, error) {
length, n := binary.Uvarint(b)
+
if n <= 0 {
return 0, nil, fmt.Errorf("invalid varint record")
}
+
if uint64(n)+length != uint64(len(b)) {
return 0, nil, fmt.Errorf("record length mismatch")
}
+
return length, b[n:], nil
}
func detectNextBlockID(dir string) (uint64, error) {
entries, err := os.ReadDir(dir)
+
if err != nil {
return 0, fmt.Errorf("read ops dir: %w", err)
}
+
ids := make([]int, 0)
+
for _, e := range entries {
name := e.Name()
+
if !strings.HasSuffix(name, ".zst") {
continue
}
+
base := strings.TrimSuffix(name, ".zst")
n, err := strconv.Atoi(base)
+
if err != nil {
continue
}
+
ids = append(ids, n)
}
+
if len(ids) == 0 {
return 1, nil
}
+
sort.Ints(ids)
+
return uint64(ids[len(ids)-1] + 1), nil
}
func fileHash(path string) (string, error) {
b, err := os.ReadFile(path)
+
if err != nil {
return "", err
}
+
sum := sha256.Sum256(b)
+
return hex.EncodeToString(sum[:]), nil
}