#!/usr/bin/env python3
"""Hub provisioning performance test.

Floods a zenserver hub with concurrent provision requests until the hub
rejects further provisioning (HTTP 409), then deprovisions all instances and
exits.

When run elevated on Windows, a WPR ETW trace runs for the entire test and
produces a .etl file for analysis in WPA or PerfView (CPU sampling +
context-switch events for lock contention).

Optional --s3: starts a local MinIO server and configures the hub to use it
as the de/hydration backend instead of the default local filesystem.

Requirements:
    pip install boto3   (only needed with --s3)
    WPR (wpr.exe) must be available (ships with Windows).
    Running as Administrator is required for WPR to collect ETW traces.
"""

from __future__ import annotations

import argparse
import contextlib
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
import uuid
import webbrowser
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
from pathlib import Path
from typing import Optional

_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
# Module states that _wait_stable treats as "not mid-transition".
_STABLE_STATES = {"provisioned", "hibernated", "crashed", "unprovisioned"}
# Credentials for the local MinIO server; also handed to the hub as AWS credentials.
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"


# ---------------------------------------------------------------------------
# Executable discovery
# ---------------------------------------------------------------------------

def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable, exiting the script if it cannot be found.

    If *override* is given it must be a directory containing the executable.
    Otherwise the known release build locations under the repo root (two levels
    above this script) are tried, then the most recently modified match of
    ``build/**/release/zenserver``.
    """
    if override:
        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
        if not p.exists():
            sys.exit(f"zenserver not found at {p}")
        return p

    script_dir = Path(__file__).resolve().parent
    repo_root = script_dir.parent.parent
    candidates = [
        repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}",
        repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
        repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
    ]
    for c in candidates:
        if c.exists():
            return c

    # Fall back to any other release build (e.g. a different architecture), newest first.
    newest = max(
        repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}"),
        key=lambda p: p.stat().st_mtime,
        default=None,
    )
    if newest is not None:
        return newest

    sys.exit(
        "zenserver executable not found in build/. "
        "Run: xmake config -y -m release -a x64 && xmake -y\n"
        "Or pass --zenserver-dir <dir>."
    )


def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable expected next to zenserver, or exit if it is missing."""
    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(
            f"minio executable not found at {p}. "
            "Build with: xmake config -y -m release -a x64 && xmake -y"
        )
    return p


# ---------------------------------------------------------------------------
# WPR (Windows Performance Recorder)
# ---------------------------------------------------------------------------

def _is_elevated() -> bool:
    """True only on Windows when IsUserAnAdmin() reports admin; False otherwise or on error."""
    if sys.platform != "win32":
        return False
    try:
        import ctypes
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        return False


def _wpr_start(trace_file: Path) -> bool:
    """Start a WPR CPU-profile trace in file mode; return True only if it started.

    *trace_file* is not used here: the .etl path is supplied to ``wpr -stop``
    by ``_wpr_stop``. Every failure degrades to "no trace" with a message
    rather than aborting the test.
    """
    if sys.platform != "win32":
        print("[wpr] WPR is Windows-only; skipping ETW trace.")
        return False
    if not _is_elevated():
        print("[wpr] skipping ETW trace - re-run from an elevated (Administrator) prompt to collect traces.")
        return False
    try:
        result = subprocess.run(
            ["wpr.exe", "-start", "CPU", "-filemode"], capture_output=True, text=True
        )
    except OSError as e:
        # wpr.exe not on PATH / not runnable: treat like any other start failure.
        print(f"[wpr] WARNING: could not run wpr.exe ({e}); skipping ETW trace.")
        return False
    if result.returncode != 0:
        print(f"[wpr] WARNING: wpr -start failed (code {result.returncode}):\n{result.stderr.strip()}")
        return False
    print("[wpr] ETW trace started (CPU profile, file mode)")
    return True


