#!/usr/bin/env python3
"""Stage C of the perf-seed workflow: run a perf iteration against the
preserved MinIO state.

Flow per invocation:
  1. Reset --minio-run by copying --minio-seeded over it (so every iteration
     starts from the same canonical state).
  2. Start MinIO against --minio-run (optionally behind toxiproxy, see
     --toxiproxy-latency-ms, to inject latency between hub and MinIO).
  3. Start a hub against MinIO. By default --hub-enable-dehydration=false so
     the perf run is read-only and the seeded baseline survives intact. Pass
     --enable-dehydration to run a full provision -> deprovision cycle that
     re-uploads at deprovision time (use this to measure the dehydrate phase
     as well as hydrate; the seeded baseline diverges after such a run).
  4. Provision every module found under --snapshot-dir (same set that was used
     to seed MinIO), wait for all to reach 'provisioned'.
  5. Deprovision them all, wait for them to be gone.
  6. Stop hub + MinIO cleanly, then archive hub.log, the zenserver logs, the
     optional trace and a summary.json of phase timings under --archive-dir.

Optional --trace writes a utrace file to --hub-data-dir/hub.utrace.

This script reuses the lightweight harness helpers from the other stages but
keeps the run fully offline once the seed is preserved.
"""

from __future__ import annotations

import argparse
import contextlib
import json
import os
import shutil
import signal
import stat
import subprocess
import sys
import time
import urllib.error
import urllib.request
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import IO, Callable, Optional

_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"


def _rmtree_robust(path: str | os.PathLike[str]) -> None:
    """Delete the tree at *path*, retrying entries that fail once after
    clearing their read-only bit (needed on Windows for read-only files).

    If the retry also fails, the original error is re-raised so callers never
    carry on against a half-deleted tree. Entries that vanish concurrently are
    ignored.
    """

    def _retry_writable(func: Callable[[str], object], p: str, exc: BaseException) -> None:
        if isinstance(exc, FileNotFoundError):
            return  # already gone - nothing left to delete
        try:
            os.chmod(p, stat.S_IWRITE)
            func(p)
        except FileNotFoundError:
            pass  # vanished between the failure and the retry
        except Exception:
            raise exc

    # onexc was introduced in py3.12; fall back to onerror on older versions.
    if sys.version_info >= (3, 12):
        shutil.rmtree(path, onexc=_retry_writable)
    else:
        shutil.rmtree(path, onerror=lambda func, p, exc_info: _retry_writable(func, p, exc_info[1]))


def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable.

    With *override*, it must live directly in that directory. Otherwise the
    repo's build/<platform>/<arch>/<mode> dirs are searched, release before
    debug. Exits the script if nothing is found.
    """
    if override:
        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
        if not p.exists():
            sys.exit(f"zenserver not found at {p}")
        return p
    script_dir = Path(__file__).resolve().parent
    repo_root = script_dir.parents[2]
    for mode in ("release", "debug"):
        for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
            p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
            if p.exists():
                return p
    sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")


def _find_zen(zenserver_exe: Path) -> Path:
    """Return the zen CLI that sits next to *zenserver_exe*; exits if missing."""
    p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(f"zen CLI not found at {p}")
    return p


def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable that sits next to zenserver; exits if missing."""
    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(f"minio executable not found at {p}")
    return p


def _resolve_toxiproxy_exe(arg_value: Optional[str], env_var: str, default_name: str) -> Path:
    """Resolve a toxiproxy executable: explicit --toxiproxy-* arg wins, then
    env var, then PATH lookup of `default_name`. Exits if none of the three
    resolve to a real file."""
    candidate = arg_value or os.environ.get(env_var)
    if candidate:
        p = Path(candidate)
        if not p.exists():
            sys.exit(f"[toxiproxy] {default_name}: explicit path '{p}' not found")
        return p
    found = shutil.which(default_name)
    if found:
        return Path(found)
    sys.exit(f"[toxiproxy] {default_name} not found: pass --toxiproxy-server-exe / --toxiproxy-cli-exe, "
             f"set {env_var}, or put {default_name} on PATH")


def _terminate_process(proc: subprocess.Popen, timeout_s: float = 5.0) -> None:
    """Terminate *proc* and reap it, escalating to kill() if it does not exit."""
    if proc.poll() is not None:
        return
    try:
        proc.terminate()
        proc.wait(timeout=timeout_s)
    except Exception:
        proc.kill()
        proc.wait()


