1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
|
#!/usr/bin/env python3
"""Stage C of the perf-seed workflow: run a perf iteration against the
preserved MinIO state.
Flow per invocation:
1. Reset --minio-run by copying --minio-seeded over it (so every iteration
starts from the same canonical state).
2. Start MinIO against --minio-run.
3. Start a hub against MinIO. By default --hub-enable-dehydration=false so
the perf run is read-only and the seeded baseline survives intact.
Pass --enable-dehydration to run a full provision -> deprovision cycle
that re-uploads at deprovision time (use this to measure the dehydrate
phase as well as hydrate; the seeded baseline diverges after such a run).
4. Provision every module found under --snapshot-dir (same set that was
used to seed MinIO), wait for all to reach 'provisioned'.
5. Deprovision them all, wait for them to be gone.
6. Stop hub + MinIO cleanly.
Optional --trace writes a utrace file to --hub-data-dir/hub.utrace.
This script reuses the lightweight harness helpers from the other stages but
keeps the run fully offline once the seed is preserved.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import signal
import subprocess
import sys
import time
import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional
# Suffix appended to every executable name this script looks up: ".exe" on
# Windows, empty on every other platform.
_EXE_SUFFIX = {"win32": ".exe"}.get(sys.platform, "")

# Root credentials for the local MinIO server started by this script; main()
# also exports them to the hub as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY.
_MINIO_USER = "minioadmin"
_MINIO_PASS = "minioadmin"
def _rmtree_robust(path) -> None:
    """Remove a directory tree, retrying entries that fail to delete.

    When shutil.rmtree cannot remove an entry, the entry's mode is set to
    writable (clears the read-only attribute on Windows) and the failed
    operation is retried once. If the retry also fails the entry is skipped
    silently: this is a best-effort cleanup and never raises for such entries.
    """
    import os as _os
    import stat as _stat

    def _retry_writable(func, target, _error):
        # `func` is the os-level call that failed on `target`; the third
        # argument (exception or exc_info tuple) is not needed for the retry.
        try:
            _os.chmod(target, _stat.S_IWRITE)
            func(target)
        except Exception:
            pass

    # Python 3.12 introduced `onexc`; older versions only have `onerror`.
    # The handler ignores its third argument, so it serves both hooks.
    hook_name = "onexc" if sys.version_info >= (3, 12) else "onerror"
    shutil.rmtree(path, **{hook_name: _retry_writable})
def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable, or exit the script if it is missing.

    Args:
        override: Directory expected to contain zenserver. When given, only
            that directory is checked.

    Returns:
        Path to the executable. Without an override, the first existing
        ``<repo>/build/<platform>/<arch>/<mode>/zenserver`` is returned, trying
        release before debug, where ``<repo>`` is two levels above this
        script's directory.
    """
    exe_name = f"zenserver{_EXE_SUFFIX}"

    if override:
        candidate = Path(override) / exe_name
        if candidate.exists():
            return candidate
        sys.exit(f"zenserver not found at {candidate}")

    build_root = Path(__file__).resolve().parent.parent.parent / "build"
    platforms = (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64"))
    for mode in ("release", "debug"):
        for os_name, arch in platforms:
            candidate = build_root / os_name / arch / mode / exe_name
            if candidate.exists():
                return candidate

    sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
def _find_zen(zenserver_exe: Path) -> Path:
    """Return the zen CLI located next to ``zenserver_exe``; exit if absent."""
    candidate = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
    if candidate.exists():
        return candidate
    sys.exit(f"zen CLI not found at {candidate}")
def _find_minio(zenserver_path: Path) -> Path:
    """Return the minio executable located next to zenserver; exit if absent."""
    candidate = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if candidate.exists():
        return candidate
    sys.exit(f"minio executable not found at {candidate}")
def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Spawn a MinIO server over ``data_dir`` and return its process handle.

    The data dir is created if needed. MinIO's stdout/stderr are discarded and
    the root credentials are passed through the environment. This does not
    wait for the server to become ready; see _wait_for_minio.
    """
    data_dir.mkdir(parents=True, exist_ok=True)

    argv = [
        str(minio_exe), "server", str(data_dir),
        "--address", f":{port}",
        "--console-address", f":{console_port}",
        "--quiet",
    ]
    child_env = dict(
        os.environ,
        MINIO_ROOT_USER=_MINIO_USER,
        MINIO_ROOT_PASSWORD=_MINIO_PASS,
    )

    # On Windows the child gets its own process group; _stop_minio_graceful
    # later signals it with CTRL_BREAK_EVENT.
    platform_kwargs: dict = (
        {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
        if sys.platform == "win32"
        else {}
    )

    minio = subprocess.Popen(
        argv,
        env=child_env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        **platform_kwargs,
    )
    print(f"[minio] started (pid {minio.pid}) port={port} data={data_dir}")
    return minio
def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
    """Block until MinIO's liveness endpoint answers, or exit on timeout.

    Polls ``/minio/health/live`` on localhost roughly every 0.1s. Any failure
    of a probe is treated as "not up yet". Exits the script via sys.exit once
    ``timeout_s`` seconds have elapsed without a successful probe.
    """
    health_url = f"http://localhost:{port}/minio/health/live"
    give_up_at = time.monotonic() + timeout_s

    while time.monotonic() < give_up_at:
        try:
            with urllib.request.urlopen(health_url, timeout=1):
                print("[minio] ready")
                return
        except Exception:
            # Connection refused / HTTP error / timeout: retry shortly.
            time.sleep(0.1)

    sys.exit(f"[minio] timed out after {timeout_s}s")
def _start_hub(
    zenserver_exe: Path,
    data_dir: Path,
    port: int,
    log_file: Path,
    extra_args: list[str],
    extra_env: dict[str, str],
) -> tuple[subprocess.Popen, object]:
    """Launch ``zenserver hub`` with the perf Lua configs.

    Args:
        zenserver_exe: zenserver executable to run.
        data_dir: Hub data dir; created if missing.
        port: Port passed as ``--port``.
        log_file: File that receives the hub's combined stdout/stderr
            (opened in binary write mode, so it is truncated).
        extra_args: Additional command-line arguments appended after the
            fixed ones.
        extra_env: Environment variables layered over the current environment.

    Returns:
        ``(process, log_handle)``. The caller owns ``log_handle`` and is
        responsible for closing it. If spawning fails the handle is closed
        here and the exception propagates.
    """
    data_dir.mkdir(parents=True, exist_ok=True)

    # Use the production-like Lua config (limits, thread counts, watchdog
    # timers) so perf numbers reflect what prod actually runs.
    # Hub.hydration.threads is left unset so the default auto-pick
    # (hardware_concurrency/4) fires; combined with --corelimit=128 below, that
    # yields 32 hydration threads on the actual prod r5n.32xlarge class, and
    # clamps to the real core count on smaller boxes.
    perf_configs = Path(__file__).resolve().parent / "perf_configs"
    hub_lua = perf_configs / "hub.lua"
    instance_lua = perf_configs / "instance.lua"

    argv = [
        str(zenserver_exe),
        "hub",
        "--enable-execution-history=false",
        f"--data-dir={data_dir}",
        f"--port={port}",
        f"--config={hub_lua}",
        f"--hub-instance-config={instance_lua}",
        # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev
        # box with fewer physical cores, this clamps to the actual count (see
        # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates).
        # When running on the actual prod instance class, this makes the
        # default auto-picks (provision/hydration thread counts) behave as
        # prod does.
        "--corelimit=128",
        *extra_args,
    ]

    child_env = {**os.environ, **extra_env}

    platform_kwargs: dict = {}
    if sys.platform == "win32":
        platform_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP

    log_handle = log_file.open("wb")
    try:
        hub = subprocess.Popen(
            argv,
            env=child_env,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            **platform_kwargs,
        )
    except Exception:
        # Don't leak the log file if the process never started.
        log_handle.close()
        raise

    print(f"[hub] started (pid {hub.pid}), log: {log_file}")
    return hub, log_handle
def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
    """Block until the hub answers ``/hub/status``, or exit the script.

    Exits via sys.exit if the hub process terminates while waiting, or if
    ``timeout_s`` seconds pass without a successful request. Failed requests
    are retried roughly every 0.2s.
    """
    status_request = urllib.request.Request(
        f"http://localhost:{port}/hub/status",
        headers={"Accept": "application/json"},
    )
    give_up_at = time.monotonic() + timeout_s

    while time.monotonic() < give_up_at:
        # A dead hub will never answer; fail fast instead of burning the timeout.
        if proc.poll() is not None:
            sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})")
        try:
            with urllib.request.urlopen(status_request, timeout=2):
                print("[hub] ready")
                return
        except Exception:
            time.sleep(0.2)

    sys.exit(f"[hub] timed out after {timeout_s}s")
def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
    """Shut the hub down with ``zen down --pid <pid> --force`` and wait for it.

    Does nothing if the hub has already exited. A non-zero return code from
    the zen CLI is only reported; the hub is still awaited. If it has not
    exited after ``timeout_s`` seconds it is killed.
    """
    already_exited = hub_proc.poll() is not None
    if already_exited:
        return

    hub_pid = hub_proc.pid
    print(f"[hub] zen down --pid {hub_pid}")
    down_cmd = [str(zen_exe), "down", "--pid", str(hub_pid), "--force"]
    down_rc = subprocess.call(down_cmd)
    if down_rc != 0:
        print(f"[hub] zen down returned rc={down_rc}; waiting for exit anyway")

    try:
        hub_proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[hub] did not exit after {timeout_s}s, killing")
        hub_proc.kill()
        hub_proc.wait()
def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
    """Ask MinIO to stop, wait up to ``timeout_s`` seconds, then kill it.

    Does nothing if the process has already exited. On Windows the stop
    request is a CTRL_BREAK_EVENT; elsewhere it is ``terminate()``. If
    sending the request raises, ``terminate()`` is used as a fallback.
    """
    if proc.poll() is not None:
        return

    on_windows = sys.platform == "win32"
    try:
        if not on_windows:
            proc.terminate()
        else:
            proc.send_signal(signal.CTRL_BREAK_EVENT)
    except Exception:
        proc.terminate()

    try:
        proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        print(f"[minio] did not exit after {timeout_s}s, killing")
        proc.kill()
        proc.wait()
def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
    """POST an empty JSON object to the hub and return ``(status, body)``.

    Never raises for request failures:
      * success       -> (HTTP status, parsed JSON body, or {} if unparseable)
      * HTTP error    -> (error status code, parsed JSON body or {})
      * anything else -> (0, {"error": <message>}), e.g. connection failure
    """
    def _json_or_empty(readable) -> dict:
        # Response bodies are optional / may not be JSON; fall back to {}.
        try:
            return json.loads(readable.read())
        except Exception:
            return {}

    request = urllib.request.Request(
        f"http://localhost:{port}{path}",
        data=b"{}",
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout_s) as response:
            payload = _json_or_empty(response)
            return response.status, payload
    except urllib.error.HTTPError as http_err:
        return http_err.code, _json_or_empty(http_err)
    except Exception as err:
        return 0, {"error": str(err)}
def _hub_states(port: int) -> Optional[dict[str, str]]:
    """Fetch ``/hub/status`` and return a ``{moduleId: state}`` mapping.

    Returns None (after printing a warning) when the request or JSON parsing
    fails. Entries without a ``moduleId`` are skipped; a missing ``state``
    becomes the empty string.
    """
    # Without Accept: application/json the hub serves compact binary (CBOR)
    # which json.loads can't parse. 60s timeout - /hub/status can take a few
    # seconds to serialize with 300+ modules in flight.
    status_request = urllib.request.Request(
        f"http://localhost:{port}/hub/status",
        headers={"Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(status_request, timeout=60) as response:
            payload = json.loads(response.read())
    except Exception as err:
        # Visibility: prior silent None caused "0/300 provisioned..." forever.
        print(f"[status] WARN /hub/status failed: {err}")
        return None

    by_module: dict[str, str] = {}
    for module in payload.get("modules") or []:
        module_id = module.get("moduleId")
        if module_id:
            by_module[module_id] = module.get("state", "")
    return by_module
def _fan_out_post(
    pool: ThreadPoolExecutor,
    port: int,
    module_ids: list[str],
    verb: str,
) -> tuple[list[str], list[tuple[str, int, dict]]]:
    """POST ``/hub/modules/<id>/<verb>`` for every module concurrently.

    All requests are submitted to ``pool`` first, then collected in
    submission order.

    Returns:
        ``(accepted, failures)`` where ``accepted`` lists module ids whose
        request returned 200 or 202 (or 404 for ``deprovision``), and
        ``failures`` lists ``(module_id, status, body)`` for everything else.
    """
    ok_statuses = {200, 202}
    if verb == "deprovision":
        # A 404 on deprovision is counted as accepted.
        ok_statuses.add(404)

    pending = {}
    for mid in module_ids:
        pending[mid] = pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")

    accepted: list[str] = []
    failures: list[tuple[str, int, dict]] = []
    for mid, future in pending.items():
        status, body = future.result()
        if status in ok_statuses:
            accepted.append(mid)
        else:
            failures.append((mid, status, body))
    return accepted, failures
def _state_histogram(states: dict[str, str], remaining: set[str]) -> str:
    """Summarize the states of the modules still being waited on.

    Args:
        states: Mapping of module id to its current state.
        remaining: Module ids to include; ids missing from ``states`` are
            counted under ``"<absent>"``.

    Returns:
        ``"state=count"`` pairs joined by ``", "``, most frequent first.
        Equal counts are ordered by state name so the output does not depend
        on set iteration order (which varies between runs for strings).
        Returns the empty string when ``remaining`` is empty.
    """
    counts: dict[str, int] = {}
    for mid in remaining:
        state = states.get(mid, "<absent>")
        counts[state] = counts.get(state, 0) + 1

    # str() keeps the tie-break total even if a state came back as a
    # non-string value (e.g. JSON null).
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], str(kv[0])))
    return ", ".join(f"{state}={count}" for state, count in ordered)
def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]:
    """Poll the hub until every module in ``ids`` is ``provisioned``.

    Polls about every 2 seconds. A full progress line with a state histogram
    is printed at most every 10 seconds (and once when everything is done);
    in between, a short line is rewritten in place.

    Args:
        port: Hub HTTP port.
        ids: Module ids to wait for.
        timeout_s: Maximum time to wait, in seconds.

    Returns:
        The module ids that had not reached ``provisioned`` when the wait
        ended (empty on full success).
    """
    deadline = time.monotonic() + timeout_s
    remaining = set(ids)
    last_verbose = 0.0
    while remaining and time.monotonic() < deadline:
        states = _hub_states(port)
        if states is not None:
            remaining -= {mid for mid in remaining if states.get(mid) == "provisioned"}
        done = len(ids) - len(remaining)
        now = time.monotonic()
        if states is not None and (now - last_verbose >= 10.0 or not remaining):
            hist = _state_histogram(states, remaining) if remaining else "(all provisioned)"
            print(f"[provision] {done}/{len(ids)} provisioned | remaining: {hist}")
            last_verbose = now
        else:
            print(f"[provision] {done}/{len(ids)} provisioned...", end="\r")
        if not remaining:
            # Return immediately: callers time this function, and a trailing
            # poll-interval sleep would inflate the measured duration.
            break
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(2.0, deadline - time.monotonic())))
    print()
    return list(remaining)
def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]:
    """Poll the hub until every module in ``ids`` is gone.

    A module counts as gone when it is absent from the hub status or its
    state is ``unprovisioned``. Polls about every 2 seconds. A full progress
    line with a state histogram is printed at most every 10 seconds (and once
    when everything is done); in between, a short line is rewritten in place.

    Args:
        port: Hub HTTP port.
        ids: Module ids to wait for.
        timeout_s: Maximum time to wait, in seconds.

    Returns:
        The module ids still present when the wait ended (empty on full
        success).
    """
    deadline = time.monotonic() + timeout_s
    remaining = set(ids)
    last_verbose = 0.0
    while remaining and time.monotonic() < deadline:
        states = _hub_states(port)
        if states is not None:
            gone = {
                mid for mid in remaining
                if mid not in states or states[mid] == "unprovisioned"
            }
            remaining -= gone
        done = len(ids) - len(remaining)
        now = time.monotonic()
        if states is not None and (now - last_verbose >= 10.0 or not remaining):
            hist = _state_histogram(states, remaining) if remaining else "(all gone)"
            print(f"[deprovision] {done}/{len(ids)} gone | remaining: {hist}")
            last_verbose = now
        else:
            print(f"[deprovision] {done}/{len(ids)} gone...", end="\r")
        if not remaining:
            # Return immediately: callers time this function, and a trailing
            # poll-interval sleep would inflate the measured duration.
            break
        # Never sleep past the deadline.
        time.sleep(max(0.0, min(2.0, deadline - time.monotonic())))
    print()
    return list(remaining)
def _robust_copytree(src: Path, dst: Path) -> None:
    """Make ``dst`` an exact copy of the directory ``src``.

    On Windows this runs ``robocopy /MIR`` (mirror src -> dst) with
    ``/COPY:DAT /DCOPY:DAT`` so file data, attributes and timestamps are
    copied explicitly rather than relying on defaults; a return code of 8 or
    higher aborts the script. On other platforms ``dst`` is removed if it
    exists and then re-created with shutil.copytree.

    Destructive for ``dst``: that is safe because dst is always the working
    dir (minio-run) - never the preserved baseline.
    """
    if sys.platform != "win32":
        if dst.exists():
            _rmtree_robust(dst)
        shutil.copytree(src, dst, symlinks=False)
        return

    robocopy_cmd = [
        "robocopy", str(src), str(dst),
        "/MIR",
        "/COPY:DAT", "/DCOPY:DAT",
        # Suppress the job header/summary and per-file/dir output.
        "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP",
        # Retry failed copies twice, waiting one second between attempts.
        "/R:2", "/W:1",
    ]
    print(f"[reset] robocopy {src} -> {dst}")
    rc = subprocess.call(robocopy_cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if rc >= 8:
        sys.exit(f"[reset] robocopy failed rc={rc}")
def _archive_run(
    archive_dir: Path,
    hub_data_dir: Path,
    bucket: str,
    summary: dict,
) -> Optional[Path]:
    """Snapshot a run's artifacts into ``<archive_dir>/<timestamp>_<bucket>``.

    Copies, when present: ``hub.log``, every regular file directly under
    ``logs/`` and ``hub.utrace`` from ``hub_data_dir``. ``summary`` is always
    written as ``summary.json`` so successive runs can be compared.

    Returns:
        The archive directory, or None if archiving failed. Failures are
        reported as a warning and never raised.
    """
    try:
        stamp = time.strftime("%Y%m%d-%H%M%S", time.localtime())
        dest = archive_dir / f"{stamp}_{bucket}"
        dest.mkdir(parents=True, exist_ok=True)

        def _copy_top_level(name: str) -> int:
            # Copy hub_data_dir/<name> into the archive; return files copied.
            source = hub_data_dir / name
            if not source.is_file():
                return 0
            shutil.copy2(source, dest / name)
            return 1

        # Hub stdout/stderr (captured by _start_hub via subprocess stdout=)
        n_copied = _copy_top_level("hub.log")

        # Structured zenserver logs (rotated variants included)
        logs_src = hub_data_dir / "logs"
        if logs_src.is_dir():
            logs_dst = dest / "logs"
            logs_dst.mkdir(exist_ok=True)
            for entry in logs_src.iterdir():
                if entry.is_file():
                    shutil.copy2(entry, logs_dst / entry.name)
                    n_copied += 1

        # Optional trace
        n_copied += _copy_top_level("hub.utrace")

        summary_text = json.dumps(summary, indent=2)
        (dest / "summary.json").write_text(summary_text, encoding="ascii")
        print(f"[archive] {dest} ({n_copied} files)")
        return dest
    except Exception as err:
        print(f"[archive] WARNING: failed to archive run: {err}")
        return None
def main() -> int:
    """Run one perf iteration against the preserved MinIO state.

    Steps: parse arguments and validate paths, reset the MinIO working dir
    from the baseline, optionally wipe the hub data dir, start MinIO and the
    hub, provision then deprovision every module, shut both servers down and
    archive logs plus a timing summary.

    Returns:
        0 if every request was accepted and every module reached the target
        state in both phases; 1 otherwise.

    Setup problems (missing directories, overlapping paths, missing
    executables, servers that never become ready) end the script through
    sys.exit. Servers that were started are still shut down in that case.
    """
    parser = argparse.ArgumentParser(description=__doc__,
                                     formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
                        help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
                             "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
    parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run",
                        help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)")
    parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
                        help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)")
    # The help text used to mention a non-existent --wipe flag; wiping is the
    # default and --no-wipe-hub is the opt-out.
    parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf",
                        help="Hub --data-dir for the perf run (wiped each run unless --no-wipe-hub). "
                             "Default: E:/Dev/zen-perf-seed/hub-perf")
    parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs",
                        help="Where to archive hub.log + zenserver.log + hub.utrace after each run "
                             "(default: E:/Dev/zen-perf-seed/perf-runs)")
    parser.add_argument("--bucket", default="zen-seed-packed",
                        help="MinIO bucket name to exercise (default: zen-seed-packed). "
                             "Pack worktree seeds only the packed bucket.")
    parser.add_argument("--minio-port", type=int, default=9000)
    parser.add_argument("--minio-console-port", type=int, default=9001)
    parser.add_argument("--hub-port", type=int, default=8558)
    parser.add_argument("--module-count", type=int, default=0,
                        help="Cap on modules used (0 = all from snapshot-dir)")
    parser.add_argument("--workers", type=int, default=50)
    parser.add_argument("--poll-timeout", type=float, default=1200.0)
    parser.add_argument("--settle-seconds", type=float, default=5.0)
    parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS",
                        help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace (default channels: 'default')")
    parser.add_argument("--enable-dehydration", action="store_true",
                        help="Run with --hub-enable-dehydration=true so the deprovision phase "
                             "actually re-uploads to MinIO. Default false preserves the seeded "
                             "baseline; pass this to measure dehydrate cost as well as hydrate.")
    parser.add_argument("--no-wipe-hub", action="store_true",
                        help="Don't wipe the hub data dir before starting (useful to inspect leftover state)")
    parser.add_argument("--zenserver-dir",
                        help="Directory containing zenserver + minio executables (auto-detected)")
    args = parser.parse_args()

    minio_seeded = Path(args.minio_seeded).resolve()
    minio_run = Path(args.minio_run).resolve()
    snapshot_dir = Path(args.snapshot_dir).resolve()
    hub_data_dir = Path(args.hub_data_dir).resolve()
    archive_dir = Path(args.archive_dir).resolve()

    if not minio_seeded.is_dir():
        sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}")
    if not snapshot_dir.is_dir():
        sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}")

    # Preserved inputs are read-only from this script's perspective. Refuse to
    # run if any mutable path could accidentally clobber them (same path, or
    # one nested inside the other).
    for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)):
        for ro_label, ro in (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir)):
            if mutable == ro or mutable in ro.parents or ro in mutable.parents:
                sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}")

    # Each sub-directory of the snapshot is one module; its name is the id.
    module_ids = sorted(p.name for p in snapshot_dir.iterdir() if p.is_dir())
    if args.module_count > 0:
        module_ids = module_ids[:args.module_count]
    if not module_ids:
        sys.exit(f"[setup] no modules in {snapshot_dir}")

    zenserver_exe = _find_zenserver(args.zenserver_dir)
    zen_exe = _find_zen(zenserver_exe)
    minio_exe = _find_minio(zenserver_exe)

    # Build mode is inferred from the executable's path components.
    zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
    print(f"[setup] build mode: {zenserver_mode}")
    print(f"[setup] zenserver: {zenserver_exe}")
    print(f"[setup] minio: {minio_exe}")
    print(f"[setup] minio-seeded: {minio_seeded}")
    print(f"[setup] minio-run: {minio_run}")
    print(f"[setup] hub-data-dir: {hub_data_dir}")
    print(f"[setup] modules: {len(module_ids)}")

    # Reset MinIO run dir from the baseline.
    _robust_copytree(minio_seeded, minio_run)

    # Wipe the hub data dir so every run starts from scratch unless the user opts out.
    if not args.no_wipe_hub and hub_data_dir.exists():
        print(f"[reset] wiping {hub_data_dir}")
        _rmtree_robust(hub_data_dir)
    hub_data_dir.mkdir(parents=True, exist_ok=True)

    # Hydration target config pointing the hub at the local MinIO bucket.
    config_path = hub_data_dir / "hydration_config.json"
    config_path.write_text(
        json.dumps({
            "type": "s3",
            "settings": {
                "uri": f"s3://{args.bucket}",
                "endpoint": f"http://localhost:{args.minio_port}",
                "path-style": True,
                "region": "us-east-1",
            },
        }),
        encoding="ascii",
    )

    hub_extra_args = [
        f"--hub-hydration-target-config={config_path}",
        f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}",
    ]
    if args.enable_dehydration:
        print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge")
    if args.trace:
        trace_path = hub_data_dir / "hub.utrace"
        hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"]
        print(f"[trace] enabled channels='{args.trace}', file={trace_path}")

    hub_extra_env = {
        "AWS_ACCESS_KEY_ID": _MINIO_USER,
        "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
    }

    minio_proc: Optional[subprocess.Popen] = None
    hub_proc: Optional[subprocess.Popen] = None
    hub_log_handle = None
    exit_code = 0
    # Summary written next to the archived logs; phases that never ran stay None.
    timings: dict = {
        "bucket": args.bucket,
        "module_count": len(module_ids),
        "build_mode": zenserver_mode,
        "provision_fanout_s": None,
        "provision_total_s": None,
        "deprovision_fanout_s": None,
        "deprovision_total_s": None,
        "total_s": None,
    }

    try:
        minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port)
        _wait_for_minio(args.minio_port)

        hub_log = hub_data_dir / "hub.log"
        hub_proc, hub_log_handle = _start_hub(
            zenserver_exe, hub_data_dir, args.hub_port, hub_log,
            hub_extra_args, hub_extra_env,
        )
        _wait_for_hub(hub_proc, args.hub_port)

        t_start = time.monotonic()
        with ThreadPoolExecutor(max_workers=args.workers) as pool:
            # --- Provision ---
            print(f"[provision] firing {len(module_ids)} provision requests...")
            t0 = time.monotonic()
            accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision")
            fanout_s = time.monotonic() - t0
            timings["provision_fanout_s"] = round(fanout_s, 2)
            print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)")
            # Only the first few failures are printed to keep the output readable.
            for mid, status, body in failures[:5]:
                print(f"[provision] FAIL {mid}: status={status} body={body}")
            if failures:
                exit_code = 1

            stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout)
            total_s = time.monotonic() - t0
            timings["provision_total_s"] = round(total_s, 2)
            print(f"[provision] all provisioned in {total_s:.1f}s ({len(accepted)-len(stuck)}/{len(accepted)})")
            if stuck:
                exit_code = 1

            print(f"[settle] waiting {args.settle_seconds:.0f}s")
            time.sleep(args.settle_seconds)

            # --- Deprovision ---
            print(f"[deprovision] firing {len(accepted)} deprovision requests...")
            t0 = time.monotonic()
            dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision")
            fanout_s = time.monotonic() - t0
            timings["deprovision_fanout_s"] = round(fanout_s, 2)
            print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)")
            if dp_failures:
                exit_code = 1

            stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout)
            total_s = time.monotonic() - t0
            timings["deprovision_total_s"] = round(total_s, 2)
            print(f"[deprovision] all gone in {total_s:.1f}s ({len(dp_accepted)-len(stuck_dp)}/{len(dp_accepted)})")
            if stuck_dp:
                exit_code = 1

            print(f"[settle] waiting {args.settle_seconds:.0f}s")
            time.sleep(args.settle_seconds)

        elapsed = time.monotonic() - t_start
        timings["total_s"] = round(elapsed, 2)
        print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}")
    finally:
        # Tear down in reverse start order. The steps are nested so that a
        # failure while stopping the hub can neither leave the log handle
        # open nor leave a MinIO process running.
        try:
            if hub_proc is not None and hub_proc.poll() is None:
                _zen_down_hub(zen_exe, hub_proc)
        finally:
            try:
                if hub_log_handle is not None:
                    hub_log_handle.close()
            finally:
                if minio_proc is not None and minio_proc.poll() is None:
                    _stop_minio_graceful(minio_proc)

    # Archive AFTER the hub has exited so in-flight log writes are flushed.
    timings["exit_code"] = exit_code
    _archive_run(archive_dir, hub_data_dir, args.bucket, timings)
    return exit_code
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|