Diffstat (limited to 'scripts/test_scripts')
-rw-r--r-- scripts/test_scripts/hub/PERF_SEED_README.md 105
-rw-r--r-- scripts/test_scripts/hub/hub_load_test_s3.py 4
-rw-r--r-- scripts/test_scripts/hub/parse_perf_log.py 117
-rw-r--r-- scripts/test_scripts/hub/perf_configs/hub.lua 11
-rw-r--r-- scripts/test_scripts/hub/perf_configs/instance.lua 6
-rw-r--r-- scripts/test_scripts/hub/preserve_minio_state.py 15
-rw-r--r-- scripts/test_scripts/hub/run_minio_perf.py 243
-rw-r--r-- scripts/test_scripts/hub/seed_minio.py 82
-rw-r--r-- scripts/test_scripts/hub/seed_s3_snapshot.py 327
-rw-r--r-- scripts/test_scripts/hub/sweep_threads.py 264
10 files changed, 955 insertions, 219 deletions
diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md
index fb471d4bb..eacb0da55 100644
--- a/scripts/test_scripts/hub/PERF_SEED_README.md
+++ b/scripts/test_scripts/hub/PERF_SEED_README.md
@@ -3,35 +3,40 @@
Three-stage pipeline for running repeatable hub-hydration perf tests against a
local MinIO backend seeded with real module data pulled from production S3.
+The pipeline is **pack-on only** - the seeded baseline always comes from a hub
+launched with `--hub-hydration-enable-pack=true`. The pack-off variant is no
+longer maintained.
+
## Layout
-All scripts default to a single perf-seed root - currently `E:/Dev/zen-perf-seed/`
-in the script defaults, but every path is overridable via CLI flag (see the
-per-stage options below). Pick a root with enough free space (snapshots and
-preserved CAS dirs can be large) and either pass the matching `--*-dir` flag on
-each invocation or change the script defaults to your chosen root.
+All path arguments are required (no hardcoded defaults). Pick a perf-seed root
+with enough free space (snapshots and preserved CAS dirs can be large) and pass
+the matching `--*-dir` flag on each invocation. Stage A's hub data dir should
+live on the same volume as the snapshot dir so snapshotting is an O(1) rename
+per module instead of a cross-volume byte copy; Stage C's hub data dir should
+live on a different volume from the MinIO data dir so hub I/O does not skew the
+measured perf run.
-Layout under the chosen root (`<perf-seed>/`):
+Example layout (directory names only; pick volumes/roots and pass via `--*-dir`
+flags):
```
-<perf-seed>/
- hub-a/ Stage A hub data dir (transient)
+<perf-seed-A>/ bulk data + Stage A/B flow (one volume = move-friendly)
+ hub-a/ Stage A hub data dir (transient; snapshot-step rename source)
servers/<moduleid>/
- s3-snapshot/ Preserved production server-state trees (read-only after Stage A)
+ s3-snapshot/ Preserved production server-state trees (read-only after Stage A)
<moduleid>/
- hubs/ Stage B per-bucket hub data dirs (transient)
+ hubs/ Stage B per-bucket hub data dirs (transient)
hub-b-zen-seed-packed/
- hub-b-zen-seed-unpacked/
- minio-data/ Stage B MinIO data dir (transient, carries every seeded bucket)
- minio-seeded-baseline/ Preserved baseline MinIO CAS (read-only after Stage B + preserve)
- README.txt
- minio-seeded-packed/ Preserved packed MinIO CAS (filled by the pack worktree)
+ minio-data/ Stage B MinIO data dir (transient)
+ minio-seeded-packed/ Preserved packed MinIO CAS (read-only after Stage B + preserve)
README.txt
- hub-perf/ Stage C hub data dir (wiped each run)
- minio-run/ Stage C MinIO data dir (wiped + re-copied each run)
- perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json
+ minio-run/ Stage C MinIO data dir (wiped + re-copied each run)
+ perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json
20260423-141530_zen-seed-packed/
- 20260423-143112_zen-seed-unpacked/
+
+<perf-seed-B>/ separate volume from <perf-seed-A> for measurement isolation
+ hub-perf/ Stage C hub data dir (wiped each run)
```
## Prerequisites
@@ -46,7 +51,7 @@ Layout under the chosen root (`<perf-seed>/`):
## Stage A - snapshot real S3 data
-One-time (or when you want a fresh baseline from production).
+One-time (or when you want a fresh snapshot from production).
```
export ZEN_PERF_S3_URI=s3://your-bucket/
@@ -54,8 +59,11 @@ export ZEN_PERF_AWS_PROFILE=your-sso-profile
python scripts/test_scripts/hub/seed_s3_snapshot.py
```
-Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies
-`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login`
+Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then **moves**
+`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. When `--hub-data-dir` and
+`--snapshot-dir` share a volume (the recommended layout), the move is an O(1) rename
+per module; a cross-volume move falls back to a byte copy with the old cost profile. The
+hub data dir is wiped on the next run regardless. Triggers `aws sso login`
automatically if the SSO token is missing or expired.
Module selection ranks all UUID-shaped folders by their
@@ -64,37 +72,27 @@ most-recently-accessed) and takes the top `--module-count`.
Options:
- `--module-count N` (default 1000)
-- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`)
-- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`)
+- `--snapshot-dir PATH` (required, e.g. `<perf-seed>/s3-snapshot`)
+- `--hub-data-dir PATH` (required, e.g. `<perf-seed>/hub-a`)
## Stage B - seed MinIO from the snapshot
-One-time per pack-mode (or when `s3-snapshot` changes).
+One-time, or when `s3-snapshot/` changes.
-`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is
-hardcoded inside the script (`--hub-hydration-enable-pack=true` near the
-top of `_start_hub`). To produce both packed and unpacked baselines for
-comparison, invoke the script twice from two separate worktrees - one with
-the flag flipped to `false` - and preserve the resulting MinIO data dir
-each time.
+`seed_minio.py` seeds the `zen-seed-packed` bucket with pack ON
+(`--hub-hydration-enable-pack=true` is hardcoded). The script provisions every
+module found under `s3-snapshot/`, hibernates them, overlays the snapshot on
+top of the hub's servers dir, then deprovisions all modules - which runs the
+dehydrate path and uploads the content into the bucket.
```
-# In the pack worktree (flag = true), seeds zen-seed-packed
python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed
python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed
-
-# In the no-pack worktree (flag = false), seeds zen-seed-unpacked
-python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked
-python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked
```
-The script provisions every module found under `s3-snapshot/`, hibernates
-them, overlays the snapshot on top of the hub's servers dir, then
-deprovisions all modules - which runs the dehydrate path and uploads the
-content into the bucket.
-
-`preserve_minio_state.py` copies the resulting `minio-data/` to a
-variant-specific preservation dir and writes a README with provenance.
+`preserve_minio_state.py` MOVES (default; `--copy` to keep source) the
+resulting `minio-data/` to the preservation dir and writes a README with
+provenance.
Options of interest:
- `--bucket NAME` - bucket name (default `zen-seed-packed`).
@@ -107,24 +105,18 @@ Options of interest:
Repeat as often as you want; each run starts from the preserved baseline.
```
-# Pack-on bucket
python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace
-
-# Pack-off bucket (for comparison)
-python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace
```
Steps:
-1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state.
-2. Wipes `hub-perf/` (unless `--no-wipe-hub`).
+1. Copies `--minio-seeded` over `--minio-run` so MinIO starts from a known state.
+2. Wipes `--hub-data-dir` (unless `--no-wipe-hub`).
3. Starts MinIO and hub.
4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone.
5. Stops everything cleanly.
Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every
-iteration exercises the hydrate-only path against the same baseline CAS. The
-`--bucket` flag selects which seeded bucket (and therefore which pack mode)
-to exercise.
+iteration exercises the hydrate-only path against the same baseline CAS.
Pass `--enable-dehydration` to run a full provision -> deprovision cycle that
includes re-upload (dehydrate) at deprovision time. Use this to measure the
@@ -139,10 +131,9 @@ post-hoc. Override the destination with `--archive-dir PATH`.
## Resetting between runs
-- **Keep**: `s3-snapshot/`, `minio-seeded-baseline/`, `minio-seeded-packed/`. These are expensive to rebuild.
+- **Keep**: `s3-snapshot/`, `minio-seeded-packed/`. These are expensive to rebuild.
- **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`.
-To force a fresh MinIO seed for one variant: delete the matching
-`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching
-`--dest`) in that worktree. To force a fresh S3 snapshot: delete
-`s3-snapshot/` and re-run Stage A.
+To force a fresh MinIO seed: delete `minio-seeded-packed/` and re-run Stage B,
+then preserve. To force a fresh S3 snapshot: delete `s3-snapshot/` and re-run
+Stage A.
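
The same-volume rule from the Layout section can be checked before Stage A runs. A minimal sketch, assuming that comparing `st_dev` from `os.stat` is an adequate proxy for "same volume" on the host; `same_volume` and `move_tree` are hypothetical helpers, not functions from these scripts:

```python
import os
import shutil
import tempfile
from pathlib import Path


def same_volume(a: Path, b: Path) -> bool:
    """Return True when both existing paths report the same device id."""
    return os.stat(a).st_dev == os.stat(b).st_dev


def move_tree(src: Path, dst: Path) -> str:
    """Move src to dst and report which path is expected to be taken.

    shutil.move uses os.rename when the destination is on the same
    filesystem and otherwise copies src to dst and removes src.
    """
    how = "rename" if same_volume(src, dst.parent) else "copy"
    shutil.move(str(src), str(dst))
    return how


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        # Illustrative directory names mirroring the README layout.
        src = Path(root) / "hub-a" / "servers" / "module-1"
        src.mkdir(parents=True)
        (src / "state.bin").write_bytes(b"x")
        snapshot = Path(root) / "s3-snapshot"
        snapshot.mkdir()
        print(move_tree(src, snapshot / "module-1"))
```

Running a check like this against the intended `--hub-data-dir` and `--snapshot-dir` before a large Stage A run would show up front whether the snapshot step is likely to be a rename or a byte copy.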
diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py
index 23014409c..e71222e31 100644
--- a/scripts/test_scripts/hub/hub_load_test_s3.py
+++ b/scripts/test_scripts/hub/hub_load_test_s3.py
@@ -372,8 +372,8 @@ def _wait_for_deprovisioned(
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
- parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3",
- help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)")
+ parser.add_argument("--data-dir", required=True,
+ help="Hub --data-dir.")
parser.add_argument("--port", type=int, default=8558,
help="Hub HTTP port (default: 8558)")
parser.add_argument("--module-count", type=int, default=200,
diff --git a/scripts/test_scripts/hub/parse_perf_log.py b/scripts/test_scripts/hub/parse_perf_log.py
new file mode 100644
index 000000000..8833a5211
--- /dev/null
+++ b/scripts/test_scripts/hub/parse_perf_log.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python3
+"""Parse hub perf log to extract hydration / spawn / deprovision timings."""
+from __future__ import annotations
+import re
+import statistics
+import sys
+from pathlib import Path
+
+HYDRATE_RE = re.compile(r"Hydration complete module '([^']+)': (\d+) files \(([^)]+)\) in ([\d.]+)(ms|s)")
+SPAWN_RE = re.compile(r"module '([^']+)' started, listening on port \d+, spawn took ([\d.]+)(ms|s)")
+PROVISION_START_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Provisioning storage server instance for module '([^']+)'")
+SPAWN_DONE_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Storage server instance for module '([^']+)' started")
+HYDRATE_DONE_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Hydration complete module '([^']+)'")
+DEPROV_START_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Module '([^']+)' changed state from provisioned to deprovisioning")
+DEPROV_REMOVE_RE = re.compile(r"\[(\d{2}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3})\] \[inf\] Module '([^']+)' (?:removed|changed state from deprovisioning to unprovisioned)")
+
+
+def to_ms(num: str, unit: str) -> float:
+    v = float(num)
+    return v * 1000.0 if unit == "s" else v
+
+
+def parse_ts(ts: str) -> float:
+    # 26-04-29 22:41:34.009 -> POSIX timestamp (naive datetime, local time); callers only use differences
+    import datetime
+    dt = datetime.datetime.strptime(ts, "%y-%m-%d %H:%M:%S.%f")
+    return dt.timestamp()
+
+
+def stats(name: str, vals: list[float]) -> str:
+    if not vals:
+        return f"{name:14s} n=0"
+    vs = sorted(vals)
+    n = len(vs)
+    p50 = vs[n // 2]
+    p95 = vs[min(n - 1, int(n * 0.95))]
+    p99 = vs[min(n - 1, int(n * 0.99))]
+    return (f"{name:14s} n={n:5d} mean={statistics.mean(vs):8.0f}ms "
+            f"p50={p50:7.0f}ms p95={p95:7.0f}ms p99={p99:7.0f}ms max={max(vs):7.0f}ms "
+            f"sum={sum(vs)/1000.0:7.1f}s")
+
+
+def analyse(log_path: Path) -> None:
+    print(f"=== {log_path}")
+    hydrate_durations: list[float] = []
+    spawn_durations: list[float] = []
+
+    prov_start: dict[str, float] = {}
+    hydrate_done: dict[str, float] = {}
+    spawn_done: dict[str, float] = {}
+    deprov_start: dict[str, float] = {}
+    deprov_done: dict[str, float] = {}
+
+    with log_path.open("r", encoding="utf-8", errors="replace") as f:
+        for line in f:
+            m = HYDRATE_RE.search(line)
+            if m:
+                hydrate_durations.append(to_ms(m.group(4), m.group(5)))
+            m = SPAWN_RE.search(line)
+            if m:
+                spawn_durations.append(to_ms(m.group(2), m.group(3)))
+            m = PROVISION_START_RE.search(line)
+            if m:
+                prov_start[m.group(2)] = parse_ts(m.group(1))
+                continue
+            m = HYDRATE_DONE_RE.search(line)
+            if m:
+                hydrate_done[m.group(2)] = parse_ts(m.group(1))
+                continue
+            m = SPAWN_DONE_RE.search(line)
+            if m:
+                spawn_done[m.group(2)] = parse_ts(m.group(1))
+                continue
+            m = DEPROV_START_RE.search(line)
+            if m:
+                deprov_start[m.group(2)] = parse_ts(m.group(1))
+                continue
+            m = DEPROV_REMOVE_RE.search(line)
+            if m:
+                deprov_done[m.group(2)] = parse_ts(m.group(1))
+                continue
+
+    # Reconstruct end-to-end provision wall time per module:
+    e2e_prov_ms: list[float] = []
+    queue_to_hydrate_ms: list[float] = []  # time in DataPool queue before hydrate began (proxy: prov_start->hydrate_done minus hydrate ms)
+    spawn_minus_data_ms: list[float] = []  # time between hydrate_done and spawn_done (lifetime + lifetime queue)
+    for mid, t0 in prov_start.items():
+        t_end = spawn_done.get(mid)
+        if t_end is None:
+            continue
+        e2e_prov_ms.append((t_end - t0) * 1000.0)
+        t_h = hydrate_done.get(mid)
+        if t_h is not None:
+            spawn_minus_data_ms.append((t_end - t_h) * 1000.0)
+
+    e2e_deprov_ms: list[float] = []
+    for mid, t0 in deprov_start.items():
+        t_end = deprov_done.get(mid)
+        if t_end is None:
+            continue
+        e2e_deprov_ms.append((t_end - t0) * 1000.0)
+
+    print(stats("hydrate_dur", hydrate_durations))
+    print(stats("spawn_dur", spawn_durations))
+    print(stats("e2e_prov", e2e_prov_ms))
+    print(stats("after_hydrate", spawn_minus_data_ms))
+    print(stats("e2e_deprov", e2e_deprov_ms))
+    if hydrate_durations and spawn_durations:
+        h_sum = sum(hydrate_durations) / 1000.0
+        s_sum = sum(spawn_durations) / 1000.0
+        print(f"  total hydrate work: {h_sum:.1f}s, total spawn work: {s_sum:.1f}s, ratio H/S: {h_sum/s_sum:.2f}")
+    print()
+
+
+if __name__ == "__main__":
+    for arg in sys.argv[1:]:
+        analyse(Path(arg))
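
To make the extraction step in `parse_perf_log.py` concrete, the sketch below runs the hydration pattern against a single line. The pattern and `to_ms` are copied from the script; the sample log line is hypothetical, constructed only to match the pattern, and is not real hub output:

```python
import re

# Pattern copied from parse_perf_log.py.
HYDRATE_RE = re.compile(
    r"Hydration complete module '([^']+)': (\d+) files \(([^)]+)\) in ([\d.]+)(ms|s)"
)


def to_ms(num: str, unit: str) -> float:
    """Normalise a duration to milliseconds (same logic as the script)."""
    v = float(num)
    return v * 1000.0 if unit == "s" else v


# Hypothetical log line, written to match the pattern above.
SAMPLE = "Hydration complete module 'mod-a': 12 files (3.4 MiB) in 1.5s"

m = HYDRATE_RE.search(SAMPLE)
assert m is not None
module_id = m.group(1)          # text between the single quotes
file_count = int(m.group(2))    # integer before " files"
size_text = m.group(3)          # free-form text inside the parentheses
duration_ms = to_ms(m.group(4), m.group(5))  # number and unit after " in "
print(module_id, file_count, size_text, duration_ms)
```

Because the unit group is `(ms|s)`, durations logged in either unit end up in milliseconds before they reach `stats`.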
diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua
index f3cf3e697..ff9ab582e 100644
--- a/scripts/test_scripts/hub/perf_configs/hub.lua
+++ b/scripts/test_scripts/hub/perf_configs/hub.lua
@@ -12,19 +12,14 @@ hub = {
disklimitpercent = 90, -- default: 0 (disabled)
},
corelimit = 4, -- default: 0 (auto)
- provisionthreads = 8, -- default: auto
+ -- provisionthreads / spawnthreads / hub.hydration.threads left unset: defaults
+ -- (clamp(cpu/8,4,12) / clamp(cpu/8,4,16) / clamp(cpu/8,4,12)) are tuned from
+ -- 1000-module sweep at 128 vCPU + 30ms latency. Override here only to A/B test.
-- NOTE: hub.instance.config (path to instance lua) is overridden via
-- --hub-instance-config on the CLI. If left here, it would be resolved
-- relative to the hub's CWD at spawn time (NOT this file's dir).
},
- hydration = {
- -- Match production's per-module download pool size. Without this, the
- -- default auto-picks hardware_concurrency/4 which on --corelimit=128
- -- would be 32. Prod logs consistently show "16 threads" in Download phase.
- threads = 16,
- },
-
watchdog = {
cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan
cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count
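
The new comment in `hub.lua` quotes the default thread counts as `clamp(cpu/8, lo, hi)`. As a worked example, assuming integer division and that `cpu` is the core count the hub sees (neither is confirmed by this diff), the three defaults would evaluate as follows; `clamp`, `BOUNDS`, and `default_threads` are illustrative names only:

```python
def clamp(value: int, lo: int, hi: int) -> int:
    """Limit value to the inclusive range [lo, hi]."""
    return max(lo, min(value, hi))


# (lo, hi) bounds as quoted in the hub.lua comment.
BOUNDS = {
    "provisionthreads": (4, 12),
    "spawnthreads": (4, 16),
    "hydration.threads": (4, 12),
}


def default_threads(cpu: int) -> dict[str, int]:
    """Evaluate clamp(cpu/8, lo, hi) for each setting (integer division assumed)."""
    return {name: clamp(cpu // 8, lo, hi) for name, (lo, hi) in BOUNDS.items()}


if __name__ == "__main__":
    for cpu in (4, 64, 128):
        print(cpu, default_threads(cpu))
```

Under these assumptions a 128-core host hits the upper bound for every setting, while small core counts fall back to the floor of 4.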
diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua
index 1251997db..cb292a0cd 100644
--- a/scripts/test_scripts/hub/perf_configs/instance.lua
+++ b/scripts/test_scripts/hub/perf_configs/instance.lua
@@ -13,3 +13,9 @@ cache = {
},
},
}
+
+server = {
+ sentry = {
+ disable = true, -- perf runs: keep Sentry on the hub only; skip per-child crash reporter init
+ },
+}
diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py
index 365e4a542..cfa63af31 100644
--- a/scripts/test_scripts/hub/preserve_minio_state.py
+++ b/scripts/test_scripts/hub/preserve_minio_state.py
@@ -9,10 +9,8 @@ wipes --minio-data-dir on its next invocation anyway.
Pass --copy to keep --source in place (slower; needs 2x disk).
-Typical invocation:
- python preserve_minio_state.py
-
-Defaults map to the paths recommended by PERF_SEED_README.md.
+Both --source and --dest are required. See PERF_SEED_README.md for the
+expected layout.
"""
from __future__ import annotations
@@ -57,11 +55,10 @@ def _size_of(path: Path) -> tuple[int, int]:
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
- parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data",
- help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)")
- parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
- help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
- "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
+ parser.add_argument("--source", required=True,
+ help="Source MinIO data dir produced by Stage B (seed_minio.py).")
+ parser.add_argument("--dest", required=True,
+ help="Preservation path; the move/copy target that becomes the baseline read by Stage C.")
parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""),
help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)")
parser.add_argument("--bucket", default="zen-seed",
diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py
index b59cff41a..575303c61 100644
--- a/scripts/test_scripts/hub/run_minio_perf.py
+++ b/scripts/test_scripts/hub/run_minio_perf.py
@@ -68,7 +68,7 @@ def _find_zenserver(override: Optional[str]) -> Path:
sys.exit(f"zenserver not found at {p}")
return p
script_dir = Path(__file__).resolve().parent
- repo_root = script_dir.parent.parent
+ repo_root = script_dir.parents[2]
for mode in ("release", "debug"):
for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
@@ -91,6 +91,73 @@ def _find_minio(zenserver_path: Path) -> Path:
return p
+def _resolve_toxiproxy_exe(arg_value: Optional[str], env_var: str, default_name: str) -> Path:
+    """Resolve a toxiproxy executable: explicit --toxiproxy-* arg wins, then env var,
+    then PATH lookup of `default_name`. Exits if none of the three resolve to a real
+    file."""
+    candidate = arg_value or os.environ.get(env_var)
+    if candidate:
+        p = Path(candidate)
+        if not p.exists():
+            sys.exit(f"[toxiproxy] {default_name}: explicit path '{p}' not found")
+        return p
+    found = shutil.which(default_name)
+    if found:
+        return Path(found)
+    sys.exit(f"[toxiproxy] {default_name} not found: pass --toxiproxy-server-exe / --toxiproxy-cli-exe, "
+             f"set {env_var}, or put {default_name} on PATH")
+
+
+def _start_toxiproxy(server_exe: Path, cli_exe: Path, api_port: int, listen_port: int,
+                     upstream_port: int, latency_ms: int, jitter_ms: int) -> subprocess.Popen:
+    """Start toxiproxy-server, create a proxy minio_proxy listening on listen_port and
+    forwarding to localhost:upstream_port, and add a latency toxic. Returns the server
+    Popen so the caller can terminate it on cleanup."""
+    popen_kwargs: dict = {}
+    if sys.platform == "win32":
+        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+    proc = subprocess.Popen(
+        [str(server_exe), "-port", str(api_port)],
+        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **popen_kwargs,
+    )
+    print(f"[toxiproxy] server started (pid {proc.pid}) api-port={api_port}")
+    # Wait for the API to come up.
+    api_url = f"http://localhost:{api_port}/version"
+    deadline = time.monotonic() + 10.0
+    while time.monotonic() < deadline:
+        try:
+            with urllib.request.urlopen(api_url, timeout=1):
+                break
+        except Exception:
+            time.sleep(0.1)
+    else:
+        proc.terminate()
+        sys.exit(f"[toxiproxy] API at {api_url} never came up")
+
+    env = os.environ.copy()
+    env["TOXIPROXY_URL"] = f"http://localhost:{api_port}"
+    # Create the proxy.
+    subprocess.run(
+        [str(cli_exe), "create", "-l", f"127.0.0.1:{listen_port}",
+         "-u", f"127.0.0.1:{upstream_port}", "minio_proxy"],
+        env=env, check=True,
+    )
+    # Add latency toxic to both directions (default direction is downstream;
+    # add explicit upstream too for symmetry).
+    args_common = ["-t", "latency", "-a", f"latency={latency_ms}", "-a", f"jitter={jitter_ms}"]
+    subprocess.run(
+        [str(cli_exe), "toxic", "add", "-n", "lat_down", "-d", *args_common, "minio_proxy"],
+        env=env, check=True,
+    )
+    subprocess.run(
+        [str(cli_exe), "toxic", "add", "-n", "lat_up", "-u", *args_common, "minio_proxy"],
+        env=env, check=True,
+    )
+    print(f"[toxiproxy] minio_proxy listening on 127.0.0.1:{listen_port} -> "
+          f"127.0.0.1:{upstream_port}, latency={latency_ms}ms jitter={jitter_ms}ms (per direction)")
+    return proc
+
+
def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
data_dir.mkdir(parents=True, exist_ok=True)
env = os.environ.copy()
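
The readiness wait in `_start_toxiproxy` relies on Python's `while ... else`: the `else` branch runs only when the loop ends without `break`, which here means the deadline passed. A self-contained sketch of that pattern, using a hypothetical `wait_until` helper and a fake probe in place of the HTTP call:

```python
import time
from typing import Callable


def wait_until(probe: Callable[[], bool], timeout_s: float, interval_s: float = 0.01) -> bool:
    """Poll probe() until it returns True or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if probe():
            break  # success: the else branch below is skipped
        time.sleep(interval_s)
    else:
        return False  # loop condition went false without a break: timed out
    return True


if __name__ == "__main__":
    attempts = {"n": 0}

    def ready_on_third_try() -> bool:
        attempts["n"] += 1
        return attempts["n"] >= 3

    print(wait_until(ready_on_third_try, timeout_s=1.0))
    print(wait_until(lambda: False, timeout_s=0.05))
```

The commit's version exits the process on timeout instead of returning `False`, after terminating the server it just spawned.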
@@ -116,10 +183,12 @@ def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int)
def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
deadline = time.monotonic() + timeout_s
url = f"http://localhost:{port}/minio/health/live"
+ t0 = time.monotonic()
+ print(f"[minio] waiting for ready on port {port} ...", flush=True)
while time.monotonic() < deadline:
try:
with urllib.request.urlopen(url, timeout=1):
- print("[minio] ready")
+ print(f"[minio] ready ({time.monotonic()-t0:.1f}s)", flush=True)
return
except Exception:
time.sleep(0.1)
@@ -178,14 +247,21 @@ def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -
deadline = time.monotonic() + timeout_s
req = urllib.request.Request(f"http://localhost:{port}/hub/status",
headers={"Accept": "application/json"})
+ t0 = time.monotonic()
+ last_tick = t0
+ print(f"[hub] waiting for ready on port {port} ...", flush=True)
while time.monotonic() < deadline:
if proc.poll() is not None:
sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})")
try:
with urllib.request.urlopen(req, timeout=2):
- print("[hub] ready")
+ print(f"[hub] ready ({time.monotonic()-t0:.1f}s)", flush=True)
return
except Exception:
+ now = time.monotonic()
+ if now - last_tick >= 5.0:
+ print(f"[hub] still waiting ({now-t0:.1f}s elapsed)", flush=True)
+ last_tick = now
time.sleep(0.2)
sys.exit(f"[hub] timed out after {timeout_s}s")
@@ -195,14 +271,16 @@ def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float =
if hub_proc.poll() is not None:
return
pid = hub_proc.pid
- print(f"[hub] zen down --pid {pid}")
+ print(f"[hub] zen down --pid {pid} ...", flush=True)
+ t0 = time.time()
rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
if rc != 0:
- print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway", flush=True)
try:
hub_proc.wait(timeout=timeout_s)
+ print(f"[hub] exited ({time.time()-t0:.1f}s)", flush=True)
except subprocess.TimeoutExpired:
- print(f"[hub] did not exit after {timeout_s}s, killing")
+ print(f"[hub] did not exit after {timeout_s}s, killing", flush=True)
hub_proc.kill()
hub_proc.wait()
@@ -210,6 +288,8 @@ def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float =
def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
if proc.poll() is not None:
return
+ print(f"[minio] stopping (pid {proc.pid}) ...", flush=True)
+ t0 = time.time()
try:
if sys.platform == "win32":
proc.send_signal(signal.CTRL_BREAK_EVENT)
@@ -219,8 +299,9 @@ def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> Non
proc.terminate()
try:
proc.wait(timeout=timeout_s)
+ print(f"[minio] stopped ({time.time()-t0:.1f}s)", flush=True)
except subprocess.TimeoutExpired:
- print(f"[minio] did not exit after {timeout_s}s, killing")
+ print(f"[minio] did not exit after {timeout_s}s, killing", flush=True)
proc.kill()
proc.wait()
@@ -341,29 +422,34 @@ def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]:
def _robust_copytree(src: Path, dst: Path) -> None:
- """Windows-friendly directory copy with progress.
+ """Directory copy of the seeded baseline onto the working MinIO data dir.
+
+ On a ReFS volume we use `refs_clone.clone_tree` to issue
+ FSCTL_DUPLICATE_EXTENTS_TO_FILE per file: O(1) per file via copy-on-write
+ metadata, regardless of size. A 300+ GB baseline clones in seconds.
+ Off ReFS we fall back to `shutil.copytree`. PowerShell `Copy-Item` was
+ tried previously and observed to byte-copy on this host, so it is no
+ longer used.
- Uses robocopy /MIR to mirror src -> dst and /COPY:DAT /DCOPY:DAT to copy
- file data + attributes + timestamps explicitly (not just the defaults).
Safe because dst is always the working dir (minio-run) - never the
preserved baseline.
"""
- if sys.platform == "win32":
- cmd = [
- "robocopy", str(src), str(dst),
- "/MIR",
- "/COPY:DAT", "/DCOPY:DAT",
- "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP",
- "/R:2", "/W:1",
- ]
- print(f"[reset] robocopy {src} -> {dst}")
- rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
- if rc >= 8:
- sys.exit(f"[reset] robocopy failed rc={rc}")
+ from refs_clone import clone_tree, is_refs_volume
+
+ if dst.exists():
+ _rmtree_robust(dst)
+
+ if sys.platform == "win32" and is_refs_volume(src) and is_refs_volume(dst.parent):
+ print(f"[reset] ReFS block-clone {src} -> {dst} ...", flush=True)
+ t0 = time.time()
+ files, bytes_total = clone_tree(src, dst)
+ print(f"[reset] cloned {files:,} files, {bytes_total/1024/1024:.1f} MB "
+ f"in {time.time()-t0:.1f}s", flush=True)
else:
- if dst.exists():
- _rmtree_robust(dst)
+ print(f"[reset] copytree (non-ReFS) {src} -> {dst} ...", flush=True)
+ t0 = time.time()
shutil.copytree(src, dst, symlinks=False)
+ print(f"[reset] copytree done in {time.time()-t0:.1f}s", flush=True)
def _archive_run(
@@ -415,26 +501,27 @@ def _archive_run(
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
- parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
- help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
- "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
- parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run",
- help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)")
- parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
- help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)")
- parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf",
- help="Hub --data-dir for the perf run (wiped each run if --wipe). Default: E:/Dev/zen-perf-seed/hub-perf")
- parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs",
- help="Where to archive hub.log + zenserver.log + hub.utrace after each run "
- "(default: E:/Dev/zen-perf-seed/perf-runs)")
+ parser.add_argument("--minio-seeded", required=True,
+ help="Preserved MinIO baseline (read-only source the working dir is reset from each run).")
+ parser.add_argument("--minio-run", required=True,
+ help="Working MinIO data dir, wiped and re-copied from --minio-seeded each run.")
+ parser.add_argument("--snapshot-dir", required=True,
+ help="Source of module IDs to run against (per-module server-state trees from Stage A).")
+ parser.add_argument("--hub-data-dir", required=True,
+ help="Hub --data-dir for the perf run (wiped each run unless --no-wipe-hub). "
+ "Place on a different volume from --minio-run to keep hub I/O from skewing MinIO measurements.")
+ parser.add_argument("--archive-dir", required=True,
+ help="Where to archive hub.log + zenserver.log + hub.utrace after each run.")
parser.add_argument("--bucket", default="zen-seed-packed",
- help="MinIO bucket name to exercise (default: zen-seed-packed). "
- "Pack worktree seeds only the packed bucket.")
+ help="MinIO bucket name to exercise (default: zen-seed-packed).")
parser.add_argument("--minio-port", type=int, default=9000)
parser.add_argument("--minio-console-port", type=int, default=9001)
parser.add_argument("--hub-port", type=int, default=8558)
- parser.add_argument("--module-count", type=int, default=0,
- help="Cap on modules used (0 = all from snapshot-dir)")
+ parser.add_argument("--module-count", type=int, default=1000,
+ help="Cap on modules used (default 1000, the hub instance shared-state "
+ "limit; raising this above ~1023 will hit 'all slots occupied' errors). "
+ "Pass 0 to use every module under --snapshot-dir but expect failures "
+ "if the snapshot has more than ~1023 entries.")
parser.add_argument("--workers", type=int, default=50)
parser.add_argument("--poll-timeout", type=float, default=1200.0)
parser.add_argument("--settle-seconds", type=float, default=5.0)
@@ -446,8 +533,29 @@ def main() -> int:
"baseline; pass this to measure dehydrate cost as well as hydrate.")
parser.add_argument("--no-wipe-hub", action="store_true",
help="Don't wipe the hub data dir before starting (useful to inspect leftover state)")
+ parser.add_argument("--blocking", action="store_true",
+ help="Force blocking S3Client path (pass --hub-hydration-async-enabled=false). "
+ "Default is the hub default, which routes S3 hydration through AsyncHttpClient.")
+ parser.add_argument("--hub-arg", action="append", default=[],
+ help="Extra raw arg appended verbatim to the hub command line. Repeatable.")
parser.add_argument("--zenserver-dir",
help="Directory containing zenserver + minio executables (auto-detected)")
+ # Toxiproxy: simulate network latency between hub and MinIO.
+ parser.add_argument("--toxiproxy-latency-ms", type=int, default=0,
+ help="If >0, route hub through toxiproxy with this latency injected on inbound+outbound. "
+ "Approximates real-network RTT on top of localhost MinIO.")
+ parser.add_argument("--toxiproxy-jitter-ms", type=int, default=0,
+ help="Jitter added to toxiproxy latency (passed as -a jitter=N to toxic add).")
+ parser.add_argument("--toxiproxy-port", type=int, default=9100,
+ help="Toxiproxy listen port for the MinIO proxy (the hub connects here when --toxiproxy-latency-ms > 0).")
+ parser.add_argument("--toxiproxy-api-port", type=int, default=8474,
+ help="Toxiproxy server API port.")
+ parser.add_argument("--toxiproxy-server-exe", default=None,
+ help="Path to toxiproxy-server executable. Defaults to ZEN_PERF_TOXIPROXY_SERVER_EXE "
+ "env var, or 'toxiproxy-server' from PATH.")
+ parser.add_argument("--toxiproxy-cli-exe", default=None,
+ help="Path to toxiproxy-cli executable. Defaults to ZEN_PERF_TOXIPROXY_CLI_EXE "
+ "env var, or 'toxiproxy-cli' from PATH.")
args = parser.parse_args()
minio_seeded = Path(args.minio_seeded).resolve()
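
Because the latency toxic is added in both directions, the delay seen by one request/response exchange is roughly twice `--toxiproxy-latency-ms`. A rough sketch of that arithmetic, assuming each direction is delayed by latency plus or minus jitter and that an exchange crosses the proxy once each way (pipelining and multi-chunk transfers are ignored); `added_rtt_range_ms` is a hypothetical helper and the values are examples only:

```python
def added_rtt_range_ms(latency_ms: int, jitter_ms: int = 0) -> tuple[int, int]:
    """Return the (min, max) delay added to one request/response exchange.

    Assumes each direction is delayed by latency_ms +/- jitter_ms and that the
    per-direction delay cannot go below zero.
    """
    per_direction_min = max(0, latency_ms - jitter_ms)
    per_direction_max = latency_ms + jitter_ms
    return 2 * per_direction_min, 2 * per_direction_max


if __name__ == "__main__":
    print(added_rtt_range_ms(30))      # no jitter
    print(added_rtt_range_ms(30, 5))   # with jitter
```

When comparing runs against a real network, it is worth noting whether a quoted latency figure is per direction or round trip.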
@@ -491,27 +599,39 @@ def main() -> int:
# Wipe the hub data dir so every run starts from scratch unless the user opts out.
if not args.no_wipe_hub and hub_data_dir.exists():
- print(f"[reset] wiping {hub_data_dir}")
+ print(f"[reset] wiping {hub_data_dir} ...", flush=True)
+ t0 = time.time()
_rmtree_robust(hub_data_dir)
+ print(f"[reset] wipe done in {time.time()-t0:.1f}s", flush=True)
hub_data_dir.mkdir(parents=True, exist_ok=True)
+ # If toxiproxy is enabled, the hub points at the proxy port, which forwards
+ # to the real MinIO with the configured latency injected.
+ hub_endpoint_port = args.toxiproxy_port if args.toxiproxy_latency_ms > 0 else args.minio_port
+
+ s3_settings = {
+ "uri": f"s3://{args.bucket}",
+ "endpoint": f"http://localhost:{hub_endpoint_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ }
config_path = hub_data_dir / "hydration_config.json"
config_path.write_text(
- json.dumps({
- "type": "s3",
- "settings": {
- "uri": f"s3://{args.bucket}",
- "endpoint": f"http://localhost:{args.minio_port}",
- "path-style": True,
- "region": "us-east-1",
- },
- }),
+ json.dumps({"type": "s3", "settings": s3_settings}),
encoding="ascii",
)
hub_extra_args = [
f"--hub-hydration-target-config={config_path}",
f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}",
]
+ if args.blocking:
+ hub_extra_args.append("--hub-hydration-async-enabled=false")
+ print("[mode] --blocking=true: hub will use blocking S3Client path")
+ else:
+ print("[mode] async path (default): hub routes S3 hydration through AsyncHttpClient")
+ if args.hub_arg:
+ hub_extra_args.extend(args.hub_arg)
+ print(f"[mode] extra hub args: {args.hub_arg}")
if args.enable_dehydration:
print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge")
if args.trace:
@@ -524,6 +644,7 @@ def main() -> int:
}
minio_proc: Optional[subprocess.Popen] = None
+ toxiproxy_proc: Optional[subprocess.Popen] = None
hub_proc: Optional[subprocess.Popen] = None
hub_log_handle = None
exit_code = 0
@@ -542,6 +663,21 @@ def main() -> int:
minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port)
_wait_for_minio(args.minio_port)
+ if args.toxiproxy_latency_ms > 0:
+ toxiproxy_server = _resolve_toxiproxy_exe(
+ args.toxiproxy_server_exe, "ZEN_PERF_TOXIPROXY_SERVER_EXE", "toxiproxy-server")
+ toxiproxy_cli = _resolve_toxiproxy_exe(
+ args.toxiproxy_cli_exe, "ZEN_PERF_TOXIPROXY_CLI_EXE", "toxiproxy-cli")
+ toxiproxy_proc = _start_toxiproxy(
+ toxiproxy_server,
+ toxiproxy_cli,
+ args.toxiproxy_api_port,
+ args.toxiproxy_port,
+ args.minio_port,
+ args.toxiproxy_latency_ms,
+ args.toxiproxy_jitter_ms,
+ )
+
hub_log = hub_data_dir / "hub.log"
hub_proc, hub_log_handle = _start_hub(
zenserver_exe, hub_data_dir, args.hub_port, hub_log,
@@ -601,6 +737,13 @@ def main() -> int:
_zen_down_hub(zen_exe, hub_proc)
if hub_log_handle is not None:
hub_log_handle.close()
+ if toxiproxy_proc is not None and toxiproxy_proc.poll() is None:
+ try:
+ toxiproxy_proc.terminate()
+ toxiproxy_proc.wait(timeout=5)
+ except Exception:
+ toxiproxy_proc.kill()
+ print("[toxiproxy] stopped")
if minio_proc is not None and minio_proc.poll() is None:
_stop_minio_graceful(minio_proc)
# Archive AFTER the hub has exited so in-flight log writes are flushed.
diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py
index e0e45c4cb..f5928b995 100644
--- a/scripts/test_scripts/hub/seed_minio.py
+++ b/scripts/test_scripts/hub/seed_minio.py
@@ -1,13 +1,10 @@
#!/usr/bin/env python3
"""Stage B of the perf-seed workflow.
-Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO
-bucket so a later perf run can exercise that bucket. Pack mode is hardcoded
-in this script (see --hub-hydration-enable-pack flag in _start_hub) and
-governs how the hub uploads into MinIO. To compare packed vs unpacked,
-invoke this script twice from two separate worktrees - one with the pack
-flag flipped to false - producing two preserved MinIO data dirs, then run
-run_minio_perf.py against each.
+Replays the snapshot produced by seed_s3_snapshot.py into a MinIO bucket so
+a later perf run can exercise it. Pack mode is fixed ON (the only mode the
+perf-seed pipeline caters to) - the hub is launched with
+--hub-hydration-enable-pack=true so dehydrate emits packed CAS.
Flow:
1. Start a local MinIO server against --minio-data-dir, create the bucket.
@@ -78,7 +75,7 @@ def _find_zenserver(override: Optional[str]) -> Path:
sys.exit(f"zenserver not found at {p}")
return p
script_dir = Path(__file__).resolve().parent
- repo_root = script_dir.parent.parent
+ repo_root = script_dir.parents[2]
for mode in ("release", "debug"):
for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
@@ -189,11 +186,10 @@ def _start_hub(
"--hub-provision-disk-limit-percent=99",
"--hub-provision-memory-limit-percent=80",
f"--hub-instance-limit={instance_limit}",
- # Seeding is not a perf-measurement path - we want it as fast as the
- # host can manage. Let the hub go wide on both provisioning and
- # hydration thread pools rather than matching prod limits.
- "--hub-instance-provision-threads=64",
- "--hub-hydration-threads=64",
+ # Provision / hydration / async-cap use server defaults; on a 128-core
+ # host these resolve to 16 / 16 / 512 which are sized for both the
+ # async hydrate (Stage A) and the sync dehydrate-PUT path that drives
+ # the upload here.
# Prevent the watchdog from auto-deprovisioning modules while we're
# still hydrating the tail / in the overlay phase. BOTH timers have to
# be extended - the provisioned one (default 600s) is what bites on
@@ -203,7 +199,7 @@ def _start_hub(
# Explicit - default is true, but make it obvious that Stage B needs
# it since the final deprovision drives the MinIO upload.
"--hub-enable-dehydration=true",
- # Pack worktree: turn pack on so dehydrate emits packed CAS.
+ # Pack ON (the only seeding mode) so dehydrate emits packed CAS.
"--hub-hydration-enable-pack=true",
] + extra_args
@@ -411,7 +407,16 @@ def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: l
"""Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*.
snapshot_root is treated as read-only; only hub_servers_root is written to.
+ Uses ReFS block-clone (`refs_clone.clone_tree`) when source and dest live on
+ a ReFS volume so the overlay is O(1) per file via copy-on-write metadata
+ rather than a full byte copy. Falls back to byte copy per-file when the
+ volume is not ReFS or a file is too small for FSCTL_DUPLICATE_EXTENTS_TO_FILE.
"""
+ from refs_clone import clone_tree, is_refs_volume
+
+ use_clone = is_refs_volume(snapshot_root) and is_refs_volume(hub_servers_root)
+ print(f"[overlay] {'ReFS block-clone path' if use_clone else 'byte-copy path (non-ReFS)'}")
+
files_copied = 0
bytes_copied = 0
modules_overlaid = 0
@@ -424,16 +429,21 @@ def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: l
continue
if dst.exists():
_rmtree_robust(dst)
- shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
+ if use_clone:
+ f, b = clone_tree(src, dst)
+ files_copied += f
+ bytes_copied += b
+ else:
+ shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
+ for root, _dirs, files in os.walk(dst):
+ for f in files:
+ p = Path(root) / f
+ try:
+ bytes_copied += p.stat().st_size
+ except OSError:
+ pass
+ files_copied += 1
modules_overlaid += 1
- for root, _dirs, files in os.walk(dst):
- for f in files:
- p = Path(root) / f
- try:
- bytes_copied += p.stat().st_size
- except OSError:
- pass
- files_copied += 1
if i % 25 == 0 or i == len(module_ids):
print(f"[overlay] {i}/{len(module_ids)} modules overlaid "
f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
@@ -601,21 +611,23 @@ def _seed_one_bucket(
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
- parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
- help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)")
- parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs",
- help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ "
- "(default: E:/Dev/zen-perf-seed/hubs)")
- parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data",
- help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)")
+ parser.add_argument("--snapshot-dir", required=True,
+ help="Source of per-module server-state trees (READ-ONLY).")
+ parser.add_argument("--hub-data-root", required=True,
+ help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/")
+ parser.add_argument("--minio-data-dir", required=True,
+ help="MinIO data dir shared by every bucket.")
parser.add_argument("--minio-port", type=int, default=9000)
parser.add_argument("--minio-console-port", type=int, default=9001)
parser.add_argument("--hub-port", type=int, default=8558)
parser.add_argument("--bucket", default="zen-seed-packed",
- help="Bucket to seed (default: zen-seed-packed). Pack worktree - "
- "hub is launched with --hub-hydration-enable-pack=true.")
- parser.add_argument("--module-count", type=int, default=0,
- help="Cap on modules processed (0 = all modules in snapshot-dir)")
+ help="Bucket to seed (default: zen-seed-packed). Hub is launched with "
+ "--hub-hydration-enable-pack=true (the only seeding mode).")
+ parser.add_argument("--module-count", type=int, default=1000,
+ help="Cap on modules processed (default 1000, the hub instance shared-state "
+ "limit; raising this above ~1023 will hit 'all slots occupied' errors). "
+ "Pass 0 to process every module under --snapshot-dir but expect failures "
+ "if the snapshot has more than ~1023 entries.")
parser.add_argument("--workers", type=int, default=50)
parser.add_argument("--poll-timeout", type=float, default=1800.0,
help="Max seconds to wait for each state transition (default: 1800)")
@@ -706,7 +718,7 @@ def main() -> int:
for key, lbl in zip(phases, labels):
print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}")
- print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded/")
+ print(f"[summary] next: preserve {minio_data_dir} via preserve_minio_state.py --source <this> --dest <baseline>")
finally:
if minio_proc is not None and minio_proc.poll() is None:
diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py
index f0bc7b607..a99c9e130 100644
--- a/scripts/test_scripts/hub/seed_s3_snapshot.py
+++ b/scripts/test_scripts/hub/seed_s3_snapshot.py
@@ -87,7 +87,7 @@ def _find_zenserver(override: Optional[str]) -> Path:
return p
script_dir = Path(__file__).resolve().parent
- repo_root = script_dir.parent.parent
+ repo_root = script_dir.parents[2]
for mode in ("release", "debug"):
for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
@@ -161,14 +161,24 @@ def _parse_s3_uri(uri: str) -> tuple[str, str]:
return bucket, prefix
+_EMPTY_MIN_OBJECTS = 3
+_EMPTY_MIN_BYTES = 16 * 1024
+
+# Size buckets for stratified selection. Pyramid distribution mirrors typical
+# workloads: many small modules, fewer medium, fewest large.
+_SMALL_MAX_BYTES = 1 * 1024 * 1024 # <1 MiB
+_LARGE_MIN_BYTES = 500 * 1024 * 1024 # >500 MiB
+_BUCKET_RATIO = (("small", 500), ("medium", 350), ("large", 150))
+
+
def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]:
"""List UUID-shaped module folders under the bucket and return the `limit`
- most recently active ones, ranked by the LastModified of each module's
- `incremental-state.cbo` (newest first - these were last dehydrated most
- recently, which is the closest proxy for "most recently accessed").
+ most recently active non-empty ones, ranked by the LastModified of each
+ module's `incremental-state.cbo` (newest first - these were last dehydrated
+ most recently, which is the closest proxy for "most recently accessed").
- Falls back to listing order for any folder whose state file can't be
- HEADed (missing / 403 / transient error).
+ Modules with < _EMPTY_MIN_OBJECTS objects or < _EMPTY_MIN_BYTES total bytes
+ are treated as empty and dropped.
"""
s3 = session.client("s3", region_name=region)
prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/"
@@ -181,30 +191,147 @@ def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int)
folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/")
if folder and _MODULEID_RE.match(folder):
candidates.append(folder)
- print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...")
+ print(f"[s3] {len(candidates)} module folders match UUID shape; "
+ f"sizing + ranking by state.cbo LastModified (skip empty <{_EMPTY_MIN_OBJECTS} obj or <{_EMPTY_MIN_BYTES} B)...")
- # 2. HEAD each module's state file in parallel. Missing/failed HEADs land
- # at the tail via a sentinel epoch 0 timestamp.
+ # 2. Per-module ListObjectsV2: total object count + total bytes + state.cbo
+ # LastModified, all in one pass. Missing state file => epoch sentinel.
from datetime import datetime, timezone
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
- def _state_mtime(mid: str) -> datetime:
- key = f"{prefix_norm}{mid}/incremental-state.cbo"
+ def _probe(mid: str) -> tuple[int, int, datetime]:
+ mid_prefix = f"{prefix_norm}{mid}/"
+ state_key = f"{mid_prefix}incremental-state.cbo"
+ count = 0
+ total_bytes = 0
+ state_mtime = epoch
try:
- resp = s3.head_object(Bucket=bucket, Key=key)
- return resp.get("LastModified", epoch)
+ for page in paginator.paginate(Bucket=bucket, Prefix=mid_prefix):
+ for obj in page.get("Contents", []) or []:
+ count += 1
+ total_bytes += int(obj.get("Size", 0))
+ if obj.get("Key") == state_key:
+ state_mtime = obj.get("LastModified", epoch) or epoch
except Exception:
- return epoch
+ return 0, 0, epoch
+ return count, total_bytes, state_mtime
with ThreadPoolExecutor(max_workers=50) as pool:
- times = list(pool.map(_state_mtime, candidates))
-
- # 3. Sort descending (newest first). Folders without a state file sink.
- ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True)
- missing = sum(1 for _, t in ranked if t == epoch)
- if missing:
- print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)")
- return [mid for mid, _ in ranked[:limit]]
+ probes = list(pool.map(_probe, candidates))
+
+ # 3. Stratified selection. Bucket non-empty modules by total_bytes into
+ # small/medium/large, sort each bucket by state.cbo mtime desc, and pull
+ # according to _BUCKET_RATIO scaled to `limit`. If a bucket is short,
+ # spill the deficit onto neighbouring buckets first, then onto the relaxed
+ # pool as a last resort, so the caller still gets `limit` modules.
+ small: list[tuple[str, datetime, int]] = []
+ medium: list[tuple[str, datetime, int]] = []
+ large: list[tuple[str, datetime, int]] = []
+ relaxed: list[tuple[str, datetime, int, int]] = []
+ empties = 0
+ no_state = 0
+ for mid, (count, total_bytes, mtime) in zip(candidates, probes):
+ is_empty = count < _EMPTY_MIN_OBJECTS or total_bytes < _EMPTY_MIN_BYTES
+ if is_empty:
+ empties += 1
+ if mtime == epoch:
+ no_state += 1
+ if not is_empty and mtime != epoch:
+ if total_bytes < _SMALL_MAX_BYTES:
+ small.append((mid, mtime, total_bytes))
+ elif total_bytes >= _LARGE_MIN_BYTES:
+ large.append((mid, mtime, total_bytes))
+ else:
+ medium.append((mid, mtime, total_bytes))
+ else:
+ relaxed.append((mid, mtime, count, total_bytes))
+
+ for bucket in (small, medium, large):
+ bucket.sort(key=lambda x: x[1], reverse=True)
+ print(f"[s3] candidates={len(candidates)} empty={empties} no-state={no_state} "
+ f"small={len(small)} medium={len(medium)} large={len(large)}")
+
+ total_ratio = sum(w for _, w in _BUCKET_RATIO)
+ targets = {name: max(0, (limit * w) // total_ratio) for name, w in _BUCKET_RATIO}
+ # Distribute rounding leftovers to first bucket(s) deterministically.
+ leftover = limit - sum(targets.values())
+ for name, _ in _BUCKET_RATIO:
+ if leftover <= 0:
+ break
+ targets[name] += 1
+ leftover -= 1
+
+ pools = {"small": small, "medium": medium, "large": large}
+ # If a bucket is short of its target, the deficit redirects to the nearest
+ # neighbour bucket(s) before falling back to anything further away. Small
+ # is rare in real workloads, so its deficit pads from medium - not large.
+ fallback_chain = {
+ "small": ["medium", "large"],
+ "medium": ["small", "large"],
+ "large": ["medium", "small"],
+ }
+ cursors = {"small": 0, "medium": 0, "large": 0}
+ selected: list[str] = []
+ seen: set[str] = set()
+ fills = {"small": 0, "medium": 0, "large": 0}
+ redirected = {"small": 0, "medium": 0, "large": 0}
+
+ def _take_from(name: str, n: int) -> int:
+ """Take up to n entries from pools[name] starting at cursor. Returns
+ the number actually taken."""
+ if n <= 0:
+ return 0
+ pool = pools[name]
+ start = cursors[name]
+ end = start + n
+ taken = 0
+ for mid, _, _ in pool[start:end]:
+ if mid in seen:
+ continue
+ seen.add(mid)
+ selected.append(mid)
+ taken += 1
+ cursors[name] = end
+ return taken
+
+ for name, _ in _BUCKET_RATIO:
+ want = targets[name]
+ got = _take_from(name, want)
+ fills[name] = got
+ deficit = want - got
+ if deficit <= 0:
+ continue
+ for fb in fallback_chain[name]:
+ if deficit <= 0:
+ break
+ avail = max(0, len(pools[fb]) - cursors[fb])
+ grab = min(deficit, avail)
+ if grab <= 0:
+ continue
+ got_fb = _take_from(fb, grab)
+ redirected[name] += got_fb
+ deficit -= got_fb
+ print(f"[s3] bucket fills: small={fills['small']}+{redirected['small']} "
+ f"medium={fills['medium']}+{redirected['medium']} "
+ f"large={fills['large']}+{redirected['large']} "
+ f"(numbers after '+' = redirected from neighbour buckets)")
+
+ deficit = limit - len(selected)
+ if deficit > 0:
+ # Last-resort spill: relaxed pool (empties + no-state).
+ relaxed.sort(key=lambda x: (x[1], x[3], x[2]), reverse=True)
+ added = 0
+ for mid, _, _, _ in relaxed:
+ if added >= deficit:
+ break
+ if mid in seen:
+ continue
+ seen.add(mid)
+ selected.append(mid)
+ added += 1
+ print(f"[s3] extended with {added} relaxed entries to fill deficit")
+
+ return selected[:limit]
# ---------------------------------------------------------------------------
@@ -231,11 +358,9 @@ def _start_hub(
"--hub-provision-disk-limit-percent=99",
"--hub-provision-memory-limit-percent=80",
f"--hub-instance-limit={instance_limit}",
- # Seeding is not a perf-measurement path - we want it as fast as the
- # host can manage. Let the hub go wide on both provisioning and
- # hydration thread pools rather than matching prod limits.
- "--hub-instance-provision-threads=64",
- "--hub-hydration-threads=64",
+ # Provision pool + async S3 in-flight cap use server defaults; on a
+ # 128-core host these resolve to 16 / 512 which is what the seed run
+ # needs (clamp(cpu/8, 4, 16) and clamp(cpu*4, 128, 512)).
# With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH
# watchdog inactivity timers so early-provisioned modules do not get
# auto-deprovisioned while we're still hydrating the tail (hibernated
@@ -326,6 +451,37 @@ def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]
return 0, {"error": str(e)}
+_HYDRATION_FAIL_RE = re.compile(r"Hydration of module '([0-9a-f-]+)' failed: (.+?)(?:\.\.|$)", re.MULTILINE)
+
+
+def _scan_hydration_failures(log_path: Path, offset: int, already_warned: set[str], label: str) -> int:
+ """Read new hub.log content from `offset` to EOF; print a [warn] line for
+ each newly observed `Hydration of module 'X' failed` warning. Returns the
+ new offset (resume point for next call)."""
+ try:
+ size = log_path.stat().st_size
+ except OSError:
+ return offset
+ if size <= offset:
+ return offset
+ try:
+ with open(log_path, "rb") as f:
+ f.seek(offset)
+ chunk = f.read(size - offset).decode("utf-8", errors="replace")
+ except OSError:
+ return offset
+ for m in _HYDRATION_FAIL_RE.finditer(chunk):
+ mid, reason = m.group(1), m.group(2).strip()
+ if mid in already_warned:
+ continue
+ already_warned.add(mid)
+ # Trim long S3 error reasons so the line stays scannable.
+ if len(reason) > 200:
+ reason = reason[:200] + "..."
+ print(f"\n[{label}] HYDRATION FAILED {mid}: {reason}", flush=True)
+ return size
+
+
def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
url = f"http://localhost:{port}/hub/status"
req = urllib.request.Request(url, headers={"Accept": "application/json"})
@@ -370,16 +526,25 @@ def _wait_for_state(
target_state: str,
timeout_s: float,
label: str,
+ hub_log: Optional[Path] = None,
+ hydration_warned: Optional[set[str]] = None,
) -> tuple[list[str], list[str], dict[str, str]]:
"""Poll hub status until every module hits target_state, fails, or times out.
Returns (stuck, failed, last_states). 'stuck' = still mid-transition when
we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'.
+
+ When hub_log + hydration_warned are supplied, the wait loop also tails the
+ hub log and surfaces `Hydration of module ... failed` warnings as they
+ appear. Hydration failures do NOT push modules into _FAILED_STATES (the hub
+ cleans the state and still marks the module 'provisioned'), so this is the
+ only way to see them in script output.
"""
deadline = time.monotonic() + timeout_s
remaining = set(module_ids)
failed: list[str] = []
last_states: dict[str, str] = {mid: "" for mid in module_ids}
+ log_offset = hub_log.stat().st_size if (hub_log and hub_log.exists()) else 0
while remaining and time.monotonic() < deadline:
states = _hub_module_states(port)
@@ -393,10 +558,16 @@ def _wait_for_state(
remaining.discard(mid)
failed.append(mid)
done = len(module_ids) - len(remaining)
- print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r")
+ warned_count = len(hydration_warned) if hydration_warned is not None else 0
+ suffix = f", hydration-failed {warned_count}" if hydration_warned is not None else ""
+ print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed{suffix})...", end="\r")
+ if hub_log is not None and hydration_warned is not None:
+ log_offset = _scan_hydration_failures(hub_log, log_offset, hydration_warned, label)
time.sleep(2.0)
print()
+ if hub_log is not None and hydration_warned is not None:
+ _scan_hydration_failures(hub_log, log_offset, hydration_warned, label)
return list(remaining), failed, last_states
@@ -404,16 +575,37 @@ def _wait_for_state(
# Snapshot copy (run AFTER hub shutdown so there's no concurrent writer)
# ---------------------------------------------------------------------------
+def _same_volume(a: Path, b: Path) -> bool:
+ """True when a and b live on the same filesystem volume (so an os.replace
+ rename is O(1) instead of an EXDEV-driven copy+delete fallback)."""
+ try:
+ return os.stat(a).st_dev == os.stat(b).st_dev
+ except OSError:
+ return False
+
+
def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]:
- """Copy src_root/<mid>/* to dst_root/<mid>/* for each module.
+ """Move (preferred) or copy src_root/<mid>/ to dst_root/<mid>/ for each module.
- Returns (modules_copied, files_copied, bytes_copied).
+ Hub data dir is throwaway after Stage A (the hub is shut down right after
+ this step), so per-module trees can be moved out of it. When src_root and
+ dst_root share a volume the move is an O(1) directory rename; when they
+ don't, shutil.move falls back to a byte copy plus rmtree, which matches
+ the old shutil.copytree cost.
+
+ Returns (modules_moved, files_moved, bytes_moved).
Replaces only the specific per-module subdirs; never touches siblings.
"""
dst_root.mkdir(parents=True, exist_ok=True)
- modules_copied = 0
- files_copied = 0
- bytes_copied = 0
+ same_vol = _same_volume(src_root, dst_root)
+ if same_vol:
+ print("[snapshot] src and dst share a volume; moving per-module trees (O(1) rename per module)")
+ else:
+ print("[snapshot] src and dst are on different volumes; falling back to byte copy")
+
+ modules_moved = 0
+ files_moved = 0
+ bytes_moved = 0
for i, mid in enumerate(module_ids, 1):
src = src_root / mid
@@ -423,21 +615,24 @@ def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tup
continue
if dst.exists():
_rmtree_robust(dst)
- shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
- modules_copied += 1
+ if same_vol:
+ os.replace(src, dst)
+ else:
+ shutil.move(str(src), str(dst))
+ modules_moved += 1
for root, _dirs, files in os.walk(dst):
for f in files:
p = Path(root) / f
try:
- bytes_copied += p.stat().st_size
+ bytes_moved += p.stat().st_size
except OSError:
pass
- files_copied += 1
+ files_moved += 1
if i % 25 == 0 or i == len(module_ids):
- print(f"[snapshot] {i}/{len(module_ids)} modules copied "
- f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
+ print(f"[snapshot] {i}/{len(module_ids)} modules moved "
+ f"({files_moved:,} files, {bytes_moved/1024/1024:.1f} MB)")
- return modules_copied, files_copied, bytes_copied
+ return modules_moved, files_moved, bytes_moved
# ---------------------------------------------------------------------------
@@ -447,10 +642,10 @@ def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tup
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
- parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a",
- help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)")
- parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
- help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--hub-data-dir", required=True,
+ help="Hub --data-dir. Place on the same volume as --snapshot-dir so the snapshot step is a rename per module instead of a cross-volume byte copy.")
+ parser.add_argument("--snapshot-dir", required=True,
+ help="Destination for per-module server-state trees.")
parser.add_argument("--port", type=int, default=8558,
help="Hub HTTP port (default: 8558)")
parser.add_argument("--module-count", type=int, default=1000,
@@ -505,6 +700,15 @@ def main() -> int:
if len(module_ids) < args.module_count:
print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter")
+ # _list_module_ids returns modules grouped by size bucket (small first, then
+ # medium, then large). Provisioning in that order would create a size-sorted
+ # fan-out and skew load: all small first (cheap, fast hydrations) then all
+ # large (heavy, slow). Shuffle so provision/hibernate/copy hit a mixed-size
+ # stream that better resembles real workload distribution. Fixed seed keeps
+ # runs reproducible across reseeds with the same bucket contents.
+ import random
+ random.Random(0xC0FFEE).shuffle(module_ids)
+
if not module_ids:
sys.exit("[s3] no module folders found, aborting")
@@ -554,10 +758,14 @@ def main() -> int:
if not accepted:
sys.exit("[provision] nothing accepted, aborting")
- stuck, failed, last_states = _wait_for_state(args.port, accepted, "provisioned", args.poll_timeout, "provision")
+ hydration_warned: set[str] = set()
+ stuck, failed, last_states = _wait_for_state(
+ args.port, accepted, "provisioned", args.poll_timeout, "provision",
+ hub_log=hub_log, hydration_warned=hydration_warned,
+ )
prov_done = len(accepted) - len(stuck) - len(failed)
- print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck "
- f"({time.monotonic()-t0:.1f}s)")
+ print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck, "
+ f"{len(hydration_warned)} hydration-failed ({time.monotonic()-t0:.1f}s)")
if failed:
for mid in failed[:10]:
print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'")
@@ -589,20 +797,23 @@ def main() -> int:
for mid in (failed_hib + stuck_hib)[:10]:
print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'")
- # --- Copy snapshots while hub is still running. All instances are
- # hibernated (no writers), watchdog-hibernated-timeout is 86400s
- # (no auto-deprovision), hub is only touching its own metadata
- # outside servers/<mid>/. Safe. ---
- copy_src = hub_data_dir / "servers"
- to_copy = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)]
- print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}")
+ # --- Snapshot per-module state out of the hub data dir while the hub
+ # is still running. All instances are hibernated (no writers),
+ # watchdog-hibernated-timeout is 86400s (no auto-deprovision),
+ # hub is only touching its own metadata outside servers/<mid>/.
+ # When --hub-data-dir and --snapshot-dir share a volume the per-
+ # module trees are renamed (O(1)); cross-volume falls back to a
+ # byte copy. The hub data dir is wiped on the next run regardless.
+ snapshot_src = hub_data_dir / "servers"
+ to_snapshot = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)]
+ print(f"[snapshot] moving {len(to_snapshot)} module trees from {snapshot_src} -> {snapshot_dir}")
t0 = time.monotonic()
- modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy)
- print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB "
+ modules_moved, files_moved, bytes_moved = _copy_snapshot(snapshot_src, snapshot_dir, to_snapshot)
+ print(f"[snapshot] moved {modules_moved} modules, {files_moved:,} files, {bytes_moved/1024/1024:.1f} MB "
f"({time.monotonic()-t0:.1f}s)")
- if modules_copied < len(to_copy):
- print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied")
+ if modules_moved < len(to_snapshot):
+ print(f"[snapshot] WARNING: only {modules_moved}/{len(to_snapshot)} trees moved")
exit_code = 1
# --- Graceful hub shutdown via 'zen down' ---
diff --git a/scripts/test_scripts/hub/sweep_threads.py b/scripts/test_scripts/hub/sweep_threads.py
new file mode 100644
index 000000000..f92a5d734
--- /dev/null
+++ b/scripts/test_scripts/hub/sweep_threads.py
@@ -0,0 +1,264 @@
+"""Greedy hill-climb search for hub thread balance.
+
+Per phase: from current best, try +STEP on each of (data, lifetime, hydration).
+Pick best. Repeat. Stop when no candidate improves or all blocked by caps.
+
+Constraints:
+- async_threads fixed at 1
+- step size = 4 per phase
+- per-dim cap: 32
+- total cap: data + lifetime + hydration <= 96
+- start (8, 8, 8)
+
+Usage:
+ python sweep_threads.py
+"""
+from __future__ import annotations
+import argparse
+import atexit
+import json
+import signal
+import subprocess
+import sys
+from pathlib import Path
+import csv
+import time
+
+HERE = Path(__file__).resolve().parent
+RUNNER = HERE / "run_minio_perf.py"
+LOG = HERE / "threads_sweep.csv"
+ZEN_DIR = "build/windows/x64/release"
+STEP = 4
+PER_DIM_CAP = 32
+TOTAL_CAP = 96
+ASYNC = 1
+DIMS = ("data", "lifetime", "hydration")
+CHILD_PROCS: list[subprocess.Popen] = []
+
+
+def kill_strays():
+ """Best-effort terminate hub, minio, toxiproxy. Idempotent."""
+ for proc in CHILD_PROCS:
+ if proc.poll() is None:
+ try:
+ proc.terminate()
+ except Exception:
+ pass
+ targets = ["zenserver.exe", "minio.exe", "toxiproxy-server.exe", "toxiproxy.exe"]
+ for name in targets:
+ try:
+ subprocess.run(["taskkill", "/F", "/IM", name], capture_output=True, timeout=10)
+ except Exception:
+ pass
+
+
+def install_signal_handlers():
+ def handler(signum, frame):
+ print(f"\n[sweep] signal {signum} received - cleaning up", flush=True)
+ kill_strays()
+ sys.exit(130)
+ for sig in (signal.SIGINT, signal.SIGTERM):
+ try:
+ signal.signal(sig, handler)
+ except Exception:
+ pass
+ atexit.register(kill_strays)
+
+
+def run_once(data: int, lifetime: int, hydration: int, latency_ms: int, label: str, async_threads: int = ASYNC) -> dict:
+    cmd = [
+        sys.executable, "-u", str(RUNNER),
+        "--async-http",
+        f"--async-threads={async_threads}",
+        f"--toxiproxy-latency-ms={latency_ms}",
+        f"--zenserver-dir={ZEN_DIR}",
+        f"--hub-arg=--hub-instance-provision-threads={data}",
+        f"--hub-arg=--hub-instance-spawn-threads={lifetime}",
+        f"--hub-arg=--hub-hydration-threads={hydration}",
+    ]
+    print(f"\n=== [{label}] data={data} lifetime={lifetime} hydration={hydration} async={async_threads} latency={latency_ms}ms ===", flush=True)
+    t0 = time.time()
+    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1)
+    CHILD_PROCS.append(proc)
+    archive = None
+    rc = 0
+    assert proc.stdout is not None
+    for line in proc.stdout:
+        line = line.rstrip()
+        if not line:
+            continue
+        if line.startswith("[archive]"):
+            archive = line.split(" ", 1)[1].split(" ")[0]
+        # Compress spinner-tick pile-up to last segment so we don't print
+        # "[provision] N/1000 provisioned..." 5x per line. Only triggers when
+        # the line is actually a spinner concat (contains the milestone marker).
+        if line.count("...") >= 2 and ("/1000 provisioned" in line or "/1000 gone" in line):
+            line = line.split("...")[-1].lstrip()
+            if not line:
+                continue
+        print(f" {line}", flush=True)
+    rc = proc.wait()
+    if proc in CHILD_PROCS:
+        CHILD_PROCS.remove(proc)
+    wall = time.time() - t0
+    summary = {}
+    if archive:
+        sj = Path(archive) / "summary.json"
+        if sj.exists():
+            summary = json.loads(sj.read_text())
+    row = {
+        "label": label,
+        "data": data,
+        "lifetime": lifetime,
+        "hydration": hydration,
+        "async_threads": async_threads,
+        "latency_ms": latency_ms,
+        "exit_code": rc,
+        "wall_s": round(wall, 1),
+        "provision_s": summary.get("provision_total_s"),
+        "deprovision_s": summary.get("deprovision_total_s"),
+        "total_s": summary.get("total_s"),
+        "archive": archive,
+    }
+    print(f" -> total={row['total_s']}s provision={row['provision_s']}s deprovision={row['deprovision_s']}s exit={row['exit_code']}", flush=True)
+    append_row(row)
+    return row
+
+
+def append_row(row: dict):
+    new = not LOG.exists()
+    with LOG.open("a", newline="") as f:
+        w = csv.DictWriter(f, fieldnames=list(row.keys()))
+        if new:
+            w.writeheader()
+        w.writerow(row)
+
+
+def feasible(p: dict) -> bool:
+    if any(p[d] > PER_DIM_CAP for d in DIMS):
+        return False
+    if sum(p[d] for d in DIMS) > TOTAL_CAP:
+        return False
+    return True
+
+
+def candidates(center: dict) -> list[tuple[str, dict]]:
+    out = []
+    for d in DIMS:
+        c = dict(center)
+        c[d] += STEP
+        if feasible(c):
+            out.append((d, c))
+    return out
+
+
+def parse_points(s: str, default_async: int) -> list[tuple[int, int, int, int]]:
+    out = []
+    for chunk in s.split(";"):
+        chunk = chunk.strip()
+        if not chunk:
+            continue
+        parts = [int(x.strip()) for x in chunk.split(",")]
+        if len(parts) == 3:
+            d, l, h = parts
+            a = default_async
+        elif len(parts) == 4:
+            d, l, h, a = parts
+        else:
+            raise SystemExit(f"--points entry needs 3 ints (d,l,h) or 4 (d,l,h,async): {chunk!r}")
+        out.append((d, l, h, a))
+    return out
+
+
+def run_points(points: list[tuple[int, int, int, int]], latency_ms: int):
+    rows = []
+    for i, (d, l, h, a) in enumerate(points, 1):
+        if d > PER_DIM_CAP or l > PER_DIM_CAP or h > PER_DIM_CAP:
+            print(f" -- skip {d},{l},{h} (per-dim cap {PER_DIM_CAP})", flush=True)
+            continue
+        if d + l + h > TOTAL_CAP:
+            print(f" -- skip {d},{l},{h} (total {d+l+h} > {TOTAL_CAP})", flush=True)
+            continue
+        row = run_once(data=d, lifetime=l, hydration=h, latency_ms=latency_ms,
+                       label=f"pt{i}-{d},{l},{h},a{a}", async_threads=a)
+        rows.append(row)
+    return rows
+
+
+def main():
+    p = argparse.ArgumentParser()
+    p.add_argument("--latency-ms", type=int, default=30)
+    p.add_argument("--max-phases", type=int, default=20)
+    p.add_argument("--async-threads", type=int, default=ASYNC,
+                   help=f"AsyncHttpClient io_context threads/shards for --points runs (default {ASYNC}; hill-climb always uses {ASYNC}). "
+                        "Per-point override available via 4th value in --points entry.")
+    p.add_argument("--points", type=str, default="",
+                   help="Run an explicit list of (d,l,h) or (d,l,h,async) points instead of hill-climb. "
+                        "Format: 'd,l,h;d,l,h,a;...'. CSV appended.")
+    args = p.parse_args()
+
+    install_signal_handlers()
+
+    if args.points:
+        pts = parse_points(args.points, args.async_threads)
+        print(f"[sweep] explicit list: {len(pts)} points", flush=True)
+        rows = run_points(pts, args.latency_ms)
+        valid = [r for r in rows if r.get("total_s") is not None and r["exit_code"] == 0]
+        if valid:
+            valid.sort(key=lambda r: r["total_s"])
+            print("\n=== POINT RESULTS (sorted by total_s) ===", flush=True)
+            for r in valid:
+                tot = r["data"] + r["lifetime"] + r["hydration"]
+                print(f" d={r['data']:>3} l={r['lifetime']:>3} h={r['hydration']:>3} a={r['async_threads']:>2} sum={tot:>3} -> total={r['total_s']:.1f}s prov={r['provision_s']:.1f}s deprov={r['deprovision_s']:.1f}s", flush=True)
+        print(f" log={LOG}", flush=True)
+        return
+
+    cur = dict(data=8, lifetime=8, hydration=8)
+    tried = set()
+    history = []
+
+    base = run_once(**cur, latency_ms=args.latency_ms, label="phase0")
+    history.append(base)
+    tried.add((cur["data"], cur["lifetime"], cur["hydration"]))
+    best = base
+
+    for phase in range(1, args.max_phases + 1):
+        cands = candidates(cur)
+        if not cands:
+            print(f" === phase {phase}: all candidates blocked (per-dim {PER_DIM_CAP} or total {TOTAL_CAP}) ===", flush=True)
+            break
+        phase_best = None
+        for dim, c in cands:
+            key = (c["data"], c["lifetime"], c["hydration"])
+            if key in tried:
+                continue
+            tried.add(key)
+            row = run_once(**c, latency_ms=args.latency_ms, label=f"p{phase}-{dim}+{STEP}")
+            history.append(row)
+            if row.get("total_s") is not None and row["exit_code"] == 0:
+                if phase_best is None or row["total_s"] < phase_best["total_s"]:
+                    phase_best = row
+        if phase_best is None or (best["total_s"] is not None and phase_best["total_s"] >= best["total_s"] - 1.0):
+            print(f" === phase {phase}: no improvement (best {best['total_s']}s vs phase {phase_best and phase_best['total_s']}s) ===", flush=True)
+            break
+        cur = dict(data=phase_best["data"], lifetime=phase_best["lifetime"], hydration=phase_best["hydration"])
+        best = phase_best
+        print(f" +++ phase {phase}: -> data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} total={best['total_s']}s", flush=True)
+
+    print("\n=== OPTIMUM ===", flush=True)
+    print(f" data={cur['data']} lifetime={cur['lifetime']} hydration={cur['hydration']} async={ASYNC}", flush=True)
+    print(f" total={best['total_s']}s provision={best['provision_s']}s deprovision={best['deprovision_s']}s", flush=True)
+    print(f" log={LOG}", flush=True)
+
+
+if __name__ == "__main__":
+    try:
+        main()
+    except KeyboardInterrupt:
+        print("\n[sweep] interrupted - cleaning up", flush=True)
+        kill_strays()
+        sys.exit(130)
+    except Exception as e:
+        print(f"\n[sweep] error: {e!r} - cleaning up", flush=True)
+        kill_strays()
+        raise