diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-30 13:58:14 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-30 13:58:14 +0200 |
| commit | 6d75696d11aab547bb34ea22ec10fcdc594e5a44 (patch) | |
| tree | 047db726b2c4cfc05fca433561fe09f635ae88a8 /scripts/test_scripts/hub_provision_perf_test.py | |
| parent | hub resource limits (#900) (diff) | |
| download | zen-6d75696d11aab547bb34ea22ec10fcdc594e5a44.tar.xz zen-6d75696d11aab547bb34ea22ec10fcdc594e5a44.zip | |
hub s3 hydrate improvements (#902)
- Feature: Added `--hub-hydration-target-config` option to specify the hydration target via a JSON config file (mutually exclusive with `--hub-hydration-target-spec`); supports `file` and `s3` types with structured settings
```json
{
"type": "file",
"settings": {
"path": "/path/to/hydration/storage"
}
}
```
```json
{
"type": "s3",
"settings": {
"uri": "s3://bucket[/prefix]",
"region": "us-east-1",
"endpoint": "http://localhost:9000",
"path-style": true
}
}
```
- Improvement: Hub hydration dehydration skips the `.sentry-native` directory
- Bugfix: Fixed `MakeSafeAbsolutePathInPlace` when a UNC prefix is present but path uses mixed delimiters
Diffstat (limited to 'scripts/test_scripts/hub_provision_perf_test.py')
| -rw-r--r-- | scripts/test_scripts/hub_provision_perf_test.py | 501 |
1 file changed, 501 insertions, 0 deletions
#!/usr/bin/env python3
"""Hub provisioning performance test.

Floods a zenserver hub with concurrent provision requests until the hub rejects
further provisioning (HTTP 409), then deprovisions all instances and exits.
A WPR ETW trace runs for the entire test and produces a .etl file for analysis
in WPA or PerfView (CPU sampling + context-switch events for lock contention).

Optional --s3: starts a local MinIO server and configures the hub to use it
as the de/hydration backend instead of the default local filesystem.

Requirements:
    pip install boto3 (only needed with --s3)
WPR (wpr.exe) must be available (ships with Windows). Running as Administrator
is required for WPR to collect ETW traces.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
import uuid
import webbrowser
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
from pathlib import Path
from typing import BinaryIO, Optional

# Platform-dependent executable suffix (".exe" on Windows, "" elsewhere).
_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
# Hub module states considered settled (no provisioning/dehydration in flight).
_STABLE_STATES = {"provisioned", "hibernated", "crashed", "unprovisioned"}
# Default MinIO root credentials (local test only — never reuse in production).
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"


# ---------------------------------------------------------------------------
# Executable discovery
# ---------------------------------------------------------------------------

def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable.

    Args:
        override: Directory that must contain the executable; when given,
            auto-detection is skipped entirely.

    Returns:
        Path to the zenserver executable.

    Exits the process with an explanatory message when nothing is found.
    """
    if override:
        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
        if not p.exists():
            sys.exit(f"zenserver not found at {p}")
        return p

    script_dir = Path(__file__).resolve().parent
    repo_root = script_dir.parent.parent
    # Preferred per-platform release build locations, checked in order.
    candidates = [
        repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}",
        repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
        repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
    ]
    for c in candidates:
        if c.exists():
            return c

    # Fall back to the most recently built release binary anywhere under build/.
    matches = sorted(
        repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}"),
        key=lambda m: m.stat().st_mtime,
        reverse=True,
    )
    if matches:
        return matches[0]

    sys.exit(
        "zenserver executable not found in build/. "
        "Run: xmake config -y -m release -a x64 && xmake -y\n"
        "Or pass --zenserver-dir <dir>."
    )