def _start_toxiproxy(server_exe: Path, cli_exe: Path, api_port: int, listen_port: int,
                     upstream_port: int, latency_ms: int, jitter_ms: int) -> subprocess.Popen:
    """Start toxiproxy-server, create a proxy minio_proxy listening on
    listen_port and forwarding to localhost:upstream_port, and add a latency
    toxic in each direction. Returns the server Popen so the caller can
    terminate it on cleanup.

    If any setup step fails (API never comes up, server dies, a CLI call
    returns non-zero), the server is terminated before the error propagates.
    The caller has not received the Popen yet and could not clean it up.
    """
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    proc = subprocess.Popen(
        [str(server_exe), "-port", str(api_port)],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        **popen_kwargs,
    )
    print(f"[toxiproxy] server started (pid {proc.pid}) api-port={api_port}")

    try:
        # Wait for the API to come up.
        api_url = f"http://localhost:{api_port}/version"
        deadline = time.monotonic() + 10.0
        while time.monotonic() < deadline:
            if proc.poll() is not None:
                sys.exit(f"[toxiproxy] server exited during startup (rc={proc.returncode})")
            try:
                with urllib.request.urlopen(api_url, timeout=1):
                    break
            except Exception:
                time.sleep(0.1)
        else:
            sys.exit(f"[toxiproxy] API at {api_url} never came up")

        env = os.environ.copy()
        env["TOXIPROXY_URL"] = f"http://localhost:{api_port}"

        # Create the proxy.
        subprocess.run(
            [str(cli_exe), "create", "-l", f"127.0.0.1:{listen_port}",
             "-u", f"127.0.0.1:{upstream_port}", "minio_proxy"],
            env=env, check=True,
        )
        # Add latency toxic to both directions (default direction is downstream;
        # add explicit upstream too for symmetry).
        args_common = ["-t", "latency", "-a", f"latency={latency_ms}", "-a", f"jitter={jitter_ms}"]
        subprocess.run(
            [str(cli_exe), "toxic", "add", "-n", "lat_down", "-d", *args_common, "minio_proxy"],
            env=env, check=True,
        )
        subprocess.run(
            [str(cli_exe), "toxic", "add", "-n", "lat_up", "-u", *args_common, "minio_proxy"],
            env=env, check=True,
        )
    except BaseException:
        _terminate_process(proc)
        raise

    print(f"[toxiproxy] minio_proxy listening on 127.0.0.1:{listen_port} -> "
          f"127.0.0.1:{upstream_port}, latency={latency_ms}ms jitter={jitter_ms}ms (per direction)")
    return proc


def _stop_toxiproxy(proc: subprocess.Popen) -> None:
    """Stop a toxiproxy server started by _start_toxiproxy, if still running."""
    if proc.poll() is not None:
        return
    _terminate_process(proc)
    print("[toxiproxy] stopped")


def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch MinIO serving *data_dir* with the fixed root credentials."""
    data_dir.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["MINIO_ROOT_USER"] = _MINIO_USER
    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    proc = subprocess.Popen(
        [str(minio_exe), "server", str(data_dir),
         "--address", f":{port}", "--console-address", f":{console_port}", "--quiet"],
        env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        **popen_kwargs,
    )
    print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}")
    return proc


def _wait_for_minio(port: int, timeout_s: float = 30.0,
                    proc: Optional[subprocess.Popen] = None) -> None:
    """Poll MinIO's liveness endpoint until it answers or *timeout_s* elapses.

    When *proc* is given, exit early if that MinIO process dies. For example,
    its port may already be taken; the health check could then be answered
    by some other server.
    """
    deadline = time.monotonic() + timeout_s
    url = f"http://localhost:{port}/minio/health/live"
    t0 = time.monotonic()
    print(f"[minio] waiting for ready on port {port} ...", flush=True)
    while time.monotonic() < deadline:
        if proc is not None and proc.poll() is not None:
            sys.exit(f"[minio] exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1):
                print(f"[minio] ready ({time.monotonic()-t0:.1f}s)", flush=True)
                return
        except Exception:
            time.sleep(0.1)
    sys.exit(f"[minio] timed out after {timeout_s}s")


def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    extra_args: list[str],
    extra_env: dict[str, str],
) -> tuple[subprocess.Popen, IO[bytes]]:
    """Launch `zenserver hub` with the perf Lua configs, logging stdout and
    stderr to *log_file*.

    Returns the process and the open log handle, which the caller must close
    after the hub exits.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    # Use the production-like Lua config (limits, thread counts, watchdog timers)
    # so perf numbers reflect what prod actually runs. Hub.hydration.threads is
    # left unset so the default auto-pick (hardware_concurrency/4) fires; combined
    # with --corelimit=128 below, that yields 32 hydration threads on the actual
    # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes.
    config_dir = Path(__file__).resolve().parent / "perf_configs"
    hub_config = config_dir / "hub.lua"
    inst_config = config_dir / "instance.lua"
    cmd = [
        str(zenserver_exe), "hub",
        "--enable-execution-history=false",
        f"--data-dir={data_dir}",
        f"--port={port}",
        f"--config={hub_config}",
        f"--hub-instance-config={inst_config}",
        # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box
        # with fewer physical cores, this clamps to the actual count (see
        # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates).
        # When running on the actual prod instance class, this makes the default
        # auto-picks (provision/hydration thread counts) behave as prod does.
        "--corelimit=128",
    ] + extra_args
    env = os.environ.copy()
    env.update(extra_env)
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs)
    except Exception:
        log_handle.close()
        raise
    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle


def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Poll /hub/status until the hub answers; exit if it dies or times out."""
    deadline = time.monotonic() + timeout_s
    req = urllib.request.Request(f"http://localhost:{port}/hub/status",
                                 headers={"Accept": "application/json"})
    t0 = time.monotonic()
    last_tick = t0
    print(f"[hub] waiting for ready on port {port} ...", flush=True)
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(req, timeout=2):
                print(f"[hub] ready ({time.monotonic()-t0:.1f}s)", flush=True)
                return
        except Exception:
            now = time.monotonic()
            if now - last_tick >= 5.0:
                print(f"[hub] still waiting ({now-t0:.1f}s elapsed)", flush=True)
                last_tick = now
            time.sleep(0.2)
    sys.exit(f"[hub] timed out after {timeout_s}s")


def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
    """'zen down --pid' for graceful hub shutdown.

    The CLI call is bounded by *timeout_s*, and so is the subsequent wait for
    the hub to exit. A hub still alive after that is killed. No-op if the
    hub has already exited.
    """
    if hub_proc.poll() is not None:
        return
    pid = hub_proc.pid
    print(f"[hub] zen down --pid {pid} ...", flush=True)
    t0 = time.monotonic()
    try:
        rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"], timeout=timeout_s)
    except subprocess.TimeoutExpired:
        # subprocess.call kills the hung CLI; fall through to the hub itself.
        print(f"[hub] zen down did not return after {timeout_s}s; waiting for exit anyway", flush=True)
    else:
        if rc != 0:
            print(f"[hub] zen down returned rc={rc}; waiting for exit anyway", flush=True)
    try:
        hub_proc.wait(timeout=timeout_s)
        print(f"[hub] exited ({time.monotonic()-t0:.1f}s)", flush=True)
    except subprocess.TimeoutExpired:
        print(f"[hub] did not exit after {timeout_s}s, killing", flush=True)
        hub_proc.kill()
        hub_proc.wait()


def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
    """Ask MinIO to stop (CTRL_BREAK on Windows, SIGTERM elsewhere), killing
    it if it has not exited within *timeout_s*."""
    if proc.poll() is not None:
        return
    print(f"[minio] stopping (pid {proc.pid}) ...", flush=True)
    t0 = time.monotonic()
    try:
        if sys.platform == "win32":
            # MinIO runs in its own process group, so CTRL_BREAK reaches only it.
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        else:
            proc.terminate()
    except Exception:
        proc.terminate()
    try:
        proc.wait(timeout=timeout_s)
        print(f"[minio] stopped ({time.monotonic()-t0:.1f}s)", flush=True)
    except subprocess.TimeoutExpired:
        print(f"[minio] did not exit after {timeout_s}s, killing", flush=True)
        proc.kill()
        proc.wait()


def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
    """POST an empty JSON object to the hub.

    Returns (status, parsed body). Status 0 means the request never got an
    HTTP response; the body then carries the error text.
    """
    url = f"http://localhost:{port}{path}"
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json", "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            try:
                body = json.loads(resp.read())
            except Exception:
                body = {}
            return resp.status, body
    except urllib.error.HTTPError as e:
        try:
            body = json.loads(e.read())
        except Exception:
            body = {}
        return e.code, body
    except Exception as e:
        return 0, {"error": str(e)}


def _hub_states(port: int) -> Optional[dict[str, str]]:
    """Return {moduleId: state} from /hub/status, or None if it can't be read."""
    # Without Accept: application/json the hub serves compact binary (CBOR)
    # which json.loads can't parse. 60s timeout - /hub/status can take a few
    # seconds to serialize with 300+ modules in flight.
    req = urllib.request.Request(
        f"http://localhost:{port}/hub/status",
        headers={"Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
    except Exception as e:
        # Visibility: prior silent None caused "0/300 provisioned..." forever.
        print(f"[status] WARN /hub/status failed: {e}")
        return None
    if not isinstance(data, dict):
        print(f"[status] WARN /hub/status returned unexpected payload type {type(data).__name__}")
        return None
    return {m["moduleId"]: m.get("state", "")
            for m in (data.get("modules") or [])
            if isinstance(m, dict) and m.get("moduleId")}


def _fan_out_post(
    pool: ThreadPoolExecutor,
    port: int,
    module_ids: list[str],
    verb: str,
) -> tuple[list[str], list[tuple[str, int, dict]]]:
    """POST /hub/modules/<id>/<verb> for every module concurrently.

    Returns (accepted ids, [(id, status, body)] failures). A 404 on
    deprovision counts as accepted, since the module is already gone.
    """
    futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids}
    accepted: list[str] = []
    failures: list[tuple[str, int, dict]] = []
    for mid, fut in futures.items():
        status, body = fut.result()
        if status in (200, 202):
            accepted.append(mid)
        elif verb == "deprovision" and status == 404:
            accepted.append(mid)
        else:
            failures.append((mid, status, body))
    return accepted, failures


def _state_histogram(states: dict[str, str], remaining: set[str]) -> str:
    """Summarise the current states of *remaining* modules as "state=count".

    Output is ordered by count descending, then by state name, so it is
    stable between polls. Modules that have no state are reported as "(none)".
    This covers modules absent from *states* and modules whose state is "".
    """
    counts = Counter(states.get(mid) or "(none)" for mid in remaining)
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ", ".join(f"{state}={n}" for state, n in ordered)


def _wait_for_states(
    port: int,
    ids: list[str],
    timeout_s: float,
    is_done: Callable[[dict[str, str], str], bool],
    tag: str,
    verb: str,
    poll_interval_s: float = 2.0,
) -> list[str]:
    """Poll /hub/status until every id satisfies *is_done* or *timeout_s* elapses.

    Progress is printed as "[tag] done/total verb". Returns the ids that
    never got there. It sleeps only while work remains, so the last transition
    does not add a trailing poll interval to the caller's measured phase time.
    """
    deadline = time.monotonic() + timeout_s
    remaining = set(ids)
    last_verbose = 0.0
    while remaining and time.monotonic() < deadline:
        states = _hub_states(port)
        if states is not None:
            remaining = {mid for mid in remaining if not is_done(states, mid)}
        done = len(ids) - len(remaining)
        now = time.monotonic()
        if states is not None and (now - last_verbose >= 10.0 or not remaining):
            hist = _state_histogram(states, remaining) if remaining else f"(all {verb})"
            print(f"[{tag}] {done}/{len(ids)} {verb} | remaining: {hist}")
            last_verbose = now
        else:
            print(f"[{tag}] {done}/{len(ids)} {verb}...", end="\r")
        if remaining:
            time.sleep(poll_interval_s)
    print()
    return list(remaining)


def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]:
    """Wait for every id to report state 'provisioned'; returns the stragglers."""
    return _wait_for_states(
        port, ids, timeout_s,
        lambda states, mid: states.get(mid) == "provisioned",
        "provision", "provisioned",
    )


