aboutsummaryrefslogtreecommitdiff
path: root/scripts/test_scripts/hub/sweep_threads.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/test_scripts/hub/sweep_threads.py')
-rw-r--r-- scripts/test_scripts/hub/sweep_threads.py 264
1 files changed, 264 insertions, 0 deletions
diff --git a/scripts/test_scripts/hub/sweep_threads.py b/scripts/test_scripts/hub/sweep_threads.py
new file mode 100644
index 000000000..f92a5d734
--- /dev/null
+++ b/scripts/test_scripts/hub/sweep_threads.py
@@ -0,0 +1,264 @@
+"""Greedy hill-climb search for hub thread balance.
+
+Per phase: from current best, try +STEP on each of (data, lifetime, hydration).
+Pick best. Repeat. Stop when no candidate improves or all blocked by caps.
+
+Constraints:
+- async_threads fixed at 1
+- step size = 4 per phase
+- per-dim cap: 32
+- total cap: data + lifetime + hydration <= 96
+- start (8, 8, 8)
+
+Usage:
+ python sweep_threads.py
+"""
+from __future__ import annotations
+import argparse
+import atexit
+import json
+import signal
+import subprocess
+import sys
+from pathlib import Path
+import csv
+import time
+
# Filesystem anchors, all resolved relative to the directory holding this script.
HERE = Path(__file__).resolve().parent
RUNNER = HERE.joinpath("run_minio_perf.py")  # script launched once per measured point
LOG = HERE.joinpath("threads_sweep.csv")  # one CSV row is appended per run

# Forwarded to the runner as --zenserver-dir.
ZEN_DIR = "build/windows/x64/release"

# Search-space parameters.
STEP = 4  # increment applied to one dimension per candidate
PER_DIM_CAP = 32  # upper bound for any single dimension
TOTAL_CAP = 96  # upper bound for the sum of all dimensions
ASYNC = 1  # default value for the runner's --async-threads
DIMS = (
    "data",
    "lifetime",
    "hydration",
)

# Runner processes currently in flight; kill_strays() terminates whatever is
# still listed here.
CHILD_PROCS: list[subprocess.Popen] = []
+
+
def kill_strays():
    """Best-effort terminate hub, minio, toxiproxy. Idempotent.

    First asks every still-running process recorded in CHILD_PROCS to
    terminate, then force-kills the known helper executables by image name
    via ``taskkill``. Expected failures (process already gone, ``taskkill``
    missing or timing out) are ignored so that cleanup never raises from an
    exit or signal path.
    """
    # Iterate over a snapshot: run_once() mutates CHILD_PROCS, and this
    # function may be invoked from a signal handler while that code is running.
    for proc in list(CHILD_PROCS):
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                # The process exited between poll() and terminate(), or the OS
                # refused the request; nothing more can be done here.
                pass
    targets = ["zenserver.exe", "minio.exe", "toxiproxy-server.exe", "toxiproxy.exe"]
    for name in targets:
        try:
            subprocess.run(["taskkill", "/F", "/IM", name], capture_output=True, timeout=10)
        except (OSError, subprocess.SubprocessError):
            # OSError: taskkill is not available on this host.
            # SubprocessError: covers TimeoutExpired after the 10 s limit.
            pass
+
+
def install_signal_handlers():
    """Arrange for child processes to be cleaned up on SIGINT/SIGTERM and at exit.

    The signal handler prints a notice, calls kill_strays() and exits with
    status 130. kill_strays() is additionally registered with ``atexit`` so it
    also runs on a normal interpreter shutdown.
    """
    def handler(signum, frame):
        print(f"\n[sweep] signal {signum} received - cleaning up", flush=True)
        kill_strays()
        sys.exit(130)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            signal.signal(sig, handler)
        except (ValueError, OSError):
            # signal.signal() raises ValueError outside the main thread or for
            # a signal the platform does not support; installing the handler
            # is optional, so carry on without it.
            pass
    atexit.register(kill_strays)
