// Package storage (this file): an append-only operation log whose records are
// buffered in memory and periodically flushed as zstd-compressed, sequentially
// numbered block files ("NNNNNN.zst") under <dataDir>/ops.
package storage

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/klauspost/compress/zstd"

	"github.com/Fuwn/plutia/internal/types"
)

// FlushInfo describes a block that was just written to disk.
type FlushInfo struct {
	BlockID uint64 // identifier of the flushed block
	Hash    string // hex-encoded SHA-256 of the compressed block file
}

// IntegrityReport summarizes the outcome of ValidateAndRecover.
type IntegrityReport struct {
	VerifiedBlocks   int      // blocks whose on-disk hash matched the stored index
	RemovedTempFiles []string // leftover "*.tmp" files that were deleted
	RemovedOrphans   []uint64 // block files with no index entry that were deleted
}

// BlockLog accumulates varint-length-prefixed records in memory and flushes
// them to compressed block files once the buffer reaches a target size.
// All mutating operations are serialized by mu.
type BlockLog struct {
	dir        string       // directory holding the block files
	zstdLevel  int          // compression level on the native zstd scale
	targetSize int          // uncompressed byte count that triggers an auto-flush
	mu         sync.Mutex   // guards buf and blockID
	buf        bytes.Buffer // pending records not yet flushed
	blockID    uint64       // id assigned to the block currently being buffered
}

// OpenBlockLog creates the "ops" directory under dataDir if needed and returns
// a BlockLog whose numbering continues after any block files already present.
// targetMB is the uncompressed buffer size, in MiB, that triggers a flush.
func OpenBlockLog(dataDir string, zstdLevel int, targetMB int) (*BlockLog, error) {
	dir := filepath.Join(dataDir, "ops")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, fmt.Errorf("mkdir ops: %w", err)
	}
	nextID, err := detectNextBlockID(dir)
	if err != nil {
		return nil, err
	}
	return &BlockLog{
		dir:        dir,
		zstdLevel:  zstdLevel,
		targetSize: targetMB * 1024 * 1024,
		blockID:    nextID,
	}, nil
}

// Append buffers one canonical record and returns a reference locating it
// inside the (possibly not yet flushed) current block. If the buffer crosses
// the target size, the block is flushed and the FlushInfo is non-nil; the
// returned ref is valid either way because the record was buffered before the
// flush and therefore belongs to the block that was just written.
func (l *BlockLog) Append(seq uint64, did, cid, prev string, canonical []byte) (types.BlockRefV1, *FlushInfo, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	record := encodeRecord(canonical)
	offset := uint64(l.buf.Len())
	// bytes.Buffer.Write never returns a non-nil error, but check anyway so a
	// future buffer swap cannot silently drop records.
	if _, err := l.buf.Write(record); err != nil {
		return types.BlockRefV1{}, nil, fmt.Errorf("buffer write: %w", err)
	}

	ref := types.BlockRefV1{
		Version:  1,
		BlockID:  l.blockID,
		Offset:   offset,
		Length:   uint64(len(record)),
		OpSeq:    seq,
		DID:      did,
		CID:      cid,
		PrevCID:  prev,
		OpHash:   types.ComputeDigestCID(canonical),
		Received: "",
	}
	if l.buf.Len() < l.targetSize {
		return ref, nil, nil
	}
	flush, err := l.flushLocked()
	if err != nil {
		return types.BlockRefV1{}, nil, err
	}
	return ref, flush, nil
}

// Flush forces any buffered records out to a block file. It returns nil
// FlushInfo (and nil error) when the buffer is empty.
func (l *BlockLog) Flush() (*FlushInfo, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.flushLocked()
}