def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]:
    """Wait for every id to vanish from /hub/status or report 'unprovisioned';
    returns the stragglers."""
    return _wait_for_states(
        port, ids, timeout_s,
        lambda states, mid: mid not in states or states[mid] == "unprovisioned",
        "deprovision", "gone",
    )


def _robust_copytree(src: Path, dst: Path) -> None:
    """Directory copy of the seeded baseline onto the working MinIO data dir.

    On a ReFS volume we use `refs_clone.clone_tree` to issue
    FSCTL_DUPLICATE_EXTENTS_TO_FILE per file: O(1) per file via copy-on-write
    metadata, regardless of size. A 300+ GB baseline clones in seconds.

    Off ReFS (and on every non-Windows platform) we fall back to
    `shutil.copytree`. PowerShell `Copy-Item` was tried previously and observed
    to byte-copy on this host, so it is no longer used.

    Safe because dst is always the working dir (minio-run) - never the
    preserved baseline.
    """
    clone_tree = None
    if sys.platform == "win32":
        # refs_clone is only needed (and only imported) where ReFS can exist.
        from refs_clone import clone_tree as _clone_tree, is_refs_volume
        dst.parent.mkdir(parents=True, exist_ok=True)
        if is_refs_volume(src) and is_refs_volume(dst.parent):
            clone_tree = _clone_tree

    if dst.exists():
        _rmtree_robust(dst)

    t0 = time.time()
    if clone_tree is not None:
        print(f"[reset] ReFS block-clone {src} -> {dst} ...", flush=True)
        files, bytes_total = clone_tree(src, dst)
        print(f"[reset] cloned {files:,} files, {bytes_total/1024/1024:.1f} MB "
              f"in {time.time()-t0:.1f}s", flush=True)
    else:
        print(f"[reset] copytree (non-ReFS) {src} -> {dst} ...", flush=True)
        shutil.copytree(src, dst, symlinks=False)
        print(f"[reset] copytree done in {time.time()-t0:.1f}s", flush=True)


