#!/usr/bin/env python3
"""Hub sustained load test.
Keeps ~N modules concurrently provisioned from a pool of 1000 predefined
module names, writing and reading data to each instance, then either letting
the hub watchdog deprovision idle ones or explicitly deprovisioning them.
Runs indefinitely until Ctrl-C.
Optional --s3: starts a local MinIO server and configures the hub to use it
as the de/hydration backend.
Requirements:
pip install boto3 (only needed with --s3)
"""
from __future__ import annotations
import argparse
import json
import os
import queue
import random
import subprocess
import sys
import threading
import time
import urllib.error
import urllib.request
import webbrowser
from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
# Platform-specific executable suffix (Windows binaries end in ".exe").
_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
# Well-known default MinIO root credentials - local test server only, not secrets.
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"
# Namespace/bucket path segments used when building instance PUT/GET URLs.
_NAMESPACE = "loadtest"
_BUCKET = "bucket"
# Key sizes to use for activity writes (bytes) - biased toward smaller.
# Blobs are pre-generated at startup; one per size, shared across all requests.
_KEY_SIZES = [512, 512, 2048, 2048, 8192, 32768]
_BLOBS: list[bytes] = [] # populated by _init_blobs()
def _init_blobs() -> None:
seen: set[int] = set()
for size in _KEY_SIZES:
if size not in seen:
seen.add(size)
_BLOBS.append(os.urandom(size))
else:
# reuse the already-generated blob for this size
_BLOBS.append(next(b for b in _BLOBS if len(b) == size))
# ---------------------------------------------------------------------------
# Executable discovery
# ---------------------------------------------------------------------------
def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable.

    Args:
        override: Directory expected to contain the executable. When given,
            the executable must exist there or the process exits.

    Returns:
        Path to the executable. Without an override, checks the conventional
        per-platform release build directories first, then falls back to the
        most recently built match anywhere under build/. Exits the process
        with an actionable message if nothing is found.
    """
    if override:
        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
        if not p.exists():
            sys.exit(f"zenserver not found at {p}")
        return p
    script_dir = Path(__file__).resolve().parent
    repo_root = script_dir.parent.parent
    candidates = [
        repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}",
        repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
        repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
    ]
    for c in candidates:
        if c.exists():
            return c
    matches = list(repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}"))
    if matches:
        # Several builds may coexist; prefer the most recently modified one.
        return max(matches, key=lambda p: p.stat().st_mtime)
    # BUG FIX: the original string literal here was unterminated (split across
    # two lines without triple quotes), which is a syntax error.
    sys.exit(
        "zenserver executable not found in build/. "
        "Run: xmake config -y -m release -a x64 && xmake -y\n"
        "Or pass --zenserver-dir."
    )
def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable expected next to zenserver; exit if absent."""
    candidate = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if candidate.exists():
        return candidate
    sys.exit(
        f"minio executable not found at {candidate}. "
        "Build with: xmake config -y -m release -a x64 && xmake -y"
    )