def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable expected next to zenserver, or exit."""
    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(
            f"minio executable not found at {p}. "
            "Build with: xmake config -y -m release -a x64 && xmake -y"
        )
    return p


# ---------------------------------------------------------------------------
# WPR (Windows Performance Recorder)
# ---------------------------------------------------------------------------

def _is_elevated() -> bool:
    """Return True when running as Administrator on Windows, False elsewhere."""
    if sys.platform != "win32":
        return False
    try:
        import ctypes
        return bool(ctypes.windll.shell32.IsUserAnAdmin())
    except Exception:
        # Best-effort probe; treat any failure as "not elevated".
        return False


def _wpr_start(trace_file: Path) -> bool:
    """Start an ETW CPU trace via wpr.exe.

    `trace_file` is unused here because WPR in file mode records to its own
    buffer; the output path is supplied later at `wpr -stop` (see _wpr_stop).
    The parameter is kept for symmetry with _wpr_stop.

    Returns:
        True when the trace was started (caller must later call _wpr_stop).
    """
    if sys.platform != "win32":
        print("[wpr] WPR is Windows-only; skipping ETW trace.")
        return False
    if not _is_elevated():
        print("[wpr] skipping ETW trace - re-run from an elevated (Administrator) prompt to collect traces.")
        return False
    result = subprocess.run(
        ["wpr.exe", "-start", "CPU", "-filemode"],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"[wpr] WARNING: wpr -start failed (code {result.returncode}):\n{result.stderr.strip()}")
        return False
    # Plain string: no placeholders, so no f-prefix needed (was an F541).
    print("[wpr] ETW trace started (CPU profile, file mode)")
    return True


def _wpr_stop(trace_file: Path) -> None:
    """Stop the running ETW trace and save it to `trace_file` (.etl)."""
    if sys.platform != "win32":
        return
    result = subprocess.run(
        ["wpr.exe", "-stop", str(trace_file)],
        capture_output=True, text=True
    )
    if result.returncode != 0:
        print(f"[wpr] WARNING: wpr -stop failed (code {result.returncode}):\n{result.stderr.strip()}")
    else:
        print(f"[wpr] ETW trace saved: {trace_file}")


# ---------------------------------------------------------------------------
# MinIO
# ---------------------------------------------------------------------------

def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch a local MinIO server with test credentials.

    Data is stored under `data_dir`/minio. Output is discarded (--quiet plus
    DEVNULL) since MinIO is only a backend for this test.

    Returns:
        The MinIO child process; caller is responsible for stopping it.
    """
    minio_data = data_dir / "minio"
    minio_data.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["MINIO_ROOT_USER"] = _MINIO_USER
    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
    proc = subprocess.Popen(
        [str(minio_exe), "server", str(minio_data),
         "--address", f":{port}",
         "--console-address", f":{console_port}",
         "--quiet"],
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
    return proc


def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
    """Poll MinIO's health endpoint until it responds; exit on timeout."""
    deadline = time.monotonic() + timeout_s
    url = f"http://localhost:{port}/minio/health/live"
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=1):
                print("[minio] ready")
                return
        except Exception:
            # Not up yet (connection refused / 503); retry shortly.
            time.sleep(0.1)
    sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")


def _create_minio_bucket(port: int, bucket: str) -> None:
    """Create the hydration bucket on the local MinIO (idempotent).

    Requires boto3; exits with install instructions when it is missing.
    """
    try:
        import boto3
        import botocore.config
        import botocore.exceptions
    except ImportError:
        sys.exit(
            "[minio] boto3 is required for --s3.\n"
            "Install it with: pip install boto3"
        )
    s3 = boto3.client(
        "s3",
        endpoint_url=f"http://localhost:{port}",
        aws_access_key_id=_MINIO_USER,
        aws_secret_access_key=_MINIO_PASS,
        region_name="us-east-1",
        config=botocore.config.Config(signature_version="s3v4"),
    )
    try:
        s3.create_bucket(Bucket=bucket)
        print(f"[minio] created bucket '{bucket}'")
    except botocore.exceptions.ClientError as e:
        # Re-running the test against an existing data dir is fine.
        if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            print(f"[minio] bucket '{bucket}' already exists")
        else:
            raise


# ---------------------------------------------------------------------------
# Hub lifecycle
# ---------------------------------------------------------------------------

