aboutsummaryrefslogtreecommitdiff
path: root/scripts/test_scripts/hub/hub_load_test_s3.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/test_scripts/hub/hub_load_test_s3.py')
-rw-r--r--scripts/test_scripts/hub/hub_load_test_s3.py537
1 files changed, 537 insertions, 0 deletions
diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py
new file mode 100644
index 000000000..23014409c
--- /dev/null
+++ b/scripts/test_scripts/hub/hub_load_test_s3.py
@@ -0,0 +1,537 @@
+#!/usr/bin/env python3
+"""Hub provision/deprovision sweep against a real S3 bucket.
+
+Lists the top-level folders in S3_URI (each folder name is a moduleId),
+picks the first --module-count of them (200 by default), then:
+ 1. fires all provision requests concurrently,
+ 2. polls until every module reaches 'provisioned',
+ 3. waits --settle-seconds (5 by default),
+ 4. fires all deprovision requests concurrently,
+ 5. polls until every module is deprovisioned,
+ 6. waits --settle-seconds again,
+ 7. shuts down the hub.
+
+Required environment variables (there are no CLI flags for these; S3_URI,
+AWS_PROFILE and AWS_REGION, when set, take precedence over the names below):
+ ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/
+ ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access
+ ZEN_PERF_AWS_REGION defaults to us-east-1
+
+Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO
+session is missing or expired, the script triggers the login automatically.
+
+Requirements:
+ pip install boto3
+ aws CLI v2
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
# Filename suffix of executables on this platform: ".exe" on Windows, empty elsewhere.
_EXE_SUFFIX = "" if sys.platform != "win32" else ".exe"

# Defaults read once, at import time, from the ZEN_PERF_* environment variables.
_DEFAULT_S3_URI = os.getenv("ZEN_PERF_S3_URI", "")
_DEFAULT_AWS_PROFILE = os.getenv("ZEN_PERF_AWS_PROFILE", "")
_DEFAULT_AWS_REGION = os.getenv("ZEN_PERF_AWS_REGION", "us-east-1")
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver debug executable.

    If *override* is given it is treated as the directory that must contain
    the executable; the process exits when it does not.

    Otherwise the executable is searched for under ``build/`` of the
    repository: first in the well-known per-platform debug directories, then
    with a recursive glob (the most recently modified match wins).

    Exits the process with a hint when nothing is found.
    """
    exe_name = f"zenserver{_EXE_SUFFIX}"

    if override:
        explicit = Path(override) / exe_name
        if not explicit.exists():
            sys.exit(f"zenserver not found at {explicit}")
        return explicit

    script_dir = Path(__file__).resolve().parent
    # This script is checked in at scripts/test_scripts/hub/, so the repository
    # root is three levels above script_dir. Going up only two levels (as was
    # done before) lands in scripts/, which has no build/ directory, so
    # auto-detection could never succeed. The two-levels-up directory is still
    # probed first so any layout that used to work keeps resolving the same way.
    search_roots = list(script_dir.parents)[1:3]

    known_layouts = (
        ("windows", "x64"),
        ("linux", "x86_64"),
        ("macosx", "x86_64"),
    )
    for root in search_roots:
        for platform_dir, arch_dir in known_layouts:
            candidate = root / "build" / platform_dir / arch_dir / "debug" / exe_name
            if candidate.exists():
                return candidate

        # Fallback for any other platform/arch directory naming.
        matches = list(root.glob(f"build/**/debug/{exe_name}"))
        if matches:
            return max(matches, key=lambda m: m.stat().st_mtime)

    sys.exit(
        "zenserver debug executable not found in build/. "
        "Run: xmake config -y -m debug -a x64 && xmake -y\n"
        "Or pass --zenserver-dir <dir>."
    )
+
+
+# ---------------------------------------------------------------------------
+# AWS SSO + S3 listing
+# ---------------------------------------------------------------------------
+
def _require_boto3():
    """Import boto3 on demand and return ``(boto3, botocore.exceptions)``.

    The import is done lazily so that the script can still print a helpful
    message, and exit, when boto3 is not installed.
    """
    install_hint = "[aws] boto3 is required.\nInstall it with: pip install boto3"
    try:
        import boto3  # type: ignore[import-not-found]
        import botocore.exceptions  # type: ignore[import-not-found]
    except ImportError:
        sys.exit(install_hint)
    else:
        return boto3, botocore.exceptions