# ---------------------------------------------------------------------------
# MinIO
# ---------------------------------------------------------------------------
def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch a local MinIO server with well-known test credentials.

    Server output is discarded; the started process is returned so the
    caller can manage its lifetime.
    """
    minio_data = data_dir / "minio"
    minio_data.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["MINIO_ROOT_USER"] = _MINIO_USER
    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        # New process group so a console Ctrl-C does not kill MinIO directly;
        # we stop it explicitly during shutdown instead.
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    proc = subprocess.Popen(
        [str(minio_exe), "server", str(minio_data),
         "--address", f":{port}",
         "--console-address", f":{console_port}",
         "--quiet"],
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **popen_kwargs,
    )
    print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
    return proc
def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
    """Poll the MinIO liveness endpoint until it responds; exit on timeout."""
    deadline = time.monotonic() + timeout_s
    url = f"http://localhost:{port}/minio/health/live"
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=1):
                print("[minio] ready")
                return
        except Exception:
            # Not up yet (connection refused / still starting) - retry shortly.
            time.sleep(0.1)
    sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
def _create_minio_bucket(port: int, bucket: str) -> None:
    """Create *bucket* on the local MinIO server (idempotent).

    Imports boto3 lazily so the dependency is only required with --s3;
    exits with an install hint when it is missing.
    """
    try:
        import boto3
        import botocore.config
        import botocore.exceptions
    except ImportError:
        sys.exit(
            "[minio] boto3 is required for --s3.\n"
            "Install it with: pip install boto3"
        )
    s3 = boto3.client(
        "s3",
        endpoint_url=f"http://localhost:{port}",
        aws_access_key_id=_MINIO_USER,
        aws_secret_access_key=_MINIO_PASS,
        region_name="us-east-1",
        config=botocore.config.Config(signature_version="s3v4"),
    )
    try:
        s3.create_bucket(Bucket=bucket)
        print(f"[minio] created bucket '{bucket}'")
    except botocore.exceptions.ClientError as e:
        # Either code means the bucket already exists - treat as success.
        if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            print(f"[minio] bucket '{bucket}' already exists")
        else:
            raise
# ---------------------------------------------------------------------------
# Hub lifecycle
# ---------------------------------------------------------------------------
def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    idle_timeout: int,
    extra_args: list[str],
    extra_env: Optional[dict[str, str]],
) -> tuple[subprocess.Popen, object]:
    """Start the zenserver hub process with load-test-tuned watchdog settings.

    stdout and stderr are redirected into *log_file*. Returns the process
    and the open log file handle; the caller is responsible for closing
    the handle after the process exits.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        str(zenserver_exe),
        "hub",
        f"--data-dir={data_dir}",
        f"--port={port}",
        "--hub-instance-http-threads=8",
        "--hub-instance-corelimit=4",
        "--hub-provision-disk-limit-percent=99",
        "--hub-provision-memory-limit-percent=80",
        "--hub-instance-limit=500",
        f"--hub-watchdog-provisioned-inactivity-timeout-seconds={idle_timeout}",
        "--hub-watchdog-inactivity-check-margin-seconds=5",
        "--hub-watchdog-cycle-interval-ms=2000",
        "--hub-watchdog-cycle-processing-budget-ms=3000",
        "--hub-watchdog-activity-check-connect-timeout-ms=20",
        "--hub-watchdog-activity-check-request-timeout-ms=50",
    ] + extra_args
    env = os.environ.copy()
    if extra_env:
        env.update(extra_env)
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        # Separate process group: Ctrl-C is handled here, not by the hub.
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(
            cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
            **popen_kwargs,
        )
    except Exception:
        # Don't leak the log handle if spawning fails.
        log_handle.close()
        raise
    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle
