Diffstat (limited to 'scripts/test_scripts')
-rwxr-xr-x scripts/test_scripts/block-clone-test-mac.sh                        |  43
-rw-r--r-- scripts/test_scripts/block-clone-test-windows.ps1                   | 145
-rwxr-xr-x scripts/test_scripts/block-clone-test.sh                            | 143
-rw-r--r-- scripts/test_scripts/builds-download-upload-test.py                 |   2
-rw-r--r-- scripts/test_scripts/builds-download-upload-update-build-ids.py     |   2
-rw-r--r-- scripts/test_scripts/hub/PERF_SEED_README.md                        | 148
-rw-r--r-- scripts/test_scripts/hub/analyze_perf_runs.py                       | 348
-rw-r--r-- scripts/test_scripts/hub/hub_load_test_s3.py                        | 537
-rw-r--r-- scripts/test_scripts/hub/perf_configs/hub.lua                       |  48
-rw-r--r-- scripts/test_scripts/hub/perf_configs/instance.lua                  |  15
-rw-r--r-- scripts/test_scripts/hub/preserve_minio_state.py                    | 126
-rw-r--r-- scripts/test_scripts/hub/run_minio_perf.py                          | 614
-rw-r--r-- scripts/test_scripts/hub/seed_minio.py                              | 720
-rw-r--r-- scripts/test_scripts/hub/seed_s3_snapshot.py                        | 635
-rw-r--r-- scripts/test_scripts/hub_load_test.py                               | 970
-rw-r--r-- scripts/test_scripts/hub_provision_perf_test.py                     | 502
-rw-r--r-- scripts/test_scripts/kill-test-processes.ps1                        |  19
-rwxr-xr-x scripts/test_scripts/kill-test-processes.sh                         |  45
-rw-r--r-- scripts/test_scripts/oplog-import-export-test.py                    |   2
-rw-r--r-- scripts/test_scripts/oplog-update-build-ids.py                      |   2
20 files changed, 4731 insertions, 335 deletions
diff --git a/scripts/test_scripts/block-clone-test-mac.sh b/scripts/test_scripts/block-clone-test-mac.sh
deleted file mode 100755
index a3d3ca4d3..000000000
--- a/scripts/test_scripts/block-clone-test-mac.sh
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/env bash
-# Test block-clone functionality on macOS (APFS).
-#
-# APFS is the default filesystem on modern Macs and natively supports
-# clonefile(), so no special setup is needed — just run the tests.
-#
-# Usage:
-# ./scripts/test_scripts/block-clone-test-mac.sh [path-to-zencore-test]
-#
-# If no path is given, defaults to build/macosx/<arch>/debug/zencore-test
-# relative to the repository root.
-
-set -euo pipefail
-
-SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
-REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
-
-ARCH="$(uname -m)"
-TEST_BINARY="${1:-$REPO_ROOT/build/macosx/$ARCH/debug/zencore-test}"
-
-if [ ! -x "$TEST_BINARY" ]; then
- echo "error: test binary not found or not executable: $TEST_BINARY" >&2
- echo "hint: build with 'xmake config -m debug && xmake build zencore-test'" >&2
- exit 1
-fi
-
-# Verify we're on APFS
-BINARY_DIR="$(dirname "$TEST_BINARY")"
-FS_TYPE="$(diskutil info "$(df "$BINARY_DIR" | tail -1 | awk '{print $1}')" 2>/dev/null | grep "Type (Bundle)" | awk '{print $NF}' || true)"
-
-if [ "$FS_TYPE" != "apfs" ]; then
- echo "warning: filesystem does not appear to be APFS (got: ${FS_TYPE:-unknown}), clone tests may skip" >&2
-fi
-
-TEST_CASES="TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface"
-
-echo "Running block-clone tests ..."
-echo "---"
-"$TEST_BINARY" \
- --test-suite="core.filesystem" \
- --test-case="$TEST_CASES"
-echo "---"
-echo "All block-clone tests passed."
diff --git a/scripts/test_scripts/block-clone-test-windows.ps1 b/scripts/test_scripts/block-clone-test-windows.ps1
deleted file mode 100644
index df24831a4..000000000
--- a/scripts/test_scripts/block-clone-test-windows.ps1
+++ /dev/null
@@ -1,145 +0,0 @@
-# Test block-clone functionality on a temporary ReFS VHD.
-#
-# Requires:
-# - Administrator privileges
-# - Windows Server, or Windows 10/11 Pro for Workstations (ReFS support)
-# - Hyper-V PowerShell module (for New-VHD), or diskpart fallback
-#
-# Usage:
-# # From an elevated PowerShell prompt:
-# .\scripts\test_scripts\block-clone-test-windows.ps1 [-TestBinary <path>]
-#
-# If -TestBinary is not given, defaults to build\windows\x64\debug\zencore-test.exe
-# relative to the repository root.
-
-param(
- [string]$TestBinary = ""
-)
-
-$ErrorActionPreference = "Stop"
-
-$ScriptDir = Split-Path -Parent $MyInvocation.MyCommand.Definition
-$RepoRoot = (Resolve-Path "$ScriptDir\..\..").Path
-
-if (-not $TestBinary) {
- $TestBinary = Join-Path $RepoRoot "build\windows\x64\debug\zencore-test.exe"
-}
-
-$ImageSizeMB = 2048
-$TestCases = "TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface"
-
-$VhdPath = ""
-$MountLetter = ""
-
-function Cleanup {
- $ErrorActionPreference = "SilentlyContinue"
-
- if ($MountLetter) {
- Write-Host "Dismounting VHD ..."
- Dismount-VHD -Path $VhdPath -ErrorAction SilentlyContinue
- }
- if ($VhdPath -and (Test-Path $VhdPath)) {
- Remove-Item -Force $VhdPath -ErrorAction SilentlyContinue
- }
-}
-
-trap {
- Cleanup
- throw $_
-}
-
-# --- Preflight checks ---
-
-$IsAdmin = ([Security.Principal.WindowsPrincipal] [Security.Principal.WindowsIdentity]::GetCurrent()).IsInRole(
- [Security.Principal.WindowsBuiltInRole]::Administrator)
-if (-not $IsAdmin) {
- Write-Error "This script must be run as Administrator (for VHD mount/format)."
- exit 1
-}
-
-if (-not (Test-Path $TestBinary)) {
- Write-Error "Test binary not found: $TestBinary`nHint: build with 'xmake config -m debug && xmake build zencore-test'"
- exit 1
-}
-
-# Check that ReFS formatting is available
-$RefsAvailable = $true
-try {
- # A quick check: on non-Server/Workstation SKUs, Format-Volume -FileSystem ReFS will fail
- $OsCaption = (Get-CimInstance Win32_OperatingSystem).Caption
- if ($OsCaption -notmatch "Server|Workstation|Enterprise") {
- Write-Warning "ReFS may not be available on this Windows edition: $OsCaption"
- Write-Warning "Continuing anyway — format step will fail if unsupported."
- }
-} catch {
- # Non-fatal, just proceed
-}
-
-# --- Create and mount ReFS VHD ---
-
-$VhdPath = Join-Path $env:TEMP "refs-clone-test-$([guid]::NewGuid().ToString('N').Substring(0,8)).vhdx"
-
-Write-Host "Creating ${ImageSizeMB}MB VHDX at $VhdPath ..."
-
-try {
- # Prefer Hyper-V cmdlet if available
- New-VHD -Path $VhdPath -SizeBytes ($ImageSizeMB * 1MB) -Fixed | Out-Null
-} catch {
- # Fallback to diskpart
- Write-Host "New-VHD not available, falling back to diskpart ..."
- $DiskpartScript = @"
-create vdisk file="$VhdPath" maximum=$ImageSizeMB type=fixed
-"@
- $DiskpartScript | diskpart | Out-Null
-}
-
-Write-Host "Mounting and initializing VHD ..."
-
-Mount-VHD -Path $VhdPath
-$Disk = Get-VHD -Path $VhdPath | Get-Disk
-
-# Suppress Explorer's auto-open / "format disk?" prompts for the raw partition
-Stop-Service ShellHWDetection -ErrorAction SilentlyContinue
-
-try {
- Initialize-Disk -Number $Disk.Number -PartitionStyle GPT -ErrorAction SilentlyContinue
- $Partition = New-Partition -DiskNumber $Disk.Number -UseMaximumSize -AssignDriveLetter
- $MountLetter = $Partition.DriveLetter
-
- Write-Host "Formatting ${MountLetter}: as ReFS with integrity disabled ..."
- Format-Volume -DriveLetter $MountLetter -FileSystem ReFS -NewFileSystemLabel "CloneTest" -Confirm:$false | Out-Null
-
- # Disable integrity streams (required for block cloning to work on ReFS)
- Set-FileIntegrity "${MountLetter}:\" -Enable $false -ErrorAction SilentlyContinue
-} finally {
- Start-Service ShellHWDetection -ErrorAction SilentlyContinue
-}
-
-$MountRoot = "${MountLetter}:\"
-
-# --- Copy test binary and run ---
-
-Write-Host "Copying test binary to ReFS volume ..."
-Copy-Item $TestBinary "$MountRoot\zencore-test.exe"
-
-Write-Host "Running block-clone tests ..."
-Write-Host "---"
-
-$proc = Start-Process -FilePath "$MountRoot\zencore-test.exe" `
- -ArgumentList "--test-suite=core.filesystem", "--test-case=$TestCases" `
- -NoNewWindow -Wait -PassThru
-
-Write-Host "---"
-
-if ($proc.ExitCode -ne 0) {
- Write-Error "Tests failed with exit code $($proc.ExitCode)"
- Cleanup
- exit $proc.ExitCode
-}
-
-Write-Host "ReFS: all block-clone tests passed."
-
-# --- Cleanup ---
-
-Cleanup
-Write-Host "Done."
diff --git a/scripts/test_scripts/block-clone-test.sh b/scripts/test_scripts/block-clone-test.sh
deleted file mode 100755
index 7c6bf5605..000000000
--- a/scripts/test_scripts/block-clone-test.sh
+++ /dev/null
@@ -1,143 +0,0 @@
-#!/usr/bin/env bash
-# Test block-clone functionality on temporary Btrfs and XFS loopback filesystems.
-#
-# Requires: root/sudo, btrfs-progs (mkfs.btrfs), xfsprogs (mkfs.xfs)
-#
-# Usage:
-# sudo ./scripts/test_scripts/block-clone-test.sh [path-to-zencore-test]
-#
-# If no path is given, defaults to build/linux/x86_64/debug/zencore-test
-# relative to the repository root.
-#
-# Options:
-# --btrfs-only Only test Btrfs
-# --xfs-only Only test XFS
-
-set -euo pipefail
-
-SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
-REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
-
-TEST_BINARY=""
-RUN_BTRFS=true
-RUN_XFS=true
-
-for arg in "$@"; do
- case "$arg" in
- --btrfs-only) RUN_XFS=false ;;
- --xfs-only) RUN_BTRFS=false ;;
- *) TEST_BINARY="$arg" ;;
- esac
-done
-
-TEST_BINARY="${TEST_BINARY:-$REPO_ROOT/build/linux/x86_64/debug/zencore-test}"
-IMAGE_SIZE="512M"
-TEST_CASES="TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface"
-
-# Track all temp files for cleanup
-CLEANUP_MOUNTS=()
-CLEANUP_DIRS=()
-CLEANUP_FILES=()
-
-cleanup() {
- local exit_code=$?
- set +e
-
- for mnt in "${CLEANUP_MOUNTS[@]}"; do
- if mountpoint -q "$mnt" 2>/dev/null; then
- umount "$mnt"
- fi
- done
- for dir in "${CLEANUP_DIRS[@]}"; do
- [ -d "$dir" ] && rmdir "$dir"
- done
- for f in "${CLEANUP_FILES[@]}"; do
- [ -f "$f" ] && rm -f "$f"
- done
-
- if [ $exit_code -ne 0 ]; then
- echo "FAILED (exit code $exit_code)"
- fi
- exit $exit_code
-}
-trap cleanup EXIT
-
-# --- Preflight checks ---
-
-if [ "$(id -u)" -ne 0 ]; then
- echo "error: this script must be run as root (for mount/umount)" >&2
- exit 1
-fi
-
-if [ ! -x "$TEST_BINARY" ]; then
- echo "error: test binary not found or not executable: $TEST_BINARY" >&2
- echo "hint: build with 'xmake config -m debug && xmake build zencore-test'" >&2
- exit 1
-fi
-
-if $RUN_BTRFS && ! command -v mkfs.btrfs &>/dev/null; then
- echo "warning: mkfs.btrfs not found — install btrfs-progs to test Btrfs, skipping" >&2
- RUN_BTRFS=false
-fi
-
-if $RUN_XFS && ! command -v mkfs.xfs &>/dev/null; then
- echo "warning: mkfs.xfs not found — install xfsprogs to test XFS, skipping" >&2
- RUN_XFS=false
-fi
-
-if ! $RUN_BTRFS && ! $RUN_XFS; then
- echo "error: no filesystems to test" >&2
- exit 1
-fi
-
-# --- Helper to create, mount, and run tests on a loopback filesystem ---
-
-run_tests_on_fs() {
- local fs_type="$1"
- local mkfs_cmd="$2"
-
- echo ""
- echo "========================================"
- echo " Testing block-clone on $fs_type"
- echo "========================================"
-
- local image_path mount_path
- image_path="$(mktemp "/tmp/${fs_type}-clone-test-XXXXXX.img")"
- mount_path="$(mktemp -d "/tmp/${fs_type}-clone-mount-XXXXXX")"
- CLEANUP_FILES+=("$image_path")
- CLEANUP_DIRS+=("$mount_path")
- CLEANUP_MOUNTS+=("$mount_path")
-
- echo "Creating ${IMAGE_SIZE} ${fs_type} image at ${image_path} ..."
- truncate -s "$IMAGE_SIZE" "$image_path"
- $mkfs_cmd "$image_path"
-
- echo "Mounting at ${mount_path} ..."
- mount -o loop "$image_path" "$mount_path"
- chmod 777 "$mount_path"
-
- echo "Copying test binary ..."
- cp "$TEST_BINARY" "$mount_path/zencore-test"
- chmod +x "$mount_path/zencore-test"
-
- echo "Running tests ..."
- echo "---"
- "$mount_path/zencore-test" \
- --test-suite="core.filesystem" \
- --test-case="$TEST_CASES"
- echo "---"
- echo "$fs_type: all block-clone tests passed."
-}
-
-# --- Run ---
-
-if $RUN_BTRFS; then
- run_tests_on_fs "btrfs" "mkfs.btrfs -q"
-fi
-
-if $RUN_XFS; then
- run_tests_on_fs "xfs" "mkfs.xfs -q -m reflink=1"
-fi
-
-echo ""
-echo "All block-clone tests passed."
diff --git a/scripts/test_scripts/builds-download-upload-test.py b/scripts/test_scripts/builds-download-upload-test.py
index 8ff5245c1..dd9aae07e 100644
--- a/scripts/test_scripts/builds-download-upload-test.py
+++ b/scripts/test_scripts/builds-download-upload-test.py
@@ -82,7 +82,7 @@ SERVER_ARGS: tuple[str, ...] = (
def zen_cmd(*args: str | Path, extra_zen_args: list[str] | None = None) -> list[str | Path]:
"""Build a zen CLI command list, inserting extra_zen_args before subcommands."""
- return [ZEN_EXE, *(extra_zen_args or []), *args]
+ return [ZEN_EXE, "--enable-execution-history=false", *(extra_zen_args or []), *args]
def run(cmd: list[str | Path]) -> None:
diff --git a/scripts/test_scripts/builds-download-upload-update-build-ids.py b/scripts/test_scripts/builds-download-upload-update-build-ids.py
index 2a63aa44d..6332d6990 100644
--- a/scripts/test_scripts/builds-download-upload-update-build-ids.py
+++ b/scripts/test_scripts/builds-download-upload-update-build-ids.py
@@ -51,7 +51,7 @@ def list_builds_for_bucket(zen: str, host: str, namespace: str, bucket: str) ->
result_path = Path(tmp.name)
cmd = [
- zen, "builds", "list",
+ zen, "--enable-execution-history=false", "builds", "list",
"--namespace", namespace,
"--bucket", bucket,
"--host", host,
diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md
new file mode 100644
index 000000000..fb471d4bb
--- /dev/null
+++ b/scripts/test_scripts/hub/PERF_SEED_README.md
@@ -0,0 +1,148 @@
+# Perf-seed workflow
+
+Three-stage pipeline for running repeatable hub-hydration perf tests against a
+local MinIO backend seeded with real module data pulled from production S3.
+
+## Layout
+
+All scripts default to a single perf-seed root (currently `E:/Dev/zen-perf-seed/`),
+but every path is overridable via CLI flag (see the per-stage options below).
+Pick a root with enough free space (snapshots and preserved CAS dirs can be
+large) and either pass the matching `--*-dir` flag on each invocation or change
+the script defaults to your chosen root, as shown below.
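+
+For example, to run Stage A against a hypothetical `D:/perf` root (the flags
+are the ones documented under the per-stage options below):
+
+```
+python scripts/test_scripts/hub/seed_s3_snapshot.py --snapshot-dir D:/perf/s3-snapshot --hub-data-dir D:/perf/hub-a
+```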
+
+Layout under the chosen root (`<perf-seed>/`):
+
+```
+<perf-seed>/
+  hub-a/                     Stage A hub data dir (transient)
+    servers/<moduleid>/
+  s3-snapshot/               Preserved production server-state trees (read-only after Stage A)
+    <moduleid>/
+  hubs/                      Stage B per-bucket hub data dirs (transient)
+    hub-b-zen-seed-packed/
+    hub-b-zen-seed-unpacked/
+  minio-data/                Stage B MinIO data dir (transient, carries every seeded bucket)
+  minio-seeded-baseline/     Preserved baseline MinIO CAS (read-only after Stage B + preserve)
+    README.txt
+  minio-seeded-packed/       Preserved packed MinIO CAS (filled by the pack worktree)
+    README.txt
+  hub-perf/                  Stage C hub data dir (wiped each run)
+  minio-run/                 Stage C MinIO data dir (wiped + re-copied each run)
+  perf-runs/                 Per-run archive: hub.log, logs/, hub.utrace, summary.json
+    20260423-141530_zen-seed-packed/
+    20260423-143112_zen-seed-unpacked/
+```
+
+## Prerequisites
+
+- Debug or release build of zenserver + minio: `xmake -y`
+- `pip install boto3`
+- AWS CLI v2 with an SSO profile configured (for Stage A only)
+- Environment variables (or pass equivalents via CLI flags):
+ - `ZEN_PERF_S3_URI` - source S3 bucket, e.g. `s3://your-bucket/optional-prefix/`
+ - `ZEN_PERF_AWS_PROFILE` - AWS SSO profile name with read access to that bucket
+ - `ZEN_PERF_AWS_REGION` - optional, defaults to `us-east-1`
+
+## Stage A - snapshot real S3 data
+
+One-time (or when you want a fresh baseline from production).
+
+```
+export ZEN_PERF_S3_URI=s3://your-bucket/
+export ZEN_PERF_AWS_PROFILE=your-sso-profile
+python scripts/test_scripts/hub/seed_s3_snapshot.py
+```
+
+Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies
+`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login`
+automatically if the SSO token is missing or expired.
+
+Module selection ranks all UUID-shaped folders by their
+`incremental-state.cbo` `LastModified` (newest first, a proxy for
+most-recently-accessed) and takes the top `--module-count`.
+
+Options:
+- `--module-count N` (default 1000)
+- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`)
+- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`)
+
+## Stage B - seed MinIO from the snapshot
+
+One-time per pack-mode (or when `s3-snapshot` changes).
+
+`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is
+hardcoded inside the script (`--hub-hydration-enable-pack=true` near the
+top of `_start_hub`). To produce both packed and unpacked baselines for
+comparison, invoke the script twice from two separate worktrees - one with
+the flag flipped to `false` - and preserve the resulting MinIO data dir
+each time.
+
+```
+# In the pack worktree (flag = true), seeds zen-seed-packed
+python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed
+python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed
+
+# In the no-pack worktree (flag = false), seeds zen-seed-unpacked
+python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked
+python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked
+```
+
+The script provisions every module found under `s3-snapshot/`, hibernates
+them, overlays the snapshot on top of the hub's servers dir, then
+deprovisions all modules - which runs the dehydrate path and uploads the
+content into the bucket.
+
+`preserve_minio_state.py` moves the resulting `minio-data/` to a
+variant-specific preservation dir (pass `--copy` to copy it instead and keep
+the source intact) and writes a README with provenance.
+
+Options of interest:
+- `--bucket NAME` - bucket name (default `zen-seed-packed`).
+- `--wipe` removes the per-bucket hub data dir and the shared minio-data
+ dir before starting.
+- `--module-count N` caps the set (0 = every module in snapshot-dir).
+
+## Stage C - run a perf iteration
+
+Repeat as often as you want; each run starts from the preserved baseline.
+
+```
+# Pack-on bucket
+python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace
+
+# Pack-off bucket (for comparison)
+python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace
+```
+
+Steps:
+1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state.
+2. Wipes `hub-perf/` (unless `--no-wipe-hub`).
+3. Starts MinIO and hub.
+4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone.
+5. Stops everything cleanly.
+
+Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every
+iteration exercises the hydrate-only path against the same baseline CAS. The
+`--bucket` flag selects which seeded bucket (and therefore which pack mode)
+to exercise.
+
+Pass `--enable-dehydration` to run a full provision -> deprovision cycle that
+includes re-upload (dehydrate) at deprovision time. Use this to measure the
+dehydrate phase end-to-end against the seeded baseline. Note that the seeded
+baseline diverges after an `--enable-dehydration` run - re-copy `--minio-seeded`
+or re-run `preserve_minio_state.py` if you want to compare to the pristine state.
+
+After each run the hub log, structured zenserver logs, any utrace file, and a
+`summary.json` with the run's timings are copied into
+`perf-runs/<timestamp>_<bucket>/` so Stage C runs can be compared
+post-hoc. Override the destination with `--archive-dir PATH`.
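+
+To compare two archived runs, feed their hub logs to `analyze_perf_runs.py`
+(paths follow the example archive layout above):
+
+```
+python scripts/test_scripts/hub/analyze_perf_runs.py \
+    --packed-log <perf-seed>/perf-runs/20260423-141530_zen-seed-packed/hub.log \
+    --unpacked-log <perf-seed>/perf-runs/20260423-143112_zen-seed-unpacked/hub.log
+```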
+
+## Resetting between runs
+
+- **Keep**: `s3-snapshot/` and every preserved `minio-seeded-<variant>/` dir (e.g. `minio-seeded-baseline/`, `minio-seeded-packed/`, `minio-seeded-unpacked/`). These are expensive to rebuild.
+- **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`.
+
+To force a fresh MinIO seed for one variant: delete the matching
+`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching
+`--dest`) in that worktree. To force a fresh S3 snapshot: delete
+`s3-snapshot/` and re-run Stage A.
diff --git a/scripts/test_scripts/hub/analyze_perf_runs.py b/scripts/test_scripts/hub/analyze_perf_runs.py
new file mode 100644
index 000000000..521d9cc1e
--- /dev/null
+++ b/scripts/test_scripts/hub/analyze_perf_runs.py
@@ -0,0 +1,348 @@
+#!/usr/bin/env python3
+"""Parse two hub.log archives (packed vs unpacked) and produce a comparison."""
+
+from __future__ import annotations
+
+import argparse
+import re
+import sys
+from pathlib import Path
+from statistics import mean, median
+
+
+_SIZE_UNITS = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
+_TIME_UNITS = {"ns": 1e-9, "us": 1e-6, "ms": 1e-3, "s": 1.0}
+
+
+def parse_size(s: str) -> float:
+ # Accept forms: "1.53K", "22.6K", "180M", "1.83G", "25" (bare bytes), "0B".
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)B?", s.strip())
+ if not m:
+ return 0.0
+ val, unit = m.group(1), m.group(2) or "B"
+ return float(val) * _SIZE_UNITS.get(unit, 1)
+
+
+def parse_time_s(s: str) -> float:
+ # Accept forms: "1s", "46ms", "712us", "0ns"
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(ns|us|ms|s)", s.strip())
+ if not m:
+ return 0.0
+ val, unit = m.group(1), m.group(2)
+ return float(val) * _TIME_UNITS[unit]
+
+
+def parse_rate_bits(s: str) -> float:
+ # "30.7Mbits/s" -> bits/sec
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)bits/s", s.strip())
+ if not m:
+ return 0.0
+ val, unit = m.group(1), m.group(2) or ""
+ mult = {"": 1, "K": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}[unit]
+ return float(val) * mult
+
+
+_RE_COMPLETE = re.compile(r"Hydration complete module '([0-9a-f-]+)': (\d+) files \(([^)]+)\) in (\S+)$")
+_RE_PACK = re.compile(r"Pack unpack:\s+(\d+) packs,\s+(\d+) files,\s+(\S+),\s+download (\S+),\s+unpack (\S+)")
+_RE_DLPHASE = re.compile(r"Download phase:\s+(\S+)\s+(\d+)/(\d+)\s+\(([^)]+)\) downloaded,\s+(\S+),\s+(\d+) threads")
+_RE_REQS = re.compile(r"Requests:\s+(\d+) reqs,\s+avg (\S+)/req,\s+avg (\S+)/req,\s+peak in-flight (\d+),\s+saturation (\d+)%")
+_RE_SCHED = re.compile(r"Scheduling:\s+1st schedule \+(\S+),\s+1st start \+(\S+),\s+queue wait (\S+)")
+_RE_LOADMETA = re.compile(r"Load metadata:\s+(\S+)")
+_RE_RENAME = re.compile(r"Rename/copy:\s+(\S+)")
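+
+# Illustrative lines the patterns above are written to match (reconstructed from
+# the regexes and the unit comments, not copied from a real hub.log):
+#   Hydration complete module '2f1d03aa-9c5e-4e2b-8f00-1a2b3c4d5e6f': 1204 files (1.83G) in 1s
+#   Pack unpack: 3 packs, 857 files, 22.6K, download 712us, unpack 46ms
+#   Download phase: 1s 347/347 (180M) downloaded, 30.7Mbits/s, 16 threads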
+
+
+def parse_log(path: Path) -> list[dict]:
+ modules: list[dict] = []
+ text = path.read_text(encoding="utf-8", errors="replace").splitlines()
+ i = 0
+ while i < len(text):
+ line = text[i]
+ m = _RE_COMPLETE.search(line)
+ if not m:
+ i += 1
+ continue
+ mod = {
+ "mid": m.group(1),
+ "files": int(m.group(2)),
+ "total_bytes": parse_size(m.group(3)),
+ "wall_s": parse_time_s(m.group(4)),
+ }
+ # Read the stats block that follows (up to 20 lines).
+ block = text[i : min(i + 20, len(text))]
+ joined = "\n".join(block)
+ pm = _RE_PACK.search(joined)
+ if pm:
+ mod["packs"] = int(pm.group(1))
+ mod["packed_files"] = int(pm.group(2))
+ # Uncompressed format: single size, no integrity rehash.
+ mod["pack_bytes"] = parse_size(pm.group(3))
+ mod["pack_download_s"] = parse_time_s(pm.group(4))
+ mod["pack_unpack_s"] = parse_time_s(pm.group(5))
+ else:
+ mod["packs"] = 0
+ mod["packed_files"] = 0
+ mod["pack_bytes"] = 0.0
+ mod["pack_download_s"] = 0.0
+ mod["pack_unpack_s"] = 0.0
+
+ dm = _RE_DLPHASE.search(joined)
+ if dm:
+ mod["dl_phase_s"] = parse_time_s(dm.group(1))
+ mod["dl_files"] = int(dm.group(2))
+ mod["dl_total_files"] = int(dm.group(3))
+ mod["dl_bytes"] = parse_size(dm.group(4))
+ mod["dl_throughput_bps"] = parse_rate_bits(dm.group(5))
+ mod["dl_threads"] = int(dm.group(6))
+ rm = _RE_REQS.search(joined)
+ if rm:
+ mod["reqs"] = int(rm.group(1))
+ mod["req_avg_s"] = parse_time_s(rm.group(2))
+ mod["req_avg_throughput_bps"] = parse_rate_bits(rm.group(3))
+ mod["req_peak_inflight"] = int(rm.group(4))
+ mod["req_saturation_pct"] = int(rm.group(5))
+ sm = _RE_SCHED.search(joined)
+ if sm:
+ mod["sched_1st_schedule_s"] = parse_time_s(sm.group(1))
+ mod["sched_1st_start_s"] = parse_time_s(sm.group(2))
+ mod["queue_wait_s"] = parse_time_s(sm.group(3))
+ lm = _RE_LOADMETA.search(joined)
+ if lm:
+ mod["load_metadata_s"] = parse_time_s(lm.group(1))
+ rcm = _RE_RENAME.search(joined)
+ if rcm:
+ mod["rename_copy_s"] = parse_time_s(rcm.group(1))
+
+ modules.append(mod)
+ i += 1
+
+ return modules
+
+
+def fmt_bytes(n: float) -> str:
+ for u in ("B", "KB", "MB", "GB", "TB"):
+ if n < 1024 or u == "TB":
+ return f"{n:.1f} {u}"
+ n /= 1024
+ return f"{n:.1f} TB"
+
+
+def fmt_s(s: float) -> str:
+ if s < 1e-3:
+ return f"{s*1e6:.0f} us"
+ if s < 1.0:
+ return f"{s*1e3:.1f} ms"
+ return f"{s:.2f} s"
+
+
+def agg(modules: list[dict]) -> dict:
+ a: dict = {}
+ a["count"] = len(modules)
+ a["total_files"] = sum(m["files"] for m in modules)
+ a["total_bytes"] = sum(m["total_bytes"] for m in modules)
+ a["total_reqs"] = sum(m.get("reqs", 0) for m in modules)
+ a["total_packs"] = sum(m.get("packs", 0) for m in modules)
+ a["total_packed_files"] = sum(m.get("packed_files", 0) for m in modules)
+ a["total_pack_bytes"] = sum(m.get("pack_bytes", 0.0) for m in modules)
+ a["total_dl_bytes"] = sum(m.get("dl_bytes", 0.0) for m in modules)
+ a["total_dl_phase_s"] = sum(m.get("dl_phase_s", 0.0) for m in modules)
+ a["total_pack_download_s"] = sum(m.get("pack_download_s", 0.0) for m in modules)
+ a["total_pack_unpack_s"] = sum(m.get("pack_unpack_s", 0.0) for m in modules)
+ a["total_rename_s"] = sum(m.get("rename_copy_s", 0.0) for m in modules)
+ a["total_load_meta_s"] = sum(m.get("load_metadata_s", 0.0) for m in modules)
+ a["total_wall_s"] = sum(m["wall_s"] for m in modules)
+
+ # Per-module medians and percentiles of request-level metrics
+ req_avgs = [m["req_avg_s"] for m in modules if m.get("reqs", 0) > 0]
+ a["req_avg_latency_median_s"] = median(req_avgs) if req_avgs else 0.0
+ a["req_avg_latency_mean_s"] = mean(req_avgs) if req_avgs else 0.0
+ a["req_avg_latency_p95_s"] = (
+ sorted(req_avgs)[int(0.95 * len(req_avgs))] if req_avgs else 0.0
+ )
+
+ queue_waits = [m["queue_wait_s"] for m in modules if "queue_wait_s" in m]
+ a["queue_wait_median_s"] = median(queue_waits) if queue_waits else 0.0
+ a["queue_wait_mean_s"] = mean(queue_waits) if queue_waits else 0.0
+ a["queue_wait_p95_s"] = (
+ sorted(queue_waits)[int(0.95 * len(queue_waits))] if queue_waits else 0.0
+ )
+ a["queue_wait_max_s"] = max(queue_waits) if queue_waits else 0.0
+
+ first_starts = [m["sched_1st_start_s"] for m in modules if "sched_1st_start_s" in m]
+ a["first_start_median_s"] = median(first_starts) if first_starts else 0.0
+ a["first_start_p95_s"] = (
+ sorted(first_starts)[int(0.95 * len(first_starts))] if first_starts else 0.0
+ )
+
+ wall_times = [m["wall_s"] for m in modules]
+ a["wall_median_s"] = median(wall_times) if wall_times else 0.0
+ a["wall_mean_s"] = mean(wall_times) if wall_times else 0.0
+ a["wall_p95_s"] = sorted(wall_times)[int(0.95 * len(wall_times))] if wall_times else 0.0
+ a["wall_max_s"] = max(wall_times) if wall_times else 0.0
+
+ saturations = [m["req_saturation_pct"] for m in modules if "req_saturation_pct" in m]
+ a["saturation_median_pct"] = median(saturations) if saturations else 0
+ a["saturation_mean_pct"] = mean(saturations) if saturations else 0
+
+ peak_inflights = [m["req_peak_inflight"] for m in modules if "req_peak_inflight" in m]
+ a["peak_inflight_median"] = median(peak_inflights) if peak_inflights else 0
+ a["peak_inflight_max"] = max(peak_inflights) if peak_inflights else 0
+
+ return a
+
+
+def print_row(label: str, packed_val: str, unpacked_val: str, delta: str = ""):
+ print(f" {label:<42} {packed_val:>14} {unpacked_val:>14} {delta}")
+
+
+def delta_pct(packed: float, unpacked: float) -> str:
+ if unpacked == 0:
+ return ""
+ d = packed - unpacked
+ pct = (d / unpacked) * 100.0
+ sign = "+" if d >= 0 else ""
+ return f"{sign}{pct:.1f}%"
+
+
+def main() -> int:
+ p = argparse.ArgumentParser()
+ p.add_argument("--packed-log", required=True)
+ p.add_argument("--unpacked-log", required=True)
+ args = p.parse_args()
+
+ pm = parse_log(Path(args.packed_log))
+ um = parse_log(Path(args.unpacked_log))
+
+ pa = agg(pm)
+ ua = agg(um)
+
+ # Guard the per-module averages below against empty parses (ZeroDivisionError).
+ if not pa["count"] or not ua["count"]:
+ sys.exit("error: no 'Hydration complete' lines parsed from one or both logs")
+
+ print(f"\nParsed packed: {pa['count']} modules from {args.packed_log}")
+ print(f"Parsed unpacked: {ua['count']} modules from {args.unpacked_log}\n")
+
+ print(f" {'metric':<42} {'packed':>14} {'unpacked':>14} delta(p vs u)")
+ print(f" {'-'*42} {'-'*14} {'-'*14} {'-'*12}")
+
+ print("\n-- workload --")
+ print_row("modules", f"{pa['count']}", f"{ua['count']}")
+ print_row("total files (entries)", f"{pa['total_files']}", f"{ua['total_files']}")
+ print_row("total raw data", fmt_bytes(pa["total_bytes"]), fmt_bytes(ua["total_bytes"]))
+
+ print("\n-- S3 requests --")
+ print_row("total S3 GETs",
+ f"{pa['total_reqs']}", f"{ua['total_reqs']}",
+ delta_pct(pa["total_reqs"], ua["total_reqs"]))
+ print_row("GETs per module (mean)",
+ f"{pa['total_reqs']/pa['count']:.1f}",
+ f"{ua['total_reqs']/ua['count']:.1f}",
+ delta_pct(pa['total_reqs']/pa['count'], ua['total_reqs']/ua['count']))
+ print_row("requests saved by packing",
+ f"{ua['total_reqs'] - pa['total_reqs']}", "-",
+ f"{((ua['total_reqs'] - pa['total_reqs']) / ua['total_reqs']) * 100:.1f}%")
+
+ print("\n-- payload --")
+ print_row("total bytes downloaded",
+ fmt_bytes(pa["total_dl_bytes"]),
+ fmt_bytes(ua["total_dl_bytes"]),
+ delta_pct(pa["total_dl_bytes"], ua["total_dl_bytes"]))
+ print_row(" pack bytes (aggregate)",
+ fmt_bytes(pa["total_pack_bytes"]), "-")
+ diff = pa["total_dl_bytes"] - ua["total_dl_bytes"]
+ direction = "saved by packing" if diff < 0 else "added by packing"
+ print_row(f"bytes {direction}", fmt_bytes(abs(diff)), "-")
+
+ print("\n-- per-request latency (per-module avg) --")
+ print_row("median req avg latency",
+ fmt_s(pa["req_avg_latency_median_s"]),
+ fmt_s(ua["req_avg_latency_median_s"]),
+ delta_pct(pa["req_avg_latency_median_s"], ua["req_avg_latency_median_s"]))
+ print_row("mean req avg latency",
+ fmt_s(pa["req_avg_latency_mean_s"]),
+ fmt_s(ua["req_avg_latency_mean_s"]),
+ delta_pct(pa["req_avg_latency_mean_s"], ua["req_avg_latency_mean_s"]))
+ print_row("p95 req avg latency",
+ fmt_s(pa["req_avg_latency_p95_s"]),
+ fmt_s(ua["req_avg_latency_p95_s"]),
+ delta_pct(pa["req_avg_latency_p95_s"], ua["req_avg_latency_p95_s"]))
+
+ print("\n-- queue / scheduling --")
+ print_row("median queue wait (per module)",
+ fmt_s(pa["queue_wait_median_s"]),
+ fmt_s(ua["queue_wait_median_s"]),
+ delta_pct(pa["queue_wait_median_s"], ua["queue_wait_median_s"]))
+ print_row("mean queue wait",
+ fmt_s(pa["queue_wait_mean_s"]),
+ fmt_s(ua["queue_wait_mean_s"]),
+ delta_pct(pa["queue_wait_mean_s"], ua["queue_wait_mean_s"]))
+ print_row("p95 queue wait",
+ fmt_s(pa["queue_wait_p95_s"]),
+ fmt_s(ua["queue_wait_p95_s"]),
+ delta_pct(pa["queue_wait_p95_s"], ua["queue_wait_p95_s"]))
+ print_row("max queue wait",
+ fmt_s(pa["queue_wait_max_s"]),
+ fmt_s(ua["queue_wait_max_s"]),
+ delta_pct(pa["queue_wait_max_s"], ua["queue_wait_max_s"]))
+ print_row("median 1st-start delay",
+ fmt_s(pa["first_start_median_s"]),
+ fmt_s(ua["first_start_median_s"]),
+ delta_pct(pa["first_start_median_s"], ua["first_start_median_s"]))
+
+ print("\n-- per-module wall time (hydration block) --")
+ print_row("median",
+ fmt_s(pa["wall_median_s"]),
+ fmt_s(ua["wall_median_s"]),
+ delta_pct(pa["wall_median_s"], ua["wall_median_s"]))
+ print_row("mean",
+ fmt_s(pa["wall_mean_s"]),
+ fmt_s(ua["wall_mean_s"]),
+ delta_pct(pa["wall_mean_s"], ua["wall_mean_s"]))
+ print_row("p95",
+ fmt_s(pa["wall_p95_s"]),
+ fmt_s(ua["wall_p95_s"]),
+ delta_pct(pa["wall_p95_s"], ua["wall_p95_s"]))
+ print_row("max",
+ fmt_s(pa["wall_max_s"]),
+ fmt_s(ua["wall_max_s"]),
+ delta_pct(pa["wall_max_s"], ua["wall_max_s"]))
+
+ print("\n-- thread-pool saturation / concurrency --")
+ print_row("saturation median %",
+ f"{pa['saturation_median_pct']}",
+ f"{ua['saturation_median_pct']}")
+ print_row("saturation mean %",
+ f"{pa['saturation_mean_pct']:.1f}",
+ f"{ua['saturation_mean_pct']:.1f}")
+ print_row("peak in-flight median",
+ f"{pa['peak_inflight_median']}",
+ f"{ua['peak_inflight_median']}")
+ print_row("peak in-flight max",
+ f"{pa['peak_inflight_max']}",
+ f"{ua['peak_inflight_max']}")
+
+ print("\n-- pack-specific costs (packed run only) --")
+ print(f" pack GETs added: {pa['total_packs']} ({pa['total_packs']/pa['count']:.2f}/module)")
+ print(f" small files folded: {pa['total_packed_files']} ({pa['total_packed_files']/pa['count']:.1f}/module)")
+ print(f" pack download time total: {pa['total_pack_download_s']:.1f} s")
+ print(f" pack unpack total: {pa['total_pack_unpack_s']:.2f} s")
+
+ print("\n-- per-phase totals (sum across modules, wall-clock contribution) --")
+ print_row("load metadata total",
+ fmt_s(pa["total_load_meta_s"]),
+ fmt_s(ua["total_load_meta_s"]),
+ delta_pct(pa["total_load_meta_s"], ua["total_load_meta_s"]))
+ print_row("download phase total",
+ fmt_s(pa["total_dl_phase_s"]),
+ fmt_s(ua["total_dl_phase_s"]),
+ delta_pct(pa["total_dl_phase_s"], ua["total_dl_phase_s"]))
+ print_row("rename/copy total",
+ fmt_s(pa["total_rename_s"]),
+ fmt_s(ua["total_rename_s"]),
+ delta_pct(pa["total_rename_s"], ua["total_rename_s"]))
+ print_row("hydration wall sum",
+ fmt_s(pa["total_wall_s"]),
+ fmt_s(ua["total_wall_s"]),
+ delta_pct(pa["total_wall_s"], ua["total_wall_s"]))
+
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py
new file mode 100644
index 000000000..23014409c
--- /dev/null
+++ b/scripts/test_scripts/hub/hub_load_test_s3.py
@@ -0,0 +1,537 @@
+#!/usr/bin/env python3
+"""Hub provision/deprovision sweep against a real S3 bucket.
+
+Lists the top-level folders in S3_URI (each folder name is a moduleId),
+picks 200 of them, then:
+ 1. fires all provision requests concurrently,
+ 2. polls until every module reaches 'provisioned',
+ 3. waits 5 seconds,
+ 4. fires all deprovision requests concurrently,
+ 5. polls until every module is deprovisioned,
+ 6. waits 5 seconds,
+ 7. shuts down the hub.
+
+Required environment variables (or pass via CLI flags):
+ ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/
+ ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access
+ ZEN_PERF_AWS_REGION defaults to us-east-1
+
+Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO
+session is missing or expired, the script triggers the login automatically.
+
+Requirements:
+ pip install boto3
+ aws CLI v2
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "")
+_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "")
+_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1")
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ candidates = [
+ repo_root / "build" / "windows" / "x64" / "debug" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "linux" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "macosx" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}",
+ ]
+ for c in candidates:
+ if c.exists():
+ return c
+
+ matches = list(repo_root.glob(f"build/**/debug/zenserver{_EXE_SUFFIX}"))
+ if matches:
+ return max(matches, key=lambda p: p.stat().st_mtime)
+
+ sys.exit(
+ "zenserver debug executable not found in build/. "
+ "Run: xmake config -y -m debug -a x64 && xmake -y\n"
+ "Or pass --zenserver-dir <dir>."
+ )
+
+
+# ---------------------------------------------------------------------------
+# AWS SSO + S3 listing
+# ---------------------------------------------------------------------------
+
+def _require_boto3():
+ try:
+ import boto3 # type: ignore[import-not-found]
+ import botocore.exceptions # type: ignore[import-not-found]
+ except ImportError:
+ sys.exit(
+ "[aws] boto3 is required.\n"
+ "Install it with: pip install boto3"
+ )
+ return boto3, botocore.exceptions
+
+
+def _sso_login(profile: str) -> None:
+ print(f"[aws] running 'aws sso login --profile {profile}'...")
+ rc = subprocess.call(["aws", "sso", "login", "--profile", profile])
+ if rc != 0:
+ sys.exit(f"[aws] 'aws sso login' failed with rc={rc}")
+
+
+def _get_session(profile: str):
+ boto3, exc_mod = _require_boto3()
+
+ def _load_frozen():
+ session = boto3.Session(profile_name=profile)
+ creds = session.get_credentials()
+ if creds is None:
+ return session, None
+ return session, creds.get_frozen_credentials()
+
+ try:
+ session, frozen = _load_frozen()
+ if frozen is not None and frozen.access_key and frozen.secret_key:
+ return session, frozen
+ except exc_mod.ProfileNotFound:
+ sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config")
+ except Exception as e:
+ print(f"[aws] initial credential load failed: {e}")
+
+ _sso_login(profile)
+
+ session, frozen = _load_frozen()
+ if frozen is None or not frozen.access_key:
+ sys.exit("[aws] could not resolve credentials after sso login")
+ return session, frozen
+
+
+def _parse_s3_uri(uri: str) -> tuple[str, str]:
+ if not uri.startswith("s3://"):
+ sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}")
+ rest = uri[len("s3://"):]
+ if "/" in rest:
+ bucket, prefix = rest.split("/", 1)
+ else:
+ bucket, prefix = rest, ""
+ return bucket, prefix
+
+
+def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]:
+ s3 = session.client("s3", region_name=region)
+ prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/"
+
+ paginator = s3.get_paginator("list_objects_v2")
+ module_ids: list[str] = []
+ for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"):
+ for cp in page.get("CommonPrefixes", []) or []:
+ full = cp.get("Prefix", "")
+ # Strip outer prefix + trailing slash to get the folder name
+ folder = full[len(prefix_norm):].rstrip("/")
+ if folder:
+ module_ids.append(folder)
+ return module_ids
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ instance_limit: int,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-http-threads=8",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ f"--hub-instance-limit={instance_limit}",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None:
+ if proc.poll() is not None:
+ return
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[{name}] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub API
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
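+ # Expected /hub/status payload shape (inferred from the parsing below, not
+ # from a published hub API spec):
+ # {"modules": [{"moduleId": "<uuid>", "state": "provisioned"}, ...]}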
+ url = f"http://localhost:{port}/hub/status"
+ req = urllib.request.Request(url, headers={"Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ except Exception:
+ return None
+ out: dict[str, str] = {}
+ for m in data.get("modules", []) or []:
+ mid = m.get("moduleId")
+ if mid:
+ out[mid] = m.get("state", "")
+ return out
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ """Fire POST /hub/modules/<id>/<verb> for every module concurrently.
+
+ Returns (accepted_ids, failures) where accepted_ids got 200/202 and
+ failures is a list of (mid, status, body) for everything else.
+ """
+ futures = {
+ mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
+ for mid in module_ids
+ }
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ elif verb == "deprovision" and status == 404:
+ # Already gone - treat as success for deprovision fan-out.
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+def _wait_for_provisioned(
+ port: int,
+ module_ids: list[str],
+ timeout_s: float,
+) -> tuple[list[str], dict[str, str]]:
+ """Poll until every module in module_ids is 'provisioned' or gone.
+
+ Returns (stuck_ids, last_states). stuck_ids are the ones that did not reach
+ 'provisioned' within timeout_s; last_states maps all module_ids to their
+ last-seen state (or empty string if the module was absent from the report).
+ """
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ now_done: list[str] = []
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if s == "provisioned":
+ now_done.append(mid)
+ elif s in ("", "unprovisioned", "crashed"):
+ # Not useful end-states for "waiting to become provisioned",
+ # but we don't block forever on them either - let timeout decide.
+ pass
+ for mid in now_done:
+ remaining.discard(mid)
+
+ done = len(module_ids) - len(remaining)
+ print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r")
+ time.sleep(2.0)
+
+ return list(remaining), last_states
+
+
+def _wait_for_deprovisioned(
+ port: int,
+ module_ids: list[str],
+ timeout_s: float,
+) -> tuple[list[str], dict[str, str]]:
+ """Poll until every module in module_ids is gone from hub status."""
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if mid not in states or s == "unprovisioned":
+ remaining.discard(mid)
+
+ done = len(module_ids) - len(remaining)
+ print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r")
+ time.sleep(2.0)
+
+ return list(remaining), last_states
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3",
+ help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)")
+ parser.add_argument("--port", type=int, default=8558,
+ help="Hub HTTP port (default: 8558)")
+ parser.add_argument("--module-count", type=int, default=200,
+ help="Number of modules to sweep (default: 200)")
+ parser.add_argument("--settle-seconds", type=float, default=5.0,
+ help="Seconds to wait between provision-complete and deprovision, "
+ "and between deprovision-complete and shutdown (default: 5.0)")
+ parser.add_argument("--workers", type=int, default=50,
+ help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)")
+ parser.add_argument("--poll-timeout", type=float, default=600.0,
+ help="Max seconds to wait for provision or deprovision to finish (default: 600)")
+ parser.add_argument("--trace",
+ nargs="?", const="default", default=None, metavar="CHANNELS",
+ help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. "
+ "Optionally pass channel spec (default: 'default')")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver executable (auto-detected by default)")
+ args = parser.parse_args()
+
+ s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI)
+ aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE)
+ aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION)
+ if not s3_uri:
+ sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/")
+ if not aws_profile:
+ sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name")
+
+ data_dir = Path(args.data_dir)
+ hub_log = data_dir / "hub.log"
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] S3 URI: {s3_uri}")
+ print(f"[setup] profile: {aws_profile}")
+ print(f"[setup] region: {aws_region}")
+
+ session, frozen = _get_session(aws_profile)
+ print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})")
+
+ bucket, prefix = _parse_s3_uri(s3_uri)
+ print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...")
+ module_ids = _list_module_ids(session, bucket, prefix, aws_region)
+ print(f"[s3] found {len(module_ids)} module folders")
+
+ if not module_ids:
+ sys.exit("[s3] no module folders found, aborting")
+
+ selected = module_ids[:args.module_count]
+ if len(selected) < args.module_count:
+ print(f"[s3] only {len(selected)} folders available (requested {args.module_count})")
+
+ aws_env = {
+ "AWS_ACCESS_KEY_ID": frozen.access_key,
+ "AWS_SECRET_ACCESS_KEY": frozen.secret_key,
+ }
+ if frozen.token:
+ aws_env["AWS_SESSION_TOKEN"] = frozen.token
+
+ data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {"uri": s3_uri, "region": aws_region},
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back.
+ "--hub-enable-dehydration=false",
+ ]
+ if args.trace:
+ trace_path = data_dir / "hub.utrace"
+ hub_extra_args += [
+ f"--trace={args.trace}",
+ f"--tracefile={trace_path}",
+ ]
+ print(f"[trace] enabled channels='{args.trace}', file={trace_path}")
+
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+
+ try:
+ hub_instance_limit = max(args.module_count, 500)
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, data_dir, args.port, hub_log,
+ hub_instance_limit, hub_extra_args, aws_env,
+ )
+ _wait_for_hub(hub_proc, args.port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ # --- Provision phase ---
+ print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...")
+ t_pv0 = time.monotonic()
+ accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision")
+ print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} "
+ f"(fan-out {time.monotonic() - t_pv0:.1f}s)")
+ for mid, status, body in pv_failures[:10]:
+ print(f"[provision] FAILED {mid}: status={status} body={body}")
+ if len(pv_failures) > 10:
+ print(f"[provision] ... and {len(pv_failures) - 10} more failures")
+
+ if hub_proc.poll() is not None:
+ print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
+ return
+
+ if accepted:
+ stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout)
+ provisioned_count = len(accepted) - len(stuck)
+ print()
+ print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' "
+ f"({time.monotonic() - t_pv0:.1f}s total)")
+ if stuck:
+ print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s")
+ for mid in stuck[:10]:
+ print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'")
+ else:
+ print("[provision] nothing provisioned")
+ return
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...")
+ time.sleep(args.settle_seconds)
+
+ # --- Deprovision phase ---
+ print(f"[deprovision] firing {len(accepted)} deprovision requests...")
+ t_dp0 = time.monotonic()
+ dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision")
+ print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} "
+ f"(fan-out {time.monotonic() - t_dp0:.1f}s)")
+ for mid, status, body in dp_failures[:10]:
+ print(f"[deprovision] FAILED {mid}: status={status} body={body}")
+ if len(dp_failures) > 10:
+ print(f"[deprovision] ... and {len(dp_failures) - 10} more failures")
+
+ stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout)
+ deprovisioned_count = len(dp_accepted) - len(stuck_dp)
+ print()
+ print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone "
+ f"({time.monotonic() - t_dp0:.1f}s total)")
+ if stuck_dp:
+ print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s")
+ for mid in stuck_dp[:10]:
+ print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'")
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...")
+ time.sleep(args.settle_seconds)
+
+ print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ _stop_process(hub_proc, "hub", timeout_s=120.0)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua
new file mode 100644
index 000000000..f3cf3e697
--- /dev/null
+++ b/scripts/test_scripts/hub/perf_configs/hub.lua
@@ -0,0 +1,48 @@
+-- Perf-test hub config: mirrors the production hub config in the repo root
+-- (hub.lua / instance.lua). The launch script overrides hub.instance.config
+-- and the effective concurrency (--corelimit=128) via CLI to simulate an
+-- r5n.32xlarge box so the default auto-picks match what prod sees there.
+
+hub = {
+ instance = {
+ baseportnumber = 21000, -- default
+ limits = {
+ count = 1100, -- headroom for 1000-module perf runs
+ memorylimitpercent = 90, -- default: 0 (disabled)
+ disklimitpercent = 90, -- default: 0 (disabled)
+ },
+ corelimit = 4, -- default: 0 (auto)
+ provisionthreads = 8, -- default: auto
+ -- NOTE: hub.instance.config (path to instance lua) is overridden via
+ -- --hub-instance-config on the CLI. If left here, it would be resolved
+ -- relative to the hub's CWD at spawn time (NOT this file's dir).
+ },
+
+ hydration = {
+ -- Match production's per-module download pool size. Without this, the
+ -- default auto-picks hardware_concurrency/4 which on --corelimit=128
+ -- would be 32. Prod logs consistently show "16 threads" in Download phase.
+ threads = 16,
+ },
+
+ watchdog = {
+ cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan
+ cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count
+ instancecheckthrottlems = 10, -- default: 5. slight throttle to reduce hub CPU
+ provisionedinactivitytimeoutseconds = 600, -- default
+ hibernatedinactivitytimeoutseconds = 1800, -- default
+ inactivitycheckmarginseconds = 60, -- default
+ },
+}
+
+network = {
+ httpserverthreads = 8, -- default: auto. hub itself needs few threads
+}
+
+server = {
+ dedicated = true, -- default: false. signals build-farm use, affects thread scaling heuristics
+}
+
+gc = {
+ enabled = false, -- default: true. hub has no storage, no need for GC
+}
diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua
new file mode 100644
index 000000000..1251997db
--- /dev/null
+++ b/scripts/test_scripts/hub/perf_configs/instance.lua
@@ -0,0 +1,15 @@
+network = {
+ httpserverthreads = 4, -- default: auto
+}
+
+gc = {
+ enabled = false, -- default: true. hub triggers GC at deprovision, no need for periodic GC
+}
+
+cache = {
+ bucket = {
+ memlayer = {
+ sizethreshold = 0, -- default: 1024. 0 disables memlayer entirely
+ },
+ },
+}
diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py
new file mode 100644
index 000000000..365e4a542
--- /dev/null
+++ b/scripts/test_scripts/hub/preserve_minio_state.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+"""Preserve a freshly-seeded MinIO data directory.
+
+Run this after seed_minio.py has finished. By default it MOVES --source (the
+MinIO --data-dir used during seeding) to --dest (the preservation path) and
+writes a README with provenance so future perf runs start from a known
+baseline. Move avoids a ~0.5 TB copy on a full 1000-module seed; Stage B
+wipes --minio-data-dir on its next invocation anyway.
+
+Pass --copy to keep --source in place (slower; needs 2x disk).
+
+Typical invocation:
+ python preserve_minio_state.py
+
+Defaults map to the paths recommended by PERF_SEED_README.md.
+"""
+
+from __future__ import annotations
+
+import argparse
+import datetime
+import os
+import shutil
+import stat
+import sys
+from pathlib import Path
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ def _onerror(func, p, exc_info):
+ try:
+ os.chmod(p, stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+def _size_of(path: Path) -> tuple[int, int]:
+ files = 0
+ total = 0
+ for root, _dirs, names in os.walk(path):
+ for n in names:
+ p = Path(root) / n
+ try:
+ total += p.stat().st_size
+ except OSError:
+ pass
+ files += 1
+ return files, total
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data",
+ help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)")
+ parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
+ help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
+ "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
+ parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""),
+ help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)")
+ parser.add_argument("--bucket", default="zen-seed",
+ help="MinIO bucket name recorded in the README")
+ parser.add_argument("--module-count", type=int, default=300,
+ help="Module count recorded in the README")
+ parser.add_argument("--copy", action="store_true",
+ help="Copy --source to --dest instead of moving it. Default is move "
+ "(fast, in-place rename when on the same volume). Use --copy if you "
+ "want to keep --source intact for another preserve run.")
+ args = parser.parse_args()
+
+ source = Path(args.source).resolve()
+ dest = Path(args.dest).resolve()
+
+ if not source.is_dir():
+ sys.exit(f"[preserve] source not found: {source}")
+
+ # Dest is wiped and rewritten. Refuse any path that would clobber source.
+ if dest == source or dest in source.parents or source in dest.parents:
+ sys.exit(f"[preserve] source ({source}) and dest ({dest}) must be disjoint")
+
+ files, total = _size_of(source)
+ mode = "copy" if args.copy else "move"
+ print(f"[preserve] source: {source} -> {files:,} files, {total/1024/1024:.1f} MB")
+ print(f"[preserve] dest: {dest}")
+ print(f"[preserve] mode: {mode}")
+
+ if dest.exists():
+ print(f"[preserve] removing existing dest {dest}")
+ _rmtree_robust(dest)
+
+ dest.parent.mkdir(parents=True, exist_ok=True)
+ if args.copy:
+ shutil.copytree(source, dest, symlinks=False)
+ else:
+ shutil.move(str(source), str(dest))
+
+ readme = dest / "README.txt"
+ readme.write_text(
+ "\n".join([
+ "zen-perf-seed preserved MinIO state",
+ "",
+ f"Created: {datetime.datetime.now(datetime.timezone.utc).isoformat()}",
+ f"Source s3 URI: {args.s3_uri}",
+ f"Bucket: {args.bucket}",
+ f"Modules: {args.module_count}",
+ f"Files: {files:,}",
+ f"Bytes: {total:,}",
+ "",
+ "To run a perf iteration: copy this directory onto a fresh MinIO data",
+ "dir (see scripts/test_scripts/hub/run_minio_perf.py) and point a hub at it.",
+ "",
+ ]),
+ encoding="ascii",
+ )
+ print(f"[preserve] wrote {readme}")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py
new file mode 100644
index 000000000..b59cff41a
--- /dev/null
+++ b/scripts/test_scripts/hub/run_minio_perf.py
@@ -0,0 +1,614 @@
+#!/usr/bin/env python3
+"""Stage C of the perf-seed workflow: run a perf iteration against the
+preserved MinIO state.
+
+Flow per invocation:
+ 1. Reset --minio-run by copying --minio-seeded over it (so every iteration
+ starts from the same canonical state).
+ 2. Start MinIO against --minio-run.
+ 3. Start a hub against MinIO. By default --hub-enable-dehydration=false so
+ the perf run is read-only and the seeded baseline survives intact.
+ Pass --enable-dehydration to run a full provision -> deprovision cycle
+ that re-uploads at deprovision time (use this to measure the dehydrate
+ phase as well as hydrate; the seeded baseline diverges after such a run).
+ 4. Provision every module found under --snapshot-dir (same set that was
+ used to seed MinIO), wait for all to reach 'provisioned'.
+ 5. Deprovision them all, wait for them to be gone.
+ 6. Stop hub + MinIO cleanly.
+
+Optional --trace writes a utrace file to --hub-data-dir/hub.utrace.
+
+This script reuses the lightweight harness helpers from the other stages but
+keeps the run fully offline once the seed is preserved.
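+
+Typical invocations (flags as defined in main() below):
+
+    python run_minio_perf.py                       # read-only hydrate run
+    python run_minio_perf.py --enable-dehydration  # measure dehydrate too
+    python run_minio_perf.py --trace               # also write hub.utrace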
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import shutil
+import signal
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ import os as _os
+ import stat as _stat
+ def _onerror(func, p, exc_info):
+ try:
+ _os.chmod(p, _stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ # onexc was introduced in py3.12; fall back to onerror on older versions.
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ for mode in ("release", "debug"):
+ for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+ p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+ if p.exists():
+ return p
+ sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+ p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zen CLI not found at {p}")
+ return p
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+ p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"minio executable not found at {p}")
+ return p
+
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["MINIO_ROOT_USER"] = _MINIO_USER
+ env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ proc = subprocess.Popen(
+ [str(minio_exe), "server", str(data_dir),
+ "--address", f":{port}",
+ "--console-address", f":{console_port}",
+ "--quiet"],
+ env=env,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ **popen_kwargs,
+ )
+ print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}")
+ return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out after {timeout_s}s")
+
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ # Use the production-like Lua config (limits, thread counts, watchdog timers)
+ # so perf numbers reflect what prod actually runs. Hub.hydration.threads is
+ # left unset so the default auto-pick (hardware_concurrency/4) fires; combined
+ # with --corelimit=128 below, that yields 32 hydration threads on the actual
+ # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes.
+ config_dir = Path(__file__).resolve().parent / "perf_configs"
+ hub_config = config_dir / "hub.lua"
+ inst_config = config_dir / "instance.lua"
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ f"--config={hub_config}",
+ f"--hub-instance-config={inst_config}",
+ # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box
+ # with fewer physical cores, this clamps to the actual count (see
+ # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates).
+ # When running on the actual prod instance class, this makes the default
+ # auto-picks (provision/hydration thread counts) behave as prod does.
+ "--corelimit=128",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs)
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out after {timeout_s}s")
+
+
+def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
+ """'zen down --pid' for graceful hub shutdown."""
+ if hub_proc.poll() is not None:
+ return
+ pid = hub_proc.pid
+ print(f"[hub] zen down --pid {pid}")
+ rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
+ if rc != 0:
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ try:
+ hub_proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[hub] did not exit after {timeout_s}s, killing")
+ hub_proc.kill()
+ hub_proc.wait()
+
+
+def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
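+    """MinIO has no external shutdown tool; Ctrl-Break lets it flush."""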
+ if proc.poll() is not None:
+ return
+ try:
+ if sys.platform == "win32":
+ proc.send_signal(signal.CTRL_BREAK_EVENT)
+ else:
+ proc.terminate()
+ except Exception:
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[minio] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
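+    """POST an empty JSON body to the hub. Returns (http_status, parsed_body);
+    never raises - transport errors come back as (0, {"error": ...})."""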
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_states(port: int) -> Optional[dict[str, str]]:
+ # Without Accept: application/json the hub serves compact binary (CBOR)
+ # which json.loads can't parse. 60s timeout - /hub/status can take a few
+ # seconds to serialize with 300+ modules in flight.
+ req = urllib.request.Request(
+ f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"},
+ )
+ try:
+ with urllib.request.urlopen(req, timeout=60) as resp:
+ data = json.loads(resp.read())
+ except Exception as e:
+        # Surface the failure: silently returning None here used to leave the
+        # poll loop printing "0/300 provisioned..." forever.
+ print(f"[status] WARN /hub/status failed: {e}")
+ return None
+ return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
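+    """POST /hub/modules/<mid>/<verb> concurrently. For 'deprovision' a 404
+    means the module is already gone - count it as accepted so the state-poll
+    recognises completion.
+    """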
+ futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids}
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ elif verb == "deprovision" and status == 404:
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+def _state_histogram(states: dict[str, str], remaining: set[str]) -> str:
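+    """Summarise the still-remaining modules as 'state=count, ...' with the
+    most common state first; ids missing from /hub/status show as '<absent>'."""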
+ counts: dict[str, int] = {}
+ for mid in remaining:
+ s = states.get(mid, "<absent>")
+ counts[s] = counts.get(s, 0) + 1
+ return ", ".join(f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: -kv[1]))
+
+
+def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]:
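+    """Poll /hub/status until every id reaches 'provisioned' or timeout_s
+    elapses. Returns the ids still outstanding (empty list on full success)."""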
+ deadline = time.monotonic() + timeout_s
+ remaining = set(ids)
+ last_verbose = 0.0
+ while remaining and time.monotonic() < deadline:
+ states = _hub_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ if states.get(mid) == "provisioned":
+ remaining.discard(mid)
+ done = len(ids) - len(remaining)
+ now = time.monotonic()
+ if states is not None and (now - last_verbose >= 10.0 or not remaining):
+ hist = _state_histogram(states, remaining) if remaining else "(all provisioned)"
+ print(f"[provision] {done}/{len(ids)} provisioned | remaining: {hist}")
+ last_verbose = now
+ else:
+ print(f"[provision] {done}/{len(ids)} provisioned...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining)
+
+
+def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]:
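+    """Poll /hub/status until each id is absent or 'unprovisioned', or
+    timeout_s elapses. Returns the ids still outstanding."""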
+ deadline = time.monotonic() + timeout_s
+ remaining = set(ids)
+ last_verbose = 0.0
+ while remaining and time.monotonic() < deadline:
+ states = _hub_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ if mid not in states or s == "unprovisioned":
+ remaining.discard(mid)
+ done = len(ids) - len(remaining)
+ now = time.monotonic()
+ if states is not None and (now - last_verbose >= 10.0 or not remaining):
+ hist = _state_histogram(states, remaining) if remaining else "(all gone)"
+ print(f"[deprovision] {done}/{len(ids)} gone | remaining: {hist}")
+ last_verbose = now
+ else:
+ print(f"[deprovision] {done}/{len(ids)} gone...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining)
+
+
+def _robust_copytree(src: Path, dst: Path) -> None:
+ """Windows-friendly directory copy with progress.
+
+    Uses robocopy /MIR to mirror src -> dst with /COPY:DAT /DCOPY:DAT so the
+    data/attributes/timestamps copy is explicit and directory timestamps are
+    included (robocopy's /DCOPY default omits the T).
+ Safe because dst is always the working dir (minio-run) - never the
+ preserved baseline.
+ """
+ if sys.platform == "win32":
+ cmd = [
+ "robocopy", str(src), str(dst),
+ "/MIR",
+ "/COPY:DAT", "/DCOPY:DAT",
+ "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP",
+ "/R:2", "/W:1",
+ ]
+ print(f"[reset] robocopy {src} -> {dst}")
+ rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ if rc >= 8:
+ sys.exit(f"[reset] robocopy failed rc={rc}")
+ else:
+ if dst.exists():
+ _rmtree_robust(dst)
+ shutil.copytree(src, dst, symlinks=False)
+
+
+def _archive_run(
+ archive_dir: Path,
+ hub_data_dir: Path,
+ bucket: str,
+ summary: dict,
+) -> Optional[Path]:
+ """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a
+ timestamped subdir under archive_dir so successive runs can be compared.
+
+ Returns the archive destination path or None if there was nothing to copy.
+ """
+ try:
+ ts = time.strftime("%Y%m%d-%H%M%S", time.localtime())
+ dest = archive_dir / f"{ts}_{bucket}"
+ dest.mkdir(parents=True, exist_ok=True)
+
+ copied: list[str] = []
+ # Hub stdout/stderr (captured by _start_hub via subprocess stdout=)
+ hub_log = hub_data_dir / "hub.log"
+ if hub_log.is_file():
+ shutil.copy2(hub_log, dest / "hub.log")
+ copied.append("hub.log")
+ # Structured zenserver logs (rotated variants included)
+ logs_src = hub_data_dir / "logs"
+ if logs_src.is_dir():
+ logs_dst = dest / "logs"
+ logs_dst.mkdir(exist_ok=True)
+ for p in logs_src.iterdir():
+ if p.is_file():
+ shutil.copy2(p, logs_dst / p.name)
+ copied.append(f"logs/{p.name}")
+ # Optional trace
+ utrace = hub_data_dir / "hub.utrace"
+ if utrace.is_file():
+ shutil.copy2(utrace, dest / "hub.utrace")
+ copied.append("hub.utrace")
+
+ (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii")
+
+ print(f"[archive] {dest} ({len(copied)} files)")
+ return dest
+ except Exception as e:
+ print(f"[archive] WARNING: failed to archive run: {e}")
+ return None
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
+ help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
+ "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
+ parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run",
+ help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)")
+ parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+ help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf",
+ help="Hub --data-dir for the perf run (wiped each run if --wipe). Default: E:/Dev/zen-perf-seed/hub-perf")
+ parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs",
+ help="Where to archive hub.log + zenserver.log + hub.utrace after each run "
+ "(default: E:/Dev/zen-perf-seed/perf-runs)")
+ parser.add_argument("--bucket", default="zen-seed-packed",
+ help="MinIO bucket name to exercise (default: zen-seed-packed). "
+ "Pack worktree seeds only the packed bucket.")
+ parser.add_argument("--minio-port", type=int, default=9000)
+ parser.add_argument("--minio-console-port", type=int, default=9001)
+ parser.add_argument("--hub-port", type=int, default=8558)
+ parser.add_argument("--module-count", type=int, default=0,
+ help="Cap on modules used (0 = all from snapshot-dir)")
+ parser.add_argument("--workers", type=int, default=50)
+ parser.add_argument("--poll-timeout", type=float, default=1200.0)
+ parser.add_argument("--settle-seconds", type=float, default=5.0)
+ parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS",
+ help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace (default channels: 'default')")
+ parser.add_argument("--enable-dehydration", action="store_true",
+ help="Run with --hub-enable-dehydration=true so the deprovision phase "
+ "actually re-uploads to MinIO. Default false preserves the seeded "
+ "baseline; pass this to measure dehydrate cost as well as hydrate.")
+ parser.add_argument("--no-wipe-hub", action="store_true",
+ help="Don't wipe the hub data dir before starting (useful to inspect leftover state)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver + minio executables (auto-detected)")
+ args = parser.parse_args()
+
+ minio_seeded = Path(args.minio_seeded).resolve()
+ minio_run = Path(args.minio_run).resolve()
+ snapshot_dir = Path(args.snapshot_dir).resolve()
+ hub_data_dir = Path(args.hub_data_dir).resolve()
+ archive_dir = Path(args.archive_dir).resolve()
+
+ if not minio_seeded.is_dir():
+ sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}")
+ if not snapshot_dir.is_dir():
+ sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}")
+
+ # Preserved inputs are read-only from this script's perspective. Refuse to
+ # run if any mutable path could accidentally clobber them.
+ for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)):
+ for ro_label, ro in (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir)):
+ if mutable == ro or mutable in ro.parents or ro in mutable.parents:
+ sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}")
+
+ module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()])
+ if args.module_count > 0:
+ module_ids = module_ids[:args.module_count]
+ if not module_ids:
+ sys.exit(f"[setup] no modules in {snapshot_dir}")
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ zen_exe = _find_zen(zenserver_exe)
+ minio_exe = _find_minio(zenserver_exe)
+ zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+ print(f"[setup] build mode: {zenserver_mode}")
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] minio: {minio_exe}")
+ print(f"[setup] minio-seeded: {minio_seeded}")
+ print(f"[setup] minio-run: {minio_run}")
+ print(f"[setup] hub-data-dir: {hub_data_dir}")
+ print(f"[setup] modules: {len(module_ids)}")
+
+ # Reset MinIO run dir from the baseline.
+ _robust_copytree(minio_seeded, minio_run)
+
+ # Wipe the hub data dir so every run starts from scratch unless the user opts out.
+ if not args.no_wipe_hub and hub_data_dir.exists():
+ print(f"[reset] wiping {hub_data_dir}")
+ _rmtree_robust(hub_data_dir)
+ hub_data_dir.mkdir(parents=True, exist_ok=True)
+
+ config_path = hub_data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {
+ "uri": f"s3://{args.bucket}",
+ "endpoint": f"http://localhost:{args.minio_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ },
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}",
+ ]
+ if args.enable_dehydration:
+ print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge")
+ if args.trace:
+ trace_path = hub_data_dir / "hub.utrace"
+ hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"]
+ print(f"[trace] enabled channels='{args.trace}', file={trace_path}")
+ hub_extra_env = {
+ "AWS_ACCESS_KEY_ID": _MINIO_USER,
+ "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+ }
+
+ minio_proc: Optional[subprocess.Popen] = None
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ exit_code = 0
+ timings: dict = {
+ "bucket": args.bucket,
+ "module_count": len(module_ids),
+ "build_mode": zenserver_mode,
+ "provision_fanout_s": None,
+ "provision_total_s": None,
+ "deprovision_fanout_s": None,
+ "deprovision_total_s": None,
+ "total_s": None,
+ }
+
+ try:
+ minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port)
+ _wait_for_minio(args.minio_port)
+
+ hub_log = hub_data_dir / "hub.log"
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, hub_data_dir, args.hub_port, hub_log,
+ hub_extra_args, hub_extra_env,
+ )
+ _wait_for_hub(hub_proc, args.hub_port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ # --- Provision ---
+ print(f"[provision] firing {len(module_ids)} provision requests...")
+ t0 = time.monotonic()
+ accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision")
+ fanout_s = time.monotonic() - t0
+ timings["provision_fanout_s"] = round(fanout_s, 2)
+ print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)")
+ for mid, status, body in failures[:5]:
+ print(f"[provision] FAIL {mid}: status={status} body={body}")
+ if failures:
+ exit_code = 1
+ stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout)
+ total_s = time.monotonic() - t0
+ timings["provision_total_s"] = round(total_s, 2)
+ print(f"[provision] all provisioned in {total_s:.1f}s ({len(accepted)-len(stuck)}/{len(accepted)})")
+ if stuck:
+ exit_code = 1
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s")
+ time.sleep(args.settle_seconds)
+
+ # --- Deprovision ---
+ print(f"[deprovision] firing {len(accepted)} deprovision requests...")
+ t0 = time.monotonic()
+ dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision")
+ fanout_s = time.monotonic() - t0
+ timings["deprovision_fanout_s"] = round(fanout_s, 2)
+ print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)")
+ if dp_failures:
+ exit_code = 1
+ stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout)
+ total_s = time.monotonic() - t0
+ timings["deprovision_total_s"] = round(total_s, 2)
+ print(f"[deprovision] all gone in {total_s:.1f}s ({len(dp_accepted)-len(stuck_dp)}/{len(dp_accepted)})")
+ if stuck_dp:
+ exit_code = 1
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s")
+ time.sleep(args.settle_seconds)
+
+ elapsed = time.monotonic() - t_start
+ timings["total_s"] = round(elapsed, 2)
+ print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ _zen_down_hub(zen_exe, hub_proc)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ if minio_proc is not None and minio_proc.poll() is None:
+ _stop_minio_graceful(minio_proc)
+ # Archive AFTER the hub has exited so in-flight log writes are flushed.
+ timings["exit_code"] = exit_code
+ _archive_run(archive_dir, hub_data_dir, args.bucket, timings)
+
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py
new file mode 100644
index 000000000..e0e45c4cb
--- /dev/null
+++ b/scripts/test_scripts/hub/seed_minio.py
@@ -0,0 +1,720 @@
+#!/usr/bin/env python3
+"""Stage B of the perf-seed workflow.
+
+Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO
+bucket so a later perf run can exercise that bucket. Pack mode is hardcoded
+in this script (see --hub-hydration-enable-pack flag in _start_hub) and
+governs how the hub uploads into MinIO. To compare packed vs unpacked,
+invoke this script twice from two separate worktrees - one with the pack
+flag flipped to false - producing two preserved MinIO data dirs, then run
+run_minio_perf.py against each.
+
+Flow:
+ 1. Start a local MinIO server against --minio-data-dir, create the bucket.
+ 2. Start a hub against MinIO (--bucket as target), dehydration ON,
+ hibernated-watchdog timeout set huge so hibernated modules are not
+ auto-deprovisioned before we deprovision them ourselves.
+ 3. Provision every module discovered under --snapshot-dir (fresh hydrate
+ against an empty bucket spawns the child on an empty dir).
+ 4. Hibernate every module (child exits, state dir unlocked).
+ 5. Overlay the snapshot into <hub-data-dir>/servers/<mid>/.
+ 6. Deprovision every module. Deprovision from Hibernated runs Dehydrate(),
+ which scans the overlaid ServerStateDir and uploads content into the
+ bucket.
+ 7. Shut the hub down via 'zen down --pid'.
+ 8. Stop MinIO.
+
+Preservation of the resulting MinIO data directory is done by a separate
+step (see preserve_minio_state.py / PERF_SEED_README.md).
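+
+Typical invocation (flags as defined in main() below):
+
+    python seed_minio.py --wipe --module-count 300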
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import shutil
+import signal
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ import os as _os
+ import stat as _stat
+ def _onerror(func, p, exc_info):
+ try:
+ _os.chmod(p, _stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ # onexc was introduced in py3.12; fall back to onerror on older versions.
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery - prefer release, fall back to debug.
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ for mode in ("release", "debug"):
+ for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+ p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+ if p.exists():
+ return p
+ sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+ p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)")
+ return p
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+ p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"minio executable not found at {p}. Build the zen project first.")
+ return p
+
+
+# ---------------------------------------------------------------------------
+# MinIO lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["MINIO_ROOT_USER"] = _MINIO_USER
+ env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ proc = subprocess.Popen(
+ [str(minio_exe), "server", str(data_dir),
+ "--address", f":{port}",
+ "--console-address", f":{console_port}",
+ "--quiet"],
+ env=env,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ **popen_kwargs,
+ )
+ print(f"[minio] started (pid {proc.pid}) port={port} console={console_port} data={data_dir}")
+ return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+ try:
+ import boto3 # type: ignore[import-not-found]
+ import botocore.config # type: ignore[import-not-found]
+ import botocore.exceptions # type: ignore[import-not-found]
+ except ImportError:
+ sys.exit("[minio] boto3 is required. pip install boto3")
+ s3 = boto3.client(
+ "s3",
+ endpoint_url=f"http://localhost:{port}",
+ aws_access_key_id=_MINIO_USER,
+ aws_secret_access_key=_MINIO_PASS,
+ region_name="us-east-1",
+ config=botocore.config.Config(signature_version="s3v4"),
+ )
+ try:
+ s3.create_bucket(Bucket=bucket)
+ print(f"[minio] created bucket '{bucket}'")
+ except botocore.exceptions.ClientError as e:
+ if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
+ print(f"[minio] bucket '{bucket}' already exists")
+ else:
+ raise
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ instance_limit: int,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ f"--hub-instance-limit={instance_limit}",
+ # Seeding is not a perf-measurement path - we want it as fast as the
+ # host can manage. Let the hub go wide on both provisioning and
+ # hydration thread pools rather than matching prod limits.
+ "--hub-instance-provision-threads=64",
+ "--hub-hydration-threads=64",
+ # Prevent the watchdog from auto-deprovisioning modules while we're
+ # still hydrating the tail / in the overlay phase. BOTH timers have to
+ # be extended - the provisioned one (default 600s) is what bites on
+ # large-N runs where early provisions go idle waiting for the rest.
+ "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400",
+ "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400",
+ # Explicit - default is true, but make it obvious that Stage B needs
+ # it since the final deprovision drives the MinIO upload.
+ "--hub-enable-dehydration=true",
+ # Pack worktree: turn pack on so dehydrate emits packed CAS.
+ "--hub-hydration-enable-pack=true",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode})")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
+ """'zen down --pid' so the hub runs its normal shutdown path."""
+ if hub_proc.poll() is not None:
+ return
+ pid = hub_proc.pid
+ print(f"[hub] zen down --pid {pid}")
+ rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
+ if rc != 0:
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ try:
+ hub_proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[hub] did not exit after {timeout_s}s, killing")
+ hub_proc.kill()
+ hub_proc.wait()
+
+
+def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
+ """MinIO has no external shutdown tool; Ctrl-Break lets it flush."""
+ if proc.poll() is not None:
+ return
+ try:
+ if sys.platform == "win32":
+ proc.send_signal(signal.CTRL_BREAK_EVENT)
+ else:
+ proc.terminate()
+ except Exception:
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[minio] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub API
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
+ url = f"http://localhost:{port}/hub/status"
+ req = urllib.request.Request(url, headers={"Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ except Exception:
+ return None
+ return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ """POST concurrently. For 'deprovision' a 404 means the module is already
+ gone - count it as accepted so the state-poll recognises completion.
+ """
+ futures = {
+ mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
+ for mid in module_ids
+ }
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ elif verb == "deprovision" and status == 404:
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+_FAILED_STATES = {"crashed", "unprovisioned"}
+
+
+def _wait_for_state(
+ port: int,
+ module_ids: list[str],
+ target_state: str,
+ timeout_s: float,
+ label: str,
+) -> tuple[list[str], list[str], dict[str, str]]:
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ failed: list[str] = []
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if s == target_state:
+ remaining.discard(mid)
+ elif s in _FAILED_STATES and target_state not in _FAILED_STATES:
+ remaining.discard(mid)
+ failed.append(mid)
+ done = len(module_ids) - len(remaining)
+ print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining), failed, last_states
+
+
+def _wait_for_deprovisioned(
+ port: int,
+ module_ids: list[str],
+ timeout_s: float,
+) -> tuple[list[str], list[str], dict[str, str]]:
+ """Wait until each module is gone from /hub/status or in 'unprovisioned'.
+ 'crashed' counts as a hard failure - dehydrate never completed for it.
+ """
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ failed: list[str] = []
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if mid not in states or s == "unprovisioned":
+ remaining.discard(mid)
+ elif s == "crashed":
+ remaining.discard(mid)
+ failed.append(mid)
+ done = len(module_ids) - len(remaining)
+ print(f"[deprovision] {done}/{len(module_ids)} gone ({len(failed)} failed)...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining), failed, last_states
+
+
+# ---------------------------------------------------------------------------
+# Snapshot overlay
+# ---------------------------------------------------------------------------
+
+def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: list[str]) -> tuple[int, int, int]:
+ """Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*.
+
+ snapshot_root is treated as read-only; only hub_servers_root is written to.
+ """
+ files_copied = 0
+ bytes_copied = 0
+ modules_overlaid = 0
+
+ for i, mid in enumerate(module_ids, 1):
+ src = snapshot_root / mid
+ dst = hub_servers_root / mid
+ if not src.is_dir():
+ print(f"[overlay] WARNING: snapshot missing for {mid}: {src}")
+ continue
+ if dst.exists():
+ _rmtree_robust(dst)
+ shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
+ modules_overlaid += 1
+ for root, _dirs, files in os.walk(dst):
+ for f in files:
+ p = Path(root) / f
+ try:
+ bytes_copied += p.stat().st_size
+ except OSError:
+ pass
+ files_copied += 1
+ if i % 25 == 0 or i == len(module_ids):
+ print(f"[overlay] {i}/{len(module_ids)} modules overlaid "
+ f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
+ return modules_overlaid, files_copied, bytes_copied
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def _seed_one_bucket(
+ *,
+ bucket: str,
+ hub_data_dir: Path,
+ snapshot_dir: Path,
+ module_ids: list[str],
+ minio_port: int,
+ hub_port: int,
+ poll_timeout: float,
+ workers: int,
+ zenserver_exe: Path,
+ zen_exe: Path,
+) -> tuple[int, dict[str, float]]:
+ """Run the provision/hibernate/overlay/deprovision cycle against a single
+ MinIO bucket. Returns (exit_code, timings) where timings maps phase name
+ to elapsed seconds (provision_s, hibernate_s, overlay_s, deprovision_s,
+ total_s).
+ """
+ print(f"\n================ seeding {bucket} into hub-dir {hub_data_dir} ================")
+
+ hub_data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = hub_data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {
+ "uri": f"s3://{bucket}",
+ "endpoint": f"http://localhost:{minio_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ },
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
+ hub_extra_env = {
+ "AWS_ACCESS_KEY_ID": _MINIO_USER,
+ "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+ }
+
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ exit_code = 0
+ hib_accepted: list[str] = []
+ stuck_hib: list[str] = []
+ failed_hib: list[str] = []
+ timings: dict[str, float] = {
+ "provision_s": 0.0,
+ "hibernate_s": 0.0,
+ "overlay_s": 0.0,
+ "deprovision_s": 0.0,
+ "total_s": 0.0,
+ }
+
+ try:
+ hub_log = hub_data_dir / "hub.log"
+ hub_instance_limit = max(len(module_ids) + 10, 500)
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, hub_data_dir, hub_port, hub_log,
+ hub_instance_limit, hub_extra_args, hub_extra_env,
+ )
+ _wait_for_hub(hub_proc, hub_port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ # --- Provision ---
+ print(f"[{bucket}][provision] firing {len(module_ids)} provision requests...")
+ t0 = time.monotonic()
+ accepted, failures = _fan_out_post(pool, hub_port, module_ids, "provision")
+ print(f"[{bucket}][provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in failures[:10]:
+ print(f"[{bucket}][provision] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+ if not accepted:
+ return 1, timings
+
+ stuck, failed, last_states = _wait_for_state(hub_port, accepted, "provisioned", poll_timeout, f"{bucket}/provision")
+ timings["provision_s"] = time.monotonic() - t0
+ prov_done = len(accepted) - len(stuck) - len(failed)
+ print(f"[{bucket}][provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck "
+ f"({timings['provision_s']:.1f}s)")
+ if failed or stuck:
+ exit_code = 1
+ for mid in (failed + stuck)[:10]:
+ print(f"[{bucket}][provision] not-provisioned {mid}: last state='{last_states.get(mid, '')}'")
+ accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)]
+ if not accepted:
+ return 1, timings
+
+ # --- Hibernate ---
+ print(f"[{bucket}][hibernate] firing {len(accepted)} hibernate requests...")
+ t0 = time.monotonic()
+ hib_accepted, hib_failures = _fan_out_post(pool, hub_port, accepted, "hibernate")
+ print(f"[{bucket}][hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in hib_failures[:10]:
+ print(f"[{bucket}][hibernate] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+
+ stuck_hib, failed_hib, last_states_hib = _wait_for_state(hub_port, hib_accepted, "hibernated", poll_timeout, f"{bucket}/hibernate")
+ timings["hibernate_s"] = time.monotonic() - t0
+ hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib)
+ print(f"[{bucket}][hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck "
+ f"({timings['hibernate_s']:.1f}s)")
+ if failed_hib or stuck_hib:
+ exit_code = 1
+ for mid in (failed_hib + stuck_hib)[:10]:
+ print(f"[{bucket}][hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'")
+
+ # --- Overlay snapshot onto hub's servers dir ---
+ to_overlay = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)]
+ hub_servers = hub_data_dir / "servers"
+ print(f"[{bucket}][overlay] copying {len(to_overlay)} snapshot trees from {snapshot_dir} -> {hub_servers}")
+ t0 = time.monotonic()
+ modules_overlaid, files_copied, bytes_copied = _overlay_snapshot(snapshot_dir, hub_servers, to_overlay)
+ timings["overlay_s"] = time.monotonic() - t0
+ print(f"[{bucket}][overlay] overlaid {modules_overlaid} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB "
+ f"({timings['overlay_s']:.1f}s)")
+ if modules_overlaid < len(to_overlay):
+ exit_code = 1
+
+ # --- Deprovision (triggers dehydrate -> MinIO upload) ---
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ print(f"[{bucket}][deprovision] firing {len(to_overlay)} deprovision requests...")
+ t0 = time.monotonic()
+ dp_accepted, dp_failures = _fan_out_post(pool, hub_port, to_overlay, "deprovision")
+ print(f"[{bucket}][deprovision] accepted={len(dp_accepted)}, failed={len(dp_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in dp_failures[:10]:
+ print(f"[{bucket}][deprovision] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+
+ stuck_dp, failed_dp, last_states_dp = _wait_for_deprovisioned(hub_port, dp_accepted, poll_timeout)
+ timings["deprovision_s"] = time.monotonic() - t0
+ dp_done = len(dp_accepted) - len(stuck_dp) - len(failed_dp)
+ print(f"[{bucket}][deprovision] complete: {dp_done}/{len(dp_accepted)} gone, {len(failed_dp)} crashed, {len(stuck_dp)} stuck "
+ f"({timings['deprovision_s']:.1f}s)")
+ if failed_dp or stuck_dp:
+ exit_code = 1
+ for mid in (failed_dp + stuck_dp)[:10]:
+ print(f"[{bucket}][deprovision] not-gone {mid}: last state='{last_states_dp.get(mid, '')}'")
+
+ timings["total_s"] = time.monotonic() - t_start
+ print(f"[{bucket}] bucket elapsed: {timings['total_s']:.1f}s, exit={exit_code}")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ print(f"[{bucket}][hub] stopping...")
+ _zen_down_hub(zen_exe, hub_proc)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+
+ return exit_code, timings
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+ help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs",
+ help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ "
+ "(default: E:/Dev/zen-perf-seed/hubs)")
+ parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data",
+ help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)")
+ parser.add_argument("--minio-port", type=int, default=9000)
+ parser.add_argument("--minio-console-port", type=int, default=9001)
+ parser.add_argument("--hub-port", type=int, default=8558)
+ parser.add_argument("--bucket", default="zen-seed-packed",
+ help="Bucket to seed (default: zen-seed-packed). Pack worktree - "
+ "hub is launched with --hub-hydration-enable-pack=true.")
+ parser.add_argument("--module-count", type=int, default=0,
+ help="Cap on modules processed (0 = all modules in snapshot-dir)")
+ parser.add_argument("--workers", type=int, default=50)
+ parser.add_argument("--poll-timeout", type=float, default=1800.0,
+ help="Max seconds to wait for each state transition (default: 1800)")
+ parser.add_argument("--wipe", action="store_true",
+ help="Wipe each per-bucket hub data dir and the shared MinIO data dir before starting "
+ "(never touches --snapshot-dir)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver + minio executables (auto-detected)")
+ args = parser.parse_args()
+
+ bucket_name: str = args.bucket
+
+ snapshot_dir = Path(args.snapshot_dir).resolve()
+ hub_data_root = Path(args.hub_data_root).resolve()
+ minio_data_dir = Path(args.minio_data_dir).resolve()
+
+ hub_data_dir = (hub_data_root / f"hub-b-{bucket_name}").resolve()
+
+ # Safety: snapshot-dir must not overlap any mutable path.
+ for label, d in [
+ ("minio-data-dir", minio_data_dir),
+ ("hub-data-root", hub_data_root),
+ (f"hub-b-{bucket_name}", hub_data_dir),
+ ]:
+        if snapshot_dir == d or snapshot_dir in d.parents or d in snapshot_dir.parents:
+ sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and {label} ({d}) must be disjoint")
+
+ if not snapshot_dir.is_dir():
+ sys.exit(f"[setup] snapshot-dir not found: {snapshot_dir}")
+
+ module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()])
+ if args.module_count > 0:
+ module_ids = module_ids[:args.module_count]
+ if not module_ids:
+ sys.exit(f"[setup] no module directories found in {snapshot_dir}")
+
+ if args.wipe:
+ for d in [hub_data_dir, minio_data_dir]:
+ if d.exists():
+ if snapshot_dir.is_relative_to(d):
+ sys.exit(f"[setup] refusing to wipe {d}: snapshot-dir is under it")
+ print(f"[setup] wiping {d}")
+ _rmtree_robust(d)
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ zen_exe = _find_zen(zenserver_exe)
+ minio_exe = _find_minio(zenserver_exe)
+ zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+ print(f"[setup] build mode: {zenserver_mode}")
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] zen cli: {zen_exe}")
+ print(f"[setup] minio: {minio_exe}")
+ print(f"[setup] snapshot-dir: {snapshot_dir} (read-only)")
+ print(f"[setup] hub-data-root: {hub_data_root}")
+ print(f"[setup] minio-data-dir: {minio_data_dir}")
+ print(f"[setup] modules: {len(module_ids)}")
+ print(f"[setup] bucket: {bucket_name} (hub-dir {hub_data_dir})")
+
+ minio_proc: Optional[subprocess.Popen] = None
+ exit_code = 0
+
+ try:
+ minio_proc = _start_minio(minio_exe, minio_data_dir, args.minio_port, args.minio_console_port)
+ _wait_for_minio(args.minio_port)
+ _create_minio_bucket(args.minio_port, bucket_name)
+
+ t_total = time.monotonic()
+ rc, timings = _seed_one_bucket(
+ bucket=bucket_name,
+ hub_data_dir=hub_data_dir,
+ snapshot_dir=snapshot_dir,
+ module_ids=module_ids,
+ minio_port=args.minio_port,
+ hub_port=args.hub_port,
+ poll_timeout=args.poll_timeout,
+ workers=args.workers,
+ zenserver_exe=zenserver_exe,
+ zen_exe=zen_exe,
+ )
+ exit_code = rc
+
+ print(f"\n[summary] stage B total elapsed: {time.monotonic()-t_total:.1f}s, exit={exit_code}")
+ print(f"[summary] MinIO data dir is now seeded: {minio_data_dir}")
+
+ phases = ["provision_s", "hibernate_s", "overlay_s", "deprovision_s", "total_s"]
+ labels = ["provision", "hibernate", "overlay", "deprovision (upload)", "total"]
+ print(f"\n[summary] timings (s):")
+ for key, lbl in zip(phases, labels):
+ print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}")
+
+ print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded/")
+
+ finally:
+ if minio_proc is not None and minio_proc.poll() is None:
+ print("[minio] stopping...")
+ _stop_minio_graceful(minio_proc)
+
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py
new file mode 100644
index 000000000..f0bc7b607
--- /dev/null
+++ b/scripts/test_scripts/hub/seed_s3_snapshot.py
@@ -0,0 +1,635 @@
+#!/usr/bin/env python3
+"""Stage A of the perf-seed workflow.
+
+Fetches real module data from a configured S3 bucket via the hub hydration
+pipeline and preserves the decompressed server-state trees to disk so a
+later stage can replay them into a local MinIO backend.
+
+Flow:
+ 1. Start a hub against real S3 (read-only SSO credentials), dehydration off,
+ hibernated-watchdog timeout set huge so modules are not auto-deprovisioned
+ mid-flow.
+  2. Provision N modules (the N most recently active UUID-shaped folders in
+     the bucket listing - see _list_module_ids for the ranking).
+ 3. Wait until every module reaches 'provisioned'.
+ 4. Hibernate every module (child processes exit, state dir intact and no
+ remaining writer while we copy).
+ 5. Wait until every module reaches 'hibernated'.
+ 6. Copy <hub-data-dir>/servers/<moduleid>/ -> <snapshot-dir>/<moduleid>/
+ with the hub still running - hibernate took the writers offline and the
+ watchdog is muted.
+ 7. Shut the hub down via 'zen down --pid <hub pid>'.
+
+Required environment variables (or pass via CLI flags):
+ ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/
+ ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access
+ ZEN_PERF_AWS_REGION defaults to us-east-1
+
+Requirements:
+ pip install boto3
+ aws CLI v2 with the SSO profile configured
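+
+Typical invocation (with the environment variables above set; pass
+--zenserver-dir if the executables are not under build/):
+
+    python seed_s3_snapshot.py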
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import re
+import shutil
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ import os as _os
+ import stat as _stat
+ def _onerror(func, p, exc_info):
+ try:
+ _os.chmod(p, _stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ # onexc was introduced in py3.12; fall back to onerror on older versions.
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "")
+_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "")
+_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1")
+
+# Matches UUID-shaped module IDs (UUID-ish with mixed 4-char groups). Filters
+# out stray top-level keys (readmes, test scratches) that aren't real modules.
+_MODULEID_RE = re.compile(r"^[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}$")
+
+
+# ---------------------------------------------------------------------------
+# zenserver binary discovery - prefer release, fall back to debug.
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ for mode in ("release", "debug"):
+ for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+ p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+ if p.exists():
+ return p
+ sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+ p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)")
+ return p
+
+
+# ---------------------------------------------------------------------------
+# AWS SSO credential resolution
+# ---------------------------------------------------------------------------
+
+def _require_boto3():
+ try:
+ import boto3 # type: ignore[import-not-found]
+ import botocore.exceptions # type: ignore[import-not-found]
+ except ImportError:
+ sys.exit("[aws] boto3 is required. Install it with: pip install boto3")
+ return boto3, botocore.exceptions
+
+
+def _sso_login(profile: str) -> None:
+ print(f"[aws] running 'aws sso login --profile {profile}'...")
+ rc = subprocess.call(["aws", "sso", "login", "--profile", profile])
+ if rc != 0:
+ sys.exit(f"[aws] 'aws sso login' failed with rc={rc}")
+
+
+def _get_session(profile: str):
+ boto3, exc_mod = _require_boto3()
+
+ def _load_frozen():
+ session = boto3.Session(profile_name=profile)
+ creds = session.get_credentials()
+ if creds is None:
+ return session, None
+ return session, creds.get_frozen_credentials()
+
+ try:
+ session, frozen = _load_frozen()
+ if frozen is not None and frozen.access_key and frozen.secret_key:
+ return session, frozen
+ except exc_mod.ProfileNotFound:
+ sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config")
+ except Exception as e:
+ print(f"[aws] initial credential load failed: {e}")
+
+ _sso_login(profile)
+
+ session, frozen = _load_frozen()
+ if frozen is None or not frozen.access_key:
+ sys.exit("[aws] could not resolve credentials after sso login")
+ return session, frozen
+
+
+def _parse_s3_uri(uri: str) -> tuple[str, str]:
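+    """Split an s3:// URI into (bucket, prefix), e.g.
+    's3://your-bucket/optional-prefix/' -> ('your-bucket', 'optional-prefix/')."""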
+ if not uri.startswith("s3://"):
+ sys.exit(f"[aws] invalid S3 URI: {uri}")
+ rest = uri[len("s3://"):]
+ if "/" in rest:
+ bucket, prefix = rest.split("/", 1)
+ else:
+ bucket, prefix = rest, ""
+ return bucket, prefix
+
+
+def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]:
+ """List UUID-shaped module folders under the bucket and return the `limit`
+ most recently active ones, ranked by the LastModified of each module's
+ `incremental-state.cbo` (newest first - these were last dehydrated most
+ recently, which is the closest proxy for "most recently accessed").
+
+ Falls back to listing order for any folder whose state file can't be
+ HEADed (missing / 403 / transient error).
+ """
+ s3 = session.client("s3", region_name=region)
+ prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/"
+
+ # 1. Enumerate every UUID-shaped folder.
+ paginator = s3.get_paginator("list_objects_v2")
+ candidates: list[str] = []
+ for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"):
+ for cp in page.get("CommonPrefixes", []) or []:
+ folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/")
+ if folder and _MODULEID_RE.match(folder):
+ candidates.append(folder)
+ print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...")
+
+ # 2. HEAD each module's state file in parallel. Missing/failed HEADs land
+ # at the tail via a sentinel epoch 0 timestamp.
+ from datetime import datetime, timezone
+ epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
+
+ def _state_mtime(mid: str) -> datetime:
+ key = f"{prefix_norm}{mid}/incremental-state.cbo"
+ try:
+ resp = s3.head_object(Bucket=bucket, Key=key)
+ return resp.get("LastModified", epoch)
+ except Exception:
+ return epoch
+
+ with ThreadPoolExecutor(max_workers=50) as pool:
+ times = list(pool.map(_state_mtime, candidates))
+
+ # 3. Sort descending (newest first). Folders without a state file sink.
+ ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True)
+ missing = sum(1 for _, t in ranked if t == epoch)
+ if missing:
+ print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)")
+ return [mid for mid, _ in ranked[:limit]]
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ instance_limit: int,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ f"--hub-instance-limit={instance_limit}",
+ # Seeding is not a perf-measurement path - we want it as fast as the
+ # host can manage. Let the hub go wide on both provisioning and
+ # hydration thread pools rather than matching prod limits.
+ "--hub-instance-provision-threads=64",
+ "--hub-hydration-threads=64",
+ # With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH
+ # watchdog inactivity timers so early-provisioned modules do not get
+ # auto-deprovisioned while we're still hydrating the tail (hibernated
+ # default 1800s, provisioned default 600s - the latter is the one that
+ # bit us on the 1000-module run and dropped 335 modules).
+ "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400",
+ "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
+ """Shut the hub down via 'zen down --pid <pid>'. zen down performs a
+ proper server-initiated shutdown (signals the shutdown event, waits for
+ the server to drain) rather than a blunt kill."""
+ if hub_proc.poll() is not None:
+ return
+ pid = hub_proc.pid
+ print(f"[hub] zen down --pid {pid}")
+ rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
+ if rc != 0:
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ try:
+ hub_proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[hub] did not exit after {timeout_s}s, killing")
+ hub_proc.kill()
+ hub_proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub HTTP API
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
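+# /hub/status payload shape, as parsed by the helpers below (abridged; only the
+# fields consumed here): {"modules": [{"moduleId": "<uuid>", "state": "provisioned", ...}]}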
+def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
+ url = f"http://localhost:{port}/hub/status"
+ req = urllib.request.Request(url, headers={"Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ except Exception:
+ return None
+ return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ """POST /hub/modules/<id>/<verb> concurrently. Returns (accepted, failures)."""
+ futures = {
+ mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
+ for mid in module_ids
+ }
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+# Any of these means the module has finished transitioning and will not reach
+# the target state on its own (only a retry that we issue could move it again).
+_FAILED_STATES = {"crashed", "unprovisioned"}
+
+
+def _wait_for_state(
+ port: int,
+ module_ids: list[str],
+ target_state: str,
+ timeout_s: float,
+ label: str,
+) -> tuple[list[str], list[str], dict[str, str]]:
+ """Poll hub status until every module hits target_state, fails, or times out.
+
+ Returns (stuck, failed, last_states). 'stuck' = still mid-transition when
+ we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'.
+ """
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ failed: list[str] = []
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if s == target_state:
+ remaining.discard(mid)
+ elif s in _FAILED_STATES and target_state not in _FAILED_STATES:
+ remaining.discard(mid)
+ failed.append(mid)
+ done = len(module_ids) - len(remaining)
+ print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r")
+ time.sleep(2.0)
+
+ print()
+ return list(remaining), failed, last_states
+
+
+# ---------------------------------------------------------------------------
+# Snapshot copy (run AFTER hub shutdown so there's no concurrent writer)
+# ---------------------------------------------------------------------------
+
+def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]:
+ """Copy src_root/<mid>/* to dst_root/<mid>/* for each module.
+
+ Returns (modules_copied, files_copied, bytes_copied).
+ Replaces only the specific per-module subdirs; never touches siblings.
+ """
+ dst_root.mkdir(parents=True, exist_ok=True)
+ modules_copied = 0
+ files_copied = 0
+ bytes_copied = 0
+
+ for i, mid in enumerate(module_ids, 1):
+ src = src_root / mid
+ dst = dst_root / mid
+ if not src.is_dir():
+ print(f"[snapshot] WARNING: source missing for {mid}: {src}")
+ continue
+ if dst.exists():
+ _rmtree_robust(dst)
+ shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
+ modules_copied += 1
+ for root, _dirs, files in os.walk(dst):
+ for f in files:
+ p = Path(root) / f
+ try:
+ bytes_copied += p.stat().st_size
+ except OSError:
+ pass
+ files_copied += 1
+ if i % 25 == 0 or i == len(module_ids):
+ print(f"[snapshot] {i}/{len(module_ids)} modules copied "
+ f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
+
+ return modules_copied, files_copied, bytes_copied
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a",
+ help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)")
+ parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+ help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--port", type=int, default=8558,
+ help="Hub HTTP port (default: 8558)")
+ parser.add_argument("--module-count", type=int, default=1000,
+ help="Number of modules to snapshot (default: 300)")
+ parser.add_argument("--workers", type=int, default=50,
+ help="Concurrent HTTP workers (default: 50)")
+ parser.add_argument("--poll-timeout", type=float, default=1800.0,
+ help="Max seconds to wait for provision or hibernate to finish (default: 1800)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver executable (auto-detected by default)")
+ args = parser.parse_args()
+
+ hub_data_dir = Path(args.hub_data_dir).resolve()
+ snapshot_dir = Path(args.snapshot_dir).resolve()
+
+ # Safety: snapshot-dir is the preserved output. It must not overlap the
+ # hub data-dir in either direction so nothing the hub writes can clobber
+ # the preserved state.
+ if (snapshot_dir == hub_data_dir
+ or snapshot_dir in hub_data_dir.parents
+ or hub_data_dir in snapshot_dir.parents):
+ sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and hub-data-dir ({hub_data_dir}) must be disjoint")
+
+ s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI)
+ aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE)
+ aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION)
+    if not s3_uri:
+        sys.exit("[setup] S3 URI not set. Set S3_URI to a bucket like s3://your-bucket/")
+    if not aws_profile:
+        sys.exit("[setup] AWS profile not set. Set AWS_PROFILE to your SSO profile name")
+
+ hub_log = hub_data_dir / "hub.log"
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ zen_exe = _find_zen(zenserver_exe)
+ zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+ print(f"[setup] build mode: {zenserver_mode}")
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] zen cli: {zen_exe}")
+ print(f"[setup] S3 URI: {s3_uri}")
+ print(f"[setup] profile: {aws_profile}")
+ print(f"[setup] hub-data-dir: {hub_data_dir}")
+ print(f"[setup] snapshot-dir: {snapshot_dir}")
+
+ session, frozen = _get_session(aws_profile)
+ print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})")
+
+ bucket, prefix = _parse_s3_uri(s3_uri)
+ print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...")
+ module_ids = _list_module_ids(session, bucket, prefix, aws_region, args.module_count)
+ print(f"[s3] selected {len(module_ids)} module folders (UUID-shaped)")
+ if len(module_ids) < args.module_count:
+ print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter")
+
+ if not module_ids:
+ sys.exit("[s3] no module folders found, aborting")
+
+ aws_env = {
+ "AWS_ACCESS_KEY_ID": frozen.access_key,
+ "AWS_SECRET_ACCESS_KEY": frozen.secret_key,
+ }
+ if frozen.token:
+ aws_env["AWS_SESSION_TOKEN"] = frozen.token
+
+ hub_data_dir.mkdir(parents=True, exist_ok=True)
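+    # Hydration target config, handed to the hub via --hub-hydration-target-config
+    # below. Resulting file (values illustrative):
+    #   {"type": "s3", "settings": {"uri": "s3://bucket/", "region": "us-east-1"}}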
+ config_path = hub_data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {"uri": s3_uri, "region": aws_region},
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ "--hub-enable-dehydration=false",
+ ]
+
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ exit_code = 0
+
+ try:
+ hub_instance_limit = max(args.module_count + 10, 500)
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, hub_data_dir, args.port, hub_log,
+ hub_instance_limit, hub_extra_args, aws_env,
+ )
+ _wait_for_hub(hub_proc, args.port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ # --- Provision ---
+ print(f"[provision] firing {len(module_ids)} provision requests...")
+ t0 = time.monotonic()
+ accepted, failures = _fan_out_post(pool, args.port, module_ids, "provision")
+ print(f"[provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in failures[:10]:
+ print(f"[provision] FAILED {mid}: status={status} body={body}")
+ if not accepted:
+ sys.exit("[provision] nothing accepted, aborting")
+
+ stuck, failed, last_states = _wait_for_state(args.port, accepted, "provisioned", args.poll_timeout, "provision")
+ prov_done = len(accepted) - len(stuck) - len(failed)
+ print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck "
+ f"({time.monotonic()-t0:.1f}s)")
+ if failed:
+ for mid in failed[:10]:
+ print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'")
+ exit_code = 1
+ if stuck:
+ for mid in stuck[:10]:
+ print(f"[provision] STUCK {mid}: last state='{last_states.get(mid, '')}'")
+ exit_code = 1
+            unusable = set(stuck) | set(failed)
+            accepted = [m for m in accepted if m not in unusable]
+
+ if not accepted:
+ sys.exit("[provision] nothing successfully provisioned, aborting")
+
+ # --- Hibernate ---
+ print(f"[hibernate] firing {len(accepted)} hibernate requests...")
+ t0 = time.monotonic()
+ hib_accepted, hib_failures = _fan_out_post(pool, args.port, accepted, "hibernate")
+ print(f"[hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in hib_failures[:10]:
+ print(f"[hibernate] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+
+ stuck_hib, failed_hib, last_states_hib = _wait_for_state(args.port, hib_accepted, "hibernated", args.poll_timeout, "hibernate")
+ hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib)
+ print(f"[hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck "
+ f"({time.monotonic()-t0:.1f}s)")
+ if failed_hib or stuck_hib:
+ exit_code = 1
+ for mid in (failed_hib + stuck_hib)[:10]:
+ print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'")
+
+ # --- Copy snapshots while hub is still running. All instances are
+ # hibernated (no writers), watchdog-hibernated-timeout is 86400s
+ # (no auto-deprovision), hub is only touching its own metadata
+ # outside servers/<mid>/. Safe. ---
+ copy_src = hub_data_dir / "servers"
+            not_hibernated = set(stuck_hib) | set(failed_hib)
+            to_copy = [m for m in hib_accepted if m not in not_hibernated]
+ print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}")
+ t0 = time.monotonic()
+ modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy)
+ print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB "
+ f"({time.monotonic()-t0:.1f}s)")
+
+ if modules_copied < len(to_copy):
+ print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied")
+ exit_code = 1
+
+ # --- Graceful hub shutdown via 'zen down' ---
+ _zen_down_hub(zen_exe, hub_proc)
+ hub_proc = None
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ hub_log_handle = None
+
+ print(f"[summary] stage A total elapsed: {time.monotonic()-t_start:.1f}s, exit={exit_code}")
+ print(f"[summary] snapshot available at: {snapshot_dir}")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ # Fallback: something aborted before the 'zen down' path ran.
+ print("[hub] force-terminating (zen down was not invoked)")
+ hub_proc.terminate()
+ try:
+ hub_proc.wait(timeout=60)
+ except subprocess.TimeoutExpired:
+ hub_proc.kill()
+ hub_proc.wait()
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub_load_test.py b/scripts/test_scripts/hub_load_test.py
new file mode 100644
index 000000000..33f05d90f
--- /dev/null
+++ b/scripts/test_scripts/hub_load_test.py
@@ -0,0 +1,970 @@
+#!/usr/bin/env python3
+"""Hub sustained load test.
+
+Keeps ~N modules concurrently provisioned from a pool of 1000 predefined
+module names, writing and reading data to each instance, then either letting
+the hub watchdog deprovision idle ones or explicitly deprovisioning them.
+Runs indefinitely until Ctrl-C.
+
+Optional --s3: starts a local MinIO server and configures the hub to use it
+as the de/hydration backend.
+
+Requirements:
+ pip install boto3 (only needed with --s3)
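+
+Example invocations (illustrative paths; all flags defined below):
+    python scripts/test_scripts/hub_load_test.py --active-modules 100 --idle-timeout 90
+    python scripts/test_scripts/hub_load_test.py --s3 --data-dir D:/tmp/hub-loadtest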
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import queue
+import random
+import subprocess
+import sys
+import threading
+import time
+import urllib.error
+import urllib.request
+import webbrowser
+from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+_NAMESPACE = "loadtest"
+_BUCKET = "bucket"
+
+# Key sizes to use for activity writes (bytes) - biased toward smaller.
+# Blobs are pre-generated at startup; one per size, shared across all requests.
+_KEY_SIZES = [512, 512, 2048, 2048, 8192, 32768]
+_BLOBS: list[bytes] = [] # populated by _init_blobs()
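+# _BLOBS mirrors _KEY_SIZES entry-for-entry, so random.choice(_BLOBS) carries the
+# same size bias: 512 B and 2 KiB at 2/6 each, 8 KiB and 32 KiB at 1/6 each.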
+
+
+def _init_blobs() -> None:
+ seen: set[int] = set()
+ for size in _KEY_SIZES:
+ if size not in seen:
+ seen.add(size)
+ _BLOBS.append(os.urandom(size))
+ else:
+ # reuse the already-generated blob for this size
+ _BLOBS.append(next(b for b in _BLOBS if len(b) == size))
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ candidates = [
+ repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
+ ]
+ for c in candidates:
+ if c.exists():
+ return c
+
+ matches = list(repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}"))
+ if matches:
+ return max(matches, key=lambda p: p.stat().st_mtime)
+
+ sys.exit(
+ "zenserver executable not found in build/. "
+ "Run: xmake config -y -m release -a x64 && xmake -y\n"
+ "Or pass --zenserver-dir <dir>."
+ )
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+ p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(
+ f"minio executable not found at {p}. "
+ "Build with: xmake config -y -m release -a x64 && xmake -y"
+ )
+ return p
+
+
+# ---------------------------------------------------------------------------
+# MinIO
+# ---------------------------------------------------------------------------
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+ minio_data = data_dir / "minio"
+ minio_data.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["MINIO_ROOT_USER"] = _MINIO_USER
+ env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ proc = subprocess.Popen(
+ [str(minio_exe), "server", str(minio_data),
+ "--address", f":{port}",
+ "--console-address", f":{console_port}",
+ "--quiet"],
+ env=env,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ **popen_kwargs,
+ )
+ print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
+ return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+ try:
+ import boto3
+ import botocore.config
+ import botocore.exceptions
+ except ImportError:
+ sys.exit(
+ "[minio] boto3 is required for --s3.\n"
+ "Install it with: pip install boto3"
+ )
+ s3 = boto3.client(
+ "s3",
+ endpoint_url=f"http://localhost:{port}",
+ aws_access_key_id=_MINIO_USER,
+ aws_secret_access_key=_MINIO_PASS,
+ region_name="us-east-1",
+ config=botocore.config.Config(signature_version="s3v4"),
+ )
+ try:
+ s3.create_bucket(Bucket=bucket)
+ print(f"[minio] created bucket '{bucket}'")
+ except botocore.exceptions.ClientError as e:
+ if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
+ print(f"[minio] bucket '{bucket}' already exists")
+ else:
+ raise
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ idle_timeout: int,
+ extra_args: list[str],
+ extra_env: Optional[dict[str, str]],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-http-threads=8",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ "--hub-instance-limit=500",
+ f"--hub-watchdog-provisioned-inactivity-timeout-seconds={idle_timeout}",
+ "--hub-watchdog-inactivity-check-margin-seconds=5",
+ "--hub-watchdog-cycle-interval-ms=2000",
+ "--hub-watchdog-cycle-processing-budget-ms=3000",
+ "--hub-watchdog-activity-check-connect-timeout-ms=20",
+ "--hub-watchdog-activity-check-request-timeout-ms=50",
+ ] + extra_args
+
+ env = os.environ.copy()
+ if extra_env:
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
+ if proc.poll() is not None:
+ return
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[{name}] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Module state
+# ---------------------------------------------------------------------------
+
+@dataclass
+class ModuleState:
+ state: str = "idle" # idle|provisioning|active|deprovisioning|watchdog-pending
+ base_uri: Optional[str] = None
+ written_keys: list[str] = field(default_factory=list)
+ activity_rounds_left: int = 0
+ explicit_deprovision: bool = False
+ watchdog_pending_since: float = 0.0
+ generation: int = 0 # incremented on each provision; queue entries carry this to discard stale entries
+
+
+# ---------------------------------------------------------------------------
+# Counters
+# ---------------------------------------------------------------------------
+
+@dataclass
+class Counters:
+ provisions: int = 0
+ deprovisions_explicit: int = 0
+ deprovisions_watchdog: int = 0
+ provision_rejected: int = 0
+ activity_rounds: int = 0
+ writes: int = 0
+ reads: int = 0
+ errors: int = 0
+ last_reject_time: float = 0.0
+ _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)
+
+ def inc(self, name: str, n: int = 1) -> None:
+ with self._lock:
+ setattr(self, name, getattr(self, name) + n)
+
+ def record_reject(self) -> None:
+ with self._lock:
+ self.provision_rejected += 1
+ self.last_reject_time = time.monotonic()
+
+ def snapshot(self) -> dict:
+ with self._lock:
+ return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
+
+
+# ---------------------------------------------------------------------------
+# Hub API helpers (urllib, no external deps)
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 30.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+            try:
+                body = json.loads(resp.read())
+            except Exception:
+                body = {}
+            return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception:
+ return 0, {}
+
+
+def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
+ try:
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ return data.get("modules", [])
+ except Exception:
+ return None
+
+
+def _instance_put(base_uri: str, key: str, data: bytes, timeout_s: float = 10.0) -> int:
+ url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}"
+ req = urllib.request.Request(url, data=data, method="PUT",
+ headers={"Content-Type": "application/octet-stream",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ return resp.status
+ except urllib.error.HTTPError as e:
+ return e.code
+ except Exception:
+ return 0
+
+
+def _instance_get(base_uri: str, key: str, timeout_s: float = 10.0) -> int:
+ url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}"
+ req = urllib.request.Request(url, headers={"Accept": "application/octet-stream"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ resp.read()
+ return resp.status
+ except urllib.error.HTTPError as e:
+ return e.code
+ except Exception:
+ return 0
+
+
+# ---------------------------------------------------------------------------
+# Worker tasks (run in thread pool - no sleeping)
+# ---------------------------------------------------------------------------
+
+def _task_provision(
+ module_id: str,
+ port: int,
+ state_map: dict[str, ModuleState],
+ state_lock: threading.Lock,
+ activity_queue: queue.PriorityQueue,
+ counters: Counters,
+ explicit_deprovision_rate: float,
+ stop_event: threading.Event,
+) -> None:
+ status, body = _hub_post(port, f"/hub/modules/{module_id}/provision")
+
+ generation = 0
+ schedule_burst = False
+ with state_lock:
+ mod = state_map[module_id]
+ if mod.state != "provisioning":
+ # Shutdown changed state while provision was in-flight; leave it alone
+ return
+ if status in (200, 202):
+ instance_port = body.get("port")
+ base_uri = f"http://localhost:{instance_port}" if instance_port else None
+ if status == 200:
+ # Instance is immediately ready
+ mod.generation += 1
+ generation = mod.generation
+ mod.state = "active"
+ mod.base_uri = base_uri
+ mod.written_keys = []
+ mod.activity_rounds_left = random.randint(5, 20)
+ mod.explicit_deprovision = random.random() < explicit_deprovision_rate
+ schedule_burst = not stop_event.is_set()
+ else:
+ # 202: async provision - instance is still starting up.
+ # Stay in "provisioning"; the hub status poll activates it when ready.
+ # _shutdown_deprovision_all also covers "provisioning", so shutdown is safe.
+ mod.state = "provisioning"
+ mod.base_uri = base_uri
+ elif status == 409:
+ mod.state = "idle"
+ counters.record_reject()
+ return
+ else:
+ mod.state = "idle"
+ counters.inc("errors")
+ return
+
+ counters.inc("provisions")
+ if schedule_burst:
+ activity_queue.put((time.monotonic() + random.uniform(0.1, 0.3), generation, module_id))
+
+
+def _task_deprovision(
+ module_id: str,
+ port: int,
+ state_map: dict[str, ModuleState],
+ state_lock: threading.Lock,
+ counters: Counters,
+ retries: int = 5,
+) -> None:
+ succeeded = False
+ for attempt in range(retries + 1):
+ status, _ = _hub_post(port, f"/hub/modules/{module_id}/deprovision")
+ if status in (200, 202, 404):
+ succeeded = True
+ break
+ if status == 409 and attempt < retries:
+ time.sleep(0.2)
+ continue
+ counters.inc("errors")
+ break
+
+ with state_lock:
+ state_map[module_id].state = "idle"
+ state_map[module_id].base_uri = None
+ state_map[module_id].written_keys = []
+
+ if succeeded:
+ counters.inc("deprovisions_explicit")
+
+
+def _task_activity_burst(
+ module_id: str,
+ generation: int,
+ state_map: dict[str, ModuleState],
+ state_lock: threading.Lock,
+ activity_queue: queue.PriorityQueue,
+ counters: Counters,
+ pool: ThreadPoolExecutor,
+ port: int,
+) -> None:
+ with state_lock:
+ mod = state_map[module_id]
+ if mod.state != "active" or mod.generation != generation:
+ return
+ base_uri = mod.base_uri
+ existing_keys = list(mod.written_keys)
+
+ if not base_uri:
+ with state_lock:
+ state_map[module_id].state = "idle"
+ return
+
+ # Write 3-8 new keys per burst
+ num_writes = random.randint(3, 8)
+ new_keys: list[str] = []
+ write_errors = 0
+ for _ in range(num_writes):
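+        # 160 random bits rendered as a 40-char lowercase hex key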
+ key = f"{random.getrandbits(160):040x}"
+ data = random.choice(_BLOBS)
+ status = _instance_put(base_uri, key, data)
+ if status in (200, 201, 204):
+ new_keys.append(key)
+ counters.inc("writes")
+ else:
+ write_errors += 1
+ counters.inc("errors")
+
+ if write_errors > 0 and not new_keys and not existing_keys:
+ # Instance unreachable - likely watchdog fired while we were scheduled
+ with state_lock:
+ mod = state_map[module_id]
+ if mod.generation == generation:
+ mod.state = "idle"
+ mod.base_uri = None
+ mod.written_keys = []
+ return
+
+    # Read back 2-5 random keys from all known keys (capped by how many exist)
+ all_keys = existing_keys + new_keys
+ num_reads = min(random.randint(2, 5), len(all_keys))
+ for key in random.sample(all_keys, num_reads):
+ status = _instance_get(base_uri, key)
+ if status == 200:
+ counters.inc("reads")
+ elif status in (404, 0):
+ # Key may not exist yet or instance gone; not fatal
+ pass
+ else:
+ counters.inc("errors")
+
+ counters.inc("activity_rounds")
+
+ next_generation = 0
+ with state_lock:
+ mod = state_map[module_id]
+ if mod.state != "active" or mod.generation != generation:
+ return
+ mod.written_keys = (existing_keys + new_keys)[-200:] # cap list size
+ mod.activity_rounds_left -= 1
+
+ if mod.activity_rounds_left <= 0:
+ if mod.explicit_deprovision:
+ mod.state = "deprovisioning"
+ mod.base_uri = None
+ try:
+ pool.submit(
+ _task_deprovision,
+ module_id, port, state_map, state_lock, counters,
+ )
+ except RuntimeError:
+ pass # pool shutting down; hub watchdog will clean up
+ else:
+ mod.state = "watchdog-pending"
+ mod.watchdog_pending_since = time.monotonic()
+ return
+
+ next_generation = mod.generation # capture under lock before releasing
+
+ # Schedule next burst soon for semi-continuous activity
+ next_time = time.monotonic() + random.uniform(0.2, 0.8)
+ activity_queue.put((next_time, next_generation, module_id))
+
+
+# ---------------------------------------------------------------------------
+# Orchestrator
+# ---------------------------------------------------------------------------
+
+def _orchestrate(
+ port: int,
+ state_map: dict[str, ModuleState],
+ state_lock: threading.Lock,
+ activity_queue: queue.PriorityQueue,
+ counters: Counters,
+ pool: ThreadPoolExecutor,
+ stop_event: threading.Event,
+ target_active: int,
+ explicit_deprovision_rate: float,
+ idle_timeout: int,
+) -> None:
+ last_status_poll = 0.0
+ provision_backoff = False
+
+ while not stop_event.is_set():
+ now = time.monotonic()
+
+ # Drain activity queue for due bursts
+ while True:
+ try:
+ next_time, generation, module_id = activity_queue.get_nowait()
+ except queue.Empty:
+ break
+ if next_time <= now:
+ try:
+ pool.submit(
+ _task_activity_burst,
+ module_id, generation, state_map, state_lock,
+ activity_queue, counters, pool, port,
+ )
+ except RuntimeError:
+ break # pool shutting down
+ else:
+ activity_queue.put((next_time, generation, module_id))
+ break
+
+ # Activate/deactivate backoff based on recent 409 rejections
+ last_reject = counters.last_reject_time
+ if last_reject > 0 and now - last_reject < 5.0:
+ provision_backoff = True
+ elif provision_backoff and now - last_reject >= 5.0:
+ provision_backoff = False
+
+ # Count states and submit provision tasks if needed
+ if not provision_backoff:
+ with state_lock:
+ idle_ids = [mid for mid, m in state_map.items() if m.state == "idle"]
+ working_count = sum(
+ 1 for m in state_map.values()
+ if m.state in ("active", "provisioning", "deprovisioning")
+ )
+ watchdog_count = sum(
+ 1 for m in state_map.values()
+ if m.state == "watchdog-pending"
+ )
+
+ # Provision enough to keep working_count at target, but cap total
+ # in-flight (working + watchdog-pending) at 2x target to prevent
+ # runaway accumulation when all modules cycle to watchdog-pending.
+ inflight_cap = target_active * 2
+ deficit = min(
+ target_active - working_count,
+ inflight_cap - working_count - watchdog_count,
+ )
+ to_provision = idle_ids[:max(0, deficit)]
+ for mid in to_provision:
+ with state_lock:
+ if state_map[mid].state != "idle":
+ continue
+ state_map[mid].state = "provisioning"
+ try:
+ pool.submit(
+ _task_provision,
+ mid, port, state_map, state_lock,
+ activity_queue, counters, explicit_deprovision_rate,
+ stop_event,
+ )
+ except RuntimeError:
+ with state_lock:
+ state_map[mid].state = "idle"
+
+ # Poll hub status to detect async provision completions and watchdog-fired modules
+ if now - last_status_poll >= 5.0:
+ last_status_poll = now
+ modules_status = _hub_status(port)
+ if modules_status is not None:
+ hub_ids = {m["moduleId"]: m.get("state", "") for m in modules_status}
+ # Hub watchdog fires at ~idle_timeout from last activity. However, if the
+ # watchdog's previous visit predates the last burst, it sees a changed
+ # activity sum and resets LastActivityTime, delaying deprovision by ~173s.
+ # Explicitly deprovision after idle_timeout + 15s to avoid waiting for that.
+ timeout_threshold = idle_timeout + 15
+ to_deprovision_explicitly: list[str] = []
+ with state_lock:
+ for mid, mod in state_map.items():
+ if mod.state == "provisioning" and mod.base_uri is not None:
+ # Waiting for async (202) provision to complete
+ hub_state = hub_ids.get(mid, "")
+ if hub_state == "provisioned":
+ mod.generation += 1
+ mod.state = "active"
+ mod.written_keys = []
+ mod.activity_rounds_left = random.randint(5, 20)
+ mod.explicit_deprovision = random.random() < explicit_deprovision_rate
+ activity_queue.put(
+ (time.monotonic() + random.uniform(0.1, 0.3), mod.generation, mid)
+ )
+ elif mid not in hub_ids:
+ # Provision failed or was rolled back
+ mod.state = "idle"
+ mod.base_uri = None
+ elif mod.state == "watchdog-pending":
+ hub_state = hub_ids.get(mid, "")
+ gone = mid not in hub_ids
+ timed_out = (now - mod.watchdog_pending_since) > timeout_threshold
+ if gone or hub_state in ("unprovisioned", "deprovisioning"):
+ mod.state = "idle"
+ mod.base_uri = None
+ mod.written_keys = []
+ counters.inc("deprovisions_watchdog")
+ elif timed_out:
+ mod.state = "deprovisioning"
+ to_deprovision_explicitly.append(mid)
+ for mid in to_deprovision_explicitly:
+ try:
+ pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters)
+ except RuntimeError:
+ with state_lock:
+ if state_map[mid].state == "deprovisioning":
+ state_map[mid].state = "idle"
+
+ stop_event.wait(timeout=0.05)
+
+
+# ---------------------------------------------------------------------------
+# Stats display
+# ---------------------------------------------------------------------------
+
+def _stats_thread(
+ state_map: dict[str, ModuleState],
+ state_lock: threading.Lock,
+ counters: Counters,
+ stop_event: threading.Event,
+ interval_s: float = 5.0,
+) -> None:
+ is_tty = sys.stdout.isatty()
+ prev_lines = 0
+ t0 = time.monotonic()
+ prev_snap: Optional[dict] = None
+ prev_t = t0
+
+ while not stop_event.is_set():
+ stop_event.wait(timeout=interval_s)
+ now = time.monotonic()
+ elapsed = now - t0
+ dt = now - prev_t
+
+ snap = counters.snapshot()
+ with state_lock:
+ states: dict[str, int] = {}
+ for m in state_map.values():
+ states[m.state] = states.get(m.state, 0) + 1
+
+ def rate(key: str) -> float:
+ if prev_snap is None or dt <= 0:
+ return 0.0
+ return (snap[key] - prev_snap[key]) / dt * 60.0
+
+ lines = [
+ f"[{time.strftime('%H:%M:%S')}] elapsed={elapsed:.0f}s",
+ f" modules: idle={states.get('idle', 0)} "
+ f"provisioning={states.get('provisioning', 0)} "
+ f"active={states.get('active', 0)} "
+ f"watchdog-pending={states.get('watchdog-pending', 0)} "
+ f"deprovisioning={states.get('deprovisioning', 0)}",
+ f" totals: provisions={snap['provisions']} "
+ f"deprov-explicit={snap['deprovisions_explicit']} "
+ f"deprov-watchdog={snap['deprovisions_watchdog']} "
+ f"rejected={snap['provision_rejected']} "
+ f"errors={snap['errors']}",
+ f" data: writes={snap['writes']} reads={snap['reads']} "
+ f"rounds={snap['activity_rounds']}",
+ f" rates/min: provisions={rate('provisions'):.1f} "
+ f"deprov={rate('deprovisions_explicit') + rate('deprovisions_watchdog'):.1f} "
+ f"writes={rate('writes'):.1f} reads={rate('reads'):.1f}",
+ ]
+
+ if is_tty and prev_lines > 0:
+ # Move cursor up to overwrite previous block
+ sys.stdout.write(f"\033[{prev_lines}A\033[J")
+
+ sys.stdout.write("\n".join(lines) + "\n")
+ sys.stdout.flush()
+ prev_lines = len(lines)
+ prev_snap = snap
+ prev_t = now
+
+
+# ---------------------------------------------------------------------------
+# Shutdown
+# ---------------------------------------------------------------------------
+
+def _wait_for_hub_idle(port: int, hub_proc: subprocess.Popen, timeout_s: float = 120.0) -> None:
+ """Wait until the hub reports no transitioning instances (dehydration done)."""
+ _STABLE = {"provisioned", "hibernated", "crashed", "unprovisioned"}
+ print(f"[shutdown] waiting for hub dehydration (up to {timeout_s:.0f}s)...")
+ deadline = time.monotonic() + timeout_s
+ while time.monotonic() < deadline:
+ if hub_proc.poll() is not None:
+ print("[shutdown] hub process has exited")
+ return
+ modules = _hub_status(port, timeout_s=5.0)
+ if modules is None:
+ # Hub not responding. If it has exited, we're done. If it's still alive
+ # it may be saturated with S3 uploads - keep waiting rather than assuming done.
+ if hub_proc.poll() is not None:
+ print("[shutdown] hub process has exited")
+ return
+ time.sleep(1.0)
+ continue
+ transitioning = [m for m in modules if m.get("state") not in _STABLE]
+ remaining = len(modules)
+ if not transitioning:
+ if remaining:
+ print(f"[shutdown] hub idle ({remaining} instances in stable state)")
+ else:
+ print("[shutdown] hub idle (no instances remaining)")
+ return
+ print(f"[shutdown] {len(transitioning)} instance(s) still dehydrating...", end="\r")
+ time.sleep(1.0)
+ print(f"\n[shutdown] WARNING: hub did not become idle within {timeout_s:.0f}s")
+
+
+def _shutdown_deprovision_all(
+ port: int,
+ state_map: dict[str, ModuleState],
+ state_lock: threading.Lock,
+ counters: Counters,
+ workers: int,
+ timeout_s: float = 60.0,
+) -> None:
+ with state_lock:
+ to_deprovision = [
+ mid for mid, m in state_map.items()
+ if m.state in ("active", "watchdog-pending", "provisioning")
+ ]
+ for mid in to_deprovision:
+ state_map[mid].state = "deprovisioning"
+
+ if not to_deprovision:
+ return
+
+ print(f"\n[shutdown] deprovisioning {len(to_deprovision)} active modules...")
+ pool = ThreadPoolExecutor(max_workers=min(workers, len(to_deprovision)))
+ futures: list[Future] = [
+ pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters)
+ for mid in to_deprovision
+ ]
+ pool.shutdown(wait=False)
+
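+    # shutdown(wait=False) only stops new submissions; the already-submitted
+    # deprovision futures keep running, so wait on them explicitly below.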
+ done_set, not_done_set = futures_wait(futures, timeout=timeout_s)
+ if not_done_set:
+ print(f"[shutdown] WARNING: {len(not_done_set)} deprovision tasks did not complete within {timeout_s}s")
+ else:
+ print(f"[shutdown] all modules deprovisioned")
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest",
+ help="Hub --data-dir (default: E:/Dev/hub-loadtest)")
+ parser.add_argument("--port", type=int, default=8558,
+ help="Hub HTTP port (default: 8558)")
+ parser.add_argument("--workers", type=int, default=50,
+ help="Thread pool size for HTTP calls (default: 50)")
+ parser.add_argument("--active-modules", type=int, default=100,
+ help="Target number of concurrently provisioned modules (default: 100)")
+ parser.add_argument("--module-count", type=int, default=1000,
+ help="Total predefined module name pool size (default: 1000)")
+ parser.add_argument("--idle-timeout", type=int, default=90,
+ help="Hub watchdog inactivity timeout in seconds (default: 90)")
+ parser.add_argument("--explicit-deprovision-rate", type=float, default=0.3,
+ help="Fraction of modules explicitly deprovisioned (default: 0.3)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver executable (auto-detected by default)")
+ parser.add_argument("--s3", action="store_true",
+ help="Use local MinIO for hub de/hydration instead of filesystem")
+ parser.add_argument("--minio-port", type=int, default=9000,
+ help="MinIO S3 API port (default: 9000)")
+ parser.add_argument("--minio-console-port", type=int, default=9001,
+ help="MinIO web console port (default: 9001)")
+ parser.add_argument("--s3-bucket", default="zen-load-test",
+ help="S3 bucket name for MinIO hydration (default: zen-load-test)")
+ parser.add_argument("--no-browser", action="store_true",
+ help="Skip opening browser tabs")
+ args = parser.parse_args()
+
+ if args.active_modules > args.module_count:
+ sys.exit(
+ f"--active-modules ({args.active_modules}) must not exceed "
+ f"--module-count ({args.module_count})"
+ )
+
+ _init_blobs()
+
+ data_dir = Path(args.data_dir)
+ hub_log = data_dir / "hub.log"
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ print(f"[setup] zenserver: {zenserver_exe}")
+
+ module_names = [f"load-test-module-{i:04d}" for i in range(args.module_count)]
+ state_map: dict[str, ModuleState] = {mid: ModuleState() for mid in module_names}
+ state_lock = threading.Lock()
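+    # Priority-queue entries are (due_time, generation, module_id); the earliest
+    # due_time pops first, and workers discard entries with a stale generation.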
+ activity_queue: queue.PriorityQueue = queue.PriorityQueue()
+ counters = Counters()
+ stop_event = threading.Event()
+
+ minio_proc: Optional[subprocess.Popen] = None
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+
+ hub_extra_args: list[str] = []
+ hub_extra_env: Optional[dict[str, str]] = None
+
+ try:
+ if args.s3:
+ minio_exe = _find_minio(zenserver_exe)
+ minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
+ _wait_for_minio(args.minio_port)
+ _create_minio_bucket(args.minio_port, args.s3_bucket)
+ if not args.no_browser:
+ webbrowser.open(f"http://localhost:{args.minio_console_port}")
+ config_json = {
+ "type": "s3",
+ "settings": {
+ "uri": f"s3://{args.s3_bucket}",
+ "endpoint": f"http://localhost:{args.minio_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ },
+ }
+ data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = data_dir / "hydration_config.json"
+ config_path.write_text(json.dumps(config_json), encoding="ascii")
+ hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
+ hub_extra_env = {
+ "AWS_ACCESS_KEY_ID": _MINIO_USER,
+ "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+ }
+
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, data_dir, args.port,
+ hub_log, args.idle_timeout,
+ hub_extra_args, hub_extra_env,
+ )
+ _wait_for_hub(hub_proc, args.port)
+ if not args.no_browser:
+ webbrowser.open(f"http://localhost:{args.port}/dashboard")
+
+ print(f"[load-test] starting: pool={args.module_count} modules, "
+ f"target={args.active_modules} active, "
+ f"idle-timeout={args.idle_timeout}s, "
+ f"explicit-deprovision={args.explicit_deprovision_rate:.0%}")
+ print("[load-test] press Ctrl-C to stop")
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ stats_t = threading.Thread(
+ target=_stats_thread,
+ args=(state_map, state_lock, counters, stop_event),
+ daemon=True,
+ )
+ stats_t.start()
+
+ orch_t = threading.Thread(
+ target=_orchestrate,
+ args=(
+ args.port, state_map, state_lock,
+ activity_queue, counters, pool,
+ stop_event, args.active_modules,
+ args.explicit_deprovision_rate, args.idle_timeout,
+ ),
+ daemon=True,
+ )
+ orch_t.start()
+
+ try:
+ while not stop_event.is_set():
+ time.sleep(0.5)
+ if hub_proc.poll() is not None:
+ print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
+ break
+ except KeyboardInterrupt:
+ print("\n[load-test] Ctrl-C received, shutting down...")
+
+ stop_event.set()
+ orch_t.join(timeout=3.0)
+ stats_t.join(timeout=3.0)
+
+ _shutdown_deprovision_all(
+ args.port, state_map, state_lock, counters,
+ args.workers, timeout_s=60.0,
+ )
+ _wait_for_hub_idle(args.port, hub_proc, timeout_s=max(120.0, args.active_modules * 1.0))
+
+ _stop_process(hub_proc, "hub", timeout_s=120.0)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ hub_log_handle = None
+ if minio_proc is not None:
+ _stop_process(minio_proc, "minio")
+ minio_proc = None
+
+ finally:
+ # Safety net: only reached if an exception occurred before normal shutdown
+ if hub_proc is not None and hub_proc.poll() is None:
+ _stop_process(hub_proc, "hub")
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ if minio_proc is not None:
+ _stop_process(minio_proc, "minio")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/test_scripts/hub_provision_perf_test.py b/scripts/test_scripts/hub_provision_perf_test.py
new file mode 100644
index 000000000..76fb8508d
--- /dev/null
+++ b/scripts/test_scripts/hub_provision_perf_test.py
@@ -0,0 +1,502 @@
+#!/usr/bin/env python3
+"""Hub provisioning performance test.
+
+Floods a zenserver hub with concurrent provision requests until the hub rejects
+further provisioning (HTTP 409), then deprovisions all instances and exits.
+A WPR ETW trace runs for the entire test and produces a .etl file for analysis
+in WPA or PerfView (CPU sampling + context-switch events for lock contention).
+
+Optional --s3: starts a local MinIO server and configures the hub to use it
+as the de/hydration backend instead of the default local filesystem.
+
+Requirements:
+ pip install boto3 (only needed with --s3)
+WPR (wpr.exe) must be available (ships with Windows). Running as Administrator
+is required for WPR to collect ETW traces.
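+
+Example invocation (illustrative; run from an elevated prompt so WPR can record):
+    python scripts/test_scripts/hub_provision_perf_test.py --s3 --zenserver-dir build/windows/x64/release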
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+import uuid
+import webbrowser
+from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_STABLE_STATES = {"provisioned", "hibernated", "crashed", "unprovisioned"}
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ candidates = [
+ repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
+ ]
+ for c in candidates:
+ if c.exists():
+ return c
+
+    matches = list(repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}"))
+    if matches:
+        return max(matches, key=lambda p: p.stat().st_mtime)
+
+ sys.exit(
+ "zenserver executable not found in build/. "
+ "Run: xmake config -y -m release -a x64 && xmake -y\n"
+ "Or pass --zenserver-dir <dir>."
+ )
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+ p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(
+ f"minio executable not found at {p}. "
+ "Build with: xmake config -y -m release -a x64 && xmake -y"
+ )
+ return p
+
+
+# ---------------------------------------------------------------------------
+# WPR (Windows Performance Recorder)
+# ---------------------------------------------------------------------------
+
+def _is_elevated() -> bool:
+ if sys.platform != "win32":
+ return False
+ try:
+ import ctypes
+ return bool(ctypes.windll.shell32.IsUserAnAdmin())
+ except Exception:
+ return False
+
+
+def _wpr_start(trace_file: Path) -> bool:
+ if sys.platform != "win32":
+ print("[wpr] WPR is Windows-only; skipping ETW trace.")
+ return False
+ if not _is_elevated():
+ print("[wpr] skipping ETW trace - re-run from an elevated (Administrator) prompt to collect traces.")
+ return False
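+    # 'wpr -start CPU -filemode' records into WPR's own temporary file; the
+    # final .etl path (trace_file) is only supplied later, at 'wpr -stop'.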
+ result = subprocess.run(
+ ["wpr.exe", "-start", "CPU", "-filemode"],
+ capture_output=True, text=True
+ )
+ if result.returncode != 0:
+ print(f"[wpr] WARNING: wpr -start failed (code {result.returncode}):\n{result.stderr.strip()}")
+ return False
+ print(f"[wpr] ETW trace started (CPU profile, file mode)")
+ return True
+
+
+def _wpr_stop(trace_file: Path) -> None:
+ if sys.platform != "win32":
+ return
+ result = subprocess.run(
+ ["wpr.exe", "-stop", str(trace_file)],
+ capture_output=True, text=True
+ )
+ if result.returncode != 0:
+ print(f"[wpr] WARNING: wpr -stop failed (code {result.returncode}):\n{result.stderr.strip()}")
+ else:
+ print(f"[wpr] ETW trace saved: {trace_file}")
+
+
+# ---------------------------------------------------------------------------
+# MinIO
+# ---------------------------------------------------------------------------
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+ minio_data = data_dir / "minio"
+ minio_data.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["MINIO_ROOT_USER"] = _MINIO_USER
+ env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+ proc = subprocess.Popen(
+ [str(minio_exe), "server", str(minio_data),
+ "--address", f":{port}",
+ "--console-address", f":{console_port}",
+ "--quiet"],
+ env=env,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
+ return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+ try:
+ import boto3
+ import botocore.config
+ import botocore.exceptions
+ except ImportError:
+ sys.exit(
+ "[minio] boto3 is required for --s3.\n"
+ "Install it with: pip install boto3"
+ )
+ s3 = boto3.client(
+ "s3",
+ endpoint_url=f"http://localhost:{port}",
+ aws_access_key_id=_MINIO_USER,
+ aws_secret_access_key=_MINIO_PASS,
+ region_name="us-east-1",
+ config=botocore.config.Config(signature_version="s3v4"),
+ )
+ try:
+ s3.create_bucket(Bucket=bucket)
+ print(f"[minio] created bucket '{bucket}'")
+ except botocore.exceptions.ClientError as e:
+ if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
+ print(f"[minio] bucket '{bucket}' already exists")
+ else:
+ raise
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ extra_args: list[str],
+ extra_env: Optional[dict[str, str]],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-http-threads=8",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ "--hub-instance-limit=100",
+ ] + extra_args
+
+ env = os.environ.copy()
+ if extra_env:
+ env.update(extra_env)
+
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
+ if proc.poll() is not None:
+ return
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[{name}] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub HTTP helpers
+# ---------------------------------------------------------------------------
+
+def _hub_url(port: int, path: str) -> str:
+ return f"http://localhost:{port}{path}"
+
+
+def _provision_one(port: int) -> tuple[str, int]:
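+    """Provision one module under a fresh UUID.
+
+    Returns (module_id, HTTP status); a status of 0 means the request
+    never completed.
+    """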
+ module_id = str(uuid.uuid4())
+ url = _hub_url(port, f"/hub/modules/{module_id}/provision")
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=30) as resp:
+ return module_id, resp.status
+ except urllib.error.HTTPError as e:
+ return module_id, e.code
+ except Exception:
+ return module_id, 0
+
+
+def _deprovision_one(port: int, module_id: str, retries: int = 5) -> int:
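+    """Deprovision a module, retrying briefly on 409 responses.
+
+    Returns the final HTTP status; 0 means the request never completed.
+    """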
+ url = _hub_url(port, f"/hub/modules/{module_id}/deprovision")
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ for attempt in range(retries + 1):
+ try:
+ with urllib.request.urlopen(req, timeout=30) as resp:
+ return resp.status
+ except urllib.error.HTTPError as e:
+ if e.code == 409 and attempt < retries:
+ time.sleep(0.2)
+ continue
+ return e.code
+ except Exception:
+ return 0
+
+
+def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
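+    """Return the hub's module list from /hub/status, or None if it is unreachable."""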
+ try:
+ req = urllib.request.Request(_hub_url(port, "/hub/status"),
+ headers={"Accept": "application/json"})
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ return data.get("modules", [])
+ except Exception:
+ return None
+
+
+# ---------------------------------------------------------------------------
+# Test phases
+# ---------------------------------------------------------------------------
+
+def _flood_provision(port: int, workers: int) -> tuple[list[str], float, float]:
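+    """Keep `workers` provisioning requests in flight until the hub pushes back.
+
+    Returns (provisioned module ids, time to first rejection, total wall
+    clock); if no rejection occurred, the second value is the wall clock.
+    """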
+ stopped = False
+ provisioned_ids: list[str] = []
+ time_to_rejection: Optional[float] = None
+ t0 = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ pending: set[Future] = {pool.submit(_provision_one, port) for _ in range(workers)}
+
+ while pending:
+ done, pending = wait(pending, return_when=FIRST_COMPLETED)
+
+ for f in done:
+ module_id, status = f.result()
+ if status in (200, 202):
+ provisioned_ids.append(module_id)
+ elif status == 409:
+ if not stopped:
+ time_to_rejection = time.monotonic() - t0
+ stopped = True
+ print(f"\n[flood] hub rejected provisioning after "
+ f"{len(provisioned_ids)} instances "
+ f"({time_to_rejection:.2f}s)")
+ elif status == 0:
+ if not stopped:
+ stopped = True
+ print(f"\n[flood] hub unreachable - stopping flood "
+ f"({len(provisioned_ids)} instances so far)")
+ else:
+ print(f"[flood] unexpected status {status} for {module_id}")
+
+ if not stopped:
+ pending.add(pool.submit(_provision_one, port))
+
+ wall_clock = time.monotonic() - t0
+ return provisioned_ids, time_to_rejection or wall_clock, wall_clock
+
+
+def _wait_stable(port: int, timeout_s: float = 20.0) -> None:
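+    """Poll /hub/status until every module reports a state in _STABLE_STATES."""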
+ print("[hub] waiting for all instances to reach stable state...")
+    start = time.monotonic()
+    deadline = start + timeout_s
+ status_timeout_s = 5.0
+ while time.monotonic() < deadline:
+ modules = _hub_status(port, timeout_s=status_timeout_s)
+ if modules is None:
+ time.sleep(0.5)
+ continue
+ transitioning = [m for m in modules if m.get("state") not in _STABLE_STATES]
+        elapsed = time.monotonic() - start
+ print(f"[hub] {elapsed:.1f}s: {len(modules) - len(transitioning)}/{len(modules)} stable", end="\r")
+ if not transitioning:
+ print(f"\n[hub] all {len(modules)} instances in stable state")
+ return
+ time.sleep(0.5)
+ print(f"\n[hub] WARNING: timed out waiting for stable states after {timeout_s}s")
+
+
+def _deprovision_all(port: int, module_ids: list[str], workers: int) -> None:
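+    """Deprovision the tracked instances plus any other provisioned modules the hub reports."""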
+ raw_status = _hub_status(port, timeout_s=60.0)
+ if raw_status is None:
+ print("[deprovision] WARNING: could not reach hub to enumerate extra modules - "
+ "only deprovisioning tracked instances")
+ extra_ids = {m["moduleId"] for m in (raw_status or [])
+ if m.get("state") not in ("unprovisioned",)} - set(module_ids)
+ all_ids = list(module_ids) + list(extra_ids)
+
+ print(f"[deprovision] deprovisioning {len(all_ids)} instances...")
+ t0 = time.monotonic()
+ errors = 0
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ futures = {pool.submit(_deprovision_one, port, mid): mid for mid in all_ids}
+ for f in as_completed(futures):
+ status = f.result()
+ if status not in (200, 202, 409):
+ errors += 1
+ print(f"[deprovision] module {futures[f]}: unexpected status {status}")
+ elapsed = time.monotonic() - t0
+ print(f"[deprovision] done in {elapsed:.2f}s ({errors} errors)")
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--data-dir", default="E:/Dev/hub-perftest",
+ help="Hub --data-dir (default: E:/Dev/hub-perftest)")
+ parser.add_argument("--port", type=int, default=8558,
+ help="Hub HTTP port (default: 8558)")
+ parser.add_argument("--workers", type=int, default=20,
+ help="Concurrent provisioning threads (default: 20)")
+ parser.add_argument("--trace-file", default="hub_perf_trace.etl",
+ help="WPR output .etl path (default: hub_perf_trace.etl)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver executable (auto-detected by default)")
+ parser.add_argument("--s3", action="store_true",
+ help="Use local MinIO for hub de/hydration instead of filesystem")
+ parser.add_argument("--minio-port", type=int, default=9000,
+ help="MinIO S3 API port when --s3 is used (default: 9000)")
+ parser.add_argument("--minio-console-port", type=int, default=9001,
+ help="MinIO web console port when --s3 is used (default: 9001)")
+ parser.add_argument("--s3-bucket", default="zen-hydration-test",
+ help="S3 bucket name for MinIO hydration (default: zen-hydration-test)")
+ args = parser.parse_args()
+
+ data_dir = Path(args.data_dir)
+ trace_file = Path(args.trace_file).resolve()
+ hub_log = data_dir / "hub.log"
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ print(f"[setup] zenserver: {zenserver_exe}")
+
+ minio_proc: Optional[subprocess.Popen] = None
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ wpr_started = False
+
+ hub_extra_args: list[str] = []
+ hub_extra_env: Optional[dict[str, str]] = None
+
+ try:
+ if args.s3:
+ minio_exe = _find_minio(zenserver_exe)
+ minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
+ _wait_for_minio(args.minio_port)
+ _create_minio_bucket(args.minio_port, args.s3_bucket)
+ webbrowser.open(f"http://localhost:{args.minio_console_port}")
+ config_json = {
+ "type": "s3",
+ "settings": {
+ "uri": f"s3://{args.s3_bucket}",
+ "endpoint": f"http://localhost:{args.minio_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ },
+ }
+ data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = data_dir / "hydration_config.json"
+ config_path.write_text(json.dumps(config_json), encoding="ascii")
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ ]
+ hub_extra_env = {
+ "AWS_ACCESS_KEY_ID": _MINIO_USER,
+ "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+ }
+
+ wpr_started = _wpr_start(trace_file)
+
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, data_dir, args.port,
+ hub_log, hub_extra_args, hub_extra_env
+ )
+ _wait_for_hub(hub_proc, args.port)
+ webbrowser.open(f"http://localhost:{args.port}/dashboard")
+
+ provisioned_ids, time_to_rejection, wall_clock = _flood_provision(args.port, args.workers)
+
+ print(f"\n[results] provisioned : {len(provisioned_ids)}")
+ print(f"[results] time to 409 : {time_to_rejection:.3f}s")
+ print(f"[results] wall clock : {wall_clock:.3f}s")
+ if time_to_rejection > 0:
+ print(f"[results] rate : {len(provisioned_ids) / time_to_rejection:.1f} provisions/s")
+
+ _wait_stable(args.port, timeout_s=120.0)
+ _deprovision_all(args.port, provisioned_ids, args.workers)
+ dehydration_timeout_s = max(60.0, len(provisioned_ids) * 0.5)
+ _wait_stable(args.port, timeout_s=dehydration_timeout_s)
+
+ finally:
+ if wpr_started:
+ _wpr_stop(trace_file)
+ if hub_proc is not None:
+ _stop_process(hub_proc, "hub")
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ if minio_proc is not None:
+ _stop_process(minio_proc, "minio")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/test_scripts/kill-test-processes.ps1 b/scripts/test_scripts/kill-test-processes.ps1
new file mode 100644
index 000000000..0668a5319
--- /dev/null
+++ b/scripts/test_scripts/kill-test-processes.ps1
@@ -0,0 +1,19 @@
+# Kill leftover CI test processes (zenserver, minio, nomad, consul) whose
+# executable lives under the given build directory. Windows counterpart of
+# kill-test-processes.sh; see that file for rationale.
+#
+# Usage: kill-test-processes.ps1 -Label <label> -BuildDir <path>
+
+param(
+ [Parameter(Mandatory=$true)][string]$Label,
+ [Parameter(Mandatory=$true)][string]$BuildDir
+)
+
+foreach ($name in @('zenserver', 'minio', 'nomad', 'consul')) {
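+    # $_.Path can be $null (e.g. for inaccessible system processes), so guard
+    # before calling StartsWith on it.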
+ $procs = Get-Process -Name $name -ErrorAction SilentlyContinue |
+ Where-Object { $_.Path -and $_.Path.StartsWith($BuildDir, [System.StringComparison]::OrdinalIgnoreCase) }
+ foreach ($p in $procs) {
+ Write-Host "Killing $Label $name (PID $($p.Id)): $($p.Path)"
+ $p | Stop-Process -Force -ErrorAction SilentlyContinue
+ }
+}
diff --git a/scripts/test_scripts/kill-test-processes.sh b/scripts/test_scripts/kill-test-processes.sh
new file mode 100755
index 000000000..e5fb2fcaa
--- /dev/null
+++ b/scripts/test_scripts/kill-test-processes.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+#
+# Kill leftover CI test processes (zenserver, minio, nomad, consul) whose
+# executable lives under the given build directory.
+#
+# Used by CI workflows to clean up any test processes from a previous run
+# before starting a new one, and to reap any that are still running after
+# the run finishes.
+#
+# Usage: kill-test-processes.sh <label> <build_dir>
+# label: word printed in log messages, e.g. "stale" or "leftover"
+# build_dir: absolute path; only processes whose executable is under this
+# directory are killed.
+#
+# We resolve each PID's actual executable path rather than relying on argv
+# (which `pgrep -a` reports). argv starts with the short process name (e.g.
+# "consul") regardless of how the process was launched, so a pure argv prefix
+# match never fired and leftovers silently survived between runs.
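+# For example, `pgrep -a consul` might report "1234 consul agent -dev" even when
+# the binary actually lives at <build_dir>/consul, so an argv prefix match
+# against the build directory never succeeds.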
+
+set -u
+
+label=${1:?label required}
+build_dir=${2:?build_dir required}
+
+get_exe_path() {
+ local pid=$1
+ if [[ -r "/proc/$pid/exe" ]]; then
+ # Linux
+ readlink -f "/proc/$pid/exe" 2>/dev/null || true
+ else
+ # macOS fallback: the "txt" file descriptor points at the process binary
+ lsof -p "$pid" -Fn -a -d txt 2>/dev/null | awk '/^n/{print substr($0,2); exit}' || true
+ fi
+}
+
+for name in zenserver minio nomad consul; do
+ while read -r pid; do
+ [[ -z "$pid" ]] && continue
+ exe=$(get_exe_path "$pid")
+ if [[ "$exe" == "$build_dir"* ]]; then
+ echo "Killing $label $name (PID $pid): $exe"
+ kill -9 "$pid" 2>/dev/null || true
+ fi
+ done < <(pgrep -x "$name" 2>/dev/null || true)
+done
diff --git a/scripts/test_scripts/oplog-import-export-test.py b/scripts/test_scripts/oplog-import-export-test.py
index f913a7351..93f732135 100644
--- a/scripts/test_scripts/oplog-import-export-test.py
+++ b/scripts/test_scripts/oplog-import-export-test.py
@@ -82,7 +82,7 @@ SERVER_ARGS: tuple[str, ...] = (
def zen_cmd(*args: str | Path, extra_zen_args: list[str] | None = None) -> list[str | Path]:
"""Build a zen CLI command list, inserting extra_zen_args before subcommands."""
- return [ZEN_EXE, *(extra_zen_args or []), *args]
+ return [ZEN_EXE, "--enable-execution-history=false", *(extra_zen_args or []), *args]
def run(cmd: list[str | Path]) -> None:
diff --git a/scripts/test_scripts/oplog-update-build-ids.py b/scripts/test_scripts/oplog-update-build-ids.py
index 67e128c8e..2675d07c0 100644
--- a/scripts/test_scripts/oplog-update-build-ids.py
+++ b/scripts/test_scripts/oplog-update-build-ids.py
@@ -52,7 +52,7 @@ def list_builds_for_bucket(zen: str, host: str, namespace: str, bucket: str) ->
result_path = Path(tmp.name)
cmd = [
- zen, "builds", "list",
+ zen, "--enable-execution-history=false", "builds", "list",
"--namespace", namespace,
"--bucket", bucket,
"--host", host,