+
+
def run_once(data: int, lifetime: int, hydration: int, latency_ms: int, label: str, async_threads: int = ASYNC) -> dict:
    """Run the perf runner once for a single thread configuration.

    Launches RUNNER as a child process, echoes its output (with spinner
    pile-ups compressed), reads ``summary.json`` from the archive directory
    the runner announces on an ``[archive]`` line, appends one row to the CSV
    log and returns that row.

    Args:
        data: value for ``--hub-instance-provision-threads``.
        lifetime: value for ``--hub-instance-spawn-threads``.
        hydration: value for ``--hub-hydration-threads``.
        latency_ms: value for ``--toxiproxy-latency-ms``.
        label: free-form tag printed in the banner and stored in the row.
        async_threads: value for ``--async-threads``.

    Returns:
        The row dict written to the CSV. The timing fields are ``None`` when
        no usable summary was found.
    """
    cmd = [
        sys.executable, "-u", str(RUNNER),
        "--async-http",
        f"--async-threads={async_threads}",
        f"--toxiproxy-latency-ms={latency_ms}",
        f"--zenserver-dir={ZEN_DIR}",
        f"--hub-arg=--hub-instance-provision-threads={data}",
        f"--hub-arg=--hub-instance-spawn-threads={lifetime}",
        f"--hub-arg=--hub-hydration-threads={hydration}",
    ]
    print(f"\n=== [{label}] data={data} lifetime={lifetime} hydration={hydration} async={async_threads} latency={latency_ms}ms ===", flush=True)
    # Monotonic clock: the measured duration must not be affected by
    # wall-clock adjustments during a long run.
    t0 = time.monotonic()
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)
    # Registered so kill_strays() can terminate it if the sweep is interrupted.
    CHILD_PROCS.append(proc)
    archive = None
    assert proc.stdout is not None
    # The with-block closes the pipe as soon as the child's output is drained.
    with proc.stdout as stream:
        for line in stream:
            line = line.rstrip()
            if not line:
                continue
            if line.startswith("[archive]"):
                # Second whitespace-separated token is the archive directory.
                # A bare "[archive]" marker is ignored instead of crashing.
                fields = line.split()
                if len(fields) >= 2:
                    archive = fields[1]
            # Compress spinner-tick pile-up to last segment so we don't print
            # "[provision] N/1000 provisioned..." 5x per line. Only triggers when
            # the line is actually a spinner concat (contains the milestone marker).
            if line.count("...") >= 2 and ("/1000 provisioned" in line or "/1000 gone" in line):
                line = line.split("...")[-1].lstrip()
                if not line:
                    continue
            print(f" {line}", flush=True)
    rc = proc.wait()
    if proc in CHILD_PROCS:
        CHILD_PROCS.remove(proc)
    wall = time.monotonic() - t0
    summary = _load_summary(archive)
    row = {
        "label": label,
        "data": data,
        "lifetime": lifetime,
        "hydration": hydration,
        "async_threads": async_threads,
        "latency_ms": latency_ms,
        "exit_code": rc,
        "wall_s": round(wall, 1),
        "provision_s": summary.get("provision_total_s"),
        "deprovision_s": summary.get("deprovision_total_s"),
        "total_s": summary.get("total_s"),
        "archive": archive,
    }
    print(f" -> total={row['total_s']}s provision={row['provision_s']}s deprovision={row['deprovision_s']}s exit={row['exit_code']}", flush=True)
    append_row(row)
    return row


