aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 14:46:02 -0800
committerFuwn <[email protected]>2026-02-26 14:48:52 -0800
commit0099d621e97b6048971fadb5c71918cc9f2b5a09 (patch)
treea38ba31585200bacd61f453ef7158de7f0aaf7a3 /cmd
parentInitial commit (diff)
downloadplutia-test-0099d621e97b6048971fadb5c71918cc9f2b5a09.tar.xz
plutia-test-0099d621e97b6048971fadb5c71918cc9f2b5a09.zip
feat: initial Plutia release — verifiable high-performance PLC mirror (mirror + resolver modes)
Diffstat (limited to 'cmd')
-rw-r--r--cmd/plutia/main.go383
1 files changed, 383 insertions, 0 deletions
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go
new file mode 100644
index 0000000..0849e0a
--- /dev/null
+++ b/cmd/plutia/main.go
@@ -0,0 +1,383 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+ "os/exec"
+ "os/signal"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/Fuwn/plutia/internal/api"
+ "github.com/Fuwn/plutia/internal/checkpoint"
+ "github.com/Fuwn/plutia/internal/config"
+ "github.com/Fuwn/plutia/internal/ingest"
+ "github.com/Fuwn/plutia/internal/storage"
+)
+
+type app struct {
+ cfg config.Config
+ store *storage.PebbleStore
+ service *ingest.Service
+ apiServer *api.Server
+ checkpointM *checkpoint.Manager
+}
+
+func main() {
+ if len(os.Args) < 2 {
+ usage()
+ os.Exit(2)
+ }
+ cmd := os.Args[1]
+ switch cmd {
+ case "serve":
+ if err := runServe(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "replay":
+ if err := runReplay(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "verify":
+ if err := runVerify(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "snapshot":
+ if err := runSnapshot(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "bench":
+ if err := runBench(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ default:
+ usage()
+ os.Exit(2)
+ }
+}
+
+func runServe(args []string) error {
+ fs := flag.NewFlagSet("serve", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ app.service.SetMaxOps(*maxOps)
+
+ ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+ defer stop()
+
+ if err := app.service.Replay(ctx); err != nil {
+ if app.service.IsCorrupted() {
+ log.Printf("starting in read-only corrupted mode: %v", err)
+ } else {
+ return fmt.Errorf("initial replay failed: %w", err)
+ }
+ }
+
+ httpSrv := &http.Server{
+ Addr: app.cfg.ListenAddr,
+ Handler: app.apiServer.Handler(),
+ }
+
+ errCh := make(chan error, 2)
+ go func() {
+ log.Printf("HTTP server listening on %s", app.cfg.ListenAddr)
+ if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ errCh <- err
+ }
+ }()
+ if !app.service.IsCorrupted() {
+ go func() {
+ if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) {
+ errCh <- err
+ }
+ }()
+ }
+
+ select {
+ case <-ctx.Done():
+ case err := <-errCh:
+ return err
+ }
+
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ defer cancel()
+ if err := app.service.Flush(shutdownCtx); err != nil {
+ return err
+ }
+ return httpSrv.Shutdown(shutdownCtx)
+}
+
+func runReplay(args []string) error {
+ fs := flag.NewFlagSet("replay", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ app.service.SetMaxOps(*maxOps)
+
+ ctx := context.Background()
+ if err := app.service.Replay(ctx); err != nil {
+ return err
+ }
+ if err := app.service.Flush(ctx); err != nil {
+ return err
+ }
+ seq, _ := app.store.GetGlobalSeq()
+ fmt.Printf("replay complete at sequence %d\n", seq)
+ return nil
+}
+
+func runVerify(args []string) error {
+ fs := flag.NewFlagSet("verify", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ did := fs.String("did", "", "did to verify")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if *did == "" {
+ return fmt.Errorf("--did is required")
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+
+ if err := app.service.VerifyDID(context.Background(), *did); err != nil {
+ return err
+ }
+ fmt.Printf("verification succeeded for %s\n", *did)
+ return nil
+}
+
+func runSnapshot(args []string) error {
+ fs := flag.NewFlagSet("snapshot", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ cp, err := app.service.Snapshot(context.Background())
+ if err != nil {
+ return err
+ }
+ fmt.Printf("checkpoint sequence=%d hash=%s\n", cp.Sequence, cp.CheckpointHash)
+ return nil
+}
+
+func runBench(args []string) error {
+ fs := flag.NewFlagSet("bench", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ maxOps := fs.Uint64("max-ops", 200000, "max operations to ingest for benchmark")
+ interval := fs.Duration("interval", 10*time.Second, "rolling report interval")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+ app.service.SetMaxOps(*maxOps)
+
+ startSeq, err := app.store.GetGlobalSeq()
+ if err != nil {
+ return err
+ }
+ start := time.Now()
+ lastSeq := startSeq
+ pid := os.Getpid()
+
+ done := make(chan struct{})
+ go func() {
+ ticker := time.NewTicker(*interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-done:
+ return
+ case <-ticker.C:
+ seq, err := app.store.GetGlobalSeq()
+ if err != nil {
+ continue
+ }
+ delta := seq - lastSeq
+ lastSeq = seq
+ opsSec := float64(delta) / interval.Seconds()
+ cpuPct, rssKB, _ := processMetrics(pid)
+ fmt.Printf("rolling seq=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", seq, opsSec, cpuPct, rssKB)
+ }
+ }
+ }()
+
+ ctx := context.Background()
+ replayErr := app.service.Replay(ctx)
+ close(done)
+ if replayErr != nil {
+ return replayErr
+ }
+ if err := app.service.Flush(ctx); err != nil {
+ return err
+ }
+
+ endSeq, err := app.store.GetGlobalSeq()
+ if err != nil {
+ return err
+ }
+ elapsed := time.Since(start)
+ totalOps := endSeq - startSeq
+ totalOpsSec := float64(totalOps) / elapsed.Seconds()
+ cpuPct, rssKB, _ := processMetrics(pid)
+ totalBytes, opsBytes, indexBytes, cpBytes, err := diskUsageBreakdown(app.cfg.DataDir)
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("bench_total elapsed=%s ops=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", elapsed.Round(time.Millisecond), totalOps, totalOpsSec, cpuPct, rssKB)
+ fmt.Printf("bench_disk total_bytes=%d ops_bytes=%d index_bytes=%d checkpoints_bytes=%d bytes_per_op=%.2f\n",
+ totalBytes, opsBytes, indexBytes, cpBytes, float64(totalBytes)/max(1, float64(totalOps)))
+ return nil
+}
+
+func bootstrap(path string) (*app, error) {
+ cfg, err := config.Load(path)
+ if err != nil {
+ return nil, err
+ }
+ for _, p := range []string{cfg.DataDir, filepath.Join(cfg.DataDir, "ops"), filepath.Join(cfg.DataDir, "index"), filepath.Join(cfg.DataDir, "checkpoints")} {
+ if err := os.MkdirAll(p, 0o755); err != nil {
+ return nil, fmt.Errorf("mkdir %s: %w", p, err)
+ }
+ }
+ store, err := storage.OpenPebble(cfg.DataDir)
+ if err != nil {
+ return nil, err
+ }
+ mode, err := store.GetMode()
+ if err != nil {
+ return nil, err
+ }
+ if mode == "" {
+ if err := store.SetMode(cfg.Mode); err != nil {
+ return nil, err
+ }
+ } else if mode != cfg.Mode {
+ return nil, fmt.Errorf("mode mismatch: store=%s config=%s", mode, cfg.Mode)
+ }
+
+ var blockLog *storage.BlockLog
+ if cfg.Mode == config.ModeMirror {
+ blockLog, err = storage.OpenBlockLog(cfg.DataDir, cfg.ZstdLevel, cfg.BlockSizeMB)
+ if err != nil {
+ return nil, err
+ }
+ }
+ client := ingest.NewClient(cfg.PLCSource)
+ checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath)
+ service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr)
+ apiServer := api.NewServer(cfg, store, service, checkpointMgr)
+ return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil
+}
+
+func usage() {
+ fmt.Fprintf(os.Stderr, `plutia: verifiable PLC mirror
+
+Commands:
+ plutia serve --config=config.yaml [--max-ops=0]
+ plutia replay --config=config.yaml [--max-ops=0]
+ plutia verify --config=config.yaml --did=did:plc:xyz
+ plutia snapshot --config=config.yaml
+ plutia bench --config=config.yaml [--max-ops=200000] [--interval=10s]
+`)
+}
+
+func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
+ out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
+ if err != nil {
+ return 0, 0, err
+ }
+ fields := strings.Fields(string(out))
+ if len(fields) < 2 {
+ return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
+ }
+ cpuPct, _ = strconv.ParseFloat(fields[0], 64)
+ rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
+ return cpuPct, rssKB, nil
+}
+
+func diskUsageBreakdown(dataDir string) (total, ops, index, checkpoints int64, err error) {
+ total, err = dirSize(dataDir)
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ ops, err = dirSize(filepath.Join(dataDir, "ops"))
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ index, err = dirSize(filepath.Join(dataDir, "index"))
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ checkpoints, err = dirSize(filepath.Join(dataDir, "checkpoints"))
+ if err != nil {
+ return 0, 0, 0, 0, err
+ }
+ return total, ops, index, checkpoints, nil
+}
+
+func dirSize(path string) (int64, error) {
+ var total int64
+ err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
+ if err != nil {
+ return err
+ }
+ if d.IsDir() {
+ return nil
+ }
+ info, err := d.Info()
+ if err != nil {
+ return err
+ }
+ total += info.Size()
+ return nil
+ })
+ return total, err
+}
+
+func max(a, b float64) float64 {
+ if a > b {
+ return a
+ }
+ return b
+}