// flushLocked compresses the buffer and writes it to "NNNNNN.zst" via a
// temp-file-then-rename so readers never observe a partial block. The caller
// must hold l.mu. Returns the id and SHA-256 of the flushed block.
func (l *BlockLog) flushLocked() (*FlushInfo, error) {
	if l.buf.Len() == 0 {
		return nil, nil
	}

	encLevel := zstd.EncoderLevelFromZstd(l.zstdLevel)
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(encLevel))
	if err != nil {
		return nil, fmt.Errorf("zstd encoder: %w", err)
	}
	compressed := enc.EncodeAll(l.buf.Bytes(), nil)
	if err := enc.Close(); err != nil {
		return nil, fmt.Errorf("close zstd encoder: %w", err)
	}

	path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", l.blockID))
	tmpPath := path + ".tmp"
	f, err := os.Create(tmpPath)
	if err != nil {
		return nil, fmt.Errorf("create temp block file: %w", err)
	}
	// The payload is deliberately written in two halves with an optional
	// test-only pause between them (PLUTIA_TEST_SLOW_FLUSH_MS) so crash/race
	// tests can observe an in-progress flush. The rename below keeps partial
	// writes invisible to readers regardless.
	half := len(compressed) / 2
	if _, err := f.Write(compressed[:half]); err != nil {
		_ = f.Close()
		_ = os.Remove(tmpPath) // best-effort cleanup; recovery also sweeps *.tmp
		return nil, fmt.Errorf("write temp block first half: %w", err)
	}
	if sleepMs, err := strconv.Atoi(strings.TrimSpace(os.Getenv("PLUTIA_TEST_SLOW_FLUSH_MS"))); err == nil && sleepMs > 0 {
		time.Sleep(time.Duration(sleepMs) * time.Millisecond)
	}
	if _, err := f.Write(compressed[half:]); err != nil {
		_ = f.Close()
		_ = os.Remove(tmpPath)
		return nil, fmt.Errorf("write temp block second half: %w", err)
	}
	if err := f.Sync(); err != nil {
		_ = f.Close()
		_ = os.Remove(tmpPath)
		return nil, fmt.Errorf("sync temp block file: %w", err)
	}
	if err := f.Close(); err != nil {
		_ = os.Remove(tmpPath)
		return nil, fmt.Errorf("close temp block file: %w", err)
	}
	if err := os.Rename(tmpPath, path); err != nil {
		return nil, fmt.Errorf("rename temp block file: %w", err)
	}
	// Sync the directory so the rename itself survives a crash; the file data
	// was already fsynced above.
	if err := syncDir(l.dir); err != nil {
		return nil, fmt.Errorf("sync ops dir: %w", err)
	}

	sum := sha256.Sum256(compressed)
	hash := hex.EncodeToString(sum[:])
	flushed := l.blockID
	l.blockID++
	l.buf.Reset()
	return &FlushInfo{BlockID: flushed, Hash: hash}, nil
}

// syncDir fsyncs a directory so recently renamed entries are durable.
func syncDir(dir string) error {
	d, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer d.Close()
	return d.Sync()
}

// ValidateAndRecover reconciles on-disk block files with the hashes recorded
// in store: it deletes leftover *.tmp files, deletes block files the index
// does not know about, verifies the SHA-256 of every indexed block, and fails
// if an indexed block file is missing or its hash mismatches.
func (l *BlockLog) ValidateAndRecover(store Store) (*IntegrityReport, error) {
	report := &IntegrityReport{
		RemovedTempFiles: make([]string, 0),
		RemovedOrphans:   make([]uint64, 0),
	}
	entries, err := os.ReadDir(l.dir)
	if err != nil {
		return nil, fmt.Errorf("read ops dir: %w", err)
	}
	storedEntries, err := store.ListBlockHashEntries()
	if err != nil {
		return nil, fmt.Errorf("list stored block hashes: %w", err)
	}
	stored := make(map[uint64]string, len(storedEntries))
	for _, e := range storedEntries {
		stored[e.BlockID] = e.Hash
	}

	// Map block id -> path for every well-formed "NNNNNN.zst" file, removing
	// interrupted-flush leftovers ("*.tmp") along the way.
	files := make(map[uint64]string)
	for _, e := range entries {
		name := e.Name()
		if strings.HasSuffix(name, ".tmp") {
			path := filepath.Join(l.dir, name)
			if err := os.Remove(path); err != nil {
				return nil, fmt.Errorf("remove temp block file %s: %w", name, err)
			}
			report.RemovedTempFiles = append(report.RemovedTempFiles, name)
			continue
		}
		if !strings.HasSuffix(name, ".zst") {
			continue
		}
		base := strings.TrimSuffix(name, ".zst")
		id, err := strconv.Atoi(base)
		if err != nil {
			// Not a numbered block file; leave it alone.
			continue
		}
		files[uint64(id)] = filepath.Join(l.dir, name)
	}

	for id, path := range files {
		expected, ok := stored[id]
		if !ok {
			// A block file with no index entry was never acknowledged; drop it.
			if err := os.Remove(path); err != nil {
				return nil, fmt.Errorf("remove orphan block %d: %w", id, err)
			}
			report.RemovedOrphans = append(report.RemovedOrphans, id)
			continue
		}
		actual, err := fileHash(path)
		if err != nil {
			return nil, fmt.Errorf("hash block %d: %w", id, err)
		}
		if actual != expected {
			return nil, fmt.Errorf("block hash mismatch block=%d expected=%s got=%s", id, expected, actual)
		}
		report.VerifiedBlocks++
	}

	// Every indexed block must still exist on disk.
	for _, e := range storedEntries {
		if _, ok := files[e.BlockID]; !ok {
			return nil, fmt.Errorf("missing block file for indexed block %d", e.BlockID)
		}
	}
	return report, nil
}

