#!/usr/bin/env python3 """Hub sustained load test. Keeps ~N modules concurrently provisioned from a pool of 1000 predefined module names, writing and reading data to each instance, then either letting the hub watchdog deprovision idle ones or explicitly deprovisioning them. Runs indefinitely until Ctrl-C. Optional --s3: starts a local MinIO server and configures the hub to use it as the de/hydration backend. Requirements: pip install boto3 (only needed with --s3) """ from __future__ import annotations import argparse import json import os import queue import random import subprocess import sys import threading import time import urllib.error import urllib.request import webbrowser from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait from dataclasses import dataclass, field from pathlib import Path from typing import Optional _EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" _MINIO_USER = "minioadmin" _MINIO_PASS = "minioadmin" _NAMESPACE = "loadtest" _BUCKET = "bucket" # Key sizes to use for activity writes (bytes) - biased toward smaller. # Blobs are pre-generated at startup; one per size, shared across all requests. 
_KEY_SIZES = [512, 512, 2048, 2048, 8192, 32768] _BLOBS: list[bytes] = [] # populated by _init_blobs() def _init_blobs() -> None: seen: set[int] = set() for size in _KEY_SIZES: if size not in seen: seen.add(size) _BLOBS.append(os.urandom(size)) else: # reuse the already-generated blob for this size _BLOBS.append(next(b for b in _BLOBS if len(b) == size)) # --------------------------------------------------------------------------- # Executable discovery # --------------------------------------------------------------------------- def _find_zenserver(override: Optional[str]) -> Path: if override: p = Path(override) / f"zenserver{_EXE_SUFFIX}" if not p.exists(): sys.exit(f"zenserver not found at {p}") return p script_dir = Path(__file__).resolve().parent repo_root = script_dir.parent.parent candidates = [ repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}", repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}", repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}", ] for c in candidates: if c.exists(): return c matches = list(repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}")) if matches: return max(matches, key=lambda p: p.stat().st_mtime) sys.exit( "zenserver executable not found in build/. " "Run: xmake config -y -m release -a x64 && xmake -y\n" "Or pass --zenserver-dir ." ) def _find_minio(zenserver_path: Path) -> Path: p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" if not p.exists(): sys.exit( f"minio executable not found at {p}. 
" "Build with: xmake config -y -m release -a x64 && xmake -y" ) return p # --------------------------------------------------------------------------- # MinIO # --------------------------------------------------------------------------- def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: minio_data = data_dir / "minio" minio_data.mkdir(parents=True, exist_ok=True) env = os.environ.copy() env["MINIO_ROOT_USER"] = _MINIO_USER env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS popen_kwargs: dict = {} if sys.platform == "win32": popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP proc = subprocess.Popen( [str(minio_exe), "server", str(minio_data), "--address", f":{port}", "--console-address", f":{console_port}", "--quiet"], env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **popen_kwargs, ) print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}") return proc def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: deadline = time.monotonic() + timeout_s url = f"http://localhost:{port}/minio/health/live" while time.monotonic() < deadline: try: with urllib.request.urlopen(url, timeout=1): print("[minio] ready") return except Exception: time.sleep(0.1) sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s") def _create_minio_bucket(port: int, bucket: str) -> None: try: import boto3 import botocore.config import botocore.exceptions except ImportError: sys.exit( "[minio] boto3 is required for --s3.\n" "Install it with: pip install boto3" ) s3 = boto3.client( "s3", endpoint_url=f"http://localhost:{port}", aws_access_key_id=_MINIO_USER, aws_secret_access_key=_MINIO_PASS, region_name="us-east-1", config=botocore.config.Config(signature_version="s3v4"), ) try: s3.create_bucket(Bucket=bucket) print(f"[minio] created bucket '{bucket}'") except botocore.exceptions.ClientError as e: if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", 
"BucketAlreadyExists"): print(f"[minio] bucket '{bucket}' already exists") else: raise # --------------------------------------------------------------------------- # Hub lifecycle # --------------------------------------------------------------------------- def _start_hub( zenserver_exe: Path, data_dir: Path, port: int, log_file: Path, idle_timeout: int, extra_args: list[str], extra_env: Optional[dict[str, str]], ) -> tuple[subprocess.Popen, object]: data_dir.mkdir(parents=True, exist_ok=True) cmd = [ str(zenserver_exe), "hub", f"--data-dir={data_dir}", f"--port={port}", "--hub-instance-http-threads=8", "--hub-instance-corelimit=4", "--hub-provision-disk-limit-percent=99", "--hub-provision-memory-limit-percent=80", "--hub-instance-limit=500", f"--hub-watchdog-provisioned-inactivity-timeout-seconds={idle_timeout}", "--hub-watchdog-inactivity-check-margin-seconds=5", "--hub-watchdog-cycle-interval-ms=2000", "--hub-watchdog-cycle-processing-budget-ms=3000", "--hub-watchdog-activity-check-connect-timeout-ms=20", "--hub-watchdog-activity-check-request-timeout-ms=50", ] + extra_args env = os.environ.copy() if extra_env: env.update(extra_env) popen_kwargs: dict = {} if sys.platform == "win32": popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP log_handle = log_file.open("wb") try: proc = subprocess.Popen( cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs, ) except Exception: log_handle.close() raise print(f"[hub] started (pid {proc.pid}), log: {log_file}") return proc, log_handle def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: deadline = time.monotonic() + timeout_s req = urllib.request.Request(f"http://localhost:{port}/hub/status", headers={"Accept": "application/json"}) while time.monotonic() < deadline: if proc.poll() is not None: sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " f"is another zenserver already running on port {port}?") try: with 
urllib.request.urlopen(req, timeout=2): print("[hub] ready") return except Exception: time.sleep(0.2) sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None: if proc.poll() is not None: return proc.terminate() try: proc.wait(timeout=timeout_s) except subprocess.TimeoutExpired: print(f"[{name}] did not exit after {timeout_s}s, killing") proc.kill() proc.wait() # --------------------------------------------------------------------------- # Module state # --------------------------------------------------------------------------- @dataclass class ModuleState: state: str = "idle" # idle|provisioning|active|deprovisioning|watchdog-pending base_uri: Optional[str] = None written_keys: list[str] = field(default_factory=list) activity_rounds_left: int = 0 explicit_deprovision: bool = False watchdog_pending_since: float = 0.0 generation: int = 0 # incremented on each provision; queue entries carry this to discard stale entries # --------------------------------------------------------------------------- # Counters # --------------------------------------------------------------------------- @dataclass class Counters: provisions: int = 0 deprovisions_explicit: int = 0 deprovisions_watchdog: int = 0 provision_rejected: int = 0 activity_rounds: int = 0 writes: int = 0 reads: int = 0 errors: int = 0 last_reject_time: float = 0.0 _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False) def inc(self, name: str, n: int = 1) -> None: with self._lock: setattr(self, name, getattr(self, name) + n) def record_reject(self) -> None: with self._lock: self.provision_rejected += 1 self.last_reject_time = time.monotonic() def snapshot(self) -> dict: with self._lock: return {k: v for k, v in self.__dict__.items() if not k.startswith("_")} # --------------------------------------------------------------------------- # Hub API helpers (urllib, no external deps) # 
--------------------------------------------------------------------------- def _hub_post(port: int, path: str, timeout_s: float = 30.0) -> tuple[int, dict]: url = f"http://localhost:{port}{path}" req = urllib.request.Request(url, data=b"{}", method="POST", headers={"Content-Type": "application/json", "Accept": "application/json"}) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: body = json.loads(resp.read()) return resp.status, body except urllib.error.HTTPError as e: try: body = json.loads(e.read()) except Exception: body = {} return e.code, body except Exception: return 0, {} def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]: try: req = urllib.request.Request(f"http://localhost:{port}/hub/status", headers={"Accept": "application/json"}) with urllib.request.urlopen(req, timeout=timeout_s) as resp: data = json.loads(resp.read()) return data.get("modules", []) except Exception: return None def _instance_put(base_uri: str, key: str, data: bytes, timeout_s: float = 10.0) -> int: url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}" req = urllib.request.Request(url, data=data, method="PUT", headers={"Content-Type": "application/octet-stream", "Accept": "application/json"}) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: return resp.status except urllib.error.HTTPError as e: return e.code except Exception: return 0 def _instance_get(base_uri: str, key: str, timeout_s: float = 10.0) -> int: url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}" req = urllib.request.Request(url, headers={"Accept": "application/octet-stream"}) try: with urllib.request.urlopen(req, timeout=timeout_s) as resp: resp.read() return resp.status except urllib.error.HTTPError as e: return e.code except Exception: return 0 # --------------------------------------------------------------------------- # Worker tasks (run in thread pool - no sleeping) # --------------------------------------------------------------------------- def 
_task_provision( module_id: str, port: int, state_map: dict[str, ModuleState], state_lock: threading.Lock, activity_queue: queue.PriorityQueue, counters: Counters, explicit_deprovision_rate: float, stop_event: threading.Event, ) -> None: status, body = _hub_post(port, f"/hub/modules/{module_id}/provision") generation = 0 schedule_burst = False with state_lock: mod = state_map[module_id] if mod.state != "provisioning": # Shutdown changed state while provision was in-flight; leave it alone return if status in (200, 202): instance_port = body.get("port") base_uri = f"http://localhost:{instance_port}" if instance_port else None if status == 200: # Instance is immediately ready mod.generation += 1 generation = mod.generation mod.state = "active" mod.base_uri = base_uri mod.written_keys = [] mod.activity_rounds_left = random.randint(5, 20) mod.explicit_deprovision = random.random() < explicit_deprovision_rate schedule_burst = not stop_event.is_set() else: # 202: async provision - instance is still starting up. # Stay in "provisioning"; the hub status poll activates it when ready. # _shutdown_deprovision_all also covers "provisioning", so shutdown is safe. 
mod.state = "provisioning" mod.base_uri = base_uri elif status == 409: mod.state = "idle" counters.record_reject() return else: mod.state = "idle" counters.inc("errors") return counters.inc("provisions") if schedule_burst: activity_queue.put((time.monotonic() + random.uniform(0.1, 0.3), generation, module_id)) def _task_deprovision( module_id: str, port: int, state_map: dict[str, ModuleState], state_lock: threading.Lock, counters: Counters, retries: int = 5, ) -> None: succeeded = False for attempt in range(retries + 1): status, _ = _hub_post(port, f"/hub/modules/{module_id}/deprovision") if status in (200, 202, 404): succeeded = True break if status == 409 and attempt < retries: time.sleep(0.2) continue counters.inc("errors") break with state_lock: state_map[module_id].state = "idle" state_map[module_id].base_uri = None state_map[module_id].written_keys = [] if succeeded: counters.inc("deprovisions_explicit") def _task_activity_burst( module_id: str, generation: int, state_map: dict[str, ModuleState], state_lock: threading.Lock, activity_queue: queue.PriorityQueue, counters: Counters, pool: ThreadPoolExecutor, port: int, ) -> None: with state_lock: mod = state_map[module_id] if mod.state != "active" or mod.generation != generation: return base_uri = mod.base_uri existing_keys = list(mod.written_keys) if not base_uri: with state_lock: state_map[module_id].state = "idle" return # Write 3-8 new keys per burst num_writes = random.randint(3, 8) new_keys: list[str] = [] write_errors = 0 for _ in range(num_writes): key = f"{random.getrandbits(160):040x}" data = random.choice(_BLOBS) status = _instance_put(base_uri, key, data) if status in (200, 201, 204): new_keys.append(key) counters.inc("writes") else: write_errors += 1 counters.inc("errors") if write_errors > 0 and not new_keys and not existing_keys: # Instance unreachable - likely watchdog fired while we were scheduled with state_lock: mod = state_map[module_id] if mod.generation == generation: mod.state = "idle" 
mod.base_uri = None mod.written_keys = [] return # Read back 1-3 random keys from all known keys all_keys = existing_keys + new_keys num_reads = min(random.randint(2, 5), len(all_keys)) for key in random.sample(all_keys, num_reads): status = _instance_get(base_uri, key) if status == 200: counters.inc("reads") elif status in (404, 0): # Key may not exist yet or instance gone; not fatal pass else: counters.inc("errors") counters.inc("activity_rounds") next_generation = 0 with state_lock: mod = state_map[module_id] if mod.state != "active" or mod.generation != generation: return mod.written_keys = (existing_keys + new_keys)[-200:] # cap list size mod.activity_rounds_left -= 1 if mod.activity_rounds_left <= 0: if mod.explicit_deprovision: mod.state = "deprovisioning" mod.base_uri = None try: pool.submit( _task_deprovision, module_id, port, state_map, state_lock, counters, ) except RuntimeError: pass # pool shutting down; hub watchdog will clean up else: mod.state = "watchdog-pending" mod.watchdog_pending_since = time.monotonic() return next_generation = mod.generation # capture under lock before releasing # Schedule next burst soon for semi-continuous activity next_time = time.monotonic() + random.uniform(0.2, 0.8) activity_queue.put((next_time, next_generation, module_id)) # --------------------------------------------------------------------------- # Orchestrator # --------------------------------------------------------------------------- def _orchestrate( port: int, state_map: dict[str, ModuleState], state_lock: threading.Lock, activity_queue: queue.PriorityQueue, counters: Counters, pool: ThreadPoolExecutor, stop_event: threading.Event, target_active: int, explicit_deprovision_rate: float, idle_timeout: int, ) -> None: last_status_poll = 0.0 provision_backoff = False while not stop_event.is_set(): now = time.monotonic() # Drain activity queue for due bursts while True: try: next_time, generation, module_id = activity_queue.get_nowait() except queue.Empty: break if 
next_time <= now: try: pool.submit( _task_activity_burst, module_id, generation, state_map, state_lock, activity_queue, counters, pool, port, ) except RuntimeError: break # pool shutting down else: activity_queue.put((next_time, generation, module_id)) break # Activate/deactivate backoff based on recent 409 rejections last_reject = counters.last_reject_time if last_reject > 0 and now - last_reject < 5.0: provision_backoff = True elif provision_backoff and now - last_reject >= 5.0: provision_backoff = False # Count states and submit provision tasks if needed if not provision_backoff: with state_lock: idle_ids = [mid for mid, m in state_map.items() if m.state == "idle"] working_count = sum( 1 for m in state_map.values() if m.state in ("active", "provisioning", "deprovisioning") ) watchdog_count = sum( 1 for m in state_map.values() if m.state == "watchdog-pending" ) # Provision enough to keep working_count at target, but cap total # in-flight (working + watchdog-pending) at 2x target to prevent # runaway accumulation when all modules cycle to watchdog-pending. inflight_cap = target_active * 2 deficit = min( target_active - working_count, inflight_cap - working_count - watchdog_count, ) to_provision = idle_ids[:max(0, deficit)] for mid in to_provision: with state_lock: if state_map[mid].state != "idle": continue state_map[mid].state = "provisioning" try: pool.submit( _task_provision, mid, port, state_map, state_lock, activity_queue, counters, explicit_deprovision_rate, stop_event, ) except RuntimeError: with state_lock: state_map[mid].state = "idle" # Poll hub status to detect async provision completions and watchdog-fired modules if now - last_status_poll >= 5.0: last_status_poll = now modules_status = _hub_status(port) if modules_status is not None: hub_ids = {m["moduleId"]: m.get("state", "") for m in modules_status} # Hub watchdog fires at ~idle_timeout from last activity. 
However, if the # watchdog's previous visit predates the last burst, it sees a changed # activity sum and resets LastActivityTime, delaying deprovision by ~173s. # Explicitly deprovision after idle_timeout + 15s to avoid waiting for that. timeout_threshold = idle_timeout + 15 to_deprovision_explicitly: list[str] = [] with state_lock: for mid, mod in state_map.items(): if mod.state == "provisioning" and mod.base_uri is not None: # Waiting for async (202) provision to complete hub_state = hub_ids.get(mid, "") if hub_state == "provisioned": mod.generation += 1 mod.state = "active" mod.written_keys = [] mod.activity_rounds_left = random.randint(5, 20) mod.explicit_deprovision = random.random() < explicit_deprovision_rate activity_queue.put( (time.monotonic() + random.uniform(0.1, 0.3), mod.generation, mid) ) elif mid not in hub_ids: # Provision failed or was rolled back mod.state = "idle" mod.base_uri = None elif mod.state == "watchdog-pending": hub_state = hub_ids.get(mid, "") gone = mid not in hub_ids timed_out = (now - mod.watchdog_pending_since) > timeout_threshold if gone or hub_state in ("unprovisioned", "deprovisioning"): mod.state = "idle" mod.base_uri = None mod.written_keys = [] counters.inc("deprovisions_watchdog") elif timed_out: mod.state = "deprovisioning" to_deprovision_explicitly.append(mid) for mid in to_deprovision_explicitly: try: pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters) except RuntimeError: with state_lock: if state_map[mid].state == "deprovisioning": state_map[mid].state = "idle" stop_event.wait(timeout=0.05) # --------------------------------------------------------------------------- # Stats display # --------------------------------------------------------------------------- def _stats_thread( state_map: dict[str, ModuleState], state_lock: threading.Lock, counters: Counters, stop_event: threading.Event, interval_s: float = 5.0, ) -> None: is_tty = sys.stdout.isatty() prev_lines = 0 t0 = time.monotonic() 
prev_snap: Optional[dict] = None prev_t = t0 while not stop_event.is_set(): stop_event.wait(timeout=interval_s) now = time.monotonic() elapsed = now - t0 dt = now - prev_t snap = counters.snapshot() with state_lock: states: dict[str, int] = {} for m in state_map.values(): states[m.state] = states.get(m.state, 0) + 1 def rate(key: str) -> float: if prev_snap is None or dt <= 0: return 0.0 return (snap[key] - prev_snap[key]) / dt * 60.0 lines = [ f"[{time.strftime('%H:%M:%S')}] elapsed={elapsed:.0f}s", f" modules: idle={states.get('idle', 0)} " f"provisioning={states.get('provisioning', 0)} " f"active={states.get('active', 0)} " f"watchdog-pending={states.get('watchdog-pending', 0)} " f"deprovisioning={states.get('deprovisioning', 0)}", f" totals: provisions={snap['provisions']} " f"deprov-explicit={snap['deprovisions_explicit']} " f"deprov-watchdog={snap['deprovisions_watchdog']} " f"rejected={snap['provision_rejected']} " f"errors={snap['errors']}", f" data: writes={snap['writes']} reads={snap['reads']} " f"rounds={snap['activity_rounds']}", f" rates/min: provisions={rate('provisions'):.1f} " f"deprov={rate('deprovisions_explicit') + rate('deprovisions_watchdog'):.1f} " f"writes={rate('writes'):.1f} reads={rate('reads'):.1f}", ] if is_tty and prev_lines > 0: # Move cursor up to overwrite previous block sys.stdout.write(f"\033[{prev_lines}A\033[J") sys.stdout.write("\n".join(lines) + "\n") sys.stdout.flush() prev_lines = len(lines) prev_snap = snap prev_t = now # --------------------------------------------------------------------------- # Shutdown # --------------------------------------------------------------------------- def _wait_for_hub_idle(port: int, hub_proc: subprocess.Popen, timeout_s: float = 120.0) -> None: """Wait until the hub reports no transitioning instances (dehydration done).""" _STABLE = {"provisioned", "hibernated", "crashed", "unprovisioned"} print(f"[shutdown] waiting for hub dehydration (up to {timeout_s:.0f}s)...") deadline = 
time.monotonic() + timeout_s while time.monotonic() < deadline: if hub_proc.poll() is not None: print("[shutdown] hub process has exited") return modules = _hub_status(port, timeout_s=5.0) if modules is None: # Hub not responding. If it has exited, we're done. If it's still alive # it may be saturated with S3 uploads - keep waiting rather than assuming done. if hub_proc.poll() is not None: print("[shutdown] hub process has exited") return time.sleep(1.0) continue transitioning = [m for m in modules if m.get("state") not in _STABLE] remaining = len(modules) if not transitioning: if remaining: print(f"[shutdown] hub idle ({remaining} instances in stable state)") else: print("[shutdown] hub idle (no instances remaining)") return print(f"[shutdown] {len(transitioning)} instance(s) still dehydrating...", end="\r") time.sleep(1.0) print(f"\n[shutdown] WARNING: hub did not become idle within {timeout_s:.0f}s") def _shutdown_deprovision_all( port: int, state_map: dict[str, ModuleState], state_lock: threading.Lock, counters: Counters, workers: int, timeout_s: float = 60.0, ) -> None: with state_lock: to_deprovision = [ mid for mid, m in state_map.items() if m.state in ("active", "watchdog-pending", "provisioning") ] for mid in to_deprovision: state_map[mid].state = "deprovisioning" if not to_deprovision: return print(f"\n[shutdown] deprovisioning {len(to_deprovision)} active modules...") pool = ThreadPoolExecutor(max_workers=min(workers, len(to_deprovision))) futures: list[Future] = [ pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters) for mid in to_deprovision ] pool.shutdown(wait=False) done_set, not_done_set = futures_wait(futures, timeout=timeout_s) if not_done_set: print(f"[shutdown] WARNING: {len(not_done_set)} deprovision tasks did not complete within {timeout_s}s") else: print(f"[shutdown] all modules deprovisioned") # --------------------------------------------------------------------------- # Main # 
# ---------------------------------------------------------------------------