+
+
def _sso_login(profile: str) -> None:
    """Run ``aws sso login --profile <profile>`` and wait for it to finish.

    Exits the process when the AWS CLI cannot be started (for example because
    it is not installed or not on PATH) or when it returns a non-zero status.
    """
    print(f"[aws] running 'aws sso login --profile {profile}'...")
    try:
        rc = subprocess.call(["aws", "sso", "login", "--profile", profile])
    except OSError as e:
        # Without this, a missing 'aws' binary surfaces as a raw
        # FileNotFoundError traceback instead of an actionable message.
        sys.exit(f"[aws] could not run the 'aws' CLI ({e}); "
                 "is AWS CLI v2 installed and on PATH?")
    if rc != 0:
        sys.exit(f"[aws] 'aws sso login' failed with rc={rc}")
+
def _get_session(profile: str):
    """Return ``(session, frozen_credentials)`` for the given AWS profile.

    Credentials are first resolved from whatever is already cached for the
    profile. If that fails or yields incomplete credentials, ``aws sso login``
    is run once and resolution is retried. The process exits when the profile
    does not exist or when usable credentials still cannot be obtained.
    """
    boto3, exc_mod = _require_boto3()

    def _load_frozen():
        session = boto3.Session(profile_name=profile)
        creds = session.get_credentials()
        if creds is None:
            return session, None
        return session, creds.get_frozen_credentials()

    def _usable(frozen) -> bool:
        # Both halves of the key pair are exported to the hub later, so both
        # must be present - before and after the login attempt alike.
        return frozen is not None and bool(frozen.access_key) and bool(frozen.secret_key)

    try:
        session, frozen = _load_frozen()
        if _usable(frozen):
            return session, frozen
    except exc_mod.ProfileNotFound:
        sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config")
    except Exception as e:
        # Typically an expired or missing SSO session; fall through to login.
        print(f"[aws] initial credential load failed: {e}")

    _sso_login(profile)

    try:
        session, frozen = _load_frozen()
    except Exception as e:
        # Previously this second load was unguarded and any failure escaped
        # as a raw traceback.
        sys.exit(f"[aws] credential load failed after sso login: {e}")
    if not _usable(frozen):
        sys.exit("[aws] could not resolve credentials after sso login")
    return session, frozen
+
+
def _parse_s3_uri(uri: str) -> tuple[str, str]:
    """Split ``s3://bucket/optional/prefix`` into ``(bucket, prefix)``.

    The prefix is everything after the first '/' following the bucket name
    and is returned unchanged (it may be empty). Exits the process when the
    URI does not start with ``s3://`` or names no bucket.
    """
    scheme = "s3://"
    if not uri.startswith(scheme):
        sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}")

    bucket, _, prefix = uri[len(scheme):].partition("/")
    if not bucket:
        # "s3://" and "s3:///x" used to slip through with an empty bucket and
        # only failed later inside the S3 client.
        sys.exit(f"[aws] invalid S3 URI (missing bucket name): {uri}")
    return bucket, prefix