def _wpr_stop(trace_file: Path) -> None:
    """Stop the running WPR trace and save it to *trace_file* (no-op off Windows)."""
    if sys.platform != "win32":
        return
    result = subprocess.run(
        ["wpr.exe", "-stop", str(trace_file)], capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"[wpr] WARNING: wpr -stop failed (code {result.returncode}):\n{result.stderr.strip()}")
    else:
        print(f"[wpr] ETW trace saved: {trace_file}")


# ---------------------------------------------------------------------------
# MinIO
# ---------------------------------------------------------------------------

def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch a MinIO server storing data under ``data_dir/minio``; output is discarded."""
    minio_data = data_dir / "minio"
    minio_data.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["MINIO_ROOT_USER"] = _MINIO_USER
    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
    proc = subprocess.Popen(
        [str(minio_exe), "server", str(minio_data),
         "--address", f":{port}",
         "--console-address", f":{console_port}",
         "--quiet"],
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
    return proc


def _wait_for_minio(port: int, timeout_s: float = 30.0,
                    proc: Optional[subprocess.Popen] = None) -> None:
    """Poll MinIO's liveness endpoint until it answers, exiting the script on timeout.

    If *proc* is given, exit immediately when that MinIO process dies instead of
    waiting out the full timeout (mirrors ``_wait_for_hub``).
    """
    deadline = time.monotonic() + timeout_s
    url = f"http://localhost:{port}/minio/health/live"
    while time.monotonic() < deadline:
        if proc is not None and proc.poll() is not None:
            sys.exit(f"[minio] process exited unexpectedly (rc={proc.returncode}) - "
                     f"is port {port} already in use?")
        try:
            with urllib.request.urlopen(url, timeout=1):
                print("[minio] ready")
                return
        except Exception:
            time.sleep(0.1)
    sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")


def _create_minio_bucket(port: int, bucket: str) -> None:
    """Create *bucket* on the local MinIO server; an already-existing bucket is fine.

    boto3 is imported lazily so it is only required when --s3 is used.
    """
    try:
        import boto3
        import botocore.config
        import botocore.exceptions
    except ImportError:
        sys.exit(
            "[minio] boto3 is required for --s3.\n"
            "Install it with: pip install boto3"
        )
    s3 = boto3.client(
        "s3",
        endpoint_url=f"http://localhost:{port}",
        aws_access_key_id=_MINIO_USER,
        aws_secret_access_key=_MINIO_PASS,
        region_name="us-east-1",
        config=botocore.config.Config(signature_version="s3v4"),
    )
    try:
        s3.create_bucket(Bucket=bucket)
        print(f"[minio] created bucket '{bucket}'")
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            print(f"[minio] bucket '{bucket}' already exists")
        else:
            raise


# ---------------------------------------------------------------------------
# Hub lifecycle
# ---------------------------------------------------------------------------

def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    extra_args: list[str],
    extra_env: Optional[dict[str, str]],
) -> tuple[subprocess.Popen, object]:
    """Launch ``zenserver hub`` with fixed limits plus *extra_args*.

    stdout and stderr go to *log_file*. Returns (process, open log handle);
    the caller owns the handle and must close it.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        str(zenserver_exe),
        "hub",
        f"--data-dir={data_dir}",
        f"--port={port}",
        "--hub-instance-http-threads=8",
        "--hub-instance-corelimit=4",
        "--hub-provision-disk-limit-percent=99",
        "--hub-provision-memory-limit-percent=80",
        "--hub-instance-limit=100",
    ] + extra_args

    env = os.environ.copy()
    if extra_env:
        env.update(extra_env)

    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(
            cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT
        )
    except Exception:
        # Don't leak the log handle if the process could not be spawned.
        log_handle.close()
        raise
    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle


