"""Greedy hill-climb search for hub thread balance. Per phase: from current best, try +STEP on each of (data, lifetime, hydration). Pick best. Repeat. Stop when no candidate improves or all blocked by caps. Constraints: - async_threads fixed at 1 - step size = 4 per phase - per-dim cap: 32 - total cap: data + lifetime + hydration <= 96 - start (8, 8, 8) Usage: python sweep_threads.py """ from __future__ import annotations import argparse import atexit import json import signal import subprocess import sys from pathlib import Path import csv import time HERE = Path(__file__).resolve().parent RUNNER = HERE / "run_minio_perf.py" LOG = HERE / "threads_sweep.csv" ZEN_DIR = "build/windows/x64/release" STEP = 4 PER_DIM_CAP = 32 TOTAL_CAP = 96 ASYNC = 1 DIMS = ("data", "lifetime", "hydration") CHILD_PROCS: list[subprocess.Popen] = [] def kill_strays(): """Best-effort terminate hub, minio, toxiproxy. Idempotent.""" for proc in CHILD_PROCS: if proc.poll() is None: try: proc.terminate() except Exception: pass targets = ["zenserver.exe", "minio.exe", "toxiproxy-server.exe", "toxiproxy.exe"] for name in targets: try: subprocess.run(["taskkill", "/F", "/IM", name], capture_output=True, timeout=10) except Exception: pass def install_signal_handlers(): def handler(signum, frame): print(f"\n[sweep] signal {signum} received - cleaning up", flush=True) kill_strays() sys.exit(130) for sig in (signal.SIGINT, signal.SIGTERM): try: signal.signal(sig, handler) except Exception: pass atexit.register(kill_strays) def run_once(data: int, lifetime: int, hydration: int, latency_ms: int, label: str, async_threads: int = ASYNC) -> dict: cmd = [ sys.executable, "-u", str(RUNNER), "--async-http", f"--async-threads={async_threads}", f"--toxiproxy-latency-ms={latency_ms}", f"--zenserver-dir={ZEN_DIR}", f"--hub-arg=--hub-instance-provision-threads={data}", f"--hub-arg=--hub-instance-spawn-threads={lifetime}", f"--hub-arg=--hub-hydration-threads={hydration}", ] print(f"\n=== [{label}] data={data} lifetime={lifetime} hydration={hydration} async={async_threads} latency={latency_ms}ms ===", flush=True) t0 = time.time() proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) CHILD_PROCS.append(proc) archive = None rc = 0 assert proc.stdout is not None for line in proc.stdout: line = line.rstrip() if not line: continue if line.startswith("[archive]"): archive = line.split(" ", 1)[1].split(" ")[0] # Compress spinner-tick pile-up to last segment so we don't print # "[provision] N/1000 provisioned..." 5x per line. Only triggers when # the line is actually a spinner concat (contains the milestone marker). if line.count("...") >= 2 and ("/1000 provisioned" in line or "/1000 gone" in line): line = line.split("...")[-1].lstrip() if not line: continue print(f" {line}", flush=True) rc = proc.wait() if proc in CHILD_PROCS: CHILD_PROCS.remove(proc) wall = time.time() - t0 summary = {} if archive: sj = Path(archive) / "summary.json" if sj.exists(): summary = json.loads(sj.read_text()) row = { "label": label, "data": data, "lifetime": lifetime, "hydration": hydration, "async_threads": async_threads, "latency_ms": latency_ms, "exit_code": rc, "wall_s": round(wall, 1), "provision_s": summary.get("provision_total_s"), "deprovision_s": summary.get("deprovision_total_s"), "total_s": summary.get("total_s"), "archive": archive, } print(f" -> total={row['total_s']}s provision={row['provision_s']}s deprovision={row['deprovision_s']}s exit={row['exit_code']}", flush=True) append_row(row) return row def append_row(row: dict): new = not LOG.exists() with LOG.open("a", newline="") as f: w = csv.DictWriter(f, fieldnames=list(row.keys())) if new: w.writeheader() w.writerow(row) def feasible(p: dict) -> bool: if any(p[d] > PER_DIM_CAP for d in DIMS): return False if sum(p[d] for d in DIMS) > TOTAL_CAP: return False return True def candidates(center: dict) -> list[tuple[str, dict]]: out = [] for d in DIMS: c = dict(center) c[d] += STEP if feasible(c): out.append((d, c)) return out def parse_points(s: str, default_async: int) -> list[tuple[int, int, int, int]]: out = [] for chunk in s.split(";"): chunk = chunk.strip() if not chunk: continue parts = [int(x.strip()) for x in chunk.split(",")] if len(parts) == 3: d, l, h = parts a = default_async elif len(parts) == 4: d, l, h, a = parts else: raise SystemExit(f"--points entry needs 3 ints (d,l,h) or 4 (d,l,h,async): {chunk!r}") out.append((d, l, h, a)) return out def run_points(points: list[tuple[int, int, int, int]], latency_ms: int): rows = [] for i, (d, l, h, a) in enumerate(points, 1): if d > PER_DIM_CAP or l > PER_DIM_CAP or h > PER_DIM_CAP: print(f" -- skip {d},{l},{h} (per-dim cap {PER_DIM_CAP})", flush=True) continue if d + l + h > TOTAL_CAP: print(f" -- skip {d},{l},{h} (total {d+l+h} > {TOTAL_CAP})", flush=True) continue row = run_once(data=d, lifetime=l, hydration=h, latency_ms=latency_ms, label=f"pt{i}-{d},{l},{h},a{a}", async_threads=a) rows.append(row) return rows def main(): p = argparse.ArgumentParser() p.add_argument("--latency-ms", type=int, default=30) p.add_argument("--max-phases", type=int, default=20) p.add_argument("--async-threads", type=int, default=ASYNC, help=f"AsyncHttpClient io_context threads/shards (default {ASYNC}). " "Per-point override available via 4th value in --points entry.") p.add_argument("--points", type=str, default="", help="Run an explicit list of (d,l,h) or (d,l,h,async) points instead of hill-climb. " "Format: 'd,l,h;d,l,h,a;...'. CSV appended.") args = p.parse_args() install_signal_handlers() if args.points: pts = parse_points(args.points, args.async_threads) print(f"[sweep] explicit list: {len(pts)} points", flush=True) rows = run_points(pts, args.latency_ms) valid = [r for r in rows if r.get("total_s") is not None and r["exit_code"] == 0] if valid: valid.sort(key=lambda r: r["total_s"]) print("\n=== POINT RESULTS (sorted by total_s) ===", flush=True) for r in valid: tot = r["data"] + r["lifetime"] + r["hydration"] print(f" d={r['data']:>3} l={r['lifetime']:>3} h={r['hydration']:>3} a={r['async_threads']:>2} sum={tot:>3} -> total={r['total_s']:.1f}s prov={r['provision_s']:.1f}s deprov={r['deprovision_s']:.1f}s", flush=True) print(f" log={LOG}", flush=True) return cur = dict(data=8, lifetime=8, hydration=8) tried = set() history = [] base = run_once(**cur, latency_ms=args.latency_ms, label="phase0") history.append(base) tried.add((cur["data"], cur["lifetime"], cur["hydration"])) best = base for phase in range(1, args.max_phases + 1): cands = candidates(cur) if not cands: print(f" === phase {phase}: all candidates blocked (per-dim {PER_DIM_CAP} or total {TOTAL_CAP}) ===", flush=True) break phase_best = None for dim, c in cands: key = (c["data"], c["lifetime"], c["hydration"]) if key in tried: continue tried.add(key) row = run_once(**c, latency_ms=args.latency_ms, label=f"p{phase}-{dim}+{STEP}") history.append(row) if row.get("total_s") is not None and row["exit_code"] == 0: if phase_best is None or row["total_s"] < phase_best["total_s"]: phase_best = row if phase_best is None or phase_best["total_s"] >= best["total_s"] - 1.0: print(f" === phase {phase}: no improvement (best {best['total_s']}s vs phase {phase_best and phase_best['total_s']}s) ===", flush=True) break cur = dict(data=phase_best["data"], lifetime=phase_best["lifetime"], hydration=phase_best["hydration"]) best = phase_best print(f" +++ phase {phase}: -> data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} total={best['total_s']}s", flush=True) print("\n=== OPTIMUM ===", flush=True) print(f" data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} async={ASYNC}", flush=True) print(f" total={best['total_s']}s provision={best['provision_s']}s deprovision={best['deprovision_s']}s", flush=True) print(f" log={LOG}", flush=True) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n[sweep] interrupted - cleaning up", flush=True) kill_strays() sys.exit(130) except Exception as e: print(f"\n[sweep] error: {e!r} - cleaning up", flush=True) kill_strays() raise