+
+
def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]:
    """Return the names of the folders directly under *prefix* in *bucket*.

    Folders are discovered through the ``CommonPrefixes`` entries of a
    delimiter-based ``list_objects_v2`` listing. Each returned name has the
    outer prefix and the trailing slash removed; empty names are skipped.
    """
    if prefix and not prefix.endswith("/"):
        listing_prefix = prefix + "/"
    else:
        listing_prefix = prefix
    skip = len(listing_prefix)

    client = session.client("s3", region_name=region)
    pages = client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=listing_prefix, Delimiter="/"
    )
    folder_names = (
        entry.get("Prefix", "")[skip:].rstrip("/")
        for page in pages
        for entry in page.get("CommonPrefixes", []) or []
    )
    return [name for name in folder_names if name]
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    instance_limit: int,
    extra_args: list[str],
    extra_env: dict[str, str],
) -> tuple[subprocess.Popen, object]:
    """Launch ``zenserver hub`` as a child process.

    The data directory is created if needed. The child's stdout and stderr
    are both redirected into *log_file*, and its environment is the current
    environment overlaid with *extra_env*.

    Returns ``(process, log_handle)``; the caller owns the open log handle
    and is responsible for closing it.
    """
    data_dir.mkdir(parents=True, exist_ok=True)

    hub_flags = [
        "--enable-execution-history=false",
        f"--data-dir={data_dir}",
        f"--port={port}",
        "--hub-instance-http-threads=8",
        "--hub-instance-corelimit=4",
        "--hub-provision-disk-limit-percent=99",
        "--hub-provision-memory-limit-percent=80",
        f"--hub-instance-limit={instance_limit}",
    ]
    command = [str(zenserver_exe), "hub", *hub_flags, *extra_args]

    child_env = {**os.environ, **extra_env}

    spawn_options: dict = {}
    if sys.platform == "win32":
        spawn_options["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP

    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(
            command,
            env=child_env,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            **spawn_options,
        )
    except Exception:
        # Do not leak the log file handle when the process cannot be spawned.
        log_handle.close()
        raise

    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle
+
+
def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Block until GET /hub/status succeeds on localhost:*port*.

    Exits the process if the hub process terminates while waiting or if no
    successful response arrives within *timeout_s* seconds.
    """
    deadline = time.monotonic() + timeout_s
    status_request = urllib.request.Request(
        url=f"http://localhost:{port}/hub/status",
        headers={"Accept": "application/json"},
    )

    while True:
        if time.monotonic() >= deadline:
            sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")

        if proc.poll() is not None:
            sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
                     f"is another zenserver already running on port {port}?")

        try:
            with urllib.request.urlopen(status_request, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            # Not reachable yet (or an error response); retry shortly.
            time.sleep(0.2)
+
+
def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None:
    """Terminate *proc*, escalating to kill if it does not exit in time.

    Does nothing when the process has already exited. *name* is only used as
    the tag of the log line printed when escalation is needed.
    """
    # NOTE(review): _start_hub puts the hub in its own process group on
    # Windows, yet only terminate()/kill() are used here - confirm that no
    # CTRL_BREAK-based graceful shutdown was intended.
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=timeout_s)
        except subprocess.TimeoutExpired:
            print(f"[{name}] did not exit after {timeout_s}s, killing")
            proc.kill()
            proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub API
+# ---------------------------------------------------------------------------
+
def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
    """POST an empty JSON object to the hub and return ``(status, body)``.

    HTTP error responses are returned with their status code rather than
    raised. Any other failure is reported as status 0 with the exception text
    under the "error" key. A response body that cannot be read or parsed as
    JSON is returned as an empty dict.
    """

    def _json_or_empty(stream) -> dict:
        try:
            return json.loads(stream.read())
        except Exception:
            return {}

    request = urllib.request.Request(
        f"http://localhost:{port}{path}",
        data=b"{}",
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as resp:
            payload = _json_or_empty(resp)
            return resp.status, payload
    except urllib.error.HTTPError as http_err:
        payload = _json_or_empty(http_err)
        return http_err.code, payload
    except Exception as exc:
        return 0, {"error": str(exc)}
+
+
def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
    """Fetch /hub/status and return a ``moduleId -> state`` mapping.

    Returns None when the request fails or the response is not valid JSON.
    Entries without a moduleId are skipped; a missing state is reported as
    the empty string.
    """
    request = urllib.request.Request(
        f"http://localhost:{port}/hub/status",
        headers={"Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as resp:
            payload = json.loads(resp.read())
    except Exception:
        return None

    return {
        entry.get("moduleId"): entry.get("state", "")
        for entry in payload.get("modules", []) or []
        if entry.get("moduleId")
    }
+
+
def _fan_out_post(
    pool: ThreadPoolExecutor,
    port: int,
    module_ids: list[str],
    verb: str,
) -> tuple[list[str], list[tuple[str, int, dict]]]:
    """Submit POST /hub/modules/<id>/<verb> for every module to *pool*.

    All requests are submitted before any result is collected. Returns
    (accepted_ids, failures): accepted_ids are the modules answered with
    200 or 202 (and, for the "deprovision" verb, 404 as well, since the
    module is then already gone); failures holds (mid, status, body) for
    every other outcome.
    """
    in_flight = {}
    for mid in module_ids:
        in_flight[mid] = pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")

    ok_statuses = {200, 202}
    if verb == "deprovision":
        # Already gone - treat as success for deprovision fan-out.
        ok_statuses.add(404)

    accepted: list[str] = []
    failures: list[tuple[str, int, dict]] = []
    for mid, future in in_flight.items():
        status, body = future.result()
        if status in ok_statuses:
            accepted.append(mid)
        else:
            failures.append((mid, status, body))
    return accepted, failures
+
+
def _wait_for_provisioned(
    port: int,
    module_ids: list[str],
    timeout_s: float,
) -> tuple[list[str], dict[str, str]]:
    """Poll hub status until every module in module_ids is 'provisioned'.

    Any other state (including a module that is absent from the report) keeps
    the module in the waiting set; only the timeout ends the wait for it.

    Returns (stuck_ids, last_states). stuck_ids are the ones that did not reach
    'provisioned' within timeout_s; last_states maps all module_ids to their
    last-seen state (or empty string if the module was absent from the report).
    """
    poll_interval_s = 2.0
    deadline = time.monotonic() + timeout_s
    remaining = set(module_ids)
    last_states: dict[str, str] = {mid: "" for mid in module_ids}

    while remaining and time.monotonic() < deadline:
        states = _hub_module_states(port)
        if states is not None:
            for mid in list(remaining):
                state = states.get(mid, "")
                last_states[mid] = state
                if state == "provisioned":
                    remaining.discard(mid)

        done = len(module_ids) - len(remaining)
        print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r")

        # Return as soon as the last module is done. Sleeping first would add
        # a full poll interval to the elapsed time the caller reports.
        if not remaining:
            break
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(poll_interval_s, deadline - time.monotonic())))

    return list(remaining), last_states
+
+
def _wait_for_deprovisioned(
    port: int,
    module_ids: list[str],
    timeout_s: float,
) -> tuple[list[str], dict[str, str]]:
    """Poll hub status until every module in module_ids is deprovisioned.

    A module counts as deprovisioned once it is absent from the status report
    or reported as 'unprovisioned'.

    Returns (stuck_ids, last_states). stuck_ids are the modules still present
    after timeout_s; last_states maps all module_ids to their last-seen state
    (empty string when the module was absent from the report).
    """
    poll_interval_s = 2.0
    deadline = time.monotonic() + timeout_s
    remaining = set(module_ids)
    last_states: dict[str, str] = {mid: "" for mid in module_ids}

    while remaining and time.monotonic() < deadline:
        states = _hub_module_states(port)
        if states is not None:
            for mid in list(remaining):
                state = states.get(mid, "")
                last_states[mid] = state
                if mid not in states or state == "unprovisioned":
                    remaining.discard(mid)

        done = len(module_ids) - len(remaining)
        print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r")

        # Return as soon as the last module is gone. Sleeping first would add
        # a full poll interval to the elapsed time the caller reports.
        if not remaining:
            break
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(poll_interval_s, deadline - time.monotonic())))

    return list(remaining), last_states
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
def _positive_int(text: str) -> int:
    """argparse ``type=`` callable: parse *text* as an integer that is >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def _run_sweep(args: argparse.Namespace, hub_proc: subprocess.Popen, selected: list[str]) -> bool:
    """Provision then deprovision *selected* against the running hub.

    Returns True only when every request was accepted and every module
    reached the expected state in both phases; False when any request failed,
    any module got stuck, nothing could be provisioned, or the hub process
    exited during the provision fan-out.
    """
    all_ok = True
    t_start = time.monotonic()

    with ThreadPoolExecutor(max_workers=args.workers) as pool:
        # --- Provision phase ---
        print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...")
        t_pv0 = time.monotonic()
        accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision")
        print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} "
              f"(fan-out {time.monotonic() - t_pv0:.1f}s)")
        for mid, status, body in pv_failures[:10]:
            print(f"[provision] FAILED {mid}: status={status} body={body}")
        if len(pv_failures) > 10:
            print(f"[provision] ... and {len(pv_failures) - 10} more failures")
        if pv_failures:
            all_ok = False

        if hub_proc.poll() is not None:
            print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
            return False

        if not accepted:
            print("[provision] nothing provisioned")
            return False

        stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout)
        provisioned_count = len(accepted) - len(stuck)
        print()
        print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' "
              f"({time.monotonic() - t_pv0:.1f}s total)")
        if stuck:
            all_ok = False
            print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s")
            for mid in stuck[:10]:
                print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'")

        print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...")
        time.sleep(args.settle_seconds)

        # --- Deprovision phase ---
        # Every accepted module is deprovisioned, including ones that got
        # stuck on the way to 'provisioned'.
        print(f"[deprovision] firing {len(accepted)} deprovision requests...")
        t_dp0 = time.monotonic()
        dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision")
        print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} "
              f"(fan-out {time.monotonic() - t_dp0:.1f}s)")
        for mid, status, body in dp_failures[:10]:
            print(f"[deprovision] FAILED {mid}: status={status} body={body}")
        if len(dp_failures) > 10:
            print(f"[deprovision] ... and {len(dp_failures) - 10} more failures")
        if dp_failures:
            all_ok = False

        stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout)
        deprovisioned_count = len(dp_accepted) - len(stuck_dp)
        print()
        print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone "
              f"({time.monotonic() - t_dp0:.1f}s total)")
        if stuck_dp:
            all_ok = False
            print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s")
            for mid in stuck_dp[:10]:
                print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'")

        print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...")
        time.sleep(args.settle_seconds)

    print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s")
    return all_ok