// ReadRecord decompresses the block named by ref and returns the payload of
// the single record at ref.Offset/ref.Length.
func (l *BlockLog) ReadRecord(ref types.BlockRefV1) ([]byte, error) {
	path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", ref.BlockID))
	compressed, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("read block file: %w", err)
	}
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return nil, fmt.Errorf("zstd reader: %w", err)
	}
	// defer (rather than a success-path Close) so the decoder is released even
	// when DecodeAll fails.
	defer dec.Close()
	decompressed, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		return nil, fmt.Errorf("decode block: %w", err)
	}
	if ref.Offset+ref.Length > uint64(len(decompressed)) {
		return nil, fmt.Errorf("record bounds out of range")
	}
	record := decompressed[ref.Offset : ref.Offset+ref.Length]
	_, payload, err := decodeRecord(record)
	if err != nil {
		return nil, err
	}
	return payload, nil
}

// IterateBlockRecords decompresses one block and invokes fn for every record
// in order, passing the record's offset (of its length prefix) and payload.
// Iteration stops at the first error from fn or a malformed record.
func (l *BlockLog) IterateBlockRecords(blockID uint64, fn func(offset uint64, payload []byte) error) error {
	path := filepath.Join(l.dir, fmt.Sprintf("%06d.zst", blockID))
	compressed, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("read block file: %w", err)
	}
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return fmt.Errorf("zstd reader: %w", err)
	}
	// defer so the decoder is released on every exit path, including decode
	// failures and early returns from fn.
	defer dec.Close()
	decompressed, err := dec.DecodeAll(compressed, nil)
	if err != nil {
		return fmt.Errorf("decode block: %w", err)
	}

	var offset uint64
	for offset < uint64(len(decompressed)) {
		start := offset
		length, n := binary.Uvarint(decompressed[offset:])
		if n <= 0 {
			return fmt.Errorf("invalid varint at offset %d", offset)
		}
		offset += uint64(n)
		if offset+length > uint64(len(decompressed)) {
			return fmt.Errorf("record out of bounds at offset %d", start)
		}
		payload := decompressed[offset : offset+length]
		offset += length
		if err := fn(start, payload); err != nil {
			return err
		}
	}
	return nil
}

// encodeRecord prefixes payload with its uvarint-encoded length.
func encodeRecord(payload []byte) []byte {
	var hdr [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(hdr[:], uint64(len(payload)))
	out := make([]byte, n+len(payload))
	copy(out, hdr[:n])
	copy(out[n:], payload)
	return out
}

// decodeRecord parses a single uvarint-length-prefixed record that must span
// exactly all of b, returning the declared length and the payload.
func decodeRecord(b []byte) (uint64, []byte, error) {
	length, n := binary.Uvarint(b)
	if n <= 0 {
		return 0, nil, fmt.Errorf("invalid varint record")
	}
	if uint64(n)+length != uint64(len(b)) {
		return 0, nil, fmt.Errorf("record length mismatch")
	}
	return length, b[n:], nil
}

// detectNextBlockID scans dir for "NNNNNN.zst" files and returns one past the
// highest id found, or 1 when the directory holds no blocks yet.
func detectNextBlockID(dir string) (uint64, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return 0, fmt.Errorf("read ops dir: %w", err)
	}
	ids := make([]int, 0)
	for _, e := range entries {
		name := e.Name()
		if !strings.HasSuffix(name, ".zst") {
			continue
		}
		base := strings.TrimSuffix(name, ".zst")
		n, err := strconv.Atoi(base)
		if err != nil {
			continue
		}
		ids = append(ids, n)
	}
	if len(ids) == 0 {
		return 1, nil
	}
	sort.Ints(ids)
	return uint64(ids[len(ids)-1] + 1), nil
}

// fileHash returns the hex-encoded SHA-256 of the file's full contents.
func fileHash(path string) (string, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}