def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Poll /hub/status until it answers; exit the script if the hub dies or times out."""
    deadline = time.monotonic() + timeout_s
    req = urllib.request.Request(f"http://localhost:{port}/hub/status",
                                 headers={"Accept": "application/json"})
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
                     f"is another zenserver already running on port {port}?")
        try:
            with urllib.request.urlopen(req, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            time.sleep(0.2)
    sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")


def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
    """Terminate *proc*, escalating to kill if it has not exited after *timeout_s*."""
    if proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[{name}] did not exit after {timeout_s}s, killing")
        proc.kill()
        proc.wait()


# ---------------------------------------------------------------------------
# Hub HTTP helpers
# ---------------------------------------------------------------------------

def _hub_url(port: int, path: str) -> str:
    """Build a hub URL on localhost."""
    return f"http://localhost:{port}{path}"


def _provision_one(port: int) -> tuple[str, int]:
    """Provision a module with a fresh random UUID.

    Returns (module_id, HTTP status), where status 0 means a transport-level
    failure (connection error, timeout, ...).
    """
    module_id = str(uuid.uuid4())
    url = _hub_url(port, f"/hub/modules/{module_id}/provision")
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json", "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return module_id, resp.status
    except urllib.error.HTTPError as e:
        return module_id, e.code
    except Exception:
        return module_id, 0


def _deprovision_one(port: int, module_id: str, retries: int = 5) -> int:
    """Deprovision *module_id* and return the final HTTP status (0 on transport failure).

    HTTP 409 is retried up to *retries* times, 0.2s apart (presumably because
    the module is still mid-transition - TODO confirm against hub semantics).
    """
    url = _hub_url(port, f"/hub/modules/{module_id}/deprovision")
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json", "Accept": "application/json"})
    for attempt in range(retries + 1):
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status
        except urllib.error.HTTPError as e:
            if e.code == 409 and attempt < retries:
                time.sleep(0.2)
                continue
            return e.code
        except Exception:
            return 0
    # Only reachable when retries < 0 (no attempt made).
    return 0


def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
    """Return the "modules" list from /hub/status, or None if the request fails."""
    try:
        req = urllib.request.Request(_hub_url(port, "/hub/status"),
                                     headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            data = json.loads(resp.read())
        return data.get("modules", [])
    except Exception:
        return None


# ---------------------------------------------------------------------------
# Test phases
# ---------------------------------------------------------------------------

def _flood_provision(port: int, workers: int,
                     max_consecutive_errors: int = 100) -> tuple[list[str], float, float]:
    """Keep *workers* provision requests in flight until the hub pushes back.

    Each completed request is replaced by a new one until the flood stops on
    the first HTTP 409 (hub at capacity), the first transport failure
    (status 0), or *max_consecutive_errors* back-to-back unexpected statuses.
    Requests already in flight when the flood stops are still collected.

    Args:
        port: Hub HTTP port.
        workers: Number of concurrent requests (must be >= 1).
        max_consecutive_errors: Stop after this many consecutive unexpected
            statuses, so a hub that keeps failing (e.g. HTTP 500) cannot keep
            the flood running forever. 0 disables the guard.

    Returns:
        (provisioned module ids, seconds until the first 409 - or the total
        wall-clock time if no 409 was seen, total wall-clock seconds).
    """
    stopped = False
    provisioned_ids: list[str] = []
    time_to_rejection: Optional[float] = None
    consecutive_errors = 0
    t0 = time.monotonic()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending: set[Future] = {pool.submit(_provision_one, port) for _ in range(workers)}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for f in done:
                module_id, status = f.result()
                if status in (200, 202):
                    provisioned_ids.append(module_id)
                    consecutive_errors = 0
                elif status == 409:
                    if not stopped:
                        time_to_rejection = time.monotonic() - t0
                        stopped = True
                        print(f"\n[flood] hub rejected provisioning after "
                              f"{len(provisioned_ids)} instances "
                              f"({time_to_rejection:.2f}s)")
                elif status == 0:
                    if not stopped:
                        stopped = True
                        print(f"\n[flood] hub unreachable - stopping flood "
                              f"({len(provisioned_ids)} instances so far)")
                else:
                    consecutive_errors += 1
                    print(f"[flood] unexpected status {status} for {module_id}")
                    if (not stopped and max_consecutive_errors > 0
                            and consecutive_errors >= max_consecutive_errors):
                        stopped = True
                        print(f"\n[flood] {consecutive_errors} consecutive unexpected responses - "
                              f"stopping flood ({len(provisioned_ids)} instances so far)")
                # Refill the slot so `workers` requests stay in flight until stopped.
                if not stopped:
                    pending.add(pool.submit(_provision_one, port))

    wall_clock = time.monotonic() - t0
    if time_to_rejection is None:
        time_to_rejection = wall_clock
    return provisioned_ids, time_to_rejection, wall_clock


def _wait_stable(port: int, timeout_s: float = 20.0) -> None:
    """Poll every 0.5s until all reported modules are in _STABLE_STATES.

    On timeout, print a warning and return; this does not raise.
    """
    print("[hub] waiting for all instances to reach stable state...")
    deadline = time.monotonic() + timeout_s
    status_timeout_s = 5.0
    while time.monotonic() < deadline:
        modules = _hub_status(port, timeout_s=status_timeout_s)
        if modules is None:
            time.sleep(0.5)
            continue
        transitioning = [m for m in modules if m.get("state") not in _STABLE_STATES]
        elapsed = time.monotonic() - (deadline - timeout_s)
        print(f"[hub] {elapsed:.1f}s: {len(modules) - len(transitioning)}/{len(modules)} stable", end="\r")
        if not transitioning:
            print(f"\n[hub] all {len(modules)} instances in stable state")
            return
        time.sleep(0.5)
    print(f"\n[hub] WARNING: timed out waiting for stable states after {timeout_s}s")


def _deprovision_all(port: int, module_ids: list[str], workers: int) -> None:
    """Deprovision the tracked *module_ids* plus any other live module the hub reports.

    Modules listed by /hub/status that are not tracked and not already
    "unprovisioned" are included as well. Examples include provisions that failed
    client-side but may still exist on the hub, or leftovers from an earlier run.
    Status entries without a moduleId are skipped. HTTP 409 is not counted as
    an error.
    """
    raw_status = _hub_status(port, timeout_s=60.0)
    if raw_status is None:
        print("[deprovision] WARNING: could not reach hub to enumerate extra modules - "
              "only deprovisioning tracked instances")
    extra_ids = {
        m["moduleId"]
        for m in (raw_status or [])
        if m.get("moduleId") and m.get("state") != "unprovisioned"
    } - set(module_ids)
    all_ids = list(module_ids) + list(extra_ids)
    print(f"[deprovision] deprovisioning {len(all_ids)} instances...")
    t0 = time.monotonic()
    errors = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(_deprovision_one, port, mid): mid for mid in all_ids}
        for f in as_completed(futures):
            status = f.result()
            if status not in (200, 202, 409):
                errors += 1
                print(f"[deprovision] module {futures[f]}: unexpected status {status}")
    elapsed = time.monotonic() - t0
    print(f"[deprovision] done in {elapsed:.2f}s ({errors} errors)")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """Parse arguments, run the provisioning flood, then tear everything down."""
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--data-dir", default="E:/Dev/hub-perftest",
                        help="Hub --data-dir (default: E:/Dev/hub-perftest)")
    parser.add_argument("--port", type=int, default=8558,
                        help="Hub HTTP port (default: 8558)")
    parser.add_argument("--workers", type=int, default=20,
                        help="Concurrent provisioning threads (default: 20)")
    parser.add_argument("--trace-file", default="hub_perf_trace.etl",
                        help="WPR output .etl path (default: hub_perf_trace.etl)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver executable (auto-detected by default)")
    parser.add_argument("--s3", action="store_true",
                        help="Use local MinIO for hub de/hydration instead of filesystem")
    parser.add_argument("--minio-port", type=int, default=9000,
                        help="MinIO S3 API port when --s3 is used (default: 9000)")
    parser.add_argument("--minio-console-port", type=int, default=9001,
                        help="MinIO web console port when --s3 is used (default: 9001)")
    parser.add_argument("--s3-bucket", default="zen-hydration-test",
                        help="S3 bucket name for MinIO hydration (default: zen-hydration-test)")
    args = parser.parse_args()
    if args.workers < 1:
        # ThreadPoolExecutor rejects max_workers < 1; fail before any process is started.
        parser.error("--workers must be at least 1")

    data_dir = Path(args.data_dir)
    trace_file = Path(args.trace_file).resolve()
    hub_log = data_dir / "hub.log"

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    print(f"[setup] zenserver: {zenserver_exe}")

    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    wpr_started = False
    hub_extra_args: list[str] = []
    hub_extra_env: Optional[dict[str, str]] = None

    try:
        if args.s3:
            minio_exe = _find_minio(zenserver_exe)
            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
            _wait_for_minio(args.minio_port, proc=minio_proc)
            _create_minio_bucket(args.minio_port, args.s3_bucket)
            webbrowser.open(f"http://localhost:{args.minio_console_port}")

            config_json = {
                "type": "s3",
                "settings": {
                    "uri": f"s3://{args.s3_bucket}",
                    "endpoint": f"http://localhost:{args.minio_port}",
                    "path-style": True,
                    "region": "us-east-1",
                },
            }
            data_dir.mkdir(parents=True, exist_ok=True)
            config_path = data_dir / "hydration_config.json"
            config_path.write_text(json.dumps(config_json), encoding="ascii")
            hub_extra_args = [
                f"--hub-hydration-target-config={config_path}",
            ]
            hub_extra_env = {
                "AWS_ACCESS_KEY_ID": _MINIO_USER,
                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
            }

        wpr_started = _wpr_start(trace_file)

        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, data_dir, args.port, hub_log, hub_extra_args, hub_extra_env
        )
        _wait_for_hub(hub_proc, args.port)
        webbrowser.open(f"http://localhost:{args.port}/dashboard")

        provisioned_ids, time_to_rejection, wall_clock = _flood_provision(args.port, args.workers)

        print(f"\n[results] provisioned : {len(provisioned_ids)}")
        print(f"[results] time to 409 : {time_to_rejection:.3f}s")
        print(f"[results] wall clock : {wall_clock:.3f}s")
        if time_to_rejection > 0:
            print(f"[results] rate : {len(provisioned_ids) / time_to_rejection:.1f} provisions/s")

        _wait_stable(args.port, timeout_s=120.0)

        _deprovision_all(args.port, provisioned_ids, args.workers)
        # At least 60s, plus 0.5s per provisioned instance, for deprovisioning to settle.
        dehydration_timeout_s = max(60.0, len(provisioned_ids) * 0.5)
        _wait_stable(args.port, timeout_s=dehydration_timeout_s)

    finally:
        # ExitStack runs callbacks in reverse registration order, so teardown is:
        # WPR stop, hub stop, log close, MinIO stop. It is the same order as before,
        # but each step still runs if an earlier one raises (e.g. Ctrl+C while
        # wpr.exe writes the .etl), so no hub/MinIO process is left behind.
        with contextlib.ExitStack() as cleanup:
            if minio_proc is not None:
                cleanup.callback(_stop_process, minio_proc, "minio")
            if hub_log_handle is not None:
                cleanup.callback(hub_log_handle.close)
            if hub_proc is not None:
                cleanup.callback(_stop_process, hub_proc, "hub")
            if wpr_started:
                cleanup.callback(_wpr_stop, trace_file)


if __name__ == "__main__":
    main()