aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:41:45 -0800
committerFuwn <[email protected]>2026-02-26 15:41:45 -0800
commitfec9114caaa7d274e524793d5eb0cf2ef2c5af11 (patch)
tree0897394ccdfaf6633e1a4ca8eb02bff49bb93c00 /cmd
parentfeat: add read-only PLC API compatibility endpoints (diff)
downloadplutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.tar.xz
plutia-test-fec9114caaa7d274e524793d5eb0cf2ef2c5af11.zip
feat: Apply Iku formatting
Diffstat (limited to 'cmd')
-rw-r--r--cmd/plutia/main.go123
-rw-r--r--cmd/plutia/version_test.go2
2 files changed, 122 insertions, 3 deletions
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go
index 2685a26..8939f4f 100644
--- a/cmd/plutia/main.go
+++ b/cmd/plutia/main.go
@@ -17,7 +17,6 @@ import (
"strings"
"syscall"
"time"
-
"github.com/Fuwn/plutia/internal/api"
"github.com/Fuwn/plutia/internal/checkpoint"
"github.com/Fuwn/plutia/internal/config"
@@ -45,7 +44,9 @@ func main() {
usage()
os.Exit(2)
}
+
cmd := os.Args[1]
+
switch cmd {
case "serve":
if err := runServe(os.Args[2:]); err != nil {
@@ -85,18 +86,24 @@ func runServe(args []string) error {
fs := flag.NewFlagSet("serve", flag.ExitOnError)
configPath := fs.String("config", "config.yaml", "config path")
maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)")
+
if err := fs.Parse(args); err != nil {
return err
}
+
app, err := bootstrap(*configPath)
+
if err != nil {
return err
}
+
defer app.store.Close()
defer app.service.Close()
+
app.service.SetMaxOps(*maxOps)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
+
defer stop()
if err := app.service.Replay(ctx); err != nil {
@@ -111,18 +118,21 @@ func runServe(args []string) error {
Addr: app.cfg.ListenAddr,
Handler: app.apiServer.Handler(),
}
-
errCh := make(chan error, 2)
pollDone := make(chan struct{})
+
go func() {
log.Printf("HTTP server listening on %s", app.cfg.ListenAddr)
+
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
errCh <- err
}
}()
+
if !app.service.IsCorrupted() {
go func() {
defer close(pollDone)
+
if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
@@ -132,21 +142,28 @@ func runServe(args []string) error {
}
var runErr error
+
select {
case <-ctx.Done():
case err := <-errCh:
runErr = err
+
stop()
}
+
<-pollDone
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+
defer cancel()
+
flushErr := app.service.Flush(shutdownCtx)
httpErr := httpSrv.Shutdown(shutdownCtx)
+
if runErr != nil || flushErr != nil || httpErr != nil {
return errors.Join(runErr, flushErr, httpErr)
}
+
return nil
}
@@ -154,26 +171,36 @@ func runReplay(args []string) error {
fs := flag.NewFlagSet("replay", flag.ExitOnError)
configPath := fs.String("config", "config.yaml", "config path")
maxOps := fs.Uint64("max-ops", 0, "max operations to ingest in this process (0 = unlimited)")
+
if err := fs.Parse(args); err != nil {
return err
}
+
app, err := bootstrap(*configPath)
+
if err != nil {
return err
}
+
defer app.store.Close()
defer app.service.Close()
+
app.service.SetMaxOps(*maxOps)
ctx := context.Background()
+
if err := app.service.Replay(ctx); err != nil {
return err
}
+
if err := app.service.Flush(ctx); err != nil {
return err
}
+
seq, _ := app.store.GetGlobalSeq()
+
fmt.Printf("replay complete at sequence %d\n", seq)
+
return nil
}
@@ -181,43 +208,58 @@ func runVerify(args []string) error {
fs := flag.NewFlagSet("verify", flag.ExitOnError)
configPath := fs.String("config", "config.yaml", "config path")
did := fs.String("did", "", "did to verify")
+
if err := fs.Parse(args); err != nil {
return err
}
+
if *did == "" {
return fmt.Errorf("--did is required")
}
+
app, err := bootstrap(*configPath)
+
if err != nil {
return err
}
+
defer app.store.Close()
defer app.service.Close()
if err := app.service.VerifyDID(context.Background(), *did); err != nil {
return err
}
+
fmt.Printf("verification succeeded for %s\n", *did)
+
return nil
}
func runSnapshot(args []string) error {
fs := flag.NewFlagSet("snapshot", flag.ExitOnError)
configPath := fs.String("config", "config.yaml", "config path")
+
if err := fs.Parse(args); err != nil {
return err
}
+
app, err := bootstrap(*configPath)
+
if err != nil {
return err
}
+
defer app.store.Close()
defer app.service.Close()
+
cp, err := app.service.Snapshot(context.Background())
+
if err != nil {
return err
}
+
fmt.Printf("checkpoint sequence=%d hash=%s\n", cp.Sequence, cp.CheckpointHash)
+
return nil
}
@@ -226,42 +268,54 @@ func runBench(args []string) error {
configPath := fs.String("config", "config.yaml", "config path")
maxOps := fs.Uint64("max-ops", 200000, "max operations to ingest for benchmark")
interval := fs.Duration("interval", 10*time.Second, "rolling report interval")
+
if err := fs.Parse(args); err != nil {
return err
}
+
app, err := bootstrap(*configPath)
+
if err != nil {
return err
}
+
defer app.store.Close()
defer app.service.Close()
+
app.service.SetMaxOps(*maxOps)
startSeq, err := app.store.GetGlobalSeq()
+
if err != nil {
return err
}
+
start := time.Now()
lastSeq := startSeq
pid := os.Getpid()
-
done := make(chan struct{})
+
go func() {
ticker := time.NewTicker(*interval)
+
defer ticker.Stop()
+
for {
select {
case <-done:
return
case <-ticker.C:
seq, err := app.store.GetGlobalSeq()
+
if err != nil {
continue
}
+
delta := seq - lastSeq
lastSeq = seq
opsSec := float64(delta) / interval.Seconds()
cpuPct, rssKB, _ := processMetrics(pid)
+
fmt.Printf("rolling seq=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", seq, opsSec, cpuPct, rssKB)
}
}
@@ -269,23 +323,29 @@ func runBench(args []string) error {
ctx := context.Background()
replayErr := app.service.Replay(ctx)
+
close(done)
+
if replayErr != nil {
return replayErr
}
+
if err := app.service.Flush(ctx); err != nil {
return err
}
endSeq, err := app.store.GetGlobalSeq()
+
if err != nil {
return err
}
+
elapsed := time.Since(start)
totalOps := endSeq - startSeq
totalOpsSec := float64(totalOps) / elapsed.Seconds()
cpuPct, rssKB, _ := processMetrics(pid)
totalBytes, opsBytes, indexBytes, cpBytes, err := diskUsageBreakdown(app.cfg.DataDir)
+
if err != nil {
return err
}
@@ -293,6 +353,7 @@ func runBench(args []string) error {
fmt.Printf("bench_total elapsed=%s ops=%d ops_sec=%.2f cpu_pct=%.2f rss_kb=%d\n", elapsed.Round(time.Millisecond), totalOps, totalOpsSec, cpuPct, rssKB)
fmt.Printf("bench_disk total_bytes=%d ops_bytes=%d index_bytes=%d checkpoints_bytes=%d bytes_per_op=%.2f\n",
totalBytes, opsBytes, indexBytes, cpBytes, float64(totalBytes)/max(1, float64(totalOps)))
+
return nil
}
@@ -300,43 +361,56 @@ func runCompare(args []string) error {
fs := flag.NewFlagSet("compare", flag.ExitOnError)
configPath := fs.String("config", "config.yaml", "config path")
remote := fs.String("remote", "", "remote mirror base URL")
+
if err := fs.Parse(args); err != nil {
return err
}
+
if strings.TrimSpace(*remote) == "" {
return fmt.Errorf("--remote is required")
}
app, err := bootstrap(*configPath)
+
if err != nil {
return err
}
+
defer app.store.Close()
defer app.service.Close()
local, ok, err := app.store.GetLatestCheckpoint()
+
if err != nil {
return fmt.Errorf("load local checkpoint: %w", err)
}
+
if !ok {
return fmt.Errorf("local mirror has no checkpoints")
}
url := strings.TrimRight(*remote, "/") + "/checkpoints/latest"
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
+
if err != nil {
return err
}
+
httpClient := &http.Client{Timeout: app.cfg.RequestTimeout}
resp, err := httpClient.Do(req)
+
if err != nil {
return fmt.Errorf("fetch remote checkpoint: %w", err)
}
+
defer resp.Body.Close()
+
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("remote checkpoint response status=%d", resp.StatusCode)
}
+
var remoteCP types.CheckpointV1
+
if err := json.NewDecoder(resp.Body).Decode(&remoteCP); err != nil {
return fmt.Errorf("decode remote checkpoint: %w", err)
}
@@ -345,30 +419,41 @@ func runCompare(args []string) error {
fmt.Printf("remote sequence=%d root=%s signature_present=%t\n", remoteCP.Sequence, remoteCP.DIDMerkleRoot, remoteCP.Signature != "")
mismatch := false
+
if local.Sequence == remoteCP.Sequence && local.DIDMerkleRoot != remoteCP.DIDMerkleRoot {
fmt.Printf("divergence: matching sequence %d with different merkle roots\n", local.Sequence)
+
mismatch = true
}
+
if local.Sequence > remoteCP.Sequence {
fmt.Printf("local mirror is ahead by %d checkpoints\n", local.Sequence-remoteCP.Sequence)
+
mismatch = true
} else if remoteCP.Sequence > local.Sequence {
fmt.Printf("remote mirror is ahead by %d checkpoints\n", remoteCP.Sequence-local.Sequence)
+
mismatch = true
}
+
if (local.Signature == "") != (remoteCP.Signature == "") {
fmt.Println("signature presence mismatch between local and remote checkpoints")
+
mismatch = true
}
+
if mismatch {
return fmt.Errorf("mirror comparison mismatch")
}
+
fmt.Println("mirrors match at latest checkpoint")
+
return nil
}
func runVersion() error {
fmt.Print(formatVersion())
+
return nil
}
@@ -379,22 +464,29 @@ func formatVersion() string {
func bootstrap(path string) (*app, error) {
cfg, err := config.Load(path)
+
if err != nil {
return nil, err
}
+
for _, p := range []string{cfg.DataDir, filepath.Join(cfg.DataDir, "ops"), filepath.Join(cfg.DataDir, "index"), filepath.Join(cfg.DataDir, "checkpoints")} {
if err := os.MkdirAll(p, 0o755); err != nil {
return nil, fmt.Errorf("mkdir %s: %w", p, err)
}
}
+
store, err := storage.OpenPebble(cfg.DataDir)
+
if err != nil {
return nil, err
}
+
mode, err := store.GetMode()
+
if err != nil {
return nil, err
}
+
if mode == "" {
if err := store.SetMode(cfg.Mode); err != nil {
return nil, err
@@ -404,12 +496,15 @@ func bootstrap(path string) (*app, error) {
}
var blockLog *storage.BlockLog
+
if cfg.Mode == config.ModeMirror {
blockLog, err = storage.OpenBlockLog(cfg.DataDir, cfg.ZstdLevel, cfg.BlockSizeMB)
+
if err != nil {
return nil, err
}
}
+
client := ingest.NewClient(cfg.PLCSource, ingest.ClientOptions{
MaxAttempts: cfg.HTTPRetryMaxAttempts,
BaseDelay: cfg.HTTPRetryBaseDelay,
@@ -423,6 +518,7 @@ func bootstrap(path string) (*app, error) {
BuildDate: buildDate,
GoVersion: runtime.Version(),
}))
+
return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil
}
@@ -442,54 +538,74 @@ Commands:
func processMetrics(pid int) (cpuPct float64, rssKB int64, err error) {
out, err := exec.Command("ps", "-p", strconv.Itoa(pid), "-o", "pcpu=,rss=").Output()
+
if err != nil {
return 0, 0, err
}
+
fields := strings.Fields(string(out))
+
if len(fields) < 2 {
return 0, 0, fmt.Errorf("unexpected ps output: %q", strings.TrimSpace(string(out)))
}
+
cpuPct, _ = strconv.ParseFloat(fields[0], 64)
rssKB, _ = strconv.ParseInt(fields[1], 10, 64)
+
return cpuPct, rssKB, nil
}
func diskUsageBreakdown(dataDir string) (total, ops, index, checkpoints int64, err error) {
total, err = dirSize(dataDir)
+
if err != nil {
return 0, 0, 0, 0, err
}
+
ops, err = dirSize(filepath.Join(dataDir, "ops"))
+
if err != nil {
return 0, 0, 0, 0, err
}
+
index, err = dirSize(filepath.Join(dataDir, "index"))
+
if err != nil {
return 0, 0, 0, 0, err
}
+
checkpoints, err = dirSize(filepath.Join(dataDir, "checkpoints"))
+
if err != nil {
return 0, 0, 0, 0, err
}
+
return total, ops, index, checkpoints, nil
}
func dirSize(path string) (int64, error) {
var total int64
+
err := filepath.WalkDir(path, func(_ string, d os.DirEntry, err error) error {
if err != nil {
return err
}
+
if d.IsDir() {
return nil
}
+
info, err := d.Info()
+
if err != nil {
return err
}
+
total += info.Size()
+
return nil
})
+
return total, err
}
@@ -497,5 +613,6 @@ func max(a, b float64) float64 {
if a > b {
return a
}
+
return b
}
diff --git a/cmd/plutia/version_test.go b/cmd/plutia/version_test.go
index 3869de6..0e3b8a4 100644
--- a/cmd/plutia/version_test.go
+++ b/cmd/plutia/version_test.go
@@ -10,6 +10,7 @@ func TestFormatVersionIncludesBuildMetadata(t *testing.T) {
version = "v0.1.0"
commit = "abc123"
buildDate = "2026-02-26T00:00:00Z"
+
t.Cleanup(func() {
version = oldVersion
commit = oldCommit
@@ -23,6 +24,7 @@ func TestFormatVersionIncludesBuildMetadata(t *testing.T) {
"BuildDate: 2026-02-26T00:00:00Z",
"GoVersion: go",
}
+
for _, want := range checks {
if !strings.Contains(out, want) {
t.Fatalf("version output missing %q: %s", want, out)