diff options
| author | Fuwn <[email protected]> | 2026-02-26 15:41:45 -0800 |
|---|---|---|
| committer | Fuwn <[email protected]> | 2026-02-26 15:41:45 -0800 |
| commit | fec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch) | |
| tree | 0897394ccdfaf6633e1a4ca8eb02bff49bb93c00 /cmd | |
| parent | feat: add read-only PLC API compatibility endpoints (diff) | |
| download | plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip | |
feat: Apply Iku formatting
Diffstat (limited to 'cmd')
| -rw-r--r-- | cmd/plutia/main.go | 123 | ||||
| -rw-r--r-- | cmd/plutia/version_test.go | 2 |
2 files changed, 122 insertions, 3 deletions
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go index 2685a26..8939f4f 100644 --- a/cmd/plutia/main.go +++ b/cmd/plutia/main.go @@ -17,7 +17,6 @@ import ( "strings" "syscall" "time" - "github.com/Fuwn/plutia/internal/api" "github.com/Fuwn/plutia/internal/checkpoint" "github.com/Fuwn/plutia/internal/config" @@ -45,7 +44,9 @@ func main() { usage() os.Exit(2) } + cmd := os.Args[1] + switch cmd { case "serve": if err := runServe(os.Args[2:]); err != nil { @@ -85,18 +86,24 @@ func runServe(args []string) error { fs := flag.NewFlagSet("serve", flag.ExitOnError) configPath := fs.String("config", "config.yaml", "config path") maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)") + if err := fs.Parse(args); err != nil { return err } + app, err := bootstrap(*configPath) + if err != nil { return err } + defer app.store.Close() defer app.service.Close() + app.service.SetMaxOps(*maxOps) ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() if err := app.service.Replay(ctx); err != nil { @@ -111,18 +118,21 @@ func runServe(args []string) error { Addr: app.cfg.ListenAddr, Handler: app.apiServer.Handler(), } - errCh := make(chan error, 2) pollDone := make(chan struct{}) + go func() { log.Printf("HTTP server listening on %s", app.cfg.ListenAddr) + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- err } }() + if !app.service.IsCorrupted() { go func() { defer close(pollDone) + if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) { errCh <- err } @@ -132,21 +142,28 @@ func runServe(args []string) error { } var runErr error + select { case <-ctx.Done(): case err := <-errCh: runErr = err + stop() } + <-pollDone shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + flushErr := app.service.Flush(shutdownCtx) httpErr := httpSrv.Shutdown(shutdownCtx) + if runErr != nil || flushErr != nil || httpErr != nil { return errors.Join(runErr, flushErr, httpErr) } + return nil } @@ -154,26 +171,36 @@ func runReplay(args []string) error { fs := flag.NewFlagSet("replay", flag.ExitOnError) configPath := fs.String("config", "config.yaml", "config path") maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)") + if err := fs.Parse(args); err != nil { return err } + app, err := bootstrap(*configPath) + if err != nil { return err } + defer app.store.Close() defer app.service.Close() + app.service.SetMaxOps(*maxOps) ctx := context.Background() + if err := app.service.Replay(ctx); err != nil { return err } + if err := app.service.Flush(ctx); err != nil { return err } + seq, _ := app.store.GetGlobalSeq() + fmt.Printf("replay complete at sequence %d\n", seq) + return nil } @@ -181,43 +208,58 @@ func runVerify(args []string) error { fs := flag.NewFlagSet("verify", flag.ExitOnError) configPath := fs.String("config", "config.yaml", "config path") did := fs.String("did", "", "did to verify") + if err := fs.Parse(args); err != nil { return err } + if *did == "" { return fmt.Errorf("--did is required") } + app, err := bootstrap(*configPath) + if err != nil { return err } + defer app.store.Close() defer app.service.Close() if err := app.service.VerifyDID(context.Background(), *did); err != nil { return err } + fmt.Printf("verification succeeded for %s\n", *did) + return nil } func runSnapshot(args []string) error { fs := flag.NewFlagSet("snapshot", flag.ExitOnError) configPath := fs.String("config", "config.yaml", "config path") + if err := fs.Parse(args); err != nil { return err } + app, err := bootstrap(*configPath) + if err != nil { return err } + defer app.store.Close() defer app.service.Close() + cp, err := app.service.Snapshot(context.Background()) + if err != nil { return err } + fmt.Printf("checkpoint sequence=%d hash=%s\n", cp.Sequence, cp.CheckpointHash) + return nil } @@ -226,42 +268,54 @@ func runBench(args []string) error { configPath := fs.String("config", "config.yaml", "config path") maxOps := fs.Uint64("max-ops", 200000, "max operations to ingest for benchmark") interval := fs.Duration("interval", 10*time.Second, "rolling report interval") + if err := fs.Parse(args); err != nil { return err } + app, err := bootstrap(*configPath) + if err != nil { return err } + defer app.store.Close() defer app.service.Close() + app.service.SetMaxOps(*maxOps) startSeq, err := app.store.GetGlobalSeq() + if err != nil { return err } + start := time.Now() lastSeq := startSeq pid := os.Getpid() - done := make(chan struct{}) + go func() { ticker := time.NewTicker(*interval) + defer ticker.Stop() + for { select { case <-done: return case <-ticker.C: seq, err := app.store.GetGlobalSeq() + if err != nil { continue } + delta := seq - lastSeq lastSeq = seq opsSec := float64(delta) / interval.Seconds() cpuPct, rssKB, _ := processMetrics(pid) + fmt.Printf("rolling seq=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", seq, opsSec, cpuPct, rssKB) } } @@ -269,23 +323,29 @@ func runBench(args []string) error { ctx := context.Background() replayErr := app.service.Replay(ctx) + close(done) + if replayErr != nil { return replayErr } + if err := app.service.Flush(ctx); err != nil { return err } endSeq, err := app.store.GetGlobalSeq() + if err != nil { return err } + elapsed := time.Since(start) totalOps := endSeq - startSeq totalOpsSec := float64(totalOps) / elapsed.Seconds() cpuPct, rssKB, _ := processMetrics(pid) totalBytes, opsBytes, indexBytes, cpBytes, err := diskUsageBreakdown(app.cfg.DataDir) + if err != nil { return err } @@ -293,6 +353,7 @@ func runBench(args []string) error { fmt.Printf("bench_total elapsed=%s ops=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", elapsed.Round(time.Millisecond), totalOps, totalOpsSec, cpuPct, rssKB) fmt.Printf("bench_disk total_bytes=%d ops_bytes=%d index_bytes=%d checkpoints_bytes=%d bytes_per_op=%.2f\n", totalBytes, opsBytes, indexBytes, cpBytes, float64(totalBytes)/max(1, float64(totalOps))) + return nil } @@ -300,43 +361,56 @@ func runCompare(args []string) error { fs := flag.NewFlagSet("compare", flag.ExitOnError) configPath := fs.String("config", "config.yaml", "config path") remote := fs.String("remote", "", "remote mirror base URL") + if err := fs.Parse(args); err != nil { return err } + if strings.TrimSpace(*remote) == "" { return fmt.Errorf("--remote is required") } app, err := bootstrap(*configPath) + if err != nil { return err } + defer app.store.Close() defer app.service.Close() local, ok, err := app.store.GetLatestCheckpoint() + if err != nil { return fmt.Errorf("load local checkpoint: %w", err) } + if !ok { return fmt.Errorf("local mirror has no checkpoints") } url := strings.TrimRight(*remote, "/") + "/checkpoints/latest" req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) + if err != nil { return err } + httpClient := &http.Client{Timeout: app.cfg.RequestTimeout} resp, err := httpClient.Do(req) + if err != nil { return fmt.Errorf("fetch remote checkpoint: %w", err) } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { return fmt.Errorf("remote checkpoint response status=%d", resp.StatusCode) } + var remoteCP types.CheckpointV1 + if err := json.NewDecoder(resp.Body).Decode(&remoteCP); err != nil { return fmt.Errorf("decode remote checkpoint: %w", err) } @@ -345,30 +419,41 @@ func runCompare(args []string) error { fmt.Printf("remote sequence=%d root=%s signature_present=%t\n", remoteCP.Sequence, remoteCP.DIDMerkleRoot, remoteCP.Signature != "") mismatch := false + if local.Sequence == remoteCP.Sequence && local.DIDMerkleRoot != remoteCP.DIDMerkleRoot { fmt.Printf("divergence: matching sequence %d with different merkle roots\n", local.Sequence) + mismatch = true } + if local.Sequence > remoteCP.Sequence { fmt.Printf("local mirror is ahead by %d checkpoints\n", local.Sequence-remoteCP.Sequence) + mismatch = true } else if remoteCP.Sequence > local.Sequence { fmt.Printf("remote mirror is ahead by %d checkpoints\n", remoteCP.Sequence-local.Sequence) + mismatch = true } + if (local.Signature == "") != (remoteCP.Signature == "") { fmt.Println("signature presence mismatch between local and remote checkpoints") + mismatch = true } + if mismatch { return fmt.Errorf("mirror comparison mismatch") } + fmt.Println("mirrors match at latest checkpoint") + return nil } func runVersion() error { fmt.Print(formatVersion()) + return nil } @@ -379,22 +464,29 @@ func formatVersion() string { func bootstrap(path string) (*app, error) { cfg, err := config.Load(path) + if err != nil { return nil, err } + for _, p := range []string{cfg.DataDir, filepath.Join(cfg.DataDir, "ops"), filepath.Join(cfg.DataDir, "index"), filepath.Join(cfg.DataDir, "checkpoints")} { if err := os.MkdirAll(p, 0o755); err != nil { return nil, fmt.Errorf("mkdir %s: %w", p, err) } } + store, err := storage.OpenPebble(cfg.DataDir) + if err != nil { return nil, err } + mode, err := store.GetMode() + if err != nil { return nil, err } + if mode == "" { if err := store.SetMode(cfg.Mode); err != nil { return nil, err @@ -404,12 +496,15 @@ func bootstrap(path string) (*app, error) { } var blockLog *storage.BlockLog + if cfg.Mode == config.ModeMirror { blockLog, err = storage.OpenBlockLog(cfg.DataDir, cfg.ZstdLevel, cfg.BlockSizeMB) + if err != nil { return nil, err } } + client := ingest.NewClient(cfg.PLCSource, ingest.ClientOptions{ MaxAttempts: cfg.HTTPRetryMaxAttempts, BaseDelay: cfg.HTTPRetryBaseDelay, @@ -423,6 +518,7 @@ func bootstrap(path string) (*app, error) { BuildDate: buildDate, GoVersion: runtime.Version(), })) + return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil } @@ -442,54 +538,74 @@ Commands: func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) { out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output() + if err != nil { return 0, 0, err } + fields := strings.Fields(string(out)) + if len(fields) < 2 { return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out))) } + cpuPct, _ = strconv.ParseFloat(fields[0], 64) rssKB, _ = strconv.ParseInt(fields[1], 10, 64) + return cpuPct, rssKB, nil } func diskUsageBreakdown(dataDir string) (total, ops, index, checkpoints int64, err error) { total, err = dirSize(dataDir) + if err != nil { return 0, 0, 0, 0, err } + ops, err = dirSize(filepath.Join(dataDir, "ops")) + if err != nil { return 0, 0, 0, 0, err } + index, err = dirSize(filepath.Join(dataDir, "index")) + if err != nil { return 0, 0, 0, 0, err } + checkpoints, err = dirSize(filepath.Join(dataDir, "checkpoints")) + if err != nil { return 0, 0, 0, 0, err } + return total, ops, index, checkpoints, nil } func dirSize(path string) (int64, error) { var total int64 + err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error { if err != nil { return err } + if d.IsDir() { return nil } + info, err := d.Info() + if err != nil { return err } + total += info.Size() + return nil }) + return total, err } @@ -497,5 +613,6 @@ func max(a, b float64) float64 { if a > b { return a } + return b } diff --git a/cmd/plutia/version_test.go b/cmd/plutia/version_test.go index 3869de6..0e3b8a4 100644 --- a/cmd/plutia/version_test.go +++ b/cmd/plutia/version_test.go @@ -10,6 +10,7 @@ func TestFormatVersionIncludesBuildMetadata(t *testing.T) { version = "v0.1.0" commit = "abc123" buildDate = "2026-02-26T00:00:00Z" + t.Cleanup(func() { version = oldVersion commit = oldCommit @@ -23,6 +24,7 @@ func TestFormatVersionIncludesBuildMetadata(t *testing.T) { "BuildDate: 2026-02-26T00:00:00Z", "GoVersion: go", } + for _, want := range checks { if !strings.Contains(out, want) { t.Fatalf("version output missing %q: %s", want, out) |