def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Poll /hub/status until the hub answers; exit on process death or timeout."""
    deadline = time.monotonic() + timeout_s
    req = urllib.request.Request(f"http://localhost:{port}/hub/status",
                                 headers={"Accept": "application/json"})
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            # Most common cause: the port is already taken by another hub.
            sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
                     f"is another zenserver already running on port {port}?")
        try:
            with urllib.request.urlopen(req, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            time.sleep(0.2)
    sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
if proc.poll() is not None:
return
proc.terminate()
try:
proc.wait(timeout=timeout_s)
except subprocess.TimeoutExpired:
print(f"[{name}] did not exit after {timeout_s}s, killing")
proc.kill()
proc.wait()
# ---------------------------------------------------------------------------
# Module state
# ---------------------------------------------------------------------------
@dataclass
class ModuleState:
    """Client-side tracking record for one module in the predefined pool."""
    state: str = "idle"  # idle|provisioning|active|deprovisioning|watchdog-pending
    base_uri: Optional[str] = None  # instance base URL while provisioned
    written_keys: list[str] = field(default_factory=list)  # keys written so far (capped at 200)
    activity_rounds_left: int = 0  # bursts remaining before this module is released
    explicit_deprovision: bool = False  # release via explicit call instead of the watchdog
    watchdog_pending_since: float = 0.0  # monotonic time we started waiting on the watchdog
    generation: int = 0  # incremented on each provision; queue entries carry this to discard stale entries
# ---------------------------------------------------------------------------
# Counters
# ---------------------------------------------------------------------------
@dataclass
class Counters:
    """Thread-safe event counters shared by every worker task and thread."""
    provisions: int = 0
    deprovisions_explicit: int = 0
    deprovisions_watchdog: int = 0
    provision_rejected: int = 0
    activity_rounds: int = 0
    writes: int = 0
    reads: int = 0
    errors: int = 0
    last_reject_time: float = 0.0
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def inc(self, name: str, n: int = 1) -> None:
        """Atomically add *n* to the counter called *name*."""
        with self._lock:
            current = getattr(self, name)
            setattr(self, name, current + n)

    def record_reject(self) -> None:
        """Count a provision rejection and remember when it happened."""
        with self._lock:
            self.provision_rejected = self.provision_rejected + 1
            self.last_reject_time = time.monotonic()

    def snapshot(self) -> dict:
        """Return a point-in-time copy of all public counter values."""
        with self._lock:
            public: dict = {}
            for key, value in self.__dict__.items():
                if not key.startswith("_"):
                    public[key] = value
            return public
# ---------------------------------------------------------------------------
# Hub API helpers (urllib, no external deps)
# ---------------------------------------------------------------------------
def _hub_post(port: int, path: str, timeout_s: float = 30.0) -> tuple[int, dict]:
    """POST an empty JSON object to the hub endpoint *path*.

    Returns (status, body). HTTP-level errors are reported as their status
    code plus the parsed error body (or {} if unparseable); transport-level
    failures (connection refused, timeout) are reported as (0, {}).
    """
    url = f"http://localhost:{port}{path}"
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json",
                                          "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            body = json.loads(resp.read())
            return resp.status, body
    except urllib.error.HTTPError as e:
        # HTTPError must be caught before the generic Exception below.
        try:
            body = json.loads(e.read())
        except Exception:
            body = {}
        return e.code, body
    except Exception:
        return 0, {}
def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
    """Fetch the hub's module list from /hub/status.

    Returns the "modules" array, or None when the hub is unreachable or
    returns an unparseable response.
    """
    try:
        req = urllib.request.Request(f"http://localhost:{port}/hub/status",
                                     headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            data = json.loads(resp.read())
            return data.get("modules", [])
    except Exception:
        return None
def _instance_put(base_uri: str, key: str, data: bytes, timeout_s: float = 10.0) -> int:
    """PUT *data* under *key* on an instance; return the HTTP status code.

    Transport failures (instance gone, timeout) are reported as status 0.
    """
    url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}"
    req = urllib.request.Request(url, data=data, method="PUT",
                                 headers={"Content-Type": "application/octet-stream",
                                          "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            return resp.status
    except urllib.error.HTTPError as e:
        return e.code
    except Exception:
        return 0
def _instance_get(base_uri: str, key: str, timeout_s: float = 10.0) -> int:
    """GET *key* from an instance; return the HTTP status code.

    The response body is read (and discarded) so the transfer completes.
    Transport failures are reported as status 0.
    """
    url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}"
    req = urllib.request.Request(url, headers={"Accept": "application/octet-stream"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            resp.read()
            return resp.status
    except urllib.error.HTTPError as e:
        return e.code
    except Exception:
        return 0
# ---------------------------------------------------------------------------
# Worker tasks (run in thread pool - no sleeping)
# ---------------------------------------------------------------------------
def _task_provision(
    module_id: str,
    port: int,
    state_map: dict[str, ModuleState],
    state_lock: threading.Lock,
    activity_queue: queue.PriorityQueue,
    counters: Counters,
    explicit_deprovision_rate: float,
    stop_event: threading.Event,
) -> None:
    """Provision *module_id* via the hub and transition its local state.

    200 means the instance is immediately ready: mark it active and schedule
    the first activity burst. 202 means async provisioning: stay in
    "provisioning" until the orchestrator's status poll promotes it.
    409 counts as a capacity rejection; any other status is an error - both
    return the module to "idle".
    """
    status, body = _hub_post(port, f"/hub/modules/{module_id}/provision")
    generation = 0
    schedule_burst = False
    with state_lock:
        mod = state_map[module_id]
        if mod.state != "provisioning":
            # Shutdown changed state while provision was in-flight; leave it alone
            return
        if status in (200, 202):
            instance_port = body.get("port")
            base_uri = f"http://localhost:{instance_port}" if instance_port else None
            if status == 200:
                # Instance is immediately ready
                mod.generation += 1
                generation = mod.generation
                mod.state = "active"
                mod.base_uri = base_uri
                mod.written_keys = []
                mod.activity_rounds_left = random.randint(5, 20)
                mod.explicit_deprovision = random.random() < explicit_deprovision_rate
                schedule_burst = not stop_event.is_set()
            else:
                # 202: async provision - instance is still starting up.
                # Stay in "provisioning"; the hub status poll activates it when ready.
                # _shutdown_deprovision_all also covers "provisioning", so shutdown is safe.
                mod.state = "provisioning"
                mod.base_uri = base_uri
        elif status == 409:
            mod.state = "idle"
            counters.record_reject()
            return
        else:
            mod.state = "idle"
            counters.inc("errors")
            return
    counters.inc("provisions")
    if schedule_burst:
        # First burst fires shortly after activation; carries the generation
        # so stale queue entries can be discarded after re-provisioning.
        activity_queue.put((time.monotonic() + random.uniform(0.1, 0.3), generation, module_id))
def _task_deprovision(
    module_id: str,
    port: int,
    state_map: dict[str, ModuleState],
    state_lock: threading.Lock,
    counters: Counters,
    retries: int = 5,
) -> None:
    """Ask the hub to deprovision *module_id*, retrying on 409 (busy).

    200/202/404 all count as success (404: the hub already removed it).
    Local state is reset to "idle" regardless of outcome so the module can
    be reused by the orchestrator.
    """
    succeeded = False
    for attempt in range(retries + 1):
        status, _ = _hub_post(port, f"/hub/modules/{module_id}/deprovision")
        if status in (200, 202, 404):
            succeeded = True
            break
        if status == 409 and attempt < retries:
            # Hub is busy with this instance; back off briefly and retry.
            time.sleep(0.2)
            continue
        counters.inc("errors")
        break
    with state_lock:
        state_map[module_id].state = "idle"
        state_map[module_id].base_uri = None
        state_map[module_id].written_keys = []
    if succeeded:
        counters.inc("deprovisions_explicit")
def _task_activity_burst(
    module_id: str,
    generation: int,
    state_map: dict[str, ModuleState],
    state_lock: threading.Lock,
    activity_queue: queue.PriorityQueue,
    counters: Counters,
    pool: ThreadPoolExecutor,
    port: int,
) -> None:
    """Run one write/read burst against *module_id*'s instance.

    Silently skips stale work (module no longer active, or re-provisioned
    since this burst was queued - generation mismatch). After the burst the
    module either gets another burst scheduled, is handed to an explicit
    deprovision task, or is parked as "watchdog-pending" for the hub
    watchdog to reclaim.
    """
    with state_lock:
        mod = state_map[module_id]
        if mod.state != "active" or mod.generation != generation:
            return
        base_uri = mod.base_uri
        existing_keys = list(mod.written_keys)
    if not base_uri:
        with state_lock:
            state_map[module_id].state = "idle"
        return
    # Write 3-8 new keys per burst
    num_writes = random.randint(3, 8)
    new_keys: list[str] = []
    write_errors = 0
    for _ in range(num_writes):
        # 160 random bits rendered as a 40-char hex key.
        key = f"{random.getrandbits(160):040x}"
        data = random.choice(_BLOBS)
        status = _instance_put(base_uri, key, data)
        if status in (200, 201, 204):
            new_keys.append(key)
            counters.inc("writes")
        else:
            write_errors += 1
            counters.inc("errors")
    if write_errors > 0 and not new_keys and not existing_keys:
        # Instance unreachable - likely watchdog fired while we were scheduled
        with state_lock:
            mod = state_map[module_id]
            if mod.generation == generation:
                mod.state = "idle"
                mod.base_uri = None
                mod.written_keys = []
        return
    # Read back 2-5 random keys from all known keys (capped by availability)
    all_keys = existing_keys + new_keys
    num_reads = min(random.randint(2, 5), len(all_keys))
    for key in random.sample(all_keys, num_reads):
        status = _instance_get(base_uri, key)
        if status == 200:
            counters.inc("reads")
        elif status in (404, 0):
            # Key may not exist yet or instance gone; not fatal
            pass
        else:
            counters.inc("errors")
    counters.inc("activity_rounds")
    next_generation = 0
    with state_lock:
        mod = state_map[module_id]
        if mod.state != "active" or mod.generation != generation:
            return
        mod.written_keys = (existing_keys + new_keys)[-200:]  # cap list size
        mod.activity_rounds_left -= 1
        if mod.activity_rounds_left <= 0:
            if mod.explicit_deprovision:
                mod.state = "deprovisioning"
                mod.base_uri = None
                try:
                    pool.submit(
                        _task_deprovision,
                        module_id, port, state_map, state_lock, counters,
                    )
                except RuntimeError:
                    pass  # pool shutting down; hub watchdog will clean up
            else:
                mod.state = "watchdog-pending"
                mod.watchdog_pending_since = time.monotonic()
            return
        next_generation = mod.generation  # capture under lock before releasing
    # Schedule next burst soon for semi-continuous activity
    next_time = time.monotonic() + random.uniform(0.2, 0.8)
    activity_queue.put((next_time, next_generation, module_id))
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
def _orchestrate(
    port: int,
    state_map: dict[str, ModuleState],
    state_lock: threading.Lock,
    activity_queue: queue.PriorityQueue,
    counters: Counters,
    pool: ThreadPoolExecutor,
    stop_event: threading.Event,
    target_active: int,
    explicit_deprovision_rate: float,
    idle_timeout: int,
) -> None:
    """Control loop: dispatch bursts and keep ~target_active modules provisioned.

    Runs until *stop_event* is set. Each iteration:
    - drains due entries from *activity_queue* into the worker pool,
    - backs off provisioning for 5s after any 409 rejection,
    - tops up provisioned modules toward *target_active* (capped at 2x
      in-flight including watchdog-pending ones),
    - every 5s polls hub status to promote async (202) provisions and to
      detect (or, after a grace period, force) watchdog deprovisions.
    """
    last_status_poll = 0.0
    provision_backoff = False
    while not stop_event.is_set():
        now = time.monotonic()
        # Drain activity queue for due bursts
        while True:
            try:
                next_time, generation, module_id = activity_queue.get_nowait()
            except queue.Empty:
                break
            if next_time <= now:
                try:
                    pool.submit(
                        _task_activity_burst,
                        module_id, generation, state_map, state_lock,
                        activity_queue, counters, pool, port,
                    )
                except RuntimeError:
                    break  # pool shutting down
            else:
                # Earliest entry not yet due (priority queue): put it back and stop.
                activity_queue.put((next_time, generation, module_id))
                break
        # Activate/deactivate backoff based on recent 409 rejections
        last_reject = counters.last_reject_time
        if last_reject > 0 and now - last_reject < 5.0:
            provision_backoff = True
        elif provision_backoff and now - last_reject >= 5.0:
            provision_backoff = False
        # Count states and submit provision tasks if needed
        if not provision_backoff:
            with state_lock:
                idle_ids = [mid for mid, m in state_map.items() if m.state == "idle"]
                working_count = sum(
                    1 for m in state_map.values()
                    if m.state in ("active", "provisioning", "deprovisioning")
                )
                watchdog_count = sum(
                    1 for m in state_map.values()
                    if m.state == "watchdog-pending"
                )
                # Provision enough to keep working_count at target, but cap total
                # in-flight (working + watchdog-pending) at 2x target to prevent
                # runaway accumulation when all modules cycle to watchdog-pending.
                inflight_cap = target_active * 2
                deficit = min(
                    target_active - working_count,
                    inflight_cap - working_count - watchdog_count,
                )
                to_provision = idle_ids[:max(0, deficit)]
            for mid in to_provision:
                # Re-check under the lock: state may have changed since selection.
                with state_lock:
                    if state_map[mid].state != "idle":
                        continue
                    state_map[mid].state = "provisioning"
                try:
                    pool.submit(
                        _task_provision,
                        mid, port, state_map, state_lock,
                        activity_queue, counters, explicit_deprovision_rate,
                        stop_event,
                    )
                except RuntimeError:
                    # Pool shutting down; roll the module back to idle.
                    with state_lock:
                        state_map[mid].state = "idle"
        # Poll hub status to detect async provision completions and watchdog-fired modules
        if now - last_status_poll >= 5.0:
            last_status_poll = now
            modules_status = _hub_status(port)
            if modules_status is not None:
                hub_ids = {m["moduleId"]: m.get("state", "") for m in modules_status}
                # Hub watchdog fires at ~idle_timeout from last activity. However, if the
                # watchdog's previous visit predates the last burst, it sees a changed
                # activity sum and resets LastActivityTime, delaying deprovision by ~173s.
                # Explicitly deprovision after idle_timeout + 15s to avoid waiting for that.
                timeout_threshold = idle_timeout + 15
                to_deprovision_explicitly: list[str] = []
                with state_lock:
                    for mid, mod in state_map.items():
                        if mod.state == "provisioning" and mod.base_uri is not None:
                            # Waiting for async (202) provision to complete
                            hub_state = hub_ids.get(mid, "")
                            if hub_state == "provisioned":
                                mod.generation += 1
                                mod.state = "active"
                                mod.written_keys = []
                                mod.activity_rounds_left = random.randint(5, 20)
                                mod.explicit_deprovision = random.random() < explicit_deprovision_rate
                                activity_queue.put(
                                    (time.monotonic() + random.uniform(0.1, 0.3), mod.generation, mid)
                                )
                            elif mid not in hub_ids:
                                # Provision failed or was rolled back
                                mod.state = "idle"
                                mod.base_uri = None
                        elif mod.state == "watchdog-pending":
                            hub_state = hub_ids.get(mid, "")
                            gone = mid not in hub_ids
                            timed_out = (now - mod.watchdog_pending_since) > timeout_threshold
                            if gone or hub_state in ("unprovisioned", "deprovisioning"):
                                # Watchdog (or hub) already reclaimed it.
                                mod.state = "idle"
                                mod.base_uri = None
                                mod.written_keys = []
                                counters.inc("deprovisions_watchdog")
                            elif timed_out:
                                mod.state = "deprovisioning"
                                to_deprovision_explicitly.append(mid)
                # Submit outside the lock: _task_deprovision re-acquires it.
                for mid in to_deprovision_explicitly:
                    try:
                        pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters)
                    except RuntimeError:
                        with state_lock:
                            if state_map[mid].state == "deprovisioning":
                                state_map[mid].state = "idle"
        stop_event.wait(timeout=0.05)
# ---------------------------------------------------------------------------
# Stats display
# ---------------------------------------------------------------------------
def _stats_thread(
    state_map: dict[str, ModuleState],
    state_lock: threading.Lock,
    counters: Counters,
    stop_event: threading.Event,
    interval_s: float = 5.0,
) -> None:
    """Periodically print a stats block until *stop_event* is set.

    On a TTY the previous block is overwritten in place using ANSI cursor
    movement; otherwise blocks are simply appended.
    """
    is_tty = sys.stdout.isatty()
    prev_lines = 0
    t0 = time.monotonic()
    prev_snap: Optional[dict] = None
    prev_t = t0
    while not stop_event.is_set():
        stop_event.wait(timeout=interval_s)
        now = time.monotonic()
        elapsed = now - t0
        dt = now - prev_t
        snap = counters.snapshot()
        with state_lock:
            states: dict[str, int] = {}
            for m in state_map.values():
                states[m.state] = states.get(m.state, 0) + 1
        def rate(key: str) -> float:
            # Per-minute rate over the last interval; 0 on the first pass.
            if prev_snap is None or dt <= 0:
                return 0.0
            return (snap[key] - prev_snap[key]) / dt * 60.0
        lines = [
            f"[{time.strftime('%H:%M:%S')}] elapsed={elapsed:.0f}s",
            f"  modules: idle={states.get('idle', 0)} "
            f"provisioning={states.get('provisioning', 0)} "
            f"active={states.get('active', 0)} "
            f"watchdog-pending={states.get('watchdog-pending', 0)} "
            f"deprovisioning={states.get('deprovisioning', 0)}",
            f"  totals: provisions={snap['provisions']} "
            f"deprov-explicit={snap['deprovisions_explicit']} "
            f"deprov-watchdog={snap['deprovisions_watchdog']} "
            f"rejected={snap['provision_rejected']} "
            f"errors={snap['errors']}",
            f"  data: writes={snap['writes']} reads={snap['reads']} "
            f"rounds={snap['activity_rounds']}",
            f"  rates/min: provisions={rate('provisions'):.1f} "
            f"deprov={rate('deprovisions_explicit') + rate('deprovisions_watchdog'):.1f} "
            f"writes={rate('writes'):.1f} reads={rate('reads'):.1f}",
        ]
        if is_tty and prev_lines > 0:
            # Move cursor up to overwrite previous block
            sys.stdout.write(f"\033[{prev_lines}A\033[J")
        sys.stdout.write("\n".join(lines) + "\n")
        sys.stdout.flush()
        prev_lines = len(lines)
        prev_snap = snap
        prev_t = now
# ---------------------------------------------------------------------------
# Shutdown
# ---------------------------------------------------------------------------
def _wait_for_hub_idle(port: int, hub_proc: subprocess.Popen, timeout_s: float = 120.0) -> None:
    """Wait until the hub reports no transitioning instances (dehydration done).

    Returns early if the hub process exits. Prints a warning (but does not
    fail) if instances are still transitioning after *timeout_s* seconds.
    """
    # States that require no further hub work before it can be stopped.
    _STABLE = {"provisioned", "hibernated", "crashed", "unprovisioned"}
    print(f"[shutdown] waiting for hub dehydration (up to {timeout_s:.0f}s)...")
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if hub_proc.poll() is not None:
            print("[shutdown] hub process has exited")
            return
        modules = _hub_status(port, timeout_s=5.0)
        if modules is None:
            # Hub not responding. If it has exited, we're done. If it's still alive
            # it may be saturated with S3 uploads - keep waiting rather than assuming done.
            if hub_proc.poll() is not None:
                print("[shutdown] hub process has exited")
                return
            time.sleep(1.0)
            continue
        transitioning = [m for m in modules if m.get("state") not in _STABLE]
        remaining = len(modules)
        if not transitioning:
            if remaining:
                print(f"[shutdown] hub idle ({remaining} instances in stable state)")
            else:
                print("[shutdown] hub idle (no instances remaining)")
            return
        print(f"[shutdown] {len(transitioning)} instance(s) still dehydrating...", end="\r")
        time.sleep(1.0)
    print(f"\n[shutdown] WARNING: hub did not become idle within {timeout_s:.0f}s")
def _shutdown_deprovision_all(
port: int,
state_map: dict[str, ModuleState],
state_lock: threading.Lock,
counters: Counters,
workers: int,
timeout_s: float = 60.0,
) -> None:
with state_lock:
to_deprovision = [
mid for mid, m in state_map.items()
if m.state in ("active", "watchdog-pending", "provisioning")
]
for mid in to_deprovision:
state_map[mid].state = "deprovisioning"
if not to_deprovision:
return
print(f"\n[shutdown] deprovisioning {len(to_deprovision)} active modules...")
pool = ThreadPoolExecutor(max_workers=min(workers, len(to_deprovision)))
futures: list[Future] = [
pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters)
for mid in to_deprovision
]
pool.shutdown(wait=False)
done_set, not_done_set = futures_wait(futures, timeout=timeout_s)
if not_done_set:
print(f"[shutdown] WARNING: {len(not_done_set)} deprovision tasks did not complete within {timeout_s}s")
else:
print(f"[shutdown] all modules deprovisioned")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Entry point: parse args, start services, drive the load until Ctrl-C."""
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest",
                        help="Hub --data-dir (default: E:/Dev/hub-loadtest)")
    parser.add_argument("--port", type=int, default=8558,
                        help="Hub HTTP port (default: 8558)")
    parser.add_argument("--workers", type=int, default=50,
                        help="Thread pool size for HTTP calls (default: 50)")
    parser.add_argument("--active-modules", type=int, default=100,
                        help="Target number of concurrently provisioned modules (default: 100)")
    parser.add_argument("--module-count", type=int, default=1000,
                        help="Total predefined module name pool size (default: 1000)")
    parser.add_argument("--idle-timeout", type=int, default=90,
                        help="Hub watchdog inactivity timeout in seconds (default: 90)")
    parser.add_argument("--explicit-deprovision-rate", type=float, default=0.3,
                        help="Fraction of modules explicitly deprovisioned (default: 0.3)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver executable (auto-detected by default)")
    parser.add_argument("--s3", action="store_true",
                        help="Use local MinIO for hub de/hydration instead of filesystem")
    parser.add_argument("--minio-port", type=int, default=9000,
                        help="MinIO S3 API port (default: 9000)")
    parser.add_argument("--minio-console-port", type=int, default=9001,
                        help="MinIO web console port (default: 9001)")
    parser.add_argument("--s3-bucket", default="zen-load-test",
                        help="S3 bucket name for MinIO hydration (default: zen-load-test)")
    parser.add_argument("--no-browser", action="store_true",
                        help="Skip opening browser tabs")
    args = parser.parse_args()
    if args.active_modules > args.module_count:
        sys.exit(
            f"--active-modules ({args.active_modules}) must not exceed "
            f"--module-count ({args.module_count})"
        )
    _init_blobs()
    data_dir = Path(args.data_dir)
    hub_log = data_dir / "hub.log"
    zenserver_exe = _find_zenserver(args.zenserver_dir)
    print(f"[setup] zenserver: {zenserver_exe}")
    # Shared state used by the orchestrator, stats thread, and worker tasks.
    module_names = [f"load-test-module-{i:04d}" for i in range(args.module_count)]
    state_map: dict[str, ModuleState] = {mid: ModuleState() for mid in module_names}
    state_lock = threading.Lock()
    activity_queue: queue.PriorityQueue = queue.PriorityQueue()
    counters = Counters()
    stop_event = threading.Event()
    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    hub_extra_args: list[str] = []
    hub_extra_env: Optional[dict[str, str]] = None
    try:
        if args.s3:
            # Bring up MinIO first so the hub can reach it from startup.
            minio_exe = _find_minio(zenserver_exe)
            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
            _wait_for_minio(args.minio_port)
            _create_minio_bucket(args.minio_port, args.s3_bucket)
            if not args.no_browser:
                webbrowser.open(f"http://localhost:{args.minio_console_port}")
            # Point the hub's hydration backend at the local MinIO server.
            config_json = {
                "type": "s3",
                "settings": {
                    "uri": f"s3://{args.s3_bucket}",
                    "endpoint": f"http://localhost:{args.minio_port}",
                    "path-style": True,
                    "region": "us-east-1",
                },
            }
            data_dir.mkdir(parents=True, exist_ok=True)
            config_path = data_dir / "hydration_config.json"
            config_path.write_text(json.dumps(config_json), encoding="ascii")
            hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
            hub_extra_env = {
                "AWS_ACCESS_KEY_ID": _MINIO_USER,
                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
            }
        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, data_dir, args.port,
            hub_log, args.idle_timeout,
            hub_extra_args, hub_extra_env,
        )
        _wait_for_hub(hub_proc, args.port)
        if not args.no_browser:
            webbrowser.open(f"http://localhost:{args.port}/dashboard")
        print(f"[load-test] starting: pool={args.module_count} modules, "
              f"target={args.active_modules} active, "
              f"idle-timeout={args.idle_timeout}s, "
              f"explicit-deprovision={args.explicit_deprovision_rate:.0%}")
        print("[load-test] press Ctrl-C to stop")
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            stats_t = threading.Thread(
                target=_stats_thread,
                args=(state_map, state_lock, counters, stop_event),
                daemon=True,
            )
            stats_t.start()
            orch_t = threading.Thread(
                target=_orchestrate,
                args=(
                    args.port, state_map, state_lock,
                    activity_queue, counters, pool,
                    stop_event, args.active_modules,
                    args.explicit_deprovision_rate, args.idle_timeout,
                ),
                daemon=True,
            )
            orch_t.start()
            try:
                # Foreground loop: watch for Ctrl-C or hub death.
                while not stop_event.is_set():
                    time.sleep(0.5)
                    if hub_proc.poll() is not None:
                        print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
                        break
            except KeyboardInterrupt:
                print("\n[load-test] Ctrl-C received, shutting down...")
            stop_event.set()
            orch_t.join(timeout=3.0)
            stats_t.join(timeout=3.0)
        # Normal shutdown: release modules, let dehydration finish, stop processes.
        _shutdown_deprovision_all(
            args.port, state_map, state_lock, counters,
            args.workers, timeout_s=60.0,
        )
        _wait_for_hub_idle(args.port, hub_proc, timeout_s=max(120.0, args.active_modules * 1.0))
        _stop_process(hub_proc, "hub", timeout_s=120.0)
        if hub_log_handle is not None:
            hub_log_handle.close()
            hub_log_handle = None
        if minio_proc is not None:
            _stop_process(minio_proc, "minio")
            minio_proc = None
    finally:
        # Safety net: always runs, but the None/poll() guards make it a no-op
        # after a normal shutdown - it only acts if an exception skipped cleanup.
        if hub_proc is not None and hub_proc.poll() is None:
            _stop_process(hub_proc, "hub")
        if hub_log_handle is not None:
            hub_log_handle.close()
        if minio_proc is not None:
            _stop_process(minio_proc, "minio")
# Script entry point; allows importing this module without side effects.
if __name__ == "__main__":
    main()