#!/usr/bin/env python3
"""Stage C of the perf-seed workflow: run a perf iteration against the
preserved MinIO state.

Flow per invocation:
  1. Reset --minio-run by copying --minio-seeded over it (so every iteration
     starts from the same canonical state).
  2. Start MinIO against --minio-run.
  3. Start a hub against MinIO. By default --hub-enable-dehydration=false so
     the perf run is read-only and the seeded baseline survives intact. Pass
     --enable-dehydration to run a full provision -> deprovision cycle that
     re-uploads at deprovision time (use this to measure the dehydrate phase
     as well as hydrate; the seeded baseline diverges after such a run).
  4. Provision every module found under --snapshot-dir (same set that was
     used to seed MinIO), wait for all to reach 'provisioned'.
  5. Deprovision them all, wait for them to be gone.
  6. Stop hub + MinIO cleanly.
  7. Archive hub.log, the zenserver logs, the trace (if any) and a
     summary.json into a timestamped subdirectory of --archive-dir.

Optional --trace writes a utrace file to --hub-data-dir/hub.utrace.

This script reuses the lightweight harness helpers from the other stages but
keeps the run fully offline once the seed is preserved.
"""
from __future__ import annotations

import argparse
import json
import os
import shutil
import signal
import stat
import subprocess
import sys
import time
import urllib.error
import urllib.request
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import BinaryIO, Callable, Optional

_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"

# Cadence for polling /hub/status while waiting on provision/deprovision, and
# the minimum gap between full state-histogram lines (progress is printed on
# every poll in between).
_POLL_INTERVAL_S = 2.0
_VERBOSE_EVERY_S = 10.0


def _rmtree_robust(path) -> None:
    """shutil.rmtree with a Windows-friendly retry for read-only files.

    When removing an entry fails, the entry is made writable and the failing
    operation is retried once. If the retry also fails a warning is printed
    and the removal carries on (best-effort), so leftovers may remain.
    """

    def _onerror(func, p, exc_info):
        try:
            os.chmod(p, stat.S_IWRITE)
            func(p)
        except Exception as e:
            # Keep going, but don't hide why a later copy/mkdir might trip
            # over leftover files.
            print(f"[rmtree] WARN could not remove {p}: {e}")

    # onexc was introduced in py3.12 (onerror is deprecated there); fall back
    # to onerror on older versions.
    if sys.version_info >= (3, 12):
        shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
    else:
        shutil.rmtree(path, onerror=_onerror)


def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable.

    With ``override`` the executable must live directly in that directory.
    Otherwise build/<platform>/<arch>/<mode>/ under the repo root (two levels
    above this script) is probed, preferring release over debug. Exits the
    script when nothing is found.
    """
    if override:
        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
        if not p.exists():
            sys.exit(f"zenserver not found at {p}")
        return p
    script_dir = Path(__file__).resolve().parent
    repo_root = script_dir.parent.parent
    for mode in ("release", "debug"):
        for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
            p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
            if p.exists():
                return p
    sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")


def _find_zen(zenserver_exe: Path) -> Path:
    """Return the zen CLI that sits next to zenserver; exit if it is missing."""
    p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(f"zen CLI not found at {p}")
    return p


def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable that sits next to zenserver; exit if missing."""
    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(f"minio executable not found at {p}")
    return p


def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch a quiet MinIO server on ``data_dir`` with the fixed dev credentials.

    On Windows the process gets its own process group so it can later be
    stopped with CTRL_BREAK_EVENT (see _stop_minio_graceful).
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["MINIO_ROOT_USER"] = _MINIO_USER
    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    proc = subprocess.Popen(
        [str(minio_exe), "server", str(data_dir),
         "--address", f":{port}",
         "--console-address", f":{console_port}",
         "--quiet"],
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **popen_kwargs,
    )
    print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}")
    return proc


