diff options
Diffstat (limited to 'scripts/test_scripts/hub/hub_load_test_s3.py')
| -rw-r--r-- | scripts/test_scripts/hub/hub_load_test_s3.py | 537 |
1 files changed, 537 insertions, 0 deletions
diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py new file mode 100644 index 000000000..23014409c --- /dev/null +++ b/scripts/test_scripts/hub/hub_load_test_s3.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python3 +"""Hub provision/deprovision sweep against a real S3 bucket. + +Lists the top-level folders in S3_URI (each folder name is a moduleId), +picks 200 of them, then: + 1. fires all provision requests concurrently, + 2. polls until every module reaches 'provisioned', + 3. waits 5 seconds, + 4. fires all deprovision requests concurrently, + 5. polls until every module is deprovisioned, + 6. waits 5 seconds, + 7. shuts down the hub. + +Required environment variables (or pass via CLI flags): + ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/ + ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access + ZEN_PERF_AWS_REGION defaults to us-east-1 + +Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO +session is missing or expired, the script triggers the login automatically. + +Requirements: + pip install boto3 + aws CLI v2 +""" + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + +_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "") +_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "") +_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1") + + +# --------------------------------------------------------------------------- +# Executable discovery +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + candidates = [ + repo_root / "build" / "windows" / "x64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "linux" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "macosx" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + ] + for c in candidates: + if c.exists(): + return c + + matches = list(repo_root.glob(f"build/**/debug/zenserver{_EXE_SUFFIX}")) + if matches: + return max(matches, key=lambda p: p.stat().st_mtime) + + sys.exit( + "zenserver debug executable not found in build/. " + "Run: xmake config -y -m debug -a x64 && xmake -y\n" + "Or pass --zenserver-dir <dir>." + ) + + +# --------------------------------------------------------------------------- +# AWS SSO + S3 listing +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit( + "[aws] boto3 is required.\n" + "Install it with: pip install boto3" + ) + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]: + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + paginator = s3.get_paginator("list_objects_v2") + module_ids: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + full = cp.get("Prefix", "") + # Strip outer prefix + trailing slash to get the folder name + folder = full[len(prefix_norm):].rstrip("/") + if folder: + module_ids.append(folder) + return module_ids + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-http-threads=8", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[{name}] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + out: dict[str, str] = {} + for m in data.get("modules", []) or []: + mid = m.get("moduleId") + if mid: + out[mid] = m.get("state", "") + return out + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """Fire POST /hub/modules/<id>/<verb> for every module concurrently. + + Returns (accepted_ids, failures) where accepted_ids got 200/202 and + failures is a list of (mid, status, body) for everything else. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + # Already gone - treat as success for deprovision fan-out. + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _wait_for_provisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is 'provisioned' or gone. + + Returns (stuck_ids, last_states). stuck_ids are the ones that did not reach + 'provisioned' within timeout_s; last_states maps all module_ids to their + last-seen state (or empty string if the module was absent from the report). + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + now_done: list[str] = [] + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == "provisioned": + now_done.append(mid) + elif s in ("", "unprovisioned", "crashed"): + # Not useful end-states for "waiting to become provisioned", + # but we don't block forever on them either - let timeout decide. + pass + for mid in now_done: + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is gone from hub status.""" + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3", + help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=200, + help="Number of modules to sweep (default: 200)") + parser.add_argument("--settle-seconds", type=float, default=5.0, + help="Seconds to wait between provision-complete and deprovision, " + "and between deprovision-complete and shutdown (default: 5.0)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=600.0, + help="Max seconds to wait for provision or deprovision to finish (default: 600)") + parser.add_argument("--trace", + nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. " + "Optionally pass channel spec (default: 'default')") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI) + aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE) + aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION) + if not s3_uri: + sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/") + if not aws_profile: + sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name") + + data_dir = Path(args.data_dir) + hub_log = data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] S3 URI: {s3_uri}") + print(f"[setup] profile: {aws_profile}") + print(f"[setup] region: {aws_region}") + + session, frozen = _get_session(aws_profile) + print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})") + + bucket, prefix = _parse_s3_uri(s3_uri) + print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...") + module_ids = _list_module_ids(session, bucket, prefix, aws_region) + print(f"[s3] found {len(module_ids)} module folders") + + if not module_ids: + sys.exit("[s3] no module folders found, aborting") + + selected = module_ids[:args.module_count] + if len(selected) < args.module_count: + print(f"[s3] only {len(selected)} folders available (requested {args.module_count})") + + aws_env = { + "AWS_ACCESS_KEY_ID": frozen.access_key, + "AWS_SECRET_ACCESS_KEY": frozen.secret_key, + } + if frozen.token: + aws_env["AWS_SESSION_TOKEN"] = frozen.token + + data_dir.mkdir(parents=True, exist_ok=True) + config_path = data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": {"uri": s3_uri, "region": aws_region}, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back. + "--hub-enable-dehydration=false", + ] + if args.trace: + trace_path = data_dir / "hub.utrace" + hub_extra_args += [ + f"--trace={args.trace}", + f"--tracefile={trace_path}", + ] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + + try: + hub_instance_limit = max(args.module_count, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, data_dir, args.port, hub_log, + hub_instance_limit, hub_extra_args, aws_env, + ) + _wait_for_hub(hub_proc, args.port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision phase --- + print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...") + t_pv0 = time.monotonic() + accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision") + print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} " + f"(fan-out {time.monotonic() - t_pv0:.1f}s)") + for mid, status, body in pv_failures[:10]: + print(f"[provision] FAILED {mid}: status={status} body={body}") + if len(pv_failures) > 10: + print(f"[provision] ... and {len(pv_failures) - 10} more failures") + + if hub_proc.poll() is not None: + print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})") + return + + if accepted: + stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout) + provisioned_count = len(accepted) - len(stuck) + print() + print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' " + f"({time.monotonic() - t_pv0:.1f}s total)") + if stuck: + print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s") + for mid in stuck[:10]: + print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'") + else: + print("[provision] nothing provisioned") + return + + print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...") + time.sleep(args.settle_seconds) + + # --- Deprovision phase --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t_dp0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision") + print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} " + f"(fan-out {time.monotonic() - t_dp0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[deprovision] FAILED {mid}: status={status} body={body}") + if len(dp_failures) > 10: + print(f"[deprovision] ... and {len(dp_failures) - 10} more failures") + + stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout) + deprovisioned_count = len(dp_accepted) - len(stuck_dp) + print() + print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone " + f"({time.monotonic() - t_dp0:.1f}s total)") + if stuck_dp: + print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s") + for mid in stuck_dp[:10]: + print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'") + + print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...") + time.sleep(args.settle_seconds) + + print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _stop_process(hub_proc, "hub", timeout_s=120.0) + if hub_log_handle is not None: + hub_log_handle.close() + + +if __name__ == "__main__": + main() |