def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    extra_args: list[str],
    extra_env: Optional[dict[str, str]],
) -> tuple[subprocess.Popen, BinaryIO]:
    """Launch the zenserver hub with test-friendly resource limits.

    Args:
        zenserver_exe: Path to the zenserver binary.
        data_dir: Hub --data-dir (created if missing).
        port: Hub HTTP port.
        log_file: File receiving the hub's combined stdout/stderr.
        extra_args: Additional command-line arguments (e.g. hydration config).
        extra_env: Environment overrides merged over os.environ, or None.

    Returns:
        (hub process, open binary log handle) — the caller must close the
        handle after the process exits so the log is fully flushed.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        str(zenserver_exe),
        "hub",
        f"--data-dir={data_dir}",
        f"--port={port}",
        "--hub-instance-http-threads=8",
        "--hub-instance-corelimit=4",
        "--hub-provision-disk-limit-percent=99",
        "--hub-provision-memory-limit-percent=80",
        "--hub-instance-limit=100",
    ] + extra_args

    env = os.environ.copy()
    if extra_env:
        env.update(extra_env)

    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(
            cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT
        )
    except Exception:
        # Don't leak the handle when spawning fails.
        log_handle.close()
        raise
    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle


def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Poll /hub/status until the hub answers; exit on timeout or early death."""
    deadline = time.monotonic() + timeout_s
    req = urllib.request.Request(f"http://localhost:{port}/hub/status",
                                 headers={"Accept": "application/json"})
    while time.monotonic() < deadline:
        # A dead child will never become ready — fail fast with a hint.
        if proc.poll() is not None:
            sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
                     f"is another zenserver already running on port {port}?")
        try:
            with urllib.request.urlopen(req, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            time.sleep(0.2)
    sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")


def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
    """Terminate `proc` gracefully, escalating to kill after `timeout_s`."""
    if proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[{name}] did not exit after {timeout_s}s, killing")
        proc.kill()
        proc.wait()


# ---------------------------------------------------------------------------
# Hub HTTP helpers
# ---------------------------------------------------------------------------

def _hub_url(port: int, path: str) -> str:
    """Build an absolute URL for a hub endpoint on localhost."""
    return f"http://localhost:{port}{path}"


def _provision_one(port: int) -> tuple[str, int]:
    """POST a provision request for a fresh random module id.

    Returns:
        (module id, HTTP status) — status 0 signals a transport-level failure
        (hub unreachable), which callers treat as "stop flooding".
    """
    module_id = str(uuid.uuid4())
    url = _hub_url(port, f"/hub/modules/{module_id}/provision")
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json",
                                          "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return module_id, resp.status
    except urllib.error.HTTPError as e:
        return module_id, e.code
    except Exception:
        return module_id, 0


def _deprovision_one(port: int, module_id: str, retries: int = 5) -> int:
    """POST a deprovision request, retrying on 409 (busy/transitioning).

    Returns:
        Final HTTP status, or 0 on a transport-level failure.
    """
    url = _hub_url(port, f"/hub/modules/{module_id}/deprovision")
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json",
                                          "Accept": "application/json"})
    for attempt in range(retries + 1):
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status
        except urllib.error.HTTPError as e:
            # 409 means the instance is mid-transition; back off briefly.
            if e.code == 409 and attempt < retries:
                time.sleep(0.2)
                continue
            return e.code
        except Exception:
            return 0


