aboutsummaryrefslogtreecommitdiff
path: root/scripts/test_scripts/hub_provision_perf_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/test_scripts/hub_provision_perf_test.py')
-rw-r--r--scripts/test_scripts/hub_provision_perf_test.py501
1 files changed, 501 insertions, 0 deletions
diff --git a/scripts/test_scripts/hub_provision_perf_test.py b/scripts/test_scripts/hub_provision_perf_test.py
new file mode 100644
index 000000000..5b264ad62
--- /dev/null
+++ b/scripts/test_scripts/hub_provision_perf_test.py
@@ -0,0 +1,501 @@
+#!/usr/bin/env python3
+"""Hub provisioning performance test.
+
+Floods a zenserver hub with concurrent provision requests until the hub rejects
+further provisioning (HTTP 409), then deprovisions all instances and exits.
+A WPR ETW trace runs for the entire test and produces a .etl file for analysis
+in WPA or PerfView (CPU sampling + context-switch events for lock contention).
+
+Optional --s3: starts a local MinIO server and configures the hub to use it
+as the de/hydration backend instead of the default local filesystem.
+
+Requirements:
+ pip install boto3 (only needed with --s3)
+WPR (wpr.exe) must be available (ships with Windows). Running as Administrator
+is required for WPR to collect ETW traces.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+import uuid
+import webbrowser
+from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
+from pathlib import Path
+from typing import Optional
+
_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""  # executable file extension on the host OS
# Hub module states considered terminal (no provisioning/dehydration in progress).
_STABLE_STATES = {"provisioned", "hibernated", "crashed", "unprovisioned"}
# MinIO root credentials for the throwaway local test server; also exported
# to the hub process as AWS credentials when --s3 is used.
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable.

    When *override* is given it must be a directory holding the executable.
    Otherwise the well-known per-platform release build directories under the
    repo root are probed, falling back to the newest match anywhere under
    build/**/release. Exits the process when nothing is found.
    """
    exe_name = f"zenserver{_EXE_SUFFIX}"

    if override:
        explicit = Path(override) / exe_name
        if explicit.exists():
            return explicit
        sys.exit(f"zenserver not found at {explicit}")

    repo_root = Path(__file__).resolve().parent.parent.parent
    for os_dir, arch_dir in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
        candidate = repo_root / "build" / os_dir / arch_dir / "release" / exe_name
        if candidate.exists():
            return candidate

    # Fall back to the most recently modified build output, if any.
    found = sorted(repo_root.glob(f"build/**/release/{exe_name}"),
                   key=lambda p: p.stat().st_mtime, reverse=True)
    if found:
        return found[0]

    sys.exit(
        "zenserver executable not found in build/. "
        "Run: xmake config -y -m release -a x64 && xmake -y\n"
        "Or pass --zenserver-dir <dir>."
    )