def main() -> None:
    """Parse arguments, start the (optional) MinIO backend and the hub, then
    run the load loop until Ctrl-C, finishing with an orderly shutdown."""
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest",
                        help="Hub --data-dir (default: E:/Dev/hub-loadtest)")
    parser.add_argument("--port", type=int, default=8558,
                        help="Hub HTTP port (default: 8558)")
    parser.add_argument("--workers", type=int, default=50,
                        help="Thread pool size for HTTP calls (default: 50)")
    parser.add_argument("--active-modules", type=int, default=100,
                        help="Target number of concurrently provisioned modules (default: 100)")
    parser.add_argument("--module-count", type=int, default=1000,
                        help="Total predefined module name pool size (default: 1000)")
    parser.add_argument("--idle-timeout", type=int, default=90,
                        help="Hub watchdog inactivity timeout in seconds (default: 90)")
    parser.add_argument("--explicit-deprovision-rate", type=float, default=0.3,
                        help="Fraction of modules explicitly deprovisioned (default: 0.3)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver executable (auto-detected by default)")
    parser.add_argument("--s3", action="store_true",
                        help="Use local MinIO for hub de/hydration instead of filesystem")
    parser.add_argument("--minio-port", type=int, default=9000,
                        help="MinIO S3 API port (default: 9000)")
    parser.add_argument("--minio-console-port", type=int, default=9001,
                        help="MinIO web console port (default: 9001)")
    parser.add_argument("--s3-bucket", default="zen-load-test",
                        help="S3 bucket name for MinIO hydration (default: zen-load-test)")
    parser.add_argument("--no-browser", action="store_true",
                        help="Skip opening browser tabs")
    args = parser.parse_args()

    # Can't keep more modules active than exist in the pool.
    if args.active_modules > args.module_count:
        sys.exit(
            f"--active-modules ({args.active_modules}) must not exceed "
            f"--module-count ({args.module_count})"
        )

    _init_blobs()

    data_dir = Path(args.data_dir)
    hub_log = data_dir / "hub.log"
    zenserver_exe = _find_zenserver(args.zenserver_dir)
    print(f"[setup] zenserver: {zenserver_exe}")

    # Shared mutable state for all worker/orchestrator/stats threads.
    module_names = [f"load-test-module-{i:04d}" for i in range(args.module_count)]
    state_map: dict[str, ModuleState] = {mid: ModuleState() for mid in module_names}
    state_lock = threading.Lock()
    activity_queue: queue.PriorityQueue = queue.PriorityQueue()
    counters = Counters()
    stop_event = threading.Event()

    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    hub_extra_args: list[str] = []
    hub_extra_env: Optional[dict[str, str]] = None
    try:
        if args.s3:
            # Bring up MinIO first so the hub can reach its hydration backend.
            minio_exe = _find_minio(zenserver_exe)
            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
            _wait_for_minio(args.minio_port)
            _create_minio_bucket(args.minio_port, args.s3_bucket)
            if not args.no_browser:
                webbrowser.open(f"http://localhost:{args.minio_console_port}")
            config_json = {
                "type": "s3",
                "settings": {
                    "uri": f"s3://{args.s3_bucket}",
                    "endpoint": f"http://localhost:{args.minio_port}",
                    "path-style": True,
                    "region": "us-east-1",
                },
            }
            data_dir.mkdir(parents=True, exist_ok=True)
            config_path = data_dir / "hydration_config.json"
            config_path.write_text(json.dumps(config_json), encoding="ascii")
            hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
            hub_extra_env = {
                "AWS_ACCESS_KEY_ID": _MINIO_USER,
                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
            }

        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, data_dir, args.port, hub_log,
            args.idle_timeout, hub_extra_args, hub_extra_env,
        )
        _wait_for_hub(hub_proc, args.port)
        if not args.no_browser:
            webbrowser.open(f"http://localhost:{args.port}/dashboard")

        print(f"[load-test] starting: pool={args.module_count} modules, "
              f"target={args.active_modules} active, "
              f"idle-timeout={args.idle_timeout}s, "
              f"explicit-deprovision={args.explicit_deprovision_rate:.0%}")
        print("[load-test] press Ctrl-C to stop")

        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            stats_t = threading.Thread(
                target=_stats_thread,
                args=(state_map, state_lock, counters, stop_event),
                daemon=True,
            )
            stats_t.start()
            orch_t = threading.Thread(
                target=_orchestrate,
                args=(
                    args.port, state_map, state_lock, activity_queue, counters,
                    pool, stop_event, args.active_modules,
                    args.explicit_deprovision_rate, args.idle_timeout,
                ),
                daemon=True,
            )
            orch_t.start()

            # Idle watch loop: wake twice a second to notice a dead hub;
            # Ctrl-C lands here as KeyboardInterrupt.
            try:
                while not stop_event.is_set():
                    time.sleep(0.5)
                    if hub_proc.poll() is not None:
                        print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
                        break
            except KeyboardInterrupt:
                print("\n[load-test] Ctrl-C received, shutting down...")

            # Orderly teardown: stop threads, drain modules, wait for the hub
            # to finish dehydrating, then stop the processes we own.
            stop_event.set()
            orch_t.join(timeout=3.0)
            stats_t.join(timeout=3.0)
            _shutdown_deprovision_all(
                args.port, state_map, state_lock, counters, args.workers, timeout_s=60.0,
            )
            _wait_for_hub_idle(args.port, hub_proc, timeout_s=max(120.0, args.active_modules * 1.0))
            _stop_process(hub_proc, "hub", timeout_s=120.0)
            if hub_log_handle is not None:
                hub_log_handle.close()
                hub_log_handle = None
            if minio_proc is not None:
                _stop_process(minio_proc, "minio")
                minio_proc = None
    finally:
        # Safety net: only reached if an exception occurred before normal shutdown
        if hub_proc is not None and hub_proc.poll() is None:
            _stop_process(hub_proc, "hub")
        if hub_log_handle is not None:
            hub_log_handle.close()
        if minio_proc is not None:
            _stop_process(minio_proc, "minio")


if __name__ == "__main__":
    main()