diff options
| author | Fuwn <[email protected]> | 2026-02-26 14:46:02 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 14:48:52 -0800 |
| commit | 0099d621e97b6048971fadb5c71918cc9f2b5a09 (patch) | |
| tree | a38ba31585200bacd61f453ef7158de7f0aaf7a3 /cmd | |
| parent | Initial commit (diff) | |
| download | plutia-test-0099d621e97b6048971fadb5c71918cc9f2b5a09.tar.xz plutia-test-0099d621e97b6048971fadb5c71918cc9f2b5a09.zip | |
feat: initial Plutia release — verifiable high-performance PLC mirror (mirror + resolver modes)
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/plutia/main.go | 383 |
1 files changed, 383 insertions, 0 deletions
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go new file mode 100644 index 0000000..0849e0a --- /dev/null +++ b/cmd/plutia/main.go @@ -0,0 +1,383 @@ +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/Fuwn/plutia/internal/api" + "github.com/Fuwn/plutia/internal/checkpoint" + "github.com/Fuwn/plutia/internal/config" + "github.com/Fuwn/plutia/internal/ingest" + "github.com/Fuwn/plutia/internal/storage" +) + +type app struct { + cfg config.Config + store *storage.PebbleStore + service *ingest.Service + apiServer *api.Server + checkpointM *checkpoint.Manager +} + +func main() { + if len(os.Args) < 2 { + usage() + os.Exit(2) + } + cmd := os.Args[1] + switch cmd { + case "serve": + if err := runServe(os.Args[2:]); err != nil { + log.Fatal(err) + } + case "replay": + if err := runReplay(os.Args[2:]); err != nil { + log.Fatal(err) + } + case "verify": + if err := runVerify(os.Args[2:]); err != nil { + log.Fatal(err) + } + case "snapshot": + if err := runSnapshot(os.Args[2:]); err != nil { + log.Fatal(err) + } + case "bench": + if err := runBench(os.Args[2:]); err != nil { + log.Fatal(err) + } + default: + usage() + os.Exit(2) + } +} + +func runServe(args []string) error { + fs := flag.NewFlagSet("serve", flag.ExitOnError) + configPath := fs.String("config", "config.yaml", "config path") + maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)") + if err := fs.Parse(args); err != nil { + return err + } + app, err := bootstrap(*configPath) + if err != nil { + return err + } + defer app.store.Close() + defer app.service.Close() + app.service.SetMaxOps(*maxOps) + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + if err := app.service.Replay(ctx); err != nil { + if app.service.IsCorrupted() { + log.Printf("starting in read-only corrupted mode: %v", err) + } else { + return fmt.Errorf("initial replay failed: %w", err) + } + } + + httpSrv := &http.Server{ + Addr: app.cfg.ListenAddr, + Handler: app.apiServer.Handler(), + } + + errCh := make(chan error, 2) + go func() { + log.Printf("HTTP server listening on %s", app.cfg.ListenAddr) + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- err + } + }() + if !app.service.IsCorrupted() { + go func() { + if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) { + errCh <- err + } + }() + } + + select { + case <-ctx.Done(): + case err := <-errCh: + return err + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := app.service.Flush(shutdownCtx); err != nil { + return err + } + return httpSrv.Shutdown(shutdownCtx) +} + +func runReplay(args []string) error { + fs := flag.NewFlagSet("replay", flag.ExitOnError) + configPath := fs.String("config", "config.yaml", "config path") + maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)") + if err := fs.Parse(args); err != nil { + return err + } + app, err := bootstrap(*configPath) + if err != nil { + return err + } + defer app.store.Close() + defer app.service.Close() + app.service.SetMaxOps(*maxOps) + + ctx := context.Background() + if err := app.service.Replay(ctx); err != nil { + return err + } + if err := app.service.Flush(ctx); err != nil { + return err + } + seq, _ := app.store.GetGlobalSeq() + fmt.Printf("replay complete at sequence %d\n", seq) + return nil +} + +func runVerify(args []string) error { + fs := flag.NewFlagSet("verify", flag.ExitOnError) + configPath := fs.String("config", "config.yaml", "config path") + did := fs.String("did", "", "did to verify") + if err := fs.Parse(args); err != nil { + return err + } + if *did == "" { + return fmt.Errorf("--did is required") + } + app, err := bootstrap(*configPath) + if err != nil { + return err + } + defer app.store.Close() + defer app.service.Close() + + if err := app.service.VerifyDID(context.Background(), *did); err != nil { + return err + } + fmt.Printf("verification succeeded for %s\n", *did) + return nil +} + +func runSnapshot(args []string) error { + fs := flag.NewFlagSet("snapshot", flag.ExitOnError) + configPath := fs.String("config", "config.yaml", "config path") + if err := fs.Parse(args); err != nil { + return err + } + app, err := bootstrap(*configPath) + if err != nil { + return err + } + defer app.store.Close() + defer app.service.Close() + cp, err := app.service.Snapshot(context.Background()) + if err != nil { + return err + } + fmt.Printf("checkpoint sequence=%d hash=%s\n", cp.Sequence, cp.CheckpointHash) + return nil +} + +func runBench(args []string) error { + fs := flag.NewFlagSet("bench", flag.ExitOnError) + configPath := fs.String("config", "config.yaml", "config path") + maxOps := fs.Uint64("max-ops", 200000, "max operations to ingest for benchmark") + interval := fs.Duration("interval", 10*time.Second, "rolling report interval") + if err := fs.Parse(args); err != nil { + return err + } + app, err := bootstrap(*configPath) + if err != nil { + return err + } + defer app.store.Close() + defer app.service.Close() + app.service.SetMaxOps(*maxOps) + + startSeq, err := app.store.GetGlobalSeq() + if err != nil { + return err + } + start := time.Now() + lastSeq := startSeq + pid := os.Getpid() + + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(*interval) + defer ticker.Stop() + for { + select { + case <-done: + return + case <-ticker.C: + seq, err := app.store.GetGlobalSeq() + if err != nil { + continue + } + delta := seq - lastSeq + lastSeq = seq + opsSec := float64(delta) / interval.Seconds() + cpuPct, rssKB, _ := processMetrics(pid) + fmt.Printf("rolling seq=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", seq, opsSec, cpuPct, rssKB) + } + } + }() + + ctx := context.Background() + replayErr := app.service.Replay(ctx) + close(done) + if replayErr != nil { + return replayErr + } + if err := app.service.Flush(ctx); err != nil { + return err + } + + endSeq, err := app.store.GetGlobalSeq() + if err != nil { + return err + } + elapsed := time.Since(start) + totalOps := endSeq - startSeq + totalOpsSec := float64(totalOps) / elapsed.Seconds() + cpuPct, rssKB, _ := processMetrics(pid) + totalBytes, opsBytes, indexBytes, cpBytes, err := diskUsageBreakdown(app.cfg.DataDir) + if err != nil { + return err + } + + fmt.Printf("bench_total elapsed=%s ops=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", elapsed.Round(time.Millisecond), totalOps, totalOpsSec, cpuPct, rssKB) + fmt.Printf("bench_disk total_bytes=%d ops_bytes=%d index_bytes=%d checkpoints_bytes=%d bytes_per_op=%.2f\n", + totalBytes, opsBytes, indexBytes, cpBytes, float64(totalBytes)/max(1, float64(totalOps))) + return nil +} + +func bootstrap(path string) (*app, error) { + cfg, err := config.Load(path) + if err != nil { + return nil, err + } + for _, p := range []string{cfg.DataDir, filepath.Join(cfg.DataDir, "ops"), filepath.Join(cfg.DataDir, "index"), filepath.Join(cfg.DataDir, "checkpoints")} { + if err := os.MkdirAll(p, 0o755); err != nil { + return nil, fmt.Errorf("mkdir %s: %w", p, err) + } + } + store, err := storage.OpenPebble(cfg.DataDir) + if err != nil { + return nil, err + } + mode, err := store.GetMode() + if err != nil { + return nil, err + } + if mode == "" { + if err := store.SetMode(cfg.Mode); err != nil { + return nil, err + } + } else if mode != cfg.Mode { + return nil, fmt.Errorf("mode mismatch: store=%s config=%s", mode, cfg.Mode) + } + + var blockLog *storage.BlockLog + if cfg.Mode == config.ModeMirror { + blockLog, err = storage.OpenBlockLog(cfg.DataDir, cfg.ZstdLevel, cfg.BlockSizeMB) + if err != nil { + return nil, err + } + } + client := ingest.NewClient(cfg.PLCSource) + checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath) + service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr) + apiServer := api.NewServer(cfg, store, service, checkpointMgr) + return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil +} + +func usage() { + fmt.Fprintf(os.Stderr, `plutia: verifiable PLC mirror + +Commands: + plutia serve --config=config.yaml [--max-ops=0] + plutia replay --config=config.yaml [--max-ops=0] + plutia verify --config=config.yaml --did=did:plc:xyz + plutia snapshot --config=config.yaml + plutia bench --config=config.yaml [--max-ops=200000] [--interval=10s] +`) +} + +func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) { + out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output() + if err != nil { + return 0, 0, err + } + fields := strings.Fields(string(out)) + if len(fields) < 2 { + return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out))) + } + cpuPct, _ = strconv.ParseFloat(fields[0], 64) + rssKB, _ = strconv.ParseInt(fields[1], 10, 64) + return cpuPct, rssKB, nil +} + +func diskUsageBreakdown(dataDir string) (total, ops, index, checkpoints int64, err error) { + total, err = dirSize(dataDir) + if err != nil { + return 0, 0, 0, 0, err + } + ops, err = dirSize(filepath.Join(dataDir, "ops")) + if err != nil { + return 0, 0, 0, 0, err + } + index, err = dirSize(filepath.Join(dataDir, "index")) + if err != nil { + return 0, 0, 0, 0, err + } + checkpoints, err = dirSize(filepath.Join(dataDir, "checkpoints")) + if err != nil { + return 0, 0, 0, 0, err + } + return total, ops, index, checkpoints, nil +} + +func dirSize(path string) (int64, error) { + var total int64 + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + info, err := d.Info() + if err != nil { + return err + } + total += info.Size() + return nil + }) + return total, err +} + +func max(a, b float64) float64 { + if a > b { + return a + } + return b +} |