def _archive_run(
    archive_dir: Path,
    hub_data_dir: Path,
    bucket: str,
    summary: dict,
) -> Optional[Path]:
    """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a
    timestamped subdir under archive_dir so successive runs can be compared.

    Returns the archive destination path, or None if archiving failed (the
    failure is reported as a warning, never raised).
    """
    try:
        ts = time.strftime("%Y%m%d-%H%M%S", time.localtime())
        dest = archive_dir / f"{ts}_{bucket}"
        dest.mkdir(parents=True, exist_ok=True)
        copied: list[str] = []

        # Hub stdout/stderr (captured by _start_hub via subprocess stdout=)
        hub_log = hub_data_dir / "hub.log"
        if hub_log.is_file():
            shutil.copy2(hub_log, dest / "hub.log")
            copied.append("hub.log")

        # Structured zenserver logs (rotated variants included)
        logs_src = hub_data_dir / "logs"
        if logs_src.is_dir():
            logs_dst = dest / "logs"
            logs_dst.mkdir(exist_ok=True)
            for p in logs_src.iterdir():
                if p.is_file():
                    shutil.copy2(p, logs_dst / p.name)
                    copied.append(f"logs/{p.name}")

        # Optional trace
        utrace = hub_data_dir / "hub.utrace"
        if utrace.is_file():
            shutil.copy2(utrace, dest / "hub.utrace")
            copied.append("hub.utrace")

        (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii")
        print(f"[archive] {dest} ({len(copied)} files)")
        return dest
    except Exception as e:
        print(f"[archive] WARNING: failed to archive run: {e}")
        return None


def _overlaps(a: Path, b: Path) -> bool:
    """True if *a* and *b* are the same directory or one contains the other."""
    return a == b or a in b.parents or b in a.parents


def _find_path_conflict(
    minio_seeded: Path,
    snapshot_dir: Path,
    minio_run: Path,
    hub_data_dir: Path,
    archive_dir: Path,
) -> Optional[str]:
    """Describe the first unsafe overlap between the run's directories, or
    return None if the layout is safe. Three rules are checked:

    * minio-run and hub-data-dir are wiped each run, so neither may overlap
      the preserved inputs (minio-seeded, snapshot-dir).
    * They also may not overlap each other: wiping one would delete the other.
    * archive-dir may not sit inside (or equal) any of those four dirs. That
      would write into a preserved input, or have the next wipe delete the
      archives. Being an ancestor of them is fine.
    """
    preserved = (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir))
    wiped = (("minio-run", minio_run), ("hub-data-dir", hub_data_dir))
    for label, mutable in wiped:
        for ro_label, ro in preserved:
            if _overlaps(mutable, ro):
                return f"{label}={mutable} overlaps {ro_label}={ro}"
    if _overlaps(minio_run, hub_data_dir):
        return f"minio-run={minio_run} overlaps hub-data-dir={hub_data_dir}"
    for label, other in (*preserved, *wiped):
        if archive_dir == other or other in archive_dir.parents:
            return f"archive-dir={archive_dir} is inside {label}={other}"
    return None


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser; the module docstring is the description."""
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--minio-seeded", required=True,
                        help="Preserved MinIO baseline (read-only source the working dir is reset from each run).")
    parser.add_argument("--minio-run", required=True,
                        help="Working MinIO data dir, wiped and re-copied from --minio-seeded each run.")
    parser.add_argument("--snapshot-dir", required=True,
                        help="Source of module IDs to run against (per-module server-state trees from Stage A).")
    parser.add_argument("--hub-data-dir", required=True,
                        help="Hub --data-dir for the perf run (wiped each run unless --no-wipe-hub). "
                             "Place on a different volume from --minio-run to keep hub I/O from skewing MinIO measurements.")
    parser.add_argument("--archive-dir", required=True,
                        help="Where to archive hub.log + zenserver.log + hub.utrace after each run.")
    parser.add_argument("--bucket", default="zen-seed-packed",
                        help="MinIO bucket name to exercise (default: zen-seed-packed).")
    parser.add_argument("--minio-port", type=int, default=9000)
    parser.add_argument("--minio-console-port", type=int, default=9001)
    parser.add_argument("--hub-port", type=int, default=8558)
    parser.add_argument("--module-count", type=int, default=1000,
                        help="Cap on modules used (default 1000, the hub instance shared-state "
                             "limit; raising this above ~1023 will hit 'all slots occupied' errors). "
                             "Pass 0 to use every module under --snapshot-dir but expect failures "
                             "if the snapshot has more than ~1023 entries.")
    parser.add_argument("--workers", type=int, default=50)
    parser.add_argument("--poll-timeout", type=float, default=1200.0)
    parser.add_argument("--settle-seconds", type=float, default=5.0)
    parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS",
                        help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace "
                             "(default channels: 'default')")
    parser.add_argument("--enable-dehydration", action="store_true",
                        help="Run with --hub-enable-dehydration=true so the deprovision phase "
                             "actually re-uploads to MinIO. Default false preserves the seeded "
                             "baseline; pass this to measure dehydrate cost as well as hydrate.")
    parser.add_argument("--no-wipe-hub", action="store_true",
                        help="Don't wipe the hub data dir before starting (useful to inspect leftover state)")
    parser.add_argument("--blocking", action="store_true",
                        help="Force blocking S3Client path (pass --hub-hydration-async-enabled=false). "
                             "Default is the hub default, which routes S3 hydration through AsyncHttpClient.")
    parser.add_argument("--hub-arg", action="append", default=[],
                        help="Extra raw arg appended verbatim to the hub command line. Repeatable.")
    parser.add_argument("--zenserver-dir", help="Directory containing zenserver + minio executables (auto-detected)")
    # Toxiproxy: simulate network latency between hub and MinIO.
    parser.add_argument("--toxiproxy-latency-ms", type=int, default=0,
                        help="If >0, route hub through toxiproxy with this latency injected on inbound+outbound. "
                             "Approximates real-network RTT on top of localhost MinIO.")
    parser.add_argument("--toxiproxy-jitter-ms", type=int, default=0,
                        help="Jitter added to toxiproxy latency (passed as -a jitter=N to toxic add).")
    parser.add_argument("--toxiproxy-port", type=int, default=9100,
                        help="Toxiproxy listen port for the MinIO proxy (the hub connects here when --toxiproxy-latency-ms > 0).")
    parser.add_argument("--toxiproxy-api-port", type=int, default=8474,
                        help="Toxiproxy server API port.")
    parser.add_argument("--toxiproxy-server-exe", default=None,
                        help="Path to toxiproxy-server executable. Defaults to ZEN_PERF_TOXIPROXY_SERVER_EXE "
                             "env var, or 'toxiproxy-server' from PATH.")
    parser.add_argument("--toxiproxy-cli-exe", default=None,
                        help="Path to toxiproxy-cli executable. Defaults to ZEN_PERF_TOXIPROXY_CLI_EXE "
                             "env var, or 'toxiproxy-cli' from PATH.")
    return parser


def _shutdown(
    zen_exe: Path,
    hub_proc: Optional[subprocess.Popen],
    hub_log_handle: Optional[IO[bytes]],
    toxiproxy_proc: Optional[subprocess.Popen],
    minio_proc: Optional[subprocess.Popen],
) -> None:
    """Stop the hub, close its log, then stop toxiproxy and MinIO.

    Every step runs even if an earlier one raises (e.g. Ctrl+C during
    'zen down'). On Windows the children live in their own process groups and
    would otherwise be orphaned. An exception from any step propagates once
    the remaining steps have run.
    """
    with contextlib.ExitStack() as stack:
        # ExitStack unwinds LIFO, so register in the reverse of run order.
        if minio_proc is not None:
            stack.callback(_stop_minio_graceful, minio_proc)
        if toxiproxy_proc is not None:
            stack.callback(_stop_toxiproxy, toxiproxy_proc)
        if hub_log_handle is not None:
            stack.callback(hub_log_handle.close)
        if hub_proc is not None:
            stack.callback(_zen_down_hub, zen_exe, hub_proc)


def main() -> int:
    """Run one reset -> provision -> deprovision iteration.

    Returns 0 on success, 1 if any request failed or any module got stuck.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if args.workers < 1:
        parser.error("--workers must be >= 1")

    minio_seeded = Path(args.minio_seeded).resolve()
    minio_run = Path(args.minio_run).resolve()
    snapshot_dir = Path(args.snapshot_dir).resolve()
    hub_data_dir = Path(args.hub_data_dir).resolve()
    archive_dir = Path(args.archive_dir).resolve()

    if not minio_seeded.is_dir():
        sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}")
    if not snapshot_dir.is_dir():
        sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}")

    # Preserved inputs are read-only from this script's perspective, the two
    # wiped dirs must not nest in each other, and archives must survive the
    # next run's wipe. Refuse any layout that breaks one of those.
    conflict = _find_path_conflict(minio_seeded, snapshot_dir, minio_run, hub_data_dir, archive_dir)
    if conflict is not None:
        sys.exit(f"[setup] refusing to run: {conflict}")

    module_ids = sorted(p.name for p in snapshot_dir.iterdir() if p.is_dir())
    if args.module_count > 0:
        module_ids = module_ids[:args.module_count]
    if not module_ids:
        sys.exit(f"[setup] no modules in {snapshot_dir}")

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    zen_exe = _find_zen(zenserver_exe)
    minio_exe = _find_minio(zenserver_exe)
    if "release" in zenserver_exe.parts:
        zenserver_mode = "release"
    elif "debug" in zenserver_exe.parts:
        zenserver_mode = "debug"
    else:
        zenserver_mode = "?"

    print(f"[setup] build mode: {zenserver_mode}")
    print(f"[setup] zenserver: {zenserver_exe}")
    print(f"[setup] minio: {minio_exe}")
    print(f"[setup] minio-seeded: {minio_seeded}")
    print(f"[setup] minio-run: {minio_run}")
    print(f"[setup] hub-data-dir: {hub_data_dir}")
    print(f"[setup] modules: {len(module_ids)}")

    # Reset MinIO run dir from the baseline.
    _robust_copytree(minio_seeded, minio_run)

    # Wipe the hub data dir so every run starts from scratch unless the user opts out.
    if not args.no_wipe_hub and hub_data_dir.exists():
        print(f"[reset] wiping {hub_data_dir} ...", flush=True)
        t0 = time.time()
        _rmtree_robust(hub_data_dir)
        print(f"[reset] wipe done in {time.time()-t0:.1f}s", flush=True)
    hub_data_dir.mkdir(parents=True, exist_ok=True)

    # If toxiproxy is enabled, the hub points at the proxy port, which forwards
    # to the real MinIO with the configured latency injected.
    hub_endpoint_port = args.toxiproxy_port if args.toxiproxy_latency_ms > 0 else args.minio_port
    s3_settings = {
        "uri": f"s3://{args.bucket}",
        "endpoint": f"http://localhost:{hub_endpoint_port}",
        "path-style": True,
        "region": "us-east-1",
    }
    config_path = hub_data_dir / "hydration_config.json"
    config_path.write_text(
        json.dumps({"type": "s3", "settings": s3_settings}),
        encoding="ascii",
    )

    hub_extra_args = [
        f"--hub-hydration-target-config={config_path}",
        f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}",
    ]
    if args.blocking:
        hub_extra_args.append("--hub-hydration-async-enabled=false")
        print("[mode] --blocking=true: hub will use blocking S3Client path")
    else:
        print("[mode] async path (default): hub routes S3 hydration through AsyncHttpClient")
    if args.hub_arg:
        hub_extra_args.extend(args.hub_arg)
        print(f"[mode] extra hub args: {args.hub_arg}")
    if args.enable_dehydration:
        print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge")
    if args.trace:
        trace_path = hub_data_dir / "hub.utrace"
        hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"]
        print(f"[trace] enabled channels='{args.trace}', file={trace_path}")

    hub_extra_env = {
        "AWS_ACCESS_KEY_ID": _MINIO_USER,
        "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
    }

    minio_proc: Optional[subprocess.Popen] = None
    toxiproxy_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle: Optional[IO[bytes]] = None
    exit_code = 0
    timings: dict = {
        "bucket": args.bucket,
        "module_count": len(module_ids),
        "build_mode": zenserver_mode,
        "provision_fanout_s": None,
        "provision_total_s": None,
        "provision_failures": None,
        "provision_stuck": None,
        "deprovision_fanout_s": None,
        "deprovision_total_s": None,
        "deprovision_failures": None,
        "deprovision_stuck": None,
        "total_s": None,
    }

    try:
        minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port)
        _wait_for_minio(args.minio_port, proc=minio_proc)

        if args.toxiproxy_latency_ms > 0:
            toxiproxy_server = _resolve_toxiproxy_exe(
                args.toxiproxy_server_exe, "ZEN_PERF_TOXIPROXY_SERVER_EXE", "toxiproxy-server")
            toxiproxy_cli = _resolve_toxiproxy_exe(
                args.toxiproxy_cli_exe, "ZEN_PERF_TOXIPROXY_CLI_EXE", "toxiproxy-cli")
            toxiproxy_proc = _start_toxiproxy(
                toxiproxy_server, toxiproxy_cli,
                args.toxiproxy_api_port, args.toxiproxy_port, args.minio_port,
                args.toxiproxy_latency_ms, args.toxiproxy_jitter_ms,
            )

        hub_log = hub_data_dir / "hub.log"
        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, hub_data_dir, args.hub_port, hub_log, hub_extra_args, hub_extra_env,
        )
        _wait_for_hub(hub_proc, args.hub_port)

        t_start = time.monotonic()
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            # --- Provision ---
            print(f"[provision] firing {len(module_ids)} provision requests...")
            t0 = time.monotonic()
            accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision")
            fanout_s = time.monotonic() - t0
            timings["provision_fanout_s"] = round(fanout_s, 2)
            timings["provision_failures"] = len(failures)
            print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)")
            for mid, status, body in failures[:5]:
                print(f"[provision] FAIL {mid}: status={status} body={body}")
            if failures:
                exit_code = 1

            stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout)
            total_s = time.monotonic() - t0
            timings["provision_total_s"] = round(total_s, 2)
            timings["provision_stuck"] = len(stuck)
            done = len(accepted) - len(stuck)
            if stuck:
                exit_code = 1
                print(f"[provision] {len(stuck)} not provisioned after {total_s:.1f}s "
                      f"({done}/{len(accepted)}), e.g. {sorted(stuck)[:5]}")
            else:
                print(f"[provision] all provisioned in {total_s:.1f}s ({done}/{len(accepted)})")

            print(f"[settle] waiting {args.settle_seconds:.0f}s")
            time.sleep(args.settle_seconds)

            # --- Deprovision ---
            print(f"[deprovision] firing {len(accepted)} deprovision requests...")
            t0 = time.monotonic()
            dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision")
            fanout_s = time.monotonic() - t0
            timings["deprovision_fanout_s"] = round(fanout_s, 2)
            timings["deprovision_failures"] = len(dp_failures)
            print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)")
            for mid, status, body in dp_failures[:5]:
                print(f"[deprovision] FAIL {mid}: status={status} body={body}")
            if dp_failures:
                exit_code = 1

            stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout)
            total_s = time.monotonic() - t0
            timings["deprovision_total_s"] = round(total_s, 2)
            timings["deprovision_stuck"] = len(stuck_dp)
            done = len(dp_accepted) - len(stuck_dp)
            if stuck_dp:
                exit_code = 1
                print(f"[deprovision] {len(stuck_dp)} not gone after {total_s:.1f}s "
                      f"({done}/{len(dp_accepted)}), e.g. {sorted(stuck_dp)[:5]}")
            else:
                print(f"[deprovision] all gone in {total_s:.1f}s ({done}/{len(dp_accepted)})")

            print(f"[settle] waiting {args.settle_seconds:.0f}s")
            time.sleep(args.settle_seconds)

        elapsed = time.monotonic() - t_start
        timings["total_s"] = round(elapsed, 2)
        print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}")
    except BaseException as exc:
        # sys.exit() from a helper, Ctrl+C or an unexpected error: make sure the
        # archived summary does not report a run that never finished as a pass.
        exit_code = exit_code or 1
        timings["aborted"] = f"{type(exc).__name__}: {exc}"
        raise
    finally:
        try:
            _shutdown(zen_exe, hub_proc, hub_log_handle, toxiproxy_proc, minio_proc)
        finally:
            # Archive AFTER the hub has exited so in-flight log writes are flushed.
            timings["exit_code"] = exit_code
            _archive_run(archive_dir, hub_data_dir, args.bucket, timings)

    return exit_code


if __name__ == "__main__":
    sys.exit(main())