def _wait_for_minio(port: int, timeout_s: float = 30.0, proc: Optional[subprocess.Popen] = None) -> None:
    """Poll MinIO's liveness endpoint until it answers; exit on timeout.

    When ``proc`` is given the script also exits as soon as that process dies
    (e.g. the port is already taken). Without this check a dead MinIO would
    burn the whole timeout, or a foreign server on the same port would be
    mistaken for ours.
    """
    deadline = time.monotonic() + timeout_s
    url = f"http://localhost:{port}/minio/health/live"
    while time.monotonic() < deadline:
        if proc is not None and proc.poll() is not None:
            sys.exit(f"[minio] exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1):
                print("[minio] ready")
                return
        except Exception:
            time.sleep(0.1)
    sys.exit(f"[minio] timed out after {timeout_s}s")


def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    extra_args: list[str],
    extra_env: dict[str, str],
) -> tuple[subprocess.Popen, BinaryIO]:
    """Launch ``zenserver hub`` with stdout+stderr redirected into ``log_file``.

    Returns the process and the open log handle; the caller closes the handle
    once the hub has exited. Exits the script if the perf Lua configs next to
    this script are missing.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    # Use the production-like Lua config (limits, thread counts, watchdog timers)
    # so perf numbers reflect what prod actually runs. Hub.hydration.threads is
    # left unset so the default auto-pick (hardware_concurrency/4) fires; combined
    # with --corelimit=128 below, that yields 32 hydration threads on the actual
    # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes.
    config_dir = Path(__file__).resolve().parent / "perf_configs"
    hub_config = config_dir / "hub.lua"
    inst_config = config_dir / "instance.lua"
    # Fail fast with a clear message instead of letting the hub die on startup.
    for cfg in (hub_config, inst_config):
        if not cfg.is_file():
            sys.exit(f"[hub] config not found: {cfg}")
    cmd = [
        str(zenserver_exe),
        "hub",
        "--enable-execution-history=false",
        f"--data-dir={data_dir}",
        f"--port={port}",
        f"--config={hub_config}",
        f"--hub-instance-config={inst_config}",
        # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box
        # with fewer physical cores, this clamps to the actual count (see
        # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates).
        # When running on the actual prod instance class, this makes the default
        # auto-picks (provision/hydration thread counts) behave as prod does.
        "--corelimit=128",
    ] + extra_args
    env = os.environ.copy()
    env.update(extra_env)
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs)
    except Exception:
        log_handle.close()
        raise
    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle


def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Poll /hub/status until the hub answers; exit if it dies or times out."""
    deadline = time.monotonic() + timeout_s
    req = urllib.request.Request(f"http://localhost:{port}/hub/status", headers={"Accept": "application/json"})
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(req, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            time.sleep(0.2)
    sys.exit(f"[hub] timed out after {timeout_s}s")


def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
    """Shut the hub down via 'zen down --pid <pid> --force', killing it if it lingers.

    The zen CLI call is itself bounded by ``timeout_s`` so a wedged CLI cannot
    hang the script. Regardless of the CLI outcome the hub is then given up to
    ``timeout_s`` to exit before being killed.
    """
    if hub_proc.poll() is not None:
        return
    pid = hub_proc.pid
    print(f"[hub] zen down --pid {pid}")
    try:
        rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"], timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[hub] zen down did not return after {timeout_s}s; waiting for exit anyway")
    except OSError as e:
        print(f"[hub] zen down failed to launch ({e}); waiting for exit anyway")
    else:
        if rc != 0:
            print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
    try:
        hub_proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[hub] did not exit after {timeout_s}s, killing")
        hub_proc.kill()
        hub_proc.wait()


def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
    """Ask MinIO to stop (CTRL_BREAK on Windows, SIGTERM elsewhere); kill on timeout."""
    if proc.poll() is not None:
        return
    try:
        if sys.platform == "win32":
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        else:
            proc.terminate()
    except Exception:
        proc.terminate()
    try:
        proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[minio] did not exit after {timeout_s}s, killing")
        proc.kill()
        proc.wait()


def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
    """POST an empty JSON object to the hub and return (status, parsed body).

    HTTP errors return their status code; transport errors return status 0
    with {"error": <message>}. Bodies that are not JSON come back as {}.
    """
    url = f"http://localhost:{port}{path}"
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json", "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            try:
                body = json.loads(resp.read())
            except Exception:
                body = {}
            return resp.status, body
    except urllib.error.HTTPError as e:
        try:
            body = json.loads(e.read())
        except Exception:
            body = {}
        return e.code, body
    except Exception as e:
        return 0, {"error": str(e)}


