diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-27 11:14:09 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-27 11:14:09 +0200 |
| commit | 753ab4e89b9a5952e50bc77d404198520b362a3a (patch) | |
| tree | 39dbaad8389677981281b8c1585ac846251539f0 /scripts/test_scripts | |
| parent | fix crash when scavenging sequences or copying local chunks (#1013) (diff) | |
| download | archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.tar.xz archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.zip | |
hydration with pack (#1016)
- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files
- `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true)
- `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB)
- `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB)
- Feature: Hub hydration and dehydration can be disabled per direction
- `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true)
- `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true)
- Feature: Hub hydration accepts a configurable file exclude list via `HydrationOptions` `excludes` (array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: a present field replaces the default outright; explicit `[]` opts out of all defaults.
- Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately
- Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes
- Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer
- Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so hydrate replays the partitioning used at dehydrate; default raised to 64 MiB (was 32 MiB)
- Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup
Diffstat (limited to 'scripts/test_scripts')
| -rw-r--r-- | scripts/test_scripts/hub/PERF_SEED_README.md | 148 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/analyze_perf_runs.py | 348 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/hub_load_test_s3.py | 537 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/perf_configs/hub.lua | 48 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/perf_configs/instance.lua | 15 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/preserve_minio_state.py | 126 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/run_minio_perf.py | 614 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/seed_minio.py | 720 | ||||
| -rw-r--r-- | scripts/test_scripts/hub/seed_s3_snapshot.py | 635 |
9 files changed, 3191 insertions, 0 deletions
diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md new file mode 100644 index 000000000..fb471d4bb --- /dev/null +++ b/scripts/test_scripts/hub/PERF_SEED_README.md @@ -0,0 +1,148 @@ +# Perf-seed workflow + +Three-stage pipeline for running repeatable hub-hydration perf tests against a +local MinIO backend seeded with real module data pulled from production S3. + +## Layout + +All scripts default to a single perf-seed root - currently `E:/Dev/zen-perf-seed/` +in the script defaults, but every path is overridable via CLI flag (see the +per-stage options below). Pick a root with enough free space (snapshots and +preserved CAS dirs can be large) and either pass the matching `--*-dir` flag on +each invocation or change the script defaults to your chosen root. + +Layout under the chosen root (`<perf-seed>/`): + +``` +<perf-seed>/ + hub-a/ Stage A hub data dir (transient) + servers/<moduleid>/ + s3-snapshot/ Preserved production server-state trees (read-only after Stage A) + <moduleid>/ + hubs/ Stage B per-bucket hub data dirs (transient) + hub-b-zen-seed-packed/ + hub-b-zen-seed-unpacked/ + minio-data/ Stage B MinIO data dir (transient, carries every seeded bucket) + minio-seeded-baseline/ Preserved baseline MinIO CAS (read-only after Stage B + preserve) + README.txt + minio-seeded-packed/ Preserved packed MinIO CAS (filled by the pack worktree) + README.txt + hub-perf/ Stage C hub data dir (wiped each run) + minio-run/ Stage C MinIO data dir (wiped + re-copied each run) + perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json + 20260423-141530_zen-seed-packed/ + 20260423-143112_zen-seed-unpacked/ +``` + +## Prerequisites + +- Debug or release build of zenserver + minio: `xmake -y` +- `pip install boto3` +- AWS CLI v2 with an SSO profile configured (for Stage A only) +- Environment variables (or pass equivalents via CLI flags): + - `ZEN_PERF_S3_URI` - source S3 bucket, e.g. `s3://your-bucket/optional-prefix/` + - `ZEN_PERF_AWS_PROFILE` - AWS SSO profile name with read access to that bucket + - `ZEN_PERF_AWS_REGION` - optional, defaults to `us-east-1` + +## Stage A - snapshot real S3 data + +One-time (or when you want a fresh baseline from production). + +``` +export ZEN_PERF_S3_URI=s3://your-bucket/ +export ZEN_PERF_AWS_PROFILE=your-sso-profile +python scripts/test_scripts/hub/seed_s3_snapshot.py +``` + +Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies +`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login` +automatically if the SSO token is missing or expired. + +Module selection ranks all UUID-shaped folders by their +`incremental-state.cbo` `LastModified` (newest first, a proxy for +most-recently-accessed) and takes the top `--module-count`. + +Options: +- `--module-count N` (default 1000) +- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`) +- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`) + +## Stage B - seed MinIO from the snapshot + +One-time per pack-mode (or when `s3-snapshot` changes). + +`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is +hardcoded inside the script (`--hub-hydration-enable-pack=true` near the +top of `_start_hub`). To produce both packed and unpacked baselines for +comparison, invoke the script twice from two separate worktrees - one with +the flag flipped to `false` - and preserve the resulting MinIO data dir +each time. + +``` +# In the pack worktree (flag = true), seeds zen-seed-packed +python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed +python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed + +# In the no-pack worktree (flag = false), seeds zen-seed-unpacked +python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked +python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked +``` + +The script provisions every module found under `s3-snapshot/`, hibernates +them, overlays the snapshot on top of the hub's servers dir, then +deprovisions all modules - which runs the dehydrate path and uploads the +content into the bucket. + +`preserve_minio_state.py` copies the resulting `minio-data/` to a +variant-specific preservation dir and writes a README with provenance. + +Options of interest: +- `--bucket NAME` - bucket name (default `zen-seed-packed`). +- `--wipe` removes the per-bucket hub data dir and the shared minio-data + dir before starting. +- `--module-count N` caps the set (0 = every module in snapshot-dir). + +## Stage C - run a perf iteration + +Repeat as often as you want; each run starts from the preserved baseline. + +``` +# Pack-on bucket +python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace + +# Pack-off bucket (for comparison) +python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace +``` + +Steps: +1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state. +2. Wipes `hub-perf/` (unless `--no-wipe-hub`). +3. Starts MinIO and hub. +4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone. +5. Stops everything cleanly. + +Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every +iteration exercises the hydrate-only path against the same baseline CAS. The +`--bucket` flag selects which seeded bucket (and therefore which pack mode) +to exercise. + +Pass `--enable-dehydration` to run a full provision -> deprovision cycle that +includes re-upload (dehydrate) at deprovision time. Use this to measure the +dehydrate phase end-to-end against the seeded baseline. Note the seeded +baseline diverges after a `--enable-dehydration` run - re-copy `--minio-seeded` +or re-run `preserve_minio_state.py` if you want to compare to the pristine state. + +After each run the hub log, structured zenserver logs, any utrace file, and a +`summary.json` with the run's timings are copied into +`perf-runs/<timestamp>_<bucket>/` so Stage C runs can be compared +post-hoc. Override the destination with `--archive-dir PATH`. + +## Resetting between runs + +- **Keep**: `s3-snapshot/`, `minio-seeded-baseline/`, `minio-seeded-packed/`. These are expensive to rebuild. +- **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`. + +To force a fresh MinIO seed for one variant: delete the matching +`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching +`--dest`) in that worktree. To force a fresh S3 snapshot: delete +`s3-snapshot/` and re-run Stage A. diff --git a/scripts/test_scripts/hub/analyze_perf_runs.py b/scripts/test_scripts/hub/analyze_perf_runs.py new file mode 100644 index 000000000..521d9cc1e --- /dev/null +++ b/scripts/test_scripts/hub/analyze_perf_runs.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +"""Parse two hub.log archives (packed vs unpacked) and produce a comparison.""" + +from __future__ import annotations + +import argparse +import re +import sys +from pathlib import Path +from statistics import mean, median + + +_SIZE_UNITS = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4} +_TIME_UNITS = {"ns": 1e-9, "us": 1e-6, "ms": 1e-3, "s": 1.0} + + +def parse_size(s: str) -> float: + # Accept forms: "1.53K", "22.6K", "180M", "1.83G", "25" (bare bytes), "0B". + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)B?", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) or "B" + return float(val) * _SIZE_UNITS.get(unit, 1) + + +def parse_time_s(s: str) -> float: + # Accept forms: "1s", "46ms", "712us", "0ns" + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(ns|us|ms|s)", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) + return float(val) * _TIME_UNITS[unit] + + +def parse_rate_bits(s: str) -> float: + # "30.7Mbits/s" -> bits/sec + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)bits/s", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) or "" + mult = {"": 1, "K": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}[unit] + return float(val) * mult + + +_RE_COMPLETE = re.compile(r"Hydration complete module '([0-9a-f-]+)': (\d+) files \(([^)]+)\) in (\S+)$") +_RE_PACK = re.compile(r"Pack unpack:\s+(\d+) packs,\s+(\d+) files,\s+(\S+),\s+download (\S+),\s+unpack (\S+)") +_RE_DLPHASE = re.compile(r"Download phase:\s+(\S+)\s+(\d+)/(\d+)\s+\(([^)]+)\) downloaded,\s+(\S+),\s+(\d+) threads") +_RE_REQS = re.compile(r"Requests:\s+(\d+) reqs,\s+avg (\S+)/req,\s+avg (\S+)/req,\s+peak in-flight (\d+),\s+saturation (\d+)%") +_RE_SCHED = re.compile(r"Scheduling:\s+1st schedule \+(\S+),\s+1st start \+(\S+),\s+queue wait (\S+)") +_RE_LOADMETA = re.compile(r"Load metadata:\s+(\S+)") +_RE_RENAME = re.compile(r"Rename/copy:\s+(\S+)") + + +def parse_log(path: Path) -> list[dict]: + modules: list[dict] = [] + text = path.read_text(encoding="utf-8", errors="replace").splitlines() + i = 0 + while i < len(text): + line = text[i] + m = _RE_COMPLETE.search(line) + if not m: + i += 1 + continue + mod = { + "mid": m.group(1), + "files": int(m.group(2)), + "total_bytes": parse_size(m.group(3)), + "wall_s": parse_time_s(m.group(4)), + } + # Read the block that follows (up to ~12 lines). + block = text[i : min(i + 20, len(text))] + joined = "\n".join(block) + pm = _RE_PACK.search(joined) + if pm: + mod["packs"] = int(pm.group(1)) + mod["packed_files"] = int(pm.group(2)) + # Uncompressed format: single size, no integrity rehash. + mod["pack_bytes"] = parse_size(pm.group(3)) + mod["pack_download_s"] = parse_time_s(pm.group(4)) + mod["pack_unpack_s"] = parse_time_s(pm.group(5)) + else: + mod["packs"] = 0 + mod["packed_files"] = 0 + mod["pack_bytes"] = 0.0 + mod["pack_download_s"] = 0.0 + mod["pack_unpack_s"] = 0.0 + + dm = _RE_DLPHASE.search(joined) + if dm: + mod["dl_phase_s"] = parse_time_s(dm.group(1)) + mod["dl_files"] = int(dm.group(2)) + mod["dl_total_files"] = int(dm.group(3)) + mod["dl_bytes"] = parse_size(dm.group(4)) + mod["dl_throughput_bps"] = parse_rate_bits(dm.group(5)) + mod["dl_threads"] = int(dm.group(6)) + rm = _RE_REQS.search(joined) + if rm: + mod["reqs"] = int(rm.group(1)) + mod["req_avg_s"] = parse_time_s(rm.group(2)) + mod["req_avg_throughput_bps"] = parse_rate_bits(rm.group(3)) + mod["req_peak_inflight"] = int(rm.group(4)) + mod["req_saturation_pct"] = int(rm.group(5)) + sm = _RE_SCHED.search(joined) + if sm: + mod["sched_1st_schedule_s"] = parse_time_s(sm.group(1)) + mod["sched_1st_start_s"] = parse_time_s(sm.group(2)) + mod["queue_wait_s"] = parse_time_s(sm.group(3)) + lm = _RE_LOADMETA.search(joined) + if lm: + mod["load_metadata_s"] = parse_time_s(lm.group(1)) + rcm = _RE_RENAME.search(joined) + if rcm: + mod["rename_copy_s"] = parse_time_s(rcm.group(1)) + + modules.append(mod) + i += 1 + + return modules + + +def fmt_bytes(n: float) -> str: + for u in ("B", "KB", "MB", "GB", "TB"): + if n < 1024 or u == "TB": + return f"{n:.1f} {u}" + n /= 1024 + return f"{n:.1f} TB" + + +def fmt_s(s: float) -> str: + if s < 1e-3: + return f"{s*1e6:.0f} us" + if s < 1.0: + return f"{s*1e3:.1f} ms" + return f"{s:.2f} s" + + +def agg(modules: list[dict]) -> dict: + a: dict = {} + a["count"] = len(modules) + a["total_files"] = sum(m["files"] for m in modules) + a["total_bytes"] = sum(m["total_bytes"] for m in modules) + a["total_reqs"] = sum(m.get("reqs", 0) for m in modules) + a["total_packs"] = sum(m.get("packs", 0) for m in modules) + a["total_packed_files"] = sum(m.get("packed_files", 0) for m in modules) + a["total_pack_bytes"] = sum(m.get("pack_bytes", 0.0) for m in modules) + a["total_dl_bytes"] = sum(m.get("dl_bytes", 0.0) for m in modules) + a["total_dl_phase_s"] = sum(m.get("dl_phase_s", 0.0) for m in modules) + a["total_pack_download_s"] = sum(m.get("pack_download_s", 0.0) for m in modules) + a["total_pack_unpack_s"] = sum(m.get("pack_unpack_s", 0.0) for m in modules) + a["total_rename_s"] = sum(m.get("rename_copy_s", 0.0) for m in modules) + a["total_load_meta_s"] = sum(m.get("load_metadata_s", 0.0) for m in modules) + a["total_wall_s"] = sum(m["wall_s"] for m in modules) + + # Per-module medians and percentiles of request-level metrics + req_avgs = [m["req_avg_s"] for m in modules if m.get("reqs", 0) > 0] + a["req_avg_latency_median_s"] = median(req_avgs) if req_avgs else 0.0 + a["req_avg_latency_mean_s"] = mean(req_avgs) if req_avgs else 0.0 + a["req_avg_latency_p95_s"] = ( + sorted(req_avgs)[int(0.95 * len(req_avgs))] if req_avgs else 0.0 + ) + + queue_waits = [m["queue_wait_s"] for m in modules if "queue_wait_s" in m] + a["queue_wait_median_s"] = median(queue_waits) if queue_waits else 0.0 + a["queue_wait_mean_s"] = mean(queue_waits) if queue_waits else 0.0 + a["queue_wait_p95_s"] = ( + sorted(queue_waits)[int(0.95 * len(queue_waits))] if queue_waits else 0.0 + ) + a["queue_wait_max_s"] = max(queue_waits) if queue_waits else 0.0 + + first_starts = [m["sched_1st_start_s"] for m in modules if "sched_1st_start_s" in m] + a["first_start_median_s"] = median(first_starts) if first_starts else 0.0 + a["first_start_p95_s"] = ( + sorted(first_starts)[int(0.95 * len(first_starts))] if first_starts else 0.0 + ) + + wall_times = [m["wall_s"] for m in modules] + a["wall_median_s"] = median(wall_times) if wall_times else 0.0 + a["wall_mean_s"] = mean(wall_times) if wall_times else 0.0 + a["wall_p95_s"] = sorted(wall_times)[int(0.95 * len(wall_times))] if wall_times else 0.0 + a["wall_max_s"] = max(wall_times) if wall_times else 0.0 + + saturations = [m["req_saturation_pct"] for m in modules if "req_saturation_pct" in m] + a["saturation_median_pct"] = median(saturations) if saturations else 0 + a["saturation_mean_pct"] = mean(saturations) if saturations else 0 + + peak_inflights = [m["req_peak_inflight"] for m in modules if "req_peak_inflight" in m] + a["peak_inflight_median"] = median(peak_inflights) if peak_inflights else 0 + a["peak_inflight_max"] = max(peak_inflights) if peak_inflights else 0 + + return a + + +def print_row(label: str, packed_val: str, unpacked_val: str, delta: str = ""): + print(f" {label:<42} {packed_val:>14} {unpacked_val:>14} {delta}") + + +def delta_pct(packed: float, unpacked: float) -> str: + if unpacked == 0: + return "" + d = packed - unpacked + pct = (d / unpacked) * 100.0 + sign = "+" if d >= 0 else "" + return f"{sign}{pct:.1f}%" + + +def main() -> int: + p = argparse.ArgumentParser() + p.add_argument("--packed-log", required=True) + p.add_argument("--unpacked-log", required=True) + args = p.parse_args() + + pm = parse_log(Path(args.packed_log)) + um = parse_log(Path(args.unpacked_log)) + + pa = agg(pm) + ua = agg(um) + + print(f"\nParsed packed: {pa['count']} modules from {args.packed_log}") + print(f"Parsed unpacked: {ua['count']} modules from {args.unpacked_log}\n") + + print(f" {'metric':<42} {'packed':>14} {'unpacked':>14} delta(p vs u)") + print(f" {'-'*42} {'-'*14} {'-'*14} {'-'*12}") + + print("\n-- workload --") + print_row("modules", f"{pa['count']}", f"{ua['count']}") + print_row("total files (entries)", f"{pa['total_files']}", f"{ua['total_files']}") + print_row("total raw data", fmt_bytes(pa["total_bytes"]), fmt_bytes(ua["total_bytes"])) + + print("\n-- S3 requests --") + print_row("total S3 GETs", + f"{pa['total_reqs']}", f"{ua['total_reqs']}", + delta_pct(pa["total_reqs"], ua["total_reqs"])) + print_row("GETs per module (mean)", + f"{pa['total_reqs']/pa['count']:.1f}", + f"{ua['total_reqs']/ua['count']:.1f}", + delta_pct(pa['total_reqs']/pa['count'], ua['total_reqs']/ua['count'])) + print_row("requests saved by packing", + f"{ua['total_reqs'] - pa['total_reqs']}", "-", + f"{((ua['total_reqs'] - pa['total_reqs']) / ua['total_reqs']) * 100:.1f}%") + + print("\n-- payload --") + print_row("total bytes downloaded", + fmt_bytes(pa["total_dl_bytes"]), + fmt_bytes(ua["total_dl_bytes"]), + delta_pct(pa["total_dl_bytes"], ua["total_dl_bytes"])) + print_row(" pack bytes (aggregate)", + fmt_bytes(pa["total_pack_bytes"]), "-") + diff = pa["total_dl_bytes"] - ua["total_dl_bytes"] + direction = "saved by packing" if diff < 0 else "added by packing" + print_row(f"bytes {direction}", fmt_bytes(abs(diff)), "-") + + print("\n-- per-request latency (per-module avg) --") + print_row("median req avg latency", + fmt_s(pa["req_avg_latency_median_s"]), + fmt_s(ua["req_avg_latency_median_s"]), + delta_pct(pa["req_avg_latency_median_s"], ua["req_avg_latency_median_s"])) + print_row("mean req avg latency", + fmt_s(pa["req_avg_latency_mean_s"]), + fmt_s(ua["req_avg_latency_mean_s"]), + delta_pct(pa["req_avg_latency_mean_s"], ua["req_avg_latency_mean_s"])) + print_row("p95 req avg latency", + fmt_s(pa["req_avg_latency_p95_s"]), + fmt_s(ua["req_avg_latency_p95_s"]), + delta_pct(pa["req_avg_latency_p95_s"], ua["req_avg_latency_p95_s"])) + + print("\n-- queue / scheduling --") + print_row("median queue wait (per module)", + fmt_s(pa["queue_wait_median_s"]), + fmt_s(ua["queue_wait_median_s"]), + delta_pct(pa["queue_wait_median_s"], ua["queue_wait_median_s"])) + print_row("mean queue wait", + fmt_s(pa["queue_wait_mean_s"]), + fmt_s(ua["queue_wait_mean_s"]), + delta_pct(pa["queue_wait_mean_s"], ua["queue_wait_mean_s"])) + print_row("p95 queue wait", + fmt_s(pa["queue_wait_p95_s"]), + fmt_s(ua["queue_wait_p95_s"]), + delta_pct(pa["queue_wait_p95_s"], ua["queue_wait_p95_s"])) + print_row("max queue wait", + fmt_s(pa["queue_wait_max_s"]), + fmt_s(ua["queue_wait_max_s"]), + delta_pct(pa["queue_wait_max_s"], ua["queue_wait_max_s"])) + print_row("median 1st-start delay", + fmt_s(pa["first_start_median_s"]), + fmt_s(ua["first_start_median_s"]), + delta_pct(pa["first_start_median_s"], ua["first_start_median_s"])) + + print("\n-- per-module wall time (hydration block) --") + print_row("median", + fmt_s(pa["wall_median_s"]), + fmt_s(ua["wall_median_s"]), + delta_pct(pa["wall_median_s"], ua["wall_median_s"])) + print_row("mean", + fmt_s(pa["wall_mean_s"]), + fmt_s(ua["wall_mean_s"]), + delta_pct(pa["wall_mean_s"], ua["wall_mean_s"])) + print_row("p95", + fmt_s(pa["wall_p95_s"]), + fmt_s(ua["wall_p95_s"]), + delta_pct(pa["wall_p95_s"], ua["wall_p95_s"])) + print_row("max", + fmt_s(pa["wall_max_s"]), + fmt_s(ua["wall_max_s"]), + delta_pct(pa["wall_max_s"], ua["wall_max_s"])) + + print("\n-- thread-pool saturation / concurrency --") + print_row("saturation median %", + f"{pa['saturation_median_pct']}", + f"{ua['saturation_median_pct']}") + print_row("saturation mean %", + f"{pa['saturation_mean_pct']:.1f}", + f"{ua['saturation_mean_pct']:.1f}") + print_row("peak in-flight median", + f"{pa['peak_inflight_median']}", + f"{ua['peak_inflight_median']}") + print_row("peak in-flight max", + f"{pa['peak_inflight_max']}", + f"{ua['peak_inflight_max']}") + + print("\n-- pack-specific costs (packed run only) --") + print(f" pack GETs added: {pa['total_packs']} ({pa['total_packs']/pa['count']:.2f}/module)") + print(f" small files folded: {pa['total_packed_files']} ({pa['total_packed_files']/pa['count']:.1f}/module)") + print(f" pack download time total: {pa['total_pack_download_s']:.1f} s") + print(f" pack unpack total: {pa['total_pack_unpack_s']:.2f} s") + + print("\n-- per-phase totals (sum across modules, wall-clock contribution) --") + print_row("load metadata total", + fmt_s(pa["total_load_meta_s"]), + fmt_s(ua["total_load_meta_s"]), + delta_pct(pa["total_load_meta_s"], ua["total_load_meta_s"])) + print_row("download phase total", + fmt_s(pa["total_dl_phase_s"]), + fmt_s(ua["total_dl_phase_s"]), + delta_pct(pa["total_dl_phase_s"], ua["total_dl_phase_s"])) + print_row("rename/copy total", + fmt_s(pa["total_rename_s"]), + fmt_s(ua["total_rename_s"]), + delta_pct(pa["total_rename_s"], ua["total_rename_s"])) + print_row("hydration wall sum", + fmt_s(pa["total_wall_s"]), + fmt_s(ua["total_wall_s"]), + delta_pct(pa["total_wall_s"], ua["total_wall_s"])) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py new file mode 100644 index 000000000..23014409c --- /dev/null +++ b/scripts/test_scripts/hub/hub_load_test_s3.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python3 +"""Hub provision/deprovision sweep against a real S3 bucket. + +Lists the top-level folders in S3_URI (each folder name is a moduleId), +picks 200 of them, then: + 1. fires all provision requests concurrently, + 2. polls until every module reaches 'provisioned', + 3. waits 5 seconds, + 4. fires all deprovision requests concurrently, + 5. polls until every module is deprovisioned, + 6. waits 5 seconds, + 7. shuts down the hub. + +Required environment variables (or pass via CLI flags): + ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/ + ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access + ZEN_PERF_AWS_REGION defaults to us-east-1 + +Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO +session is missing or expired, the script triggers the login automatically. + +Requirements: + pip install boto3 + aws CLI v2 +""" + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + +_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "") +_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "") +_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1") + + +# --------------------------------------------------------------------------- +# Executable discovery +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + candidates = [ + repo_root / "build" / "windows" / "x64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "linux" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "macosx" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + ] + for c in candidates: + if c.exists(): + return c + + matches = list(repo_root.glob(f"build/**/debug/zenserver{_EXE_SUFFIX}")) + if matches: + return max(matches, key=lambda p: p.stat().st_mtime) + + sys.exit( + "zenserver debug executable not found in build/. " + "Run: xmake config -y -m debug -a x64 && xmake -y\n" + "Or pass --zenserver-dir <dir>." + ) + + +# --------------------------------------------------------------------------- +# AWS SSO + S3 listing +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit( + "[aws] boto3 is required.\n" + "Install it with: pip install boto3" + ) + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]: + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + paginator = s3.get_paginator("list_objects_v2") + module_ids: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + full = cp.get("Prefix", "") + # Strip outer prefix + trailing slash to get the folder name + folder = full[len(prefix_norm):].rstrip("/") + if folder: + module_ids.append(folder) + return module_ids + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-http-threads=8", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[{name}] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + out: dict[str, str] = {} + for m in data.get("modules", []) or []: + mid = m.get("moduleId") + if mid: + out[mid] = m.get("state", "") + return out + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """Fire POST /hub/modules/<id>/<verb> for every module concurrently. + + Returns (accepted_ids, failures) where accepted_ids got 200/202 and + failures is a list of (mid, status, body) for everything else. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + # Already gone - treat as success for deprovision fan-out. + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _wait_for_provisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is 'provisioned' or gone. + + Returns (stuck_ids, last_states). stuck_ids are the ones that did not reach + 'provisioned' within timeout_s; last_states maps all module_ids to their + last-seen state (or empty string if the module was absent from the report). + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + now_done: list[str] = [] + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == "provisioned": + now_done.append(mid) + elif s in ("", "unprovisioned", "crashed"): + # Not useful end-states for "waiting to become provisioned", + # but we don't block forever on them either - let timeout decide. + pass + for mid in now_done: + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is gone from hub status.""" + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3", + help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=200, + help="Number of modules to sweep (default: 200)") + parser.add_argument("--settle-seconds", type=float, default=5.0, + help="Seconds to wait between provision-complete and deprovision, " + "and between deprovision-complete and shutdown (default: 5.0)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=600.0, + help="Max seconds to wait for provision or deprovision to finish (default: 600)") + parser.add_argument("--trace", + nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. " + "Optionally pass channel spec (default: 'default')") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI) + aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE) + aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION) + if not s3_uri: + sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/") + if not aws_profile: + sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name") + + data_dir = Path(args.data_dir) + hub_log = data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] S3 URI: {s3_uri}") + print(f"[setup] profile: {aws_profile}") + print(f"[setup] region: {aws_region}") + + session, frozen = _get_session(aws_profile) + print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})") + + bucket, prefix = _parse_s3_uri(s3_uri) + print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...") + module_ids = _list_module_ids(session, bucket, prefix, aws_region) + print(f"[s3] found {len(module_ids)} module folders") + + if not module_ids: + sys.exit("[s3] no module folders found, aborting") + + selected = module_ids[:args.module_count] + if len(selected) < args.module_count: + print(f"[s3] only {len(selected)} folders available (requested {args.module_count})") + + aws_env = { + "AWS_ACCESS_KEY_ID": frozen.access_key, + "AWS_SECRET_ACCESS_KEY": frozen.secret_key, + } + if frozen.token: + aws_env["AWS_SESSION_TOKEN"] = frozen.token + + data_dir.mkdir(parents=True, exist_ok=True) + config_path = data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": {"uri": s3_uri, "region": aws_region}, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back. + "--hub-enable-dehydration=false", + ] + if args.trace: + trace_path = data_dir / "hub.utrace" + hub_extra_args += [ + f"--trace={args.trace}", + f"--tracefile={trace_path}", + ] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + + try: + hub_instance_limit = max(args.module_count, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, data_dir, args.port, hub_log, + hub_instance_limit, hub_extra_args, aws_env, + ) + _wait_for_hub(hub_proc, args.port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision phase --- + print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...") + t_pv0 = time.monotonic() + accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision") + print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} " + f"(fan-out {time.monotonic() - t_pv0:.1f}s)") + for mid, status, body in pv_failures[:10]: + print(f"[provision] FAILED {mid}: status={status} body={body}") + if len(pv_failures) > 10: + print(f"[provision] ... and {len(pv_failures) - 10} more failures") + + if hub_proc.poll() is not None: + print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})") + return + + if accepted: + stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout) + provisioned_count = len(accepted) - len(stuck) + print() + print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' " + f"({time.monotonic() - t_pv0:.1f}s total)") + if stuck: + print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s") + for mid in stuck[:10]: + print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'") + else: + print("[provision] nothing provisioned") + return + + print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...") + time.sleep(args.settle_seconds) + + # --- Deprovision phase --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t_dp0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision") + print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} " + f"(fan-out {time.monotonic() - t_dp0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[deprovision] FAILED {mid}: status={status} body={body}") + if len(dp_failures) > 10: + print(f"[deprovision] ... and {len(dp_failures) - 10} more failures") + + stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout) + deprovisioned_count = len(dp_accepted) - len(stuck_dp) + print() + print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone " + f"({time.monotonic() - t_dp0:.1f}s total)") + if stuck_dp: + print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s") + for mid in stuck_dp[:10]: + print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'") + + print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...") + time.sleep(args.settle_seconds) + + print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _stop_process(hub_proc, "hub", timeout_s=120.0) + if hub_log_handle is not None: + hub_log_handle.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua new file mode 100644 index 000000000..f3cf3e697 --- /dev/null +++ b/scripts/test_scripts/hub/perf_configs/hub.lua @@ -0,0 +1,48 @@ +-- Perf-test hub config: mirrors the production hub config in the repo root +-- (hub.lua / instance.lua). The launch script overrides hub.instance.config +-- and the effective concurrency (--corelimit=128) via CLI to simulate an +-- r5n.32xlarge box so the default auto-picks match what prod sees there. + +hub = { + instance = { + baseportnumber = 21000, -- default + limits = { + count = 1100, -- headroom for 1000-module perf runs + memorylimitpercent = 90, -- default: 0 (disabled) + disklimitpercent = 90, -- default: 0 (disabled) + }, + corelimit = 4, -- default: 0 (auto) + provisionthreads = 8, -- default: auto + -- NOTE: hub.instance.config (path to instance lua) is overridden via + -- --hub-instance-config on the CLI. If left here, it would be resolved + -- relative to the hub's CWD at spawn time (NOT this file's dir). + }, + + hydration = { + -- Match production's per-module download pool size. Without this, the + -- default auto-picks hardware_concurrency/4 which on --corelimit=128 + -- would be 32. Prod logs consistently show "16 threads" in Download phase. + threads = 16, + }, + + watchdog = { + cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan + cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count + instancecheckthrottlems = 10, -- default: 5. slight throttle to reduce hub CPU + provisionedinactivitytimeoutseconds = 600, -- default + hibernatedinactivitytimeoutseconds = 1800, -- default + inactivitycheckmarginseconds = 60, -- default + }, +} + +network = { + httpserverthreads = 8, -- default: auto. hub itself needs few threads +} + +server = { + dedicated = true, -- default: false. signals build-farm use, affects thread scaling heuristics +} + +gc = { + enabled = false, -- default: true. hub has no storage, no need for GC +} diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua new file mode 100644 index 000000000..1251997db --- /dev/null +++ b/scripts/test_scripts/hub/perf_configs/instance.lua @@ -0,0 +1,15 @@ +network = { + httpserverthreads = 4, -- default: auto +} + +gc = { + enabled = false, -- default: true. hub triggers GC at deprovision, no need for periodic GC +} + +cache = { + bucket = { + memlayer = { + sizethreshold = 0, -- default: 1024. 0 disables memlayer entirely + }, + }, +} diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py new file mode 100644 index 000000000..365e4a542 --- /dev/null +++ b/scripts/test_scripts/hub/preserve_minio_state.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +"""Preserve a freshly-seeded MinIO data directory. + +Run this after seed_minio.py has finished. By default it MOVES --source (the +MinIO --data-dir used during seeding) to --dest (the preservation path) and +writes a README with provenance so future perf runs start from a known +baseline. Move avoids a ~0.5 TB copy on a full 1000-module seed; Stage B +wipes --minio-data-dir on its next invocation anyway. + +Pass --copy to keep --source in place (slower; needs 2x disk). + +Typical invocation: + python preserve_minio_state.py + +Defaults map to the paths recommended by PERF_SEED_README.md. +""" + +from __future__ import annotations + +import argparse +import datetime +import os +import shutil +import stat +import sys +from pathlib import Path + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + def _onerror(func, p, exc_info): + try: + os.chmod(p, stat.S_IWRITE) + func(p) + except Exception: + pass + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + +def _size_of(path: Path) -> tuple[int, int]: + files = 0 + total = 0 + for root, _dirs, names in os.walk(path): + for n in names: + p = Path(root) / n + try: + total += p.stat().st_size + except OSError: + pass + files += 1 + return files, total + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data", + help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed", + help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " + "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") + parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""), + help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)") + parser.add_argument("--bucket", default="zen-seed", + help="MinIO bucket name recorded in the README") + parser.add_argument("--module-count", type=int, default=300, + help="Module count recorded in the README") + parser.add_argument("--copy", action="store_true", + help="Copy --source to --dest instead of moving it. Default is move " + "(fast, in-place rename when on the same volume). Use --copy if you " + "want to keep --source intact for another preserve run.") + args = parser.parse_args() + + source = Path(args.source).resolve() + dest = Path(args.dest).resolve() + + if not source.is_dir(): + sys.exit(f"[preserve] source not found: {source}") + + # Dest is wiped and rewritten. Refuse any path that would clobber source. + if dest == source or dest in source.parents or source in dest.parents: + sys.exit(f"[preserve] source ({source}) and dest ({dest}) must be disjoint") + + files, total = _size_of(source) + mode = "copy" if args.copy else "move" + print(f"[preserve] source: {source} -> {files:,} files, {total/1024/1024:.1f} MB") + print(f"[preserve] dest: {dest}") + print(f"[preserve] mode: {mode}") + + if dest.exists(): + print(f"[preserve] removing existing dest {dest}") + _rmtree_robust(dest) + + dest.parent.mkdir(parents=True, exist_ok=True) + if args.copy: + shutil.copytree(source, dest, symlinks=False) + else: + shutil.move(str(source), str(dest)) + + readme = dest / "README.txt" + readme.write_text( + "\n".join([ + "zen-perf-seed preserved MinIO state", + "", + f"Created: {datetime.datetime.now(datetime.timezone.utc).isoformat()}", + f"Source s3 URI: {args.s3_uri}", + f"Bucket: {args.bucket}", + f"Modules: {args.module_count}", + f"Files: {files:,}", + f"Bytes: {total:,}", + "", + "To run a perf iteration: copy this directory onto a fresh MinIO data", + "dir (see scripts/test_scripts/hub/run_minio_perf.py) and point a hub at it.", + "", + ]), + encoding="ascii", + ) + print(f"[preserve] wrote {readme}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py new file mode 100644 index 000000000..b59cff41a --- /dev/null +++ b/scripts/test_scripts/hub/run_minio_perf.py @@ -0,0 +1,614 @@ +#!/usr/bin/env python3 +"""Stage C of the perf-seed workflow: run a perf iteration against the +preserved MinIO state. + +Flow per invocation: + 1. Reset --minio-run by copying --minio-seeded over it (so every iteration + starts from the same canonical state). + 2. Start MinIO against --minio-run. + 3. Start a hub against MinIO. By default --hub-enable-dehydration=false so + the perf run is read-only and the seeded baseline survives intact. + Pass --enable-dehydration to run a full provision -> deprovision cycle + that re-uploads at deprovision time (use this to measure the dehydrate + phase as well as hydrate; the seeded baseline diverges after such a run). + 4. Provision every module found under --snapshot-dir (same set that was + used to seed MinIO), wait for all to reach 'provisioned'. + 5. Deprovision them all, wait for them to be gone. + 6. Stop hub + MinIO cleanly. + +Optional --trace writes a utrace file to --hub-data-dir/hub.utrace. + +This script reuses the lightweight harness helpers from the other stages but +keeps the run fully offline once the seed is preserved. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + for mode in ("release", "debug"): + for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): + p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" + if p.exists(): + return p + sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p}") + return p + + +def _find_minio(zenserver_path: Path) -> Path: + p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"minio executable not found at {p}") + return p + + +def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: + data_dir.mkdir(parents=True, exist_ok=True) + env = os.environ.copy() + env["MINIO_ROOT_USER"] = _MINIO_USER + env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + proc = subprocess.Popen( + [str(minio_exe), "server", str(data_dir), + "--address", f":{port}", + "--console-address", f":{console_port}", + "--quiet"], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + **popen_kwargs, + ) + print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}") + return proc + + +def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: + deadline = time.monotonic() + timeout_s + url = f"http://localhost:{port}/minio/health/live" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=1): + print("[minio] ready") + return + except Exception: + time.sleep(0.1) + sys.exit(f"[minio] timed out after {timeout_s}s") + + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + # Use the production-like Lua config (limits, thread counts, watchdog timers) + # so perf numbers reflect what prod actually runs. Hub.hydration.threads is + # left unset so the default auto-pick (hardware_concurrency/4) fires; combined + # with --corelimit=128 below, that yields 32 hydration threads on the actual + # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes. + config_dir = Path(__file__).resolve().parent / "perf_configs" + hub_config = config_dir / "hub.lua" + inst_config = config_dir / "instance.lua" + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + f"--config={hub_config}", + f"--hub-instance-config={inst_config}", + # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box + # with fewer physical cores, this clamps to the actual count (see + # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates). + # When running on the actual prod instance class, this makes the default + # auto-picks (provision/hydration thread counts) behave as prod does. + "--corelimit=128", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """'zen down --pid' for graceful hub shutdown.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + try: + if sys.platform == "win32": + proc.send_signal(signal.CTRL_BREAK_EVENT) + else: + proc.terminate() + except Exception: + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[minio] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_states(port: int) -> Optional[dict[str, str]]: + # Without Accept: application/json the hub serves compact binary (CBOR) + # which json.loads can't parse. 60s timeout - /hub/status can take a few + # seconds to serialize with 300+ modules in flight. + req = urllib.request.Request( + f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=60) as resp: + data = json.loads(resp.read()) + except Exception as e: + # Visibility: prior silent None caused "0/300 provisioned..." forever. + print(f"[status] WARN /hub/status failed: {e}") + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids} + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _state_histogram(states: dict[str, str], remaining: set[str]) -> str: + counts: dict[str, int] = {} + for mid in remaining: + s = states.get(mid, "<absent>") + counts[s] = counts.get(s, 0) + 1 + return ", ".join(f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: -kv[1])) + + +def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]: + deadline = time.monotonic() + timeout_s + remaining = set(ids) + last_verbose = 0.0 + while remaining and time.monotonic() < deadline: + states = _hub_states(port) + if states is not None: + for mid in list(remaining): + if states.get(mid) == "provisioned": + remaining.discard(mid) + done = len(ids) - len(remaining) + now = time.monotonic() + if states is not None and (now - last_verbose >= 10.0 or not remaining): + hist = _state_histogram(states, remaining) if remaining else "(all provisioned)" + print(f"[provision] {done}/{len(ids)} provisioned | remaining: {hist}") + last_verbose = now + else: + print(f"[provision] {done}/{len(ids)} provisioned...", end="\r") + time.sleep(2.0) + print() + return list(remaining) + + +def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]: + deadline = time.monotonic() + timeout_s + remaining = set(ids) + last_verbose = 0.0 + while remaining and time.monotonic() < deadline: + states = _hub_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + done = len(ids) - len(remaining) + now = time.monotonic() + if states is not None and (now - last_verbose >= 10.0 or not remaining): + hist = _state_histogram(states, remaining) if remaining else "(all gone)" + print(f"[deprovision] {done}/{len(ids)} gone | remaining: {hist}") + last_verbose = now + else: + print(f"[deprovision] {done}/{len(ids)} gone...", end="\r") + time.sleep(2.0) + print() + return list(remaining) + + +def _robust_copytree(src: Path, dst: Path) -> None: + """Windows-friendly directory copy with progress. + + Uses robocopy /MIR to mirror src -> dst and /COPY:DAT /DCOPY:DAT to copy + file data + attributes + timestamps explicitly (not just the defaults). + Safe because dst is always the working dir (minio-run) - never the + preserved baseline. + """ + if sys.platform == "win32": + cmd = [ + "robocopy", str(src), str(dst), + "/MIR", + "/COPY:DAT", "/DCOPY:DAT", + "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP", + "/R:2", "/W:1", + ] + print(f"[reset] robocopy {src} -> {dst}") + rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if rc >= 8: + sys.exit(f"[reset] robocopy failed rc={rc}") + else: + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False) + + +def _archive_run( + archive_dir: Path, + hub_data_dir: Path, + bucket: str, + summary: dict, +) -> Optional[Path]: + """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a + timestamped subdir under archive_dir so successive runs can be compared. + + Returns the archive destination path or None if there was nothing to copy. + """ + try: + ts = time.strftime("%Y%m%d-%H%M%S", time.localtime()) + dest = archive_dir / f"{ts}_{bucket}" + dest.mkdir(parents=True, exist_ok=True) + + copied: list[str] = [] + # Hub stdout/stderr (captured by _start_hub via subprocess stdout=) + hub_log = hub_data_dir / "hub.log" + if hub_log.is_file(): + shutil.copy2(hub_log, dest / "hub.log") + copied.append("hub.log") + # Structured zenserver logs (rotated variants included) + logs_src = hub_data_dir / "logs" + if logs_src.is_dir(): + logs_dst = dest / "logs" + logs_dst.mkdir(exist_ok=True) + for p in logs_src.iterdir(): + if p.is_file(): + shutil.copy2(p, logs_dst / p.name) + copied.append(f"logs/{p.name}") + # Optional trace + utrace = hub_data_dir / "hub.utrace" + if utrace.is_file(): + shutil.copy2(utrace, dest / "hub.utrace") + copied.append("hub.utrace") + + (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii") + + print(f"[archive] {dest} ({len(copied)} files)") + return dest + except Exception as e: + print(f"[archive] WARNING: failed to archive run: {e}") + return None + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed", + help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " + "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") + parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run", + help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)") + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf", + help="Hub --data-dir for the perf run (wiped each run if --wipe). Default: E:/Dev/zen-perf-seed/hub-perf") + parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs", + help="Where to archive hub.log + zenserver.log + hub.utrace after each run " + "(default: E:/Dev/zen-perf-seed/perf-runs)") + parser.add_argument("--bucket", default="zen-seed-packed", + help="MinIO bucket name to exercise (default: zen-seed-packed). " + "Pack worktree seeds only the packed bucket.") + parser.add_argument("--minio-port", type=int, default=9000) + parser.add_argument("--minio-console-port", type=int, default=9001) + parser.add_argument("--hub-port", type=int, default=8558) + parser.add_argument("--module-count", type=int, default=0, + help="Cap on modules used (0 = all from snapshot-dir)") + parser.add_argument("--workers", type=int, default=50) + parser.add_argument("--poll-timeout", type=float, default=1200.0) + parser.add_argument("--settle-seconds", type=float, default=5.0) + parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace (default channels: 'default')") + parser.add_argument("--enable-dehydration", action="store_true", + help="Run with --hub-enable-dehydration=true so the deprovision phase " + "actually re-uploads to MinIO. Default false preserves the seeded " + "baseline; pass this to measure dehydrate cost as well as hydrate.") + parser.add_argument("--no-wipe-hub", action="store_true", + help="Don't wipe the hub data dir before starting (useful to inspect leftover state)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver + minio executables (auto-detected)") + args = parser.parse_args() + + minio_seeded = Path(args.minio_seeded).resolve() + minio_run = Path(args.minio_run).resolve() + snapshot_dir = Path(args.snapshot_dir).resolve() + hub_data_dir = Path(args.hub_data_dir).resolve() + archive_dir = Path(args.archive_dir).resolve() + + if not minio_seeded.is_dir(): + sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}") + if not snapshot_dir.is_dir(): + sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}") + + # Preserved inputs are read-only from this script's perspective. Refuse to + # run if any mutable path could accidentally clobber them. + for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)): + for ro_label, ro in (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir)): + if mutable == ro or mutable in ro.parents or ro in mutable.parents: + sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}") + + module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()]) + if args.module_count > 0: + module_ids = module_ids[:args.module_count] + if not module_ids: + sys.exit(f"[setup] no modules in {snapshot_dir}") + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + minio_exe = _find_minio(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] minio: {minio_exe}") + print(f"[setup] minio-seeded: {minio_seeded}") + print(f"[setup] minio-run: {minio_run}") + print(f"[setup] hub-data-dir: {hub_data_dir}") + print(f"[setup] modules: {len(module_ids)}") + + # Reset MinIO run dir from the baseline. + _robust_copytree(minio_seeded, minio_run) + + # Wipe the hub data dir so every run starts from scratch unless the user opts out. + if not args.no_wipe_hub and hub_data_dir.exists(): + print(f"[reset] wiping {hub_data_dir}") + _rmtree_robust(hub_data_dir) + hub_data_dir.mkdir(parents=True, exist_ok=True) + + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": { + "uri": f"s3://{args.bucket}", + "endpoint": f"http://localhost:{args.minio_port}", + "path-style": True, + "region": "us-east-1", + }, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}", + ] + if args.enable_dehydration: + print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge") + if args.trace: + trace_path = hub_data_dir / "hub.utrace" + hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + minio_proc: Optional[subprocess.Popen] = None + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + timings: dict = { + "bucket": args.bucket, + "module_count": len(module_ids), + "build_mode": zenserver_mode, + "provision_fanout_s": None, + "provision_total_s": None, + "deprovision_fanout_s": None, + "deprovision_total_s": None, + "total_s": None, + } + + try: + minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + + hub_log = hub_data_dir / "hub.log" + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, args.hub_port, hub_log, + hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, args.hub_port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision --- + print(f"[provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision") + fanout_s = time.monotonic() - t0 + timings["provision_fanout_s"] = round(fanout_s, 2) + print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)") + for mid, status, body in failures[:5]: + print(f"[provision] FAIL {mid}: status={status} body={body}") + if failures: + exit_code = 1 + stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout) + total_s = time.monotonic() - t0 + timings["provision_total_s"] = round(total_s, 2) + print(f"[provision] all provisioned in {total_s:.1f}s ({len(accepted)-len(stuck)}/{len(accepted)})") + if stuck: + exit_code = 1 + + print(f"[settle] waiting {args.settle_seconds:.0f}s") + time.sleep(args.settle_seconds) + + # --- Deprovision --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision") + fanout_s = time.monotonic() - t0 + timings["deprovision_fanout_s"] = round(fanout_s, 2) + print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)") + if dp_failures: + exit_code = 1 + stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout) + total_s = time.monotonic() - t0 + timings["deprovision_total_s"] = round(total_s, 2) + print(f"[deprovision] all gone in {total_s:.1f}s ({len(dp_accepted)-len(stuck_dp)}/{len(dp_accepted)})") + if stuck_dp: + exit_code = 1 + + print(f"[settle] waiting {args.settle_seconds:.0f}s") + time.sleep(args.settle_seconds) + + elapsed = time.monotonic() - t_start + timings["total_s"] = round(elapsed, 2) + print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _zen_down_hub(zen_exe, hub_proc) + if hub_log_handle is not None: + hub_log_handle.close() + if minio_proc is not None and minio_proc.poll() is None: + _stop_minio_graceful(minio_proc) + # Archive AFTER the hub has exited so in-flight log writes are flushed. + timings["exit_code"] = exit_code + _archive_run(archive_dir, hub_data_dir, args.bucket, timings) + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py new file mode 100644 index 000000000..e0e45c4cb --- /dev/null +++ b/scripts/test_scripts/hub/seed_minio.py @@ -0,0 +1,720 @@ +#!/usr/bin/env python3 +"""Stage B of the perf-seed workflow. + +Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO +bucket so a later perf run can exercise that bucket. Pack mode is hardcoded +in this script (see --hub-hydration-enable-pack flag in _start_hub) and +governs how the hub uploads into MinIO. To compare packed vs unpacked, +invoke this script twice from two separate worktrees - one with the pack +flag flipped to false - producing two preserved MinIO data dirs, then run +run_minio_perf.py against each. + +Flow: + 1. Start a local MinIO server against --minio-data-dir, create the bucket. + 2. Start a hub against MinIO (--bucket as target), dehydration ON, + hibernated-watchdog timeout set huge so hibernated modules are not + auto-deprovisioned before we deprovision them ourselves. + 3. Provision every module discovered under --snapshot-dir (fresh hydrate + against an empty bucket spawns the child on an empty dir). + 4. Hibernate every module (child exits, state dir unlocked). + 5. Overlay the snapshot into <hub-data-dir>/servers/<mid>/. + 6. Deprovision every module. Deprovision from Hibernated runs Dehydrate(), + which scans the overlaid ServerStateDir and uploads content into the + bucket. + 7. Shut the hub down via 'zen down --pid'. + 8. Stop MinIO. + +Preservation of the resulting MinIO data directory is done by a separate +step (see preserve_minio_state.py / PERF_SEED_README.md). +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + + +# --------------------------------------------------------------------------- +# Executable discovery - prefer release, fall back to debug. +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + for mode in ("release", "debug"): + for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): + p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" + if p.exists(): + return p + sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)") + return p + + +def _find_minio(zenserver_path: Path) -> Path: + p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"minio executable not found at {p}. Build the zen project first.") + return p + + +# --------------------------------------------------------------------------- +# MinIO lifecycle +# --------------------------------------------------------------------------- + +def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: + data_dir.mkdir(parents=True, exist_ok=True) + env = os.environ.copy() + env["MINIO_ROOT_USER"] = _MINIO_USER + env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + proc = subprocess.Popen( + [str(minio_exe), "server", str(data_dir), + "--address", f":{port}", + "--console-address", f":{console_port}", + "--quiet"], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + **popen_kwargs, + ) + print(f"[minio] started (pid {proc.pid}) port={port} console={console_port} data={data_dir}") + return proc + + +def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: + deadline = time.monotonic() + timeout_s + url = f"http://localhost:{port}/minio/health/live" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=1): + print("[minio] ready") + return + except Exception: + time.sleep(0.1) + sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s") + + +def _create_minio_bucket(port: int, bucket: str) -> None: + try: + import boto3 # type: ignore[import-not-found] + import botocore.config # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit("[minio] boto3 is required. pip install boto3") + s3 = boto3.client( + "s3", + endpoint_url=f"http://localhost:{port}", + aws_access_key_id=_MINIO_USER, + aws_secret_access_key=_MINIO_PASS, + region_name="us-east-1", + config=botocore.config.Config(signature_version="s3v4"), + ) + try: + s3.create_bucket(Bucket=bucket) + print(f"[minio] created bucket '{bucket}'") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + print(f"[minio] bucket '{bucket}' already exists") + else: + raise + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + # Seeding is not a perf-measurement path - we want it as fast as the + # host can manage. Let the hub go wide on both provisioning and + # hydration thread pools rather than matching prod limits. + "--hub-instance-provision-threads=64", + "--hub-hydration-threads=64", + # Prevent the watchdog from auto-deprovisioning modules while we're + # still hydrating the tail / in the overlay phase. BOTH timers have to + # be extended - the provisioned one (default 600s) is what bites on + # large-N runs where early provisions go idle waiting for the rest. + "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400", + "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400", + # Explicit - default is true, but make it obvious that Stage B needs + # it since the final deprovision drives the MinIO upload. + "--hub-enable-dehydration=true", + # Pack worktree: turn pack on so dehydrate emits packed CAS. + "--hub-hydration-enable-pack=true", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode})") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """'zen down --pid' so the hub runs its normal shutdown path.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: + """MinIO has no external shutdown tool; Ctrl-Break lets it flush.""" + if proc.poll() is not None: + return + try: + if sys.platform == "win32": + proc.send_signal(signal.CTRL_BREAK_EVENT) + else: + proc.terminate() + except Exception: + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[minio] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """POST concurrently. For 'deprovision' a 404 means the module is already + gone - count it as accepted so the state-poll recognises completion. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +_FAILED_STATES = {"crashed", "unprovisioned"} + + +def _wait_for_state( + port: int, + module_ids: list[str], + target_state: str, + timeout_s: float, + label: str, +) -> tuple[list[str], list[str], dict[str, str]]: + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == target_state: + remaining.discard(mid) + elif s in _FAILED_STATES and target_state not in _FAILED_STATES: + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + print() + return list(remaining), failed, last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], list[str], dict[str, str]]: + """Wait until each module is gone from /hub/status or in 'unprovisioned'. + 'crashed' counts as a hard failure - dehydrate never completed for it. + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + elif s == "crashed": + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} gone ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + print() + return list(remaining), failed, last_states + + +# --------------------------------------------------------------------------- +# Snapshot overlay +# --------------------------------------------------------------------------- + +def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: list[str]) -> tuple[int, int, int]: + """Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*. + + snapshot_root is treated as read-only; only hub_servers_root is written to. + """ + files_copied = 0 + bytes_copied = 0 + modules_overlaid = 0 + + for i, mid in enumerate(module_ids, 1): + src = snapshot_root / mid + dst = hub_servers_root / mid + if not src.is_dir(): + print(f"[overlay] WARNING: snapshot missing for {mid}: {src}") + continue + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + modules_overlaid += 1 + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 + if i % 25 == 0 or i == len(module_ids): + print(f"[overlay] {i}/{len(module_ids)} modules overlaid " + f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + return modules_overlaid, files_copied, bytes_copied + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def _seed_one_bucket( + *, + bucket: str, + hub_data_dir: Path, + snapshot_dir: Path, + module_ids: list[str], + minio_port: int, + hub_port: int, + poll_timeout: float, + workers: int, + zenserver_exe: Path, + zen_exe: Path, +) -> tuple[int, dict[str, float]]: + """Run the provision/hibernate/overlay/deprovision cycle against a single + MinIO bucket. Returns (exit_code, timings) where timings maps phase name + to elapsed seconds (provision_s, hibernate_s, overlay_s, deprovision_s, + total_s). + """ + print(f"\n================ seeding {bucket} into hub-dir {hub_data_dir} ================") + + hub_data_dir.mkdir(parents=True, exist_ok=True) + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": { + "uri": f"s3://{bucket}", + "endpoint": f"http://localhost:{minio_port}", + "path-style": True, + "region": "us-east-1", + }, + }), + encoding="ascii", + ) + hub_extra_args = [f"--hub-hydration-target-config={config_path}"] + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + hib_accepted: list[str] = [] + stuck_hib: list[str] = [] + failed_hib: list[str] = [] + timings: dict[str, float] = { + "provision_s": 0.0, + "hibernate_s": 0.0, + "overlay_s": 0.0, + "deprovision_s": 0.0, + "total_s": 0.0, + } + + try: + hub_log = hub_data_dir / "hub.log" + hub_instance_limit = max(len(module_ids) + 10, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, hub_port, hub_log, + hub_instance_limit, hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, hub_port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=workers) as pool: + # --- Provision --- + print(f"[{bucket}][provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, hub_port, module_ids, "provision") + print(f"[{bucket}][provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in failures[:10]: + print(f"[{bucket}][provision] FAILED {mid}: status={status} body={body}") + exit_code = 1 + if not accepted: + return 1, timings + + stuck, failed, last_states = _wait_for_state(hub_port, accepted, "provisioned", poll_timeout, f"{bucket}/provision") + timings["provision_s"] = time.monotonic() - t0 + prov_done = len(accepted) - len(stuck) - len(failed) + print(f"[{bucket}][provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck " + f"({timings['provision_s']:.1f}s)") + if failed or stuck: + exit_code = 1 + for mid in (failed + stuck)[:10]: + print(f"[{bucket}][provision] not-provisioned {mid}: last state='{last_states.get(mid, '')}'") + accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)] + if not accepted: + return 1, timings + + # --- Hibernate --- + print(f"[{bucket}][hibernate] firing {len(accepted)} hibernate requests...") + t0 = time.monotonic() + hib_accepted, hib_failures = _fan_out_post(pool, hub_port, accepted, "hibernate") + print(f"[{bucket}][hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in hib_failures[:10]: + print(f"[{bucket}][hibernate] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_hib, failed_hib, last_states_hib = _wait_for_state(hub_port, hib_accepted, "hibernated", poll_timeout, f"{bucket}/hibernate") + timings["hibernate_s"] = time.monotonic() - t0 + hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib) + print(f"[{bucket}][hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck " + f"({timings['hibernate_s']:.1f}s)") + if failed_hib or stuck_hib: + exit_code = 1 + for mid in (failed_hib + stuck_hib)[:10]: + print(f"[{bucket}][hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") + + # --- Overlay snapshot onto hub's servers dir --- + to_overlay = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + hub_servers = hub_data_dir / "servers" + print(f"[{bucket}][overlay] copying {len(to_overlay)} snapshot trees from {snapshot_dir} -> {hub_servers}") + t0 = time.monotonic() + modules_overlaid, files_copied, bytes_copied = _overlay_snapshot(snapshot_dir, hub_servers, to_overlay) + timings["overlay_s"] = time.monotonic() - t0 + print(f"[{bucket}][overlay] overlaid {modules_overlaid} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + f"({timings['overlay_s']:.1f}s)") + if modules_overlaid < len(to_overlay): + exit_code = 1 + + # --- Deprovision (triggers dehydrate -> MinIO upload) --- + with ThreadPoolExecutor(max_workers=workers) as pool: + print(f"[{bucket}][deprovision] firing {len(to_overlay)} deprovision requests...") + t0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, hub_port, to_overlay, "deprovision") + print(f"[{bucket}][deprovision] accepted={len(dp_accepted)}, failed={len(dp_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[{bucket}][deprovision] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_dp, failed_dp, last_states_dp = _wait_for_deprovisioned(hub_port, dp_accepted, poll_timeout) + timings["deprovision_s"] = time.monotonic() - t0 + dp_done = len(dp_accepted) - len(stuck_dp) - len(failed_dp) + print(f"[{bucket}][deprovision] complete: {dp_done}/{len(dp_accepted)} gone, {len(failed_dp)} crashed, {len(stuck_dp)} stuck " + f"({timings['deprovision_s']:.1f}s)") + if failed_dp or stuck_dp: + exit_code = 1 + for mid in (failed_dp + stuck_dp)[:10]: + print(f"[{bucket}][deprovision] not-gone {mid}: last state='{last_states_dp.get(mid, '')}'") + + timings["total_s"] = time.monotonic() - t_start + print(f"[{bucket}] bucket elapsed: {timings['total_s']:.1f}s, exit={exit_code}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + print(f"[{bucket}][hub] stopping...") + _zen_down_hub(zen_exe, hub_proc) + if hub_log_handle is not None: + hub_log_handle.close() + + return exit_code, timings + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs", + help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ " + "(default: E:/Dev/zen-perf-seed/hubs)") + parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data", + help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--minio-port", type=int, default=9000) + parser.add_argument("--minio-console-port", type=int, default=9001) + parser.add_argument("--hub-port", type=int, default=8558) + parser.add_argument("--bucket", default="zen-seed-packed", + help="Bucket to seed (default: zen-seed-packed). Pack worktree - " + "hub is launched with --hub-hydration-enable-pack=true.") + parser.add_argument("--module-count", type=int, default=0, + help="Cap on modules processed (0 = all modules in snapshot-dir)") + parser.add_argument("--workers", type=int, default=50) + parser.add_argument("--poll-timeout", type=float, default=1800.0, + help="Max seconds to wait for each state transition (default: 1800)") + parser.add_argument("--wipe", action="store_true", + help="Wipe each per-bucket hub data dir and the shared MinIO data dir before starting " + "(never touches --snapshot-dir)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver + minio executables (auto-detected)") + args = parser.parse_args() + + bucket_name: str = args.bucket + + snapshot_dir = Path(args.snapshot_dir).resolve() + hub_data_root = Path(args.hub_data_root).resolve() + minio_data_dir = Path(args.minio_data_dir).resolve() + + hub_data_dir = (hub_data_root / f"hub-b-{bucket_name}").resolve() + + # Safety: snapshot-dir must not overlap any mutable path. + for label, d in [ + ("minio-data-dir", minio_data_dir), + ("hub-data-root", hub_data_root), + (f"hub-b-{bucket_name}", hub_data_dir), + ]: + if snapshot_dir == d or snapshot_dir in d.parents or d == snapshot_dir or d in snapshot_dir.parents: + sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and {label} ({d}) must be disjoint") + + if not snapshot_dir.is_dir(): + sys.exit(f"[setup] snapshot-dir not found: {snapshot_dir}") + + module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()]) + if args.module_count > 0: + module_ids = module_ids[:args.module_count] + if not module_ids: + sys.exit(f"[setup] no module directories found in {snapshot_dir}") + + if args.wipe: + for d in [hub_data_dir, minio_data_dir]: + if d.exists(): + if snapshot_dir.is_relative_to(d): + sys.exit(f"[setup] refusing to wipe {d}: snapshot-dir is under it") + print(f"[setup] wiping {d}") + _rmtree_robust(d) + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + minio_exe = _find_minio(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] zen cli: {zen_exe}") + print(f"[setup] minio: {minio_exe}") + print(f"[setup] snapshot-dir: {snapshot_dir} (read-only)") + print(f"[setup] hub-data-root: {hub_data_root}") + print(f"[setup] minio-data-dir: {minio_data_dir}") + print(f"[setup] modules: {len(module_ids)}") + print(f"[setup] bucket: {bucket_name} (hub-dir {hub_data_dir})") + + minio_proc: Optional[subprocess.Popen] = None + exit_code = 0 + + try: + minio_proc = _start_minio(minio_exe, minio_data_dir, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + _create_minio_bucket(args.minio_port, bucket_name) + + t_total = time.monotonic() + rc, timings = _seed_one_bucket( + bucket=bucket_name, + hub_data_dir=hub_data_dir, + snapshot_dir=snapshot_dir, + module_ids=module_ids, + minio_port=args.minio_port, + hub_port=args.hub_port, + poll_timeout=args.poll_timeout, + workers=args.workers, + zenserver_exe=zenserver_exe, + zen_exe=zen_exe, + ) + exit_code = rc + + print(f"\n[summary] stage B total elapsed: {time.monotonic()-t_total:.1f}s, exit={exit_code}") + print(f"[summary] MinIO data dir is now seeded: {minio_data_dir}") + + phases = ["provision_s", "hibernate_s", "overlay_s", "deprovision_s", "total_s"] + labels = ["provision", "hibernate", "overlay", "deprovision (upload)", "total"] + print(f"\n[summary] timings (s):") + for key, lbl in zip(phases, labels): + print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}") + + print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded/") + + finally: + if minio_proc is not None and minio_proc.poll() is None: + print("[minio] stopping...") + _stop_minio_graceful(minio_proc) + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py new file mode 100644 index 000000000..f0bc7b607 --- /dev/null +++ b/scripts/test_scripts/hub/seed_s3_snapshot.py @@ -0,0 +1,635 @@ +#!/usr/bin/env python3 +"""Stage A of the perf-seed workflow. + +Fetches real module data from a configured S3 bucket via the hub hydration +pipeline and preserves the decompressed server-state trees to disk so a +later stage can replay them into a local MinIO backend. + +Flow: + 1. Start a hub against real S3 (read-only SSO credentials), dehydration off, + hibernated-watchdog timeout set huge so modules are not auto-deprovisioned + mid-flow. + 2. Provision N modules (first N moduleIds from the bucket listing, filtered + to UUID-shaped folders). + 3. Wait until every module reaches 'provisioned'. + 4. Hibernate every module (child processes exit, state dir intact and no + remaining writer while we copy). + 5. Wait until every module reaches 'hibernated'. + 6. Copy <hub-data-dir>/servers/<moduleid>/ -> <snapshot-dir>/<moduleid>/ + with the hub still running - hibernate took the writers offline and the + watchdog is muted. + 7. Shut the hub down via 'zen down --pid <hub pid>'. + +Required environment variables (or pass via CLI flags): + ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/ + ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access + ZEN_PERF_AWS_REGION defaults to us-east-1 + +Requirements: + pip install boto3 + aws CLI v2 with the SSO profile configured +""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import shutil +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + +_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "") +_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "") +_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1") + +# Matches UUID-shaped module IDs (UUID-ish with mixed 4-char groups). Filters +# out stray top-level keys (readmes, test scratches) that aren't real modules. +_MODULEID_RE = re.compile(r"^[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}$") + + +# --------------------------------------------------------------------------- +# zenserver binary discovery - prefer release, fall back to debug. +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + for mode in ("release", "debug"): + for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): + p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" + if p.exists(): + return p + sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)") + return p + + +# --------------------------------------------------------------------------- +# AWS SSO credential resolution +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit("[aws] boto3 is required. Install it with: pip install boto3") + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI: {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]: + """List UUID-shaped module folders under the bucket and return the `limit` + most recently active ones, ranked by the LastModified of each module's + `incremental-state.cbo` (newest first - these were last dehydrated most + recently, which is the closest proxy for "most recently accessed"). + + Falls back to listing order for any folder whose state file can't be + HEADed (missing / 403 / transient error). + """ + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + # 1. Enumerate every UUID-shaped folder. + paginator = s3.get_paginator("list_objects_v2") + candidates: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/") + if folder and _MODULEID_RE.match(folder): + candidates.append(folder) + print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...") + + # 2. HEAD each module's state file in parallel. Missing/failed HEADs land + # at the tail via a sentinel epoch 0 timestamp. + from datetime import datetime, timezone + epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) + + def _state_mtime(mid: str) -> datetime: + key = f"{prefix_norm}{mid}/incremental-state.cbo" + try: + resp = s3.head_object(Bucket=bucket, Key=key) + return resp.get("LastModified", epoch) + except Exception: + return epoch + + with ThreadPoolExecutor(max_workers=50) as pool: + times = list(pool.map(_state_mtime, candidates)) + + # 3. Sort descending (newest first). Folders without a state file sink. + ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True) + missing = sum(1 for _, t in ranked if t == epoch) + if missing: + print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)") + return [mid for mid, _ in ranked[:limit]] + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + # Seeding is not a perf-measurement path - we want it as fast as the + # host can manage. Let the hub go wide on both provisioning and + # hydration thread pools rather than matching prod limits. + "--hub-instance-provision-threads=64", + "--hub-hydration-threads=64", + # With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH + # watchdog inactivity timers so early-provisioned modules do not get + # auto-deprovisioned while we're still hydrating the tail (hibernated + # default 1800s, provisioned default 600s - the latter is the one that + # bit us on the 1000-module run and dropped 335 modules). + "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400", + "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """Shut the hub down via 'zen down --pid <pid>'. zen down performs a + proper server-initiated shutdown (signals the shutdown event, waits for + the server to drain) rather than a blunt kill.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +# --------------------------------------------------------------------------- +# Hub HTTP API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """POST /hub/modules/<id>/<verb> concurrently. Returns (accepted, failures).""" + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +# Any of these means the module is done transitioning and will not reach the +# target state on its own (without a retry we control). +_FAILED_STATES = {"crashed", "unprovisioned"} + + +def _wait_for_state( + port: int, + module_ids: list[str], + target_state: str, + timeout_s: float, + label: str, +) -> tuple[list[str], list[str], dict[str, str]]: + """Poll hub status until every module hits target_state, fails, or times out. + + Returns (stuck, failed, last_states). 'stuck' = still mid-transition when + we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'. + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == target_state: + remaining.discard(mid) + elif s in _FAILED_STATES and target_state not in _FAILED_STATES: + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + + print() + return list(remaining), failed, last_states + + +# --------------------------------------------------------------------------- +# Snapshot copy (run AFTER hub shutdown so there's no concurrent writer) +# --------------------------------------------------------------------------- + +def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]: + """Copy src_root/<mid>/* to dst_root/<mid>/* for each module. + + Returns (modules_copied, files_copied, bytes_copied). + Replaces only the specific per-module subdirs; never touches siblings. + """ + dst_root.mkdir(parents=True, exist_ok=True) + modules_copied = 0 + files_copied = 0 + bytes_copied = 0 + + for i, mid in enumerate(module_ids, 1): + src = src_root / mid + dst = dst_root / mid + if not src.is_dir(): + print(f"[snapshot] WARNING: source missing for {mid}: {src}") + continue + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + modules_copied += 1 + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 + if i % 25 == 0 or i == len(module_ids): + print(f"[snapshot] {i}/{len(module_ids)} modules copied " + f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + + return modules_copied, files_copied, bytes_copied + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a", + help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)") + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=1000, + help="Number of modules to snapshot (default: 300)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=1800.0, + help="Max seconds to wait for provision or hibernate to finish (default: 1800)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + hub_data_dir = Path(args.hub_data_dir).resolve() + snapshot_dir = Path(args.snapshot_dir).resolve() + + # Safety: snapshot-dir is the preserved output. It must not overlap the + # hub data-dir in either direction so nothing the hub writes can clobber + # the preserved state. + if (snapshot_dir == hub_data_dir + or snapshot_dir in hub_data_dir.parents + or hub_data_dir in snapshot_dir.parents): + sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and hub-data-dir ({hub_data_dir}) must be disjoint") + + s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI) + aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE) + aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION) + if not s3_uri: + sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/") + if not aws_profile: + sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name") + + hub_log = hub_data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] zen cli: {zen_exe}") + print(f"[setup] S3 URI: {s3_uri}") + print(f"[setup] profile: {aws_profile}") + print(f"[setup] hub-data-dir: {hub_data_dir}") + print(f"[setup] snapshot-dir: {snapshot_dir}") + + session, frozen = _get_session(aws_profile) + print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})") + + bucket, prefix = _parse_s3_uri(s3_uri) + print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...") + module_ids = _list_module_ids(session, bucket, prefix, aws_region, args.module_count) + print(f"[s3] selected {len(module_ids)} module folders (UUID-shaped)") + if len(module_ids) < args.module_count: + print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter") + + if not module_ids: + sys.exit("[s3] no module folders found, aborting") + + aws_env = { + "AWS_ACCESS_KEY_ID": frozen.access_key, + "AWS_SECRET_ACCESS_KEY": frozen.secret_key, + } + if frozen.token: + aws_env["AWS_SESSION_TOKEN"] = frozen.token + + hub_data_dir.mkdir(parents=True, exist_ok=True) + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": {"uri": s3_uri, "region": aws_region}, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + "--hub-enable-dehydration=false", + ] + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + + try: + hub_instance_limit = max(args.module_count + 10, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, args.port, hub_log, + hub_instance_limit, hub_extra_args, aws_env, + ) + _wait_for_hub(hub_proc, args.port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision --- + print(f"[provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, args.port, module_ids, "provision") + print(f"[provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in failures[:10]: + print(f"[provision] FAILED {mid}: status={status} body={body}") + if not accepted: + sys.exit("[provision] nothing accepted, aborting") + + stuck, failed, last_states = _wait_for_state(args.port, accepted, "provisioned", args.poll_timeout, "provision") + prov_done = len(accepted) - len(stuck) - len(failed) + print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck " + f"({time.monotonic()-t0:.1f}s)") + if failed: + for mid in failed[:10]: + print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'") + exit_code = 1 + if stuck: + for mid in stuck[:10]: + print(f"[provision] STUCK {mid}: last state='{last_states.get(mid, '')}'") + exit_code = 1 + accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)] + + if not accepted: + sys.exit("[provision] nothing successfully provisioned, aborting") + + # --- Hibernate --- + print(f"[hibernate] firing {len(accepted)} hibernate requests...") + t0 = time.monotonic() + hib_accepted, hib_failures = _fan_out_post(pool, args.port, accepted, "hibernate") + print(f"[hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in hib_failures[:10]: + print(f"[hibernate] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_hib, failed_hib, last_states_hib = _wait_for_state(args.port, hib_accepted, "hibernated", args.poll_timeout, "hibernate") + hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib) + print(f"[hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck " + f"({time.monotonic()-t0:.1f}s)") + if failed_hib or stuck_hib: + exit_code = 1 + for mid in (failed_hib + stuck_hib)[:10]: + print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") + + # --- Copy snapshots while hub is still running. All instances are + # hibernated (no writers), watchdog-hibernated-timeout is 86400s + # (no auto-deprovision), hub is only touching its own metadata + # outside servers/<mid>/. Safe. --- + copy_src = hub_data_dir / "servers" + to_copy = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}") + t0 = time.monotonic() + modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy) + print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + f"({time.monotonic()-t0:.1f}s)") + + if modules_copied < len(to_copy): + print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied") + exit_code = 1 + + # --- Graceful hub shutdown via 'zen down' --- + _zen_down_hub(zen_exe, hub_proc) + hub_proc = None + if hub_log_handle is not None: + hub_log_handle.close() + hub_log_handle = None + + print(f"[summary] stage A total elapsed: {time.monotonic()-t_start:.1f}s, exit={exit_code}") + print(f"[summary] snapshot available at: {snapshot_dir}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + # Fallback: something aborted before the 'zen down' path ran. + print("[hub] force-terminating (zen down was not invoked)") + hub_proc.terminate() + try: + hub_proc.wait(timeout=60) + except subprocess.TimeoutExpired: + hub_proc.kill() + hub_proc.wait() + if hub_log_handle is not None: + hub_log_handle.close() + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) |