+
+
def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable expected alongside zenserver, or exit."""
    candidate = zenserver_path.with_name(f"minio{_EXE_SUFFIX}")
    if candidate.exists():
        return candidate
    sys.exit(
        f"minio executable not found at {candidate}. "
        "Build with: xmake config -y -m release -a x64 && xmake -y"
    )
+
+
+# ---------------------------------------------------------------------------
+# WPR (Windows Performance Recorder)
+# ---------------------------------------------------------------------------
+
+def _is_elevated() -> bool:
+ if sys.platform != "win32":
+ return False
+ try:
+ import ctypes
+ return bool(ctypes.windll.shell32.IsUserAnAdmin())
+ except Exception:
+ return False
+
+
+def _wpr_start(trace_file: Path) -> bool:
+ if sys.platform != "win32":
+ print("[wpr] WPR is Windows-only; skipping ETW trace.")
+ return False
+ if not _is_elevated():
+ print("[wpr] skipping ETW trace - re-run from an elevated (Administrator) prompt to collect traces.")
+ return False
+ result = subprocess.run(
+ ["wpr.exe", "-start", "CPU", "-filemode"],
+ capture_output=True, text=True
+ )
+ if result.returncode != 0:
+ print(f"[wpr] WARNING: wpr -start failed (code {result.returncode}):\n{result.stderr.strip()}")
+ return False
+ print(f"[wpr] ETW trace started (CPU profile, file mode)")
+ return True
+
+
+def _wpr_stop(trace_file: Path) -> None:
+ if sys.platform != "win32":
+ return
+ result = subprocess.run(
+ ["wpr.exe", "-stop", str(trace_file)],
+ capture_output=True, text=True
+ )
+ if result.returncode != 0:
+ print(f"[wpr] WARNING: wpr -stop failed (code {result.returncode}):\n{result.stderr.strip()}")
+ else:
+ print(f"[wpr] ETW trace saved: {trace_file}")
+
+
+# ---------------------------------------------------------------------------
+# MinIO
+# ---------------------------------------------------------------------------
+
def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch a local MinIO server over data_dir/minio; returns the process.

    Output is discarded; credentials come from the module-level test constants.
    """
    storage = data_dir / "minio"
    storage.mkdir(parents=True, exist_ok=True)
    minio_env = dict(os.environ,
                     MINIO_ROOT_USER=_MINIO_USER,
                     MINIO_ROOT_PASSWORD=_MINIO_PASS)
    cmd = [
        str(minio_exe), "server", str(storage),
        "--address", f":{port}",
        "--console-address", f":{console_port}",
        "--quiet",
    ]
    proc = subprocess.Popen(cmd, env=minio_env,
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
    print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
    return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+ try:
+ import boto3
+ import botocore.config
+ import botocore.exceptions
+ except ImportError:
+ sys.exit(
+ "[minio] boto3 is required for --s3.\n"
+ "Install it with: pip install boto3"
+ )
+ s3 = boto3.client(
+ "s3",
+ endpoint_url=f"http://localhost:{port}",
+ aws_access_key_id=_MINIO_USER,
+ aws_secret_access_key=_MINIO_PASS,
+ region_name="us-east-1",
+ config=botocore.config.Config(signature_version="s3v4"),
+ )
+ try:
+ s3.create_bucket(Bucket=bucket)
+ print(f"[minio] created bucket '{bucket}'")
+ except botocore.exceptions.ClientError as e:
+ if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
+ print(f"[minio] bucket '{bucket}' already exists")
+ else:
+ raise
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ extra_args: list[str],
+ extra_env: Optional[dict[str, str]],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-http-threads=8",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ "--hub-instance-limit=100",
+ ] + extra_args
+
+ env = os.environ.copy()
+ if extra_env:
+ env.update(extra_env)
+
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
+ if proc.poll() is not None:
+ return
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[{name}] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub HTTP helpers
+# ---------------------------------------------------------------------------
+
+def _hub_url(port: int, path: str) -> str:
+ return f"http://localhost:{port}{path}"
+
+
def _provision_one(port: int) -> tuple[str, int]:
    """POST a provision request for a fresh random module id.

    Returns (module_id, http_status); status 0 means the hub was unreachable.
    """
    module_id = str(uuid.uuid4())
    request = urllib.request.Request(
        _hub_url(port, f"/hub/modules/{module_id}/provision"),
        data=b"{}",
        method="POST",
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            status = resp.status
    except urllib.error.HTTPError as err:
        status = err.code
    except Exception:
        status = 0
    return module_id, status
+
+
def _deprovision_one(port: int, module_id: str, retries: int = 5) -> int:
    """POST a deprovision request, retrying briefly on 409 (still transitioning).

    Returns the final HTTP status; 0 means the hub was unreachable.
    """
    request = urllib.request.Request(
        _hub_url(port, f"/hub/modules/{module_id}/deprovision"),
        data=b"{}",
        method="POST",
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
    )
    attempt = 0
    while True:
        try:
            with urllib.request.urlopen(request, timeout=30) as resp:
                return resp.status
        except urllib.error.HTTPError as err:
            # 409 usually means the instance is mid-transition; back off and retry.
            if err.code != 409 or attempt >= retries:
                return err.code
            attempt += 1
            time.sleep(0.2)
        except Exception:
            return 0
+
+
def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
    """Fetch the hub's module list from /hub/status.

    Returns None when the hub is unreachable or the payload is malformed.
    """
    request = urllib.request.Request(_hub_url(port, "/hub/status"),
                                     headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as resp:
            payload = json.loads(resp.read())
        return payload.get("modules", [])
    except Exception:
        return None
+
+
+# ---------------------------------------------------------------------------
+# Test phases
+# ---------------------------------------------------------------------------
+
def _flood_provision(port: int, workers: int) -> tuple[list[str], float, float]:
    """Flood the hub with concurrent provision requests until it pushes back.

    Keeps up to *workers* provision requests in flight; stops submitting new
    work once the hub answers 409 (capacity reached) or becomes unreachable
    (status 0 from _provision_one), then drains the in-flight futures.

    Returns (provisioned_module_ids, time_to_first_rejection_s, wall_clock_s);
    when no rejection was observed, the middle value falls back to wall clock.
    """
    stopped = False  # set once we must stop submitting new requests
    provisioned_ids: list[str] = []
    time_to_rejection: Optional[float] = None
    t0 = time.monotonic()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Prime the pipeline with one request per worker.
        pending: set[Future] = {pool.submit(_provision_one, port) for _ in range(workers)}

        while pending:
            # Wake as soon as any in-flight request finishes.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)

            for f in done:
                module_id, status = f.result()
                if status in (200, 202):
                    provisioned_ids.append(module_id)
                elif status == 409:
                    # Hub refused further provisioning: record only the first rejection.
                    if not stopped:
                        time_to_rejection = time.monotonic() - t0
                        stopped = True
                        print(f"\n[flood] hub rejected provisioning after "
                              f"{len(provisioned_ids)} instances "
                              f"({time_to_rejection:.2f}s)")
                elif status == 0:
                    # Status 0 is _provision_one's marker for "hub unreachable".
                    if not stopped:
                        stopped = True
                        print(f"\n[flood] hub unreachable - stopping flood "
                              f"({len(provisioned_ids)} instances so far)")
                else:
                    print(f"[flood] unexpected status {status} for {module_id}")

            # NOTE(review): only ONE replacement request is submitted per wait()
            # wakeup even if several futures completed, so the in-flight count
            # can decay below *workers* under load — confirm this is intended.
            if not stopped:
                pending.add(pool.submit(_provision_one, port))

    wall_clock = time.monotonic() - t0
    return provisioned_ids, time_to_rejection or wall_clock, wall_clock
+
+
+def _wait_stable(port: int, timeout_s: float = 20.0) -> None:
+ print("[hub] waiting for all instances to reach stable state...")
+ deadline = time.monotonic() + timeout_s
+ status_timeout_s = 5.0
+ while time.monotonic() < deadline:
+ modules = _hub_status(port, timeout_s=status_timeout_s)
+ if modules is None:
+ time.sleep(0.5)
+ continue
+ transitioning = [m for m in modules if m.get("state") not in _STABLE_STATES]
+ elapsed = time.monotonic() - (deadline - timeout_s)
+ print(f"[hub] {elapsed:.1f}s: {len(modules) - len(transitioning)}/{len(modules)} stable", end="\r")
+ if not transitioning:
+ print(f"\n[hub] all {len(modules)} instances in stable state")
+ return
+ time.sleep(0.5)
+ print(f"\n[hub] WARNING: timed out waiting for stable states after {timeout_s}s")
+
+
def _deprovision_all(port: int, module_ids: list[str], workers: int) -> None:
    """Concurrently deprovision every tracked instance plus any stragglers.

    Asks the hub for its current module list so instances provisioned outside
    our tracking (or left over from a previous run) are torn down too.
    """
    reported = _hub_status(port, timeout_s=60.0)
    if reported is None:
        print("[deprovision] WARNING: could not reach hub to enumerate extra modules - "
              "only deprovisioning tracked instances")
    stragglers = {m["moduleId"] for m in (reported or [])
                  if m.get("state") not in ("unprovisioned",)} - set(module_ids)
    targets = list(module_ids) + list(stragglers)

    print(f"[deprovision] deprovisioning {len(targets)} instances...")
    started = time.monotonic()
    failures = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {pool.submit(_deprovision_one, port, mid): mid for mid in targets}
        for future in as_completed(pending):
            code = future.result()
            if code not in (200, 202, 409):
                failures += 1
                print(f"[deprovision] module {pending[future]}: unexpected status {code}")
    print(f"[deprovision] done in {time.monotonic() - started:.2f}s ({failures} errors)")
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
def main() -> None:
    """Entry point: stand up infrastructure, flood the hub, then tear down.

    Phases:
      1. Parse CLI args and locate the zenserver executable.
      2. Optionally start MinIO and write an S3 hydration config (--s3).
      3. Start a WPR ETW trace (Windows + elevated only) and the hub.
      4. Flood-provision until the hub rejects (409); print rate results.
      5. Wait for stability, deprovision everything, wait for dehydration.
      6. Always stop the trace, the hub, and MinIO in the finally block.
    """
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--data-dir", default="E:/Dev/hub-perftest",
                        help="Hub --data-dir (default: E:/Dev/hub-perftest)")
    parser.add_argument("--port", type=int, default=8558,
                        help="Hub HTTP port (default: 8558)")
    parser.add_argument("--workers", type=int, default=20,
                        help="Concurrent provisioning threads (default: 20)")
    parser.add_argument("--trace-file", default="hub_perf_trace.etl",
                        help="WPR output .etl path (default: hub_perf_trace.etl)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver executable (auto-detected by default)")
    parser.add_argument("--s3", action="store_true",
                        help="Use local MinIO for hub de/hydration instead of filesystem")
    parser.add_argument("--minio-port", type=int, default=9000,
                        help="MinIO S3 API port when --s3 is used (default: 9000)")
    parser.add_argument("--minio-console-port", type=int, default=9001,
                        help="MinIO web console port when --s3 is used (default: 9001)")
    parser.add_argument("--s3-bucket", default="zen-hydration-test",
                        help="S3 bucket name for MinIO hydration (default: zen-hydration-test)")
    args = parser.parse_args()

    data_dir = Path(args.data_dir)
    trace_file = Path(args.trace_file).resolve()
    hub_log = data_dir / "hub.log"

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    print(f"[setup] zenserver: {zenserver_exe}")

    # Cleanup handles for the finally block; only what actually started is torn down.
    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    wpr_started = False

    hub_extra_args: list[str] = []
    hub_extra_env: Optional[dict[str, str]] = None

    try:
        if args.s3:
            # Bring up MinIO and point the hub's hydration target at it.
            minio_exe = _find_minio(zenserver_exe)
            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
            _wait_for_minio(args.minio_port)
            _create_minio_bucket(args.minio_port, args.s3_bucket)
            webbrowser.open(f"http://localhost:{args.minio_console_port}")
            config_json = {
                "type": "s3",
                "settings": {
                    "uri": f"s3://{args.s3_bucket}",
                    "endpoint": f"http://localhost:{args.minio_port}",
                    "path-style": True,
                    "region": "us-east-1",
                },
            }
            data_dir.mkdir(parents=True, exist_ok=True)
            config_path = data_dir / "hydration_config.json"
            config_path.write_text(json.dumps(config_json), encoding="ascii")
            hub_extra_args = [
                f"--hub-hydration-target-config={config_path}",
            ]
            # Credentials matching the MinIO root user, consumed by the hub's S3 client.
            hub_extra_env = {
                "AWS_ACCESS_KEY_ID": _MINIO_USER,
                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
            }

        # Start tracing before the hub so its startup cost is captured too.
        wpr_started = _wpr_start(trace_file)

        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, data_dir, args.port,
            hub_log, hub_extra_args, hub_extra_env
        )
        _wait_for_hub(hub_proc, args.port)
        webbrowser.open(f"http://localhost:{args.port}/dashboard")

        provisioned_ids, time_to_rejection, wall_clock = _flood_provision(args.port, args.workers)

        print(f"\n[results] provisioned : {len(provisioned_ids)}")
        print(f"[results] time to 409 : {time_to_rejection:.3f}s")
        print(f"[results] wall clock : {wall_clock:.3f}s")
        if time_to_rejection > 0:
            print(f"[results] rate : {len(provisioned_ids) / time_to_rejection:.1f} provisions/s")

        # Let in-flight provisions settle before tearing everything down.
        _wait_stable(args.port, timeout_s=120.0)
        _deprovision_all(args.port, provisioned_ids, args.workers)
        # Dehydration time scales with instance count: 0.5s each, 60s minimum.
        dehydration_timeout_s = max(60.0, len(provisioned_ids) * 0.5)
        _wait_stable(args.port, timeout_s=dehydration_timeout_s)

    finally:
        # Teardown order: stop the trace first so the .etl is written, then the
        # hub, its log handle, and finally MinIO.
        if wpr_started:
            _wpr_stop(trace_file)
        if hub_proc is not None:
            _stop_process(hub_proc, "hub")
        if hub_log_handle is not None:
            hub_log_handle.close()
        if minio_proc is not None:
            _stop_process(minio_proc, "minio")


if __name__ == "__main__":
    main()