def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
    """Fetch the hub's module list from /hub/status.

    Returns:
        The "modules" array from the JSON response, or None when the hub is
        unreachable or returns malformed data (best-effort by design).
    """
    try:
        req = urllib.request.Request(_hub_url(port, "/hub/status"),
                                     headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            data = json.loads(resp.read())
            return data.get("modules", [])
    except Exception:
        return None
# ---------------------------------------------------------------------------
# Test phases
# ---------------------------------------------------------------------------

def _flood_provision(port: int, workers: int) -> tuple[list[str], float, float]:
    """Provision modules concurrently until the hub rejects or drops.

    Keeps `workers` provision requests in flight; each completed request is
    replaced by a new one until the hub answers 409 (capacity reached) or
    becomes unreachable (status 0), then drains the requests still in flight.

    Returns:
        (provisioned module ids, seconds until first rejection, total seconds).
        When no rejection was observed, the second element equals the third.
    """
    stopped = False
    provisioned_ids: list[str] = []
    time_to_rejection: Optional[float] = None
    t0 = time.monotonic()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending: set[Future] = {pool.submit(_provision_one, port) for _ in range(workers)}

        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)

            for f in done:
                module_id, status = f.result()
                if status in (200, 202):
                    provisioned_ids.append(module_id)
                elif status == 409:
                    if not stopped:
                        time_to_rejection = time.monotonic() - t0
                        stopped = True
                        print(f"\n[flood] hub rejected provisioning after "
                              f"{len(provisioned_ids)} instances "
                              f"({time_to_rejection:.2f}s)")
                elif status == 0:
                    if not stopped:
                        stopped = True
                        print(f"\n[flood] hub unreachable - stopping flood "
                              f"({len(provisioned_ids)} instances so far)")
                else:
                    print(f"[flood] unexpected status {status} for {module_id}")

                # Replace each finished request to keep `workers` in flight.
                if not stopped:
                    pending.add(pool.submit(_provision_one, port))

    wall_clock = time.monotonic() - t0
    # Explicit None check: `or` would also clobber a legitimate 0.0 rejection
    # time (falsy), silently reporting wall clock instead.
    if time_to_rejection is None:
        time_to_rejection = wall_clock
    return provisioned_ids, time_to_rejection, wall_clock


def _wait_stable(port: int, timeout_s: float = 20.0) -> None:
    """Poll /hub/status until every module is in a stable state.

    Prints a single-line progress indicator (carriage-return overwrite) and a
    warning when `timeout_s` elapses before all instances settle.
    """
    print("[hub] waiting for all instances to reach stable state...")
    start = time.monotonic()
    deadline = start + timeout_s
    status_timeout_s = 5.0
    while time.monotonic() < deadline:
        modules = _hub_status(port, timeout_s=status_timeout_s)
        if modules is None:
            # Hub temporarily unreachable; keep polling until the deadline.
            time.sleep(0.5)
            continue
        transitioning = [m for m in modules if m.get("state") not in _STABLE_STATES]
        elapsed = time.monotonic() - start
        print(f"[hub] {elapsed:.1f}s: {len(modules) - len(transitioning)}/{len(modules)} stable", end="\r")
        if not transitioning:
            print(f"\n[hub] all {len(modules)} instances in stable state")
            return
        time.sleep(0.5)
    print(f"\n[hub] WARNING: timed out waiting for stable states after {timeout_s}s")


