aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorFuwn <[email protected]>2026-02-26 15:07:03 -0800
committerFuwn <[email protected]>2026-02-26 15:07:03 -0800
commitf1b26a68534e4299de7b5fe6b15d2b248fac40e1 (patch)
tree4003d041bc46866b7d4b7f0bfb6c6924d991cbb6 /cmd
parentfeat: initial Plutia release — verifiable high-performance PLC mirror (mirr... (diff)
downloadplutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.tar.xz
plutia-test-f1b26a68534e4299de7b5fe6b15d2b248fac40e1.zip
feat: harden launch readiness with versioning, metrics, and resilience
Diffstat (limited to 'cmd')
-rw-r--r--cmd/plutia/main.go132
-rw-r--r--cmd/plutia/version_test.go31
2 files changed, 156 insertions, 7 deletions
diff --git a/cmd/plutia/main.go b/cmd/plutia/main.go
index 0849e0a..2685a26 100644
--- a/cmd/plutia/main.go
+++ b/cmd/plutia/main.go
@@ -2,6 +2,7 @@ package main
import (
"context"
+ "encoding/json"
"errors"
"flag"
"fmt"
@@ -11,6 +12,7 @@ import (
"os/exec"
"os/signal"
"path/filepath"
+ "runtime"
"strconv"
"strings"
"syscall"
@@ -21,6 +23,13 @@ import (
"github.com/Fuwn/plutia/internal/config"
"github.com/Fuwn/plutia/internal/ingest"
"github.com/Fuwn/plutia/internal/storage"
+ "github.com/Fuwn/plutia/internal/types"
+)
+
+var (
+ version = "dev"
+ commit = "unknown"
+ buildDate = "unknown"
)
type app struct {
@@ -58,6 +67,14 @@ func main() {
if err := runBench(os.Args[2:]); err != nil {
log.Fatal(err)
}
+ case "compare":
+ if err := runCompare(os.Args[2:]); err != nil {
+ log.Fatal(err)
+ }
+ case "version":
+ if err := runVersion(); err != nil {
+ log.Fatal(err)
+ }
default:
usage()
os.Exit(2)
@@ -96,6 +113,7 @@ func runServe(args []string) error {
}
errCh := make(chan error, 2)
+ pollDone := make(chan struct{})
go func() {
log.Printf("HTTP server listening on %s", app.cfg.ListenAddr)
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
@@ -104,24 +122,32 @@ func runServe(args []string) error {
}()
if !app.service.IsCorrupted() {
go func() {
+ defer close(pollDone)
if err := app.service.Poll(ctx); err != nil && !errors.Is(err, context.Canceled) {
errCh <- err
}
}()
+ } else {
+ close(pollDone)
}
+ var runErr error
select {
case <-ctx.Done():
case err := <-errCh:
- return err
+ runErr = err
+ stop()
}
+ <-pollDone
- shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+ shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
- if err := app.service.Flush(shutdownCtx); err != nil {
- return err
+ flushErr := app.service.Flush(shutdownCtx)
+ httpErr := httpSrv.Shutdown(shutdownCtx)
+ if runErr != nil || flushErr != nil || httpErr != nil {
+ return errors.Join(runErr, flushErr, httpErr)
}
- return httpSrv.Shutdown(shutdownCtx)
+ return nil
}
func runReplay(args []string) error {
@@ -270,6 +296,87 @@ func runBench(args []string) error {
return nil
}
+func runCompare(args []string) error {
+ fs := flag.NewFlagSet("compare", flag.ExitOnError)
+ configPath := fs.String("config", "config.yaml", "config path")
+ remote := fs.String("remote", "", "remote mirror base URL")
+ if err := fs.Parse(args); err != nil {
+ return err
+ }
+ if strings.TrimSpace(*remote) == "" {
+ return fmt.Errorf("--remote is required")
+ }
+
+ app, err := bootstrap(*configPath)
+ if err != nil {
+ return err
+ }
+ defer app.store.Close()
+ defer app.service.Close()
+
+ local, ok, err := app.store.GetLatestCheckpoint()
+ if err != nil {
+ return fmt.Errorf("load local checkpoint: %w", err)
+ }
+ if !ok {
+ return fmt.Errorf("local mirror has no checkpoints")
+ }
+
+ url := strings.TrimRight(*remote, "/") + "/checkpoints/latest"
+ req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
+ if err != nil {
+ return err
+ }
+ httpClient := &http.Client{Timeout: app.cfg.RequestTimeout}
+ resp, err := httpClient.Do(req)
+ if err != nil {
+ return fmt.Errorf("fetch remote checkpoint: %w", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("remote checkpoint response status=%d", resp.StatusCode)
+ }
+ var remoteCP types.CheckpointV1
+ if err := json.NewDecoder(resp.Body).Decode(&remoteCP); err != nil {
+ return fmt.Errorf("decode remote checkpoint: %w", err)
+ }
+
+ fmt.Printf("local sequence=%d root=%s signature_present=%t\n", local.Sequence, local.DIDMerkleRoot, local.Signature != "")
+ fmt.Printf("remote sequence=%d root=%s signature_present=%t\n", remoteCP.Sequence, remoteCP.DIDMerkleRoot, remoteCP.Signature != "")
+
+ mismatch := false
+ if local.Sequence == remoteCP.Sequence && local.DIDMerkleRoot != remoteCP.DIDMerkleRoot {
+ fmt.Printf("divergence: matching sequence %d with different merkle roots\n", local.Sequence)
+ mismatch = true
+ }
+ if local.Sequence > remoteCP.Sequence {
+ fmt.Printf("local mirror is ahead by %d checkpoints\n", local.Sequence-remoteCP.Sequence)
+ mismatch = true
+ } else if remoteCP.Sequence > local.Sequence {
+ fmt.Printf("remote mirror is ahead by %d checkpoints\n", remoteCP.Sequence-local.Sequence)
+ mismatch = true
+ }
+ if (local.Signature == "") != (remoteCP.Signature == "") {
+ fmt.Println("signature presence mismatch between local and remote checkpoints")
+ mismatch = true
+ }
+ if mismatch {
+ return fmt.Errorf("mirror comparison mismatch")
+ }
+ fmt.Println("mirrors match at latest checkpoint")
+ return nil
+}
+
+func runVersion() error {
+ fmt.Print(formatVersion())
+ return nil
+}
+
+func formatVersion() string {
+ return fmt.Sprintf("Version: %s\nCommit: %s\nBuildDate: %s\nGoVersion: %s\n",
+ version, commit, buildDate, runtime.Version())
+}
+
func bootstrap(path string) (*app, error) {
cfg, err := config.Load(path)
if err != nil {
@@ -303,10 +410,19 @@ func bootstrap(path string) (*app, error) {
return nil, err
}
}
- client := ingest.NewClient(cfg.PLCSource)
+ client := ingest.NewClient(cfg.PLCSource, ingest.ClientOptions{
+ MaxAttempts: cfg.HTTPRetryMaxAttempts,
+ BaseDelay: cfg.HTTPRetryBaseDelay,
+ MaxDelay: cfg.HTTPRetryMaxDelay,
+ })
checkpointMgr := checkpoint.NewManager(store, cfg.DataDir, cfg.MirrorPrivateKeyPath)
service := ingest.NewService(cfg, store, client, blockLog, checkpointMgr)
- apiServer := api.NewServer(cfg, store, service, checkpointMgr)
+ apiServer := api.NewServer(cfg, store, service, checkpointMgr, api.WithBuildInfo(api.BuildInfo{
+ Version: version,
+ Commit: commit,
+ BuildDate: buildDate,
+ GoVersion: runtime.Version(),
+ }))
return &app{cfg: cfg, store: store, service: service, apiServer: apiServer, checkpointM: checkpointMgr}, nil
}
@@ -319,6 +435,8 @@ Commands:
plutia verify --config=config.yaml --did=did:plc:xyz
plutia snapshot --config=config.yaml
plutia bench --config=config.yaml [--max-ops=200000] [--interval=10s]
+ plutia compare --config=config.yaml --remote=https://mirror.example.com
+ plutia version
`)
}
diff --git a/cmd/plutia/version_test.go b/cmd/plutia/version_test.go
new file mode 100644
index 0000000..3869de6
--- /dev/null
+++ b/cmd/plutia/version_test.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestFormatVersionIncludesBuildMetadata(t *testing.T) {
+ oldVersion, oldCommit, oldBuildDate := version, commit, buildDate
+ version = "v0.1.0"
+ commit = "abc123"
+ buildDate = "2026-02-26T00:00:00Z"
+ t.Cleanup(func() {
+ version = oldVersion
+ commit = oldCommit
+ buildDate = oldBuildDate
+ })
+
+ out := formatVersion()
+ checks := []string{
+ "Version: v0.1.0",
+ "Commit: abc123",
+ "BuildDate: 2026-02-26T00:00:00Z",
+ "GoVersion: go",
+ }
+ for _, want := range checks {
+ if !strings.Contains(out, want) {
+ t.Fatalf("version output missing %q: %s", want, out)
+ }
+ }
+}