def main() -> None:
    """Entry point: resolve AWS credentials, list modules, run the sweep.

    Starts a hub configured to hydrate from the S3 URI, provisions and then
    deprovisions the selected modules, and always stops the hub afterwards.
    Exits with a non-zero status when the sweep did not fully succeed, so the
    outcome is visible to calling scripts and not only in the log output.
    """
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    # NOTE(review): the default data dir is a Windows drive path; on other
    # platforms it resolves relative to the current directory, so pass
    # --data-dir explicitly there.
    parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3",
                        help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)")
    parser.add_argument("--port", type=int, default=8558,
                        help="Hub HTTP port (default: 8558)")
    # Both counts must be >= 1: a negative --module-count would silently slice
    # from the end of the folder list, and zero workers would only fail after
    # the hub has already been started.
    parser.add_argument("--module-count", type=_positive_int, default=200,
                        help="Number of modules to sweep (default: 200)")
    parser.add_argument("--settle-seconds", type=float, default=5.0,
                        help="Seconds to wait between provision-complete and deprovision, "
                             "and between deprovision-complete and shutdown (default: 5.0)")
    parser.add_argument("--workers", type=_positive_int, default=50,
                        help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)")
    parser.add_argument("--poll-timeout", type=float, default=600.0,
                        help="Max seconds to wait for provision or deprovision to finish (default: 600)")
    parser.add_argument("--trace",
                        nargs="?", const="default", default=None, metavar="CHANNELS",
                        help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. "
                             "Optionally pass channel spec (default: 'default')")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver executable (auto-detected by default)")
    args = parser.parse_args()

    # The unprefixed variables, when set, win over the ZEN_PERF_* defaults.
    s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI)
    aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE)
    aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION)
    if not s3_uri:
        sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/")
    if not aws_profile:
        sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name")

    data_dir = Path(args.data_dir)
    hub_log = data_dir / "hub.log"

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    print(f"[setup] zenserver: {zenserver_exe}")
    print(f"[setup] S3 URI: {s3_uri}")
    print(f"[setup] profile: {aws_profile}")
    print(f"[setup] region: {aws_region}")

    session, frozen = _get_session(aws_profile)
    print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})")

    bucket, prefix = _parse_s3_uri(s3_uri)
    print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...")
    module_ids = _list_module_ids(session, bucket, prefix, aws_region)
    print(f"[s3] found {len(module_ids)} module folders")

    if not module_ids:
        sys.exit("[s3] no module folders found, aborting")

    selected = module_ids[:args.module_count]
    if len(selected) < args.module_count:
        print(f"[s3] only {len(selected)} folders available (requested {args.module_count})")

    # The resolved credentials are handed to the hub through its environment.
    aws_env = {
        "AWS_ACCESS_KEY_ID": frozen.access_key,
        "AWS_SECRET_ACCESS_KEY": frozen.secret_key,
    }
    if frozen.token:
        aws_env["AWS_SESSION_TOKEN"] = frozen.token

    data_dir.mkdir(parents=True, exist_ok=True)
    config_path = data_dir / "hydration_config.json"
    config_path.write_text(
        json.dumps({
            "type": "s3",
            "settings": {"uri": s3_uri, "region": aws_region},
        }),
        encoding="ascii",
    )
    hub_extra_args = [
        f"--hub-hydration-target-config={config_path}",
        # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back.
        "--hub-enable-dehydration=false",
    ]
    if args.trace:
        trace_path = data_dir / "hub.utrace"
        hub_extra_args += [
            f"--trace={args.trace}",
            f"--tracefile={trace_path}",
        ]
        print(f"[trace] enabled channels='{args.trace}', file={trace_path}")

    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    succeeded = False

    try:
        hub_instance_limit = max(args.module_count, 500)
        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, data_dir, args.port, hub_log,
            hub_instance_limit, hub_extra_args, aws_env,
        )
        _wait_for_hub(hub_proc, args.port)
        succeeded = _run_sweep(args, hub_proc, selected)
    finally:
        # Always stop the hub and release the log file, even on errors.
        if hub_proc is not None and hub_proc.poll() is None:
            _stop_process(hub_proc, "hub", timeout_s=120.0)
        if hub_log_handle is not None:
            hub_log_handle.close()

    if not succeeded:
        sys.exit("[summary] sweep finished with failures (see output above)")


if __name__ == "__main__":
    main()