package main import ( "context" "encoding/json" "errors" "flag" "fmt" "github.com/Fuwn/plutia/internal/api" "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" "github.com/Fuwn/plutia/internal/ingest" "github.com/Fuwn/plutia/internal/storage" "github.com/Fuwn/plutia/internal/types" "log" "net/http" "os" "os/exec" "os/signal" "path/filepath" "runtime" "strconv" "strings" "syscall" "time" ) var ( version = "dev" commit = "unknown" buildDate = "unknown" ) type app struct { cfg config.Config store *storage.PebbleStore service *ingest.Service apiServer *api.Server checkpointM *checkpoint.Manager } func main() { if len(os.Args) < 2 { usage() os.Exit(2) } cmd := os.Args[1] switch cmd { case "serve": if err := runServe(os.Args[2:]); err != nil { log.Fatal(err) } case "replay": if err := runReplay(os.Args[2:]); err != nil { log.Fatal(err) } case "verify": if err := runVerify(os.Args[2:]); err != nil { log.Fatal(err) } case "snapshot": if err := runSnapshot(os.Args[2:]); err != nil { log.Fatal(err) } case "bench": if err := runBench(os.Args[2:]); err != nil { log.Fatal(err) } case "compare": if err := runCompare(os.Args[2:]); err != nil { log.Fatal(err) } case "keygen": if err := runKeygen(os.Args[2:]); err != nil { log.Fatal(err) } case "version": if err := runVersion(); err != nil { log.Fatal(err) } default: usage() os.Exit(2) } } func runServe(args []string) error { fs := flag.NewFlagSet("serve", flag.ExitOnError) configPath := fs.String("config", "config.default.yaml", "path to config file") maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)") if err := fs.Parse(args); err != nil { return err } app, err := bootstrap(*configPath) if err != nil { return err } defer app.store.Close() defer app.service.Close() app.service.SetMaxOps(*maxOps) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() if app.cfg.Mode != config.ModeThin { if err := app.service.Replay(ctx); err != nil { if app.service.IsCorrupted() { log.Printf("starting in read-only corrupted mode: %v", err) } else { return fmt.Errorf("initial replay failed: %w", err) } } } httpSrv := &http.Server{ Addr: app.cfg.ListenAddr, Handler: app.apiServer.Handler(), } errCh := make(chan error, 2) pollDone := make(chan struct{}) go func() { log.Printf("HTTP server listening on %s", app.cfg.ListenAddr) if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- err } }() if app.cfg.Mode != config.ModeThin && !app.service.IsCorrupted() { go func() { defer close(pollDone) if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) { errCh <- err } }() } else { close(pollDone) } var runErr error select { case <-ctx.Done(): case err := <-errCh: runErr = err stop() } <-pollDone shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() flushErr := app.service.Flush(shutdownCtx) httpErr := httpSrv.Shutdown(shutdownCtx) if runErr != nil || flushErr != nil || httpErr != nil { return errors.Join(runErr, flushErr, httpErr) } return nil } func runReplay(args []string) error { fs := flag.NewFlagSet("replay", flag.ExitOnError) configPath := fs.String("config", "config.default.yaml", "path to config file") maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)") if err := fs.Parse(args); err != nil { return err } app, err := bootstrap(*configPath) if err != nil { return err } defer app.store.Close() defer app.service.Close() app.service.SetMaxOps(*maxOps) ctx := context.Background() if err := app.service.Replay(ctx); err != nil { return err } if err := app.service.Flush(ctx); err != nil { return err } seq, _ := app.store.GetGlobalSeq() fmt.Printf("replay complete at sequence %d\n", seq) return nil } func runVerify(args []string) error { fs := flag.NewFlagSet("verify", flag.ExitOnError) configPath := fs.String("config", "config.default.yaml", "path to config file") did := fs.String("did", "", "DID to verify") if err := fs.Parse(args); err != nil { return err } if *did == "" { return fmt.Errorf("--did is required") } app, err := bootstrap(*configPath) if err != nil { return err } defer app.store.Close() defer app.service.Close() if err := app.service.VerifyDID(context.Background(), *did); err != nil { return err } fmt.Printf("verification succeeded for %s\n", *did) return nil } func runSnapshot(args []string) error { fs := flag.NewFlagSet("snapshot", flag.ExitOnError) configPath := fs.String("config", "config.default.yaml", "path to config file") if err := fs.Parse(args); err != nil { return err } app, err := bootstrap(*configPath) if err != nil { return err } defer app.store.Close() defer app.service.Close() cp, err := app.service.Snapshot(context.Background()) if err != nil { return err } fmt.Printf("checkpoint sequence=%d hash=%s\n", cp.Sequence, cp.CheckpointHash) return nil } func runBench(args []string) error { fs := flag.NewFlagSet("bench", flag.ExitOnError) configPath := fs.String("config", "config.default.yaml", "path to config file") maxOps := fs.Uint64("max-ops", 200000, "max operations to ingest for benchmark") interval := fs.Duration("interval", 10*time.Second, "rolling report interval") if err := fs.Parse(args); err != nil { return err } app, err := bootstrap(*configPath) if err != nil { return err } defer app.store.Close() defer app.service.Close() app.service.SetMaxOps(*maxOps) startSeq, err := app.store.GetGlobalSeq() if err != nil { return err } start := time.Now() lastSeq := startSeq pid := os.Getpid() done := make(chan struct{}) go func() { ticker := time.NewTicker(*interval) defer ticker.Stop() for { select { case <-done: return case <-ticker.C: seq, err := app.store.GetGlobalSeq() if err != nil { continue } delta := seq - lastSeq lastSeq = seq opsSec := float64(delta) / interval.Seconds() cpuPct, rssKB, _ := processMetrics(pid) fmt.Printf("rolling seq=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", seq, opsSec, cpuPct, rssKB) } } }() ctx := context.Background() replayErr := app.service.Replay(ctx) close(done) if replayErr != nil { return replayErr } if err := app.service.Flush(ctx); err != nil { return err } endSeq, err := app.store.GetGlobalSeq() if err != nil { return err } elapsed := time.Since(start) totalOps := endSeq - startSeq totalOpsSec := float64(totalOps) / elapsed.Seconds() cpuPct, rssKB, _ := processMetrics(pid) totalBytes, opsBytes, indexBytes, cpBytes, err := diskUsageBreakdown(app.cfg.DataDir) if err != nil { return err } fmt.Printf("bench_total elapsed=%s ops=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", elapsed.Round(time.Millisecond), totalOps, totalOpsSec, cpuPct, rssKB) fmt.Printf("bench_disk total_bytes=%d ops_bytes=%d index_bytes=%d checkpoints_bytes=%d bytes_per_op=%.2f\n", totalBytes, opsBytes, indexBytes, cpBytes, float64(totalBytes)/max(1, float64(totalOps))) return nil } func runCompare(args []string) error { fs := flag.NewFlagSet("compare", flag.ExitOnError) configPath := fs.String("config", "config.default.yaml", "path to config file") remote := fs.String("remote", "", "base URL for remote mirror") if err := fs.Parse(args); err != nil { return err } if strings.TrimSpace(*remote) == "" { return fmt.Errorf("--remote is required") } app, err := bootstrap(*configPath) if err != nil { return err } defer app.store.Close() defer app.service.Close() local, ok, err := app.store.GetLatestCheckpoint() if err != nil { return fmt.Errorf("load local checkpoint: %w", err) } if !ok { return fmt.Errorf("local mirror has no checkpoints") } url := strings.TrimRight(*remote, "/") + "/checkpoints/latest" req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) if err != nil { return err } httpClient := &http.Client{Timeout: app.cfg.RequestTimeout} resp, err := httpClient.Do(req) if err != nil { return fmt.Errorf("fetch remote checkpoint: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("remote checkpoint response status=%d", resp.StatusCode) } var remoteCP types.CheckpointV1 if err := json.NewDecoder(resp.Body).Decode(&remoteCP); err != nil { return fmt.Errorf("decode remote checkpoint: %w", err) } fmt.Printf("local sequence=%d root=%s signature_present=%t\n", local.Sequence, local.DIDMerkleRoot, local.Signature != "") fmt.Printf("remote sequence=%d root=%s signature_present=%t\n", remoteCP.Sequence, remoteCP.DIDMerkleRoot, remoteCP.Signature != "") mismatch := false if local.Sequence == remoteCP.Sequence && local.DIDMerkleRoot != remoteCP.DIDMerkleRoot { fmt.Printf("divergence: matching sequence %d with different merkle roots\n", local.Sequence) mismatch = true } if local.Sequence > remoteCP.Sequence { fmt.Printf("local mirror is ahead by %d checkpoints\n", local.Sequence-remoteCP.Sequence) mismatch = true } else if remoteCP.Sequence > local.Sequence { fmt.Printf("remote mirror is ahead by %d checkpoints\n", remoteCP.Sequence-local.Sequence) mismatch = true } if (local.Signature == "") != (remoteCP.Signature == "") { fmt.Println("signature presence mismatch between local and remote checkpoints") mismatch = true } if mismatch { return fmt.Errorf("mirror comparison mismatch") } fmt.Println("mirrors match at latest checkpoint") return nil } func runVersion() error { fmt.Print(formatVersion()) return nil } func formatVersion() string { return fmt.Sprintf("Version: %s\nCommit: %s\nBuildDate: %s\nGoVersion: %s\n", version, commit, buildDate, runtime.Version()) } func bootstrap(path string) (*app, error) { cfg, err := config.Load(path) if err != nil { return nil, err } for _, p := range []string{cfg.DataDir, filepath.Join(cfg.DataDir, "ops"), filepath.Join(cfg.DataDir, "index"), filepath.Join(cfg.DataDir, "checkpoints")} { if err := os.MkdirAll(p, 0o755); err != nil { return nil, fmt.Errorf("mkdir %s: %w", p, err) } } store, err := storage.OpenPebble(cfg.DataDir) if err != nil { return nil, err } mode, err := store.GetMode() if err != nil { return nil, err } if mode == "" { if err := store.SetMode(cfg.Mode); err != nil { return nil, err } } else if mode != cfg.Mode { return nil, fmt.Errorf("mode mismatch: store=%s config=%s", mode, cfg.Mode) } var blockLog *storage.BlockLog if cfg.Mode == config.ModeMirror { blockLog, err = storage.OpenBlockLog(cfg.DataDir, cfg.ZstdLevel, cfg.BlockSizeMB) if err != nil { return nil, err } } client := ingest.NewClient(cfg.PLCSource, ingest.ClientOptions{ MaxAttempts: cfg.HTTPRetryMaxAttempts, BaseDelay: cfg.HTTPRetryBaseDelay, MaxDelay: cfg.HTTPRetryMaxDelay, }) checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath) service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr) apiServer := api.NewServer(cfg, store, service, checkpointMgr, api.WithBuildInfo(api.BuildInfo{ Version: version, Commit: commit, BuildDate: buildDate, GoVersion: runtime.Version(), })) return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil } func usage() { fmt.Fprintf(os.Stderr, `plutia: verifiable PLC mirror Commands: plutia serve --config=config.default.yaml [--max-ops=0] plutia replay --config=config.default.yaml [--max-ops=0] plutia verify --config=config.default.yaml --did=did:plc:xyz plutia snapshot --config=config.default.yaml plutia bench --config=config.default.yaml [--max-ops=200000] [--interval=10s] plutia compare --config=config.default.yaml --remote=https://mirror.example.com plutia keygen --out=./mirror.key [--force] plutia version `) } func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) { out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output() if err != nil { return 0, 0, err } fields := strings.Fields(string(out)) if len(fields) < 2 { return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out))) } cpuPct, _ = strconv.ParseFloat(fields[0], 64) rssKB, _ = strconv.ParseInt(fields[1], 10, 64) return cpuPct, rssKB, nil } func diskUsageBreakdown(dataDir string) (total, ops, index, checkpoints int64, err error) { total, err = dirSize(dataDir) if err != nil { return 0, 0, 0, 0, err } ops, err = dirSize(filepath.Join(dataDir, "ops")) if err != nil { return 0, 0, 0, 0, err } index, err = dirSize(filepath.Join(dataDir, "index")) if err != nil { return 0, 0, 0, 0, err } checkpoints, err = dirSize(filepath.Join(dataDir, "checkpoints")) if err != nil { return 0, 0, 0, 0, err } return total, ops, index, checkpoints, nil } func dirSize(path string) (int64, error) { var total int64 err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { if err != nil { return err } if d.IsDir() { return nil } info, err := d.Info() if err != nil { return err } total += info.Size() return nil }) return total, err } func max(a, b float64) float64 { if a > b { return a } return b }