def _hub_states(port: int) -> Optional[dict[str, str]]:
    """Return {moduleId: state} from /hub/status, or None if the request fails."""
    # Without Accept: application/json the hub serves compact binary (CBOR)
    # which json.loads can't parse. 60s timeout - /hub/status can take a few
    # seconds to serialize with 300+ modules in flight.
    req = urllib.request.Request(
        f"http://localhost:{port}/hub/status",
        headers={"Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
    except Exception as e:
        # Visibility: prior silent None caused "0/300 provisioned..." forever.
        print(f"[status] WARN /hub/status failed: {e}")
        return None
    return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}


def _fan_out_post(
    pool: ThreadPoolExecutor,
    port: int,
    module_ids: list[str],
    verb: str,
) -> tuple[list[str], list[tuple[str, int, dict]]]:
    """POST /hub/modules/<id>/<verb> for every id concurrently on ``pool``.

    Returns (accepted ids, [(id, status, body)] failures). 200/202 count as
    accepted; for "deprovision" a 404 (module already gone) is accepted too.
    """
    futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids}
    accepted: list[str] = []
    failures: list[tuple[str, int, dict]] = []
    for mid, fut in futures.items():
        status, body = fut.result()
        if status in (200, 202):
            accepted.append(mid)
        elif verb == "deprovision" and status == 404:
            accepted.append(mid)
        else:
            failures.append((mid, status, body))
    return accepted, failures


def _state_histogram(states: dict[str, str], remaining: set[str]) -> str:
    """Summarise the states of still-pending modules, most common first.

    Modules missing from ``states`` are labelled "(absent)" and modules with an
    empty state "(no state)", so neither renders as a bare "=N". Ties are
    broken alphabetically so the line is stable from one poll to the next.
    """
    counts = Counter(states.get(mid, "(absent)") or "(no state)" for mid in remaining)
    return ", ".join(f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0])))


def _poll_until(
    port: int,
    ids: list[str],
    timeout_s: float,
    is_done: Callable[[dict[str, str], str], bool],
    phase: str,
    verb: str,
) -> list[str]:
    """Poll /hub/status until every id satisfies ``is_done`` or ``timeout_s`` elapses.

    Prints a one-line progress counter each poll and a full state histogram at
    most every _VERBOSE_EVERY_S seconds (and once when everything is done).
    Returns the ids still pending at the deadline, sorted (empty on success).

    No sleep happens after the last id completes and no sleep runs past the
    deadline, so callers timing this phase are not charged an extra poll
    interval.
    """
    deadline = time.monotonic() + timeout_s
    remaining = set(ids)
    last_verbose = 0.0
    while remaining and time.monotonic() < deadline:
        states = _hub_states(port)
        if states is not None:
            remaining = {mid for mid in remaining if not is_done(states, mid)}
        done = len(ids) - len(remaining)
        now = time.monotonic()
        if states is not None and (now - last_verbose >= _VERBOSE_EVERY_S or not remaining):
            hist = _state_histogram(states, remaining) if remaining else f"(all {verb})"
            print(f"[{phase}] {done}/{len(ids)} {verb} | remaining: {hist}")
            last_verbose = now
        else:
            print(f"[{phase}] {done}/{len(ids)} {verb}...", end="\r")
        if remaining:
            time.sleep(max(0.0, min(_POLL_INTERVAL_S, deadline - time.monotonic())))
    print()
    return sorted(remaining)


def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]:
    """Wait for every id to report state 'provisioned'; return the ids that never did."""
    return _poll_until(
        port, ids, timeout_s,
        lambda states, mid: states.get(mid) == "provisioned",
        "provision", "provisioned",
    )


def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]:
    """Wait for every id to vanish from /hub/status or report 'unprovisioned'.

    Returns the ids still present (in some other state) at the deadline.
    """
    return _poll_until(
        port, ids, timeout_s,
        lambda states, mid: mid not in states or states[mid] == "unprovisioned",
        "deprovision", "gone",
    )


