#!/usr/bin/env python3
"""Stage B of the perf-seed workflow.

Replays the snapshot produced by seed_s3_snapshot.py into a MinIO bucket so a
later perf run can exercise it.

Pack mode is fixed ON (the only mode the perf-seed pipeline caters to) - the
hub is launched with --hub-hydration-enable-pack=true so dehydrate emits
packed CAS.

Flow:
  1. Start a local MinIO server against --minio-data-dir, create the bucket.
  2. Start a hub against MinIO (--bucket as target), dehydration ON, both
     watchdog inactivity timeouts (provisioned and hibernated) set huge so
     modules are not auto-deprovisioned before we deprovision them ourselves.
  3. Provision every module discovered under --snapshot-dir (fresh hydrate
     against an empty bucket spawns the child on an empty dir).
  4. Hibernate every module (child exits, state dir unlocked).
  5. Overlay the snapshot into <hub-data-dir>/servers/<module>/.
  6. Deprovision every module. Deprovision from Hibernated runs Dehydrate(),
     which scans the overlaid ServerStateDir and uploads content into the
     bucket.
  7. Shut the hub down via 'zen down --pid'.
  8. Stop MinIO.

Preservation of the resulting MinIO data directory is done by a separate step
(see preserve_minio_state.py / PERF_SEED_README.md).
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import signal
import stat
import subprocess
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional

_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"

# Interval between /hub/status polls while waiting for state transitions.
_POLL_INTERVAL_S = 2.0


def _rmtree_robust(path) -> None:
    """shutil.rmtree with a Windows-friendly retry for read-only files.

    Best effort: a path that still cannot be removed after its write bit has
    been set is left in place silently. Callers that need the tree gone must
    check for themselves.
    """

    def _onerror(func, p, exc_info):
        # Set the owner-write bit (clears the read-only attribute on Windows)
        # and retry the failed operation once. OR the bit into the existing
        # permissions instead of replacing them, so on POSIX a directory does
        # not lose its read/execute bits along the way.
        try:
            os.chmod(p, stat.S_IMODE(os.stat(p).st_mode) | stat.S_IWRITE)
            func(p)
        except Exception:
            pass

    # onexc was introduced in py3.12 (onerror is deprecated there); fall back
    # to onerror on older versions.
    if sys.version_info >= (3, 12):
        shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
    else:
        shutil.rmtree(path, onerror=_onerror)


# ---------------------------------------------------------------------------
# Executable discovery - prefer release, fall back to debug.
# ---------------------------------------------------------------------------

def _find_zenserver(override: Optional[str]) -> Path:
    """Return the zenserver executable path, exiting the process if not found.

    With *override*, the executable must sit directly in that directory.
    Otherwise build/<platform>/<arch>/<mode>/ under the repo root (the third
    ancestor of this script's directory) is searched, release before debug.
    """
    if override:
        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
        if not p.exists():
            sys.exit(f"zenserver not found at {p}")
        return p

    script_dir = Path(__file__).resolve().parent
    # parents[2] would raise IndexError for a script sitting too close to the
    # filesystem root; treat that the same as "nothing found".
    if len(script_dir.parents) >= 3:
        repo_root = script_dir.parents[2]
        for mode in ("release", "debug"):
            for plat, arch in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
                p = repo_root / "build" / plat / arch / mode / f"zenserver{_EXE_SUFFIX}"
                if p.exists():
                    return p
    sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")


def _find_zen(zenserver_exe: Path) -> Path:
    """Return the zen CLI next to *zenserver_exe*, exiting if it is missing."""
    p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)")
    return p


def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable next to *zenserver_path*, exiting if it is missing."""
    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if not p.exists():
        sys.exit(f"minio executable not found at {p}. Build the zen project first.")
    return p


# ---------------------------------------------------------------------------
# MinIO lifecycle
# ---------------------------------------------------------------------------

def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch `minio server` on *data_dir* with the fixed root credentials.

    stdout/stderr are discarded. On Windows the server gets its own process
    group so _stop_minio_graceful can send it CTRL_BREAK_EVENT.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    env = os.environ.copy()
    env["MINIO_ROOT_USER"] = _MINIO_USER
    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    proc = subprocess.Popen(
        [str(minio_exe), "server", str(data_dir),
         "--address", f":{port}",
         "--console-address", f":{console_port}",
         "--quiet"],
        env=env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **popen_kwargs,
    )
    print(f"[minio] started (pid {proc.pid}) port={port} console={console_port} data={data_dir}")
    return proc


def _wait_for_minio(port: int, timeout_s: float = 30.0, proc: Optional[subprocess.Popen] = None) -> None:
    """Block until MinIO's liveness endpoint answers; exit the process on failure.

    If *proc* is given and has already exited, fail immediately instead of
    polling until the timeout. MinIO's output is discarded, so an early exit
    (e.g. port already in use) is otherwise only visible as a timeout.
    """
    deadline = time.monotonic() + timeout_s
    url = f"http://localhost:{port}/minio/health/live"
    while time.monotonic() < deadline:
        if proc is not None and proc.poll() is not None:
            sys.exit(f"[minio] process exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(url, timeout=1):
                print("[minio] ready")
                return
        except Exception:
            time.sleep(0.1)
    sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")


def _create_minio_bucket(port: int, bucket: str) -> None:
    """Create *bucket* on the local MinIO; an already-existing bucket is accepted.

    Requires boto3 (imported lazily so the rest of the script has no hard
    dependency on it). Any other S3 client error is re-raised.
    """
    try:
        import boto3  # type: ignore[import-not-found]
        import botocore.config  # type: ignore[import-not-found]
        import botocore.exceptions  # type: ignore[import-not-found]
    except ImportError:
        sys.exit("[minio] boto3 is required. pip install boto3")
    s3 = boto3.client(
        "s3",
        endpoint_url=f"http://localhost:{port}",
        aws_access_key_id=_MINIO_USER,
        aws_secret_access_key=_MINIO_PASS,
        region_name="us-east-1",
        config=botocore.config.Config(signature_version="s3v4"),
    )
    try:
        s3.create_bucket(Bucket=bucket)
        print(f"[minio] created bucket '{bucket}'")
    except botocore.exceptions.ClientError as e:
        if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            print(f"[minio] bucket '{bucket}' already exists")
        else:
            raise


# ---------------------------------------------------------------------------
# Hub lifecycle
# ---------------------------------------------------------------------------

def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    instance_limit: int,
    extra_args: list[str],
    extra_env: dict[str, str],
) -> tuple[subprocess.Popen, object]:
    """Launch `zenserver hub` with its combined stdout/stderr going to *log_file*.

    Returns (process, open log file handle). The caller owns the handle and
    must close it once the hub has exited.
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        str(zenserver_exe), "hub",
        "--enable-execution-history=false",
        f"--data-dir={data_dir}",
        f"--port={port}",
        "--hub-instance-corelimit=4",
        "--hub-provision-disk-limit-percent=99",
        "--hub-provision-memory-limit-percent=80",
        f"--hub-instance-limit={instance_limit}",
        # Provision / hydration / async-cap use server defaults; on a 128-core
        # host these resolve to 16 / 16 / 512 which are sized for both the
        # async hydrate (Stage A) and the sync dehydrate-PUT path that drives
        # the upload here.
        # Prevent the watchdog from auto-deprovisioning modules while we're
        # still hydrating the tail / in the overlay phase. BOTH timers have to
        # be extended - the provisioned one (default 600s) is what bites on
        # large-N runs where early provisions go idle waiting for the rest.
        "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400",
        "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400",
        # Explicit - default is true, but make it obvious that Stage B needs
        # it since the final deprovision drives the MinIO upload.
        "--hub-enable-dehydration=true",
        # Pack ON (the only seeding mode) so dehydrate emits packed CAS.
        "--hub-hydration-enable-pack=true",
    ] + extra_args
    env = os.environ.copy()
    env.update(extra_env)
    popen_kwargs: dict = {}
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    log_handle = log_file.open("wb")
    try:
        proc = subprocess.Popen(
            cmd,
            env=env,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            **popen_kwargs,
        )
    except Exception:
        log_handle.close()
        raise
    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
    return proc, log_handle


def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Block until /hub/status answers; exit the process if the hub dies or times out."""
    deadline = time.monotonic() + timeout_s
    req = urllib.request.Request(f"http://localhost:{port}/hub/status", headers={"Accept": "application/json"})
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(req, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            time.sleep(0.2)
    sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")


def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
    """'zen down --pid' so the hub runs its normal shutdown path.

    The zen CLI call is itself bounded by *timeout_s*, so a hung CLI cannot
    block teardown forever. If the hub is still alive *timeout_s* after that,
    it is killed. No-op if the hub has already exited.
    """
    if hub_proc.poll() is not None:
        return
    pid = hub_proc.pid
    print(f"[hub] zen down --pid {pid}")
    try:
        rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"], timeout=timeout_s)
    except subprocess.TimeoutExpired:
        # subprocess.call has already killed and reaped the CLI at this point.
        print(f"[hub] zen down did not return after {timeout_s}s; waiting for exit anyway")
    except OSError as e:
        print(f"[hub] zen down could not be run ({e}); waiting for exit anyway")
    else:
        if rc != 0:
            print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
    try:
        hub_proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[hub] did not exit after {timeout_s}s, killing")
        hub_proc.kill()
        hub_proc.wait()


def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
    """MinIO has no external shutdown tool; Ctrl-Break lets it flush.

    On POSIX the process gets SIGTERM instead. Kills it if it has not exited
    within *timeout_s*.
    """
    if proc.poll() is not None:
        return
    try:
        if sys.platform == "win32":
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        else:
            proc.terminate()
    except Exception:
        proc.terminate()
    try:
        proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[minio] did not exit after {timeout_s}s, killing")
        proc.kill()
        proc.wait()


# ---------------------------------------------------------------------------
# Hub API
# ---------------------------------------------------------------------------

def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
    """POST an empty JSON object to the hub and return (status, parsed body).

    Never raises. An HTTP error yields its status code; a transport failure
    yields status 0 with {"error": ...}. An unparseable body becomes {}.
    """
    url = f"http://localhost:{port}{path}"
    req = urllib.request.Request(url, data=b"{}", method="POST",
                                 headers={"Content-Type": "application/json", "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            try:
                body = json.loads(resp.read())
            except Exception:
                body = {}
            return resp.status, body
    except urllib.error.HTTPError as e:
        try:
            body = json.loads(e.read())
        except Exception:
            body = {}
        return e.code, body
    except Exception as e:
        return 0, {"error": str(e)}


def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
    """Return {moduleId: state} from /hub/status, or None if it could not be read.

    Malformed responses (non-object JSON) are reported as None rather than
    raising, so the polling loops simply retry. Entries without a moduleId,
    or that are not JSON objects, are ignored.
    """
    url = f"http://localhost:{port}/hub/status"
    req = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            data = json.loads(resp.read())
    except Exception:
        return None
    if not isinstance(data, dict):
        return None
    modules = data.get("modules") or []
    return {
        m["moduleId"]: m.get("state", "")
        for m in modules
        if isinstance(m, dict) and m.get("moduleId")
    }


def _fan_out_post(
    pool: ThreadPoolExecutor,
    port: int,
    module_ids: list[str],
    verb: str,
) -> tuple[list[str], list[tuple[str, int, dict]]]:
    """POST /hub/modules/<id>/<verb> concurrently for every module.

    Returns (accepted ids, [(id, status, body)] failures). 200/202 count as
    accepted. For 'deprovision' a 404 means the module is already gone, so
    it also counts as accepted and the state-poll recognises completion.
    """
    futures = {
        mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
        for mid in module_ids
    }
    accepted: list[str] = []
    failures: list[tuple[str, int, dict]] = []
    for mid, fut in futures.items():
        status, body = fut.result()
        if status in (200, 202):
            accepted.append(mid)
        elif verb == "deprovision" and status == 404:
            accepted.append(mid)
        else:
            failures.append((mid, status, body))
    return accepted, failures


# Terminal states that mean a module will not reach the requested target.
_FAILED_STATES = {"crashed", "unprovisioned"}


def _wait_for_state(
    port: int,
    module_ids: list[str],
    target_state: str,
    timeout_s: float,
    label: str,
) -> tuple[list[str], list[str], dict[str, str]]:
    """Poll /hub/status until every module reports *target_state* or the timeout passes.

    A module that lands in one of _FAILED_STATES (unless that is the target
    itself) is recorded as failed and no longer waited on.

    Returns (still-pending ids, failed ids, last observed state per module).
    """
    deadline = time.monotonic() + timeout_s
    remaining = set(module_ids)
    failed: list[str] = []
    last_states: dict[str, str] = {mid: "" for mid in module_ids}
    while remaining and time.monotonic() < deadline:
        states = _hub_module_states(port)
        if states is not None:
            for mid in list(remaining):
                s = states.get(mid, "")
                last_states[mid] = s
                if s == target_state:
                    remaining.discard(mid)
                elif s in _FAILED_STATES and target_state not in _FAILED_STATES:
                    remaining.discard(mid)
                    failed.append(mid)
        done = len(module_ids) - len(remaining)
        print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r")
        # Skip the sleep once everything has settled instead of idling a
        # full poll interval before returning.
        if remaining:
            time.sleep(_POLL_INTERVAL_S)
    print()
    return list(remaining), failed, last_states


def _wait_for_deprovisioned(
    port: int,
    module_ids: list[str],
    timeout_s: float,
) -> tuple[list[str], list[str], dict[str, str]]:
    """Wait until each module is gone from /hub/status or in 'unprovisioned'.

    'crashed' counts as a hard failure - dehydrate never completed for it.

    Returns (still-pending ids, failed ids, last observed state per module).
    """
    deadline = time.monotonic() + timeout_s
    remaining = set(module_ids)
    failed: list[str] = []
    last_states: dict[str, str] = {mid: "" for mid in module_ids}
    while remaining and time.monotonic() < deadline:
        states = _hub_module_states(port)
        if states is not None:
            for mid in list(remaining):
                s = states.get(mid, "")
                last_states[mid] = s
                if mid not in states or s == "unprovisioned":
                    remaining.discard(mid)
                elif s == "crashed":
                    remaining.discard(mid)
                    failed.append(mid)
        done = len(module_ids) - len(remaining)
        print(f"[deprovision] {done}/{len(module_ids)} gone ({len(failed)} failed)...", end="\r")
        if remaining:
            time.sleep(_POLL_INTERVAL_S)
    print()
    return list(remaining), failed, last_states


# ---------------------------------------------------------------------------
# Snapshot overlay
# ---------------------------------------------------------------------------

def _copy_tree_counted(src: Path, dst: Path) -> tuple[int, int]:
    """Byte-copy *src* to *dst* (which must not exist); return (files, bytes) copied.

    Every copied file is counted. Its size is added only if stat() succeeds.
    """
    shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
    files = 0
    total_bytes = 0
    for root, _dirs, names in os.walk(dst):
        for name in names:
            files += 1
            try:
                total_bytes += (Path(root) / name).stat().st_size
            except OSError:
                pass
    return files, total_bytes


def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: list[str]) -> tuple[int, int, int]:
    """Replace hub_servers_root/<module>/* with snapshot_root/<module>/*.

    snapshot_root is treated as read-only; only hub_servers_root is written to.

    Uses ReFS block-clone (`refs_clone.clone_tree`) when source and dest live
    on a ReFS volume so the overlay is O(1) per file via copy-on-write metadata
    rather than a full byte copy. Falls back to byte copy per-file when the
    volume is not ReFS or a file is too small for
    FSCTL_DUPLICATE_EXTENTS_TO_FILE.

    Modules with no snapshot directory are skipped with a warning.

    Returns (modules_overlaid, files_copied, bytes_copied).
    """
    from refs_clone import clone_tree, is_refs_volume

    use_clone = is_refs_volume(snapshot_root) and is_refs_volume(hub_servers_root)
    print(f"[overlay] {'ReFS block-clone path' if use_clone else 'byte-copy path (non-ReFS)'}")
    files_copied = 0
    bytes_copied = 0
    modules_overlaid = 0
    total = len(module_ids)
    for i, mid in enumerate(module_ids, 1):
        src = snapshot_root / mid
        dst = hub_servers_root / mid
        if src.is_dir():
            if dst.exists():
                _rmtree_robust(dst)
            if use_clone:
                f, b = clone_tree(src, dst)
            else:
                f, b = _copy_tree_counted(src, dst)
            files_copied += f
            bytes_copied += b
            modules_overlaid += 1
        else:
            print(f"[overlay] WARNING: snapshot missing for {mid}: {src}")
        # Report progress for skipped modules too, so the final progress line
        # is printed even when the last module has no snapshot.
        if i % 25 == 0 or i == total:
            print(f"[overlay] {i}/{total} modules overlaid "
                  f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
    return modules_overlaid, files_copied, bytes_copied


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _seed_one_bucket(
    *,
    bucket: str,
    hub_data_dir: Path,
    snapshot_dir: Path,
    module_ids: list[str],
    minio_port: int,
    hub_port: int,
    poll_timeout: float,
    workers: int,
    zenserver_exe: Path,
    zen_exe: Path,
) -> tuple[int, dict[str, float]]:
    """Run the provision/hibernate/overlay/deprovision cycle against a single MinIO bucket.

    Returns (exit_code, timings) where timings maps phase name to elapsed
    seconds (provision_s, hibernate_s, overlay_s, deprovision_s, total_s).
    Phases that did not run keep 0.0. The hub is always shut down and its
    log handle closed before returning.
    """
    print(f"\n================ seeding {bucket} into hub-dir {hub_data_dir} ================")
    hub_data_dir.mkdir(parents=True, exist_ok=True)
    config_path = hub_data_dir / "hydration_config.json"
    config_path.write_text(
        json.dumps({
            "type": "s3",
            "settings": {
                "uri": f"s3://{bucket}",
                "endpoint": f"http://localhost:{minio_port}",
                "path-style": True,
                "region": "us-east-1",
            },
        }),
        encoding="ascii",
    )
    hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
    hub_extra_env = {
        "AWS_ACCESS_KEY_ID": _MINIO_USER,
        "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
    }

    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    exit_code = 0
    timings: dict[str, float] = {
        "provision_s": 0.0,
        "hibernate_s": 0.0,
        "overlay_s": 0.0,
        "deprovision_s": 0.0,
        "total_s": 0.0,
    }
    try:
        hub_log = hub_data_dir / "hub.log"
        hub_instance_limit = max(len(module_ids) + 10, 500)
        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, hub_data_dir, hub_port, hub_log,
            hub_instance_limit, hub_extra_args, hub_extra_env,
        )
        _wait_for_hub(hub_proc, hub_port)

        t_start = time.monotonic()
        with ThreadPoolExecutor(max_workers=workers) as pool:
            # --- Provision ---
            print(f"[{bucket}][provision] firing {len(module_ids)} provision requests...")
            t0 = time.monotonic()
            accepted, failures = _fan_out_post(pool, hub_port, module_ids, "provision")
            print(f"[{bucket}][provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)")
            if failures:
                exit_code = 1
            for mid, status, body in failures[:10]:
                print(f"[{bucket}][provision] FAILED {mid}: status={status} body={body}")
            if not accepted:
                return 1, timings

            stuck, failed, last_states = _wait_for_state(hub_port, accepted, "provisioned", poll_timeout, f"{bucket}/provision")
            timings["provision_s"] = time.monotonic() - t0
            prov_done = len(accepted) - len(stuck) - len(failed)
            print(f"[{bucket}][provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck "
                  f"({timings['provision_s']:.1f}s)")
            if failed or stuck:
                exit_code = 1
                for mid in (failed + stuck)[:10]:
                    print(f"[{bucket}][provision] not-provisioned {mid}: last state='{last_states.get(mid, '')}'")
            # Build the exclusion set once rather than per element.
            not_provisioned = set(stuck) | set(failed)
            accepted = [m for m in accepted if m not in not_provisioned]
            if not accepted:
                return 1, timings

            # --- Hibernate ---
            print(f"[{bucket}][hibernate] firing {len(accepted)} hibernate requests...")
            t0 = time.monotonic()
            hib_accepted, hib_failures = _fan_out_post(pool, hub_port, accepted, "hibernate")
            print(f"[{bucket}][hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
            if hib_failures:
                exit_code = 1
            for mid, status, body in hib_failures[:10]:
                print(f"[{bucket}][hibernate] FAILED {mid}: status={status} body={body}")

            stuck_hib, failed_hib, last_states_hib = _wait_for_state(hub_port, hib_accepted, "hibernated", poll_timeout, f"{bucket}/hibernate")
            timings["hibernate_s"] = time.monotonic() - t0
            hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib)
            print(f"[{bucket}][hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck "
                  f"({timings['hibernate_s']:.1f}s)")
            if failed_hib or stuck_hib:
                exit_code = 1
                for mid in (failed_hib + stuck_hib)[:10]:
                    print(f"[{bucket}][hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'")

        # --- Overlay snapshot onto hub's servers dir ---
        # Only hibernated modules are overlaid: their child process has exited
        # and the state dir is unlocked.
        not_hibernated = set(stuck_hib) | set(failed_hib)
        to_overlay = [m for m in hib_accepted if m not in not_hibernated]
        hub_servers = hub_data_dir / "servers"
        print(f"[{bucket}][overlay] copying {len(to_overlay)} snapshot trees from {snapshot_dir} -> {hub_servers}")
        t0 = time.monotonic()
        modules_overlaid, files_copied, bytes_copied = _overlay_snapshot(snapshot_dir, hub_servers, to_overlay)
        timings["overlay_s"] = time.monotonic() - t0
        print(f"[{bucket}][overlay] overlaid {modules_overlaid} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB "
              f"({timings['overlay_s']:.1f}s)")
        if modules_overlaid < len(to_overlay):
            exit_code = 1

        # --- Deprovision (triggers dehydrate -> MinIO upload) ---
        with ThreadPoolExecutor(max_workers=workers) as pool:
            print(f"[{bucket}][deprovision] firing {len(to_overlay)} deprovision requests...")
            t0 = time.monotonic()
            dp_accepted, dp_failures = _fan_out_post(pool, hub_port, to_overlay, "deprovision")
            print(f"[{bucket}][deprovision] accepted={len(dp_accepted)}, failed={len(dp_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
            if dp_failures:
                exit_code = 1
            for mid, status, body in dp_failures[:10]:
                print(f"[{bucket}][deprovision] FAILED {mid}: status={status} body={body}")

            stuck_dp, failed_dp, last_states_dp = _wait_for_deprovisioned(hub_port, dp_accepted, poll_timeout)
            timings["deprovision_s"] = time.monotonic() - t0
            dp_done = len(dp_accepted) - len(stuck_dp) - len(failed_dp)
            print(f"[{bucket}][deprovision] complete: {dp_done}/{len(dp_accepted)} gone, {len(failed_dp)} crashed, {len(stuck_dp)} stuck "
                  f"({timings['deprovision_s']:.1f}s)")
            if failed_dp or stuck_dp:
                exit_code = 1
                for mid in (failed_dp + stuck_dp)[:10]:
                    print(f"[{bucket}][deprovision] not-gone {mid}: last state='{last_states_dp.get(mid, '')}'")

        timings["total_s"] = time.monotonic() - t_start
        print(f"[{bucket}] bucket elapsed: {timings['total_s']:.1f}s, exit={exit_code}")
    finally:
        if hub_proc is not None and hub_proc.poll() is None:
            print(f"[{bucket}][hub] stopping...")
            _zen_down_hub(zen_exe, hub_proc)
        if hub_log_handle is not None:
            hub_log_handle.close()

    return exit_code, timings


def main() -> int:
    """Parse arguments, validate paths, run MinIO plus one seeding pass, and return an exit code."""
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--snapshot-dir", required=True,
                        help="Source of per-module server-state trees (READ-ONLY).")
    parser.add_argument("--hub-data-root", required=True,
                        help="Each bucket gets its own hub data dir under this root: <hub-data-root>/hub-b-<bucket>/")
    parser.add_argument("--minio-data-dir", required=True,
                        help="MinIO data dir shared by every bucket.")
    parser.add_argument("--minio-port", type=int, default=9000)
    parser.add_argument("--minio-console-port", type=int, default=9001)
    parser.add_argument("--hub-port", type=int, default=8558)
    parser.add_argument("--bucket", default="zen-seed-packed",
                        help="Bucket to seed (default: zen-seed-packed). Hub is launched with "
                             "--hub-hydration-enable-pack=true (the only seeding mode).")
    parser.add_argument("--module-count", type=int, default=1000,
                        help="Cap on modules processed (default 1000, the hub instance shared-state "
                             "limit; raising this above ~1023 will hit 'all slots occupied' errors). "
                             "Pass 0 to process every module under --snapshot-dir but expect failures "
                             "if the snapshot has more than ~1023 entries.")
    parser.add_argument("--workers", type=int, default=50)
    parser.add_argument("--poll-timeout", type=float, default=1800.0,
                        help="Max seconds to wait for each state transition (default: 1800)")
    parser.add_argument("--wipe", action="store_true",
                        help="Wipe each per-bucket hub data dir and the shared MinIO data dir before starting "
                             "(never touches --snapshot-dir)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver + minio executables (auto-detected)")
    args = parser.parse_args()

    bucket_name: str = args.bucket
    # The bucket name becomes part of a directory that --wipe deletes; refuse
    # anything that could make that path escape --hub-data-root.
    if not bucket_name or "/" in bucket_name or "\\" in bucket_name:
        sys.exit(f"[setup] --bucket must be a non-empty plain name (no path separators), got {bucket_name!r}")

    snapshot_dir = Path(args.snapshot_dir).resolve()
    hub_data_root = Path(args.hub_data_root).resolve()
    minio_data_dir = Path(args.minio_data_dir).resolve()
    hub_data_dir = (hub_data_root / f"hub-b-{bucket_name}").resolve()

    # Safety: snapshot-dir must not overlap any mutable path (equal, ancestor
    # or descendant).
    for label, d in [
        ("minio-data-dir", minio_data_dir),
        ("hub-data-root", hub_data_root),
        (f"hub-b-{bucket_name}", hub_data_dir),
    ]:
        if d == snapshot_dir or d in snapshot_dir.parents or snapshot_dir in d.parents:
            sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and {label} ({d}) must be disjoint")

    if not snapshot_dir.is_dir():
        sys.exit(f"[setup] snapshot-dir not found: {snapshot_dir}")
    module_ids = sorted(p.name for p in snapshot_dir.iterdir() if p.is_dir())
    if args.module_count > 0:
        module_ids = module_ids[:args.module_count]
    if not module_ids:
        sys.exit(f"[setup] no module directories found in {snapshot_dir}")

    if args.wipe:
        for d in (hub_data_dir, minio_data_dir):
            if d.exists():
                # Belt-and-braces on top of the disjointness check above.
                if snapshot_dir.is_relative_to(d):
                    sys.exit(f"[setup] refusing to wipe {d}: snapshot-dir is under it")
                print(f"[setup] wiping {d}")
                _rmtree_robust(d)

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    zen_exe = _find_zen(zenserver_exe)
    minio_exe = _find_minio(zenserver_exe)
    if "release" in zenserver_exe.parts:
        zenserver_mode = "release"
    elif "debug" in zenserver_exe.parts:
        zenserver_mode = "debug"
    else:
        zenserver_mode = "?"
    print(f"[setup] build mode: {zenserver_mode}")
    print(f"[setup] zenserver: {zenserver_exe}")
    print(f"[setup] zen cli: {zen_exe}")
    print(f"[setup] minio: {minio_exe}")
    print(f"[setup] snapshot-dir: {snapshot_dir} (read-only)")
    print(f"[setup] hub-data-root: {hub_data_root}")
    print(f"[setup] minio-data-dir: {minio_data_dir}")
    print(f"[setup] modules: {len(module_ids)}")
    print(f"[setup] bucket: {bucket_name} (hub-dir {hub_data_dir})")

    minio_proc: Optional[subprocess.Popen] = None
    exit_code = 0
    try:
        minio_proc = _start_minio(minio_exe, minio_data_dir, args.minio_port, args.minio_console_port)
        _wait_for_minio(args.minio_port, proc=minio_proc)
        _create_minio_bucket(args.minio_port, bucket_name)

        t_total = time.monotonic()
        exit_code, timings = _seed_one_bucket(
            bucket=bucket_name,
            hub_data_dir=hub_data_dir,
            snapshot_dir=snapshot_dir,
            module_ids=module_ids,
            minio_port=args.minio_port,
            hub_port=args.hub_port,
            poll_timeout=args.poll_timeout,
            workers=args.workers,
            zenserver_exe=zenserver_exe,
            zen_exe=zen_exe,
        )

        print(f"\n[summary] stage B total elapsed: {time.monotonic()-t_total:.1f}s, exit={exit_code}")
        print(f"[summary] MinIO data dir is now seeded: {minio_data_dir}")
        phases = ["provision_s", "hibernate_s", "overlay_s", "deprovision_s", "total_s"]
        labels = ["provision", "hibernate", "overlay", "deprovision (upload)", "total"]
        print("\n[summary] timings (s):")
        for key, lbl in zip(phases, labels):
            print(f"  {lbl:<22s} {timings.get(key, 0.0):>8.1f}")
        print(f"[summary] next: preserve {minio_data_dir} via preserve_minio_state.py "
              f"--source {minio_data_dir} --dest <dest-dir>")
    finally:
        if minio_proc is not None and minio_proc.poll() is None:
            print("[minio] stopping...")
            _stop_minio_graceful(minio_proc)

    return exit_code


if __name__ == "__main__":
    sys.exit(main())