aboutsummaryrefslogtreecommitdiff
path: root/scripts/test_scripts/hub/sweep_threads.py
blob: f92a5d734c878b1125c83a7ea7470cb59147416a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""Greedy hill-climb search for hub thread balance.

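Per phase: from the current best point, try +STEP on each of (data, lifetime,
hydration) and move to the fastest candidate. Repeat. Stop when the fastest
candidate does not beat the current best total by more than 1 s, when every
candidate is blocked by the caps, or after --max-phases phases.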

Constraints:
- async_threads fixed at 1
- step size = 4 per phase
- per-dim cap: 32
- total cap: data + lifetime + hydration <= 96
- start (8, 8, 8)

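Usage:
    python sweep_threads.py [--latency-ms N] [--max-phases N]
    python sweep_threads.py --points "8,8,8;16,8,8,2"   # explicit points, no hill-climb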
"""
from __future__ import annotations
import argparse
import atexit
import json
import signal
import subprocess
import sys
from pathlib import Path
import csv
import time

# Directory containing this script; sibling files are resolved relative to it.
HERE = Path(__file__).resolve().parent
# Benchmark driver launched as a subprocess for every measured configuration.
RUNNER = HERE / "run_minio_perf.py"
# CSV file that receives one appended row per run.
LOG = HERE / "threads_sweep.csv"
# Forwarded to the runner as --zenserver-dir.
ZEN_DIR = "build/windows/x64/release"
# Hill-climb increment applied to a single dimension per candidate.
STEP = 4
# Upper bound for any single thread-count dimension.
PER_DIM_CAP = 32
# Upper bound for data + lifetime + hydration.
TOTAL_CAP = 96
# Default --async-threads value passed to the runner.
ASYNC = 1
# Names of the tunable dimensions, in the order candidates are generated.
DIMS = ("data", "lifetime", "hydration")
# Runner subprocesses currently in flight; kill_strays() terminates any still alive.
CHILD_PROCS: list[subprocess.Popen] = []


def kill_strays():
    """Best-effort cleanup of runner children and helper servers.

    First asks every tracked child that is still running to terminate, then
    force-kills the hub / minio / toxiproxy images by name via ``taskkill``.
    Failures of either step are ignored on purpose, so calling this more than
    once is harmless.
    """
    still_running = (child for child in CHILD_PROCS if child.poll() is None)
    for child in still_running:
        try:
            child.terminate()
        except Exception:
            # Cleanup must keep going even if one child cannot be signalled.
            pass

    for image in ("zenserver.exe", "minio.exe", "toxiproxy-server.exe", "toxiproxy.exe"):
        kill_cmd = ["taskkill", "/F", "/IM", image]
        try:
            subprocess.run(kill_cmd, capture_output=True, timeout=10)
        except Exception:
            # taskkill missing or timing out is not fatal for a best-effort sweep.
            pass


def install_signal_handlers():
    """Hook kill_strays() into SIGINT, SIGTERM and normal interpreter exit."""

    def _on_signal(signum, frame):
        # Announce, clean up, then leave with exit status 130.
        print(f"\n[sweep] signal {signum} received - cleaning up", flush=True)
        kill_strays()
        sys.exit(130)

    for signo in (signal.SIGINT, signal.SIGTERM):
        try:
            signal.signal(signo, _on_signal)
        except Exception:
            # Registration is optional; a failure here must not stop the sweep.
            pass

    # Also clean up on every other way out of the interpreter.
    atexit.register(kill_strays)


def run_once(data: int, lifetime: int, hydration: int, latency_ms: int, label: str, async_threads: int = ASYNC) -> dict:
    """Run the benchmark runner once for one thread configuration and log the result.

    Launches RUNNER as a subprocess, echoes its output (stderr merged into
    stdout) with a two-space indent, and remembers the path announced on the
    runner's ``[archive] <path>`` line. After the runner exits, timings are read
    from ``<archive>/summary.json`` when that file exists and holds a JSON
    object, and one row is appended to the CSV log.

    Args:
        data: value for --hub-instance-provision-threads.
        lifetime: value for --hub-instance-spawn-threads.
        hydration: value for --hub-hydration-threads.
        latency_ms: value for --toxiproxy-latency-ms.
        label: free-form tag printed in the banner and stored in the row.
        async_threads: value for --async-threads.

    Returns:
        The logged row. ``provision_s``, ``deprovision_s`` and ``total_s`` are
        None when no usable summary was found; ``archive`` is None when the
        runner never announced an archive path.
    """
    cmd = [
        sys.executable, "-u", str(RUNNER),
        "--async-http",
        f"--async-threads={async_threads}",
        f"--toxiproxy-latency-ms={latency_ms}",
        f"--zenserver-dir={ZEN_DIR}",
        f"--hub-arg=--hub-instance-provision-threads={data}",
        f"--hub-arg=--hub-instance-spawn-threads={lifetime}",
        f"--hub-arg=--hub-hydration-threads={hydration}",
    ]
    print(f"\n=== [{label}] data={data} lifetime={lifetime} hydration={hydration} async={async_threads} latency={latency_ms}ms ===", flush=True)
    # Monotonic clock: the elapsed time must not be skewed by wall-clock adjustments.
    t0 = time.monotonic()
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)
    # Tracked so kill_strays() can terminate the runner if the sweep is interrupted mid-run.
    CHILD_PROCS.append(proc)
    archive = None
    assert proc.stdout is not None
    for line in proc.stdout:
        line = line.rstrip()
        if not line:
            continue
        if line.startswith("[archive]"):
            # A bare "[archive]" marker without a path must not abort the run.
            # NOTE(review): only the first whitespace-delimited token after the
            # marker is kept, so a path containing spaces would be truncated -
            # confirm the runner never emits one.
            fields = line.split(None, 2)
            if len(fields) >= 2:
                archive = fields[1]
        # Compress spinner-tick pile-up to last segment so we don't print
        # "[provision] N/1000 provisioned..." 5x per line. Only triggers when
        # the line is actually a spinner concat (contains the milestone marker).
        if line.count("...") >= 2 and ("/1000 provisioned" in line or "/1000 gone" in line):
            line = line.split("...")[-1].lstrip()
            if not line:
                continue
        print(f"  {line}", flush=True)
    rc = proc.wait()
    # The pipe hit EOF above; close it so repeated runs do not accumulate handles.
    proc.stdout.close()
    if proc in CHILD_PROCS:
        CHILD_PROCS.remove(proc)
    wall = time.monotonic() - t0

    summary = {}
    if archive:
        sj = Path(archive) / "summary.json"
        if sj.exists():
            # A truncated or malformed summary is treated like a missing one so
            # the row is still logged and the sweep can continue.
            try:
                loaded = json.loads(sj.read_text())
            except (OSError, ValueError) as exc:
                print(f"  !! could not read {sj}: {exc!r}", flush=True)
            else:
                if isinstance(loaded, dict):
                    summary = loaded
                else:
                    print(f"  !! unexpected content in {sj}: expected a JSON object", flush=True)
    row = {
        "label": label,
        "data": data,
        "lifetime": lifetime,
        "hydration": hydration,
        "async_threads": async_threads,
        "latency_ms": latency_ms,
        "exit_code": rc,
        "wall_s": round(wall, 1),
        "provision_s": summary.get("provision_total_s"),
        "deprovision_s": summary.get("deprovision_total_s"),
        "total_s": summary.get("total_s"),
        "archive": archive,
    }
    print(f"  -> total={row['total_s']}s provision={row['provision_s']}s deprovision={row['deprovision_s']}s exit={row['exit_code']}", flush=True)
    append_row(row)
    return row


def append_row(row: dict):
    """Append *row* to the CSV log, writing a header first if the log is empty.

    The column order is taken from the key order of *row*. The header decision
    is made from the stream position after opening, so a log file that exists
    but is empty still receives a header.
    """
    with LOG.open("a", newline="") as f:
        # In append mode the stream starts at end-of-file, so position 0 means
        # the file was just created or is empty.
        needs_header = f.tell() == 0
        w = csv.DictWriter(f, fieldnames=list(row.keys()))
        if needs_header:
            w.writeheader()
        w.writerow(row)


def feasible(p: dict) -> bool:
    """Return True when point *p* respects both the per-dimension and the total cap."""
    values = [p[name] for name in DIMS]
    over_dim = any(v > PER_DIM_CAP for v in values)
    over_total = sum(values) > TOTAL_CAP
    return not (over_dim or over_total)


def candidates(center: dict) -> list[tuple[str, dict]]:
    """List the feasible single-dimension +STEP neighbours of *center*.

    Returns (dimension name, new point) pairs in DIMS order. *center* itself is
    left unmodified; each neighbour is a fresh dict.
    """
    neighbours = ((dim, {**center, dim: center[dim] + STEP}) for dim in DIMS)
    return [(dim, point) for dim, point in neighbours if feasible(point)]


def parse_points(s: str, default_async: int) -> list[tuple[int, int, int, int]]:
    """Parse a ``--points`` string into (data, lifetime, hydration, async) tuples.

    Entries are separated by ';' and each entry is 3 or 4 comma-separated
    integers. Blank entries are skipped; three-value entries get
    *default_async* as their fourth value.

    Raises:
        SystemExit: if an entry contains a token that is not an integer, or has
            a number of values other than 3 or 4.
    """
    out = []
    for chunk in s.split(";"):
        chunk = chunk.strip()
        if not chunk:
            continue
        try:
            parts = [int(x.strip()) for x in chunk.split(",")]
        except ValueError:
            # Report bad tokens the same way as a wrong value count instead of
            # letting a raw ValueError traceback reach the CLI user.
            raise SystemExit(f"--points entry has a non-integer value: {chunk!r}") from None
        if len(parts) == 3:
            parts.append(default_async)
        elif len(parts) != 4:
            raise SystemExit(f"--points entry needs 3 ints (d,l,h) or 4 (d,l,h,async): {chunk!r}")
        out.append(tuple(parts))
    return out


def run_points(points: list[tuple[int, int, int, int]], latency_ms: int):
    """Run every explicit point that fits the caps and return the resulting rows.

    Points violating the per-dimension or total cap are reported and skipped.
    Skipped points still consume an index, so each label carries the point's
    1-based position in the input list.
    """
    results = []
    for index, (d, l, h, a) in enumerate(points, start=1):
        skip_reason = None
        if max(d, l, h) > PER_DIM_CAP:
            skip_reason = f"per-dim cap {PER_DIM_CAP}"
        elif d + l + h > TOTAL_CAP:
            skip_reason = f"total {d+l+h} > {TOTAL_CAP}"
        if skip_reason is not None:
            print(f"  -- skip {d},{l},{h} ({skip_reason})", flush=True)
            continue
        results.append(
            run_once(
                data=d,
                lifetime=l,
                hydration=h,
                latency_ms=latency_ms,
                label=f"pt{index}-{d},{l},{h},a{a}",
                async_threads=a,
            )
        )
    return results


def _fmt_secs(value) -> str:
    """Format a timing as 'N.Ns', or 'n/a' when the summary did not provide it."""
    return "n/a" if value is None else f"{value:.1f}s"


def _row_ok(row: dict) -> bool:
    """Return True when the run exited with code 0 and reported a total time."""
    return row.get("total_s") is not None and row["exit_code"] == 0


def main():
    """Command-line entry point: run explicit points or the greedy hill-climb.

    With ``--points`` every listed point is run once and the successful ones are
    printed sorted by total time. Otherwise the search starts at (8, 8, 8) and,
    per phase, moves to the fastest +STEP neighbour as long as it beats the
    current best total by more than 1 s. ``--async-threads`` applies to both
    modes (a 4th value in a ``--points`` entry overrides it for that point).
    """
    p = argparse.ArgumentParser()
    p.add_argument("--latency-ms", type=int, default=30)
    p.add_argument("--max-phases", type=int, default=20)
    p.add_argument("--async-threads", type=int, default=ASYNC,
                   help=f"AsyncHttpClient io_context threads/shards (default {ASYNC}). "
                        "Per-point override available via 4th value in --points entry.")
    p.add_argument("--points", type=str, default="",
                   help="Run an explicit list of (d,l,h) or (d,l,h,async) points instead of hill-climb. "
                        "Format: 'd,l,h;d,l,h,a;...'. CSV appended.")
    args = p.parse_args()

    install_signal_handlers()

    if args.points:
        pts = parse_points(args.points, args.async_threads)
        print(f"[sweep] explicit list: {len(pts)} points", flush=True)
        rows = run_points(pts, args.latency_ms)
        valid = [r for r in rows if _row_ok(r)]
        if valid:
            valid.sort(key=lambda r: r["total_s"])
            print("\n=== POINT RESULTS (sorted by total_s) ===", flush=True)
            for r in valid:
                tot = r["data"] + r["lifetime"] + r["hydration"]
                # provision/deprovision may be absent from a summary that still
                # has total_s; _fmt_secs keeps the report from crashing on None.
                print(f"  d={r['data']:>3} l={r['lifetime']:>3} h={r['hydration']:>3} a={r['async_threads']:>2} sum={tot:>3} -> total={_fmt_secs(r['total_s'])} prov={_fmt_secs(r['provision_s'])} deprov={_fmt_secs(r['deprovision_s'])}", flush=True)
        print(f"  log={LOG}", flush=True)
        return

    # Honour --async-threads in hill-climb mode too (it defaults to ASYNC).
    async_threads = args.async_threads
    cur = dict(data=8, lifetime=8, hydration=8)
    tried = set()

    base = run_once(**cur, latency_ms=args.latency_ms, label="phase0", async_threads=async_threads)
    tried.add((cur["data"], cur["lifetime"], cur["hydration"]))
    best = base
    # A failed baseline has no time to beat. Treat it as infinitely slow so the
    # comparison below never does arithmetic on None and any successful
    # candidate counts as an improvement.
    if _row_ok(base):
        best_total = base["total_s"]
    else:
        best_total = float("inf")
        print("  !!! phase0 baseline produced no valid total; any successful candidate will replace it", flush=True)

    for phase in range(1, args.max_phases + 1):
        cands = candidates(cur)
        if not cands:
            print(f"  === phase {phase}: all candidates blocked (per-dim {PER_DIM_CAP} or total {TOTAL_CAP}) ===", flush=True)
            break
        phase_best = None
        for dim, c in cands:
            key = (c["data"], c["lifetime"], c["hydration"])
            if key in tried:
                continue
            tried.add(key)
            row = run_once(**c, latency_ms=args.latency_ms, label=f"p{phase}-{dim}+{STEP}", async_threads=async_threads)
            if _row_ok(row) and (phase_best is None or row["total_s"] < phase_best["total_s"]):
                phase_best = row
        # Require more than 1 s of gain before moving.
        if phase_best is None or phase_best["total_s"] >= best_total - 1.0:
            print(f"  === phase {phase}: no improvement (best {best['total_s']}s vs phase {phase_best and phase_best['total_s']}s) ===", flush=True)
            break
        cur = dict(data=phase_best["data"], lifetime=phase_best["lifetime"], hydration=phase_best["hydration"])
        best = phase_best
        best_total = phase_best["total_s"]
        print(f"  +++ phase {phase}: -> data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} total={best['total_s']}s", flush=True)

    print("\n=== OPTIMUM ===", flush=True)
    print(f"  data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} async={async_threads}", flush=True)
    print(f"  total={best['total_s']}s provision={best['provision_s']}s deprovision={best['deprovision_s']}s", flush=True)
    print(f"  log={LOG}", flush=True)


# Script entry point: run main() and make sure helper processes are cleaned up
# on every abnormal way out.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C that reached us as an exception (e.g. raised before main()
        # installed the signal handlers): clean up and exit with status 130.
        print("\n[sweep] interrupted - cleaning up", flush=True)
        kill_strays()
        sys.exit(130)
    except Exception as e:
        # Any other failure: clean up first, then re-raise so the traceback is
        # still reported.
        print(f"\n[sweep] error: {e!r} - cleaning up", flush=True)
        kill_strays()
        raise