def _load_summary(archive: str | None) -> dict:
    """Return the parsed ``summary.json`` of *archive*, or ``{}`` if unusable."""
    if not archive:
        return {}
    summary_path = Path(archive) / "summary.json"
    try:
        loaded = json.loads(summary_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers both malformed JSON and undecodable bytes. A broken
        # summary must not abort the sweep; the row is still logged, just
        # without timings.
        print(f" [sweep] warning: could not read {summary_path}: {exc!r}", flush=True)
        return {}
    return loaded if isinstance(loaded, dict) else {}
+
+
def append_row(row: dict):
    """Append *row* to the CSV log, writing the header first if the log is empty.

    The column order is taken from the key order of *row*.
    """
    # A header is needed when the file is missing OR present but empty (e.g.
    # truncated by hand); checking existence alone would leave such a file
    # without a header.
    try:
        needs_header = LOG.stat().st_size == 0
    except FileNotFoundError:
        needs_header = True
    with LOG.open("a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(row.keys()))
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
+
+
def feasible(p: dict) -> bool:
    """Return True when point *p* respects both the per-dimension and total caps.

    *p* is indexed by every name in DIMS. The total is only evaluated when no
    single dimension already exceeds PER_DIM_CAP.
    """
    return not (
        any(p[dim] > PER_DIM_CAP for dim in DIMS)
        or sum(p[dim] for dim in DIMS) > TOTAL_CAP
    )
+
+
def candidates(center: dict) -> list[tuple[str, dict]]:
    """List the feasible neighbours of *center*, one per dimension.

    Each neighbour is a copy of *center* with exactly one dimension raised by
    STEP; it is paired with the name of the dimension that was raised.
    Neighbours rejected by feasible() are left out. *center* is not modified.
    """
    bumped = ((dim, {**center, dim: center[dim] + STEP}) for dim in DIMS)
    return [(dim, point) for dim, point in bumped if feasible(point)]
+
+
def parse_points(s: str, default_async: int) -> list[tuple[int, int, int, int]]:
    """Parse a ``--points`` string into ``(data, lifetime, hydration, async)`` tuples.

    Entries are separated by ``;`` and each entry holds 3 or 4 comma-separated
    integers. A 3-value entry gets *default_async* as its fourth value. Blank
    entries (e.g. from a trailing ``;``) are skipped.

    Raises:
        SystemExit: an entry contains a non-integer value or does not have
            exactly 3 or 4 values.
    """
    points = []
    for chunk in s.split(";"):
        chunk = chunk.strip()
        if not chunk:
            continue
        try:
            # int() tolerates surrounding whitespace on its own.
            values = [int(field) for field in chunk.split(",")]
        except ValueError:
            # Report bad input the same way as a wrong value count instead of
            # surfacing a raw traceback.
            raise SystemExit(f"--points entry has a non-integer value: {chunk!r}") from None
        if len(values) == 3:
            values.append(default_async)
        elif len(values) != 4:
            raise SystemExit(f"--points entry needs 3 ints (d,l,h) or 4 (d,l,h,async): {chunk!r}")
        points.append(tuple(values))
    return points
+
+
def run_points(points: list[tuple[int, int, int, int]], latency_ms: int):
    """Measure every explicitly requested point and return the resulting rows.

    Points that exceed PER_DIM_CAP in any dimension, or TOTAL_CAP in sum, are
    reported and skipped. The number in each label is the point's 1-based
    position in *points*, counting skipped entries as well.
    """
    results = []
    for position, (n_data, n_lifetime, n_hydration, n_async) in enumerate(points, start=1):
        if max(n_data, n_lifetime, n_hydration) > PER_DIM_CAP:
            print(f" -- skip {n_data},{n_lifetime},{n_hydration} (per-dim cap {PER_DIM_CAP})", flush=True)
            continue
        total = n_data + n_lifetime + n_hydration
        if total > TOTAL_CAP:
            print(f" -- skip {n_data},{n_lifetime},{n_hydration} (total {total} > {TOTAL_CAP})", flush=True)
            continue
        tag = f"pt{position}-{n_data},{n_lifetime},{n_hydration},a{n_async}"
        results.append(
            run_once(
                data=n_data,
                lifetime=n_lifetime,
                hydration=n_hydration,
                latency_ms=latency_ms,
                label=tag,
                async_threads=n_async,
            )
        )
    return results
+
+
# Minimum gain (seconds) a phase must deliver over the current best for the
# hill-climb to keep going.
_MIN_IMPROVEMENT_S = 1.0


def main():
    """Parse the command line and run either the explicit point list or the hill-climb."""
    p = argparse.ArgumentParser()
    p.add_argument("--latency-ms", type=int, default=30)
    p.add_argument("--max-phases", type=int, default=20)
    p.add_argument("--async-threads", type=int, default=ASYNC,
                   help=f"AsyncHttpClient io_context threads/shards (default {ASYNC}). "
                        "Per-point override available via 4th value in --points entry.")
    p.add_argument("--points", type=str, default="",
                   help="Run an explicit list of (d,l,h) or (d,l,h,async) points instead of hill-climb. "
                        "Format: 'd,l,h;d,l,h,a;...'. CSV appended.")
    args = p.parse_args()

    install_signal_handlers()

    if args.points:
        _run_explicit_points(args)
    else:
        _hill_climb(args)


def _is_valid(row: dict) -> bool:
    """Return True when a run exited cleanly and reported a total time."""
    return row.get("total_s") is not None and row["exit_code"] == 0


def _fmt_seconds(value) -> str:
    """Format a duration with one decimal and an ``s`` suffix, or ``n/a`` when missing."""
    return "n/a" if value is None else f"{value:.1f}s"


def _run_explicit_points(args: argparse.Namespace) -> None:
    """Run the points given via --points and print the valid ones, fastest first."""
    pts = parse_points(args.points, args.async_threads)
    print(f"[sweep] explicit list: {len(pts)} points", flush=True)
    rows = run_points(pts, args.latency_ms)
    valid = sorted((r for r in rows if _is_valid(r)), key=lambda r: r["total_s"])
    if valid:
        print("\n=== POINT RESULTS (sorted by total_s) ===", flush=True)
        for r in valid:
            tot = r["data"] + r["lifetime"] + r["hydration"]
            # provision/deprovision can be absent from a summary even when
            # total_s is present, so they are formatted defensively.
            print(f" d={r['data']:>3} l={r['lifetime']:>3} h={r['hydration']:>3} a={r['async_threads']:>2} sum={tot:>3} -> total={r['total_s']:.1f}s prov={_fmt_seconds(r['provision_s'])} deprov={_fmt_seconds(r['deprovision_s'])}", flush=True)
    print(f" log={LOG}", flush=True)


def _hill_climb(args: argparse.Namespace) -> None:
    """Greedy search: per phase, raise one dimension by STEP and keep the fastest result."""
    async_threads = args.async_threads
    cur = dict(data=8, lifetime=8, hydration=8)

    best = run_once(**cur, latency_ms=args.latency_ms, label="phase0", async_threads=async_threads)
    if not _is_valid(best):
        # Without a usable baseline there is nothing to compare candidates
        # against (and total_s may be None), so stop instead of failing later.
        print(f"[sweep] baseline run failed (exit={best['exit_code']}, total={best['total_s']}) - aborting", flush=True)
        print(f" log={LOG}", flush=True)
        sys.exit(1)
    tried = {(cur["data"], cur["lifetime"], cur["hydration"])}

    for phase in range(1, args.max_phases + 1):
        cands = candidates(cur)
        if not cands:
            print(f" === phase {phase}: all candidates blocked (per-dim {PER_DIM_CAP} or total {TOTAL_CAP}) ===", flush=True)
            break
        phase_best = None
        for dim, c in cands:
            key = (c["data"], c["lifetime"], c["hydration"])
            if key in tried:
                continue
            tried.add(key)
            row = run_once(**c, latency_ms=args.latency_ms, label=f"p{phase}-{dim}+{STEP}", async_threads=async_threads)
            if _is_valid(row) and (phase_best is None or row["total_s"] < phase_best["total_s"]):
                phase_best = row
        if phase_best is None or phase_best["total_s"] >= best["total_s"] - _MIN_IMPROVEMENT_S:
            print(f" === phase {phase}: no improvement (best {best['total_s']}s vs phase {phase_best and phase_best['total_s']}s) ===", flush=True)
            break
        cur = dict(data=phase_best["data"], lifetime=phase_best["lifetime"], hydration=phase_best["hydration"])
        best = phase_best
        print(f" +++ phase {phase}: -> data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} total={best['total_s']}s", flush=True)

    print("\n=== OPTIMUM ===", flush=True)
    print(f" data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} async={async_threads}", flush=True)
    print(f" total={best['total_s']}s provision={best['provision_s']}s deprovision={best['deprovision_s']}s", flush=True)
    print(f" log={LOG}", flush=True)
+
+
if __name__ == "__main__":
    # Script entry point. Whatever stops main() early, the helper processes
    # are torn down before the interpreter exits.
    try:
        main()
    except KeyboardInterrupt:
        # Interrupted by the user: clean up and exit with status 130.
        print("\n[sweep] interrupted - cleaning up", flush=True)
        kill_strays()
        raise SystemExit(130)
    except Exception as exc:
        # Unexpected failure: clean up, then let the original traceback surface.
        print(f"\n[sweep] error: {exc!r} - cleaning up", flush=True)
        kill_strays()
        raise