def _deprovision_all(port: int, module_ids: list[str], workers: int) -> None:
    """Deprovision the tracked instances plus any extras the hub reports.

    Extras can exist when a provision request succeeded server-side but its
    response was lost; the hub's own status is the source of truth.
    """
    raw_status = _hub_status(port, timeout_s=60.0)
    if raw_status is None:
        print("[deprovision] WARNING: could not reach hub to enumerate extra modules - "
              "only deprovisioning tracked instances")
    extra_ids = {m["moduleId"] for m in (raw_status or [])
                 if m.get("state") not in ("unprovisioned",)} - set(module_ids)
    all_ids = list(module_ids) + list(extra_ids)

    print(f"[deprovision] deprovisioning {len(all_ids)} instances...")
    t0 = time.monotonic()
    errors = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(_deprovision_one, port, mid): mid for mid in all_ids}
        for f in as_completed(futures):
            status = f.result()
            # 409 after retries means still transitioning — counted as OK here
            # because the later _wait_stable pass covers the settling.
            if status not in (200, 202, 409):
                errors += 1
                print(f"[deprovision] module {futures[f]}: unexpected status {status}")
    elapsed = time.monotonic() - t0
    print(f"[deprovision] done in {elapsed:.2f}s ({errors} errors)")


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> None:
    """Run the full test: (optional MinIO) -> WPR -> hub -> flood -> teardown."""
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--data-dir", default="E:/Dev/hub-perftest",
                        help="Hub --data-dir (default: E:/Dev/hub-perftest)")
    parser.add_argument("--port", type=int, default=8558,
                        help="Hub HTTP port (default: 8558)")
    parser.add_argument("--workers", type=int, default=20,
                        help="Concurrent provisioning threads (default: 20)")
    parser.add_argument("--trace-file", default="hub_perf_trace.etl",
                        help="WPR output .etl path (default: hub_perf_trace.etl)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver executable (auto-detected by default)")
    parser.add_argument("--s3", action="store_true",
                        help="Use local MinIO for hub de/hydration instead of filesystem")
    parser.add_argument("--minio-port", type=int, default=9000,
                        help="MinIO S3 API port when --s3 is used (default: 9000)")
    parser.add_argument("--minio-console-port", type=int, default=9001,
                        help="MinIO web console port when --s3 is used (default: 9001)")
    parser.add_argument("--s3-bucket", default="zen-hydration-test",
                        help="S3 bucket name for MinIO hydration (default: zen-hydration-test)")
    args = parser.parse_args()

    data_dir = Path(args.data_dir)
    trace_file = Path(args.trace_file).resolve()
    hub_log = data_dir / "hub.log"

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    print(f"[setup] zenserver: {zenserver_exe}")

    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    wpr_started = False

    hub_extra_args: list[str] = []
    hub_extra_env: Optional[dict[str, str]] = None

    try:
        if args.s3:
            # Bring up MinIO first so the hub can hydrate against it from boot.
            minio_exe = _find_minio(zenserver_exe)
            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
            _wait_for_minio(args.minio_port)
            _create_minio_bucket(args.minio_port, args.s3_bucket)
            webbrowser.open(f"http://localhost:{args.minio_console_port}")
            # Structured hydration target config consumed via
            # --hub-hydration-target-config (see commit message for schema).
            config_json = {
                "type": "s3",
                "settings": {
                    "uri": f"s3://{args.s3_bucket}",
                    "endpoint": f"http://localhost:{args.minio_port}",
                    "path-style": True,
                    "region": "us-east-1",
                },
            }
            data_dir.mkdir(parents=True, exist_ok=True)
            config_path = data_dir / "hydration_config.json"
            config_path.write_text(json.dumps(config_json), encoding="ascii")
            hub_extra_args = [
                f"--hub-hydration-target-config={config_path}",
            ]
            hub_extra_env = {
                "AWS_ACCESS_KEY_ID": _MINIO_USER,
                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
            }

        # Start tracing before the hub so startup cost is captured too.
        wpr_started = _wpr_start(trace_file)

        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, data_dir, args.port,
            hub_log, hub_extra_args, hub_extra_env
        )
        _wait_for_hub(hub_proc, args.port)
        webbrowser.open(f"http://localhost:{args.port}/dashboard")

        provisioned_ids, time_to_rejection, wall_clock = _flood_provision(args.port, args.workers)

        print(f"\n[results] provisioned : {len(provisioned_ids)}")
        print(f"[results] time to 409 : {time_to_rejection:.3f}s")
        print(f"[results] wall clock : {wall_clock:.3f}s")
        if time_to_rejection > 0:
            print(f"[results] rate : {len(provisioned_ids) / time_to_rejection:.1f} provisions/s")

        _wait_stable(args.port, timeout_s=120.0)
        _deprovision_all(args.port, provisioned_ids, args.workers)
        # Dehydration scales with instance count; allow 0.5s each, 60s minimum.
        dehydration_timeout_s = max(60.0, len(provisioned_ids) * 0.5)
        _wait_stable(args.port, timeout_s=dehydration_timeout_s)

    finally:
        # Teardown in reverse dependency order: trace, hub, log, MinIO.
        if wpr_started:
            _wpr_stop(trace_file)
        if hub_proc is not None:
            _stop_process(hub_proc, "hub")
        if hub_log_handle is not None:
            hub_log_handle.close()
        if minio_proc is not None:
            _stop_process(minio_proc, "minio")


if __name__ == "__main__":
    main()