diff options
Diffstat (limited to 'scripts/test_scripts')
| -rw-r--r-- | scripts/test_scripts/hub/PERF_SEED_README.md | 105 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/hub_load_test_s3.py | 4 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/parse_perf_log.py | 117 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/perf_configs/hub.lua | 11 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/perf_configs/instance.lua | 6 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/preserve_minio_state.py | 15 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/run_minio_perf.py | 243 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/seed_minio.py | 82 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/seed_s3_snapshot.py | 327 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/sweep_threads.py | 264 |
10 files changed, 955 insertions, 219 deletions
diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md index fb471d4bb..eacb0da55 100644 --- a/scripts/test_scripts/hub/PERF_SEED_README.md +++ b/scripts/test_scripts/hub/PERF_SEED_README.md @@ -3,35 +3,40 @@ Three-stage pipeline for running repeatable hub-hydration perf tests against a local MinIO backend seeded with real module data pulled from production S3. +The pipeline is **pack-on only** - the seeded baseline always comes from a hub +launched with `--hub-hydration-enable-pack=true`. The pack-off variant is no +longer maintained. + ## Layout -All scripts default to a single perf-seed root - currently `E:/Dev/zen-perf-seed/` -in the script defaults, but every path is overridable via CLI flag (see the -per-stage options below). Pick a root with enough free space (snapshots and -preserved CAS dirs can be large) and either pass the matching `--*-dir` flag on -each invocation or change the script defaults to your chosen root. +All path arguments are required (no hardcoded defaults). Pick a perf-seed root +with enough free space (snapshots and preserved CAS dirs can be large) and pass +the matching `--*-dir` flag on each invocation. Stage A's hub data dir should +live on the same volume as the snapshot dir so snapshotting is an O(1) rename +per module instead of a cross-volume byte copy; Stage C's hub data dir should +live on a different volume from the MinIO data dir so hub I/O does not skew the +measured perf run. -Layout under the chosen root (`<perf-seed>/`): +Example layout (directory names only; pick volumes/roots and pass via `--*-dir` +flags): ``` -<perf-seed>/ - hub-a/ Stage A hub data dir (transient) +<perf-seed-A>/ bulk data + Stage A/B flow (one volume = move-friendly) + hub-a/ Stage A hub data dir (transient; snapshot-step rename source) servers/<moduleid>/ - s3-snapshot/ Preserved production server-state trees (read-only after Stage A) + s3-snapshot/ Preserved production server-state trees (read-only after Stage A) <moduleid>/ - hubs/ Stage B per-bucket hub data dirs (transient) + hubs/ Stage B per-bucket hub data dirs (transient) hub-b-zen-seed-packed/ - hub-b-zen-seed-unpacked/ - minio-data/ Stage B MinIO data dir (transient, carries every seeded bucket) - minio-seeded-baseline/ Preserved baseline MinIO CAS (read-only after Stage B + preserve) - README.txt - minio-seeded-packed/ Preserved packed MinIO CAS (filled by the pack worktree) + minio-data/ Stage B MinIO data dir (transient) + minio-seeded-packed/ Preserved packed MinIO CAS (read-only after Stage B + preserve) README.txt - hub-perf/ Stage C hub data dir (wiped each run) - minio-run/ Stage C MinIO data dir (wiped + re-copied each run) - perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json + minio-run/ Stage C MinIO data dir (wiped + re-copied each run) + perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json 20260423-141530_zen-seed-packed/ - 20260423-143112_zen-seed-unpacked/ + +<perf-seed-B>/ separate volume from <perf-seed-A> for measurement isolation + hub-perf/ Stage C hub data dir (wiped each run) ``` ## Prerequisites @@ -46,7 +51,7 @@ Layout under the chosen root (`<perf-seed>/`): ## Stage A - snapshot real S3 data -One-time (or when you want a fresh baseline from production). +One-time (or when you want a fresh snapshot from production). ``` export ZEN_PERF_S3_URI=s3://your-bucket/ @@ -54,8 +59,11 @@ export ZEN_PERF_AWS_PROFILE=your-sso-profile python scripts/test_scripts/hub/seed_s3_snapshot.py ``` -Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies -`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login` +Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then **moves** +`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. When `--hub-data-dir` and +`--snapshot-dir` share a volume (the default) the move is an O(1) rename per +module; cross-volume falls back to a byte copy with the old cost profile. The +hub data dir is wiped on the next run regardless. Triggers `aws sso login` automatically if the SSO token is missing or expired. Module selection ranks all UUID-shaped folders by their @@ -64,37 +72,27 @@ most-recently-accessed) and takes the top `--module-count`. Options: - `--module-count N` (default 1000) -- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`) -- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`) +- `--snapshot-dir PATH` (required, e.g. `<perf-seed>/s3-snapshot`) +- `--hub-data-dir PATH` (required, e.g. `<perf-seed>/hub-a`) ## Stage B - seed MinIO from the snapshot -One-time per pack-mode (or when `s3-snapshot` changes). +One-time, or when `s3-snapshot/` changes. -`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is -hardcoded inside the script (`--hub-hydration-enable-pack=true` near the -top of `_start_hub`). To produce both packed and unpacked baselines for -comparison, invoke the script twice from two separate worktrees - one with -the flag flipped to `false` - and preserve the resulting MinIO data dir -each time. +`seed_minio.py` seeds the `zen-seed-packed` bucket with pack ON +(`--hub-hydration-enable-pack=true` is hardcoded). The script provisions every +module found under `s3-snapshot/`, hibernates them, overlays the snapshot on +top of the hub's servers dir, then deprovisions all modules - which runs the +dehydrate path and uploads the content into the bucket. ``` -# In the pack worktree (flag = true), seeds zen-seed-packed python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed - -# In the no-pack worktree (flag = false), seeds zen-seed-unpacked -python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked -python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked ``` -The script provisions every module found under `s3-snapshot/`, hibernates -them, overlays the snapshot on top of the hub's servers dir, then -deprovisions all modules - which runs the dehydrate path and uploads the -content into the bucket. - -`preserve_minio_state.py` copies the resulting `minio-data/` to a -variant-specific preservation dir and writes a README with provenance. +`preserve_minio_state.py` MOVES (default; `--copy` to keep source) the +resulting `minio-data/` to the preservation dir and writes a README with +provenance. Options of interest: - `--bucket NAME` - bucket name (default `zen-seed-packed`). @@ -107,24 +105,18 @@ Options of interest: Repeat as often as you want; each run starts from the preserved baseline. ``` -# Pack-on bucket python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace - -# Pack-off bucket (for comparison) -python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace ``` Steps: -1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state. -2. Wipes `hub-perf/` (unless `--no-wipe-hub`). +1. Copies `--minio-seeded` over `--minio-run` so MinIO starts from a known state. +2. Wipes `--hub-data-dir` (unless `--no-wipe-hub`). 3. Starts MinIO and hub. 4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone. 5. Stops everything cleanly. Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every -iteration exercises the hydrate-only path against the same baseline CAS. The -`--bucket` flag selects which seeded bucket (and therefore which pack mode) -to exercise. +iteration exercises the hydrate-only path against the same baseline CAS. Pass `--enable-dehydration` to run a full provision -> deprovision cycle that includes re-upload (dehydrate) at deprovision time. Use this to measure the @@ -139,10 +131,9 @@ post-hoc. Override the destination with `--archive-dir PATH`. ## Resetting between runs -- **Keep**: `s3-snapshot/`, `minio-seeded-baseline/`, `minio-seeded-packed/`. These are expensive to rebuild. +- **Keep**: `s3-snapshot/`, `minio-seeded-packed/`. These are expensive to rebuild. - **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`. -To force a fresh MinIO seed for one variant: delete the matching -`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching -`--dest`) in that worktree. To force a fresh S3 snapshot: delete -`s3-snapshot/` and re-run Stage A. +To force a fresh MinIO seed: delete `minio-seeded-packed/` and re-run Stage B ++ preserve. To force a fresh S3 snapshot: delete `s3-snapshot/` and re-run +Stage A. diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py index 23014409c..e71222e31 100644 --- a/scripts/test_scripts/hub/hub_load_test_s3.py +++ b/scripts/test_scripts/hub/hub_load_test_s3.py @@ -372,8 +372,8 @@ def _wait_for_deprovisioned( def main() -> None: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3", - help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)") + parser.add_argument("--data-dir", required=True, + help="Hub --data-dir.") parser.add_argument("--port", type=int, default=8558, help="Hub HTTP port (default: 8558)") parser.add_argument("--module-count", type=int, default=200, diff --git a/scripts/test_scripts/hub/parse_perf_log.py b/scripts/test_scripts/hub/parse_perf_log.py new file mode 100644 index 000000000..8833a5211 --- /dev/null +++ b/scripts/test_scripts/hub/parse_perf_log.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +"""Parse hub perf log to extract hydration / spawn / deprovision timings.""" +from __future__ import annotations +import re +import statistics +import sys +from pathlib import Path + +HYDRATE_RE = re.compile(r"Hydration complete module '([^']+)': (\d+) files \(([^)]+)\) in ([\d.]+)(ms|s)") +SPAWN_RE = re.compile(r"module '([^']+)' started, listening on port \d+, spawn took ([\d.]+)(ms|s)") +PROVISION_START_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Provisioning storage server instance for module '([^']+)'") +SPAWN_DONE_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Storage server instance for module '([^']+)' started") +HYDRATE_DONE_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Hydration complete module '([^']+)'") +DEPROV_START_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Module '([^']+)' changed state from provisioned to deprovisioning") +DEPROV_REMOVE_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Module '([^']+)' (?:removed|changed state from deprovisioning to unprovisioned)") + + +def to_ms(num: str, unit: str) -> float: + v = float(num) + return v * 1000.0 if unit == "s" else v + + +def parse_ts(ts: str) -> float: + # 26-04-29 22:41:34.009 -> seconds since arbitrary epoch (use day-of-year * 86400 + h*3600 + m*60 + s) + import datetime + dt = datetime.datetime.strptime(ts, "%y-%m-%d %H:%M:%S.%f") + return dt.timestamp() + + +def stats(name: str, vals: list[float]) -> str: + if not vals: + return f"{name:14s} n=0" + vs = sorted(vals) + n = len(vs) + p50 = vs[n // 2] + p95 = vs[min(n - 1, int(n * 0.95))] + p99 = vs[min(n - 1, int(n * 0.99))] + return (f"{name:14s} n={n:5d} mean={statistics.mean(vs):8.0f}ms " + f"p50={p50:7.0f}ms p95={p95:7.0f}ms p99={p99:7.0f}ms max={max(vs):7.0f}ms " + f"sum={sum(vs)/1000.0:7.1f}s") + + +def analyse(log_path: Path) -> None: + print(f"=== {log_path}") + hydrate_durations: list[float] = [] + spawn_durations: list[float] = [] + + prov_start: dict[str, float] = {} + hydrate_done: dict[str, float] = {} + spawn_done: dict[str, float] = {} + deprov_start: dict[str, float] = {} + deprov_done: dict[str, float] = {} + + with log_path.open("r", encoding="utf-8", errors="replace") as f: + for line in f: + m = HYDRATE_RE.search(line) + if m: + hydrate_durations.append(to_ms(m.group(4), m.group(5))) + m = SPAWN_RE.search(line) + if m: + spawn_durations.append(to_ms(m.group(2), m.group(3))) + m = PROVISION_START_RE.search(line) + if m: + prov_start[m.group(2)] = parse_ts(m.group(1)) + continue + m = HYDRATE_DONE_RE.search(line) + if m: + hydrate_done[m.group(2)] = parse_ts(m.group(1)) + continue + m = SPAWN_DONE_RE.search(line) + if m: + spawn_done[m.group(2)] = parse_ts(m.group(1)) + continue + m = DEPROV_START_RE.search(line) + if m: + deprov_start[m.group(2)] = parse_ts(m.group(1)) + continue + m = DEPROV_REMOVE_RE.search(line) + if m: + deprov_done[m.group(2)] = parse_ts(m.group(1)) + continue + + # Reconstruct end-to-end provision wall time per module: + e2e_prov_ms: list[float] = [] + queue_to_hydrate_ms: list[float] = [] # time in DataPool queue before hydrate begun (proxy: prov_start->hydrate_done minus hydrate ms) + spawn_minus_data_ms: list[float] = [] # time between hydrate_done and spawn_done (lifetime + lifetime queue) + for mid, t0 in prov_start.items(): + t_end = spawn_done.get(mid) + if t_end is None: + continue + e2e_prov_ms.append((t_end - t0) * 1000.0) + t_h = hydrate_done.get(mid) + if t_h is not None: + spawn_minus_data_ms.append((t_end - t_h) * 1000.0) + + e2e_deprov_ms: list[float] = [] + for mid, t0 in deprov_start.items(): + t_end = deprov_done.get(mid) + if t_end is None: + continue + e2e_deprov_ms.append((t_end - t0) * 1000.0) + + print(stats("hydrate_dur", hydrate_durations)) + print(stats("spawn_dur", spawn_durations)) + print(stats("e2e_prov", e2e_prov_ms)) + print(stats("after_hydrate", spawn_minus_data_ms)) + print(stats("e2e_deprov", e2e_deprov_ms)) + if hydrate_durations and spawn_durations: + h_sum = sum(hydrate_durations) / 1000.0 + s_sum = sum(spawn_durations) / 1000.0 + print(f" total hydrate work: {h_sum:.1f}s, total spawn work: {s_sum:.1f}s, ratio H/S: {h_sum/s_sum:.2f}") + print() + + +if __name__ == "__main__": + for arg in sys.argv[1:]: + analyse(Path(arg)) diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua index f3cf3e697..ff9ab582e 100644 --- a/scripts/test_scripts/hub/perf_configs/hub.lua +++ b/scripts/test_scripts/hub/perf_configs/hub.lua @@ -12,19 +12,14 @@ hub = { disklimitpercent = 90, -- default: 0 (disabled) }, corelimit = 4, -- default: 0 (auto) - provisionthreads = 8, -- default: auto + -- provisionthreads / spawnthreads / hub.hydration.threads left unset: defaults + -- (clamp(cpu/8,4,12) / clamp(cpu/8,4,16) / clamp(cpu/8,4,12)) are tuned from + -- 1000-module sweep at 128 vCPU + 30ms latency. Override here only to A/B test. -- NOTE: hub.instance.config (path to instance lua) is overridden via -- --hub-instance-config on the CLI. If left here, it would be resolved -- relative to the hub's CWD at spawn time (NOT this file's dir). }, - hydration = { - -- Match production's per-module download pool size. Without this, the - -- default auto-picks hardware_concurrency/4 which on --corelimit=128 - -- would be 32. Prod logs consistently show "16 threads" in Download phase. - threads = 16, - }, - watchdog = { cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua index 1251997db..cb292a0cd 100644 --- a/scripts/test_scripts/hub/perf_configs/instance.lua +++ b/scripts/test_scripts/hub/perf_configs/instance.lua @@ -13,3 +13,9 @@ cache = { }, }, } + +server = { + sentry = { + disable = true, -- perf runs: keep Sentry on the hub only; skip per-child crash reporter init + }, +} diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py index 365e4a542..cfa63af31 100644 --- a/scripts/test_scripts/hub/preserve_minio_state.py +++ b/scripts/test_scripts/hub/preserve_minio_state.py @@ -9,10 +9,8 @@ wipes --minio-data-dir on its next invocation anyway. Pass --copy to keep --source in place (slower; needs 2x disk). -Typical invocation: - python preserve_minio_state.py - -Defaults map to the paths recommended by PERF_SEED_README.md. +Both --source and --dest are required. See PERF_SEED_README.md for the +expected layout. """ from __future__ import annotations @@ -57,11 +55,10 @@ def _size_of(path: Path) -> tuple[int, int]: def main() -> int: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data", - help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)") - parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed", - help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " - "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") + parser.add_argument("--source", required=True, + help="Source MinIO data dir produced by Stage B (seed_minio.py).") + parser.add_argument("--dest", required=True, + help="Preservation path; the move/copy target that becomes the baseline read by Stage C.") parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""), help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)") parser.add_argument("--bucket", default="zen-seed", diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py index b59cff41a..575303c61 100644 --- a/scripts/test_scripts/hub/run_minio_perf.py +++ b/scripts/test_scripts/hub/run_minio_perf.py @@ -68,7 +68,7 @@ def _find_zenserver(override: Optional[str]) -> Path: sys.exit(f"zenserver not found at {p}") return p script_dir = Path(__file__).resolve().parent - repo_root = script_dir.parent.parent + repo_root = script_dir.parents[2] for mode in ("release", "debug"): for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" @@ -91,6 +91,73 @@ def _find_minio(zenserver_path: Path) -> Path: return p +def _resolve_toxiproxy_exe(arg_value: Optional[str], env_var: str, default_name: str) -> Path: + """Resolve a toxiproxy executable: explicit --toxiproxy-* arg wins, then env var, + then PATH lookup of `default_name`. Exits if none of the three resolve to a real + file.""" + candidate = arg_value or os.environ.get(env_var) + if candidate: + p = Path(candidate) + if not p.exists(): + sys.exit(f"[toxiproxy] {default_name}: explicit path '{p}' not found") + return p + found = shutil.which(default_name) + if found: + return Path(found) + sys.exit(f"[toxiproxy] {default_name} not found: pass --toxiproxy-server-exe / --toxiproxy-cli-exe, " + f"set {env_var}, or put {default_name} on PATH") + + +def _start_toxiproxy(server_exe: Path, cli_exe: Path, api_port: int, listen_port: int, + upstream_port: int, latency_ms: int, jitter_ms: int) -> subprocess.Popen: + """Start toxiproxy-server, create a proxy minio_proxy listening on listen_port and + forwarding to localhost:upstream_port, and add a latency toxic. Returns the server + Popen so the caller can terminate it on cleanup.""" + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + proc = subprocess.Popen( + [str(server_exe), "-port", str(api_port)], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **popen_kwargs, + ) + print(f"[toxiproxy] server started (pid {proc.pid}) api-port={api_port}") + # Wait for the API to come up. + api_url = f"http://localhost:{api_port}/version" + deadline = time.monotonic() + 10.0 + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(api_url, timeout=1): + break + except Exception: + time.sleep(0.1) + else: + proc.terminate() + sys.exit(f"[toxiproxy] API at {api_url} never came up") + + env = os.environ.copy() + env["TOXIPROXY_URL"] = f"http://localhost:{api_port}" + # Create the proxy. + subprocess.run( + [str(cli_exe), "create", "-l", f"127.0.0.1:{listen_port}", + "-u", f"127.0.0.1:{upstream_port}", "minio_proxy"], + env=env, check=True, + ) + # Add latency toxic to both directions (default direction is downstream; + # add explicit upstream too for symmetry). + args_common = ["-t", "latency", "-a", f"latency={latency_ms}", "-a", f"jitter={jitter_ms}"] + subprocess.run( + [str(cli_exe), "toxic", "add", "-n", "lat_down", "-d", *args_common, "minio_proxy"], + env=env, check=True, + ) + subprocess.run( + [str(cli_exe), "toxic", "add", "-n", "lat_up", "-u", *args_common, "minio_proxy"], + env=env, check=True, + ) + print(f"[toxiproxy] minio_proxy listening on 127.0.0.1:{listen_port} -> " + f"127.0.0.1:{upstream_port}, latency={latency_ms}ms jitter={jitter_ms}ms (per direction)") + return proc + + def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: data_dir.mkdir(parents=True, exist_ok=True) env = os.environ.copy() @@ -116,10 +183,12 @@ def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: deadline = time.monotonic() + timeout_s url = f"http://localhost:{port}/minio/health/live" + t0 = time.monotonic() + print(f"[minio] waiting for ready on port {port} ...", flush=True) while time.monotonic() < deadline: try: with urllib.request.urlopen(url, timeout=1): - print("[minio] ready") + print(f"[minio] ready ({time.monotonic()-t0:.1f}s)", flush=True) return except Exception: time.sleep(0.1) @@ -178,14 +247,21 @@ def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) - deadline = time.monotonic() + timeout_s req = urllib.request.Request(f"http://localhost:{port}/hub/status", headers={"Accept": "application/json"}) + t0 = time.monotonic() + last_tick = t0 + print(f"[hub] waiting for ready on port {port} ...", flush=True) while time.monotonic() < deadline: if proc.poll() is not None: sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})") try: with urllib.request.urlopen(req, timeout=2): - print("[hub] ready") + print(f"[hub] ready ({time.monotonic()-t0:.1f}s)", flush=True) return except Exception: + now = time.monotonic() + if now - last_tick >= 5.0: + print(f"[hub] still waiting ({now-t0:.1f}s elapsed)", flush=True) + last_tick = now time.sleep(0.2) sys.exit(f"[hub] timed out after {timeout_s}s") @@ -195,14 +271,16 @@ def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = if hub_proc.poll() is not None: return pid = hub_proc.pid - print(f"[hub] zen down --pid {pid}") + print(f"[hub] zen down --pid {pid} ...", flush=True) + t0 = time.time() rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) if rc != 0: - print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway", flush=True) try: hub_proc.wait(timeout=timeout_s) + print(f"[hub] exited ({time.time()-t0:.1f}s)", flush=True) except subprocess.TimeoutExpired: - print(f"[hub] did not exit after {timeout_s}s, killing") + print(f"[hub] did not exit after {timeout_s}s, killing", flush=True) hub_proc.kill() hub_proc.wait() @@ -210,6 +288,8 @@ def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: if proc.poll() is not None: return + print(f"[minio] stopping (pid {proc.pid}) ...", flush=True) + t0 = time.time() try: if sys.platform == "win32": proc.send_signal(signal.CTRL_BREAK_EVENT) @@ -219,8 +299,9 @@ def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> Non proc.terminate() try: proc.wait(timeout=timeout_s) + print(f"[minio] stopped ({time.time()-t0:.1f}s)", flush=True) except subprocess.TimeoutExpired: - print(f"[minio] did not exit after {timeout_s}s, killing") + print(f"[minio] did not exit after {timeout_s}s, killing", flush=True) proc.kill() proc.wait() @@ -341,29 +422,34 @@ def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]: def _robust_copytree(src: Path, dst: Path) -> None: - """Windows-friendly directory copy with progress. + """Directory copy of the seeded baseline onto the working MinIO data dir. + + On a ReFS volume we use `refs_clone.clone_tree` to issue + FSCTL_DUPLICATE_EXTENTS_TO_FILE per file: O(1) per file via copy-on-write + metadata, regardless of size. A 300+ GB baseline clones in seconds. + Off ReFS we fall back to `shutil.copytree`. PowerShell `Copy-Item` was + tried previously and observed to byte-copy on this host, so it is no + longer used. - Uses robocopy /MIR to mirror src -> dst and /COPY:DAT /DCOPY:DAT to copy - file data + attributes + timestamps explicitly (not just the defaults). Safe because dst is always the working dir (minio-run) - never the preserved baseline. """ - if sys.platform == "win32": - cmd = [ - "robocopy", str(src), str(dst), - "/MIR", - "/COPY:DAT", "/DCOPY:DAT", - "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP", - "/R:2", "/W:1", - ] - print(f"[reset] robocopy {src} -> {dst}") - rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - if rc >= 8: - sys.exit(f"[reset] robocopy failed rc={rc}") + from refs_clone import clone_tree, is_refs_volume + + if dst.exists(): + _rmtree_robust(dst) + + if sys.platform == "win32" and is_refs_volume(src) and is_refs_volume(dst.parent): + print(f"[reset] ReFS block-clone {src} -> {dst} ...", flush=True) + t0 = time.time() + files, bytes_total = clone_tree(src, dst) + print(f"[reset] cloned {files:,} files, {bytes_total/1024/1024:.1f} MB " + f"in {time.time()-t0:.1f}s", flush=True) else: - if dst.exists(): - _rmtree_robust(dst) + print(f"[reset] copytree (non-ReFS) {src} -> {dst} ...", flush=True) + t0 = time.time() shutil.copytree(src, dst, symlinks=False) + print(f"[reset] copytree done in {time.time()-t0:.1f}s", flush=True) def _archive_run( @@ -415,26 +501,27 @@ def _archive_run( def main() -> int: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed", - help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " - "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") - parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run", - help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)") - parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", - help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)") - parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf", - help="Hub --data-dir for the perf run (wiped each run if --wipe). Default: E:/Dev/zen-perf-seed/hub-perf") - parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs", - help="Where to archive hub.log + zenserver.log + hub.utrace after each run " - "(default: E:/Dev/zen-perf-seed/perf-runs)") + parser.add_argument("--minio-seeded", required=True, + help="Preserved MinIO baseline (read-only source the working dir is reset from each run).") + parser.add_argument("--minio-run", required=True, + help="Working MinIO data dir, wiped and re-copied from --minio-seeded each run.") + parser.add_argument("--snapshot-dir", required=True, + help="Source of module IDs to run against (per-module server-state trees from Stage A).") + parser.add_argument("--hub-data-dir", required=True, + help="Hub --data-dir for the perf run (wiped each run unless --no-wipe-hub). " + "Place on a different volume from --minio-run to keep hub I/O from skewing MinIO measurements.") + parser.add_argument("--archive-dir", required=True, + help="Where to archive hub.log + zenserver.log + hub.utrace after each run.") parser.add_argument("--bucket", default="zen-seed-packed", - help="MinIO bucket name to exercise (default: zen-seed-packed). " - "Pack worktree seeds only the packed bucket.") + help="MinIO bucket name to exercise (default: zen-seed-packed).") parser.add_argument("--minio-port", type=int, default=9000) parser.add_argument("--minio-console-port", type=int, default=9001) parser.add_argument("--hub-port", type=int, default=8558) - parser.add_argument("--module-count", type=int, default=0, - help="Cap on modules used (0 = all from snapshot-dir)") + parser.add_argument("--module-count", type=int, default=1000, + help="Cap on modules used (default 1000, the hub instance shared-state " + "limit; raising this above ~1023 will hit 'all slots occupied' errors). " + "Pass 0 to use every module under --snapshot-dir but expect failures " + "if the snapshot has more than ~1023 entries.") parser.add_argument("--workers", type=int, default=50) parser.add_argument("--poll-timeout", type=float, default=1200.0) parser.add_argument("--settle-seconds", type=float, default=5.0) @@ -446,8 +533,29 @@ def main() -> int: "baseline; pass this to measure dehydrate cost as well as hydrate.") parser.add_argument("--no-wipe-hub", action="store_true", help="Don't wipe the hub data dir before starting (useful to inspect leftover state)") + parser.add_argument("--blocking", action="store_true", + help="Force blocking S3Client path (pass --hub-hydration-async-enabled=false). " + "Default is the hub default, which routes S3 hydration through AsyncHttpClient.") + parser.add_argument("--hub-arg", action="append", default=[], + help="Extra raw arg appended verbatim to the hub command line. Repeatable.") parser.add_argument("--zenserver-dir", help="Directory containing zenserver + minio executables (auto-detected)") + # Toxiproxy: simulate network latency between hub and MinIO. + parser.add_argument("--toxiproxy-latency-ms", type=int, default=0, + help="If >0, route hub through toxiproxy with this latency injected on inbound+outbound. " + "Approximates real-network RTT on top of localhost MinIO.") + parser.add_argument("--toxiproxy-jitter-ms", type=int, default=0, + help="Jitter added to toxiproxy latency (passed as -a jitter=N to toxic add).") + parser.add_argument("--toxiproxy-port", type=int, default=9100, + help="Toxiproxy listen port for the MinIO proxy (the hub connects here when --toxiproxy-latency-ms > 0).") + parser.add_argument("--toxiproxy-api-port", type=int, default=8474, + help="Toxiproxy server API port.") + parser.add_argument("--toxiproxy-server-exe", default=None, + help="Path to toxiproxy-server executable. Defaults to ZEN_PERF_TOXIPROXY_SERVER_EXE " + "env var, or 'toxiproxy-server' from PATH.") + parser.add_argument("--toxiproxy-cli-exe", default=None, + help="Path to toxiproxy-cli executable. Defaults to ZEN_PERF_TOXIPROXY_CLI_EXE " + "env var, or 'toxiproxy-cli' from PATH.") args = parser.parse_args() minio_seeded = Path(args.minio_seeded).resolve() @@ -491,27 +599,39 @@ def main() -> int: # Wipe the hub data dir so every run starts from scratch unless the user opts out. if not args.no_wipe_hub and hub_data_dir.exists(): - print(f"[reset] wiping {hub_data_dir}") + print(f"[reset] wiping {hub_data_dir} ...", flush=True) + t0 = time.time() _rmtree_robust(hub_data_dir) + print(f"[reset] wipe done in {time.time()-t0:.1f}s", flush=True) hub_data_dir.mkdir(parents=True, exist_ok=True) + # If toxiproxy is enabled, the hub points at the proxy port, which forwards + # to the real MinIO with the configured latency injected. + hub_endpoint_port = args.toxiproxy_port if args.toxiproxy_latency_ms > 0 else args.minio_port + + s3_settings = { + "uri": f"s3://{args.bucket}", + "endpoint": f"http://localhost:{hub_endpoint_port}", + "path-style": True, + "region": "us-east-1", + } config_path = hub_data_dir / "hydration_config.json" config_path.write_text( - json.dumps({ - "type": "s3", - "settings": { - "uri": f"s3://{args.bucket}", - "endpoint": f"http://localhost:{args.minio_port}", - "path-style": True, - "region": "us-east-1", - }, - }), + json.dumps({"type": "s3", "settings": s3_settings}), encoding="ascii", ) hub_extra_args = [ f"--hub-hydration-target-config={config_path}", f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}", ] + if args.blocking: + hub_extra_args.append("--hub-hydration-async-enabled=false") + print("[mode] --blocking=true: hub will use blocking S3Client path") + else: + print("[mode] async path (default): hub routes S3 hydration through AsyncHttpClient") + if args.hub_arg: + hub_extra_args.extend(args.hub_arg) + print(f"[mode] extra hub args: {args.hub_arg}") if args.enable_dehydration: print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge") if args.trace: @@ -524,6 +644,7 @@ def main() -> int: } minio_proc: Optional[subprocess.Popen] = None + toxiproxy_proc: Optional[subprocess.Popen] = None hub_proc: Optional[subprocess.Popen] = None hub_log_handle = None exit_code = 0 @@ -542,6 +663,21 @@ def main() -> int: minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port) _wait_for_minio(args.minio_port) + if args.toxiproxy_latency_ms > 0: + toxiproxy_server = _resolve_toxiproxy_exe( + args.toxiproxy_server_exe, "ZEN_PERF_TOXIPROXY_SERVER_EXE", "toxiproxy-server") + toxiproxy_cli = _resolve_toxiproxy_exe( + args.toxiproxy_cli_exe, "ZEN_PERF_TOXIPROXY_CLI_EXE", "toxiproxy-cli") + toxiproxy_proc = _start_toxiproxy( + toxiproxy_server, + toxiproxy_cli, + args.toxiproxy_api_port, + args.toxiproxy_port, + args.minio_port, + args.toxiproxy_latency_ms, + args.toxiproxy_jitter_ms, + ) + hub_log = hub_data_dir / "hub.log" hub_proc, hub_log_handle = _start_hub( zenserver_exe, hub_data_dir, args.hub_port, hub_log, @@ -601,6 +737,13 @@ def main() -> int: _zen_down_hub(zen_exe, hub_proc) if hub_log_handle is not None: hub_log_handle.close() + if toxiproxy_proc is not None and toxiproxy_proc.poll() is None: + try: + toxiproxy_proc.terminate() + toxiproxy_proc.wait(timeout=5) + except Exception: + toxiproxy_proc.kill() + print("[toxiproxy] stopped") if minio_proc is not None and minio_proc.poll() is None: _stop_minio_graceful(minio_proc) # Archive AFTER the hub has exited so in-flight log writes are flushed. diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py index e0e45c4cb..f5928b995 100644 --- a/scripts/test_scripts/hub/seed_minio.py +++ b/scripts/test_scripts/hub/seed_minio.py @@ -1,13 +1,10 @@ #!/usr/bin/env python3 """Stage B of the perf-seed workflow. -Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO -bucket so a later perf run can exercise that bucket. Pack mode is hardcoded -in this script (see --hub-hydration-enable-pack flag in _start_hub) and -governs how the hub uploads into MinIO. To compare packed vs unpacked, -invoke this script twice from two separate worktrees - one with the pack -flag flipped to false - producing two preserved MinIO data dirs, then run -run_minio_perf.py against each. +Replays the snapshot produced by seed_s3_snapshot.py into a MinIO bucket so +a later perf run can exercise it. Pack mode is fixed ON (the only mode the +perf-seed pipeline caters to) - the hub is launched with +--hub-hydration-enable-pack=true so dehydrate emits packed CAS. Flow: 1. Start a local MinIO server against --minio-data-dir, create the bucket. @@ -78,7 +75,7 @@ def _find_zenserver(override: Optional[str]) -> Path: sys.exit(f"zenserver not found at {p}") return p script_dir = Path(__file__).resolve().parent - repo_root = script_dir.parent.parent + repo_root = script_dir.parents[2] for mode in ("release", "debug"): for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" @@ -189,11 +186,10 @@ def _start_hub( "--hub-provision-disk-limit-percent=99", "--hub-provision-memory-limit-percent=80", f"--hub-instance-limit={instance_limit}", - # Seeding is not a perf-measurement path - we want it as fast as the - # host can manage. Let the hub go wide on both provisioning and - # hydration thread pools rather than matching prod limits. - "--hub-instance-provision-threads=64", - "--hub-hydration-threads=64", + # Provision / hydration / async-cap use server defaults; on a 128-core + # host these resolve to 16 / 16 / 512 which are sized for both the + # async hydrate (Stage A) and the sync dehydrate-PUT path that drives + # the upload here. # Prevent the watchdog from auto-deprovisioning modules while we're # still hydrating the tail / in the overlay phase. BOTH timers have to # be extended - the provisioned one (default 600s) is what bites on @@ -203,7 +199,7 @@ def _start_hub( # Explicit - default is true, but make it obvious that Stage B needs # it since the final deprovision drives the MinIO upload. "--hub-enable-dehydration=true", - # Pack worktree: turn pack on so dehydrate emits packed CAS. + # Pack ON (the only seeding mode) so dehydrate emits packed CAS. "--hub-hydration-enable-pack=true", ] + extra_args @@ -411,7 +407,16 @@ def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: l """Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*. snapshot_root is treated as read-only; only hub_servers_root is written to. + Uses ReFS block-clone (`refs_clone.clone_tree`) when source and dest live on + a ReFS volume so the overlay is O(1) per file via copy-on-write metadata + rather than a full byte copy. Falls back to byte copy per-file when the + volume is not ReFS or a file is too small for FSCTL_DUPLICATE_EXTENTS_TO_FILE. """ + from refs_clone import clone_tree, is_refs_volume + + use_clone = is_refs_volume(snapshot_root) and is_refs_volume(hub_servers_root) + print(f"[overlay] {'ReFS block-clone path' if use_clone else 'byte-copy path (non-ReFS)'}") + files_copied = 0 bytes_copied = 0 modules_overlaid = 0 @@ -424,16 +429,21 @@ def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: l continue if dst.exists(): _rmtree_robust(dst) - shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + if use_clone: + f, b = clone_tree(src, dst) + files_copied += f + bytes_copied += b + else: + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 modules_overlaid += 1 - for root, _dirs, files in os.walk(dst): - for f in files: - p = Path(root) / f - try: - bytes_copied += p.stat().st_size - except OSError: - pass - files_copied += 1 if i % 25 == 0 or i == len(module_ids): print(f"[overlay] {i}/{len(module_ids)} modules overlaid " f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") @@ -601,21 +611,23 @@ def _seed_one_bucket( def main() -> int: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", - help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)") - parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs", - help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ " - "(default: E:/Dev/zen-perf-seed/hubs)") - parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data", - help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--snapshot-dir", required=True, + help="Source of per-module server-state trees (READ-ONLY).") + parser.add_argument("--hub-data-root", required=True, + help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/") + parser.add_argument("--minio-data-dir", required=True, + help="MinIO data dir shared by every bucket.") parser.add_argument("--minio-port", type=int, default=9000) parser.add_argument("--minio-console-port", type=int, default=9001) parser.add_argument("--hub-port", type=int, default=8558) parser.add_argument("--bucket", default="zen-seed-packed", - help="Bucket to seed (default: zen-seed-packed). Pack worktree - " - "hub is launched with --hub-hydration-enable-pack=true.") - parser.add_argument("--module-count", type=int, default=0, - help="Cap on modules processed (0 = all modules in snapshot-dir)") + help="Bucket to seed (default: zen-seed-packed). Hub is launched with " + "--hub-hydration-enable-pack=true (the only seeding mode).") + parser.add_argument("--module-count", type=int, default=1000, + help="Cap on modules processed (default 1000, the hub instance shared-state " + "limit; raising this above ~1023 will hit 'all slots occupied' errors). " + "Pass 0 to process every module under --snapshot-dir but expect failures " + "if the snapshot has more than ~1023 entries.") parser.add_argument("--workers", type=int, default=50) parser.add_argument("--poll-timeout", type=float, default=1800.0, help="Max seconds to wait for each state transition (default: 1800)") @@ -706,7 +718,7 @@ def main() -> int: for key, lbl in zip(phases, labels): print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}") - print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded/") + print(f"[summary] next: preserve {minio_data_dir} via preserve_minio_state.py --source <this> --dest <baseline>") finally: if minio_proc is not None and minio_proc.poll() is None: diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py index f0bc7b607..a99c9e130 100644 --- a/scripts/test_scripts/hub/seed_s3_snapshot.py +++ b/scripts/test_scripts/hub/seed_s3_snapshot.py @@ -87,7 +87,7 @@ def _find_zenserver(override: Optional[str]) -> Path: return p script_dir = Path(__file__).resolve().parent - repo_root = script_dir.parent.parent + repo_root = script_dir.parents[2] for mode in ("release", "debug"): for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" @@ -161,14 +161,24 @@ def _parse_s3_uri(uri: str) -> tuple[str, str]: return bucket, prefix +_EMPTY_MIN_OBJECTS = 3 +_EMPTY_MIN_BYTES = 16 * 1024 + +# Size buckets for stratified selection. Pyramid distribution mirrors typical +# workloads: many small modules, fewer medium, fewest large. +_SMALL_MAX_BYTES = 1 * 1024 * 1024 # <1 MiB +_LARGE_MIN_BYTES = 500 * 1024 * 1024 # >500 MiB +_BUCKET_RATIO = (("small", 500), ("medium", 350), ("large", 150)) + + def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]: """List UUID-shaped module folders under the bucket and return the `limit` - most recently active ones, ranked by the LastModified of each module's - `incremental-state.cbo` (newest first - these were last dehydrated most - recently, which is the closest proxy for "most recently accessed"). + most recently active non-empty ones, ranked by the LastModified of each + module's `incremental-state.cbo` (newest first - these were last dehydrated + most recently, which is the closest proxy for "most recently accessed"). - Falls back to listing order for any folder whose state file can't be - HEADed (missing / 403 / transient error). + Modules with < _EMPTY_MIN_OBJECTS objects or < _EMPTY_MIN_BYTES total bytes + are treated as empty and dropped. """ s3 = session.client("s3", region_name=region) prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" @@ -181,30 +191,147 @@ def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/") if folder and _MODULEID_RE.match(folder): candidates.append(folder) - print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...") + print(f"[s3] {len(candidates)} module folders match UUID shape; " + f"sizing + ranking by state.cbo LastModified (skip empty <{_EMPTY_MIN_OBJECTS} obj or <{_EMPTY_MIN_BYTES} B)...") - # 2. HEAD each module's state file in parallel. Missing/failed HEADs land - # at the tail via a sentinel epoch 0 timestamp. + # 2. Per-module ListObjectsV2: total object count + total bytes + state.cbo + # LastModified, all in one pass. Missing state file => epoch sentinel. from datetime import datetime, timezone epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) - def _state_mtime(mid: str) -> datetime: - key = f"{prefix_norm}{mid}/incremental-state.cbo" + def _probe(mid: str) -> tuple[int, int, datetime]: + mid_prefix = f"{prefix_norm}{mid}/" + state_key = f"{mid_prefix}incremental-state.cbo" + count = 0 + total_bytes = 0 + state_mtime = epoch try: - resp = s3.head_object(Bucket=bucket, Key=key) - return resp.get("LastModified", epoch) + for page in paginator.paginate(Bucket=bucket, Prefix=mid_prefix): + for obj in page.get("Contents", []) or []: + count += 1 + total_bytes += int(obj.get("Size", 0)) + if obj.get("Key") == state_key: + state_mtime = obj.get("LastModified", epoch) or epoch except Exception: - return epoch + return 0, 0, epoch + return count, total_bytes, state_mtime with ThreadPoolExecutor(max_workers=50) as pool: - times = list(pool.map(_state_mtime, candidates)) - - # 3. Sort descending (newest first). Folders without a state file sink. - ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True) - missing = sum(1 for _, t in ranked if t == epoch) - if missing: - print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)") - return [mid for mid, _ in ranked[:limit]] + probes = list(pool.map(_probe, candidates)) + + # 3. Stratified selection. Bucket non-empty modules by total_bytes into + # small/medium/large, sort each bucket by state.cbo mtime desc, and pull + # according to _BUCKET_RATIO scaled to `limit`. If a bucket is short, + # spill the deficit onto the relaxed pool first, then onto neighbouring + # buckets so the caller still gets `limit` modules. + small: list[tuple[str, datetime, int]] = [] + medium: list[tuple[str, datetime, int]] = [] + large: list[tuple[str, datetime, int]] = [] + relaxed: list[tuple[str, datetime, int, int]] = [] + empties = 0 + no_state = 0 + for mid, (count, total_bytes, mtime) in zip(candidates, probes): + is_empty = count < _EMPTY_MIN_OBJECTS or total_bytes < _EMPTY_MIN_BYTES + if is_empty: + empties += 1 + if mtime == epoch: + no_state += 1 + if not is_empty and mtime != epoch: + if total_bytes < _SMALL_MAX_BYTES: + small.append((mid, mtime, total_bytes)) + elif total_bytes >= _LARGE_MIN_BYTES: + large.append((mid, mtime, total_bytes)) + else: + medium.append((mid, mtime, total_bytes)) + else: + relaxed.append((mid, mtime, count, total_bytes)) + + for bucket in (small, medium, large): + bucket.sort(key=lambda x: x[1], reverse=True) + print(f"[s3] candidates={len(candidates)} empty={empties} no-state={no_state} " + f"small={len(small)} medium={len(medium)} large={len(large)}") + + total_ratio = sum(w for _, w in _BUCKET_RATIO) + targets = {name: max(0, (limit * w) // total_ratio) for name, w in _BUCKET_RATIO} + # Distribute rounding leftovers to first bucket(s) deterministically. + leftover = limit - sum(targets.values()) + for name, _ in _BUCKET_RATIO: + if leftover <= 0: + break + targets[name] += 1 + leftover -= 1 + + pools = {"small": small, "medium": medium, "large": large} + # If a bucket is short of its target, the deficit redirects to the nearest + # neighbour bucket(s) before falling back to anything further away. Small + # is rare in real workloads, so its deficit pads from medium - not large. + fallback_chain = { + "small": ["medium", "large"], + "medium": ["small", "large"], + "large": ["medium", "small"], + } + cursors = {"small": 0, "medium": 0, "large": 0} + selected: list[str] = [] + seen: set[str] = set() + fills = {"small": 0, "medium": 0, "large": 0} + redirected = {"small": 0, "medium": 0, "large": 0} + + def _take_from(name: str, n: int) -> int: + """Take up to n entries from pools[name] starting at cursor. Returns + the number actually taken.""" + if n <= 0: + return 0 + pool = pools[name] + start = cursors[name] + end = start + n + taken = 0 + for mid, _, _ in pool[start:end]: + if mid in seen: + continue + seen.add(mid) + selected.append(mid) + taken += 1 + cursors[name] = end + return taken + + for name, _ in _BUCKET_RATIO: + want = targets[name] + got = _take_from(name, want) + fills[name] = got + deficit = want - got + if deficit <= 0: + continue + for fb in fallback_chain[name]: + if deficit <= 0: + break + avail = max(0, len(pools[fb]) - cursors[fb]) + grab = min(deficit, avail) + if grab <= 0: + continue + got_fb = _take_from(fb, grab) + redirected[name] += got_fb + deficit -= got_fb + print(f"[s3] bucket fills: small={fills['small']}+{redirected['small']} " + f"medium={fills['medium']}+{redirected['medium']} " + f"large={fills['large']}+{redirected['large']} " + f"(numbers after '+' = redirected from neighbour buckets)") + + deficit = limit - len(selected) + if deficit > 0: + # Last-resort spill: relaxed pool (empties + no-state). + relaxed.sort(key=lambda x: (x[1], x[3], x[2]), reverse=True) + added = 0 + for mid, _, _, _ in relaxed: + if added >= deficit: + break + if mid in seen: + continue + seen.add(mid) + selected.append(mid) + added += 1 + print(f"[s3] extended with {added} relaxed entries to fill deficit") + + return selected[:limit] # --------------------------------------------------------------------------- @@ -231,11 +358,9 @@ def _start_hub( "--hub-provision-disk-limit-percent=99", "--hub-provision-memory-limit-percent=80", f"--hub-instance-limit={instance_limit}", - # Seeding is not a perf-measurement path - we want it as fast as the - # host can manage. Let the hub go wide on both provisioning and - # hydration thread pools rather than matching prod limits. - "--hub-instance-provision-threads=64", - "--hub-hydration-threads=64", + # Provision pool + async S3 in-flight cap use server defaults; on a + # 128-core host these resolve to 16 / 512 which is what the seed run + # needs (clamp(cpu/8, 4, 16) and clamp(cpu*4, 128, 512)). # With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH # watchdog inactivity timers so early-provisioned modules do not get # auto-deprovisioned while we're still hydrating the tail (hibernated @@ -326,6 +451,37 @@ def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict] return 0, {"error": str(e)} +_HYDRATION_FAIL_RE = re.compile(r"Hydration of module '([0-9a-f-]+)' failed: (.+?)(?:\.\.|$)") + + +def _scan_hydration_failures(log_path: Path, offset: int, already_warned: set[str], label: str) -> int: + """Read new hub.log content from `offset` to EOF; print a [warn] line for + each newly observed `Hydration of module 'X' failed` warning. Returns the + new offset (resume point for next call).""" + try: + size = log_path.stat().st_size + except OSError: + return offset + if size <= offset: + return offset + try: + with open(log_path, "rb") as f: + f.seek(offset) + chunk = f.read(size - offset).decode("utf-8", errors="replace") + except OSError: + return offset + for m in _HYDRATION_FAIL_RE.finditer(chunk): + mid, reason = m.group(1), m.group(2).strip() + if mid in already_warned: + continue + already_warned.add(mid) + # Trim long S3 error reasons so the line stays scannable. + if len(reason) > 200: + reason = reason[:200] + "..." + print(f"\n[{label}] HYDRATION FAILED {mid}: {reason}", flush=True) + return size + + def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: url = f"http://localhost:{port}/hub/status" req = urllib.request.Request(url, headers={"Accept": "application/json"}) @@ -370,16 +526,25 @@ def _wait_for_state( target_state: str, timeout_s: float, label: str, + hub_log: Optional[Path] = None, + hydration_warned: Optional[set[str]] = None, ) -> tuple[list[str], list[str], dict[str, str]]: """Poll hub status until every module hits target_state, fails, or times out. Returns (stuck, failed, last_states). 'stuck' = still mid-transition when we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'. + + When hub_log + hydration_warned are supplied, the wait loop also tails the + hub log and surfaces `Hydration of module ... failed` warnings as they + appear. Hydration failures do NOT push modules into _FAILED_STATES (the hub + cleans the state and still marks the module 'provisioned'), so this is the + only way to see them in script output. """ deadline = time.monotonic() + timeout_s remaining = set(module_ids) failed: list[str] = [] last_states: dict[str, str] = {mid: "" for mid in module_ids} + log_offset = hub_log.stat().st_size if (hub_log and hub_log.exists()) else 0 while remaining and time.monotonic() < deadline: states = _hub_module_states(port) @@ -393,10 +558,16 @@ def _wait_for_state( remaining.discard(mid) failed.append(mid) done = len(module_ids) - len(remaining) - print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + warned_count = len(hydration_warned) if hydration_warned is not None else 0 + suffix = f", hydration-failed {warned_count}" if hydration_warned is not None else "" + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed{suffix})...", end="\r") + if hub_log is not None and hydration_warned is not None: + log_offset = _scan_hydration_failures(hub_log, log_offset, hydration_warned, label) time.sleep(2.0) print() + if hub_log is not None and hydration_warned is not None: + _scan_hydration_failures(hub_log, log_offset, hydration_warned, label) return list(remaining), failed, last_states @@ -404,16 +575,37 @@ def _wait_for_state( # Snapshot copy (run AFTER hub shutdown so there's no concurrent writer) # --------------------------------------------------------------------------- +def _same_volume(a: Path, b: Path) -> bool: + """True when a and b live on the same filesystem volume (so an os.replace + rename is O(1) instead of an EXDEV-driven copy+delete fallback).""" + try: + return os.stat(a).st_dev == os.stat(b).st_dev + except OSError: + return False + + def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]: - """Copy src_root/<mid>/* to dst_root/<mid>/* for each module. + """Move (preferred) or copy src_root/<mid>/ to dst_root/<mid>/ for each module. - Returns (modules_copied, files_copied, bytes_copied). + Hub data dir is throwaway after Stage A (the hub is shut down right after + this step), so per-module trees can be moved out of it. When src_root and + dst_root share a volume the move is an O(1) directory rename; when they + don't, shutil.move falls back to a byte copy plus rmtree, which matches + the old shutil.copytree cost. + + Returns (modules_moved, files_moved, bytes_moved). Replaces only the specific per-module subdirs; never touches siblings. """ dst_root.mkdir(parents=True, exist_ok=True) - modules_copied = 0 - files_copied = 0 - bytes_copied = 0 + same_vol = _same_volume(src_root, dst_root) + if same_vol: + print(f"[snapshot] src and dst share a volume; moving per-module trees (O(1) rename per module)") + else: + print(f"[snapshot] src and dst are on different volumes; falling back to byte copy") + + modules_moved = 0 + files_moved = 0 + bytes_moved = 0 for i, mid in enumerate(module_ids, 1): src = src_root / mid @@ -423,21 +615,24 @@ def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tup continue if dst.exists(): _rmtree_robust(dst) - shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) - modules_copied += 1 + if same_vol: + os.replace(src, dst) + else: + shutil.move(str(src), str(dst)) + modules_moved += 1 for root, _dirs, files in os.walk(dst): for f in files: p = Path(root) / f try: - bytes_copied += p.stat().st_size + bytes_moved += p.stat().st_size except OSError: pass - files_copied += 1 + files_moved += 1 if i % 25 == 0 or i == len(module_ids): - print(f"[snapshot] {i}/{len(module_ids)} modules copied " - f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + print(f"[snapshot] {i}/{len(module_ids)} modules moved " + f"({files_moved:,} files, {bytes_moved/1024/1024:.1f} MB)") - return modules_copied, files_copied, bytes_copied + return modules_moved, files_moved, bytes_moved # --------------------------------------------------------------------------- @@ -447,10 +642,10 @@ def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tup def main() -> int: parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a", - help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)") - parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", - help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--hub-data-dir", required=True, + help="Hub --data-dir. Place on the same volume as --snapshot-dir so the snapshot step is a rename per module instead of a cross-volume byte copy.") + parser.add_argument("--snapshot-dir", required=True, + help="Destination for per-module server-state trees.") parser.add_argument("--port", type=int, default=8558, help="Hub HTTP port (default: 8558)") parser.add_argument("--module-count", type=int, default=1000, @@ -505,6 +700,15 @@ def main() -> int: if len(module_ids) < args.module_count: print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter") + # _list_module_ids returns modules grouped by size bucket (small first, then + # medium, then large). Provisioning in that order would create a size-sorted + # fan-out and skew load: all small first (cheap, fast hydrations) then all + # large (heavy, slow). Shuffle so provision/hibernate/copy hit a mixed-size + # stream that better resembles real workload distribution. Fixed seed keeps + # runs reproducible across reseeds with the same bucket contents. + import random + random.Random(0xC0FFEE).shuffle(module_ids) + if not module_ids: sys.exit("[s3] no module folders found, aborting") @@ -554,10 +758,14 @@ def main() -> int: if not accepted: sys.exit("[provision] nothing accepted, aborting") - stuck, failed, last_states = _wait_for_state(args.port, accepted, "provisioned", args.poll_timeout, "provision") + hydration_warned: set[str] = set() + stuck, failed, last_states = _wait_for_state( + args.port, accepted, "provisioned", args.poll_timeout, "provision", + hub_log=hub_log, hydration_warned=hydration_warned, + ) prov_done = len(accepted) - len(stuck) - len(failed) - print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck " - f"({time.monotonic()-t0:.1f}s)") + print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck, " + f"{len(hydration_warned)} hydration-failed ({time.monotonic()-t0:.1f}s)") if failed: for mid in failed[:10]: print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'") @@ -589,20 +797,23 @@ def main() -> int: for mid in (failed_hib + stuck_hib)[:10]: print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") - # --- Copy snapshots while hub is still running. All instances are - # hibernated (no writers), watchdog-hibernated-timeout is 86400s - # (no auto-deprovision), hub is only touching its own metadata - # outside servers/<mid>/. Safe. --- - copy_src = hub_data_dir / "servers" - to_copy = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] - print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}") + # --- Snapshot per-module state out of the hub data dir while the hub + # is still running. All instances are hibernated (no writers), + # watchdog-hibernated-timeout is 86400s (no auto-deprovision), + # hub is only touching its own metadata outside servers/<mid>/. + # When --hub-data-dir and --snapshot-dir share a volume the per- + # module trees are renamed (O(1)); cross-volume falls back to a + # byte copy. The hub data dir is wiped on the next run regardless. + snapshot_src = hub_data_dir / "servers" + to_snapshot = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + print(f"[snapshot] moving {len(to_snapshot)} module trees from {snapshot_src} -> {snapshot_dir}") t0 = time.monotonic() - modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy) - print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + modules_moved, files_moved, bytes_moved = _copy_snapshot(snapshot_src, snapshot_dir, to_snapshot) + print(f"[snapshot] moved {modules_moved} modules, {files_moved:,} files, {bytes_moved/1024/1024:.1f} MB " f"({time.monotonic()-t0:.1f}s)") - if modules_copied < len(to_copy): - print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied") + if modules_moved < len(to_snapshot): + print(f"[snapshot] WARNING: only {modules_moved}/{len(to_snapshot)} trees moved") exit_code = 1 # --- Graceful hub shutdown via 'zen down' --- diff --git a/scripts/test_scripts/hub/sweep_threads.py b/scripts/test_scripts/hub/sweep_threads.py new file mode 100644 index 000000000..f92a5d734 --- /dev/null +++ b/scripts/test_scripts/hub/sweep_threads.py @@ -0,0 +1,264 @@ +"""Greedy hill-climb search for hub thread balance. + +Per phase: from current best, try +STEP on each of (data, lifetime, hydration). +Pick best. Repeat. Stop when no candidate improves or all blocked by caps. + +Constraints: +- async_threads fixed at 1 +- step size = 4 per phase +- per-dim cap: 32 +- total cap: data + lifetime + hydration <= 96 +- start (8, 8, 8) + +Usage: + python sweep_threads.py +""" +from __future__ import annotations +import argparse +import atexit +import json +import signal +import subprocess +import sys +from pathlib import Path +import csv +import time + +HERE = Path(__file__).resolve().parent +RUNNER = HERE / "run_minio_perf.py" +LOG = HERE / "threads_sweep.csv" +ZEN_DIR = "build/windows/x64/release" +STEP = 4 +PER_DIM_CAP = 32 +TOTAL_CAP = 96 +ASYNC = 1 +DIMS = ("data", "lifetime", "hydration") +CHILD_PROCS: list[subprocess.Popen] = [] + + +def kill_strays(): + """Best-effort terminate hub, minio, toxiproxy. Idempotent.""" + for proc in CHILD_PROCS: + if proc.poll() is None: + try: + proc.terminate() + except Exception: + pass + targets = ["zenserver.exe", "minio.exe", "toxiproxy-server.exe", "toxiproxy.exe"] + for name in targets: + try: + subprocess.run(["taskkill", "/F", "/IM", name], capture_output=True, timeout=10) + except Exception: + pass + + +def install_signal_handlers(): + def handler(signum, frame): + print(f"\n[sweep] signal {signum} received - cleaning up", flush=True) + kill_strays() + sys.exit(130) + for sig in (signal.SIGINT, signal.SIGTERM): + try: + signal.signal(sig, handler) + except Exception: + pass + atexit.register(kill_strays) + + +def run_once(data: int, lifetime: int, hydration: int, latency_ms: int, label: str, async_threads: int = ASYNC) -> dict: + cmd = [ + sys.executable, "-u", str(RUNNER), + "--async-http", + f"--async-threads={async_threads}", + f"--toxiproxy-latency-ms={latency_ms}", + f"--zenserver-dir={ZEN_DIR}", + f"--hub-arg=--hub-instance-provision-threads={data}", + f"--hub-arg=--hub-instance-spawn-threads={lifetime}", + f"--hub-arg=--hub-hydration-threads={hydration}", + ] + print(f"\n=== [{label}] data={data} lifetime={lifetime} hydration={hydration} async={async_threads} latency={latency_ms}ms ===", flush=True) + t0 = time.time() + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) + CHILD_PROCS.append(proc) + archive = None + rc = 0 + assert proc.stdout is not None + for line in proc.stdout: + line = line.rstrip() + if not line: + continue + if line.startswith("[archive]"): + archive = line.split(" ", 1)[1].split(" ")[0] + # Compress spinner-tick pile-up to last segment so we don't print + # "[provision] N/1000 provisioned..." 5x per line. Only triggers when + # the line is actually a spinner concat (contains the milestone marker). + if line.count("...") >= 2 and ("/1000 provisioned" in line or "/1000 gone" in line): + line = line.split("...")[-1].lstrip() + if not line: + continue + print(f" {line}", flush=True) + rc = proc.wait() + if proc in CHILD_PROCS: + CHILD_PROCS.remove(proc) + wall = time.time() - t0 + summary = {} + if archive: + sj = Path(archive) / "summary.json" + if sj.exists(): + summary = json.loads(sj.read_text()) + row = { + "label": label, + "data": data, + "lifetime": lifetime, + "hydration": hydration, + "async_threads": async_threads, + "latency_ms": latency_ms, + "exit_code": rc, + "wall_s": round(wall, 1), + "provision_s": summary.get("provision_total_s"), + "deprovision_s": summary.get("deprovision_total_s"), + "total_s": summary.get("total_s"), + "archive": archive, + } + print(f" -> total={row['total_s']}s provision={row['provision_s']}s deprovision={row['deprovision_s']}s exit={row['exit_code']}", flush=True) + append_row(row) + return row + + +def append_row(row: dict): + new = not LOG.exists() + with LOG.open("a", newline="") as f: + w = csv.DictWriter(f, fieldnames=list(row.keys())) + if new: + w.writeheader() + w.writerow(row) + + +def feasible(p: dict) -> bool: + if any(p[d] > PER_DIM_CAP for d in DIMS): + return False + if sum(p[d] for d in DIMS) > TOTAL_CAP: + return False + return True + + +def candidates(center: dict) -> list[tuple[str, dict]]: + out = [] + for d in DIMS: + c = dict(center) + c[d] += STEP + if feasible(c): + out.append((d, c)) + return out + + +def parse_points(s: str, default_async: int) -> list[tuple[int, int, int, int]]: + out = [] + for chunk in s.split(";"): + chunk = chunk.strip() + if not chunk: + continue + parts = [int(x.strip()) for x in chunk.split(",")] + if len(parts) == 3: + d, l, h = parts + a = default_async + elif len(parts) == 4: + d, l, h, a = parts + else: + raise SystemExit(f"--points entry needs 3 ints (d,l,h) or 4 (d,l,h,async): {chunk!r}") + out.append((d, l, h, a)) + return out + + +def run_points(points: list[tuple[int, int, int, int]], latency_ms: int): + rows = [] + for i, (d, l, h, a) in enumerate(points, 1): + if d > PER_DIM_CAP or l > PER_DIM_CAP or h > PER_DIM_CAP: + print(f" -- skip {d},{l},{h} (per-dim cap {PER_DIM_CAP})", flush=True) + continue + if d + l + h > TOTAL_CAP: + print(f" -- skip {d},{l},{h} (total {d+l+h} > {TOTAL_CAP})", flush=True) + continue + row = run_once(data=d, lifetime=l, hydration=h, latency_ms=latency_ms, + label=f"pt{i}-{d},{l},{h},a{a}", async_threads=a) + rows.append(row) + return rows + + +def main(): + p = argparse.ArgumentParser() + p.add_argument("--latency-ms", type=int, default=30) + p.add_argument("--max-phases", type=int, default=20) + p.add_argument("--async-threads", type=int, default=ASYNC, + help=f"AsyncHttpClient io_context threads/shards (default {ASYNC}). " + "Per-point override available via 4th value in --points entry.") + p.add_argument("--points", type=str, default="", + help="Run an explicit list of (d,l,h) or (d,l,h,async) points instead of hill-climb. " + "Format: 'd,l,h;d,l,h,a;...'. CSV appended.") + args = p.parse_args() + + install_signal_handlers() + + if args.points: + pts = parse_points(args.points, args.async_threads) + print(f"[sweep] explicit list: {len(pts)} points", flush=True) + rows = run_points(pts, args.latency_ms) + valid = [r for r in rows if r.get("total_s") is not None and r["exit_code"] == 0] + if valid: + valid.sort(key=lambda r: r["total_s"]) + print("\n=== POINT RESULTS (sorted by total_s) ===", flush=True) + for r in valid: + tot = r["data"] + r["lifetime"] + r["hydration"] + print(f" d={r['data']:>3} l={r['lifetime']:>3} h={r['hydration']:>3} a={r['async_threads']:>2} sum={tot:>3} -> total={r['total_s']:.1f}s prov={r['provision_s']:.1f}s deprov={r['deprovision_s']:.1f}s", flush=True) + print(f" log={LOG}", flush=True) + return + + cur = dict(data=8, lifetime=8, hydration=8) + tried = set() + history = [] + + base = run_once(**cur, latency_ms=args.latency_ms, label="phase0") + history.append(base) + tried.add((cur["data"], cur["lifetime"], cur["hydration"])) + best = base + + for phase in range(1, args.max_phases + 1): + cands = candidates(cur) + if not cands: + print(f" === phase {phase}: all candidates blocked (per-dim {PER_DIM_CAP} or total {TOTAL_CAP}) ===", flush=True) + break + phase_best = None + for dim, c in cands: + key = (c["data"], c["lifetime"], c["hydration"]) + if key in tried: + continue + tried.add(key) + row = run_once(**c, latency_ms=args.latency_ms, label=f"p{phase}-{dim}+{STEP}") + history.append(row) + if row.get("total_s") is not None and row["exit_code"] == 0: + if phase_best is None or row["total_s"] < phase_best["total_s"]: + phase_best = row + if phase_best is None or phase_best["total_s"] >= best["total_s"] - 1.0: + print(f" === phase {phase}: no improvement (best {best['total_s']}s vs phase {phase_best and phase_best['total_s']}s) ===", flush=True) + break + cur = dict(data=phase_best["data"], lifetime=phase_best["lifetime"], hydration=phase_best["hydration"]) + best = phase_best + print(f" +++ phase {phase}: -> data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} total={best['total_s']}s", flush=True) + + print("\n=== OPTIMUM ===", flush=True) + print(f" data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} async={ASYNC}", flush=True) + print(f" total={best['total_s']}s provision={best['provision_s']}s deprovision={best['deprovision_s']}s", flush=True) + print(f" log={LOG}", flush=True) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\n[sweep] interrupted - cleaning up", flush=True) + kill_strays() + sys.exit(130) + except Exception as e: + print(f"\n[sweep] error: {e!r} - cleaning up", flush=True) + kill_strays() + raise |