def _robust_copytree(src: Path, dst: Path) -> None:
    """Replace dst with an exact copy of src (silently; no progress output).

    On Windows uses robocopy /MIR to mirror src -> dst and /COPY:DAT
    /DCOPY:DAT to copy file data + attributes + timestamps explicitly (not
    just the defaults); robocopy exit codes >= 8 abort the script. Elsewhere
    dst is removed and recreated with shutil.copytree. Safe because dst is
    always the working dir (minio-run) - never the preserved baseline.
    """
    if sys.platform == "win32":
        cmd = [
            "robocopy", str(src), str(dst),
            "/MIR", "/COPY:DAT", "/DCOPY:DAT",
            "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP",
            "/R:2", "/W:1",
        ]
        print(f"[reset] robocopy {src} -> {dst}")
        rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # robocopy: 0-7 are success/informational codes, >= 8 means failure.
        if rc >= 8:
            sys.exit(f"[reset] robocopy failed rc={rc}")
    else:
        if dst.exists():
            _rmtree_robust(dst)
        shutil.copytree(src, dst, symlinks=False)


def _archive_run(
    archive_dir: Path,
    hub_data_dir: Path,
    bucket: str,
    summary: dict,
) -> Optional[Path]:
    """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a
    timestamped subdir under archive_dir so successive runs can be compared.

    Returns the archive destination path, or None if archiving failed (any
    error is reported and swallowed so it never masks the run's own result).
    """
    try:
        ts = time.strftime("%Y%m%d-%H%M%S", time.localtime())
        dest = archive_dir / f"{ts}_{bucket}"
        dest.mkdir(parents=True, exist_ok=True)

        copied: list[str] = []

        # Hub stdout/stderr (captured by _start_hub via subprocess stdout=)
        hub_log = hub_data_dir / "hub.log"
        if hub_log.is_file():
            shutil.copy2(hub_log, dest / "hub.log")
            copied.append("hub.log")

        # Structured zenserver logs (rotated variants included)
        logs_src = hub_data_dir / "logs"
        if logs_src.is_dir():
            logs_dst = dest / "logs"
            logs_dst.mkdir(exist_ok=True)
            for p in logs_src.iterdir():
                if p.is_file():
                    shutil.copy2(p, logs_dst / p.name)
                    copied.append(f"logs/{p.name}")

        # Optional trace
        utrace = hub_data_dir / "hub.utrace"
        if utrace.is_file():
            shutil.copy2(utrace, dest / "hub.utrace")
            copied.append("hub.utrace")

        (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii")
        print(f"[archive] {dest} ({len(copied)} files)")
        return dest
    except Exception as e:
        print(f"[archive] WARNING: failed to archive run: {e}")
        return None


def _is_within(child: Path, parent: Path) -> bool:
    """True if ``child`` is ``parent`` itself or lies somewhere beneath it."""
    return child == parent or parent in child.parents


def _paths_overlap(a: Path, b: Path) -> bool:
    """True if a and b are the same directory or one contains the other."""
    return _is_within(a, b) or _is_within(b, a)


def main() -> int:
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
                        help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
                             "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
    parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run",
                        help="Working MinIO data dir, wiped and re-copied each run "
                             "(default: E:/Dev/zen-perf-seed/minio-run)")
    parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
                        help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)")
    parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf",
                        help="Hub --data-dir for the perf run (wiped each run unless --no-wipe-hub). "
                             "Default: E:/Dev/zen-perf-seed/hub-perf")
    parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs",
                        help="Where to archive hub.log + zenserver.log + hub.utrace after each run "
                             "(default: E:/Dev/zen-perf-seed/perf-runs)")
    parser.add_argument("--bucket", default="zen-seed-packed",
                        help="MinIO bucket name to exercise (default: zen-seed-packed). "
                             "Pack worktree seeds only the packed bucket.")
    parser.add_argument("--minio-port", type=int, default=9000)
    parser.add_argument("--minio-console-port", type=int, default=9001)
    parser.add_argument("--hub-port", type=int, default=8558)
    parser.add_argument("--module-count", type=int, default=0,
                        help="Cap on modules used (0 = all from snapshot-dir)")
    parser.add_argument("--workers", type=int, default=50)
    parser.add_argument("--poll-timeout", type=float, default=1200.0)
    parser.add_argument("--settle-seconds", type=float, default=5.0)
    parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS",
                        help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace "
                             "(default channels: 'default')")
    parser.add_argument("--enable-dehydration", action="store_true",
                        help="Run with --hub-enable-dehydration=true so the deprovision phase "
                             "actually re-uploads to MinIO. Default false preserves the seeded "
                             "baseline; pass this to measure dehydrate cost as well as hydrate.")
    parser.add_argument("--no-wipe-hub", action="store_true",
                        help="Don't wipe the hub data dir before starting (useful to inspect leftover state)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver + minio executables (auto-detected)")
    args = parser.parse_args()

    # ThreadPoolExecutor rejects max_workers < 1; report it as a usage error.
    if args.workers < 1:
        parser.error("--workers must be >= 1")

    minio_seeded = Path(args.minio_seeded).resolve()
    minio_run = Path(args.minio_run).resolve()
    snapshot_dir = Path(args.snapshot_dir).resolve()
    hub_data_dir = Path(args.hub_data_dir).resolve()
    archive_dir = Path(args.archive_dir).resolve()

    if not minio_seeded.is_dir():
        sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}")
    if not snapshot_dir.is_dir():
        sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}")

    # Preserved inputs are read-only from this script's perspective. Refuse to
    # run if any mutable path could accidentally clobber them.
    read_only = (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir))
    for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)):
        for ro_label, ro in read_only:
            if _paths_overlap(mutable, ro):
                sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}")
    # minio-run is mirrored from the baseline and hub-data-dir is wiped, so the
    # two must be disjoint or one reset would destroy the other's contents.
    if _paths_overlap(minio_run, hub_data_dir):
        sys.exit(f"[setup] refusing to run: minio-run={minio_run} overlaps hub-data-dir={hub_data_dir}")
    # The archive only ever adds a new timestamped subdir, so it may sit above
    # other dirs, but must not live inside anything read-only or wiped per run
    # (inside snapshot-dir it would even show up as a module id next time).
    archive_guards = list(read_only) + [("minio-run", minio_run)]
    if not args.no_wipe_hub:
        archive_guards.append(("hub-data-dir", hub_data_dir))
    for g_label, g in archive_guards:
        if _is_within(archive_dir, g):
            sys.exit(f"[setup] refusing to run: archive-dir={archive_dir} is inside {g_label}={g}")

    module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()])
    if args.module_count > 0:
        module_ids = module_ids[:args.module_count]
    if not module_ids:
        sys.exit(f"[setup] no modules in {snapshot_dir}")

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    zen_exe = _find_zen(zenserver_exe)
    minio_exe = _find_minio(zenserver_exe)
    zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
    print(f"[setup] build mode:   {zenserver_mode}")
    print(f"[setup] zenserver:    {zenserver_exe}")
    print(f"[setup] minio:        {minio_exe}")
    print(f"[setup] minio-seeded: {minio_seeded}")
    print(f"[setup] minio-run:    {minio_run}")
    print(f"[setup] hub-data-dir: {hub_data_dir}")
    print(f"[setup] modules:      {len(module_ids)}")

    # Reset MinIO run dir from the baseline.
    _robust_copytree(minio_seeded, minio_run)

    # Wipe the hub data dir so every run starts from scratch unless the user opts out.
    if not args.no_wipe_hub and hub_data_dir.exists():
        print(f"[reset] wiping {hub_data_dir}")
        _rmtree_robust(hub_data_dir)
    hub_data_dir.mkdir(parents=True, exist_ok=True)

    config_path = hub_data_dir / "hydration_config.json"
    config_path.write_text(
        json.dumps({
            "type": "s3",
            "settings": {
                "uri": f"s3://{args.bucket}",
                "endpoint": f"http://localhost:{args.minio_port}",
                "path-style": True,
                "region": "us-east-1",
            },
        }),
        encoding="ascii",
    )

    hub_extra_args = [
        f"--hub-hydration-target-config={config_path}",
        f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}",
    ]
    if args.enable_dehydration:
        print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge")
    if args.trace:
        trace_path = hub_data_dir / "hub.utrace"
        hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"]
        print(f"[trace] enabled channels='{args.trace}', file={trace_path}")
    hub_extra_env = {
        "AWS_ACCESS_KEY_ID": _MINIO_USER,
        "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
    }

    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle: Optional[BinaryIO] = None
    exit_code = 0
    timings: dict = {
        "bucket": args.bucket,
        "module_count": len(module_ids),
        "build_mode": zenserver_mode,
        "provision_fanout_s": None,
        "provision_total_s": None,
        "provision_failed": None,
        "provision_stuck": None,
        "deprovision_fanout_s": None,
        "deprovision_total_s": None,
        "deprovision_failed": None,
        "deprovision_stuck": None,
        "total_s": None,
    }
    try:
        minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port)
        _wait_for_minio(args.minio_port, proc=minio_proc)

        hub_log = hub_data_dir / "hub.log"
        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, hub_data_dir, args.hub_port, hub_log,
            hub_extra_args, hub_extra_env,
        )
        _wait_for_hub(hub_proc, args.hub_port)

        t_start = time.monotonic()
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            # --- Provision ---
            print(f"[provision] firing {len(module_ids)} provision requests...")
            t0 = time.monotonic()
            accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision")
            fanout_s = time.monotonic() - t0
            timings["provision_fanout_s"] = round(fanout_s, 2)
            timings["provision_failed"] = len(failures)
            print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)")
            for mid, status, body in failures[:5]:
                print(f"[provision] FAIL {mid}: status={status} body={body}")
            if failures:
                exit_code = 1

            stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout)
            total_s = time.monotonic() - t0
            timings["provision_total_s"] = round(total_s, 2)
            timings["provision_stuck"] = len(stuck)
            ready = len(accepted) - len(stuck)
            if stuck:
                exit_code = 1
                print(f"[provision] {len(stuck)} module(s) NOT provisioned after {total_s:.1f}s "
                      f"({ready}/{len(accepted)}), e.g. {stuck[:5]}")
            else:
                print(f"[provision] all provisioned in {total_s:.1f}s ({ready}/{len(accepted)})")

            print(f"[settle] waiting {args.settle_seconds:.0f}s")
            time.sleep(args.settle_seconds)

            # --- Deprovision ---
            # Every accepted module is deprovisioned, including stuck ones, so
            # the hub is left clean either way.
            print(f"[deprovision] firing {len(accepted)} deprovision requests...")
            t0 = time.monotonic()
            dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision")
            fanout_s = time.monotonic() - t0
            timings["deprovision_fanout_s"] = round(fanout_s, 2)
            timings["deprovision_failed"] = len(dp_failures)
            print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)")
            for mid, status, body in dp_failures[:5]:
                print(f"[deprovision] FAIL {mid}: status={status} body={body}")
            if dp_failures:
                exit_code = 1

            stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout)
            total_s = time.monotonic() - t0
            timings["deprovision_total_s"] = round(total_s, 2)
            timings["deprovision_stuck"] = len(stuck_dp)
            gone = len(dp_accepted) - len(stuck_dp)
            if stuck_dp:
                exit_code = 1
                print(f"[deprovision] {len(stuck_dp)} module(s) NOT gone after {total_s:.1f}s "
                      f"({gone}/{len(dp_accepted)}), e.g. {stuck_dp[:5]}")
            else:
                print(f"[deprovision] all gone in {total_s:.1f}s ({gone}/{len(dp_accepted)})")

            print(f"[settle] waiting {args.settle_seconds:.0f}s")
            time.sleep(args.settle_seconds)

        elapsed = time.monotonic() - t_start
        timings["total_s"] = round(elapsed, 2)
        print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}")
    except BaseException:
        # sys.exit() from a setup helper, Ctrl-C or a crash: make sure the
        # archived summary records a failure rather than exit_code=0.
        if exit_code == 0:
            exit_code = 1
        raise
    finally:
        if hub_proc is not None and hub_proc.poll() is None:
            _zen_down_hub(zen_exe, hub_proc)
        if hub_log_handle is not None:
            hub_log_handle.close()
        if minio_proc is not None and minio_proc.poll() is None:
            _stop_minio_graceful(minio_proc)
        # Archive AFTER the hub has exited so in-flight log writes are flushed.
        timings["exit_code"] = exit_code
        _archive_run(archive_dir, hub_data_dir, args.bucket, timings)

    return exit_code


if __name__ == "__main__":
    sys.exit(main())