Diffstat (limited to 'scripts/test_scripts')
20 files changed, 4731 insertions, 335 deletions
diff --git a/scripts/test_scripts/block-clone-test-mac.sh b/scripts/test_scripts/block-clone-test-mac.sh deleted file mode 100755 index a3d3ca4d3..000000000 --- a/scripts/test_scripts/block-clone-test-mac.sh +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env bash -# Test block-clone functionality on macOS (APFS). -# -# APFS is the default filesystem on modern Macs and natively supports -# clonefile(), so no special setup is needed — just run the tests. -# -# Usage: -# ./scripts/test_scripts/block-clone-test-mac.sh [path-to-zencore-test] -# -# If no path is given, defaults to build/macosx/<arch>/debug/zencore-test -# relative to the repository root. - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" - -ARCH="$(uname -m)" -TEST_BINARY="${1:-$REPO_ROOT/build/macosx/$ARCH/debug/zencore-test}" - -if [ ! -x "$TEST_BINARY" ]; then - echo "error: test binary not found or not executable: $TEST_BINARY" >&2 - echo "hint: build with 'xmake config -m debug && xmake build zencore-test'" >&2 - exit 1 -fi - -# Verify we're on APFS -BINARY_DIR="$(dirname "$TEST_BINARY")" -FS_TYPE="$(diskutil info "$(df "$BINARY_DIR" | tail -1 | awk '{print $1}')" 2>/dev/null | grep "Type (Bundle)" | awk '{print $NF}' || true)" - -if [ "$FS_TYPE" != "apfs" ]; then - echo "warning: filesystem does not appear to be APFS (got: ${FS_TYPE:-unknown}), clone tests may skip" >&2 -fi - -TEST_CASES="TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface" - -echo "Running block-clone tests ..." -echo "---" -"$TEST_BINARY" \ - --test-suite="core.filesystem" \ - --test-case="$TEST_CASES" -echo "---" -echo "All block-clone tests passed." diff --git a/scripts/test_scripts/block-clone-test-windows.ps1 b/scripts/test_scripts/block-clone-test-windows.ps1 deleted file mode 100644 index df24831a4..000000000 --- a/scripts/test_scripts/block-clone-test-windows.ps1 +++ /dev/null @@ -1,145 +0,0 @@ -# Test block-clone functionality on a temporary ReFS VHD. -# -# Requires: -# - Administrator privileges -# - Windows Server, or Windows 10/11 Pro for Workstations (ReFS support) -# - Hyper-V PowerShell module (for New-VHD), or diskpart fallback -# -# Usage: -# # From an elevated PowerShell prompt: -# .\scripts\test_scripts\block-clone-test-windows.ps1 [-TestBinary <path>] -# -# If -TestBinary is not given, defaults to build\windows\x64\debug\zencore-test.exe -# relative to the repository root. - -param( - [string]$TestBinary = "" -) - -$ErrorActionPreference = "Stop" - -$ScriptDir = Split-Path -Parent $MyInvocation.MyCommand.Definition -$RepoRoot = (Resolve-Path "$ScriptDir\..\..").Path - -if (-not $TestBinary) { - $TestBinary = Join-Path $RepoRoot "build\windows\x64\debug\zencore-test.exe" -} - -$ImageSizeMB = 2048 -$TestCases = "TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface" - -$VhdPath = "" -$MountLetter = "" - -function Cleanup { - $ErrorActionPreference = "SilentlyContinue" - - if ($MountLetter) { - Write-Host "Dismounting VHD ..." 
- Dismount-VHD -Path $VhdPath -ErrorAction SilentlyContinue - } - if ($VhdPath -and (Test-Path $VhdPath)) { - Remove-Item -Force $VhdPath -ErrorAction SilentlyContinue - } -} - -trap { - Cleanup - throw $_ -} - -# --- Preflight checks --- - -$IsAdmin = ([Security.Principal.WindowsPrincipal] [Security.Principal.WindowsIdentity]::GetCurrent()).IsInRole( - [Security.Principal.WindowsBuiltInRole]::Administrator) -if (-not $IsAdmin) { - Write-Error "This script must be run as Administrator (for VHD mount/format)." - exit 1 -} - -if (-not (Test-Path $TestBinary)) { - Write-Error "Test binary not found: $TestBinary`nHint: build with 'xmake config -m debug && xmake build zencore-test'" - exit 1 -} - -# Check that ReFS formatting is available -$RefsAvailable = $true -try { - # A quick check: on non-Server/Workstation SKUs, Format-Volume -FileSystem ReFS will fail - $OsCaption = (Get-CimInstance Win32_OperatingSystem).Caption - if ($OsCaption -notmatch "Server|Workstation|Enterprise") { - Write-Warning "ReFS may not be available on this Windows edition: $OsCaption" - Write-Warning "Continuing anyway — format step will fail if unsupported." - } -} catch { - # Non-fatal, just proceed -} - -# --- Create and mount ReFS VHD --- - -$VhdPath = Join-Path $env:TEMP "refs-clone-test-$([guid]::NewGuid().ToString('N').Substring(0,8)).vhdx" - -Write-Host "Creating ${ImageSizeMB}MB VHDX at $VhdPath ..." - -try { - # Prefer Hyper-V cmdlet if available - New-VHD -Path $VhdPath -SizeBytes ($ImageSizeMB * 1MB) -Fixed | Out-Null -} catch { - # Fallback to diskpart - Write-Host "New-VHD not available, falling back to diskpart ..." - $DiskpartScript = @" -create vdisk file="$VhdPath" maximum=$ImageSizeMB type=fixed -"@ - $DiskpartScript | diskpart | Out-Null -} - -Write-Host "Mounting and initializing VHD ..." - -Mount-VHD -Path $VhdPath -$Disk = Get-VHD -Path $VhdPath | Get-Disk - -# Suppress Explorer's auto-open / "format disk?" prompts for the raw partition -Stop-Service ShellHWDetection -ErrorAction SilentlyContinue - -try { - Initialize-Disk -Number $Disk.Number -PartitionStyle GPT -ErrorAction SilentlyContinue - $Partition = New-Partition -DiskNumber $Disk.Number -UseMaximumSize -AssignDriveLetter - $MountLetter = $Partition.DriveLetter - - Write-Host "Formatting ${MountLetter}: as ReFS with integrity disabled ..." - Format-Volume -DriveLetter $MountLetter -FileSystem ReFS -NewFileSystemLabel "CloneTest" -Confirm:$false | Out-Null - - # Disable integrity streams (required for block cloning to work on ReFS) - Set-FileIntegrity "${MountLetter}:\" -Enable $false -ErrorAction SilentlyContinue -} finally { - Start-Service ShellHWDetection -ErrorAction SilentlyContinue -} - -$MountRoot = "${MountLetter}:\" - -# --- Copy test binary and run --- - -Write-Host "Copying test binary to ReFS volume ..." -Copy-Item $TestBinary "$MountRoot\zencore-test.exe" - -Write-Host "Running block-clone tests ..." -Write-Host "---" - -$proc = Start-Process -FilePath "$MountRoot\zencore-test.exe" ` - -ArgumentList "--test-suite=core.filesystem", "--test-case=$TestCases" ` - -NoNewWindow -Wait -PassThru - -Write-Host "---" - -if ($proc.ExitCode -ne 0) { - Write-Error "Tests failed with exit code $($proc.ExitCode)" - Cleanup - exit $proc.ExitCode -} - -Write-Host "ReFS: all block-clone tests passed." - -# --- Cleanup --- - -Cleanup -Write-Host "Done." 
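All three deleted scripts (macOS and Windows above, Linux below) ultimately ran the same four test cases against the zencore-test binary; only the setup of a clone-capable filesystem differed. For reference, the shared invocation they wrapped (with `$TEST_BINARY` standing in for the platform-specific debug binary path) was:

```
"$TEST_BINARY" \
  --test-suite="core.filesystem" \
  --test-case="TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface"
```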
diff --git a/scripts/test_scripts/block-clone-test.sh b/scripts/test_scripts/block-clone-test.sh deleted file mode 100755 index 7c6bf5605..000000000 --- a/scripts/test_scripts/block-clone-test.sh +++ /dev/null @@ -1,143 +0,0 @@ -#!/usr/bin/env bash -# Test block-clone functionality on temporary Btrfs and XFS loopback filesystems. -# -# Requires: root/sudo, btrfs-progs (mkfs.btrfs), xfsprogs (mkfs.xfs) -# -# Usage: -# sudo ./scripts/test_scripts/block-clone-test.sh [path-to-zencore-test] -# -# If no path is given, defaults to build/linux/x86_64/debug/zencore-test -# relative to the repository root. -# -# Options: -# --btrfs-only Only test Btrfs -# --xfs-only Only test XFS - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" - -TEST_BINARY="" -RUN_BTRFS=true -RUN_XFS=true - -for arg in "$@"; do - case "$arg" in - --btrfs-only) RUN_XFS=false ;; - --xfs-only) RUN_BTRFS=false ;; - *) TEST_BINARY="$arg" ;; - esac -done - -TEST_BINARY="${TEST_BINARY:-$REPO_ROOT/build/linux/x86_64/debug/zencore-test}" -IMAGE_SIZE="512M" -TEST_CASES="TryCloneFile,CopyFile.Clone,SupportsBlockRefCounting,CloneQueryInterface" - -# Track all temp files for cleanup -CLEANUP_MOUNTS=() -CLEANUP_DIRS=() -CLEANUP_FILES=() - -cleanup() { - local exit_code=$? - set +e - - for mnt in "${CLEANUP_MOUNTS[@]}"; do - if mountpoint -q "$mnt" 2>/dev/null; then - umount "$mnt" - fi - done - for dir in "${CLEANUP_DIRS[@]}"; do - [ -d "$dir" ] && rmdir "$dir" - done - for f in "${CLEANUP_FILES[@]}"; do - [ -f "$f" ] && rm -f "$f" - done - - if [ $exit_code -ne 0 ]; then - echo "FAILED (exit code $exit_code)" - fi - exit $exit_code -} -trap cleanup EXIT - -# --- Preflight checks --- - -if [ "$(id -u)" -ne 0 ]; then - echo "error: this script must be run as root (for mount/umount)" >&2 - exit 1 -fi - -if [ ! -x "$TEST_BINARY" ]; then - echo "error: test binary not found or not executable: $TEST_BINARY" >&2 - echo "hint: build with 'xmake config -m debug && xmake build zencore-test'" >&2 - exit 1 -fi - -if $RUN_BTRFS && ! command -v mkfs.btrfs &>/dev/null; then - echo "warning: mkfs.btrfs not found — install btrfs-progs to test Btrfs, skipping" >&2 - RUN_BTRFS=false -fi - -if $RUN_XFS && ! command -v mkfs.xfs &>/dev/null; then - echo "warning: mkfs.xfs not found — install xfsprogs to test XFS, skipping" >&2 - RUN_XFS=false -fi - -if ! $RUN_BTRFS && ! $RUN_XFS; then - echo "error: no filesystems to test" >&2 - exit 1 -fi - -# --- Helper to create, mount, and run tests on a loopback filesystem --- - -run_tests_on_fs() { - local fs_type="$1" - local mkfs_cmd="$2" - - echo "" - echo "========================================" - echo " Testing block-clone on $fs_type" - echo "========================================" - - local image_path mount_path - image_path="$(mktemp "/tmp/${fs_type}-clone-test-XXXXXX.img")" - mount_path="$(mktemp -d "/tmp/${fs_type}-clone-mount-XXXXXX")" - CLEANUP_FILES+=("$image_path") - CLEANUP_DIRS+=("$mount_path") - CLEANUP_MOUNTS+=("$mount_path") - - echo "Creating ${IMAGE_SIZE} ${fs_type} image at ${image_path} ..." - truncate -s "$IMAGE_SIZE" "$image_path" - $mkfs_cmd "$image_path" - - echo "Mounting at ${mount_path} ..." - mount -o loop "$image_path" "$mount_path" - chmod 777 "$mount_path" - - echo "Copying test binary ..." - cp "$TEST_BINARY" "$mount_path/zencore-test" - chmod +x "$mount_path/zencore-test" - - echo "Running tests ..." 
- echo "---" - "$mount_path/zencore-test" \ - --test-suite="core.filesystem" \ - --test-case="$TEST_CASES" - echo "---" - echo "$fs_type: all block-clone tests passed." -} - -# --- Run --- - -if $RUN_BTRFS; then - run_tests_on_fs "btrfs" "mkfs.btrfs -q" -fi - -if $RUN_XFS; then - run_tests_on_fs "xfs" "mkfs.xfs -q -m reflink=1" -fi - -echo "" -echo "All block-clone tests passed." diff --git a/scripts/test_scripts/builds-download-upload-test.py b/scripts/test_scripts/builds-download-upload-test.py index 8ff5245c1..dd9aae07e 100644 --- a/scripts/test_scripts/builds-download-upload-test.py +++ b/scripts/test_scripts/builds-download-upload-test.py @@ -82,7 +82,7 @@ SERVER_ARGS: tuple[str, ...] = ( def zen_cmd(*args: str | Path, extra_zen_args: list[str] | None = None) -> list[str | Path]: """Build a zen CLI command list, inserting extra_zen_args before subcommands.""" - return [ZEN_EXE, *(extra_zen_args or []), *args] + return [ZEN_EXE, "--enable-execution-history=false", *(extra_zen_args or []), *args] def run(cmd: list[str | Path]) -> None: diff --git a/scripts/test_scripts/builds-download-upload-update-build-ids.py b/scripts/test_scripts/builds-download-upload-update-build-ids.py index 2a63aa44d..6332d6990 100644 --- a/scripts/test_scripts/builds-download-upload-update-build-ids.py +++ b/scripts/test_scripts/builds-download-upload-update-build-ids.py @@ -51,7 +51,7 @@ def list_builds_for_bucket(zen: str, host: str, namespace: str, bucket: str) -> result_path = Path(tmp.name) cmd = [ - zen, "builds", "list", + zen, "--enable-execution-history=false", "builds", "list", "--namespace", namespace, "--bucket", bucket, "--host", host, diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md new file mode 100644 index 000000000..fb471d4bb --- /dev/null +++ b/scripts/test_scripts/hub/PERF_SEED_README.md @@ -0,0 +1,148 @@ +# Perf-seed workflow + +Three-stage pipeline for running repeatable hub-hydration perf tests against a +local MinIO backend seeded with real module data pulled from production S3. + +## Layout + +All scripts default to a single perf-seed root - currently `E:/Dev/zen-perf-seed/` +in the script defaults, but every path is overridable via CLI flag (see the +per-stage options below). Pick a root with enough free space (snapshots and +preserved CAS dirs can be large) and either pass the matching `--*-dir` flag on +each invocation or change the script defaults to your chosen root. 
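For example, to keep everything under a different root (here `D:/perf`, an arbitrary path used purely for illustration), Stage A can be pointed away from the defaults with its dir flags:

```
python scripts/test_scripts/hub/seed_s3_snapshot.py \
  --snapshot-dir D:/perf/s3-snapshot \
  --hub-data-dir D:/perf/hub-a
```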
+ +Layout under the chosen root (`<perf-seed>/`): + +``` +<perf-seed>/ + hub-a/ Stage A hub data dir (transient) + servers/<moduleid>/ + s3-snapshot/ Preserved production server-state trees (read-only after Stage A) + <moduleid>/ + hubs/ Stage B per-bucket hub data dirs (transient) + hub-b-zen-seed-packed/ + hub-b-zen-seed-unpacked/ + minio-data/ Stage B MinIO data dir (transient, carries every seeded bucket) + minio-seeded-baseline/ Preserved baseline MinIO CAS (read-only after Stage B + preserve) + README.txt + minio-seeded-packed/ Preserved packed MinIO CAS (filled by the pack worktree) + README.txt + hub-perf/ Stage C hub data dir (wiped each run) + minio-run/ Stage C MinIO data dir (wiped + re-copied each run) + perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json + 20260423-141530_zen-seed-packed/ + 20260423-143112_zen-seed-unpacked/ +``` + +## Prerequisites + +- Debug or release build of zenserver + minio: `xmake -y` +- `pip install boto3` +- AWS CLI v2 with an SSO profile configured (for Stage A only) +- Environment variables (or pass equivalents via CLI flags): + - `ZEN_PERF_S3_URI` - source S3 bucket, e.g. `s3://your-bucket/optional-prefix/` + - `ZEN_PERF_AWS_PROFILE` - AWS SSO profile name with read access to that bucket + - `ZEN_PERF_AWS_REGION` - optional, defaults to `us-east-1` + +## Stage A - snapshot real S3 data + +One-time (or when you want a fresh baseline from production). + +``` +export ZEN_PERF_S3_URI=s3://your-bucket/ +export ZEN_PERF_AWS_PROFILE=your-sso-profile +python scripts/test_scripts/hub/seed_s3_snapshot.py +``` + +Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies +`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login` +automatically if the SSO token is missing or expired. + +Module selection ranks all UUID-shaped folders by their +`incremental-state.cbo` `LastModified` (newest first, a proxy for +most-recently-accessed) and takes the top `--module-count`. + +Options: +- `--module-count N` (default 1000) +- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`) +- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`) + +## Stage B - seed MinIO from the snapshot + +One-time per pack-mode (or when `s3-snapshot` changes). + +`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is +hardcoded inside the script (`--hub-hydration-enable-pack=true` near the +top of `_start_hub`). To produce both packed and unpacked baselines for +comparison, invoke the script twice from two separate worktrees - one with +the flag flipped to `false` - and preserve the resulting MinIO data dir +each time. + +``` +# In the pack worktree (flag = true), seeds zen-seed-packed +python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed +python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed + +# In the no-pack worktree (flag = false), seeds zen-seed-unpacked +python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked +python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked +``` + +The script provisions every module found under `s3-snapshot/`, hibernates +them, overlays the snapshot on top of the hub's servers dir, then +deprovisions all modules - which runs the dehydrate path and uploads the +content into the bucket. + +`preserve_minio_state.py` copies the resulting `minio-data/` to a +variant-specific preservation dir and writes a README with provenance. 
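Note that, per `preserve_minio_state.py`'s own docstring further down, the default is to *move* the MinIO data dir rather than copy it; pass `--copy` if you want to keep the seeding output in place, e.g.:

```
python scripts/test_scripts/hub/preserve_minio_state.py \
  --dest <perf-seed>/minio-seeded-unpacked --copy
```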
+ +Options of interest: +- `--bucket NAME` - bucket name (default `zen-seed-packed`). +- `--wipe` removes the per-bucket hub data dir and the shared minio-data + dir before starting. +- `--module-count N` caps the set (0 = every module in snapshot-dir). + +## Stage C - run a perf iteration + +Repeat as often as you want; each run starts from the preserved baseline. + +``` +# Pack-on bucket +python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace + +# Pack-off bucket (for comparison) +python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace +``` + +Steps: +1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state. +2. Wipes `hub-perf/` (unless `--no-wipe-hub`). +3. Starts MinIO and hub. +4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone. +5. Stops everything cleanly. + +Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every +iteration exercises the hydrate-only path against the same baseline CAS. The +`--bucket` flag selects which seeded bucket (and therefore which pack mode) +to exercise. + +Pass `--enable-dehydration` to run a full provision -> deprovision cycle that +includes re-upload (dehydrate) at deprovision time. Use this to measure the +dehydrate phase end-to-end against the seeded baseline. Note the seeded +baseline diverges after a `--enable-dehydration` run - re-copy `--minio-seeded` +or re-run `preserve_minio_state.py` if you want to compare to the pristine state. + +After each run the hub log, structured zenserver logs, any utrace file, and a +`summary.json` with the run's timings are copied into +`perf-runs/<timestamp>_<bucket>/` so Stage C runs can be compared +post-hoc. Override the destination with `--archive-dir PATH`. + +## Resetting between runs + +- **Keep**: `s3-snapshot/`, `minio-seeded-baseline/`, `minio-seeded-packed/`. These are expensive to rebuild. +- **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`. + +To force a fresh MinIO seed for one variant: delete the matching +`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching +`--dest`) in that worktree. To force a fresh S3 snapshot: delete +`s3-snapshot/` and re-run Stage A. diff --git a/scripts/test_scripts/hub/analyze_perf_runs.py b/scripts/test_scripts/hub/analyze_perf_runs.py new file mode 100644 index 000000000..521d9cc1e --- /dev/null +++ b/scripts/test_scripts/hub/analyze_perf_runs.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +"""Parse two hub.log archives (packed vs unpacked) and produce a comparison.""" + +from __future__ import annotations + +import argparse +import re +import sys +from pathlib import Path +from statistics import mean, median + + +_SIZE_UNITS = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4} +_TIME_UNITS = {"ns": 1e-9, "us": 1e-6, "ms": 1e-3, "s": 1.0} + + +def parse_size(s: str) -> float: + # Accept forms: "1.53K", "22.6K", "180M", "1.83G", "25" (bare bytes), "0B". 
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)B?", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) or "B" + return float(val) * _SIZE_UNITS.get(unit, 1) + + +def parse_time_s(s: str) -> float: + # Accept forms: "1s", "46ms", "712us", "0ns" + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(ns|us|ms|s)", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) + return float(val) * _TIME_UNITS[unit] + + +def parse_rate_bits(s: str) -> float: + # "30.7Mbits/s" -> bits/sec + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)bits/s", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) or "" + mult = {"": 1, "K": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}[unit] + return float(val) * mult + + +_RE_COMPLETE = re.compile(r"Hydration complete module '([0-9a-f-]+)': (\d+) files \(([^)]+)\) in (\S+)$") +_RE_PACK = re.compile(r"Pack unpack:\s+(\d+) packs,\s+(\d+) files,\s+(\S+),\s+download (\S+),\s+unpack (\S+)") +_RE_DLPHASE = re.compile(r"Download phase:\s+(\S+)\s+(\d+)/(\d+)\s+\(([^)]+)\) downloaded,\s+(\S+),\s+(\d+) threads") +_RE_REQS = re.compile(r"Requests:\s+(\d+) reqs,\s+avg (\S+)/req,\s+avg (\S+)/req,\s+peak in-flight (\d+),\s+saturation (\d+)%") +_RE_SCHED = re.compile(r"Scheduling:\s+1st schedule \+(\S+),\s+1st start \+(\S+),\s+queue wait (\S+)") +_RE_LOADMETA = re.compile(r"Load metadata:\s+(\S+)") +_RE_RENAME = re.compile(r"Rename/copy:\s+(\S+)") + + +def parse_log(path: Path) -> list[dict]: + modules: list[dict] = [] + text = path.read_text(encoding="utf-8", errors="replace").splitlines() + i = 0 + while i < len(text): + line = text[i] + m = _RE_COMPLETE.search(line) + if not m: + i += 1 + continue + mod = { + "mid": m.group(1), + "files": int(m.group(2)), + "total_bytes": parse_size(m.group(3)), + "wall_s": parse_time_s(m.group(4)), + } + # Read the block that follows (up to ~12 lines). + block = text[i : min(i + 20, len(text))] + joined = "\n".join(block) + pm = _RE_PACK.search(joined) + if pm: + mod["packs"] = int(pm.group(1)) + mod["packed_files"] = int(pm.group(2)) + # Uncompressed format: single size, no integrity rehash. 
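+            # _RE_PACK groups: 1=pack count, 2=files folded into packs,
+            # 3=aggregate pack size, 4=download time, 5=unpack time.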
+ mod["pack_bytes"] = parse_size(pm.group(3)) + mod["pack_download_s"] = parse_time_s(pm.group(4)) + mod["pack_unpack_s"] = parse_time_s(pm.group(5)) + else: + mod["packs"] = 0 + mod["packed_files"] = 0 + mod["pack_bytes"] = 0.0 + mod["pack_download_s"] = 0.0 + mod["pack_unpack_s"] = 0.0 + + dm = _RE_DLPHASE.search(joined) + if dm: + mod["dl_phase_s"] = parse_time_s(dm.group(1)) + mod["dl_files"] = int(dm.group(2)) + mod["dl_total_files"] = int(dm.group(3)) + mod["dl_bytes"] = parse_size(dm.group(4)) + mod["dl_throughput_bps"] = parse_rate_bits(dm.group(5)) + mod["dl_threads"] = int(dm.group(6)) + rm = _RE_REQS.search(joined) + if rm: + mod["reqs"] = int(rm.group(1)) + mod["req_avg_s"] = parse_time_s(rm.group(2)) + mod["req_avg_throughput_bps"] = parse_rate_bits(rm.group(3)) + mod["req_peak_inflight"] = int(rm.group(4)) + mod["req_saturation_pct"] = int(rm.group(5)) + sm = _RE_SCHED.search(joined) + if sm: + mod["sched_1st_schedule_s"] = parse_time_s(sm.group(1)) + mod["sched_1st_start_s"] = parse_time_s(sm.group(2)) + mod["queue_wait_s"] = parse_time_s(sm.group(3)) + lm = _RE_LOADMETA.search(joined) + if lm: + mod["load_metadata_s"] = parse_time_s(lm.group(1)) + rcm = _RE_RENAME.search(joined) + if rcm: + mod["rename_copy_s"] = parse_time_s(rcm.group(1)) + + modules.append(mod) + i += 1 + + return modules + + +def fmt_bytes(n: float) -> str: + for u in ("B", "KB", "MB", "GB", "TB"): + if n < 1024 or u == "TB": + return f"{n:.1f} {u}" + n /= 1024 + return f"{n:.1f} TB" + + +def fmt_s(s: float) -> str: + if s < 1e-3: + return f"{s*1e6:.0f} us" + if s < 1.0: + return f"{s*1e3:.1f} ms" + return f"{s:.2f} s" + + +def agg(modules: list[dict]) -> dict: + a: dict = {} + a["count"] = len(modules) + a["total_files"] = sum(m["files"] for m in modules) + a["total_bytes"] = sum(m["total_bytes"] for m in modules) + a["total_reqs"] = sum(m.get("reqs", 0) for m in modules) + a["total_packs"] = sum(m.get("packs", 0) for m in modules) + a["total_packed_files"] = sum(m.get("packed_files", 0) for m in modules) + a["total_pack_bytes"] = sum(m.get("pack_bytes", 0.0) for m in modules) + a["total_dl_bytes"] = sum(m.get("dl_bytes", 0.0) for m in modules) + a["total_dl_phase_s"] = sum(m.get("dl_phase_s", 0.0) for m in modules) + a["total_pack_download_s"] = sum(m.get("pack_download_s", 0.0) for m in modules) + a["total_pack_unpack_s"] = sum(m.get("pack_unpack_s", 0.0) for m in modules) + a["total_rename_s"] = sum(m.get("rename_copy_s", 0.0) for m in modules) + a["total_load_meta_s"] = sum(m.get("load_metadata_s", 0.0) for m in modules) + a["total_wall_s"] = sum(m["wall_s"] for m in modules) + + # Per-module medians and percentiles of request-level metrics + req_avgs = [m["req_avg_s"] for m in modules if m.get("reqs", 0) > 0] + a["req_avg_latency_median_s"] = median(req_avgs) if req_avgs else 0.0 + a["req_avg_latency_mean_s"] = mean(req_avgs) if req_avgs else 0.0 + a["req_avg_latency_p95_s"] = ( + sorted(req_avgs)[int(0.95 * len(req_avgs))] if req_avgs else 0.0 + ) + + queue_waits = [m["queue_wait_s"] for m in modules if "queue_wait_s" in m] + a["queue_wait_median_s"] = median(queue_waits) if queue_waits else 0.0 + a["queue_wait_mean_s"] = mean(queue_waits) if queue_waits else 0.0 + a["queue_wait_p95_s"] = ( + sorted(queue_waits)[int(0.95 * len(queue_waits))] if queue_waits else 0.0 + ) + a["queue_wait_max_s"] = max(queue_waits) if queue_waits else 0.0 + + first_starts = [m["sched_1st_start_s"] for m in modules if "sched_1st_start_s" in m] + a["first_start_median_s"] = median(first_starts) if first_starts else 0.0 
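+    # p95 here (and below) is nearest-rank: index int(0.95 * n) into the
+    # sorted sample, with no interpolation.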
+ a["first_start_p95_s"] = ( + sorted(first_starts)[int(0.95 * len(first_starts))] if first_starts else 0.0 + ) + + wall_times = [m["wall_s"] for m in modules] + a["wall_median_s"] = median(wall_times) if wall_times else 0.0 + a["wall_mean_s"] = mean(wall_times) if wall_times else 0.0 + a["wall_p95_s"] = sorted(wall_times)[int(0.95 * len(wall_times))] if wall_times else 0.0 + a["wall_max_s"] = max(wall_times) if wall_times else 0.0 + + saturations = [m["req_saturation_pct"] for m in modules if "req_saturation_pct" in m] + a["saturation_median_pct"] = median(saturations) if saturations else 0 + a["saturation_mean_pct"] = mean(saturations) if saturations else 0 + + peak_inflights = [m["req_peak_inflight"] for m in modules if "req_peak_inflight" in m] + a["peak_inflight_median"] = median(peak_inflights) if peak_inflights else 0 + a["peak_inflight_max"] = max(peak_inflights) if peak_inflights else 0 + + return a + + +def print_row(label: str, packed_val: str, unpacked_val: str, delta: str = ""): + print(f" {label:<42} {packed_val:>14} {unpacked_val:>14} {delta}") + + +def delta_pct(packed: float, unpacked: float) -> str: + if unpacked == 0: + return "" + d = packed - unpacked + pct = (d / unpacked) * 100.0 + sign = "+" if d >= 0 else "" + return f"{sign}{pct:.1f}%" + + +def main() -> int: + p = argparse.ArgumentParser() + p.add_argument("--packed-log", required=True) + p.add_argument("--unpacked-log", required=True) + args = p.parse_args() + + pm = parse_log(Path(args.packed_log)) + um = parse_log(Path(args.unpacked_log)) + + pa = agg(pm) + ua = agg(um) + + print(f"\nParsed packed: {pa['count']} modules from {args.packed_log}") + print(f"Parsed unpacked: {ua['count']} modules from {args.unpacked_log}\n") + + print(f" {'metric':<42} {'packed':>14} {'unpacked':>14} delta(p vs u)") + print(f" {'-'*42} {'-'*14} {'-'*14} {'-'*12}") + + print("\n-- workload --") + print_row("modules", f"{pa['count']}", f"{ua['count']}") + print_row("total files (entries)", f"{pa['total_files']}", f"{ua['total_files']}") + print_row("total raw data", fmt_bytes(pa["total_bytes"]), fmt_bytes(ua["total_bytes"])) + + print("\n-- S3 requests --") + print_row("total S3 GETs", + f"{pa['total_reqs']}", f"{ua['total_reqs']}", + delta_pct(pa["total_reqs"], ua["total_reqs"])) + print_row("GETs per module (mean)", + f"{pa['total_reqs']/pa['count']:.1f}", + f"{ua['total_reqs']/ua['count']:.1f}", + delta_pct(pa['total_reqs']/pa['count'], ua['total_reqs']/ua['count'])) + print_row("requests saved by packing", + f"{ua['total_reqs'] - pa['total_reqs']}", "-", + f"{((ua['total_reqs'] - pa['total_reqs']) / ua['total_reqs']) * 100:.1f}%") + + print("\n-- payload --") + print_row("total bytes downloaded", + fmt_bytes(pa["total_dl_bytes"]), + fmt_bytes(ua["total_dl_bytes"]), + delta_pct(pa["total_dl_bytes"], ua["total_dl_bytes"])) + print_row(" pack bytes (aggregate)", + fmt_bytes(pa["total_pack_bytes"]), "-") + diff = pa["total_dl_bytes"] - ua["total_dl_bytes"] + direction = "saved by packing" if diff < 0 else "added by packing" + print_row(f"bytes {direction}", fmt_bytes(abs(diff)), "-") + + print("\n-- per-request latency (per-module avg) --") + print_row("median req avg latency", + fmt_s(pa["req_avg_latency_median_s"]), + fmt_s(ua["req_avg_latency_median_s"]), + delta_pct(pa["req_avg_latency_median_s"], ua["req_avg_latency_median_s"])) + print_row("mean req avg latency", + fmt_s(pa["req_avg_latency_mean_s"]), + fmt_s(ua["req_avg_latency_mean_s"]), + delta_pct(pa["req_avg_latency_mean_s"], ua["req_avg_latency_mean_s"])) + 
print_row("p95 req avg latency", + fmt_s(pa["req_avg_latency_p95_s"]), + fmt_s(ua["req_avg_latency_p95_s"]), + delta_pct(pa["req_avg_latency_p95_s"], ua["req_avg_latency_p95_s"])) + + print("\n-- queue / scheduling --") + print_row("median queue wait (per module)", + fmt_s(pa["queue_wait_median_s"]), + fmt_s(ua["queue_wait_median_s"]), + delta_pct(pa["queue_wait_median_s"], ua["queue_wait_median_s"])) + print_row("mean queue wait", + fmt_s(pa["queue_wait_mean_s"]), + fmt_s(ua["queue_wait_mean_s"]), + delta_pct(pa["queue_wait_mean_s"], ua["queue_wait_mean_s"])) + print_row("p95 queue wait", + fmt_s(pa["queue_wait_p95_s"]), + fmt_s(ua["queue_wait_p95_s"]), + delta_pct(pa["queue_wait_p95_s"], ua["queue_wait_p95_s"])) + print_row("max queue wait", + fmt_s(pa["queue_wait_max_s"]), + fmt_s(ua["queue_wait_max_s"]), + delta_pct(pa["queue_wait_max_s"], ua["queue_wait_max_s"])) + print_row("median 1st-start delay", + fmt_s(pa["first_start_median_s"]), + fmt_s(ua["first_start_median_s"]), + delta_pct(pa["first_start_median_s"], ua["first_start_median_s"])) + + print("\n-- per-module wall time (hydration block) --") + print_row("median", + fmt_s(pa["wall_median_s"]), + fmt_s(ua["wall_median_s"]), + delta_pct(pa["wall_median_s"], ua["wall_median_s"])) + print_row("mean", + fmt_s(pa["wall_mean_s"]), + fmt_s(ua["wall_mean_s"]), + delta_pct(pa["wall_mean_s"], ua["wall_mean_s"])) + print_row("p95", + fmt_s(pa["wall_p95_s"]), + fmt_s(ua["wall_p95_s"]), + delta_pct(pa["wall_p95_s"], ua["wall_p95_s"])) + print_row("max", + fmt_s(pa["wall_max_s"]), + fmt_s(ua["wall_max_s"]), + delta_pct(pa["wall_max_s"], ua["wall_max_s"])) + + print("\n-- thread-pool saturation / concurrency --") + print_row("saturation median %", + f"{pa['saturation_median_pct']}", + f"{ua['saturation_median_pct']}") + print_row("saturation mean %", + f"{pa['saturation_mean_pct']:.1f}", + f"{ua['saturation_mean_pct']:.1f}") + print_row("peak in-flight median", + f"{pa['peak_inflight_median']}", + f"{ua['peak_inflight_median']}") + print_row("peak in-flight max", + f"{pa['peak_inflight_max']}", + f"{ua['peak_inflight_max']}") + + print("\n-- pack-specific costs (packed run only) --") + print(f" pack GETs added: {pa['total_packs']} ({pa['total_packs']/pa['count']:.2f}/module)") + print(f" small files folded: {pa['total_packed_files']} ({pa['total_packed_files']/pa['count']:.1f}/module)") + print(f" pack download time total: {pa['total_pack_download_s']:.1f} s") + print(f" pack unpack total: {pa['total_pack_unpack_s']:.2f} s") + + print("\n-- per-phase totals (sum across modules, wall-clock contribution) --") + print_row("load metadata total", + fmt_s(pa["total_load_meta_s"]), + fmt_s(ua["total_load_meta_s"]), + delta_pct(pa["total_load_meta_s"], ua["total_load_meta_s"])) + print_row("download phase total", + fmt_s(pa["total_dl_phase_s"]), + fmt_s(ua["total_dl_phase_s"]), + delta_pct(pa["total_dl_phase_s"], ua["total_dl_phase_s"])) + print_row("rename/copy total", + fmt_s(pa["total_rename_s"]), + fmt_s(ua["total_rename_s"]), + delta_pct(pa["total_rename_s"], ua["total_rename_s"])) + print_row("hydration wall sum", + fmt_s(pa["total_wall_s"]), + fmt_s(ua["total_wall_s"]), + delta_pct(pa["total_wall_s"], ua["total_wall_s"])) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py new file mode 100644 index 000000000..23014409c --- /dev/null +++ b/scripts/test_scripts/hub/hub_load_test_s3.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python3 
+"""Hub provision/deprovision sweep against a real S3 bucket. + +Lists the top-level folders in S3_URI (each folder name is a moduleId), +picks 200 of them, then: + 1. fires all provision requests concurrently, + 2. polls until every module reaches 'provisioned', + 3. waits 5 seconds, + 4. fires all deprovision requests concurrently, + 5. polls until every module is deprovisioned, + 6. waits 5 seconds, + 7. shuts down the hub. + +Required environment variables (or pass via CLI flags): + ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/ + ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access + ZEN_PERF_AWS_REGION defaults to us-east-1 + +Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO +session is missing or expired, the script triggers the login automatically. + +Requirements: + pip install boto3 + aws CLI v2 +""" + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + +_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "") +_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "") +_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1") + + +# --------------------------------------------------------------------------- +# Executable discovery +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + candidates = [ + repo_root / "build" / "windows" / "x64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "linux" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "macosx" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + ] + for c in candidates: + if c.exists(): + return c + + matches = list(repo_root.glob(f"build/**/debug/zenserver{_EXE_SUFFIX}")) + if matches: + return max(matches, key=lambda p: p.stat().st_mtime) + + sys.exit( + "zenserver debug executable not found in build/. " + "Run: xmake config -y -m debug -a x64 && xmake -y\n" + "Or pass --zenserver-dir <dir>." 
+ ) + + +# --------------------------------------------------------------------------- +# AWS SSO + S3 listing +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit( + "[aws] boto3 is required.\n" + "Install it with: pip install boto3" + ) + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]: + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + paginator = s3.get_paginator("list_objects_v2") + module_ids: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + full = cp.get("Prefix", "") + # Strip outer prefix + trailing slash to get the folder name + folder = full[len(prefix_norm):].rstrip("/") + if folder: + module_ids.append(folder) + return module_ids + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-http-threads=8", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + 
except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[{name}] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + out: dict[str, str] = {} + for m in data.get("modules", []) or []: + mid = m.get("moduleId") + if mid: + out[mid] = m.get("state", "") + return out + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """Fire POST /hub/modules/<id>/<verb> for every module concurrently. + + Returns (accepted_ids, failures) where accepted_ids got 200/202 and + failures is a list of (mid, status, body) for everything else. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + # Already gone - treat as success for deprovision fan-out. + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _wait_for_provisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is 'provisioned' or gone. + + Returns (stuck_ids, last_states). 
stuck_ids are the ones that did not reach + 'provisioned' within timeout_s; last_states maps all module_ids to their + last-seen state (or empty string if the module was absent from the report). + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + now_done: list[str] = [] + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == "provisioned": + now_done.append(mid) + elif s in ("", "unprovisioned", "crashed"): + # Not useful end-states for "waiting to become provisioned", + # but we don't block forever on them either - let timeout decide. + pass + for mid in now_done: + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is gone from hub status.""" + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3", + help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=200, + help="Number of modules to sweep (default: 200)") + parser.add_argument("--settle-seconds", type=float, default=5.0, + help="Seconds to wait between provision-complete and deprovision, " + "and between deprovision-complete and shutdown (default: 5.0)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=600.0, + help="Max seconds to wait for provision or deprovision to finish (default: 600)") + parser.add_argument("--trace", + nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. " + "Optionally pass channel spec (default: 'default')") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI) + aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE) + aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION) + if not s3_uri: + sys.exit("[setup] S3 URI not set. 
Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/") + if not aws_profile: + sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name") + + data_dir = Path(args.data_dir) + hub_log = data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] S3 URI: {s3_uri}") + print(f"[setup] profile: {aws_profile}") + print(f"[setup] region: {aws_region}") + + session, frozen = _get_session(aws_profile) + print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})") + + bucket, prefix = _parse_s3_uri(s3_uri) + print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...") + module_ids = _list_module_ids(session, bucket, prefix, aws_region) + print(f"[s3] found {len(module_ids)} module folders") + + if not module_ids: + sys.exit("[s3] no module folders found, aborting") + + selected = module_ids[:args.module_count] + if len(selected) < args.module_count: + print(f"[s3] only {len(selected)} folders available (requested {args.module_count})") + + aws_env = { + "AWS_ACCESS_KEY_ID": frozen.access_key, + "AWS_SECRET_ACCESS_KEY": frozen.secret_key, + } + if frozen.token: + aws_env["AWS_SESSION_TOKEN"] = frozen.token + + data_dir.mkdir(parents=True, exist_ok=True) + config_path = data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": {"uri": s3_uri, "region": aws_region}, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back. + "--hub-enable-dehydration=false", + ] + if args.trace: + trace_path = data_dir / "hub.utrace" + hub_extra_args += [ + f"--trace={args.trace}", + f"--tracefile={trace_path}", + ] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + + try: + hub_instance_limit = max(args.module_count, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, data_dir, args.port, hub_log, + hub_instance_limit, hub_extra_args, aws_env, + ) + _wait_for_hub(hub_proc, args.port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision phase --- + print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...") + t_pv0 = time.monotonic() + accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision") + print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} " + f"(fan-out {time.monotonic() - t_pv0:.1f}s)") + for mid, status, body in pv_failures[:10]: + print(f"[provision] FAILED {mid}: status={status} body={body}") + if len(pv_failures) > 10: + print(f"[provision] ... 
and {len(pv_failures) - 10} more failures") + + if hub_proc.poll() is not None: + print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})") + return + + if accepted: + stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout) + provisioned_count = len(accepted) - len(stuck) + print() + print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' " + f"({time.monotonic() - t_pv0:.1f}s total)") + if stuck: + print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s") + for mid in stuck[:10]: + print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'") + else: + print("[provision] nothing provisioned") + return + + print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...") + time.sleep(args.settle_seconds) + + # --- Deprovision phase --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t_dp0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision") + print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} " + f"(fan-out {time.monotonic() - t_dp0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[deprovision] FAILED {mid}: status={status} body={body}") + if len(dp_failures) > 10: + print(f"[deprovision] ... and {len(dp_failures) - 10} more failures") + + stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout) + deprovisioned_count = len(dp_accepted) - len(stuck_dp) + print() + print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone " + f"({time.monotonic() - t_dp0:.1f}s total)") + if stuck_dp: + print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s") + for mid in stuck_dp[:10]: + print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'") + + print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...") + time.sleep(args.settle_seconds) + + print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _stop_process(hub_proc, "hub", timeout_s=120.0) + if hub_log_handle is not None: + hub_log_handle.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua new file mode 100644 index 000000000..f3cf3e697 --- /dev/null +++ b/scripts/test_scripts/hub/perf_configs/hub.lua @@ -0,0 +1,48 @@ +-- Perf-test hub config: mirrors the production hub config in the repo root +-- (hub.lua / instance.lua). The launch script overrides hub.instance.config +-- and the effective concurrency (--corelimit=128) via CLI to simulate an +-- r5n.32xlarge box so the default auto-picks match what prod sees there. + +hub = { + instance = { + baseportnumber = 21000, -- default + limits = { + count = 1100, -- headroom for 1000-module perf runs + memorylimitpercent = 90, -- default: 0 (disabled) + disklimitpercent = 90, -- default: 0 (disabled) + }, + corelimit = 4, -- default: 0 (auto) + provisionthreads = 8, -- default: auto + -- NOTE: hub.instance.config (path to instance lua) is overridden via + -- --hub-instance-config on the CLI. If left here, it would be resolved + -- relative to the hub's CWD at spawn time (NOT this file's dir). + }, + + hydration = { + -- Match production's per-module download pool size. 
Without this, the + -- default auto-picks hardware_concurrency/4 which on --corelimit=128 + -- would be 32. Prod logs consistently show "16 threads" in Download phase. + threads = 16, + }, + + watchdog = { + cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan + cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count + instancecheckthrottlems = 10, -- default: 5. slight throttle to reduce hub CPU + provisionedinactivitytimeoutseconds = 600, -- default + hibernatedinactivitytimeoutseconds = 1800, -- default + inactivitycheckmarginseconds = 60, -- default + }, +} + +network = { + httpserverthreads = 8, -- default: auto. hub itself needs few threads +} + +server = { + dedicated = true, -- default: false. signals build-farm use, affects thread scaling heuristics +} + +gc = { + enabled = false, -- default: true. hub has no storage, no need for GC +} diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua new file mode 100644 index 000000000..1251997db --- /dev/null +++ b/scripts/test_scripts/hub/perf_configs/instance.lua @@ -0,0 +1,15 @@ +network = { + httpserverthreads = 4, -- default: auto +} + +gc = { + enabled = false, -- default: true. hub triggers GC at deprovision, no need for periodic GC +} + +cache = { + bucket = { + memlayer = { + sizethreshold = 0, -- default: 1024. 0 disables memlayer entirely + }, + }, +} diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py new file mode 100644 index 000000000..365e4a542 --- /dev/null +++ b/scripts/test_scripts/hub/preserve_minio_state.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +"""Preserve a freshly-seeded MinIO data directory. + +Run this after seed_minio.py has finished. By default it MOVES --source (the +MinIO --data-dir used during seeding) to --dest (the preservation path) and +writes a README with provenance so future perf runs start from a known +baseline. Move avoids a ~0.5 TB copy on a full 1000-module seed; Stage B +wipes --minio-data-dir on its next invocation anyway. + +Pass --copy to keep --source in place (slower; needs 2x disk). + +Typical invocation: + python preserve_minio_state.py + +Defaults map to the paths recommended by PERF_SEED_README.md. 
+""" + +from __future__ import annotations + +import argparse +import datetime +import os +import shutil +import stat +import sys +from pathlib import Path + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + def _onerror(func, p, exc_info): + try: + os.chmod(p, stat.S_IWRITE) + func(p) + except Exception: + pass + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + +def _size_of(path: Path) -> tuple[int, int]: + files = 0 + total = 0 + for root, _dirs, names in os.walk(path): + for n in names: + p = Path(root) / n + try: + total += p.stat().st_size + except OSError: + pass + files += 1 + return files, total + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data", + help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed", + help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " + "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") + parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""), + help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)") + parser.add_argument("--bucket", default="zen-seed", + help="MinIO bucket name recorded in the README") + parser.add_argument("--module-count", type=int, default=300, + help="Module count recorded in the README") + parser.add_argument("--copy", action="store_true", + help="Copy --source to --dest instead of moving it. Default is move " + "(fast, in-place rename when on the same volume). Use --copy if you " + "want to keep --source intact for another preserve run.") + args = parser.parse_args() + + source = Path(args.source).resolve() + dest = Path(args.dest).resolve() + + if not source.is_dir(): + sys.exit(f"[preserve] source not found: {source}") + + # Dest is wiped and rewritten. Refuse any path that would clobber source. 
+ if dest == source or dest in source.parents or source in dest.parents: + sys.exit(f"[preserve] source ({source}) and dest ({dest}) must be disjoint") + + files, total = _size_of(source) + mode = "copy" if args.copy else "move" + print(f"[preserve] source: {source} -> {files:,} files, {total/1024/1024:.1f} MB") + print(f"[preserve] dest: {dest}") + print(f"[preserve] mode: {mode}") + + if dest.exists(): + print(f"[preserve] removing existing dest {dest}") + _rmtree_robust(dest) + + dest.parent.mkdir(parents=True, exist_ok=True) + if args.copy: + shutil.copytree(source, dest, symlinks=False) + else: + shutil.move(str(source), str(dest)) + + readme = dest / "README.txt" + readme.write_text( + "\n".join([ + "zen-perf-seed preserved MinIO state", + "", + f"Created: {datetime.datetime.now(datetime.timezone.utc).isoformat()}", + f"Source s3 URI: {args.s3_uri}", + f"Bucket: {args.bucket}", + f"Modules: {args.module_count}", + f"Files: {files:,}", + f"Bytes: {total:,}", + "", + "To run a perf iteration: copy this directory onto a fresh MinIO data", + "dir (see scripts/test_scripts/hub/run_minio_perf.py) and point a hub at it.", + "", + ]), + encoding="ascii", + ) + print(f"[preserve] wrote {readme}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py new file mode 100644 index 000000000..b59cff41a --- /dev/null +++ b/scripts/test_scripts/hub/run_minio_perf.py @@ -0,0 +1,614 @@ +#!/usr/bin/env python3 +"""Stage C of the perf-seed workflow: run a perf iteration against the +preserved MinIO state. + +Flow per invocation: + 1. Reset --minio-run by copying --minio-seeded over it (so every iteration + starts from the same canonical state). + 2. Start MinIO against --minio-run. + 3. Start a hub against MinIO. By default --hub-enable-dehydration=false so + the perf run is read-only and the seeded baseline survives intact. + Pass --enable-dehydration to run a full provision -> deprovision cycle + that re-uploads at deprovision time (use this to measure the dehydrate + phase as well as hydrate; the seeded baseline diverges after such a run). + 4. Provision every module found under --snapshot-dir (same set that was + used to seed MinIO), wait for all to reach 'provisioned'. + 5. Deprovision them all, wait for them to be gone. + 6. Stop hub + MinIO cleanly. + +Optional --trace writes a utrace file to --hub-data-dir/hub.utrace. + +This script reuses the lightweight harness helpers from the other stages but +keeps the run fully offline once the seed is preserved. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. 
+    if sys.version_info >= (3, 12):
+        shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+    else:
+        shutil.rmtree(path, onerror=_onerror)
+
+
+
+def _find_zenserver(override: Optional[str]) -> Path:
+    if override:
+        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+        if not p.exists():
+            sys.exit(f"zenserver not found at {p}")
+        return p
+    script_dir = Path(__file__).resolve().parent
+    repo_root = script_dir.parents[2]  # scripts/test_scripts/hub -> repo root
+    for mode in ("release", "debug"):
+        for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+            p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+            if p.exists():
+                return p
+    sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+    p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+    if not p.exists():
+        sys.exit(f"zen CLI not found at {p}")
+    return p
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+    if not p.exists():
+        sys.exit(f"minio executable not found at {p}")
+    return p
+
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+    data_dir.mkdir(parents=True, exist_ok=True)
+    env = os.environ.copy()
+    env["MINIO_ROOT_USER"] = _MINIO_USER
+    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+    popen_kwargs: dict = {}
+    if sys.platform == "win32":
+        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+    proc = subprocess.Popen(
+        [str(minio_exe), "server", str(data_dir),
+         "--address", f":{port}",
+         "--console-address", f":{console_port}",
+         "--quiet"],
+        env=env,
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+        **popen_kwargs,
+    )
+    print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}")
+    return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+    deadline = time.monotonic() + timeout_s
+    url = f"http://localhost:{port}/minio/health/live"
+    while time.monotonic() < deadline:
+        try:
+            with urllib.request.urlopen(url, timeout=1):
+                print("[minio] ready")
+                return
+        except Exception:
+            time.sleep(0.1)
+    sys.exit(f"[minio] timed out after {timeout_s}s")
+
+
+def _start_hub(
+    zenserver_exe: Path,
+    data_dir: Path,
+    port: int,
+    log_file: Path,
+    extra_args: list[str],
+    extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+    data_dir.mkdir(parents=True, exist_ok=True)
+    # Use the production-like Lua config (limits, thread counts, watchdog timers)
+    # so perf numbers reflect what prod actually runs. Hub.hydration.threads is
+    # left unset so the default auto-pick (hardware_concurrency/4) fires; combined
+    # with --corelimit=128 below, that yields 32 hydration threads on the actual
+    # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes.
+    config_dir = Path(__file__).resolve().parent / "perf_configs"
+    hub_config = config_dir / "hub.lua"
+    inst_config = config_dir / "instance.lua"
+    cmd = [
+        str(zenserver_exe),
+        "hub",
+        "--enable-execution-history=false",
+        f"--data-dir={data_dir}",
+        f"--port={port}",
+        f"--config={hub_config}",
+        f"--hub-instance-config={inst_config}",
+        # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box
+        # with fewer physical cores, this clamps to the actual count (see
+        # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates).
+ # When running on the actual prod instance class, this makes the default + # auto-picks (provision/hydration thread counts) behave as prod does. + "--corelimit=128", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """'zen down --pid' for graceful hub shutdown.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + try: + if sys.platform == "win32": + proc.send_signal(signal.CTRL_BREAK_EVENT) + else: + proc.terminate() + except Exception: + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[minio] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_states(port: int) -> Optional[dict[str, str]]: + # Without Accept: application/json the hub serves compact binary (CBOR) + # which json.loads can't parse. 60s timeout - /hub/status can take a few + # seconds to serialize with 300+ modules in flight. + req = urllib.request.Request( + f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=60) as resp: + data = json.loads(resp.read()) + except Exception as e: + # Visibility: prior silent None caused "0/300 provisioned..." forever. 
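+        # Returning None (rather than {}) lets callers keep their last-known
+        # counts instead of treating one failed poll as "all modules gone".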
+ print(f"[status] WARN /hub/status failed: {e}") + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids} + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _state_histogram(states: dict[str, str], remaining: set[str]) -> str: + counts: dict[str, int] = {} + for mid in remaining: + s = states.get(mid, "<absent>") + counts[s] = counts.get(s, 0) + 1 + return ", ".join(f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: -kv[1])) + + +def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]: + deadline = time.monotonic() + timeout_s + remaining = set(ids) + last_verbose = 0.0 + while remaining and time.monotonic() < deadline: + states = _hub_states(port) + if states is not None: + for mid in list(remaining): + if states.get(mid) == "provisioned": + remaining.discard(mid) + done = len(ids) - len(remaining) + now = time.monotonic() + if states is not None and (now - last_verbose >= 10.0 or not remaining): + hist = _state_histogram(states, remaining) if remaining else "(all provisioned)" + print(f"[provision] {done}/{len(ids)} provisioned | remaining: {hist}") + last_verbose = now + else: + print(f"[provision] {done}/{len(ids)} provisioned...", end="\r") + time.sleep(2.0) + print() + return list(remaining) + + +def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]: + deadline = time.monotonic() + timeout_s + remaining = set(ids) + last_verbose = 0.0 + while remaining and time.monotonic() < deadline: + states = _hub_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + done = len(ids) - len(remaining) + now = time.monotonic() + if states is not None and (now - last_verbose >= 10.0 or not remaining): + hist = _state_histogram(states, remaining) if remaining else "(all gone)" + print(f"[deprovision] {done}/{len(ids)} gone | remaining: {hist}") + last_verbose = now + else: + print(f"[deprovision] {done}/{len(ids)} gone...", end="\r") + time.sleep(2.0) + print() + return list(remaining) + + +def _robust_copytree(src: Path, dst: Path) -> None: + """Windows-friendly directory copy with progress. + + Uses robocopy /MIR to mirror src -> dst and /COPY:DAT /DCOPY:DAT to copy + file data + attributes + timestamps explicitly (not just the defaults). + Safe because dst is always the working dir (minio-run) - never the + preserved baseline. 
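+
+    Robocopy exit codes are a bitmask, not a plain pass/fail rc: 0-7 are
+    success/informational (1 = files copied, 2 = extras detected by /MIR),
+    8 and above mean at least one copy failed - hence the rc >= 8 check.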
+    """
+    if sys.platform == "win32":
+        cmd = [
+            "robocopy", str(src), str(dst),
+            "/MIR",
+            "/COPY:DAT", "/DCOPY:DAT",
+            "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP",
+            "/R:2", "/W:1",
+        ]
+        print(f"[reset] robocopy {src} -> {dst}")
+        rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+        if rc >= 8:
+            sys.exit(f"[reset] robocopy failed rc={rc}")
+    else:
+        if dst.exists():
+            _rmtree_robust(dst)
+        shutil.copytree(src, dst, symlinks=False)
+
+
+def _archive_run(
+    archive_dir: Path,
+    hub_data_dir: Path,
+    bucket: str,
+    summary: dict,
+) -> Optional[Path]:
+    """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a
+    timestamped subdir under archive_dir so successive runs can be compared.
+
+    Returns the archive destination path or None if there was nothing to copy.
+    """
+    try:
+        ts = time.strftime("%Y%m%d-%H%M%S", time.localtime())
+        dest = archive_dir / f"{ts}_{bucket}"
+        dest.mkdir(parents=True, exist_ok=True)
+
+        copied: list[str] = []
+        # Hub stdout/stderr (captured by _start_hub via subprocess stdout=)
+        hub_log = hub_data_dir / "hub.log"
+        if hub_log.is_file():
+            shutil.copy2(hub_log, dest / "hub.log")
+            copied.append("hub.log")
+        # Structured zenserver logs (rotated variants included)
+        logs_src = hub_data_dir / "logs"
+        if logs_src.is_dir():
+            logs_dst = dest / "logs"
+            logs_dst.mkdir(exist_ok=True)
+            for p in logs_src.iterdir():
+                if p.is_file():
+                    shutil.copy2(p, logs_dst / p.name)
+                    copied.append(f"logs/{p.name}")
+        # Optional trace
+        utrace = hub_data_dir / "hub.utrace"
+        if utrace.is_file():
+            shutil.copy2(utrace, dest / "hub.utrace")
+            copied.append("hub.utrace")
+
+        (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii")
+
+        print(f"[archive] {dest} ({len(copied)} files)")
+        return dest
+    except Exception as e:
+        print(f"[archive] WARNING: failed to archive run: {e}")
+        return None
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser(description=__doc__,
+                                     formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
+                        help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
+                             "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
+    parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run",
+                        help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)")
+    parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+                        help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+    parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf",
+                        help="Hub --data-dir for the perf run (wiped each run unless --no-wipe-hub). Default: E:/Dev/zen-perf-seed/hub-perf")
+    parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs",
+                        help="Where to archive hub.log + zenserver.log + hub.utrace after each run "
+                             "(default: E:/Dev/zen-perf-seed/perf-runs)")
+    parser.add_argument("--bucket", default="zen-seed-packed",
+                        help="MinIO bucket name to exercise (default: zen-seed-packed). 
" + "Pack worktree seeds only the packed bucket.") + parser.add_argument("--minio-port", type=int, default=9000) + parser.add_argument("--minio-console-port", type=int, default=9001) + parser.add_argument("--hub-port", type=int, default=8558) + parser.add_argument("--module-count", type=int, default=0, + help="Cap on modules used (0 = all from snapshot-dir)") + parser.add_argument("--workers", type=int, default=50) + parser.add_argument("--poll-timeout", type=float, default=1200.0) + parser.add_argument("--settle-seconds", type=float, default=5.0) + parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace (default channels: 'default')") + parser.add_argument("--enable-dehydration", action="store_true", + help="Run with --hub-enable-dehydration=true so the deprovision phase " + "actually re-uploads to MinIO. Default false preserves the seeded " + "baseline; pass this to measure dehydrate cost as well as hydrate.") + parser.add_argument("--no-wipe-hub", action="store_true", + help="Don't wipe the hub data dir before starting (useful to inspect leftover state)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver + minio executables (auto-detected)") + args = parser.parse_args() + + minio_seeded = Path(args.minio_seeded).resolve() + minio_run = Path(args.minio_run).resolve() + snapshot_dir = Path(args.snapshot_dir).resolve() + hub_data_dir = Path(args.hub_data_dir).resolve() + archive_dir = Path(args.archive_dir).resolve() + + if not minio_seeded.is_dir(): + sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}") + if not snapshot_dir.is_dir(): + sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}") + + # Preserved inputs are read-only from this script's perspective. Refuse to + # run if any mutable path could accidentally clobber them. + for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)): + for ro_label, ro in (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir)): + if mutable == ro or mutable in ro.parents or ro in mutable.parents: + sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}") + + module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()]) + if args.module_count > 0: + module_ids = module_ids[:args.module_count] + if not module_ids: + sys.exit(f"[setup] no modules in {snapshot_dir}") + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + minio_exe = _find_minio(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] minio: {minio_exe}") + print(f"[setup] minio-seeded: {minio_seeded}") + print(f"[setup] minio-run: {minio_run}") + print(f"[setup] hub-data-dir: {hub_data_dir}") + print(f"[setup] modules: {len(module_ids)}") + + # Reset MinIO run dir from the baseline. + _robust_copytree(minio_seeded, minio_run) + + # Wipe the hub data dir so every run starts from scratch unless the user opts out. 
+ if not args.no_wipe_hub and hub_data_dir.exists(): + print(f"[reset] wiping {hub_data_dir}") + _rmtree_robust(hub_data_dir) + hub_data_dir.mkdir(parents=True, exist_ok=True) + + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": { + "uri": f"s3://{args.bucket}", + "endpoint": f"http://localhost:{args.minio_port}", + "path-style": True, + "region": "us-east-1", + }, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}", + ] + if args.enable_dehydration: + print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge") + if args.trace: + trace_path = hub_data_dir / "hub.utrace" + hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + minio_proc: Optional[subprocess.Popen] = None + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + timings: dict = { + "bucket": args.bucket, + "module_count": len(module_ids), + "build_mode": zenserver_mode, + "provision_fanout_s": None, + "provision_total_s": None, + "deprovision_fanout_s": None, + "deprovision_total_s": None, + "total_s": None, + } + + try: + minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + + hub_log = hub_data_dir / "hub.log" + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, args.hub_port, hub_log, + hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, args.hub_port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision --- + print(f"[provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision") + fanout_s = time.monotonic() - t0 + timings["provision_fanout_s"] = round(fanout_s, 2) + print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)") + for mid, status, body in failures[:5]: + print(f"[provision] FAIL {mid}: status={status} body={body}") + if failures: + exit_code = 1 + stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout) + total_s = time.monotonic() - t0 + timings["provision_total_s"] = round(total_s, 2) + print(f"[provision] all provisioned in {total_s:.1f}s ({len(accepted)-len(stuck)}/{len(accepted)})") + if stuck: + exit_code = 1 + + print(f"[settle] waiting {args.settle_seconds:.0f}s") + time.sleep(args.settle_seconds) + + # --- Deprovision --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision") + fanout_s = time.monotonic() - t0 + timings["deprovision_fanout_s"] = round(fanout_s, 2) + print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)") + if dp_failures: + exit_code = 1 + stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout) + total_s = time.monotonic() - t0 + timings["deprovision_total_s"] = round(total_s, 2) + print(f"[deprovision] all gone in {total_s:.1f}s ({len(dp_accepted)-len(stuck_dp)}/{len(dp_accepted)})") + if stuck_dp: + exit_code = 
1 + + print(f"[settle] waiting {args.settle_seconds:.0f}s") + time.sleep(args.settle_seconds) + + elapsed = time.monotonic() - t_start + timings["total_s"] = round(elapsed, 2) + print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _zen_down_hub(zen_exe, hub_proc) + if hub_log_handle is not None: + hub_log_handle.close() + if minio_proc is not None and minio_proc.poll() is None: + _stop_minio_graceful(minio_proc) + # Archive AFTER the hub has exited so in-flight log writes are flushed. + timings["exit_code"] = exit_code + _archive_run(archive_dir, hub_data_dir, args.bucket, timings) + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py new file mode 100644 index 000000000..e0e45c4cb --- /dev/null +++ b/scripts/test_scripts/hub/seed_minio.py @@ -0,0 +1,720 @@ +#!/usr/bin/env python3 +"""Stage B of the perf-seed workflow. + +Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO +bucket so a later perf run can exercise that bucket. Pack mode is hardcoded +in this script (see --hub-hydration-enable-pack flag in _start_hub) and +governs how the hub uploads into MinIO. To compare packed vs unpacked, +invoke this script twice from two separate worktrees - one with the pack +flag flipped to false - producing two preserved MinIO data dirs, then run +run_minio_perf.py against each. + +Flow: + 1. Start a local MinIO server against --minio-data-dir, create the bucket. + 2. Start a hub against MinIO (--bucket as target), dehydration ON, + hibernated-watchdog timeout set huge so hibernated modules are not + auto-deprovisioned before we deprovision them ourselves. + 3. Provision every module discovered under --snapshot-dir (fresh hydrate + against an empty bucket spawns the child on an empty dir). + 4. Hibernate every module (child exits, state dir unlocked). + 5. Overlay the snapshot into <hub-data-dir>/servers/<mid>/. + 6. Deprovision every module. Deprovision from Hibernated runs Dehydrate(), + which scans the overlaid ServerStateDir and uploads content into the + bucket. + 7. Shut the hub down via 'zen down --pid'. + 8. Stop MinIO. + +Preservation of the resulting MinIO data directory is done by a separate +step (see preserve_minio_state.py / PERF_SEED_README.md). +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + + +# --------------------------------------------------------------------------- +# Executable discovery - prefer release, fall back to debug. 
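+# Probe order: release before debug, then windows/linux/macosx under build/;
+# --zenserver-dir bypasses the search entirely.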
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+    if override:
+        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+        if not p.exists():
+            sys.exit(f"zenserver not found at {p}")
+        return p
+    script_dir = Path(__file__).resolve().parent
+    repo_root = script_dir.parents[2]  # scripts/test_scripts/hub -> repo root
+    for mode in ("release", "debug"):
+        for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+            p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+            if p.exists():
+                return p
+    sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+    p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+    if not p.exists():
+        sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)")
+    return p
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+    if not p.exists():
+        sys.exit(f"minio executable not found at {p}. Build the zen project first.")
+    return p
+
+
+# ---------------------------------------------------------------------------
+# MinIO lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+    data_dir.mkdir(parents=True, exist_ok=True)
+    env = os.environ.copy()
+    env["MINIO_ROOT_USER"] = _MINIO_USER
+    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+    popen_kwargs: dict = {}
+    if sys.platform == "win32":
+        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+    proc = subprocess.Popen(
+        [str(minio_exe), "server", str(data_dir),
+         "--address", f":{port}",
+         "--console-address", f":{console_port}",
+         "--quiet"],
+        env=env,
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+        **popen_kwargs,
+    )
+    print(f"[minio] started (pid {proc.pid}) port={port} console={console_port} data={data_dir}")
+    return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+    deadline = time.monotonic() + timeout_s
+    url = f"http://localhost:{port}/minio/health/live"
+    while time.monotonic() < deadline:
+        try:
+            with urllib.request.urlopen(url, timeout=1):
+                print("[minio] ready")
+                return
+        except Exception:
+            time.sleep(0.1)
+    sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+    try:
+        import boto3  # type: ignore[import-not-found]
+        import botocore.config  # type: ignore[import-not-found]
+        import botocore.exceptions  # type: ignore[import-not-found]
+    except ImportError:
+        sys.exit("[minio] boto3 is required. 
pip install boto3") + s3 = boto3.client( + "s3", + endpoint_url=f"http://localhost:{port}", + aws_access_key_id=_MINIO_USER, + aws_secret_access_key=_MINIO_PASS, + region_name="us-east-1", + config=botocore.config.Config(signature_version="s3v4"), + ) + try: + s3.create_bucket(Bucket=bucket) + print(f"[minio] created bucket '{bucket}'") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + print(f"[minio] bucket '{bucket}' already exists") + else: + raise + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + # Seeding is not a perf-measurement path - we want it as fast as the + # host can manage. Let the hub go wide on both provisioning and + # hydration thread pools rather than matching prod limits. + "--hub-instance-provision-threads=64", + "--hub-hydration-threads=64", + # Prevent the watchdog from auto-deprovisioning modules while we're + # still hydrating the tail / in the overlay phase. BOTH timers have to + # be extended - the provisioned one (default 600s) is what bites on + # large-N runs where early provisions go idle waiting for the rest. + "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400", + "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400", + # Explicit - default is true, but make it obvious that Stage B needs + # it since the final deprovision drives the MinIO upload. + "--hub-enable-dehydration=true", + # Pack worktree: turn pack on so dehydrate emits packed CAS. 
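+        # The unpacked worktree flips only the flag below to =false, so the
+        # two seeded buckets differ in pack mode and nothing else.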
+ "--hub-hydration-enable-pack=true", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode})") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """'zen down --pid' so the hub runs its normal shutdown path.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: + """MinIO has no external shutdown tool; Ctrl-Break lets it flush.""" + if proc.poll() is not None: + return + try: + if sys.platform == "win32": + proc.send_signal(signal.CTRL_BREAK_EVENT) + else: + proc.terminate() + except Exception: + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[minio] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: 
list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """POST concurrently. For 'deprovision' a 404 means the module is already + gone - count it as accepted so the state-poll recognises completion. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +_FAILED_STATES = {"crashed", "unprovisioned"} + + +def _wait_for_state( + port: int, + module_ids: list[str], + target_state: str, + timeout_s: float, + label: str, +) -> tuple[list[str], list[str], dict[str, str]]: + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == target_state: + remaining.discard(mid) + elif s in _FAILED_STATES and target_state not in _FAILED_STATES: + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + print() + return list(remaining), failed, last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], list[str], dict[str, str]]: + """Wait until each module is gone from /hub/status or in 'unprovisioned'. + 'crashed' counts as a hard failure - dehydrate never completed for it. + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + elif s == "crashed": + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} gone ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + print() + return list(remaining), failed, last_states + + +# --------------------------------------------------------------------------- +# Snapshot overlay +# --------------------------------------------------------------------------- + +def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: list[str]) -> tuple[int, int, int]: + """Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*. + + snapshot_root is treated as read-only; only hub_servers_root is written to. 
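+
+    Each destination tree is replaced wholesale (rmtree, then copytree with
+    dirs_exist_ok=False), so no file from the fresh hydrate survives the
+    overlay.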
+ """ + files_copied = 0 + bytes_copied = 0 + modules_overlaid = 0 + + for i, mid in enumerate(module_ids, 1): + src = snapshot_root / mid + dst = hub_servers_root / mid + if not src.is_dir(): + print(f"[overlay] WARNING: snapshot missing for {mid}: {src}") + continue + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + modules_overlaid += 1 + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 + if i % 25 == 0 or i == len(module_ids): + print(f"[overlay] {i}/{len(module_ids)} modules overlaid " + f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + return modules_overlaid, files_copied, bytes_copied + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def _seed_one_bucket( + *, + bucket: str, + hub_data_dir: Path, + snapshot_dir: Path, + module_ids: list[str], + minio_port: int, + hub_port: int, + poll_timeout: float, + workers: int, + zenserver_exe: Path, + zen_exe: Path, +) -> tuple[int, dict[str, float]]: + """Run the provision/hibernate/overlay/deprovision cycle against a single + MinIO bucket. Returns (exit_code, timings) where timings maps phase name + to elapsed seconds (provision_s, hibernate_s, overlay_s, deprovision_s, + total_s). + """ + print(f"\n================ seeding {bucket} into hub-dir {hub_data_dir} ================") + + hub_data_dir.mkdir(parents=True, exist_ok=True) + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": { + "uri": f"s3://{bucket}", + "endpoint": f"http://localhost:{minio_port}", + "path-style": True, + "region": "us-east-1", + }, + }), + encoding="ascii", + ) + hub_extra_args = [f"--hub-hydration-target-config={config_path}"] + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + hib_accepted: list[str] = [] + stuck_hib: list[str] = [] + failed_hib: list[str] = [] + timings: dict[str, float] = { + "provision_s": 0.0, + "hibernate_s": 0.0, + "overlay_s": 0.0, + "deprovision_s": 0.0, + "total_s": 0.0, + } + + try: + hub_log = hub_data_dir / "hub.log" + hub_instance_limit = max(len(module_ids) + 10, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, hub_port, hub_log, + hub_instance_limit, hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, hub_port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=workers) as pool: + # --- Provision --- + print(f"[{bucket}][provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, hub_port, module_ids, "provision") + print(f"[{bucket}][provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in failures[:10]: + print(f"[{bucket}][provision] FAILED {mid}: status={status} body={body}") + exit_code = 1 + if not accepted: + return 1, timings + + stuck, failed, last_states = _wait_for_state(hub_port, accepted, "provisioned", poll_timeout, f"{bucket}/provision") + timings["provision_s"] = time.monotonic() - t0 + prov_done = len(accepted) - len(stuck) - len(failed) + print(f"[{bucket}][provision] complete: {prov_done}/{len(accepted)} 
provisioned, {len(failed)} failed, {len(stuck)} stuck " + f"({timings['provision_s']:.1f}s)") + if failed or stuck: + exit_code = 1 + for mid in (failed + stuck)[:10]: + print(f"[{bucket}][provision] not-provisioned {mid}: last state='{last_states.get(mid, '')}'") + accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)] + if not accepted: + return 1, timings + + # --- Hibernate --- + print(f"[{bucket}][hibernate] firing {len(accepted)} hibernate requests...") + t0 = time.monotonic() + hib_accepted, hib_failures = _fan_out_post(pool, hub_port, accepted, "hibernate") + print(f"[{bucket}][hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in hib_failures[:10]: + print(f"[{bucket}][hibernate] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_hib, failed_hib, last_states_hib = _wait_for_state(hub_port, hib_accepted, "hibernated", poll_timeout, f"{bucket}/hibernate") + timings["hibernate_s"] = time.monotonic() - t0 + hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib) + print(f"[{bucket}][hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck " + f"({timings['hibernate_s']:.1f}s)") + if failed_hib or stuck_hib: + exit_code = 1 + for mid in (failed_hib + stuck_hib)[:10]: + print(f"[{bucket}][hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") + + # --- Overlay snapshot onto hub's servers dir --- + to_overlay = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + hub_servers = hub_data_dir / "servers" + print(f"[{bucket}][overlay] copying {len(to_overlay)} snapshot trees from {snapshot_dir} -> {hub_servers}") + t0 = time.monotonic() + modules_overlaid, files_copied, bytes_copied = _overlay_snapshot(snapshot_dir, hub_servers, to_overlay) + timings["overlay_s"] = time.monotonic() - t0 + print(f"[{bucket}][overlay] overlaid {modules_overlaid} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + f"({timings['overlay_s']:.1f}s)") + if modules_overlaid < len(to_overlay): + exit_code = 1 + + # --- Deprovision (triggers dehydrate -> MinIO upload) --- + with ThreadPoolExecutor(max_workers=workers) as pool: + print(f"[{bucket}][deprovision] firing {len(to_overlay)} deprovision requests...") + t0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, hub_port, to_overlay, "deprovision") + print(f"[{bucket}][deprovision] accepted={len(dp_accepted)}, failed={len(dp_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[{bucket}][deprovision] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_dp, failed_dp, last_states_dp = _wait_for_deprovisioned(hub_port, dp_accepted, poll_timeout) + timings["deprovision_s"] = time.monotonic() - t0 + dp_done = len(dp_accepted) - len(stuck_dp) - len(failed_dp) + print(f"[{bucket}][deprovision] complete: {dp_done}/{len(dp_accepted)} gone, {len(failed_dp)} crashed, {len(stuck_dp)} stuck " + f"({timings['deprovision_s']:.1f}s)") + if failed_dp or stuck_dp: + exit_code = 1 + for mid in (failed_dp + stuck_dp)[:10]: + print(f"[{bucket}][deprovision] not-gone {mid}: last state='{last_states_dp.get(mid, '')}'") + + timings["total_s"] = time.monotonic() - t_start + print(f"[{bucket}] bucket elapsed: {timings['total_s']:.1f}s, exit={exit_code}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + print(f"[{bucket}][hub] 
stopping...") + _zen_down_hub(zen_exe, hub_proc) + if hub_log_handle is not None: + hub_log_handle.close() + + return exit_code, timings + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs", + help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ " + "(default: E:/Dev/zen-perf-seed/hubs)") + parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data", + help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--minio-port", type=int, default=9000) + parser.add_argument("--minio-console-port", type=int, default=9001) + parser.add_argument("--hub-port", type=int, default=8558) + parser.add_argument("--bucket", default="zen-seed-packed", + help="Bucket to seed (default: zen-seed-packed). Pack worktree - " + "hub is launched with --hub-hydration-enable-pack=true.") + parser.add_argument("--module-count", type=int, default=0, + help="Cap on modules processed (0 = all modules in snapshot-dir)") + parser.add_argument("--workers", type=int, default=50) + parser.add_argument("--poll-timeout", type=float, default=1800.0, + help="Max seconds to wait for each state transition (default: 1800)") + parser.add_argument("--wipe", action="store_true", + help="Wipe each per-bucket hub data dir and the shared MinIO data dir before starting " + "(never touches --snapshot-dir)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver + minio executables (auto-detected)") + args = parser.parse_args() + + bucket_name: str = args.bucket + + snapshot_dir = Path(args.snapshot_dir).resolve() + hub_data_root = Path(args.hub_data_root).resolve() + minio_data_dir = Path(args.minio_data_dir).resolve() + + hub_data_dir = (hub_data_root / f"hub-b-{bucket_name}").resolve() + + # Safety: snapshot-dir must not overlap any mutable path. 
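+    # (Overlap is checked in both directions: the snapshot nested under a
+    # mutable dir, and a mutable dir nested under the snapshot.)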
+    for label, d in [
+        ("minio-data-dir", minio_data_dir),
+        ("hub-data-root", hub_data_root),
+        (f"hub-b-{bucket_name}", hub_data_dir),
+    ]:
+        if snapshot_dir == d or snapshot_dir in d.parents or d in snapshot_dir.parents:
+            sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and {label} ({d}) must be disjoint")
+
+    if not snapshot_dir.is_dir():
+        sys.exit(f"[setup] snapshot-dir not found: {snapshot_dir}")
+
+    module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()])
+    if args.module_count > 0:
+        module_ids = module_ids[:args.module_count]
+    if not module_ids:
+        sys.exit(f"[setup] no module directories found in {snapshot_dir}")
+
+    if args.wipe:
+        for d in [hub_data_dir, minio_data_dir]:
+            if d.exists():
+                if snapshot_dir.is_relative_to(d):
+                    sys.exit(f"[setup] refusing to wipe {d}: snapshot-dir is under it")
+                print(f"[setup] wiping {d}")
+                _rmtree_robust(d)
+
+    zenserver_exe = _find_zenserver(args.zenserver_dir)
+    zen_exe = _find_zen(zenserver_exe)
+    minio_exe = _find_minio(zenserver_exe)
+    zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+    print(f"[setup] build mode: {zenserver_mode}")
+    print(f"[setup] zenserver: {zenserver_exe}")
+    print(f"[setup] zen cli: {zen_exe}")
+    print(f"[setup] minio: {minio_exe}")
+    print(f"[setup] snapshot-dir: {snapshot_dir} (read-only)")
+    print(f"[setup] hub-data-root: {hub_data_root}")
+    print(f"[setup] minio-data-dir: {minio_data_dir}")
+    print(f"[setup] modules: {len(module_ids)}")
+    print(f"[setup] bucket: {bucket_name} (hub-dir {hub_data_dir})")
+
+    minio_proc: Optional[subprocess.Popen] = None
+    exit_code = 0
+
+    try:
+        minio_proc = _start_minio(minio_exe, minio_data_dir, args.minio_port, args.minio_console_port)
+        _wait_for_minio(args.minio_port)
+        _create_minio_bucket(args.minio_port, bucket_name)
+
+        t_total = time.monotonic()
+        rc, timings = _seed_one_bucket(
+            bucket=bucket_name,
+            hub_data_dir=hub_data_dir,
+            snapshot_dir=snapshot_dir,
+            module_ids=module_ids,
+            minio_port=args.minio_port,
+            hub_port=args.hub_port,
+            poll_timeout=args.poll_timeout,
+            workers=args.workers,
+            zenserver_exe=zenserver_exe,
+            zen_exe=zen_exe,
+        )
+        exit_code = rc
+
+        print(f"\n[summary] stage B total elapsed: {time.monotonic()-t_total:.1f}s, exit={exit_code}")
+        print(f"[summary] MinIO data dir is now seeded: {minio_data_dir}")
+
+        phases = ["provision_s", "hibernate_s", "overlay_s", "deprovision_s", "total_s"]
+        labels = ["provision", "hibernate", "overlay", "deprovision (upload)", "total"]
+        print("\n[summary] timings (s):")
+        for key, lbl in zip(phases, labels):
+            print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}")
+
+        print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded-packed/")
+
+    finally:
+        if minio_proc is not None and minio_proc.poll() is None:
+            print("[minio] stopping...")
+            _stop_minio_graceful(minio_proc)
+
+    return exit_code
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py
new file mode 100644
index 000000000..f0bc7b607
--- /dev/null
+++ b/scripts/test_scripts/hub/seed_s3_snapshot.py
@@ -0,0 +1,635 @@
+#!/usr/bin/env python3
+"""Stage A of the perf-seed workflow.
+
+Fetches real module data from a configured S3 bucket via the hub hydration
+pipeline and preserves the decompressed server-state trees to disk so a
+later stage can replay them into a local MinIO backend.
+
+Flow:
+  1. Start a hub against real S3 (read-only SSO credentials), dehydration off,
+     hibernated-watchdog timeout set huge so modules are not auto-deprovisioned
+     mid-flow.
+  2. Provision N modules (first N moduleIds from the bucket listing, filtered
+     to UUID-shaped folders).
+  3. Wait until every module reaches 'provisioned'.
+  4. Hibernate every module (child processes exit, state dir intact and no
+     remaining writer while we copy).
+  5. Wait until every module reaches 'hibernated'.
+  6. Copy <hub-data-dir>/servers/<moduleid>/ -> <snapshot-dir>/<moduleid>/
+     with the hub still running - hibernate took the writers offline and the
+     watchdog is muted.
+  7. Shut the hub down via 'zen down --pid <hub pid>'.
+
+Required environment variables (or pass via CLI flags):
+  ZEN_PERF_S3_URI       e.g. s3://your-bucket/optional-prefix/
+  ZEN_PERF_AWS_PROFILE  AWS SSO profile name configured with read access
+  ZEN_PERF_AWS_REGION   defaults to us-east-1
+
+Requirements:
+  pip install boto3
+  aws CLI v2 with the SSO profile configured
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import re
+import shutil
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+
+def _rmtree_robust(path) -> None:
+    """shutil.rmtree with a Windows-friendly retry for read-only files."""
+    import os as _os
+    import stat as _stat
+    def _onerror(func, p, exc_info):
+        try:
+            _os.chmod(p, _stat.S_IWRITE)
+            func(p)
+        except Exception:
+            pass
+    # onexc was introduced in py3.12; fall back to onerror on older versions.
+    if sys.version_info >= (3, 12):
+        shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+    else:
+        shutil.rmtree(path, onerror=_onerror)
+
+
+_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "")
+_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "")
+_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1")
+
+# Matches UUID-shaped module IDs (lowercase hex in the standard 8-4-4-4-12
+# grouping). Filters out stray top-level keys (readmes, test scratches) that
+# aren't real modules.
+_MODULEID_RE = re.compile(r"^[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}$")
+
+
+# ---------------------------------------------------------------------------
+# zenserver binary discovery - prefer release, fall back to debug.
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+    if override:
+        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+        if not p.exists():
+            sys.exit(f"zenserver not found at {p}")
+        return p
+
+    script_dir = Path(__file__).resolve().parent
+    repo_root = script_dir.parents[2]  # scripts/test_scripts/hub -> repo root
+    for mode in ("release", "debug"):
+        for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+            p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+            if p.exists():
+                return p
+    sys.exit("zenserver executable not found under build/. 
Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)") + return p + + +# --------------------------------------------------------------------------- +# AWS SSO credential resolution +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit("[aws] boto3 is required. Install it with: pip install boto3") + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI: {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]: + """List UUID-shaped module folders under the bucket and return the `limit` + most recently active ones, ranked by the LastModified of each module's + `incremental-state.cbo` (newest first - these were last dehydrated most + recently, which is the closest proxy for "most recently accessed"). + + Falls back to listing order for any folder whose state file can't be + HEADed (missing / 403 / transient error). + """ + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + # 1. Enumerate every UUID-shaped folder. + paginator = s3.get_paginator("list_objects_v2") + candidates: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/") + if folder and _MODULEID_RE.match(folder): + candidates.append(folder) + print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...") + + # 2. HEAD each module's state file in parallel. Missing/failed HEADs land + # at the tail via a sentinel epoch 0 timestamp. 
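+    # head_object is metadata-only, so ranking even ~1000 candidates should
+    # take seconds at 50 workers; a GET per module would be far more costly.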
+ from datetime import datetime, timezone + epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) + + def _state_mtime(mid: str) -> datetime: + key = f"{prefix_norm}{mid}/incremental-state.cbo" + try: + resp = s3.head_object(Bucket=bucket, Key=key) + return resp.get("LastModified", epoch) + except Exception: + return epoch + + with ThreadPoolExecutor(max_workers=50) as pool: + times = list(pool.map(_state_mtime, candidates)) + + # 3. Sort descending (newest first). Folders without a state file sink. + ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True) + missing = sum(1 for _, t in ranked if t == epoch) + if missing: + print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)") + return [mid for mid, _ in ranked[:limit]] + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + # Seeding is not a perf-measurement path - we want it as fast as the + # host can manage. Let the hub go wide on both provisioning and + # hydration thread pools rather than matching prod limits. + "--hub-instance-provision-threads=64", + "--hub-hydration-threads=64", + # With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH + # watchdog inactivity timers so early-provisioned modules do not get + # auto-deprovisioned while we're still hydrating the tail (hibernated + # default 1800s, provisioned default 600s - the latter is the one that + # bit us on the 1000-module run and dropped 335 modules). + "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400", + "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """Shut the hub down via 'zen down --pid <pid>'. 
zen down performs a + proper server-initiated shutdown (signals the shutdown event, waits for + the server to drain) rather than a blunt kill.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +# --------------------------------------------------------------------------- +# Hub HTTP API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """POST /hub/modules/<id>/<verb> concurrently. Returns (accepted, failures).""" + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +# Any of these means the module is done transitioning and will not reach the +# target state on its own (without a retry we control). +_FAILED_STATES = {"crashed", "unprovisioned"} + + +def _wait_for_state( + port: int, + module_ids: list[str], + target_state: str, + timeout_s: float, + label: str, +) -> tuple[list[str], list[str], dict[str, str]]: + """Poll hub status until every module hits target_state, fails, or times out. + + Returns (stuck, failed, last_states). 'stuck' = still mid-transition when + we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'. 
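+    Callers treat both as failures for the script's exit code; only the log
+    labels differ.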
+ """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == target_state: + remaining.discard(mid) + elif s in _FAILED_STATES and target_state not in _FAILED_STATES: + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + + print() + return list(remaining), failed, last_states + + +# --------------------------------------------------------------------------- +# Snapshot copy (run AFTER hub shutdown so there's no concurrent writer) +# --------------------------------------------------------------------------- + +def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]: + """Copy src_root/<mid>/* to dst_root/<mid>/* for each module. + + Returns (modules_copied, files_copied, bytes_copied). + Replaces only the specific per-module subdirs; never touches siblings. + """ + dst_root.mkdir(parents=True, exist_ok=True) + modules_copied = 0 + files_copied = 0 + bytes_copied = 0 + + for i, mid in enumerate(module_ids, 1): + src = src_root / mid + dst = dst_root / mid + if not src.is_dir(): + print(f"[snapshot] WARNING: source missing for {mid}: {src}") + continue + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + modules_copied += 1 + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 + if i % 25 == 0 or i == len(module_ids): + print(f"[snapshot] {i}/{len(module_ids)} modules copied " + f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + + return modules_copied, files_copied, bytes_copied + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a", + help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)") + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=1000, + help="Number of modules to snapshot (default: 300)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=1800.0, + help="Max seconds to wait for provision or hibernate to finish (default: 1800)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + hub_data_dir = Path(args.hub_data_dir).resolve() + snapshot_dir = Path(args.snapshot_dir).resolve() + + # Safety: snapshot-dir is the preserved output. 
+    # hub data-dir in either direction so nothing the hub writes can clobber
+    # the preserved state.
+    if (snapshot_dir == hub_data_dir
+            or snapshot_dir in hub_data_dir.parents
+            or hub_data_dir in snapshot_dir.parents):
+        sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and hub-data-dir ({hub_data_dir}) must be disjoint")
+
+    s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI)
+    aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE)
+    aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION)
+    if not s3_uri:
+        sys.exit("[setup] S3 URI not set. Set S3_URI to a bucket like s3://your-bucket/")
+    if not aws_profile:
+        sys.exit("[setup] AWS profile not set. Set AWS_PROFILE to your SSO profile name")
+
+    hub_log = hub_data_dir / "hub.log"
+
+    zenserver_exe = _find_zenserver(args.zenserver_dir)
+    zen_exe = _find_zen(zenserver_exe)
+    zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+    print(f"[setup] build mode: {zenserver_mode}")
+    print(f"[setup] zenserver: {zenserver_exe}")
+    print(f"[setup] zen cli: {zen_exe}")
+    print(f"[setup] S3 URI: {s3_uri}")
+    print(f"[setup] profile: {aws_profile}")
+    print(f"[setup] hub-data-dir: {hub_data_dir}")
+    print(f"[setup] snapshot-dir: {snapshot_dir}")
+
+    session, frozen = _get_session(aws_profile)
+    print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})")
+
+    bucket, prefix = _parse_s3_uri(s3_uri)
+    print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...")
+    module_ids = _list_module_ids(session, bucket, prefix, aws_region, args.module_count)
+    print(f"[s3] selected {len(module_ids)} module folders (UUID-shaped)")
+    if len(module_ids) < args.module_count:
+        print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter")
+
+    if not module_ids:
+        sys.exit("[s3] no module folders found, aborting")
+
+    aws_env = {
+        "AWS_ACCESS_KEY_ID": frozen.access_key,
+        "AWS_SECRET_ACCESS_KEY": frozen.secret_key,
+    }
+    if frozen.token:
+        aws_env["AWS_SESSION_TOKEN"] = frozen.token
+
+    hub_data_dir.mkdir(parents=True, exist_ok=True)
+    config_path = hub_data_dir / "hydration_config.json"
+    config_path.write_text(
+        json.dumps({
+            "type": "s3",
+            "settings": {"uri": s3_uri, "region": aws_region},
+        }),
+        encoding="ascii",
+    )
+    hub_extra_args = [
+        f"--hub-hydration-target-config={config_path}",
+        "--hub-enable-dehydration=false",
+    ]
+
+    hub_proc: Optional[subprocess.Popen] = None
+    hub_log_handle = None
+    exit_code = 0
+
+    try:
+        hub_instance_limit = max(args.module_count + 10, 500)
+        hub_proc, hub_log_handle = _start_hub(
+            zenserver_exe, hub_data_dir, args.port, hub_log,
+            hub_instance_limit, hub_extra_args, aws_env,
+        )
+        _wait_for_hub(hub_proc, args.port)
+
+        t_start = time.monotonic()
+
+        with ThreadPoolExecutor(max_workers=args.workers) as pool:
+            # --- Provision ---
+            print(f"[provision] firing {len(module_ids)} provision requests...")
+            t0 = time.monotonic()
+            accepted, failures = _fan_out_post(pool, args.port, module_ids, "provision")
+            print(f"[provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+            for mid, status, body in failures[:10]:
+                print(f"[provision] FAILED {mid}: status={status} body={body}")
+            if not accepted:
+                sys.exit("[provision] nothing accepted, aborting")
+
+            stuck, failed, last_states = _wait_for_state(args.port,
accepted, "provisioned", args.poll_timeout, "provision") + prov_done = len(accepted) - len(stuck) - len(failed) + print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck " + f"({time.monotonic()-t0:.1f}s)") + if failed: + for mid in failed[:10]: + print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'") + exit_code = 1 + if stuck: + for mid in stuck[:10]: + print(f"[provision] STUCK {mid}: last state='{last_states.get(mid, '')}'") + exit_code = 1 + accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)] + + if not accepted: + sys.exit("[provision] nothing successfully provisioned, aborting") + + # --- Hibernate --- + print(f"[hibernate] firing {len(accepted)} hibernate requests...") + t0 = time.monotonic() + hib_accepted, hib_failures = _fan_out_post(pool, args.port, accepted, "hibernate") + print(f"[hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in hib_failures[:10]: + print(f"[hibernate] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_hib, failed_hib, last_states_hib = _wait_for_state(args.port, hib_accepted, "hibernated", args.poll_timeout, "hibernate") + hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib) + print(f"[hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck " + f"({time.monotonic()-t0:.1f}s)") + if failed_hib or stuck_hib: + exit_code = 1 + for mid in (failed_hib + stuck_hib)[:10]: + print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") + + # --- Copy snapshots while hub is still running. All instances are + # hibernated (no writers), watchdog-hibernated-timeout is 86400s + # (no auto-deprovision), hub is only touching its own metadata + # outside servers/<mid>/. Safe. --- + copy_src = hub_data_dir / "servers" + to_copy = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}") + t0 = time.monotonic() + modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy) + print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + f"({time.monotonic()-t0:.1f}s)") + + if modules_copied < len(to_copy): + print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied") + exit_code = 1 + + # --- Graceful hub shutdown via 'zen down' --- + _zen_down_hub(zen_exe, hub_proc) + hub_proc = None + if hub_log_handle is not None: + hub_log_handle.close() + hub_log_handle = None + + print(f"[summary] stage A total elapsed: {time.monotonic()-t_start:.1f}s, exit={exit_code}") + print(f"[summary] snapshot available at: {snapshot_dir}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + # Fallback: something aborted before the 'zen down' path ran. 
+ print("[hub] force-terminating (zen down was not invoked)") + hub_proc.terminate() + try: + hub_proc.wait(timeout=60) + except subprocess.TimeoutExpired: + hub_proc.kill() + hub_proc.wait() + if hub_log_handle is not None: + hub_log_handle.close() + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub_load_test.py b/scripts/test_scripts/hub_load_test.py new file mode 100644 index 000000000..33f05d90f --- /dev/null +++ b/scripts/test_scripts/hub_load_test.py @@ -0,0 +1,970 @@ +#!/usr/bin/env python3 +"""Hub sustained load test. + +Keeps ~N modules concurrently provisioned from a pool of 1000 predefined +module names, writing and reading data to each instance, then either letting +the hub watchdog deprovision idle ones or explicitly deprovisioning them. +Runs indefinitely until Ctrl-C. + +Optional --s3: starts a local MinIO server and configures the hub to use it +as the de/hydration backend. + +Requirements: + pip install boto3 (only needed with --s3) +""" + +from __future__ import annotations + +import argparse +import json +import os +import queue +import random +import subprocess +import sys +import threading +import time +import urllib.error +import urllib.request +import webbrowser +from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" +_NAMESPACE = "loadtest" +_BUCKET = "bucket" + +# Key sizes to use for activity writes (bytes) - biased toward smaller. +# Blobs are pre-generated at startup; one per size, shared across all requests. +_KEY_SIZES = [512, 512, 2048, 2048, 8192, 32768] +_BLOBS: list[bytes] = [] # populated by _init_blobs() + + +def _init_blobs() -> None: + seen: set[int] = set() + for size in _KEY_SIZES: + if size not in seen: + seen.add(size) + _BLOBS.append(os.urandom(size)) + else: + # reuse the already-generated blob for this size + _BLOBS.append(next(b for b in _BLOBS if len(b) == size)) + + +# --------------------------------------------------------------------------- +# Executable discovery +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + candidates = [ + repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}", + ] + for c in candidates: + if c.exists(): + return c + + matches = list(repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}")) + if matches: + return max(matches, key=lambda p: p.stat().st_mtime) + + sys.exit( + "zenserver executable not found in build/. " + "Run: xmake config -y -m release -a x64 && xmake -y\n" + "Or pass --zenserver-dir <dir>." + ) + + +def _find_minio(zenserver_path: Path) -> Path: + p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" + if not p.exists(): + sys.exit( + f"minio executable not found at {p}. 
" + "Build with: xmake config -y -m release -a x64 && xmake -y" + ) + return p + + +# --------------------------------------------------------------------------- +# MinIO +# --------------------------------------------------------------------------- + +def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: + minio_data = data_dir / "minio" + minio_data.mkdir(parents=True, exist_ok=True) + env = os.environ.copy() + env["MINIO_ROOT_USER"] = _MINIO_USER + env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + proc = subprocess.Popen( + [str(minio_exe), "server", str(minio_data), + "--address", f":{port}", + "--console-address", f":{console_port}", + "--quiet"], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + **popen_kwargs, + ) + print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}") + return proc + + +def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: + deadline = time.monotonic() + timeout_s + url = f"http://localhost:{port}/minio/health/live" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=1): + print("[minio] ready") + return + except Exception: + time.sleep(0.1) + sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s") + + +def _create_minio_bucket(port: int, bucket: str) -> None: + try: + import boto3 + import botocore.config + import botocore.exceptions + except ImportError: + sys.exit( + "[minio] boto3 is required for --s3.\n" + "Install it with: pip install boto3" + ) + s3 = boto3.client( + "s3", + endpoint_url=f"http://localhost:{port}", + aws_access_key_id=_MINIO_USER, + aws_secret_access_key=_MINIO_PASS, + region_name="us-east-1", + config=botocore.config.Config(signature_version="s3v4"), + ) + try: + s3.create_bucket(Bucket=bucket) + print(f"[minio] created bucket '{bucket}'") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + print(f"[minio] bucket '{bucket}' already exists") + else: + raise + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + idle_timeout: int, + extra_args: list[str], + extra_env: Optional[dict[str, str]], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-http-threads=8", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + "--hub-instance-limit=500", + f"--hub-watchdog-provisioned-inactivity-timeout-seconds={idle_timeout}", + "--hub-watchdog-inactivity-check-margin-seconds=5", + "--hub-watchdog-cycle-interval-ms=2000", + "--hub-watchdog-cycle-processing-budget-ms=3000", + "--hub-watchdog-activity-check-connect-timeout-ms=20", + "--hub-watchdog-activity-check-request-timeout-ms=50", + ] + extra_args + + env = os.environ.copy() + if extra_env: + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + 
proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[{name}] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Module state +# --------------------------------------------------------------------------- + +@dataclass +class ModuleState: + state: str = "idle" # idle|provisioning|active|deprovisioning|watchdog-pending + base_uri: Optional[str] = None + written_keys: list[str] = field(default_factory=list) + activity_rounds_left: int = 0 + explicit_deprovision: bool = False + watchdog_pending_since: float = 0.0 + generation: int = 0 # incremented on each provision; queue entries carry this to discard stale entries + + +# --------------------------------------------------------------------------- +# Counters +# --------------------------------------------------------------------------- + +@dataclass +class Counters: + provisions: int = 0 + deprovisions_explicit: int = 0 + deprovisions_watchdog: int = 0 + provision_rejected: int = 0 + activity_rounds: int = 0 + writes: int = 0 + reads: int = 0 + errors: int = 0 + last_reject_time: float = 0.0 + _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False) + + def inc(self, name: str, n: int = 1) -> None: + with self._lock: + setattr(self, name, getattr(self, name) + n) + + def record_reject(self) -> None: + with self._lock: + self.provision_rejected += 1 + self.last_reject_time = time.monotonic() + + def snapshot(self) -> dict: + with self._lock: + return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} + + +# --------------------------------------------------------------------------- +# Hub API helpers (urllib, no external deps) +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 30.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + body = json.loads(resp.read()) + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception: + return 0, {} + + +def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]: + try: + 
req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + return data.get("modules", []) + except Exception: + return None + + +def _instance_put(base_uri: str, key: str, data: bytes, timeout_s: float = 10.0) -> int: + url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}" + req = urllib.request.Request(url, data=data, method="PUT", + headers={"Content-Type": "application/octet-stream", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + return resp.status + except urllib.error.HTTPError as e: + return e.code + except Exception: + return 0 + + +def _instance_get(base_uri: str, key: str, timeout_s: float = 10.0) -> int: + url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}" + req = urllib.request.Request(url, headers={"Accept": "application/octet-stream"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + resp.read() + return resp.status + except urllib.error.HTTPError as e: + return e.code + except Exception: + return 0 + + +# --------------------------------------------------------------------------- +# Worker tasks (run in thread pool - no sleeping) +# --------------------------------------------------------------------------- + +def _task_provision( + module_id: str, + port: int, + state_map: dict[str, ModuleState], + state_lock: threading.Lock, + activity_queue: queue.PriorityQueue, + counters: Counters, + explicit_deprovision_rate: float, + stop_event: threading.Event, +) -> None: + status, body = _hub_post(port, f"/hub/modules/{module_id}/provision") + + generation = 0 + schedule_burst = False + with state_lock: + mod = state_map[module_id] + if mod.state != "provisioning": + # Shutdown changed state while provision was in-flight; leave it alone + return + if status in (200, 202): + instance_port = body.get("port") + base_uri = f"http://localhost:{instance_port}" if instance_port else None + if status == 200: + # Instance is immediately ready + mod.generation += 1 + generation = mod.generation + mod.state = "active" + mod.base_uri = base_uri + mod.written_keys = [] + mod.activity_rounds_left = random.randint(5, 20) + mod.explicit_deprovision = random.random() < explicit_deprovision_rate + schedule_burst = not stop_event.is_set() + else: + # 202: async provision - instance is still starting up. + # Stay in "provisioning"; the hub status poll activates it when ready. + # _shutdown_deprovision_all also covers "provisioning", so shutdown is safe. 
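+                # Note: generation is only bumped on activation (above for the
+                # 200 path, in the orchestrator's status poll for the 202 path),
+                # so bursts queued against an older generation are discarded by
+                # _task_activity_burst's generation check.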
+ mod.state = "provisioning" + mod.base_uri = base_uri + elif status == 409: + mod.state = "idle" + counters.record_reject() + return + else: + mod.state = "idle" + counters.inc("errors") + return + + counters.inc("provisions") + if schedule_burst: + activity_queue.put((time.monotonic() + random.uniform(0.1, 0.3), generation, module_id)) + + +def _task_deprovision( + module_id: str, + port: int, + state_map: dict[str, ModuleState], + state_lock: threading.Lock, + counters: Counters, + retries: int = 5, +) -> None: + succeeded = False + for attempt in range(retries + 1): + status, _ = _hub_post(port, f"/hub/modules/{module_id}/deprovision") + if status in (200, 202, 404): + succeeded = True + break + if status == 409 and attempt < retries: + time.sleep(0.2) + continue + counters.inc("errors") + break + + with state_lock: + state_map[module_id].state = "idle" + state_map[module_id].base_uri = None + state_map[module_id].written_keys = [] + + if succeeded: + counters.inc("deprovisions_explicit") + + +def _task_activity_burst( + module_id: str, + generation: int, + state_map: dict[str, ModuleState], + state_lock: threading.Lock, + activity_queue: queue.PriorityQueue, + counters: Counters, + pool: ThreadPoolExecutor, + port: int, +) -> None: + with state_lock: + mod = state_map[module_id] + if mod.state != "active" or mod.generation != generation: + return + base_uri = mod.base_uri + existing_keys = list(mod.written_keys) + + if not base_uri: + with state_lock: + state_map[module_id].state = "idle" + return + + # Write 3-8 new keys per burst + num_writes = random.randint(3, 8) + new_keys: list[str] = [] + write_errors = 0 + for _ in range(num_writes): + key = f"{random.getrandbits(160):040x}" + data = random.choice(_BLOBS) + status = _instance_put(base_uri, key, data) + if status in (200, 201, 204): + new_keys.append(key) + counters.inc("writes") + else: + write_errors += 1 + counters.inc("errors") + + if write_errors > 0 and not new_keys and not existing_keys: + # Instance unreachable - likely watchdog fired while we were scheduled + with state_lock: + mod = state_map[module_id] + if mod.generation == generation: + mod.state = "idle" + mod.base_uri = None + mod.written_keys = [] + return + + # Read back 1-3 random keys from all known keys + all_keys = existing_keys + new_keys + num_reads = min(random.randint(2, 5), len(all_keys)) + for key in random.sample(all_keys, num_reads): + status = _instance_get(base_uri, key) + if status == 200: + counters.inc("reads") + elif status in (404, 0): + # Key may not exist yet or instance gone; not fatal + pass + else: + counters.inc("errors") + + counters.inc("activity_rounds") + + next_generation = 0 + with state_lock: + mod = state_map[module_id] + if mod.state != "active" or mod.generation != generation: + return + mod.written_keys = (existing_keys + new_keys)[-200:] # cap list size + mod.activity_rounds_left -= 1 + + if mod.activity_rounds_left <= 0: + if mod.explicit_deprovision: + mod.state = "deprovisioning" + mod.base_uri = None + try: + pool.submit( + _task_deprovision, + module_id, port, state_map, state_lock, counters, + ) + except RuntimeError: + pass # pool shutting down; hub watchdog will clean up + else: + mod.state = "watchdog-pending" + mod.watchdog_pending_since = time.monotonic() + return + + next_generation = mod.generation # capture under lock before releasing + + # Schedule next burst soon for semi-continuous activity + next_time = time.monotonic() + random.uniform(0.2, 0.8) + activity_queue.put((next_time, next_generation, module_id)) 
+ + +# --------------------------------------------------------------------------- +# Orchestrator +# --------------------------------------------------------------------------- + +def _orchestrate( + port: int, + state_map: dict[str, ModuleState], + state_lock: threading.Lock, + activity_queue: queue.PriorityQueue, + counters: Counters, + pool: ThreadPoolExecutor, + stop_event: threading.Event, + target_active: int, + explicit_deprovision_rate: float, + idle_timeout: int, +) -> None: + last_status_poll = 0.0 + provision_backoff = False + + while not stop_event.is_set(): + now = time.monotonic() + + # Drain activity queue for due bursts + while True: + try: + next_time, generation, module_id = activity_queue.get_nowait() + except queue.Empty: + break + if next_time <= now: + try: + pool.submit( + _task_activity_burst, + module_id, generation, state_map, state_lock, + activity_queue, counters, pool, port, + ) + except RuntimeError: + break # pool shutting down + else: + activity_queue.put((next_time, generation, module_id)) + break + + # Activate/deactivate backoff based on recent 409 rejections + last_reject = counters.last_reject_time + if last_reject > 0 and now - last_reject < 5.0: + provision_backoff = True + elif provision_backoff and now - last_reject >= 5.0: + provision_backoff = False + + # Count states and submit provision tasks if needed + if not provision_backoff: + with state_lock: + idle_ids = [mid for mid, m in state_map.items() if m.state == "idle"] + working_count = sum( + 1 for m in state_map.values() + if m.state in ("active", "provisioning", "deprovisioning") + ) + watchdog_count = sum( + 1 for m in state_map.values() + if m.state == "watchdog-pending" + ) + + # Provision enough to keep working_count at target, but cap total + # in-flight (working + watchdog-pending) at 2x target to prevent + # runaway accumulation when all modules cycle to watchdog-pending. + inflight_cap = target_active * 2 + deficit = min( + target_active - working_count, + inflight_cap - working_count - watchdog_count, + ) + to_provision = idle_ids[:max(0, deficit)] + for mid in to_provision: + with state_lock: + if state_map[mid].state != "idle": + continue + state_map[mid].state = "provisioning" + try: + pool.submit( + _task_provision, + mid, port, state_map, state_lock, + activity_queue, counters, explicit_deprovision_rate, + stop_event, + ) + except RuntimeError: + with state_lock: + state_map[mid].state = "idle" + + # Poll hub status to detect async provision completions and watchdog-fired modules + if now - last_status_poll >= 5.0: + last_status_poll = now + modules_status = _hub_status(port) + if modules_status is not None: + hub_ids = {m["moduleId"]: m.get("state", "") for m in modules_status} + # Hub watchdog fires at ~idle_timeout from last activity. However, if the + # watchdog's previous visit predates the last burst, it sees a changed + # activity sum and resets LastActivityTime, delaying deprovision by ~173s. + # Explicitly deprovision after idle_timeout + 15s to avoid waiting for that. 
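+                # The 15s of slack covers several 2s watchdog cycles
+                # (--hub-watchdog-cycle-interval-ms=2000) plus the 5s check
+                # margin, so the watchdog normally wins the race and this
+                # explicit path is just a backstop.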
+ timeout_threshold = idle_timeout + 15 + to_deprovision_explicitly: list[str] = [] + with state_lock: + for mid, mod in state_map.items(): + if mod.state == "provisioning" and mod.base_uri is not None: + # Waiting for async (202) provision to complete + hub_state = hub_ids.get(mid, "") + if hub_state == "provisioned": + mod.generation += 1 + mod.state = "active" + mod.written_keys = [] + mod.activity_rounds_left = random.randint(5, 20) + mod.explicit_deprovision = random.random() < explicit_deprovision_rate + activity_queue.put( + (time.monotonic() + random.uniform(0.1, 0.3), mod.generation, mid) + ) + elif mid not in hub_ids: + # Provision failed or was rolled back + mod.state = "idle" + mod.base_uri = None + elif mod.state == "watchdog-pending": + hub_state = hub_ids.get(mid, "") + gone = mid not in hub_ids + timed_out = (now - mod.watchdog_pending_since) > timeout_threshold + if gone or hub_state in ("unprovisioned", "deprovisioning"): + mod.state = "idle" + mod.base_uri = None + mod.written_keys = [] + counters.inc("deprovisions_watchdog") + elif timed_out: + mod.state = "deprovisioning" + to_deprovision_explicitly.append(mid) + for mid in to_deprovision_explicitly: + try: + pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters) + except RuntimeError: + with state_lock: + if state_map[mid].state == "deprovisioning": + state_map[mid].state = "idle" + + stop_event.wait(timeout=0.05) + + +# --------------------------------------------------------------------------- +# Stats display +# --------------------------------------------------------------------------- + +def _stats_thread( + state_map: dict[str, ModuleState], + state_lock: threading.Lock, + counters: Counters, + stop_event: threading.Event, + interval_s: float = 5.0, +) -> None: + is_tty = sys.stdout.isatty() + prev_lines = 0 + t0 = time.monotonic() + prev_snap: Optional[dict] = None + prev_t = t0 + + while not stop_event.is_set(): + stop_event.wait(timeout=interval_s) + now = time.monotonic() + elapsed = now - t0 + dt = now - prev_t + + snap = counters.snapshot() + with state_lock: + states: dict[str, int] = {} + for m in state_map.values(): + states[m.state] = states.get(m.state, 0) + 1 + + def rate(key: str) -> float: + if prev_snap is None or dt <= 0: + return 0.0 + return (snap[key] - prev_snap[key]) / dt * 60.0 + + lines = [ + f"[{time.strftime('%H:%M:%S')}] elapsed={elapsed:.0f}s", + f" modules: idle={states.get('idle', 0)} " + f"provisioning={states.get('provisioning', 0)} " + f"active={states.get('active', 0)} " + f"watchdog-pending={states.get('watchdog-pending', 0)} " + f"deprovisioning={states.get('deprovisioning', 0)}", + f" totals: provisions={snap['provisions']} " + f"deprov-explicit={snap['deprovisions_explicit']} " + f"deprov-watchdog={snap['deprovisions_watchdog']} " + f"rejected={snap['provision_rejected']} " + f"errors={snap['errors']}", + f" data: writes={snap['writes']} reads={snap['reads']} " + f"rounds={snap['activity_rounds']}", + f" rates/min: provisions={rate('provisions'):.1f} " + f"deprov={rate('deprovisions_explicit') + rate('deprovisions_watchdog'):.1f} " + f"writes={rate('writes'):.1f} reads={rate('reads'):.1f}", + ] + + if is_tty and prev_lines > 0: + # Move cursor up to overwrite previous block + sys.stdout.write(f"\033[{prev_lines}A\033[J") + + sys.stdout.write("\n".join(lines) + "\n") + sys.stdout.flush() + prev_lines = len(lines) + prev_snap = snap + prev_t = now + + +# --------------------------------------------------------------------------- +# Shutdown +# 
--------------------------------------------------------------------------- + +def _wait_for_hub_idle(port: int, hub_proc: subprocess.Popen, timeout_s: float = 120.0) -> None: + """Wait until the hub reports no transitioning instances (dehydration done).""" + _STABLE = {"provisioned", "hibernated", "crashed", "unprovisioned"} + print(f"[shutdown] waiting for hub dehydration (up to {timeout_s:.0f}s)...") + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + if hub_proc.poll() is not None: + print("[shutdown] hub process has exited") + return + modules = _hub_status(port, timeout_s=5.0) + if modules is None: + # Hub not responding. If it has exited, we're done. If it's still alive + # it may be saturated with S3 uploads - keep waiting rather than assuming done. + if hub_proc.poll() is not None: + print("[shutdown] hub process has exited") + return + time.sleep(1.0) + continue + transitioning = [m for m in modules if m.get("state") not in _STABLE] + remaining = len(modules) + if not transitioning: + if remaining: + print(f"[shutdown] hub idle ({remaining} instances in stable state)") + else: + print("[shutdown] hub idle (no instances remaining)") + return + print(f"[shutdown] {len(transitioning)} instance(s) still dehydrating...", end="\r") + time.sleep(1.0) + print(f"\n[shutdown] WARNING: hub did not become idle within {timeout_s:.0f}s") + + +def _shutdown_deprovision_all( + port: int, + state_map: dict[str, ModuleState], + state_lock: threading.Lock, + counters: Counters, + workers: int, + timeout_s: float = 60.0, +) -> None: + with state_lock: + to_deprovision = [ + mid for mid, m in state_map.items() + if m.state in ("active", "watchdog-pending", "provisioning") + ] + for mid in to_deprovision: + state_map[mid].state = "deprovisioning" + + if not to_deprovision: + return + + print(f"\n[shutdown] deprovisioning {len(to_deprovision)} active modules...") + pool = ThreadPoolExecutor(max_workers=min(workers, len(to_deprovision))) + futures: list[Future] = [ + pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters) + for mid in to_deprovision + ] + pool.shutdown(wait=False) + + done_set, not_done_set = futures_wait(futures, timeout=timeout_s) + if not_done_set: + print(f"[shutdown] WARNING: {len(not_done_set)} deprovision tasks did not complete within {timeout_s}s") + else: + print(f"[shutdown] all modules deprovisioned") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest", + help="Hub --data-dir (default: E:/Dev/hub-loadtest)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--workers", type=int, default=50, + help="Thread pool size for HTTP calls (default: 50)") + parser.add_argument("--active-modules", type=int, default=100, + help="Target number of concurrently provisioned modules (default: 100)") + parser.add_argument("--module-count", type=int, default=1000, + help="Total predefined module name pool size (default: 1000)") + parser.add_argument("--idle-timeout", type=int, default=90, + help="Hub watchdog inactivity timeout in seconds (default: 90)") + parser.add_argument("--explicit-deprovision-rate", type=float, default=0.3, + help="Fraction of modules 
explicitly deprovisioned (default: 0.3)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + parser.add_argument("--s3", action="store_true", + help="Use local MinIO for hub de/hydration instead of filesystem") + parser.add_argument("--minio-port", type=int, default=9000, + help="MinIO S3 API port (default: 9000)") + parser.add_argument("--minio-console-port", type=int, default=9001, + help="MinIO web console port (default: 9001)") + parser.add_argument("--s3-bucket", default="zen-load-test", + help="S3 bucket name for MinIO hydration (default: zen-load-test)") + parser.add_argument("--no-browser", action="store_true", + help="Skip opening browser tabs") + args = parser.parse_args() + + if args.active_modules > args.module_count: + sys.exit( + f"--active-modules ({args.active_modules}) must not exceed " + f"--module-count ({args.module_count})" + ) + + _init_blobs() + + data_dir = Path(args.data_dir) + hub_log = data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + print(f"[setup] zenserver: {zenserver_exe}") + + module_names = [f"load-test-module-{i:04d}" for i in range(args.module_count)] + state_map: dict[str, ModuleState] = {mid: ModuleState() for mid in module_names} + state_lock = threading.Lock() + activity_queue: queue.PriorityQueue = queue.PriorityQueue() + counters = Counters() + stop_event = threading.Event() + + minio_proc: Optional[subprocess.Popen] = None + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + + hub_extra_args: list[str] = [] + hub_extra_env: Optional[dict[str, str]] = None + + try: + if args.s3: + minio_exe = _find_minio(zenserver_exe) + minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + _create_minio_bucket(args.minio_port, args.s3_bucket) + if not args.no_browser: + webbrowser.open(f"http://localhost:{args.minio_console_port}") + config_json = { + "type": "s3", + "settings": { + "uri": f"s3://{args.s3_bucket}", + "endpoint": f"http://localhost:{args.minio_port}", + "path-style": True, + "region": "us-east-1", + }, + } + data_dir.mkdir(parents=True, exist_ok=True) + config_path = data_dir / "hydration_config.json" + config_path.write_text(json.dumps(config_json), encoding="ascii") + hub_extra_args = [f"--hub-hydration-target-config={config_path}"] + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, data_dir, args.port, + hub_log, args.idle_timeout, + hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, args.port) + if not args.no_browser: + webbrowser.open(f"http://localhost:{args.port}/dashboard") + + print(f"[load-test] starting: pool={args.module_count} modules, " + f"target={args.active_modules} active, " + f"idle-timeout={args.idle_timeout}s, " + f"explicit-deprovision={args.explicit_deprovision_rate:.0%}") + print("[load-test] press Ctrl-C to stop") + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + stats_t = threading.Thread( + target=_stats_thread, + args=(state_map, state_lock, counters, stop_event), + daemon=True, + ) + stats_t.start() + + orch_t = threading.Thread( + target=_orchestrate, + args=( + args.port, state_map, state_lock, + activity_queue, counters, pool, + stop_event, args.active_modules, + args.explicit_deprovision_rate, args.idle_timeout, + ), + daemon=True, + ) + orch_t.start() + + try: + while not stop_event.is_set(): + 
time.sleep(0.5) + if hub_proc.poll() is not None: + print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})") + break + except KeyboardInterrupt: + print("\n[load-test] Ctrl-C received, shutting down...") + + stop_event.set() + orch_t.join(timeout=3.0) + stats_t.join(timeout=3.0) + + _shutdown_deprovision_all( + args.port, state_map, state_lock, counters, + args.workers, timeout_s=60.0, + ) + _wait_for_hub_idle(args.port, hub_proc, timeout_s=max(120.0, args.active_modules * 1.0)) + + _stop_process(hub_proc, "hub", timeout_s=120.0) + if hub_log_handle is not None: + hub_log_handle.close() + hub_log_handle = None + if minio_proc is not None: + _stop_process(minio_proc, "minio") + minio_proc = None + + finally: + # Safety net: only reached if an exception occurred before normal shutdown + if hub_proc is not None and hub_proc.poll() is None: + _stop_process(hub_proc, "hub") + if hub_log_handle is not None: + hub_log_handle.close() + if minio_proc is not None: + _stop_process(minio_proc, "minio") + + +if __name__ == "__main__": + main() diff --git a/scripts/test_scripts/hub_provision_perf_test.py b/scripts/test_scripts/hub_provision_perf_test.py new file mode 100644 index 000000000..76fb8508d --- /dev/null +++ b/scripts/test_scripts/hub_provision_perf_test.py @@ -0,0 +1,502 @@ +#!/usr/bin/env python3 +"""Hub provisioning performance test. + +Floods a zenserver hub with concurrent provision requests until the hub rejects +further provisioning (HTTP 409), then deprovisions all instances and exits. +A WPR ETW trace runs for the entire test and produces a .etl file for analysis +in WPA or PerfView (CPU sampling + context-switch events for lock contention). + +Optional --s3: starts a local MinIO server and configures the hub to use it +as the de/hydration backend instead of the default local filesystem. + +Requirements: + pip install boto3 (only needed with --s3) +WPR (wpr.exe) must be available (ships with Windows). Running as Administrator +is required for WPR to collect ETW traces. 
+""" + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +import urllib.error +import urllib.request +import uuid +import webbrowser +from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_STABLE_STATES = {"provisioned", "hibernated", "crashed", "unprovisioned"} +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +# --------------------------------------------------------------------------- +# Executable discovery +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + candidates = [ + repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}", + ] + for c in candidates: + if c.exists(): + return c + + matches = repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}") + for match in sorted(matches, key=lambda p: p.stat().st_mtime, reverse=True): + return match + + sys.exit( + "zenserver executable not found in build/. " + "Run: xmake config -y -m release -a x64 && xmake -y\n" + "Or pass --zenserver-dir <dir>." + ) + + +def _find_minio(zenserver_path: Path) -> Path: + p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" + if not p.exists(): + sys.exit( + f"minio executable not found at {p}. 
" + "Build with: xmake config -y -m release -a x64 && xmake -y" + ) + return p + + +# --------------------------------------------------------------------------- +# WPR (Windows Performance Recorder) +# --------------------------------------------------------------------------- + +def _is_elevated() -> bool: + if sys.platform != "win32": + return False + try: + import ctypes + return bool(ctypes.windll.shell32.IsUserAnAdmin()) + except Exception: + return False + + +def _wpr_start(trace_file: Path) -> bool: + if sys.platform != "win32": + print("[wpr] WPR is Windows-only; skipping ETW trace.") + return False + if not _is_elevated(): + print("[wpr] skipping ETW trace - re-run from an elevated (Administrator) prompt to collect traces.") + return False + result = subprocess.run( + ["wpr.exe", "-start", "CPU", "-filemode"], + capture_output=True, text=True + ) + if result.returncode != 0: + print(f"[wpr] WARNING: wpr -start failed (code {result.returncode}):\n{result.stderr.strip()}") + return False + print(f"[wpr] ETW trace started (CPU profile, file mode)") + return True + + +def _wpr_stop(trace_file: Path) -> None: + if sys.platform != "win32": + return + result = subprocess.run( + ["wpr.exe", "-stop", str(trace_file)], + capture_output=True, text=True + ) + if result.returncode != 0: + print(f"[wpr] WARNING: wpr -stop failed (code {result.returncode}):\n{result.stderr.strip()}") + else: + print(f"[wpr] ETW trace saved: {trace_file}") + + +# --------------------------------------------------------------------------- +# MinIO +# --------------------------------------------------------------------------- + +def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: + minio_data = data_dir / "minio" + minio_data.mkdir(parents=True, exist_ok=True) + env = os.environ.copy() + env["MINIO_ROOT_USER"] = _MINIO_USER + env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS + proc = subprocess.Popen( + [str(minio_exe), "server", str(minio_data), + "--address", f":{port}", + "--console-address", f":{console_port}", + "--quiet"], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}") + return proc + + +def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: + deadline = time.monotonic() + timeout_s + url = f"http://localhost:{port}/minio/health/live" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=1): + print("[minio] ready") + return + except Exception: + time.sleep(0.1) + sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s") + + +def _create_minio_bucket(port: int, bucket: str) -> None: + try: + import boto3 + import botocore.config + import botocore.exceptions + except ImportError: + sys.exit( + "[minio] boto3 is required for --s3.\n" + "Install it with: pip install boto3" + ) + s3 = boto3.client( + "s3", + endpoint_url=f"http://localhost:{port}", + aws_access_key_id=_MINIO_USER, + aws_secret_access_key=_MINIO_PASS, + region_name="us-east-1", + config=botocore.config.Config(signature_version="s3v4"), + ) + try: + s3.create_bucket(Bucket=bucket) + print(f"[minio] created bucket '{bucket}'") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + print(f"[minio] bucket '{bucket}' already exists") + else: + raise + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# 
--------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + extra_args: list[str], + extra_env: Optional[dict[str, str]], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-http-threads=8", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + "--hub-instance-limit=100", + ] + extra_args + + env = os.environ.copy() + if extra_env: + env.update(extra_env) + + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[{name}] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub HTTP helpers +# --------------------------------------------------------------------------- + +def _hub_url(port: int, path: str) -> str: + return f"http://localhost:{port}{path}" + + +def _provision_one(port: int) -> tuple[str, int]: + module_id = str(uuid.uuid4()) + url = _hub_url(port, f"/hub/modules/{module_id}/provision") + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return module_id, resp.status + except urllib.error.HTTPError as e: + return module_id, e.code + except Exception: + return module_id, 0 + + +def _deprovision_one(port: int, module_id: str, retries: int = 5) -> int: + url = _hub_url(port, f"/hub/modules/{module_id}/deprovision") + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + for attempt in range(retries + 1): + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status + except urllib.error.HTTPError as e: + if e.code == 409 and attempt < retries: + time.sleep(0.2) + continue + return e.code + except Exception: + return 0 + + +def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]: + try: + req = urllib.request.Request(_hub_url(port, "/hub/status"), + headers={"Accept": "application/json"}) + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) 
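+        # /hub/status returns {"modules": [{"moduleId": ..., "state": ...}, ...]};
+        # a missing "modules" key is treated as an empty list.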
+ return data.get("modules", []) + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Test phases +# --------------------------------------------------------------------------- + +def _flood_provision(port: int, workers: int) -> tuple[list[str], float, float]: + stopped = False + provisioned_ids: list[str] = [] + time_to_rejection: Optional[float] = None + t0 = time.monotonic() + + with ThreadPoolExecutor(max_workers=workers) as pool: + pending: set[Future] = {pool.submit(_provision_one, port) for _ in range(workers)} + + while pending: + done, pending = wait(pending, return_when=FIRST_COMPLETED) + + for f in done: + module_id, status = f.result() + if status in (200, 202): + provisioned_ids.append(module_id) + elif status == 409: + if not stopped: + time_to_rejection = time.monotonic() - t0 + stopped = True + print(f"\n[flood] hub rejected provisioning after " + f"{len(provisioned_ids)} instances " + f"({time_to_rejection:.2f}s)") + elif status == 0: + if not stopped: + stopped = True + print(f"\n[flood] hub unreachable - stopping flood " + f"({len(provisioned_ids)} instances so far)") + else: + print(f"[flood] unexpected status {status} for {module_id}") + + if not stopped: + pending.add(pool.submit(_provision_one, port)) + + wall_clock = time.monotonic() - t0 + return provisioned_ids, time_to_rejection or wall_clock, wall_clock + + +def _wait_stable(port: int, timeout_s: float = 20.0) -> None: + print("[hub] waiting for all instances to reach stable state...") + deadline = time.monotonic() + timeout_s + status_timeout_s = 5.0 + while time.monotonic() < deadline: + modules = _hub_status(port, timeout_s=status_timeout_s) + if modules is None: + time.sleep(0.5) + continue + transitioning = [m for m in modules if m.get("state") not in _STABLE_STATES] + elapsed = time.monotonic() - (deadline - timeout_s) + print(f"[hub] {elapsed:.1f}s: {len(modules) - len(transitioning)}/{len(modules)} stable", end="\r") + if not transitioning: + print(f"\n[hub] all {len(modules)} instances in stable state") + return + time.sleep(0.5) + print(f"\n[hub] WARNING: timed out waiting for stable states after {timeout_s}s") + + +def _deprovision_all(port: int, module_ids: list[str], workers: int) -> None: + raw_status = _hub_status(port, timeout_s=60.0) + if raw_status is None: + print("[deprovision] WARNING: could not reach hub to enumerate extra modules - " + "only deprovisioning tracked instances") + extra_ids = {m["moduleId"] for m in (raw_status or []) + if m.get("state") not in ("unprovisioned",)} - set(module_ids) + all_ids = list(module_ids) + list(extra_ids) + + print(f"[deprovision] deprovisioning {len(all_ids)} instances...") + t0 = time.monotonic() + errors = 0 + with ThreadPoolExecutor(max_workers=workers) as pool: + futures = {pool.submit(_deprovision_one, port, mid): mid for mid in all_ids} + for f in as_completed(futures): + status = f.result() + if status not in (200, 202, 409): + errors += 1 + print(f"[deprovision] module {futures[f]}: unexpected status {status}") + elapsed = time.monotonic() - t0 + print(f"[deprovision] done in {elapsed:.2f}s ({errors} errors)") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--data-dir", default="E:/Dev/hub-perftest", + 
help="Hub --data-dir (default: E:/Dev/hub-perftest)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--workers", type=int, default=20, + help="Concurrent provisioning threads (default: 20)") + parser.add_argument("--trace-file", default="hub_perf_trace.etl", + help="WPR output .etl path (default: hub_perf_trace.etl)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + parser.add_argument("--s3", action="store_true", + help="Use local MinIO for hub de/hydration instead of filesystem") + parser.add_argument("--minio-port", type=int, default=9000, + help="MinIO S3 API port when --s3 is used (default: 9000)") + parser.add_argument("--minio-console-port", type=int, default=9001, + help="MinIO web console port when --s3 is used (default: 9001)") + parser.add_argument("--s3-bucket", default="zen-hydration-test", + help="S3 bucket name for MinIO hydration (default: zen-hydration-test)") + args = parser.parse_args() + + data_dir = Path(args.data_dir) + trace_file = Path(args.trace_file).resolve() + hub_log = data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + print(f"[setup] zenserver: {zenserver_exe}") + + minio_proc: Optional[subprocess.Popen] = None + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + wpr_started = False + + hub_extra_args: list[str] = [] + hub_extra_env: Optional[dict[str, str]] = None + + try: + if args.s3: + minio_exe = _find_minio(zenserver_exe) + minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + _create_minio_bucket(args.minio_port, args.s3_bucket) + webbrowser.open(f"http://localhost:{args.minio_console_port}") + config_json = { + "type": "s3", + "settings": { + "uri": f"s3://{args.s3_bucket}", + "endpoint": f"http://localhost:{args.minio_port}", + "path-style": True, + "region": "us-east-1", + }, + } + data_dir.mkdir(parents=True, exist_ok=True) + config_path = data_dir / "hydration_config.json" + config_path.write_text(json.dumps(config_json), encoding="ascii") + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + ] + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + wpr_started = _wpr_start(trace_file) + + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, data_dir, args.port, + hub_log, hub_extra_args, hub_extra_env + ) + _wait_for_hub(hub_proc, args.port) + webbrowser.open(f"http://localhost:{args.port}/dashboard") + + provisioned_ids, time_to_rejection, wall_clock = _flood_provision(args.port, args.workers) + + print(f"\n[results] provisioned : {len(provisioned_ids)}") + print(f"[results] time to 409 : {time_to_rejection:.3f}s") + print(f"[results] wall clock : {wall_clock:.3f}s") + if time_to_rejection > 0: + print(f"[results] rate : {len(provisioned_ids) / time_to_rejection:.1f} provisions/s") + + _wait_stable(args.port, timeout_s=120.0) + _deprovision_all(args.port, provisioned_ids, args.workers) + dehydration_timeout_s = max(60.0, len(provisioned_ids) * 0.5) + _wait_stable(args.port, timeout_s=dehydration_timeout_s) + + finally: + if wpr_started: + _wpr_stop(trace_file) + if hub_proc is not None: + _stop_process(hub_proc, "hub") + if hub_log_handle is not None: + hub_log_handle.close() + if minio_proc is not None: + _stop_process(minio_proc, "minio") + + +if __name__ == "__main__": + main() diff --git 
diff --git a/scripts/test_scripts/kill-test-processes.ps1 b/scripts/test_scripts/kill-test-processes.ps1
new file mode 100644
index 000000000..0668a5319
--- /dev/null
+++ b/scripts/test_scripts/kill-test-processes.ps1
@@ -0,0 +1,19 @@
+# Kill leftover CI test processes (zenserver, minio, nomad, consul) whose
+# executable lives under the given build directory. Windows counterpart of
+# kill-test-processes.sh; see that file for rationale.
+#
+# Usage: kill-test-processes.ps1 -Label <label> -BuildDir <path>
+
+param(
+    [Parameter(Mandatory=$true)][string]$Label,
+    [Parameter(Mandatory=$true)][string]$BuildDir
+)
+
+foreach ($name in @('zenserver', 'minio', 'nomad', 'consul')) {
+    $procs = Get-Process -Name $name -ErrorAction SilentlyContinue |
+        Where-Object { $_.Path -and $_.Path.StartsWith($BuildDir, [System.StringComparison]::OrdinalIgnoreCase) }
+    foreach ($p in $procs) {
+        Write-Host "Killing $Label $name (PID $($p.Id)): $($p.Path)"
+        $p | Stop-Process -Force -ErrorAction SilentlyContinue
+    }
+}
diff --git a/scripts/test_scripts/kill-test-processes.sh b/scripts/test_scripts/kill-test-processes.sh
new file mode 100755
index 000000000..e5fb2fcaa
--- /dev/null
+++ b/scripts/test_scripts/kill-test-processes.sh
@@ -0,0 +1,45 @@
+#!/usr/bin/env bash
+#
+# Kill leftover CI test processes (zenserver, minio, nomad, consul) whose
+# executable lives under the given build directory.
+#
+# Used by CI workflows to clean up any test processes from a previous run
+# before starting a new one, and to reap any that are still running after
+# the run finishes.
+#
+# Usage: kill-test-processes.sh <label> <build_dir>
+#   label:     word printed in log messages, e.g. "stale" or "leftover"
+#   build_dir: absolute path; only processes whose executable is under this
+#              directory are killed.
+#
+# We resolve each PID's actual executable path rather than relying on argv
+# (which `pgrep -a` reports). argv starts with the short process name (e.g.
+# "consul") regardless of how the process was launched, so a pure argv prefix
+# match never fired and leftovers silently survived between runs.
+
+set -u
+
+label=${1:?label required}
+build_dir=${2:?build_dir required}
+
+get_exe_path() {
+    local pid=$1
+    if [[ -r "/proc/$pid/exe" ]]; then
+        # Linux
+        readlink -f "/proc/$pid/exe" 2>/dev/null || true
+    else
+        # macOS fallback: the "txt" file descriptor points at the process binary
+        lsof -p "$pid" -Fn -a -d txt 2>/dev/null | awk '/^n/{print substr($0,2); exit}' || true
+    fi
+}
+
+for name in zenserver minio nomad consul; do
+    while read -r pid; do
+        [[ -z "$pid" ]] && continue
+        exe=$(get_exe_path "$pid")
+        if [[ "$exe" == "$build_dir"* ]]; then
+            echo "Killing $label $name (PID $pid): $exe"
+            kill -9 "$pid" 2>/dev/null || true
+        fi
+    done < <(pgrep -x "$name" 2>/dev/null || true)
+done
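
Note: the core trick in both cleanup scripts is matching on the resolved
executable path rather than argv. The same check, sketched in Python for a
Linux host (a hypothetical helper, not part of the patch):

    import os
    from pathlib import Path

    def exe_under_build_dir(pid: int, build_dir: str) -> bool:
        # /proc/<pid>/exe is a symlink to the binary actually mapped into the
        # process, so it stays truthful even when argv[0] is just "consul".
        try:
            exe = os.readlink(f"/proc/{pid}/exe")
        except OSError:
            return False
        return Path(exe).resolve().is_relative_to(Path(build_dir).resolve())
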
diff --git a/scripts/test_scripts/oplog-import-export-test.py b/scripts/test_scripts/oplog-import-export-test.py
index f913a7351..93f732135 100644
--- a/scripts/test_scripts/oplog-import-export-test.py
+++ b/scripts/test_scripts/oplog-import-export-test.py
@@ -82,7 +82,7 @@ SERVER_ARGS: tuple[str, ...] = (
 
 def zen_cmd(*args: str | Path, extra_zen_args: list[str] | None = None) -> list[str | Path]:
     """Build a zen CLI command list, inserting extra_zen_args before subcommands."""
-    return [ZEN_EXE, *(extra_zen_args or []), *args]
+    return [ZEN_EXE, "--enable-execution-history=false", *(extra_zen_args or []), *args]
 
 
 def run(cmd: list[str | Path]) -> None:
diff --git a/scripts/test_scripts/oplog-update-build-ids.py b/scripts/test_scripts/oplog-update-build-ids.py
index 67e128c8e..2675d07c0 100644
--- a/scripts/test_scripts/oplog-update-build-ids.py
+++ b/scripts/test_scripts/oplog-update-build-ids.py
@@ -52,7 +52,7 @@ def list_builds_for_bucket(zen: str, host: str, namespace: str, bucket: str) ->
     result_path = Path(tmp.name)
 
     cmd = [
-        zen, "builds", "list",
+        zen, "--enable-execution-history=false", "builds", "list",
         "--namespace", namespace,
        "--bucket", bucket,
         "--host", host,
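
Note: both oplog scripts now insert --enable-execution-history=false ahead of
the subcommand, matching the ordering zen_cmd's docstring describes (global
arguments before subcommands). If more call sites grow this flag, a shared
helper would keep them in sync - a sketch only, not part of the patch:

    GLOBAL_ZEN_FLAGS = ["--enable-execution-history=false"]

    def zen_cli(zen_exe, *args, extra_zen_args=None):
        # Global flags first, then caller-supplied globals, then the subcommand.
        return [zen_exe, *GLOBAL_ZEN_FLAGS, *(extra_zen_args or []), *args]
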