diff options
Diffstat (limited to 'internal/storage/blocklog.go')
| -rw-r--r-- | internal/storage/blocklog.go | 94 |
1 files changed, 92 insertions, 2 deletions
diff --git a/internal/storage/blocklog.go b/internal/storage/blocklog.go index 879d3bd..e343b45 100644 --- a/internal/storage/blocklog.go +++ b/internal/storage/blocklog.go @@ -13,7 +13,6 @@ import ( "strings" "sync" "time" - "github.com/Fuwn/plutia/internal/types" "github.com/klauspost/compress/zstd" ) @@ -33,7 +32,6 @@ type BlockLog struct { dir string zstdLevel int targetSize int - mu sync.Mutex buf bytes.Buffer blockID uint64 @@ -41,13 +39,17 @@ type BlockLog struct { func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error) { dir := filepath.Join(dataDir, "ops") + if err := os.MkdirAll(dir, 0o755); err != nil { return nil, fmt.Errorf("mkdir ops: %w", err) } + nextID, err := detectNextBlockID(dir) + if err != nil { return nil, err } + return &BlockLog{ dir: dir, zstdLevel: zstdLevel, @@ -58,10 +60,12 @@ func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (types.BlockRefV1, *FlushInfo, error) { l.mu.Lock() + defer l.mu.Unlock() record := encodeRecord(canonical) offset := uint64(l.buf.Len()) + if _, err := l.buf.Write(record); err != nil { return types.BlockRefV1{}, nil, fmt.Errorf("buffer write: %w", err) } @@ -82,16 +86,21 @@ func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) ( if l.buf.Len() < l.targetSize { return ref, nil, nil } + flush, err := l.flushLocked() + if err != nil { return types.BlockRefV1{}, nil, err } + return ref, flush, nil } func (l *BlockLog) Flush() (*FlushInfo, error) { l.mu.Lock() + defer l.mu.Unlock() + return l.flushLocked() } @@ -99,12 +108,16 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) { if l.buf.Len() == 0 { return nil, nil } + encLevel := zstd.EncoderLevelFromZstd(l.zstdLevel) enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encLevel)) + if err != nil { return nil, fmt.Errorf("zstd encoder: %w", err) } + compressed := enc.EncodeAll(l.buf.Bytes(), nil) + if err := enc.Close(); err != nil { return nil, fmt.Errorf("close zstd encoder: %w", err) } @@ -112,36 +125,50 @@ func (l *BlockLog) flushLocked() (*FlushInfo, error) { path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", l.blockID)) tmpPath := path + ".tmp" f, err := os.Create(tmpPath) + if err != nil { return nil, fmt.Errorf("create temp block file: %w", err) } + half := len(compressed) / 2 + if _, err := f.Write(compressed[:half]); err != nil { _ = f.Close() + return nil, fmt.Errorf("write temp block first half: %w", err) } + if sleepMs, err := strconv.Atoi(strings.TrimSpace(os.Getenv("PLUTIA_TEST_SLOW_FLUSH_MS"))); err == nil && sleepMs > 0 { time.Sleep(time.Duration(sleepMs) * time.Millisecond) } + if _, err := f.Write(compressed[half:]); err != nil { _ = f.Close() + return nil, fmt.Errorf("write temp block second half: %w", err) } + if err := f.Sync(); err != nil { _ = f.Close() + return nil, fmt.Errorf("sync temp block file: %w", err) } + if err := f.Close(); err != nil { return nil, fmt.Errorf("close temp block file: %w", err) } + if err := os.Rename(tmpPath, path); err != nil { return nil, fmt.Errorf("rename temp block file: %w", err) } + sum := sha256.Sum256(compressed) hash := hex.EncodeToString(sum[:]) l.blockID++ + l.buf.Reset() + return &FlushInfo{BlockID: l.blockID - 1, Hash: hash}, nil } @@ -151,56 +178,77 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) { RemovedOrphans: make([]uint64, 0), } entries, err := os.ReadDir(l.dir) + if err != nil { return nil, fmt.Errorf("read ops dir: %w", err) } + storedEntries, err := store.ListBlockHashEntries() + if err != nil { return nil, fmt.Errorf("list stored block hashes: %w", err) } + stored := make(map[uint64]string, len(storedEntries)) + for _, e := range storedEntries { stored[e.BlockID] = e.Hash } files := make(map[uint64]string) + for _, e := range entries { name := e.Name() + if strings.HasSuffix(name, ".tmp") { path := filepath.Join(l.dir, name) + if err := os.Remove(path); err != nil { return nil, fmt.Errorf("remove temp block file %s: %w", name, err) } + report.RemovedTempFiles = append(report.RemovedTempFiles, name) + continue } + if !strings.HasSuffix(name, ".zst") { continue } + base := strings.TrimSuffix(name, ".zst") id, err := strconv.Atoi(base) + if err != nil { continue } + files[uint64(id)] = filepath.Join(l.dir, name) } for id, path := range files { expected, ok := stored[id] + if !ok { if err := os.Remove(path); err != nil { return nil, fmt.Errorf("remove orphan block %d: %w", id, err) } + report.RemovedOrphans = append(report.RemovedOrphans, id) + continue } + actual, err := fileHash(path) + if err != nil { return nil, fmt.Errorf("hash block %d: %w", id, err) } + if actual != expected { return nil, fmt.Errorf("block hash mismatch block=%d expected=%s got=%s", id, expected, actual) } + report.VerifiedBlocks++ } @@ -216,115 +264,157 @@ func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) { func (l *BlockLog) ReadRecord(ref types.BlockRefV1) ([]byte, error) { path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", ref.BlockID)) compressed, err := os.ReadFile(path) + if err != nil { return nil, fmt.Errorf("read block file: %w", err) } + dec, err := zstd.NewReader(nil) + if err != nil { return nil, fmt.Errorf("zstd reader: %w", err) } + decompressed, err := dec.DecodeAll(compressed, nil) + if err != nil { return nil, fmt.Errorf("decode block: %w", err) } + dec.Close() + if ref.Offset+ref.Length > uint64(len(decompressed)) { return nil, fmt.Errorf("record bounds out of range") } + record := decompressed[ref.Offset : ref.Offset+ref.Length] _, payload, err := decodeRecord(record) + if err != nil { return nil, err } + return payload, nil } func (l *BlockLog) IterateBlockRecords(blockID uint64, fn func(offset uint64, payload []byte) error) error { path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", blockID)) compressed, err := os.ReadFile(path) + if err != nil { return fmt.Errorf("read block file: %w", err) } + dec, err := zstd.NewReader(nil) + if err != nil { return fmt.Errorf("zstd reader: %w", err) } + decompressed, err := dec.DecodeAll(compressed, nil) + if err != nil { return fmt.Errorf("decode block: %w", err) } + dec.Close() var offset uint64 + for offset < uint64(len(decompressed)) { start := offset length, n := binary.Uvarint(decompressed[offset:]) + if n <= 0 { return fmt.Errorf("invalid varint at offset %d", offset) } + offset += uint64(n) + if offset+length > uint64(len(decompressed)) { return fmt.Errorf("record out of bounds at offset %d", start) } + payload := decompressed[offset : offset+length] offset += length + if err := fn(start, payload); err != nil { return err } } + return nil } func encodeRecord(payload []byte) []byte { var hdr [binary.MaxVarintLen64]byte + n := binary.PutUvarint(hdr[:], uint64(len(payload))) out := make([]byte, n+len(payload)) + copy(out, hdr[:n]) copy(out[n:], payload) + return out } func decodeRecord(b []byte) (uint64, []byte, error) { length, n := binary.Uvarint(b) + if n <= 0 { return 0, nil, fmt.Errorf("invalid varint record") } + if uint64(n)+length != uint64(len(b)) { return 0, nil, fmt.Errorf("record length mismatch") } + return length, b[n:], nil } func detectNextBlockID(dir string) (uint64, error) { entries, err := os.ReadDir(dir) + if err != nil { return 0, fmt.Errorf("read ops dir: %w", err) } + ids := make([]int, 0) + for _, e := range entries { name := e.Name() + if !strings.HasSuffix(name, ".zst") { continue } + base := strings.TrimSuffix(name, ".zst") n, err := strconv.Atoi(base) + if err != nil { continue } + ids = append(ids, n) } + if len(ids) == 0 { return 1, nil } + sort.Ints(ids) + return uint64(ids[len(ids)-1] + 1), nil } func fileHash(path string) (string, error) { b, err := os.ReadFile(path) + if err != nil { return "", err } + sum := sha256.Sum256(b) + return hex.EncodeToString(sum[:]), nil } |