diff options
Diffstat (limited to 'scripts/test_scripts/hub/sweep_threads.py')
| -rw-r--r-- | scripts/test_scripts/hub/sweep_threads.py | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/scripts/test_scripts/hub/sweep_threads.py b/scripts/test_scripts/hub/sweep_threads.py new file mode 100644 index 000000000..f92a5d734 --- /dev/null +++ b/scripts/test_scripts/hub/sweep_threads.py @@ -0,0 +1,264 @@ +"""Greedy hill-climb search for hub thread balance. + +Per phase: from current best, try +STEP on each of (data, lifetime, hydration). +Pick best. Repeat. Stop when no candidate improves or all blocked by caps. + +Constraints: +- async_threads fixed at 1 +- step size = 4 per phase +- per-dim cap: 32 +- total cap: data + lifetime + hydration <= 96 +- start (8, 8, 8) + +Usage: + python sweep_threads.py +""" +from __future__ import annotations +import argparse +import atexit +import json +import signal +import subprocess +import sys +from pathlib import Path +import csv +import time + +HERE = Path(__file__).resolve().parent +RUNNER = HERE / "run_minio_perf.py" +LOG = HERE / "threads_sweep.csv" +ZEN_DIR = "build/windows/x64/release" +STEP = 4 +PER_DIM_CAP = 32 +TOTAL_CAP = 96 +ASYNC = 1 +DIMS = ("data", "lifetime", "hydration") +CHILD_PROCS: list[subprocess.Popen] = [] + + +def kill_strays(): + """Best-effort terminate hub, minio, toxiproxy. Idempotent.""" + for proc in CHILD_PROCS: + if proc.poll() is None: + try: + proc.terminate() + except Exception: + pass + targets = ["zenserver.exe", "minio.exe", "toxiproxy-server.exe", "toxiproxy.exe"] + for name in targets: + try: + subprocess.run(["taskkill", "/F", "/IM", name], capture_output=True, timeout=10) + except Exception: + pass + + +def install_signal_handlers(): + def handler(signum, frame): + print(f"\n[sweep] signal {signum} received - cleaning up", flush=True) + kill_strays() + sys.exit(130) + for sig in (signal.SIGINT, signal.SIGTERM): + try: + signal.signal(sig, handler) + except Exception: + pass + atexit.register(kill_strays) + + +def run_once(data: int, lifetime: int, hydration: int, latency_ms: int, label: str, async_threads: int = ASYNC) -> dict: + cmd = [ + sys.executable, "-u", str(RUNNER), + "--async-http", + f"--async-threads={async_threads}", + f"--toxiproxy-latency-ms={latency_ms}", + f"--zenserver-dir={ZEN_DIR}", + f"--hub-arg=--hub-instance-provision-threads={data}", + f"--hub-arg=--hub-instance-spawn-threads={lifetime}", + f"--hub-arg=--hub-hydration-threads={hydration}", + ] + print(f"\n=== [{label}] data={data} lifetime={lifetime} hydration={hydration} async={async_threads} latency={latency_ms}ms ===", flush=True) + t0 = time.time() + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) + CHILD_PROCS.append(proc) + archive = None + rc = 0 + assert proc.stdout is not None + for line in proc.stdout: + line = line.rstrip() + if not line: + continue + if line.startswith("[archive]"): + archive = line.split(" ", 1)[1].split(" ")[0] + # Compress spinner-tick pile-up to last segment so we don't print + # "[provision] N/1000 provisioned..." 5x per line. Only triggers when + # the line is actually a spinner concat (contains the milestone marker). + if line.count("...") >= 2 and ("/1000 provisioned" in line or "/1000 gone" in line): + line = line.split("...")[-1].lstrip() + if not line: + continue + print(f" {line}", flush=True) + rc = proc.wait() + if proc in CHILD_PROCS: + CHILD_PROCS.remove(proc) + wall = time.time() - t0 + summary = {} + if archive: + sj = Path(archive) / "summary.json" + if sj.exists(): + summary = json.loads(sj.read_text()) + row = { + "label": label, + "data": data, + "lifetime": lifetime, + "hydration": hydration, + "async_threads": async_threads, + "latency_ms": latency_ms, + "exit_code": rc, + "wall_s": round(wall, 1), + "provision_s": summary.get("provision_total_s"), + "deprovision_s": summary.get("deprovision_total_s"), + "total_s": summary.get("total_s"), + "archive": archive, + } + print(f" -> total={row['total_s']}s provision={row['provision_s']}s deprovision={row['deprovision_s']}s exit={row['exit_code']}", flush=True) + append_row(row) + return row + + +def append_row(row: dict): + new = not LOG.exists() + with LOG.open("a", newline="") as f: + w = csv.DictWriter(f, fieldnames=list(row.keys())) + if new: + w.writeheader() + w.writerow(row) + + +def feasible(p: dict) -> bool: + if any(p[d] > PER_DIM_CAP for d in DIMS): + return False + if sum(p[d] for d in DIMS) > TOTAL_CAP: + return False + return True + + +def candidates(center: dict) -> list[tuple[str, dict]]: + out = [] + for d in DIMS: + c = dict(center) + c[d] += STEP + if feasible(c): + out.append((d, c)) + return out + + +def parse_points(s: str, default_async: int) -> list[tuple[int, int, int, int]]: + out = [] + for chunk in s.split(";"): + chunk = chunk.strip() + if not chunk: + continue + parts = [int(x.strip()) for x in chunk.split(",")] + if len(parts) == 3: + d, l, h = parts + a = default_async + elif len(parts) == 4: + d, l, h, a = parts + else: + raise SystemExit(f"--points entry needs 3 ints (d,l,h) or 4 (d,l,h,async): {chunk!r}") + out.append((d, l, h, a)) + return out + + +def run_points(points: list[tuple[int, int, int, int]], latency_ms: int): + rows = [] + for i, (d, l, h, a) in enumerate(points, 1): + if d > PER_DIM_CAP or l > PER_DIM_CAP or h > PER_DIM_CAP: + print(f" -- skip {d},{l},{h} (per-dim cap {PER_DIM_CAP})", flush=True) + continue + if d + l + h > TOTAL_CAP: + print(f" -- skip {d},{l},{h} (total {d+l+h} > {TOTAL_CAP})", flush=True) + continue + row = run_once(data=d, lifetime=l, hydration=h, latency_ms=latency_ms, + label=f"pt{i}-{d},{l},{h},a{a}", async_threads=a) + rows.append(row) + return rows + + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--latency-ms", type=int, default=30) + p.add_argument("--max-phases", type=int, default=20) + p.add_argument("--async-threads", type=int, default=ASYNC, + help=f"AsyncHttpClient io_context threads/shards (default {ASYNC}). " + "Per-point override available via 4th value in --points entry.") + p.add_argument("--points", type=str, default="", + help="Run an explicit list of (d,l,h) or (d,l,h,async) points instead of hill-climb. " + "Format: 'd,l,h;d,l,h,a;...'. CSV appended.") + args = p.parse_args() + + install_signal_handlers() + + if args.points: + pts = parse_points(args.points, args.async_threads) + print(f"[sweep] explicit list: {len(pts)} points", flush=True) + rows = run_points(pts, args.latency_ms) + valid = [r for r in rows if r.get("total_s") is not None and r["exit_code"] == 0] + if valid: + valid.sort(key=lambda r: r["total_s"]) + print("\n=== POINT RESULTS (sorted by total_s) ===", flush=True) + for r in valid: + tot = r["data"] + r["lifetime"] + r["hydration"] + print(f" d={r['data']:>3} l={r['lifetime']:>3} h={r['hydration']:>3} a={r['async_threads']:>2} sum={tot:>3} -> total={r['total_s']:.1f}s prov={r['provision_s']:.1f}s deprov={r['deprovision_s']:.1f}s", flush=True) + print(f" log={LOG}", flush=True) + return + + cur = dict(data=8, lifetime=8, hydration=8) + tried = set() + history = [] + + base = run_once(**cur, latency_ms=args.latency_ms, label="phase0") + history.append(base) + tried.add((cur["data"], cur["lifetime"], cur["hydration"])) + best = base + + for phase in range(1, args.max_phases + 1): + cands = candidates(cur) + if not cands: + print(f" === phase {phase}: all candidates blocked (per-dim {PER_DIM_CAP} or total {TOTAL_CAP}) ===", flush=True) + break + phase_best = None + for dim, c in cands: + key = (c["data"], c["lifetime"], c["hydration"]) + if key in tried: + continue + tried.add(key) + row = run_once(**c, latency_ms=args.latency_ms, label=f"p{phase}-{dim}+{STEP}") + history.append(row) + if row.get("total_s") is not None and row["exit_code"] == 0: + if phase_best is None or row["total_s"] < phase_best["total_s"]: + phase_best = row + if phase_best is None or phase_best["total_s"] >= best["total_s"] - 1.0: + print(f" === phase {phase}: no improvement (best {best['total_s']}s vs phase {phase_best and phase_best['total_s']}s) ===", flush=True) + break + cur = dict(data=phase_best["data"], lifetime=phase_best["lifetime"], hydration=phase_best["hydration"]) + best = phase_best + print(f" +++ phase {phase}: -> data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} total={best['total_s']}s", flush=True) + + print("\n=== OPTIMUM ===", flush=True) + print(f" data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} async={ASYNC}", flush=True) + print(f" total={best['total_s']}s provision={best['provision_s']}s deprovision={best['deprovision_s']}s", flush=True) + print(f" log={LOG}", flush=True) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n[sweep] interrupted - cleaning up", flush=True) + kill_strays() + sys.exit(130) + except Exception as e: + print(f"\n[sweep] error: {e!r} - cleaning up", flush=True) + kill_strays() + raise |