aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-27 11:14:09 +0200
committerGitHub Enterprise <[email protected]>2026-04-27 11:14:09 +0200
commit753ab4e89b9a5952e50bc77d404198520b362a3a (patch)
tree39dbaad8389677981281b8c1585ac846251539f0
parentfix crash when scavenging sequences or copying local chunks (#1013) (diff)
downloadarchived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.tar.xz
archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.zip
hydration with pack (#1016)
- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files - `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true) - `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB) - `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB) - Feature: Hub hydration and dehydration can be disabled per direction - `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true) - `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true) - Feature: Hub hydration accepts a configurable file exclude list via `HydrationOptions` `excludes` (array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: a present field replaces the default outright; explicit `[]` opts out of all defaults. - Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately - Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes - Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer - Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so hydrate replays the partitioning used at dehydrate; default raised to 64 MiB (was 32 MiB) - Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup
-rw-r--r--CHANGELOG.md13
-rw-r--r--VERSION.txt2
-rw-r--r--docs/hub.md122
-rw-r--r--scripts/test_scripts/hub/PERF_SEED_README.md148
-rw-r--r--scripts/test_scripts/hub/analyze_perf_runs.py348
-rw-r--r--scripts/test_scripts/hub/hub_load_test_s3.py537
-rw-r--r--scripts/test_scripts/hub/perf_configs/hub.lua48
-rw-r--r--scripts/test_scripts/hub/perf_configs/instance.lua15
-rw-r--r--scripts/test_scripts/hub/preserve_minio_state.py126
-rw-r--r--scripts/test_scripts/hub/run_minio_perf.py614
-rw-r--r--scripts/test_scripts/hub/seed_minio.py720
-rw-r--r--scripts/test_scripts/hub/seed_s3_snapshot.py635
-rw-r--r--src/zencore/include/zencore/string.h6
-rw-r--r--src/zencore/string.cpp62
-rw-r--r--src/zenserver/hub/hub.cpp68
-rw-r--r--src/zenserver/hub/hub.h5
-rw-r--r--src/zenserver/hub/hydration.cpp2246
-rw-r--r--src/zenserver/hub/hydration.h45
-rw-r--r--src/zenserver/hub/hydrationdefaults.h26
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp24
-rw-r--r--src/zenserver/hub/storageserverinstance.h5
-rw-r--r--src/zenserver/hub/zenhubserver.cpp52
-rw-r--r--src/zenserver/hub/zenhubserver.h8
-rw-r--r--src/zenutil/cloud/s3client.cpp32
24 files changed, 5378 insertions, 529 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cb515b2e2..3288f8bba 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,17 @@
##
+- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files
+ - `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true)
+ - `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB)
+ - `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB)
+- Feature: Hub hydration and dehydration can be disabled per direction
+ - `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true)
+ - `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true)
+- Feature: Hub hydration accepts a configurable file exclude list via `HydrationOptions` `excludes` (array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: a present field replaces the default outright; explicit `[]` opts out of all defaults.
+- Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately
+- Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes
+- Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer
+- Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so hydrate replays the partitioning used at dehydrate; default raised to 64 MiB (was 32 MiB)
+- Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup
- Bugfix: `zen builds download` no longer crashes intermittently when scavenging sequences or copying local chunks
## 5.8.7
diff --git a/VERSION.txt b/VERSION.txt
index 6f4fe76b0..c7735a587 100644
--- a/VERSION.txt
+++ b/VERSION.txt
@@ -1 +1 @@
-5.8.8 \ No newline at end of file
+5.8.9-pre2 \ No newline at end of file
diff --git a/docs/hub.md b/docs/hub.md
index 27cf98512..63d90c502 100644
--- a/docs/hub.md
+++ b/docs/hub.md
@@ -243,11 +243,18 @@ suitable for single-host deployments where instances share locally cached data.
`targetspec` and `targetconfig` are mutually exclusive.
-| CLI flag | Lua key | Default | Description |
-|-----------------------------------|------------------------------|---------------------------|-------------|
-| `--hub-hydration-target-spec` | `hub.hydration.targetspec` | _(local path, see above)_ | Shorthand URI for the hydration source. Must use the `file://` prefix for file targets: `file:///absolute/path`. |
-| `--hub-hydration-target-config` | `hub.hydration.targetconfig` | _(none)_ | Path to a JSON file specifying the hydration source. Supports `file` and `s3` backends. |
-| `--hub-hydration-threads` | `hub.hydration.threads` | `max(cpu/4, 2)` | Thread count for the hydration/dehydration worker pool. Controls parallel file hashing and backend I/O during hydrate/dehydrate. Set to `0` for synchronous operation. |
+| CLI flag | Lua key | Default | Description |
+|---------------------------------------------|---------------------------------------------|---------------------------|-------------|
+| `--hub-hydration-target-spec` | `hub.hydration.targetspec` | _(local path, see above)_ | Shorthand URI for the hydration source. Must use the `file://` prefix for file targets: `file:///absolute/path`. |
+| `--hub-hydration-target-config` | `hub.hydration.targetconfig` | _(none)_ | Path to a JSON file specifying the hydration source. Supports `file` and `s3` backends. |
+| `--hub-hydration-threads` | `hub.hydration.threads` | `max(cpu/4, 2)` | Thread count for the hydration/dehydration worker pool. Controls parallel file hashing and backend I/O during hydrate/dehydrate. Set to `0` for synchronous operation. |
+| `--hub-enable-hydration` | `hub.enablehydration` | `true` | Load instance state from the hydration target on provision. Disable to start every provision from an empty instance directory. |
+| `--hub-enable-dehydration` | `hub.enabledehydration` | `true` | Save instance state to the hydration target on deprovision. Disable to run the hydrate-only path (useful for perf testing against a fixed backend snapshot). |
+| `--hub-hydration-enable-pack` | `hub.hydration.enablepack` | `true` | Concatenate small files into CAS pack blobs during dehydrate. See [Pack](#pack). |
+| `--hub-hydration-pack-threshold-bytes` | `hub.hydration.packthresholdbytes` | `262144` (256 KiB) | Files strictly smaller than this are pack candidates. Larger files are stored as standalone CAS entries. |
+| `--hub-hydration-max-pack-bytes` | `hub.hydration.maxpackbytes` | `4194304` (4 MiB) | Upper bound on a single pack's concatenation size. Candidates are bin-packed greedily; packs that would exceed this cap are closed and a new pack is started. A unique candidate larger than the cap falls back to standalone upload. |
+
+Multipart chunk size is S3-specific and set via the target config (see [Multipart chunking](#multipart-chunking)).
**File backend** (`hub.hydration.targetconfig` JSON):
@@ -269,24 +276,115 @@ suitable for single-host deployments where instances share locally cached data.
"uri": "s3://bucket-name/optional/prefix",
"region": "us-east-1",
"endpoint": "https://custom-endpoint",
- "path-style": false
+ "path-style": false,
+ "chunksize": 67108864
}
}
```
+Both backends accept the optional top-level `excludes` key to override the built-in
+defaults; see [Excludes](#excludes) for the schema and the default list.
+
S3 settings:
-| Field | Required | Description |
-|---|---|---|
-| `uri` | Yes | S3 URI. Include a prefix path to isolate hub data within a shared bucket. |
-| `region` | No | AWS region. Defaults to `us-east-1`; also reads `AWS_DEFAULT_REGION` / `AWS_REGION`. |
-| `endpoint` | No | Custom endpoint URL for S3-compatible services (MinIO, Ceph, etc.). |
-| `path-style` | No | Use path-style S3 URLs instead of virtual-hosted. Required by some S3-compatible services. Default `false`. |
+| Field | Required | Default | Description |
+|---|---|---|---|
+| `uri` | Yes | - | S3 URI. Include a prefix path to isolate hub data within a shared bucket. |
+| `region` | No | `us-east-1` | AWS region. Also reads `AWS_DEFAULT_REGION` / `AWS_REGION`. |
+| `endpoint` | No | - | Custom endpoint URL for S3-compatible services (MinIO, Ceph, etc.). |
+| `path-style` | No | `false` | Use path-style S3 URLs instead of virtual-hosted. Required by some S3-compatible services. |
+| `chunksize` | No | `67108864` (64 MiB) | Multipart chunk size for GET/PUT on files at or above 1.25x this value. See [Multipart chunking](#multipart-chunking). |
S3 credentials are read from environment variables (`AWS_ACCESS_KEY_ID`,
`AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`). If `AWS_ACCESS_KEY_ID` is not set, the hub
falls back to IMDS instance credentials.
+#### Pack
+
+When pack is enabled (default), dehydrate concatenates every unique file smaller than
+`packthresholdbytes` into one or more CAS pack blobs, rather than uploading each file as
+its own CAS entry. Pack contents are stored raw (no compression); each pack's CAS key is
+the hash of its concatenated bytes. Files at or above the threshold continue to upload as
+standalone CAS entries.
+
+Pack composition is deterministic: candidates are ordered by content hash and bin-packed
+greedily up to `maxpackbytes`. Identical module state produces identical pack hashes
+across runs, so `ExistsLookup` deduplicates packs between redeploys just like regular
+CAS entries. A pack that would contain fewer than two entries is discarded and its
+candidates revert to standalone upload.
+
+The state manifest records per-pack entries and each file's owning pack, so hydrate
+resolves packed files by downloading the pack once and slicing it into the target files.
+
+The primary win is request count: for modules dominated by tiny metadata files, pack can
+collapse dozens of GETs into a single request, which dominates wall time on high-RTT
+backends (real S3) more than it matters on localhost MinIO.
+
+#### Excludes
+
+The optional top-level `excludes` key on the target config is an array of wildcard
+patterns matched against each file's relative path (forward-slash form). Matching
+files are dropped at the dehydrate-side directory scan and never enter the manifest.
+`*` is path-separator-agnostic, so `auth/*` matches `auth/authstate` and any deeper
+path under `auth/`.
+
+`excludes` uses **override semantics, not additive**: if the field is present in the
+target config its contents fully replace the default list. An explicit empty array
+(`"excludes": []`) is honoured as "apply no excludes". Field absent from the config
+means the built-in default applies.
+
+Built-in default:
+
+| Pattern | Why |
+|---|---|
+| `.sentry-native/*` | Sentry-native crash uploader DB; locked while child runs |
+| `state_marker` | Root-level liveness marker re-created by the child |
+| `.lock` | Instance lock file (`FILE_FLAG_DELETE_ON_CLOSE`); locked while child runs |
+| `*.bak` | Transient backups produced by atomic file replace |
+| `gc/reserve.gc` | GC disk reserve under the per-store `gc/` subdirectory |
+| `auth/*` | Encrypted auth state (`auth/authstate`) |
+
+A target config whose `excludes` array reproduces the built-in default (equivalent to
+omitting the key from the config):
+
+```json
+{
+ "type": "file",
+ "settings": {
+ "path": "/data/hydration_storage"
+ },
+ "excludes": [
+ ".sentry-native/*",
+ "state_marker",
+ ".lock",
+ "*.bak",
+ "gc/reserve.gc",
+ "auth/*"
+ ]
+}
+```
+
+#### Multipart chunking
+
+Large files above 1.25x the chunk size are transferred in chunks using S3 range requests
+(GET) or multipart uploads (PUT). Smaller files use a single request.
+
+Chunk size is an S3-specific setting. It has no CLI or Lua surface - it applies only to
+S3 backends and is set per-target:
+- Target config JSON key: `chunksize` (under `settings`).
+- State.cbo persists it per-module as `MultipartChunkSize` (under `StorageSettings`).
+ This locks each module's chunking to what was used at its last dehydrate.
+
+The chunk size a module uses is **persisted into state.cbo during dehydrate** so the
+next hydrate uses the same partitioning. Changing the target-config `chunksize` affects
+only modules with no prior state.cbo; existing modules continue to use the value
+recorded at their last dehydrate. State.cbo files without the field fall back to
+`DefaultMultipartChunkSize` (64 MiB).
+
+The default of 64 MiB is tuned for intra-region Nitro EC2 instances (hub and S3 bucket in
+the same AWS region). For smaller instance types where per-connection bandwidth is under
+~40 MB/s, 32 MiB may give better thread-pool utilisation on multi-hundred-MiB files.
+
---
### Upstream Notifications
diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md
new file mode 100644
index 000000000..fb471d4bb
--- /dev/null
+++ b/scripts/test_scripts/hub/PERF_SEED_README.md
@@ -0,0 +1,148 @@
+# Perf-seed workflow
+
+Three-stage pipeline for running repeatable hub-hydration perf tests against a
+local MinIO backend seeded with real module data pulled from production S3.
+
+## Layout
+
+All scripts default to a single perf-seed root - currently `E:/Dev/zen-perf-seed/`
+in the script defaults, but every path is overridable via CLI flag (see the
+per-stage options below). Pick a root with enough free space (snapshots and
+preserved CAS dirs can be large) and either pass the matching `--*-dir` flag on
+each invocation or change the script defaults to your chosen root.
+
+Layout under the chosen root (`<perf-seed>/`):
+
+```
+<perf-seed>/
+ hub-a/ Stage A hub data dir (transient)
+ servers/<moduleid>/
+ s3-snapshot/ Preserved production server-state trees (read-only after Stage A)
+ <moduleid>/
+ hubs/ Stage B per-bucket hub data dirs (transient)
+ hub-b-zen-seed-packed/
+ hub-b-zen-seed-unpacked/
+ minio-data/ Stage B MinIO data dir (transient, carries every seeded bucket)
+ minio-seeded-baseline/ Preserved baseline MinIO CAS (read-only after Stage B + preserve)
+ README.txt
+ minio-seeded-packed/ Preserved packed MinIO CAS (filled by the pack worktree)
+ README.txt
+ hub-perf/ Stage C hub data dir (wiped each run)
+ minio-run/ Stage C MinIO data dir (wiped + re-copied each run)
+ perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json
+ 20260423-141530_zen-seed-packed/
+ 20260423-143112_zen-seed-unpacked/
+```
+
+## Prerequisites
+
+- Debug or release build of zenserver + minio: `xmake -y`
+- `pip install boto3`
+- AWS CLI v2 with an SSO profile configured (for Stage A only)
+- Environment variables (or pass equivalents via CLI flags):
+ - `ZEN_PERF_S3_URI` - source S3 bucket, e.g. `s3://your-bucket/optional-prefix/`
+ - `ZEN_PERF_AWS_PROFILE` - AWS SSO profile name with read access to that bucket
+ - `ZEN_PERF_AWS_REGION` - optional, defaults to `us-east-1`
+
+## Stage A - snapshot real S3 data
+
+One-time (or when you want a fresh baseline from production).
+
+```
+export ZEN_PERF_S3_URI=s3://your-bucket/
+export ZEN_PERF_AWS_PROFILE=your-sso-profile
+python scripts/test_scripts/hub/seed_s3_snapshot.py
+```
+
+Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies
+`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login`
+automatically if the SSO token is missing or expired.
+
+Module selection ranks all UUID-shaped folders by their
+`incremental-state.cbo` `LastModified` (newest first, a proxy for
+most-recently-accessed) and takes the top `--module-count`.
+
+Options:
+- `--module-count N` (default 1000)
+- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`)
+- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`)
+
+## Stage B - seed MinIO from the snapshot
+
+One-time per pack-mode (or when `s3-snapshot` changes).
+
+`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is
+hardcoded inside the script (`--hub-hydration-enable-pack=true` near the
+top of `_start_hub`). To produce both packed and unpacked baselines for
+comparison, invoke the script twice from two separate worktrees - one with
+the flag flipped to `false` - and preserve the resulting MinIO data dir
+each time.
+
+```
+# In the pack worktree (flag = true), seeds zen-seed-packed
+python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed
+python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed
+
+# In the no-pack worktree (flag = false), seeds zen-seed-unpacked
+python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked
+python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked
+```
+
+The script provisions every module found under `s3-snapshot/`, hibernates
+them, overlays the snapshot on top of the hub's servers dir, then
+deprovisions all modules - which runs the dehydrate path and uploads the
+content into the bucket.
+
+`preserve_minio_state.py` copies the resulting `minio-data/` to a
+variant-specific preservation dir and writes a README with provenance.
+
+Options of interest:
+- `--bucket NAME` - bucket name (default `zen-seed-packed`).
+- `--wipe` removes the per-bucket hub data dir and the shared minio-data
+ dir before starting.
+- `--module-count N` caps the set (0 = every module in snapshot-dir).
+
+## Stage C - run a perf iteration
+
+Repeat as often as you want; each run starts from the preserved baseline.
+
+```
+# Pack-on bucket
+python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace
+
+# Pack-off bucket (for comparison)
+python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace
+```
+
+Steps:
+1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state.
+2. Wipes `hub-perf/` (unless `--no-wipe-hub`).
+3. Starts MinIO and hub.
+4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone.
+5. Stops everything cleanly.
+
+Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every
+iteration exercises the hydrate-only path against the same baseline CAS. The
+`--bucket` flag selects which seeded bucket (and therefore which pack mode)
+to exercise.
+
+Pass `--enable-dehydration` to run a full provision -> deprovision cycle that
+includes re-upload (dehydrate) at deprovision time. Use this to measure the
+dehydrate phase end-to-end against the seeded baseline. Note the seeded
+baseline diverges after a `--enable-dehydration` run - re-copy `--minio-seeded`
+or re-run `preserve_minio_state.py` if you want to compare to the pristine state.
+
+After each run the hub log, structured zenserver logs, any utrace file, and a
+`summary.json` with the run's timings are copied into
+`perf-runs/<timestamp>_<bucket>/` so Stage C runs can be compared
+post-hoc. Override the destination with `--archive-dir PATH`.
+
+## Resetting between runs
+
+- **Keep**: `s3-snapshot/`, `minio-seeded-baseline/`, `minio-seeded-packed/`. These are expensive to rebuild.
+- **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`.
+
+To force a fresh MinIO seed for one variant: delete the matching
+`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching
+`--dest`) in that worktree. To force a fresh S3 snapshot: delete
+`s3-snapshot/` and re-run Stage A.
diff --git a/scripts/test_scripts/hub/analyze_perf_runs.py b/scripts/test_scripts/hub/analyze_perf_runs.py
new file mode 100644
index 000000000..521d9cc1e
--- /dev/null
+++ b/scripts/test_scripts/hub/analyze_perf_runs.py
@@ -0,0 +1,348 @@
+#!/usr/bin/env python3
+"""Parse two hub.log archives (packed vs unpacked) and produce a comparison."""
+
+from __future__ import annotations
+
+import argparse
+import re
+import sys
+from pathlib import Path
+from statistics import mean, median
+
+
+_SIZE_UNITS = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
+_TIME_UNITS = {"ns": 1e-9, "us": 1e-6, "ms": 1e-3, "s": 1.0}
+
+
+def parse_size(s: str) -> float:
+ # Accept forms: "1.53K", "22.6K", "180M", "1.83G", "25" (bare bytes), "0B".
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)B?", s.strip())
+ if not m:
+ return 0.0
+ val, unit = m.group(1), m.group(2) or "B"
+ return float(val) * _SIZE_UNITS.get(unit, 1)
+
+
+def parse_time_s(s: str) -> float:
+ # Accept forms: "1s", "46ms", "712us", "0ns"
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(ns|us|ms|s)", s.strip())
+ if not m:
+ return 0.0
+ val, unit = m.group(1), m.group(2)
+ return float(val) * _TIME_UNITS[unit]
+
+
+def parse_rate_bits(s: str) -> float:
+ # "30.7Mbits/s" -> bits/sec
+ m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)bits/s", s.strip())
+ if not m:
+ return 0.0
+ val, unit = m.group(1), m.group(2) or ""
+ mult = {"": 1, "K": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}[unit]
+ return float(val) * mult
+
+
+_RE_COMPLETE = re.compile(r"Hydration complete module '([0-9a-f-]+)': (\d+) files \(([^)]+)\) in (\S+)$")
+_RE_PACK = re.compile(r"Pack unpack:\s+(\d+) packs,\s+(\d+) files,\s+(\S+),\s+download (\S+),\s+unpack (\S+)")
+_RE_DLPHASE = re.compile(r"Download phase:\s+(\S+)\s+(\d+)/(\d+)\s+\(([^)]+)\) downloaded,\s+(\S+),\s+(\d+) threads")
+_RE_REQS = re.compile(r"Requests:\s+(\d+) reqs,\s+avg (\S+)/req,\s+avg (\S+)/req,\s+peak in-flight (\d+),\s+saturation (\d+)%")
+_RE_SCHED = re.compile(r"Scheduling:\s+1st schedule \+(\S+),\s+1st start \+(\S+),\s+queue wait (\S+)")
+_RE_LOADMETA = re.compile(r"Load metadata:\s+(\S+)")
+_RE_RENAME = re.compile(r"Rename/copy:\s+(\S+)")
+
+
+def parse_log(path: Path) -> list[dict]:
+ modules: list[dict] = []
+ text = path.read_text(encoding="utf-8", errors="replace").splitlines()
+ i = 0
+ while i < len(text):
+ line = text[i]
+ m = _RE_COMPLETE.search(line)
+ if not m:
+ i += 1
+ continue
+ mod = {
+ "mid": m.group(1),
+ "files": int(m.group(2)),
+ "total_bytes": parse_size(m.group(3)),
+ "wall_s": parse_time_s(m.group(4)),
+ }
+ # Read the block that follows (up to ~12 lines).
+ block = text[i : min(i + 20, len(text))]
+ joined = "\n".join(block)
+ pm = _RE_PACK.search(joined)
+ if pm:
+ mod["packs"] = int(pm.group(1))
+ mod["packed_files"] = int(pm.group(2))
+ # Uncompressed format: single size, no integrity rehash.
+ mod["pack_bytes"] = parse_size(pm.group(3))
+ mod["pack_download_s"] = parse_time_s(pm.group(4))
+ mod["pack_unpack_s"] = parse_time_s(pm.group(5))
+ else:
+ mod["packs"] = 0
+ mod["packed_files"] = 0
+ mod["pack_bytes"] = 0.0
+ mod["pack_download_s"] = 0.0
+ mod["pack_unpack_s"] = 0.0
+
+ dm = _RE_DLPHASE.search(joined)
+ if dm:
+ mod["dl_phase_s"] = parse_time_s(dm.group(1))
+ mod["dl_files"] = int(dm.group(2))
+ mod["dl_total_files"] = int(dm.group(3))
+ mod["dl_bytes"] = parse_size(dm.group(4))
+ mod["dl_throughput_bps"] = parse_rate_bits(dm.group(5))
+ mod["dl_threads"] = int(dm.group(6))
+ rm = _RE_REQS.search(joined)
+ if rm:
+ mod["reqs"] = int(rm.group(1))
+ mod["req_avg_s"] = parse_time_s(rm.group(2))
+ mod["req_avg_throughput_bps"] = parse_rate_bits(rm.group(3))
+ mod["req_peak_inflight"] = int(rm.group(4))
+ mod["req_saturation_pct"] = int(rm.group(5))
+ sm = _RE_SCHED.search(joined)
+ if sm:
+ mod["sched_1st_schedule_s"] = parse_time_s(sm.group(1))
+ mod["sched_1st_start_s"] = parse_time_s(sm.group(2))
+ mod["queue_wait_s"] = parse_time_s(sm.group(3))
+ lm = _RE_LOADMETA.search(joined)
+ if lm:
+ mod["load_metadata_s"] = parse_time_s(lm.group(1))
+ rcm = _RE_RENAME.search(joined)
+ if rcm:
+ mod["rename_copy_s"] = parse_time_s(rcm.group(1))
+
+ modules.append(mod)
+ i += 1
+
+ return modules
+
+
+def fmt_bytes(n: float) -> str:
+ for u in ("B", "KB", "MB", "GB", "TB"):
+ if n < 1024 or u == "TB":
+ return f"{n:.1f} {u}"
+ n /= 1024
+ return f"{n:.1f} TB"
+
+
+def fmt_s(s: float) -> str:
+ if s < 1e-3:
+ return f"{s*1e6:.0f} us"
+ if s < 1.0:
+ return f"{s*1e3:.1f} ms"
+ return f"{s:.2f} s"
+
+
+def agg(modules: list[dict]) -> dict:
+ a: dict = {}
+ a["count"] = len(modules)
+ a["total_files"] = sum(m["files"] for m in modules)
+ a["total_bytes"] = sum(m["total_bytes"] for m in modules)
+ a["total_reqs"] = sum(m.get("reqs", 0) for m in modules)
+ a["total_packs"] = sum(m.get("packs", 0) for m in modules)
+ a["total_packed_files"] = sum(m.get("packed_files", 0) for m in modules)
+ a["total_pack_bytes"] = sum(m.get("pack_bytes", 0.0) for m in modules)
+ a["total_dl_bytes"] = sum(m.get("dl_bytes", 0.0) for m in modules)
+ a["total_dl_phase_s"] = sum(m.get("dl_phase_s", 0.0) for m in modules)
+ a["total_pack_download_s"] = sum(m.get("pack_download_s", 0.0) for m in modules)
+ a["total_pack_unpack_s"] = sum(m.get("pack_unpack_s", 0.0) for m in modules)
+ a["total_rename_s"] = sum(m.get("rename_copy_s", 0.0) for m in modules)
+ a["total_load_meta_s"] = sum(m.get("load_metadata_s", 0.0) for m in modules)
+ a["total_wall_s"] = sum(m["wall_s"] for m in modules)
+
+ # Per-module medians and percentiles of request-level metrics
+ req_avgs = [m["req_avg_s"] for m in modules if m.get("reqs", 0) > 0]
+ a["req_avg_latency_median_s"] = median(req_avgs) if req_avgs else 0.0
+ a["req_avg_latency_mean_s"] = mean(req_avgs) if req_avgs else 0.0
+ a["req_avg_latency_p95_s"] = (
+ sorted(req_avgs)[int(0.95 * len(req_avgs))] if req_avgs else 0.0
+ )
+
+ queue_waits = [m["queue_wait_s"] for m in modules if "queue_wait_s" in m]
+ a["queue_wait_median_s"] = median(queue_waits) if queue_waits else 0.0
+ a["queue_wait_mean_s"] = mean(queue_waits) if queue_waits else 0.0
+ a["queue_wait_p95_s"] = (
+ sorted(queue_waits)[int(0.95 * len(queue_waits))] if queue_waits else 0.0
+ )
+ a["queue_wait_max_s"] = max(queue_waits) if queue_waits else 0.0
+
+ first_starts = [m["sched_1st_start_s"] for m in modules if "sched_1st_start_s" in m]
+ a["first_start_median_s"] = median(first_starts) if first_starts else 0.0
+ a["first_start_p95_s"] = (
+ sorted(first_starts)[int(0.95 * len(first_starts))] if first_starts else 0.0
+ )
+
+ wall_times = [m["wall_s"] for m in modules]
+ a["wall_median_s"] = median(wall_times) if wall_times else 0.0
+ a["wall_mean_s"] = mean(wall_times) if wall_times else 0.0
+ a["wall_p95_s"] = sorted(wall_times)[int(0.95 * len(wall_times))] if wall_times else 0.0
+ a["wall_max_s"] = max(wall_times) if wall_times else 0.0
+
+ saturations = [m["req_saturation_pct"] for m in modules if "req_saturation_pct" in m]
+ a["saturation_median_pct"] = median(saturations) if saturations else 0
+ a["saturation_mean_pct"] = mean(saturations) if saturations else 0
+
+ peak_inflights = [m["req_peak_inflight"] for m in modules if "req_peak_inflight" in m]
+ a["peak_inflight_median"] = median(peak_inflights) if peak_inflights else 0
+ a["peak_inflight_max"] = max(peak_inflights) if peak_inflights else 0
+
+ return a
+
+
+def print_row(label: str, packed_val: str, unpacked_val: str, delta: str = ""):
+ print(f" {label:<42} {packed_val:>14} {unpacked_val:>14} {delta}")
+
+
+def delta_pct(packed: float, unpacked: float) -> str:
+ if unpacked == 0:
+ return ""
+ d = packed - unpacked
+ pct = (d / unpacked) * 100.0
+ sign = "+" if d >= 0 else ""
+ return f"{sign}{pct:.1f}%"
+
+
+def main() -> int:
+ p = argparse.ArgumentParser()
+ p.add_argument("--packed-log", required=True)
+ p.add_argument("--unpacked-log", required=True)
+ args = p.parse_args()
+
+ pm = parse_log(Path(args.packed_log))
+ um = parse_log(Path(args.unpacked_log))
+
+ pa = agg(pm)
+ ua = agg(um)
+
+ print(f"\nParsed packed: {pa['count']} modules from {args.packed_log}")
+ print(f"Parsed unpacked: {ua['count']} modules from {args.unpacked_log}\n")
+
+ print(f" {'metric':<42} {'packed':>14} {'unpacked':>14} delta(p vs u)")
+ print(f" {'-'*42} {'-'*14} {'-'*14} {'-'*12}")
+
+ print("\n-- workload --")
+ print_row("modules", f"{pa['count']}", f"{ua['count']}")
+ print_row("total files (entries)", f"{pa['total_files']}", f"{ua['total_files']}")
+ print_row("total raw data", fmt_bytes(pa["total_bytes"]), fmt_bytes(ua["total_bytes"]))
+
+ print("\n-- S3 requests --")
+ print_row("total S3 GETs",
+ f"{pa['total_reqs']}", f"{ua['total_reqs']}",
+ delta_pct(pa["total_reqs"], ua["total_reqs"]))
+ print_row("GETs per module (mean)",
+ f"{pa['total_reqs']/pa['count']:.1f}",
+ f"{ua['total_reqs']/ua['count']:.1f}",
+ delta_pct(pa['total_reqs']/pa['count'], ua['total_reqs']/ua['count']))
+ print_row("requests saved by packing",
+ f"{ua['total_reqs'] - pa['total_reqs']}", "-",
+ f"{((ua['total_reqs'] - pa['total_reqs']) / ua['total_reqs']) * 100:.1f}%")
+
+ print("\n-- payload --")
+ print_row("total bytes downloaded",
+ fmt_bytes(pa["total_dl_bytes"]),
+ fmt_bytes(ua["total_dl_bytes"]),
+ delta_pct(pa["total_dl_bytes"], ua["total_dl_bytes"]))
+ print_row(" pack bytes (aggregate)",
+ fmt_bytes(pa["total_pack_bytes"]), "-")
+ diff = pa["total_dl_bytes"] - ua["total_dl_bytes"]
+ direction = "saved by packing" if diff < 0 else "added by packing"
+ print_row(f"bytes {direction}", fmt_bytes(abs(diff)), "-")
+
+ print("\n-- per-request latency (per-module avg) --")
+ print_row("median req avg latency",
+ fmt_s(pa["req_avg_latency_median_s"]),
+ fmt_s(ua["req_avg_latency_median_s"]),
+ delta_pct(pa["req_avg_latency_median_s"], ua["req_avg_latency_median_s"]))
+ print_row("mean req avg latency",
+ fmt_s(pa["req_avg_latency_mean_s"]),
+ fmt_s(ua["req_avg_latency_mean_s"]),
+ delta_pct(pa["req_avg_latency_mean_s"], ua["req_avg_latency_mean_s"]))
+ print_row("p95 req avg latency",
+ fmt_s(pa["req_avg_latency_p95_s"]),
+ fmt_s(ua["req_avg_latency_p95_s"]),
+ delta_pct(pa["req_avg_latency_p95_s"], ua["req_avg_latency_p95_s"]))
+
+ print("\n-- queue / scheduling --")
+ print_row("median queue wait (per module)",
+ fmt_s(pa["queue_wait_median_s"]),
+ fmt_s(ua["queue_wait_median_s"]),
+ delta_pct(pa["queue_wait_median_s"], ua["queue_wait_median_s"]))
+ print_row("mean queue wait",
+ fmt_s(pa["queue_wait_mean_s"]),
+ fmt_s(ua["queue_wait_mean_s"]),
+ delta_pct(pa["queue_wait_mean_s"], ua["queue_wait_mean_s"]))
+ print_row("p95 queue wait",
+ fmt_s(pa["queue_wait_p95_s"]),
+ fmt_s(ua["queue_wait_p95_s"]),
+ delta_pct(pa["queue_wait_p95_s"], ua["queue_wait_p95_s"]))
+ print_row("max queue wait",
+ fmt_s(pa["queue_wait_max_s"]),
+ fmt_s(ua["queue_wait_max_s"]),
+ delta_pct(pa["queue_wait_max_s"], ua["queue_wait_max_s"]))
+ print_row("median 1st-start delay",
+ fmt_s(pa["first_start_median_s"]),
+ fmt_s(ua["first_start_median_s"]),
+ delta_pct(pa["first_start_median_s"], ua["first_start_median_s"]))
+
+ print("\n-- per-module wall time (hydration block) --")
+ print_row("median",
+ fmt_s(pa["wall_median_s"]),
+ fmt_s(ua["wall_median_s"]),
+ delta_pct(pa["wall_median_s"], ua["wall_median_s"]))
+ print_row("mean",
+ fmt_s(pa["wall_mean_s"]),
+ fmt_s(ua["wall_mean_s"]),
+ delta_pct(pa["wall_mean_s"], ua["wall_mean_s"]))
+ print_row("p95",
+ fmt_s(pa["wall_p95_s"]),
+ fmt_s(ua["wall_p95_s"]),
+ delta_pct(pa["wall_p95_s"], ua["wall_p95_s"]))
+ print_row("max",
+ fmt_s(pa["wall_max_s"]),
+ fmt_s(ua["wall_max_s"]),
+ delta_pct(pa["wall_max_s"], ua["wall_max_s"]))
+
+ print("\n-- thread-pool saturation / concurrency --")
+ print_row("saturation median %",
+ f"{pa['saturation_median_pct']}",
+ f"{ua['saturation_median_pct']}")
+ print_row("saturation mean %",
+ f"{pa['saturation_mean_pct']:.1f}",
+ f"{ua['saturation_mean_pct']:.1f}")
+ print_row("peak in-flight median",
+ f"{pa['peak_inflight_median']}",
+ f"{ua['peak_inflight_median']}")
+ print_row("peak in-flight max",
+ f"{pa['peak_inflight_max']}",
+ f"{ua['peak_inflight_max']}")
+
+ print("\n-- pack-specific costs (packed run only) --")
+ print(f" pack GETs added: {pa['total_packs']} ({pa['total_packs']/pa['count']:.2f}/module)")
+ print(f" small files folded: {pa['total_packed_files']} ({pa['total_packed_files']/pa['count']:.1f}/module)")
+ print(f" pack download time total: {pa['total_pack_download_s']:.1f} s")
+ print(f" pack unpack total: {pa['total_pack_unpack_s']:.2f} s")
+
+ print("\n-- per-phase totals (sum across modules, wall-clock contribution) --")
+ print_row("load metadata total",
+ fmt_s(pa["total_load_meta_s"]),
+ fmt_s(ua["total_load_meta_s"]),
+ delta_pct(pa["total_load_meta_s"], ua["total_load_meta_s"]))
+ print_row("download phase total",
+ fmt_s(pa["total_dl_phase_s"]),
+ fmt_s(ua["total_dl_phase_s"]),
+ delta_pct(pa["total_dl_phase_s"], ua["total_dl_phase_s"]))
+ print_row("rename/copy total",
+ fmt_s(pa["total_rename_s"]),
+ fmt_s(ua["total_rename_s"]),
+ delta_pct(pa["total_rename_s"], ua["total_rename_s"]))
+ print_row("hydration wall sum",
+ fmt_s(pa["total_wall_s"]),
+ fmt_s(ua["total_wall_s"]),
+ delta_pct(pa["total_wall_s"], ua["total_wall_s"]))
+
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py
new file mode 100644
index 000000000..23014409c
--- /dev/null
+++ b/scripts/test_scripts/hub/hub_load_test_s3.py
@@ -0,0 +1,537 @@
+#!/usr/bin/env python3
+"""Hub provision/deprovision sweep against a real S3 bucket.
+
+Lists the top-level folders in S3_URI (each folder name is a moduleId),
+picks 200 of them, then:
+ 1. fires all provision requests concurrently,
+ 2. polls until every module reaches 'provisioned',
+ 3. waits 5 seconds,
+ 4. fires all deprovision requests concurrently,
+ 5. polls until every module is deprovisioned,
+ 6. waits 5 seconds,
+ 7. shuts down the hub.
+
+Required environment variables (or pass via CLI flags):
+ ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/
+ ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access
+ ZEN_PERF_AWS_REGION defaults to us-east-1
+
+Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO
+session is missing or expired, the script triggers the login automatically.
+
+Requirements:
+ pip install boto3
+ aws CLI v2
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "")
+_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "")
+_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1")
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ candidates = [
+ repo_root / "build" / "windows" / "x64" / "debug" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "linux" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}",
+ repo_root / "build" / "macosx" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}",
+ ]
+ for c in candidates:
+ if c.exists():
+ return c
+
+ matches = list(repo_root.glob(f"build/**/debug/zenserver{_EXE_SUFFIX}"))
+ if matches:
+ return max(matches, key=lambda p: p.stat().st_mtime)
+
+ sys.exit(
+ "zenserver debug executable not found in build/. "
+ "Run: xmake config -y -m debug -a x64 && xmake -y\n"
+ "Or pass --zenserver-dir <dir>."
+ )
+
+
+# ---------------------------------------------------------------------------
+# AWS SSO + S3 listing
+# ---------------------------------------------------------------------------
+
+def _require_boto3():
+ try:
+ import boto3 # type: ignore[import-not-found]
+ import botocore.exceptions # type: ignore[import-not-found]
+ except ImportError:
+ sys.exit(
+ "[aws] boto3 is required.\n"
+ "Install it with: pip install boto3"
+ )
+ return boto3, botocore.exceptions
+
+
+def _sso_login(profile: str) -> None:
+ print(f"[aws] running 'aws sso login --profile {profile}'...")
+ rc = subprocess.call(["aws", "sso", "login", "--profile", profile])
+ if rc != 0:
+ sys.exit(f"[aws] 'aws sso login' failed with rc={rc}")
+
+
+def _get_session(profile: str):
+ boto3, exc_mod = _require_boto3()
+
+ def _load_frozen():
+ session = boto3.Session(profile_name=profile)
+ creds = session.get_credentials()
+ if creds is None:
+ return session, None
+ return session, creds.get_frozen_credentials()
+
+ try:
+ session, frozen = _load_frozen()
+ if frozen is not None and frozen.access_key and frozen.secret_key:
+ return session, frozen
+ except exc_mod.ProfileNotFound:
+ sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config")
+ except Exception as e:
+ print(f"[aws] initial credential load failed: {e}")
+
+ _sso_login(profile)
+
+ session, frozen = _load_frozen()
+ if frozen is None or not frozen.access_key:
+ sys.exit("[aws] could not resolve credentials after sso login")
+ return session, frozen
+
+
+def _parse_s3_uri(uri: str) -> tuple[str, str]:
+ if not uri.startswith("s3://"):
+ sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}")
+ rest = uri[len("s3://"):]
+ if "/" in rest:
+ bucket, prefix = rest.split("/", 1)
+ else:
+ bucket, prefix = rest, ""
+ return bucket, prefix
+
+
+def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]:
+ s3 = session.client("s3", region_name=region)
+ prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/"
+
+ paginator = s3.get_paginator("list_objects_v2")
+ module_ids: list[str] = []
+ for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"):
+ for cp in page.get("CommonPrefixes", []) or []:
+ full = cp.get("Prefix", "")
+ # Strip outer prefix + trailing slash to get the folder name
+ folder = full[len(prefix_norm):].rstrip("/")
+ if folder:
+ module_ids.append(folder)
+ return module_ids
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ instance_limit: int,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-http-threads=8",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ f"--hub-instance-limit={instance_limit}",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None:
+ if proc.poll() is not None:
+ return
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[{name}] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub API
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
+ url = f"http://localhost:{port}/hub/status"
+ req = urllib.request.Request(url, headers={"Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ except Exception:
+ return None
+ out: dict[str, str] = {}
+ for m in data.get("modules", []) or []:
+ mid = m.get("moduleId")
+ if mid:
+ out[mid] = m.get("state", "")
+ return out
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ """Fire POST /hub/modules/<id>/<verb> for every module concurrently.
+
+ Returns (accepted_ids, failures) where accepted_ids got 200/202 and
+ failures is a list of (mid, status, body) for everything else.
+ """
+ futures = {
+ mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
+ for mid in module_ids
+ }
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ elif verb == "deprovision" and status == 404:
+ # Already gone - treat as success for deprovision fan-out.
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+def _wait_for_provisioned(
+ port: int,
+ module_ids: list[str],
+ timeout_s: float,
+) -> tuple[list[str], dict[str, str]]:
+ """Poll until every module in module_ids is 'provisioned' or gone.
+
+ Returns (stuck_ids, last_states). stuck_ids are the ones that did not reach
+ 'provisioned' within timeout_s; last_states maps all module_ids to their
+ last-seen state (or empty string if the module was absent from the report).
+ """
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ now_done: list[str] = []
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if s == "provisioned":
+ now_done.append(mid)
+ elif s in ("", "unprovisioned", "crashed"):
+ # Not useful end-states for "waiting to become provisioned",
+ # but we don't block forever on them either - let timeout decide.
+ pass
+ for mid in now_done:
+ remaining.discard(mid)
+
+ done = len(module_ids) - len(remaining)
+ print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r")
+ time.sleep(2.0)
+
+ return list(remaining), last_states
+
+
+def _wait_for_deprovisioned(
+ port: int,
+ module_ids: list[str],
+ timeout_s: float,
+) -> tuple[list[str], dict[str, str]]:
+ """Poll until every module in module_ids is gone from hub status."""
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if mid not in states or s == "unprovisioned":
+ remaining.discard(mid)
+
+ done = len(module_ids) - len(remaining)
+ print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r")
+ time.sleep(2.0)
+
+ return list(remaining), last_states
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3",
+ help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)")
+ parser.add_argument("--port", type=int, default=8558,
+ help="Hub HTTP port (default: 8558)")
+ parser.add_argument("--module-count", type=int, default=200,
+ help="Number of modules to sweep (default: 200)")
+ parser.add_argument("--settle-seconds", type=float, default=5.0,
+ help="Seconds to wait between provision-complete and deprovision, "
+ "and between deprovision-complete and shutdown (default: 5.0)")
+ parser.add_argument("--workers", type=int, default=50,
+ help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)")
+ parser.add_argument("--poll-timeout", type=float, default=600.0,
+ help="Max seconds to wait for provision or deprovision to finish (default: 600)")
+ parser.add_argument("--trace",
+ nargs="?", const="default", default=None, metavar="CHANNELS",
+ help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. "
+ "Optionally pass channel spec (default: 'default')")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver executable (auto-detected by default)")
+ args = parser.parse_args()
+
+ s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI)
+ aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE)
+ aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION)
+ if not s3_uri:
+ sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/")
+ if not aws_profile:
+ sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name")
+
+ data_dir = Path(args.data_dir)
+ hub_log = data_dir / "hub.log"
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] S3 URI: {s3_uri}")
+ print(f"[setup] profile: {aws_profile}")
+ print(f"[setup] region: {aws_region}")
+
+ session, frozen = _get_session(aws_profile)
+ print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})")
+
+ bucket, prefix = _parse_s3_uri(s3_uri)
+ print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...")
+ module_ids = _list_module_ids(session, bucket, prefix, aws_region)
+ print(f"[s3] found {len(module_ids)} module folders")
+
+ if not module_ids:
+ sys.exit("[s3] no module folders found, aborting")
+
+ selected = module_ids[:args.module_count]
+ if len(selected) < args.module_count:
+ print(f"[s3] only {len(selected)} folders available (requested {args.module_count})")
+
+ aws_env = {
+ "AWS_ACCESS_KEY_ID": frozen.access_key,
+ "AWS_SECRET_ACCESS_KEY": frozen.secret_key,
+ }
+ if frozen.token:
+ aws_env["AWS_SESSION_TOKEN"] = frozen.token
+
+ data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {"uri": s3_uri, "region": aws_region},
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back.
+ "--hub-enable-dehydration=false",
+ ]
+ if args.trace:
+ trace_path = data_dir / "hub.utrace"
+ hub_extra_args += [
+ f"--trace={args.trace}",
+ f"--tracefile={trace_path}",
+ ]
+ print(f"[trace] enabled channels='{args.trace}', file={trace_path}")
+
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+
+ try:
+ hub_instance_limit = max(args.module_count, 500)
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, data_dir, args.port, hub_log,
+ hub_instance_limit, hub_extra_args, aws_env,
+ )
+ _wait_for_hub(hub_proc, args.port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ # --- Provision phase ---
+ print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...")
+ t_pv0 = time.monotonic()
+ accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision")
+ print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} "
+ f"(fan-out {time.monotonic() - t_pv0:.1f}s)")
+ for mid, status, body in pv_failures[:10]:
+ print(f"[provision] FAILED {mid}: status={status} body={body}")
+ if len(pv_failures) > 10:
+ print(f"[provision] ... and {len(pv_failures) - 10} more failures")
+
+ if hub_proc.poll() is not None:
+ print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
+ return
+
+ if accepted:
+ stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout)
+ provisioned_count = len(accepted) - len(stuck)
+ print()
+ print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' "
+ f"({time.monotonic() - t_pv0:.1f}s total)")
+ if stuck:
+ print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s")
+ for mid in stuck[:10]:
+ print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'")
+ else:
+ print("[provision] nothing provisioned")
+ return
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...")
+ time.sleep(args.settle_seconds)
+
+ # --- Deprovision phase ---
+ print(f"[deprovision] firing {len(accepted)} deprovision requests...")
+ t_dp0 = time.monotonic()
+ dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision")
+ print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} "
+ f"(fan-out {time.monotonic() - t_dp0:.1f}s)")
+ for mid, status, body in dp_failures[:10]:
+ print(f"[deprovision] FAILED {mid}: status={status} body={body}")
+ if len(dp_failures) > 10:
+ print(f"[deprovision] ... and {len(dp_failures) - 10} more failures")
+
+ stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout)
+ deprovisioned_count = len(dp_accepted) - len(stuck_dp)
+ print()
+ print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone "
+ f"({time.monotonic() - t_dp0:.1f}s total)")
+ if stuck_dp:
+ print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s")
+ for mid in stuck_dp[:10]:
+ print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'")
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...")
+ time.sleep(args.settle_seconds)
+
+ print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ _stop_process(hub_proc, "hub", timeout_s=120.0)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua
new file mode 100644
index 000000000..f3cf3e697
--- /dev/null
+++ b/scripts/test_scripts/hub/perf_configs/hub.lua
@@ -0,0 +1,48 @@
+-- Perf-test hub config: mirrors the production hub config in the repo root
+-- (hub.lua / instance.lua). The launch script overrides hub.instance.config
+-- and the effective concurrency (--corelimit=128) via CLI to simulate an
+-- r5n.32xlarge box so the default auto-picks match what prod sees there.
+
+hub = {
+ instance = {
+ baseportnumber = 21000, -- default
+ limits = {
+ count = 1100, -- headroom for 1000-module perf runs
+ memorylimitpercent = 90, -- default: 0 (disabled)
+ disklimitpercent = 90, -- default: 0 (disabled)
+ },
+ corelimit = 4, -- default: 0 (auto)
+ provisionthreads = 8, -- default: auto
+ -- NOTE: hub.instance.config (path to instance lua) is overridden via
+ -- --hub-instance-config on the CLI. If left here, it would be resolved
+ -- relative to the hub's CWD at spawn time (NOT this file's dir).
+ },
+
+ hydration = {
+ -- Match production's per-module download pool size. Without this, the
+ -- default auto-picks hardware_concurrency/4 which on --corelimit=128
+ -- would be 32. Prod logs consistently show "16 threads" in Download phase.
+ threads = 16,
+ },
+
+ watchdog = {
+ cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan
+ cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count
+ instancecheckthrottlems = 10, -- default: 5. slight throttle to reduce hub CPU
+ provisionedinactivitytimeoutseconds = 600, -- default
+ hibernatedinactivitytimeoutseconds = 1800, -- default
+ inactivitycheckmarginseconds = 60, -- default
+ },
+}
+
+network = {
+ httpserverthreads = 8, -- default: auto. hub itself needs few threads
+}
+
+server = {
+ dedicated = true, -- default: false. signals build-farm use, affects thread scaling heuristics
+}
+
+gc = {
+ enabled = false, -- default: true. hub has no storage, no need for GC
+}
diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua
new file mode 100644
index 000000000..1251997db
--- /dev/null
+++ b/scripts/test_scripts/hub/perf_configs/instance.lua
@@ -0,0 +1,15 @@
+network = {
+ httpserverthreads = 4, -- default: auto
+}
+
+gc = {
+ enabled = false, -- default: true. hub triggers GC at deprovision, no need for periodic GC
+}
+
+cache = {
+ bucket = {
+ memlayer = {
+ sizethreshold = 0, -- default: 1024. 0 disables memlayer entirely
+ },
+ },
+}
diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py
new file mode 100644
index 000000000..365e4a542
--- /dev/null
+++ b/scripts/test_scripts/hub/preserve_minio_state.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+"""Preserve a freshly-seeded MinIO data directory.
+
+Run this after seed_minio.py has finished. By default it MOVES --source (the
+MinIO --data-dir used during seeding) to --dest (the preservation path) and
+writes a README with provenance so future perf runs start from a known
+baseline. Move avoids a ~0.5 TB copy on a full 1000-module seed; Stage B
+wipes --minio-data-dir on its next invocation anyway.
+
+Pass --copy to keep --source in place (slower; needs 2x disk).
+
+Typical invocation:
+ python preserve_minio_state.py
+
+Defaults map to the paths recommended by PERF_SEED_README.md.
+"""
+
+from __future__ import annotations
+
+import argparse
+import datetime
+import os
+import shutil
+import stat
+import sys
+from pathlib import Path
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ def _onerror(func, p, exc_info):
+ try:
+ os.chmod(p, stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+def _size_of(path: Path) -> tuple[int, int]:
+ files = 0
+ total = 0
+ for root, _dirs, names in os.walk(path):
+ for n in names:
+ p = Path(root) / n
+ try:
+ total += p.stat().st_size
+ except OSError:
+ pass
+ files += 1
+ return files, total
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data",
+ help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)")
+ parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
+ help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
+ "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
+ parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""),
+ help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)")
+ parser.add_argument("--bucket", default="zen-seed",
+ help="MinIO bucket name recorded in the README")
+ parser.add_argument("--module-count", type=int, default=300,
+ help="Module count recorded in the README")
+ parser.add_argument("--copy", action="store_true",
+ help="Copy --source to --dest instead of moving it. Default is move "
+ "(fast, in-place rename when on the same volume). Use --copy if you "
+ "want to keep --source intact for another preserve run.")
+ args = parser.parse_args()
+
+ source = Path(args.source).resolve()
+ dest = Path(args.dest).resolve()
+
+ if not source.is_dir():
+ sys.exit(f"[preserve] source not found: {source}")
+
+ # Dest is wiped and rewritten. Refuse any path that would clobber source.
+ if dest == source or dest in source.parents or source in dest.parents:
+ sys.exit(f"[preserve] source ({source}) and dest ({dest}) must be disjoint")
+
+ files, total = _size_of(source)
+ mode = "copy" if args.copy else "move"
+ print(f"[preserve] source: {source} -> {files:,} files, {total/1024/1024:.1f} MB")
+ print(f"[preserve] dest: {dest}")
+ print(f"[preserve] mode: {mode}")
+
+ if dest.exists():
+ print(f"[preserve] removing existing dest {dest}")
+ _rmtree_robust(dest)
+
+ dest.parent.mkdir(parents=True, exist_ok=True)
+ if args.copy:
+ shutil.copytree(source, dest, symlinks=False)
+ else:
+ shutil.move(str(source), str(dest))
+
+ readme = dest / "README.txt"
+ readme.write_text(
+ "\n".join([
+ "zen-perf-seed preserved MinIO state",
+ "",
+ f"Created: {datetime.datetime.now(datetime.timezone.utc).isoformat()}",
+ f"Source s3 URI: {args.s3_uri}",
+ f"Bucket: {args.bucket}",
+ f"Modules: {args.module_count}",
+ f"Files: {files:,}",
+ f"Bytes: {total:,}",
+ "",
+ "To run a perf iteration: copy this directory onto a fresh MinIO data",
+ "dir (see scripts/test_scripts/hub/run_minio_perf.py) and point a hub at it.",
+ "",
+ ]),
+ encoding="ascii",
+ )
+ print(f"[preserve] wrote {readme}")
+ return 0
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py
new file mode 100644
index 000000000..b59cff41a
--- /dev/null
+++ b/scripts/test_scripts/hub/run_minio_perf.py
@@ -0,0 +1,614 @@
+#!/usr/bin/env python3
+"""Stage C of the perf-seed workflow: run a perf iteration against the
+preserved MinIO state.
+
+Flow per invocation:
+ 1. Reset --minio-run by copying --minio-seeded over it (so every iteration
+ starts from the same canonical state).
+ 2. Start MinIO against --minio-run.
+ 3. Start a hub against MinIO. By default --hub-enable-dehydration=false so
+ the perf run is read-only and the seeded baseline survives intact.
+ Pass --enable-dehydration to run a full provision -> deprovision cycle
+ that re-uploads at deprovision time (use this to measure the dehydrate
+ phase as well as hydrate; the seeded baseline diverges after such a run).
+ 4. Provision every module found under --snapshot-dir (same set that was
+ used to seed MinIO), wait for all to reach 'provisioned'.
+ 5. Deprovision them all, wait for them to be gone.
+ 6. Stop hub + MinIO cleanly.
+
+Optional --trace writes a utrace file to --hub-data-dir/hub.utrace.
+
+This script reuses the lightweight harness helpers from the other stages but
+keeps the run fully offline once the seed is preserved.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import shutil
+import signal
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ import os as _os
+ import stat as _stat
+ def _onerror(func, p, exc_info):
+ try:
+ _os.chmod(p, _stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ # onexc was introduced in py3.12; fall back to onerror on older versions.
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ for mode in ("release", "debug"):
+ for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+ p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+ if p.exists():
+ return p
+ sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+ p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zen CLI not found at {p}")
+ return p
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+ p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"minio executable not found at {p}")
+ return p
+
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["MINIO_ROOT_USER"] = _MINIO_USER
+ env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ proc = subprocess.Popen(
+ [str(minio_exe), "server", str(data_dir),
+ "--address", f":{port}",
+ "--console-address", f":{console_port}",
+ "--quiet"],
+ env=env,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ **popen_kwargs,
+ )
+ print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}")
+ return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out after {timeout_s}s")
+
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ # Use the production-like Lua config (limits, thread counts, watchdog timers)
+ # so perf numbers reflect what prod actually runs. Hub.hydration.threads is
+ # left unset so the default auto-pick (hardware_concurrency/4) fires; combined
+ # with --corelimit=128 below, that yields 32 hydration threads on the actual
+ # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes.
+ config_dir = Path(__file__).resolve().parent / "perf_configs"
+ hub_config = config_dir / "hub.lua"
+ inst_config = config_dir / "instance.lua"
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ f"--config={hub_config}",
+ f"--hub-instance-config={inst_config}",
+ # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box
+ # with fewer physical cores, this clamps to the actual count (see
+ # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates).
+ # When running on the actual prod instance class, this makes the default
+ # auto-picks (provision/hydration thread counts) behave as prod does.
+ "--corelimit=128",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs)
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out after {timeout_s}s")
+
+
+def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
+ """'zen down --pid' for graceful hub shutdown."""
+ if hub_proc.poll() is not None:
+ return
+ pid = hub_proc.pid
+ print(f"[hub] zen down --pid {pid}")
+ rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
+ if rc != 0:
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ try:
+ hub_proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[hub] did not exit after {timeout_s}s, killing")
+ hub_proc.kill()
+ hub_proc.wait()
+
+
+def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
+ if proc.poll() is not None:
+ return
+ try:
+ if sys.platform == "win32":
+ proc.send_signal(signal.CTRL_BREAK_EVENT)
+ else:
+ proc.terminate()
+ except Exception:
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[minio] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_states(port: int) -> Optional[dict[str, str]]:
+ # Without Accept: application/json the hub serves compact binary (CBOR)
+ # which json.loads can't parse. 60s timeout - /hub/status can take a few
+ # seconds to serialize with 300+ modules in flight.
+ req = urllib.request.Request(
+ f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"},
+ )
+ try:
+ with urllib.request.urlopen(req, timeout=60) as resp:
+ data = json.loads(resp.read())
+ except Exception as e:
+ # Visibility: prior silent None caused "0/300 provisioned..." forever.
+ print(f"[status] WARN /hub/status failed: {e}")
+ return None
+ return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids}
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ elif verb == "deprovision" and status == 404:
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+def _state_histogram(states: dict[str, str], remaining: set[str]) -> str:
+ counts: dict[str, int] = {}
+ for mid in remaining:
+ s = states.get(mid, "<absent>")
+ counts[s] = counts.get(s, 0) + 1
+ return ", ".join(f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: -kv[1]))
+
+
+def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]:
+ deadline = time.monotonic() + timeout_s
+ remaining = set(ids)
+ last_verbose = 0.0
+ while remaining and time.monotonic() < deadline:
+ states = _hub_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ if states.get(mid) == "provisioned":
+ remaining.discard(mid)
+ done = len(ids) - len(remaining)
+ now = time.monotonic()
+ if states is not None and (now - last_verbose >= 10.0 or not remaining):
+ hist = _state_histogram(states, remaining) if remaining else "(all provisioned)"
+ print(f"[provision] {done}/{len(ids)} provisioned | remaining: {hist}")
+ last_verbose = now
+ else:
+ print(f"[provision] {done}/{len(ids)} provisioned...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining)
+
+
+def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]:
+ deadline = time.monotonic() + timeout_s
+ remaining = set(ids)
+ last_verbose = 0.0
+ while remaining and time.monotonic() < deadline:
+ states = _hub_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ if mid not in states or s == "unprovisioned":
+ remaining.discard(mid)
+ done = len(ids) - len(remaining)
+ now = time.monotonic()
+ if states is not None and (now - last_verbose >= 10.0 or not remaining):
+ hist = _state_histogram(states, remaining) if remaining else "(all gone)"
+ print(f"[deprovision] {done}/{len(ids)} gone | remaining: {hist}")
+ last_verbose = now
+ else:
+ print(f"[deprovision] {done}/{len(ids)} gone...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining)
+
+
+def _robust_copytree(src: Path, dst: Path) -> None:
+ """Windows-friendly directory copy with progress.
+
+ Uses robocopy /MIR to mirror src -> dst and /COPY:DAT /DCOPY:DAT to copy
+ file data + attributes + timestamps explicitly (not just the defaults).
+ Safe because dst is always the working dir (minio-run) - never the
+ preserved baseline.
+ """
+ if sys.platform == "win32":
+ cmd = [
+ "robocopy", str(src), str(dst),
+ "/MIR",
+ "/COPY:DAT", "/DCOPY:DAT",
+ "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP",
+ "/R:2", "/W:1",
+ ]
+ print(f"[reset] robocopy {src} -> {dst}")
+ rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ if rc >= 8:
+ sys.exit(f"[reset] robocopy failed rc={rc}")
+ else:
+ if dst.exists():
+ _rmtree_robust(dst)
+ shutil.copytree(src, dst, symlinks=False)
+
+
+def _archive_run(
+ archive_dir: Path,
+ hub_data_dir: Path,
+ bucket: str,
+ summary: dict,
+) -> Optional[Path]:
+ """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a
+ timestamped subdir under archive_dir so successive runs can be compared.
+
+ Returns the archive destination path or None if there was nothing to copy.
+ """
+ try:
+ ts = time.strftime("%Y%m%d-%H%M%S", time.localtime())
+ dest = archive_dir / f"{ts}_{bucket}"
+ dest.mkdir(parents=True, exist_ok=True)
+
+ copied: list[str] = []
+ # Hub stdout/stderr (captured by _start_hub via subprocess stdout=)
+ hub_log = hub_data_dir / "hub.log"
+ if hub_log.is_file():
+ shutil.copy2(hub_log, dest / "hub.log")
+ copied.append("hub.log")
+ # Structured zenserver logs (rotated variants included)
+ logs_src = hub_data_dir / "logs"
+ if logs_src.is_dir():
+ logs_dst = dest / "logs"
+ logs_dst.mkdir(exist_ok=True)
+ for p in logs_src.iterdir():
+ if p.is_file():
+ shutil.copy2(p, logs_dst / p.name)
+ copied.append(f"logs/{p.name}")
+ # Optional trace
+ utrace = hub_data_dir / "hub.utrace"
+ if utrace.is_file():
+ shutil.copy2(utrace, dest / "hub.utrace")
+ copied.append("hub.utrace")
+
+ (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii")
+
+ print(f"[archive] {dest} ({len(copied)} files)")
+ return dest
+ except Exception as e:
+ print(f"[archive] WARNING: failed to archive run: {e}")
+ return None
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed",
+ help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). "
+ "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.")
+ parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run",
+ help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)")
+ parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+ help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf",
+ help="Hub --data-dir for the perf run (wiped each run if --wipe). Default: E:/Dev/zen-perf-seed/hub-perf")
+ parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs",
+ help="Where to archive hub.log + zenserver.log + hub.utrace after each run "
+ "(default: E:/Dev/zen-perf-seed/perf-runs)")
+ parser.add_argument("--bucket", default="zen-seed-packed",
+ help="MinIO bucket name to exercise (default: zen-seed-packed). "
+ "Pack worktree seeds only the packed bucket.")
+ parser.add_argument("--minio-port", type=int, default=9000)
+ parser.add_argument("--minio-console-port", type=int, default=9001)
+ parser.add_argument("--hub-port", type=int, default=8558)
+ parser.add_argument("--module-count", type=int, default=0,
+ help="Cap on modules used (0 = all from snapshot-dir)")
+ parser.add_argument("--workers", type=int, default=50)
+ parser.add_argument("--poll-timeout", type=float, default=1200.0)
+ parser.add_argument("--settle-seconds", type=float, default=5.0)
+ parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS",
+ help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace (default channels: 'default')")
+ parser.add_argument("--enable-dehydration", action="store_true",
+ help="Run with --hub-enable-dehydration=true so the deprovision phase "
+ "actually re-uploads to MinIO. Default false preserves the seeded "
+ "baseline; pass this to measure dehydrate cost as well as hydrate.")
+ parser.add_argument("--no-wipe-hub", action="store_true",
+ help="Don't wipe the hub data dir before starting (useful to inspect leftover state)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver + minio executables (auto-detected)")
+ args = parser.parse_args()
+
+ minio_seeded = Path(args.minio_seeded).resolve()
+ minio_run = Path(args.minio_run).resolve()
+ snapshot_dir = Path(args.snapshot_dir).resolve()
+ hub_data_dir = Path(args.hub_data_dir).resolve()
+ archive_dir = Path(args.archive_dir).resolve()
+
+ if not minio_seeded.is_dir():
+ sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}")
+ if not snapshot_dir.is_dir():
+ sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}")
+
+ # Preserved inputs are read-only from this script's perspective. Refuse to
+ # run if any mutable path could accidentally clobber them.
+ for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)):
+ for ro_label, ro in (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir)):
+ if mutable == ro or mutable in ro.parents or ro in mutable.parents:
+ sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}")
+
+ module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()])
+ if args.module_count > 0:
+ module_ids = module_ids[:args.module_count]
+ if not module_ids:
+ sys.exit(f"[setup] no modules in {snapshot_dir}")
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ zen_exe = _find_zen(zenserver_exe)
+ minio_exe = _find_minio(zenserver_exe)
+ zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+ print(f"[setup] build mode: {zenserver_mode}")
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] minio: {minio_exe}")
+ print(f"[setup] minio-seeded: {minio_seeded}")
+ print(f"[setup] minio-run: {minio_run}")
+ print(f"[setup] hub-data-dir: {hub_data_dir}")
+ print(f"[setup] modules: {len(module_ids)}")
+
+ # Reset MinIO run dir from the baseline.
+ _robust_copytree(minio_seeded, minio_run)
+
+ # Wipe the hub data dir so every run starts from scratch unless the user opts out.
+ if not args.no_wipe_hub and hub_data_dir.exists():
+ print(f"[reset] wiping {hub_data_dir}")
+ _rmtree_robust(hub_data_dir)
+ hub_data_dir.mkdir(parents=True, exist_ok=True)
+
+ config_path = hub_data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {
+ "uri": f"s3://{args.bucket}",
+ "endpoint": f"http://localhost:{args.minio_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ },
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}",
+ ]
+ if args.enable_dehydration:
+ print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge")
+ if args.trace:
+ trace_path = hub_data_dir / "hub.utrace"
+ hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"]
+ print(f"[trace] enabled channels='{args.trace}', file={trace_path}")
+ hub_extra_env = {
+ "AWS_ACCESS_KEY_ID": _MINIO_USER,
+ "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+ }
+
+ minio_proc: Optional[subprocess.Popen] = None
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ exit_code = 0
+ timings: dict = {
+ "bucket": args.bucket,
+ "module_count": len(module_ids),
+ "build_mode": zenserver_mode,
+ "provision_fanout_s": None,
+ "provision_total_s": None,
+ "deprovision_fanout_s": None,
+ "deprovision_total_s": None,
+ "total_s": None,
+ }
+
+ try:
+ minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port)
+ _wait_for_minio(args.minio_port)
+
+ hub_log = hub_data_dir / "hub.log"
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, hub_data_dir, args.hub_port, hub_log,
+ hub_extra_args, hub_extra_env,
+ )
+ _wait_for_hub(hub_proc, args.hub_port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ # --- Provision ---
+ print(f"[provision] firing {len(module_ids)} provision requests...")
+ t0 = time.monotonic()
+ accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision")
+ fanout_s = time.monotonic() - t0
+ timings["provision_fanout_s"] = round(fanout_s, 2)
+ print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)")
+ for mid, status, body in failures[:5]:
+ print(f"[provision] FAIL {mid}: status={status} body={body}")
+ if failures:
+ exit_code = 1
+ stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout)
+ total_s = time.monotonic() - t0
+ timings["provision_total_s"] = round(total_s, 2)
+ print(f"[provision] all provisioned in {total_s:.1f}s ({len(accepted)-len(stuck)}/{len(accepted)})")
+ if stuck:
+ exit_code = 1
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s")
+ time.sleep(args.settle_seconds)
+
+ # --- Deprovision ---
+ print(f"[deprovision] firing {len(accepted)} deprovision requests...")
+ t0 = time.monotonic()
+ dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision")
+ fanout_s = time.monotonic() - t0
+ timings["deprovision_fanout_s"] = round(fanout_s, 2)
+ print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)")
+ if dp_failures:
+ exit_code = 1
+ stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout)
+ total_s = time.monotonic() - t0
+ timings["deprovision_total_s"] = round(total_s, 2)
+ print(f"[deprovision] all gone in {total_s:.1f}s ({len(dp_accepted)-len(stuck_dp)}/{len(dp_accepted)})")
+ if stuck_dp:
+ exit_code = 1
+
+ print(f"[settle] waiting {args.settle_seconds:.0f}s")
+ time.sleep(args.settle_seconds)
+
+ elapsed = time.monotonic() - t_start
+ timings["total_s"] = round(elapsed, 2)
+ print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ _zen_down_hub(zen_exe, hub_proc)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ if minio_proc is not None and minio_proc.poll() is None:
+ _stop_minio_graceful(minio_proc)
+ # Archive AFTER the hub has exited so in-flight log writes are flushed.
+ timings["exit_code"] = exit_code
+ _archive_run(archive_dir, hub_data_dir, args.bucket, timings)
+
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py
new file mode 100644
index 000000000..e0e45c4cb
--- /dev/null
+++ b/scripts/test_scripts/hub/seed_minio.py
@@ -0,0 +1,720 @@
+#!/usr/bin/env python3
+"""Stage B of the perf-seed workflow.
+
+Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO
+bucket so a later perf run can exercise that bucket. Pack mode is hardcoded
+in this script (see --hub-hydration-enable-pack flag in _start_hub) and
+governs how the hub uploads into MinIO. To compare packed vs unpacked,
+invoke this script twice from two separate worktrees - one with the pack
+flag flipped to false - producing two preserved MinIO data dirs, then run
+run_minio_perf.py against each.
+
+Flow:
+ 1. Start a local MinIO server against --minio-data-dir, create the bucket.
+ 2. Start a hub against MinIO (--bucket as target), dehydration ON,
+ hibernated-watchdog timeout set huge so hibernated modules are not
+ auto-deprovisioned before we deprovision them ourselves.
+ 3. Provision every module discovered under --snapshot-dir (fresh hydrate
+ against an empty bucket spawns the child on an empty dir).
+ 4. Hibernate every module (child exits, state dir unlocked).
+ 5. Overlay the snapshot into <hub-data-dir>/servers/<mid>/.
+ 6. Deprovision every module. Deprovision from Hibernated runs Dehydrate(),
+ which scans the overlaid ServerStateDir and uploads content into the
+ bucket.
+ 7. Shut the hub down via 'zen down --pid'.
+ 8. Stop MinIO.
+
+Preservation of the resulting MinIO data directory is done by a separate
+step (see preserve_minio_state.py / PERF_SEED_README.md).
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import shutil
+import signal
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ import os as _os
+ import stat as _stat
+ def _onerror(func, p, exc_info):
+ try:
+ _os.chmod(p, _stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ # onexc was introduced in py3.12; fall back to onerror on older versions.
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery - prefer release, fall back to debug.
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ for mode in ("release", "debug"):
+ for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+ p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+ if p.exists():
+ return p
+ sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+ p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)")
+ return p
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+ p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"minio executable not found at {p}. Build the zen project first.")
+ return p
+
+
+# ---------------------------------------------------------------------------
+# MinIO lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ env = os.environ.copy()
+ env["MINIO_ROOT_USER"] = _MINIO_USER
+ env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ proc = subprocess.Popen(
+ [str(minio_exe), "server", str(data_dir),
+ "--address", f":{port}",
+ "--console-address", f":{console_port}",
+ "--quiet"],
+ env=env,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ **popen_kwargs,
+ )
+ print(f"[minio] started (pid {proc.pid}) port={port} console={console_port} data={data_dir}")
+ return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+ try:
+ import boto3 # type: ignore[import-not-found]
+ import botocore.config # type: ignore[import-not-found]
+ import botocore.exceptions # type: ignore[import-not-found]
+ except ImportError:
+ sys.exit("[minio] boto3 is required. pip install boto3")
+ s3 = boto3.client(
+ "s3",
+ endpoint_url=f"http://localhost:{port}",
+ aws_access_key_id=_MINIO_USER,
+ aws_secret_access_key=_MINIO_PASS,
+ region_name="us-east-1",
+ config=botocore.config.Config(signature_version="s3v4"),
+ )
+ try:
+ s3.create_bucket(Bucket=bucket)
+ print(f"[minio] created bucket '{bucket}'")
+ except botocore.exceptions.ClientError as e:
+ if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
+ print(f"[minio] bucket '{bucket}' already exists")
+ else:
+ raise
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ instance_limit: int,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ f"--hub-instance-limit={instance_limit}",
+ # Seeding is not a perf-measurement path - we want it as fast as the
+ # host can manage. Let the hub go wide on both provisioning and
+ # hydration thread pools rather than matching prod limits.
+ "--hub-instance-provision-threads=64",
+ "--hub-hydration-threads=64",
+ # Prevent the watchdog from auto-deprovisioning modules while we're
+ # still hydrating the tail / in the overlay phase. BOTH timers have to
+ # be extended - the provisioned one (default 600s) is what bites on
+ # large-N runs where early provisions go idle waiting for the rest.
+ "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400",
+ "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400",
+ # Explicit - default is true, but make it obvious that Stage B needs
+ # it since the final deprovision drives the MinIO upload.
+ "--hub-enable-dehydration=true",
+ # Pack worktree: turn pack on so dehydrate emits packed CAS.
+ "--hub-hydration-enable-pack=true",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode})")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
+ """'zen down --pid' so the hub runs its normal shutdown path."""
+ if hub_proc.poll() is not None:
+ return
+ pid = hub_proc.pid
+ print(f"[hub] zen down --pid {pid}")
+ rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
+ if rc != 0:
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ try:
+ hub_proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[hub] did not exit after {timeout_s}s, killing")
+ hub_proc.kill()
+ hub_proc.wait()
+
+
+def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None:
+ """MinIO has no external shutdown tool; Ctrl-Break lets it flush."""
+ if proc.poll() is not None:
+ return
+ try:
+ if sys.platform == "win32":
+ proc.send_signal(signal.CTRL_BREAK_EVENT)
+ else:
+ proc.terminate()
+ except Exception:
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[minio] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub API
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
+ url = f"http://localhost:{port}/hub/status"
+ req = urllib.request.Request(url, headers={"Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ except Exception:
+ return None
+ return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ """POST concurrently. For 'deprovision' a 404 means the module is already
+ gone - count it as accepted so the state-poll recognises completion.
+ """
+ futures = {
+ mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
+ for mid in module_ids
+ }
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ elif verb == "deprovision" and status == 404:
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+_FAILED_STATES = {"crashed", "unprovisioned"}
+
+
+def _wait_for_state(
+ port: int,
+ module_ids: list[str],
+ target_state: str,
+ timeout_s: float,
+ label: str,
+) -> tuple[list[str], list[str], dict[str, str]]:
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ failed: list[str] = []
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if s == target_state:
+ remaining.discard(mid)
+ elif s in _FAILED_STATES and target_state not in _FAILED_STATES:
+ remaining.discard(mid)
+ failed.append(mid)
+ done = len(module_ids) - len(remaining)
+ print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining), failed, last_states
+
+
+def _wait_for_deprovisioned(
+ port: int,
+ module_ids: list[str],
+ timeout_s: float,
+) -> tuple[list[str], list[str], dict[str, str]]:
+ """Wait until each module is gone from /hub/status or in 'unprovisioned'.
+ 'crashed' counts as a hard failure - dehydrate never completed for it.
+ """
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ failed: list[str] = []
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if mid not in states or s == "unprovisioned":
+ remaining.discard(mid)
+ elif s == "crashed":
+ remaining.discard(mid)
+ failed.append(mid)
+ done = len(module_ids) - len(remaining)
+ print(f"[deprovision] {done}/{len(module_ids)} gone ({len(failed)} failed)...", end="\r")
+ time.sleep(2.0)
+ print()
+ return list(remaining), failed, last_states
+
+
+# ---------------------------------------------------------------------------
+# Snapshot overlay
+# ---------------------------------------------------------------------------
+
+def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: list[str]) -> tuple[int, int, int]:
+ """Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*.
+
+ snapshot_root is treated as read-only; only hub_servers_root is written to.
+ """
+ files_copied = 0
+ bytes_copied = 0
+ modules_overlaid = 0
+
+ for i, mid in enumerate(module_ids, 1):
+ src = snapshot_root / mid
+ dst = hub_servers_root / mid
+ if not src.is_dir():
+ print(f"[overlay] WARNING: snapshot missing for {mid}: {src}")
+ continue
+ if dst.exists():
+ _rmtree_robust(dst)
+ shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
+ modules_overlaid += 1
+ for root, _dirs, files in os.walk(dst):
+ for f in files:
+ p = Path(root) / f
+ try:
+ bytes_copied += p.stat().st_size
+ except OSError:
+ pass
+ files_copied += 1
+ if i % 25 == 0 or i == len(module_ids):
+ print(f"[overlay] {i}/{len(module_ids)} modules overlaid "
+ f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
+ return modules_overlaid, files_copied, bytes_copied
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def _seed_one_bucket(
+ *,
+ bucket: str,
+ hub_data_dir: Path,
+ snapshot_dir: Path,
+ module_ids: list[str],
+ minio_port: int,
+ hub_port: int,
+ poll_timeout: float,
+ workers: int,
+ zenserver_exe: Path,
+ zen_exe: Path,
+) -> tuple[int, dict[str, float]]:
+ """Run the provision/hibernate/overlay/deprovision cycle against a single
+ MinIO bucket. Returns (exit_code, timings) where timings maps phase name
+ to elapsed seconds (provision_s, hibernate_s, overlay_s, deprovision_s,
+ total_s).
+ """
+ print(f"\n================ seeding {bucket} into hub-dir {hub_data_dir} ================")
+
+ hub_data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = hub_data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {
+ "uri": f"s3://{bucket}",
+ "endpoint": f"http://localhost:{minio_port}",
+ "path-style": True,
+ "region": "us-east-1",
+ },
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
+ hub_extra_env = {
+ "AWS_ACCESS_KEY_ID": _MINIO_USER,
+ "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+ }
+
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ exit_code = 0
+ hib_accepted: list[str] = []
+ stuck_hib: list[str] = []
+ failed_hib: list[str] = []
+ timings: dict[str, float] = {
+ "provision_s": 0.0,
+ "hibernate_s": 0.0,
+ "overlay_s": 0.0,
+ "deprovision_s": 0.0,
+ "total_s": 0.0,
+ }
+
+ try:
+ hub_log = hub_data_dir / "hub.log"
+ hub_instance_limit = max(len(module_ids) + 10, 500)
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, hub_data_dir, hub_port, hub_log,
+ hub_instance_limit, hub_extra_args, hub_extra_env,
+ )
+ _wait_for_hub(hub_proc, hub_port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ # --- Provision ---
+ print(f"[{bucket}][provision] firing {len(module_ids)} provision requests...")
+ t0 = time.monotonic()
+ accepted, failures = _fan_out_post(pool, hub_port, module_ids, "provision")
+ print(f"[{bucket}][provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in failures[:10]:
+ print(f"[{bucket}][provision] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+ if not accepted:
+ return 1, timings
+
+ stuck, failed, last_states = _wait_for_state(hub_port, accepted, "provisioned", poll_timeout, f"{bucket}/provision")
+ timings["provision_s"] = time.monotonic() - t0
+ prov_done = len(accepted) - len(stuck) - len(failed)
+ print(f"[{bucket}][provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck "
+ f"({timings['provision_s']:.1f}s)")
+ if failed or stuck:
+ exit_code = 1
+ for mid in (failed + stuck)[:10]:
+ print(f"[{bucket}][provision] not-provisioned {mid}: last state='{last_states.get(mid, '')}'")
+ accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)]
+ if not accepted:
+ return 1, timings
+
+ # --- Hibernate ---
+ print(f"[{bucket}][hibernate] firing {len(accepted)} hibernate requests...")
+ t0 = time.monotonic()
+ hib_accepted, hib_failures = _fan_out_post(pool, hub_port, accepted, "hibernate")
+ print(f"[{bucket}][hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in hib_failures[:10]:
+ print(f"[{bucket}][hibernate] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+
+ stuck_hib, failed_hib, last_states_hib = _wait_for_state(hub_port, hib_accepted, "hibernated", poll_timeout, f"{bucket}/hibernate")
+ timings["hibernate_s"] = time.monotonic() - t0
+ hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib)
+ print(f"[{bucket}][hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck "
+ f"({timings['hibernate_s']:.1f}s)")
+ if failed_hib or stuck_hib:
+ exit_code = 1
+ for mid in (failed_hib + stuck_hib)[:10]:
+ print(f"[{bucket}][hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'")
+
+ # --- Overlay snapshot onto hub's servers dir ---
+ to_overlay = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)]
+ hub_servers = hub_data_dir / "servers"
+ print(f"[{bucket}][overlay] copying {len(to_overlay)} snapshot trees from {snapshot_dir} -> {hub_servers}")
+ t0 = time.monotonic()
+ modules_overlaid, files_copied, bytes_copied = _overlay_snapshot(snapshot_dir, hub_servers, to_overlay)
+ timings["overlay_s"] = time.monotonic() - t0
+ print(f"[{bucket}][overlay] overlaid {modules_overlaid} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB "
+ f"({timings['overlay_s']:.1f}s)")
+ if modules_overlaid < len(to_overlay):
+ exit_code = 1
+
+ # --- Deprovision (triggers dehydrate -> MinIO upload) ---
+ with ThreadPoolExecutor(max_workers=workers) as pool:
+ print(f"[{bucket}][deprovision] firing {len(to_overlay)} deprovision requests...")
+ t0 = time.monotonic()
+ dp_accepted, dp_failures = _fan_out_post(pool, hub_port, to_overlay, "deprovision")
+ print(f"[{bucket}][deprovision] accepted={len(dp_accepted)}, failed={len(dp_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in dp_failures[:10]:
+ print(f"[{bucket}][deprovision] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+
+ stuck_dp, failed_dp, last_states_dp = _wait_for_deprovisioned(hub_port, dp_accepted, poll_timeout)
+ timings["deprovision_s"] = time.monotonic() - t0
+ dp_done = len(dp_accepted) - len(stuck_dp) - len(failed_dp)
+ print(f"[{bucket}][deprovision] complete: {dp_done}/{len(dp_accepted)} gone, {len(failed_dp)} crashed, {len(stuck_dp)} stuck "
+ f"({timings['deprovision_s']:.1f}s)")
+ if failed_dp or stuck_dp:
+ exit_code = 1
+ for mid in (failed_dp + stuck_dp)[:10]:
+ print(f"[{bucket}][deprovision] not-gone {mid}: last state='{last_states_dp.get(mid, '')}'")
+
+ timings["total_s"] = time.monotonic() - t_start
+ print(f"[{bucket}] bucket elapsed: {timings['total_s']:.1f}s, exit={exit_code}")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ print(f"[{bucket}][hub] stopping...")
+ _zen_down_hub(zen_exe, hub_proc)
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+
+ return exit_code, timings
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+ help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs",
+ help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ "
+ "(default: E:/Dev/zen-perf-seed/hubs)")
+ parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data",
+ help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)")
+ parser.add_argument("--minio-port", type=int, default=9000)
+ parser.add_argument("--minio-console-port", type=int, default=9001)
+ parser.add_argument("--hub-port", type=int, default=8558)
+ parser.add_argument("--bucket", default="zen-seed-packed",
+ help="Bucket to seed (default: zen-seed-packed). Pack worktree - "
+ "hub is launched with --hub-hydration-enable-pack=true.")
+ parser.add_argument("--module-count", type=int, default=0,
+ help="Cap on modules processed (0 = all modules in snapshot-dir)")
+ parser.add_argument("--workers", type=int, default=50)
+ parser.add_argument("--poll-timeout", type=float, default=1800.0,
+ help="Max seconds to wait for each state transition (default: 1800)")
+ parser.add_argument("--wipe", action="store_true",
+ help="Wipe each per-bucket hub data dir and the shared MinIO data dir before starting "
+ "(never touches --snapshot-dir)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver + minio executables (auto-detected)")
+ args = parser.parse_args()
+
+ bucket_name: str = args.bucket
+
+ snapshot_dir = Path(args.snapshot_dir).resolve()
+ hub_data_root = Path(args.hub_data_root).resolve()
+ minio_data_dir = Path(args.minio_data_dir).resolve()
+
+ hub_data_dir = (hub_data_root / f"hub-b-{bucket_name}").resolve()
+
+ # Safety: snapshot-dir must not overlap any mutable path.
+ for label, d in [
+ ("minio-data-dir", minio_data_dir),
+ ("hub-data-root", hub_data_root),
+ (f"hub-b-{bucket_name}", hub_data_dir),
+ ]:
+ if snapshot_dir == d or snapshot_dir in d.parents or d == snapshot_dir or d in snapshot_dir.parents:
+ sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and {label} ({d}) must be disjoint")
+
+ if not snapshot_dir.is_dir():
+ sys.exit(f"[setup] snapshot-dir not found: {snapshot_dir}")
+
+ module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()])
+ if args.module_count > 0:
+ module_ids = module_ids[:args.module_count]
+ if not module_ids:
+ sys.exit(f"[setup] no module directories found in {snapshot_dir}")
+
+ if args.wipe:
+ for d in [hub_data_dir, minio_data_dir]:
+ if d.exists():
+ if snapshot_dir.is_relative_to(d):
+ sys.exit(f"[setup] refusing to wipe {d}: snapshot-dir is under it")
+ print(f"[setup] wiping {d}")
+ _rmtree_robust(d)
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ zen_exe = _find_zen(zenserver_exe)
+ minio_exe = _find_minio(zenserver_exe)
+ zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+ print(f"[setup] build mode: {zenserver_mode}")
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] zen cli: {zen_exe}")
+ print(f"[setup] minio: {minio_exe}")
+ print(f"[setup] snapshot-dir: {snapshot_dir} (read-only)")
+ print(f"[setup] hub-data-root: {hub_data_root}")
+ print(f"[setup] minio-data-dir: {minio_data_dir}")
+ print(f"[setup] modules: {len(module_ids)}")
+ print(f"[setup] bucket: {bucket_name} (hub-dir {hub_data_dir})")
+
+ minio_proc: Optional[subprocess.Popen] = None
+ exit_code = 0
+
+ try:
+ minio_proc = _start_minio(minio_exe, minio_data_dir, args.minio_port, args.minio_console_port)
+ _wait_for_minio(args.minio_port)
+ _create_minio_bucket(args.minio_port, bucket_name)
+
+ t_total = time.monotonic()
+ rc, timings = _seed_one_bucket(
+ bucket=bucket_name,
+ hub_data_dir=hub_data_dir,
+ snapshot_dir=snapshot_dir,
+ module_ids=module_ids,
+ minio_port=args.minio_port,
+ hub_port=args.hub_port,
+ poll_timeout=args.poll_timeout,
+ workers=args.workers,
+ zenserver_exe=zenserver_exe,
+ zen_exe=zen_exe,
+ )
+ exit_code = rc
+
+ print(f"\n[summary] stage B total elapsed: {time.monotonic()-t_total:.1f}s, exit={exit_code}")
+ print(f"[summary] MinIO data dir is now seeded: {minio_data_dir}")
+
+ phases = ["provision_s", "hibernate_s", "overlay_s", "deprovision_s", "total_s"]
+ labels = ["provision", "hibernate", "overlay", "deprovision (upload)", "total"]
+ print(f"\n[summary] timings (s):")
+ for key, lbl in zip(phases, labels):
+ print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}")
+
+ print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded/")
+
+ finally:
+ if minio_proc is not None and minio_proc.poll() is None:
+ print("[minio] stopping...")
+ _stop_minio_graceful(minio_proc)
+
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py
new file mode 100644
index 000000000..f0bc7b607
--- /dev/null
+++ b/scripts/test_scripts/hub/seed_s3_snapshot.py
@@ -0,0 +1,635 @@
+#!/usr/bin/env python3
+"""Stage A of the perf-seed workflow.
+
+Fetches real module data from a configured S3 bucket via the hub hydration
+pipeline and preserves the decompressed server-state trees to disk so a
+later stage can replay them into a local MinIO backend.
+
+Flow:
+ 1. Start a hub against real S3 (read-only SSO credentials), dehydration off,
+ hibernated-watchdog timeout set huge so modules are not auto-deprovisioned
+ mid-flow.
+ 2. Provision N modules (first N moduleIds from the bucket listing, filtered
+ to UUID-shaped folders).
+ 3. Wait until every module reaches 'provisioned'.
+ 4. Hibernate every module (child processes exit, state dir intact and no
+ remaining writer while we copy).
+ 5. Wait until every module reaches 'hibernated'.
+ 6. Copy <hub-data-dir>/servers/<moduleid>/ -> <snapshot-dir>/<moduleid>/
+ with the hub still running - hibernate took the writers offline and the
+ watchdog is muted.
+ 7. Shut the hub down via 'zen down --pid <hub pid>'.
+
+Required environment variables (or pass via CLI flags):
+ ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/
+ ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access
+ ZEN_PERF_AWS_REGION defaults to us-east-1
+
+Requirements:
+ pip install boto3
+ aws CLI v2 with the SSO profile configured
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import re
+import shutil
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from concurrent.futures import ThreadPoolExecutor
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+
+def _rmtree_robust(path) -> None:
+ """shutil.rmtree with a Windows-friendly retry for read-only files."""
+ import os as _os
+ import stat as _stat
+ def _onerror(func, p, exc_info):
+ try:
+ _os.chmod(p, _stat.S_IWRITE)
+ func(p)
+ except Exception:
+ pass
+ # onexc was introduced in py3.12; fall back to onerror on older versions.
+ if sys.version_info >= (3, 12):
+ shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__)))
+ else:
+ shutil.rmtree(path, onerror=_onerror)
+
+
+_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "")
+_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "")
+_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1")
+
+# Matches UUID-shaped module IDs (UUID-ish with mixed 4-char groups). Filters
+# out stray top-level keys (readmes, test scratches) that aren't real modules.
+_MODULEID_RE = re.compile(r"^[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}$")
+
+
+# ---------------------------------------------------------------------------
+# zenserver binary discovery - prefer release, fall back to debug.
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+ if override:
+ p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zenserver not found at {p}")
+ return p
+
+ script_dir = Path(__file__).resolve().parent
+ repo_root = script_dir.parent.parent
+ for mode in ("release", "debug"):
+ for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")):
+ p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}"
+ if p.exists():
+ return p
+ sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.")
+
+
+def _find_zen(zenserver_exe: Path) -> Path:
+ p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}"
+ if not p.exists():
+ sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)")
+ return p
+
+
+# ---------------------------------------------------------------------------
+# AWS SSO credential resolution
+# ---------------------------------------------------------------------------
+
+def _require_boto3():
+ try:
+ import boto3 # type: ignore[import-not-found]
+ import botocore.exceptions # type: ignore[import-not-found]
+ except ImportError:
+ sys.exit("[aws] boto3 is required. Install it with: pip install boto3")
+ return boto3, botocore.exceptions
+
+
+def _sso_login(profile: str) -> None:
+ print(f"[aws] running 'aws sso login --profile {profile}'...")
+ rc = subprocess.call(["aws", "sso", "login", "--profile", profile])
+ if rc != 0:
+ sys.exit(f"[aws] 'aws sso login' failed with rc={rc}")
+
+
+def _get_session(profile: str):
+ boto3, exc_mod = _require_boto3()
+
+ def _load_frozen():
+ session = boto3.Session(profile_name=profile)
+ creds = session.get_credentials()
+ if creds is None:
+ return session, None
+ return session, creds.get_frozen_credentials()
+
+ try:
+ session, frozen = _load_frozen()
+ if frozen is not None and frozen.access_key and frozen.secret_key:
+ return session, frozen
+ except exc_mod.ProfileNotFound:
+ sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config")
+ except Exception as e:
+ print(f"[aws] initial credential load failed: {e}")
+
+ _sso_login(profile)
+
+ session, frozen = _load_frozen()
+ if frozen is None or not frozen.access_key:
+ sys.exit("[aws] could not resolve credentials after sso login")
+ return session, frozen
+
+
+def _parse_s3_uri(uri: str) -> tuple[str, str]:
+ if not uri.startswith("s3://"):
+ sys.exit(f"[aws] invalid S3 URI: {uri}")
+ rest = uri[len("s3://"):]
+ if "/" in rest:
+ bucket, prefix = rest.split("/", 1)
+ else:
+ bucket, prefix = rest, ""
+ return bucket, prefix
+
+
+def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]:
+ """List UUID-shaped module folders under the bucket and return the `limit`
+ most recently active ones, ranked by the LastModified of each module's
+ `incremental-state.cbo` (newest first - these were last dehydrated most
+ recently, which is the closest proxy for "most recently accessed").
+
+ Falls back to listing order for any folder whose state file can't be
+ HEADed (missing / 403 / transient error).
+ """
+ s3 = session.client("s3", region_name=region)
+ prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/"
+
+ # 1. Enumerate every UUID-shaped folder.
+ paginator = s3.get_paginator("list_objects_v2")
+ candidates: list[str] = []
+ for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"):
+ for cp in page.get("CommonPrefixes", []) or []:
+ folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/")
+ if folder and _MODULEID_RE.match(folder):
+ candidates.append(folder)
+ print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...")
+
+ # 2. HEAD each module's state file in parallel. Missing/failed HEADs land
+ # at the tail via a sentinel epoch 0 timestamp.
+ from datetime import datetime, timezone
+ epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
+
+ def _state_mtime(mid: str) -> datetime:
+ key = f"{prefix_norm}{mid}/incremental-state.cbo"
+ try:
+ resp = s3.head_object(Bucket=bucket, Key=key)
+ return resp.get("LastModified", epoch)
+ except Exception:
+ return epoch
+
+ with ThreadPoolExecutor(max_workers=50) as pool:
+ times = list(pool.map(_state_mtime, candidates))
+
+ # 3. Sort descending (newest first). Folders without a state file sink.
+ ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True)
+ missing = sum(1 for _, t in ranked if t == epoch)
+ if missing:
+ print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)")
+ return [mid for mid, _ in ranked[:limit]]
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ instance_limit: int,
+ extra_args: list[str],
+ extra_env: dict[str, str],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ "--enable-execution-history=false",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ f"--hub-instance-limit={instance_limit}",
+ # Seeding is not a perf-measurement path - we want it as fast as the
+ # host can manage. Let the hub go wide on both provisioning and
+ # hydration thread pools rather than matching prod limits.
+ "--hub-instance-provision-threads=64",
+ "--hub-hydration-threads=64",
+ # With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH
+ # watchdog inactivity timers so early-provisioned modules do not get
+ # auto-deprovisioned while we're still hydrating the tail (hibernated
+ # default 1800s, provisioned default 600s - the latter is the one that
+ # bit us on the 1000-module run and dropped 335 modules).
+ "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400",
+ "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400",
+ ] + extra_args
+
+ env = os.environ.copy()
+ env.update(extra_env)
+
+ popen_kwargs: dict = {}
+ if sys.platform == "win32":
+ popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+ **popen_kwargs,
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None:
+ """Shut the hub down via 'zen down --pid <pid>'. zen down performs a
+ proper server-initiated shutdown (signals the shutdown event, waits for
+ the server to drain) rather than a blunt kill."""
+ if hub_proc.poll() is not None:
+ return
+ pid = hub_proc.pid
+ print(f"[hub] zen down --pid {pid}")
+ rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"])
+ if rc != 0:
+ print(f"[hub] zen down returned rc={rc}; waiting for exit anyway")
+ try:
+ hub_proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[hub] did not exit after {timeout_s}s, killing")
+ hub_proc.kill()
+ hub_proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub HTTP API
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]:
+ url = f"http://localhost:{port}{path}"
+ req = urllib.request.Request(url, data=b"{}", method="POST",
+ headers={"Content-Type": "application/json",
+ "Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ try:
+ body = json.loads(resp.read())
+ except Exception:
+ body = {}
+ return resp.status, body
+ except urllib.error.HTTPError as e:
+ try:
+ body = json.loads(e.read())
+ except Exception:
+ body = {}
+ return e.code, body
+ except Exception as e:
+ return 0, {"error": str(e)}
+
+
+def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]:
+ url = f"http://localhost:{port}/hub/status"
+ req = urllib.request.Request(url, headers={"Accept": "application/json"})
+ try:
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ data = json.loads(resp.read())
+ except Exception:
+ return None
+ return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")}
+
+
+def _fan_out_post(
+ pool: ThreadPoolExecutor,
+ port: int,
+ module_ids: list[str],
+ verb: str,
+) -> tuple[list[str], list[tuple[str, int, dict]]]:
+ """POST /hub/modules/<id>/<verb> concurrently. Returns (accepted, failures)."""
+ futures = {
+ mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}")
+ for mid in module_ids
+ }
+ accepted: list[str] = []
+ failures: list[tuple[str, int, dict]] = []
+ for mid, fut in futures.items():
+ status, body = fut.result()
+ if status in (200, 202):
+ accepted.append(mid)
+ else:
+ failures.append((mid, status, body))
+ return accepted, failures
+
+
+# Any of these means the module is done transitioning and will not reach the
+# target state on its own (without a retry we control).
+_FAILED_STATES = {"crashed", "unprovisioned"}
+
+
+def _wait_for_state(
+ port: int,
+ module_ids: list[str],
+ target_state: str,
+ timeout_s: float,
+ label: str,
+) -> tuple[list[str], list[str], dict[str, str]]:
+ """Poll hub status until every module hits target_state, fails, or times out.
+
+ Returns (stuck, failed, last_states). 'stuck' = still mid-transition when
+ we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'.
+ """
+ deadline = time.monotonic() + timeout_s
+ remaining = set(module_ids)
+ failed: list[str] = []
+ last_states: dict[str, str] = {mid: "" for mid in module_ids}
+
+ while remaining and time.monotonic() < deadline:
+ states = _hub_module_states(port)
+ if states is not None:
+ for mid in list(remaining):
+ s = states.get(mid, "")
+ last_states[mid] = s
+ if s == target_state:
+ remaining.discard(mid)
+ elif s in _FAILED_STATES and target_state not in _FAILED_STATES:
+ remaining.discard(mid)
+ failed.append(mid)
+ done = len(module_ids) - len(remaining)
+ print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r")
+ time.sleep(2.0)
+
+ print()
+ return list(remaining), failed, last_states
+
+
+# ---------------------------------------------------------------------------
+# Snapshot copy (run AFTER hub shutdown so there's no concurrent writer)
+# ---------------------------------------------------------------------------
+
+def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]:
+ """Copy src_root/<mid>/* to dst_root/<mid>/* for each module.
+
+ Returns (modules_copied, files_copied, bytes_copied).
+ Replaces only the specific per-module subdirs; never touches siblings.
+ """
+ dst_root.mkdir(parents=True, exist_ok=True)
+ modules_copied = 0
+ files_copied = 0
+ bytes_copied = 0
+
+ for i, mid in enumerate(module_ids, 1):
+ src = src_root / mid
+ dst = dst_root / mid
+ if not src.is_dir():
+ print(f"[snapshot] WARNING: source missing for {mid}: {src}")
+ continue
+ if dst.exists():
+ _rmtree_robust(dst)
+ shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False)
+ modules_copied += 1
+ for root, _dirs, files in os.walk(dst):
+ for f in files:
+ p = Path(root) / f
+ try:
+ bytes_copied += p.stat().st_size
+ except OSError:
+ pass
+ files_copied += 1
+ if i % 25 == 0 or i == len(module_ids):
+ print(f"[snapshot] {i}/{len(module_ids)} modules copied "
+ f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)")
+
+ return modules_copied, files_copied, bytes_copied
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description=__doc__,
+ formatter_class=argparse.RawDescriptionHelpFormatter)
+ parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a",
+ help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)")
+ parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot",
+ help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)")
+ parser.add_argument("--port", type=int, default=8558,
+ help="Hub HTTP port (default: 8558)")
+ parser.add_argument("--module-count", type=int, default=1000,
+ help="Number of modules to snapshot (default: 300)")
+ parser.add_argument("--workers", type=int, default=50,
+ help="Concurrent HTTP workers (default: 50)")
+ parser.add_argument("--poll-timeout", type=float, default=1800.0,
+ help="Max seconds to wait for provision or hibernate to finish (default: 1800)")
+ parser.add_argument("--zenserver-dir",
+ help="Directory containing zenserver executable (auto-detected by default)")
+ args = parser.parse_args()
+
+ hub_data_dir = Path(args.hub_data_dir).resolve()
+ snapshot_dir = Path(args.snapshot_dir).resolve()
+
+ # Safety: snapshot-dir is the preserved output. It must not overlap the
+ # hub data-dir in either direction so nothing the hub writes can clobber
+ # the preserved state.
+ if (snapshot_dir == hub_data_dir
+ or snapshot_dir in hub_data_dir.parents
+ or hub_data_dir in snapshot_dir.parents):
+ sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and hub-data-dir ({hub_data_dir}) must be disjoint")
+
+ s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI)
+ aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE)
+ aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION)
+ if not s3_uri:
+ sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/")
+ if not aws_profile:
+ sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name")
+
+ hub_log = hub_data_dir / "hub.log"
+
+ zenserver_exe = _find_zenserver(args.zenserver_dir)
+ zen_exe = _find_zen(zenserver_exe)
+ zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?")
+ print(f"[setup] build mode: {zenserver_mode}")
+ print(f"[setup] zenserver: {zenserver_exe}")
+ print(f"[setup] zen cli: {zen_exe}")
+ print(f"[setup] S3 URI: {s3_uri}")
+ print(f"[setup] profile: {aws_profile}")
+ print(f"[setup] hub-data-dir: {hub_data_dir}")
+ print(f"[setup] snapshot-dir: {snapshot_dir}")
+
+ session, frozen = _get_session(aws_profile)
+ print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})")
+
+ bucket, prefix = _parse_s3_uri(s3_uri)
+ print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...")
+ module_ids = _list_module_ids(session, bucket, prefix, aws_region, args.module_count)
+ print(f"[s3] selected {len(module_ids)} module folders (UUID-shaped)")
+ if len(module_ids) < args.module_count:
+ print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter")
+
+ if not module_ids:
+ sys.exit("[s3] no module folders found, aborting")
+
+ aws_env = {
+ "AWS_ACCESS_KEY_ID": frozen.access_key,
+ "AWS_SECRET_ACCESS_KEY": frozen.secret_key,
+ }
+ if frozen.token:
+ aws_env["AWS_SESSION_TOKEN"] = frozen.token
+
+ hub_data_dir.mkdir(parents=True, exist_ok=True)
+ config_path = hub_data_dir / "hydration_config.json"
+ config_path.write_text(
+ json.dumps({
+ "type": "s3",
+ "settings": {"uri": s3_uri, "region": aws_region},
+ }),
+ encoding="ascii",
+ )
+ hub_extra_args = [
+ f"--hub-hydration-target-config={config_path}",
+ "--hub-enable-dehydration=false",
+ ]
+
+ hub_proc: Optional[subprocess.Popen] = None
+ hub_log_handle = None
+ exit_code = 0
+
+ try:
+ hub_instance_limit = max(args.module_count + 10, 500)
+ hub_proc, hub_log_handle = _start_hub(
+ zenserver_exe, hub_data_dir, args.port, hub_log,
+ hub_instance_limit, hub_extra_args, aws_env,
+ )
+ _wait_for_hub(hub_proc, args.port)
+
+ t_start = time.monotonic()
+
+ with ThreadPoolExecutor(max_workers=args.workers) as pool:
+ # --- Provision ---
+ print(f"[provision] firing {len(module_ids)} provision requests...")
+ t0 = time.monotonic()
+ accepted, failures = _fan_out_post(pool, args.port, module_ids, "provision")
+ print(f"[provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in failures[:10]:
+ print(f"[provision] FAILED {mid}: status={status} body={body}")
+ if not accepted:
+ sys.exit("[provision] nothing accepted, aborting")
+
+ stuck, failed, last_states = _wait_for_state(args.port, accepted, "provisioned", args.poll_timeout, "provision")
+ prov_done = len(accepted) - len(stuck) - len(failed)
+ print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck "
+ f"({time.monotonic()-t0:.1f}s)")
+ if failed:
+ for mid in failed[:10]:
+ print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'")
+ exit_code = 1
+ if stuck:
+ for mid in stuck[:10]:
+ print(f"[provision] STUCK {mid}: last state='{last_states.get(mid, '')}'")
+ exit_code = 1
+ accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)]
+
+ if not accepted:
+ sys.exit("[provision] nothing successfully provisioned, aborting")
+
+ # --- Hibernate ---
+ print(f"[hibernate] firing {len(accepted)} hibernate requests...")
+ t0 = time.monotonic()
+ hib_accepted, hib_failures = _fan_out_post(pool, args.port, accepted, "hibernate")
+ print(f"[hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)")
+ for mid, status, body in hib_failures[:10]:
+ print(f"[hibernate] FAILED {mid}: status={status} body={body}")
+ exit_code = 1
+
+ stuck_hib, failed_hib, last_states_hib = _wait_for_state(args.port, hib_accepted, "hibernated", args.poll_timeout, "hibernate")
+ hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib)
+ print(f"[hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck "
+ f"({time.monotonic()-t0:.1f}s)")
+ if failed_hib or stuck_hib:
+ exit_code = 1
+ for mid in (failed_hib + stuck_hib)[:10]:
+ print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'")
+
+ # --- Copy snapshots while hub is still running. All instances are
+ # hibernated (no writers), watchdog-hibernated-timeout is 86400s
+ # (no auto-deprovision), hub is only touching its own metadata
+ # outside servers/<mid>/. Safe. ---
+ copy_src = hub_data_dir / "servers"
+ to_copy = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)]
+ print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}")
+ t0 = time.monotonic()
+ modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy)
+ print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB "
+ f"({time.monotonic()-t0:.1f}s)")
+
+ if modules_copied < len(to_copy):
+ print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied")
+ exit_code = 1
+
+ # --- Graceful hub shutdown via 'zen down' ---
+ _zen_down_hub(zen_exe, hub_proc)
+ hub_proc = None
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+ hub_log_handle = None
+
+ print(f"[summary] stage A total elapsed: {time.monotonic()-t_start:.1f}s, exit={exit_code}")
+ print(f"[summary] snapshot available at: {snapshot_dir}")
+
+ finally:
+ if hub_proc is not None and hub_proc.poll() is None:
+ # Fallback: something aborted before the 'zen down' path ran.
+ print("[hub] force-terminating (zen down was not invoked)")
+ hub_proc.terminate()
+ try:
+ hub_proc.wait(timeout=60)
+ except subprocess.TimeoutExpired:
+ hub_proc.kill()
+ hub_proc.wait()
+ if hub_log_handle is not None:
+ hub_log_handle.close()
+
+ return exit_code
+
+
+if __name__ == "__main__":
+ sys.exit(main())
diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h
index a1c7a3914..7ca2afc69 100644
--- a/src/zencore/include/zencore/string.h
+++ b/src/zencore/include/zencore/string.h
@@ -742,6 +742,7 @@ size_t NiceBytesToBuffer(uint64_t Num, std::span<char> Buffer);
size_t NiceByteRateToBuffer(uint64_t Num, uint64_t ms, std::span<char> Buffer);
size_t NiceLatencyNsToBuffer(uint64_t NanoSeconds, std::span<char> Buffer);
size_t NiceTimeSpanMsToBuffer(uint64_t Milliseconds, std::span<char> Buffer);
+size_t NiceTimeSpanUsToBuffer(uint64_t Microseconds, std::span<char> Buffer);
struct NiceBase
{
@@ -803,6 +804,11 @@ struct NiceTimeSpanMs : public NiceBase
inline NiceTimeSpanMs(uint64_t Milliseconds) { NiceTimeSpanMsToBuffer(Milliseconds, m_Buffer); }
};
+struct NiceTimeSpanUs : public NiceBase
+{
+ inline NiceTimeSpanUs(uint64_t Microseconds) { NiceTimeSpanUsToBuffer(Microseconds, m_Buffer); }
+};
+
//////////////////////////////////////////////////////////////////////////
inline std::string
diff --git a/src/zencore/string.cpp b/src/zencore/string.cpp
index 34519b83b..4072aec56 100644
--- a/src/zencore/string.cpp
+++ b/src/zencore/string.cpp
@@ -482,6 +482,24 @@ NiceTimeSpanMsToBuffer(uint64_t Millis, std::span<char> Buffer)
}
}
+size_t
+NiceTimeSpanUsToBuffer(uint64_t Micros, std::span<char> Buffer)
+{
+ if (Micros < 1000)
+ {
+ return snprintf(Buffer.data(), Buffer.size(), "%" PRIu64 "us", Micros);
+ }
+ else if (Micros < 10000)
+ {
+ return snprintf(Buffer.data(), Buffer.size(), "%.2fms", Micros / 1000.0);
+ }
+ else if (Micros < 1000000)
+ {
+ return snprintf(Buffer.data(), Buffer.size(), "%.1fms", Micros / 1000.0);
+ }
+ return NiceTimeSpanMsToBuffer(Micros / 1000, Buffer);
+}
+
//////////////////////////////////////////////////////////////////////////
template<typename C>
@@ -850,6 +868,13 @@ TEST_CASE("niceNum")
NiceNumGeneral(100000000000000, Buffer, kNicenumTime);
CHECK(StringEquals(Buffer, "100000s"));
+
+ // floor() instead of round(): 999.5us must NOT render as "1000us".
+ NiceNumGeneral(999500, Buffer, kNicenumTime);
+ CHECK(StringEquals(Buffer, "999us"));
+
+ NiceNumGeneral(999999, Buffer, kNicenumTime);
+ CHECK(StringEquals(Buffer, "999us"));
}
SUBCASE("bytes")
@@ -917,6 +942,43 @@ TEST_CASE("niceNum")
NiceTimeSpanMsToBuffer(360000000, Buffer);
CHECK(StringEquals(Buffer, "100h00m"));
}
+
+ SUBCASE("timespan_us")
+ {
+ NiceTimeSpanUsToBuffer(0, Buffer);
+ CHECK(StringEquals(Buffer, "0us"));
+
+ NiceTimeSpanUsToBuffer(1, Buffer);
+ CHECK(StringEquals(Buffer, "1us"));
+
+ NiceTimeSpanUsToBuffer(999, Buffer);
+ CHECK(StringEquals(Buffer, "999us"));
+
+ NiceTimeSpanUsToBuffer(1000, Buffer);
+ CHECK(StringEquals(Buffer, "1.00ms"));
+
+ NiceTimeSpanUsToBuffer(1500, Buffer);
+ CHECK(StringEquals(Buffer, "1.50ms"));
+
+ NiceTimeSpanUsToBuffer(9999, Buffer);
+ CHECK(StringEquals(Buffer, "10.00ms")); // %.2f rounds 9.999 -> 10.00
+
+ NiceTimeSpanUsToBuffer(10000, Buffer);
+ CHECK(StringEquals(Buffer, "10.0ms"));
+
+ NiceTimeSpanUsToBuffer(999500, Buffer);
+ CHECK(StringEquals(Buffer, "999.5ms"));
+
+ NiceTimeSpanUsToBuffer(999999, Buffer);
+ CHECK(StringEquals(Buffer, "1000.0ms")); // boundary just below 1s
+
+ // >=1s delegates to NiceTimeSpanMs.
+ NiceTimeSpanUsToBuffer(1000000, Buffer);
+ CHECK(StringEquals(Buffer, "1.00s"));
+
+ NiceTimeSpanUsToBuffer(60000000, Buffer);
+ CHECK(StringEquals(Buffer, "1m00s"));
+ }
}
TEST_CASE("StringBuilder")
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index c03c1a9a0..4ae8d0457 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
@@ -248,6 +249,7 @@ Hub::~Hub()
void
Hub::Shutdown()
{
+ ZEN_TRACE_CPU("Hub::Shutdown");
ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
bool Expected = false;
@@ -299,6 +301,7 @@ Hub::Shutdown()
Hub::Response
Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
+ ZEN_TRACE_CPU("Hub::Provision");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
bool IsNewInstance = false;
@@ -328,17 +331,22 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
auto NewInstance = std::make_unique<StorageServerInstance>(
m_RunEnvironment,
*m_Hydration,
- StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
- .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
- .TempDir = m_HydrationTempPath / ModuleId,
- .HttpThreadCount = m_Config.InstanceHttpThreadCount,
- .CoreLimit = m_Config.InstanceCoreLimit,
- .ConfigPath = m_Config.InstanceConfigPath,
- .Malloc = m_Config.InstanceMalloc,
- .Trace = m_Config.InstanceTrace,
- .TraceHost = m_Config.InstanceTraceHost,
- .TraceFile = m_Config.InstanceTraceFile,
- .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
+ StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
+ .StateDir = m_RunEnvironment.CreateChildDir(ModuleId),
+ .TempDir = m_HydrationTempPath / ModuleId,
+ .HttpThreadCount = m_Config.InstanceHttpThreadCount,
+ .CoreLimit = m_Config.InstanceCoreLimit,
+ .ConfigPath = m_Config.InstanceConfigPath,
+ .Malloc = m_Config.InstanceMalloc,
+ .Trace = m_Config.InstanceTrace,
+ .TraceHost = m_Config.InstanceTraceHost,
+ .TraceFile = m_Config.InstanceTraceFile,
+ .EnableHydration = m_Config.EnableHydration,
+ .EnableDehydration = m_Config.EnableDehydration,
+ .HydrationPackEnabled = m_Config.HydrationPackEnabled,
+ .HydrationPackThresholdBytes = m_Config.HydrationPackThresholdBytes,
+ .HydrationMaxPackBytes = m_Config.HydrationMaxPackBytes,
+ .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool},
ModuleId);
#if ZEN_PLATFORM_WINDOWS
@@ -511,6 +519,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
HubInstanceState OldState,
bool IsNewInstance)
{
+ ZEN_TRACE_CPU("Hub::CompleteProvision");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
std::string BaseUri; // TODO?
@@ -571,6 +580,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
Hub::Response
Hub::Deprovision(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Deprovision");
ZEN_ASSERT(!m_ShutdownFlag.load());
return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) {
ZEN_UNUSED(Instance);
@@ -581,6 +591,7 @@ Hub::Deprovision(const std::string& ModuleId)
Hub::Response
Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate)
{
+ ZEN_TRACE_CPU("Hub::InternalDeprovision");
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
{
@@ -694,6 +705,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI
Hub::Response
Hub::Obliterate(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Obliterate");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -845,6 +857,7 @@ Hub::Obliterate(const std::string& ModuleId)
void
Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex)
{
+ ZEN_TRACE_CPU("Hub::CompleteObliterate");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -871,6 +884,7 @@ Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, siz
void
Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
+ ZEN_TRACE_CPU("Hub::CompleteDeprovision");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -938,6 +952,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
Hub::Response
Hub::Hibernate(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Hibernate");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -1051,6 +1066,7 @@ Hub::Hibernate(const std::string& ModuleId)
void
Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
+ ZEN_TRACE_CPU("Hub::CompleteHibernate");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -1074,6 +1090,7 @@ Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size
Hub::Response
Hub::Wake(const std::string& ModuleId)
{
+ ZEN_TRACE_CPU("Hub::Wake");
ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -1185,6 +1202,7 @@ Hub::Wake(const std::string& ModuleId)
void
Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState)
{
+ ZEN_TRACE_CPU("Hub::CompleteWake");
const std::string ModuleId(Instance.GetModuleId());
const uint16_t Port = Instance.GetBasePort();
@@ -1402,6 +1420,7 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS
void
Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
+ ZEN_TRACE_CPU("Hub::AttemptRecoverInstance");
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
{
@@ -1506,6 +1525,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
StorageServerInstance::SharedLockedPtr&& LockedInstance,
size_t ActiveInstanceIndex)
{
+ ZEN_TRACE_CPU("Hub::CheckInstanceStatus");
const std::string ModuleId(LockedInstance.GetModuleId());
HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
@@ -1645,6 +1665,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
void
Hub::UpdateMachineMetrics()
{
+ ZEN_TRACE_CPU("Hub::UpdateMachineMetrics");
try
{
bool DiskSpaceOk = false;
@@ -1696,6 +1717,7 @@ Hub::WatchDog()
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
{
+ ZEN_TRACE_CPU("Hub::WatchDogCycle");
try
{
UpdateMachineMetrics();
@@ -2755,18 +2777,18 @@ TEST_CASE("hub.instance.inactivity.deprovision")
auto PokeInstance = [&](uint16_t Port) {
// Make a real storage request to increment the instance's activity sum.
// The watchdog detects the changed sum on the next cycle and resets LastActivityTime.
- {
- HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
- HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)});
- uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
- std::chrono::steady_clock::time_point::min())
- .count();
- IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
- const HttpClient::Response PutResult =
- PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
- IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
- CHECK(PutResult);
- }
+ // Per-attempt connect is kept tight (200ms) so a genuinely dead endpoint fails fast;
+ // RetryCount=3 absorbs transient localhost-accept slowness on loaded CI runners.
+ HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
+ HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3});
+ uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
+ std::chrono::steady_clock::time_point::min())
+ .count();
+ IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
+ const HttpClient::Response PutResult =
+ PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
+ REQUIRE_MESSAGE(PutResult, "PokeInstance PUT failed - cannot reset activity timer");
};
PokeInstance(IdleInfo.Port);
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 40d046ce0..1ce9bc876 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -74,6 +74,11 @@ public:
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
CbObject HydrationOptions;
+ bool EnableHydration = true;
+ bool EnableDehydration = true;
+ bool HydrationPackEnabled = true;
+ uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes;
+ uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes;
WatchDogConfiguration WatchDog;
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index c7f25bab6..2730ba059 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -10,15 +10,20 @@
#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/iohash.h>
#include <zencore/logging.h>
#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/system.h>
#include <zencore/thread.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
+#include <zencore/uid.h>
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/s3client.h>
#include <zenutil/filesystemutils.h>
+#include <zenutil/wildcard.h>
#include <numeric>
#include <unordered_map>
@@ -34,6 +39,8 @@
namespace zen {
+using namespace std::literals;
+
namespace hydration_impl {
/// UTC time decomposed to calendar fields with sub-second milliseconds.
@@ -76,7 +83,51 @@ namespace hydration_impl {
std::atomic<bool>& PauseFlag,
const std::filesystem::path& Path)
{
- CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
+ CleanDirectoryResult Result = CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0);
+ for (const auto& [FailedPath, Ec] : Result.FailedRemovePaths)
+ {
+ ZEN_WARN("Failed to remove '{}' while cleaning '{}': {}", FailedPath, Path, Ec.message());
+ }
+ }
+
+ // Returns true if RelKey matches any wildcard in Excludes. Excluded paths are
+ // dropped at the dehydrate-side directory scan and never enter the manifest.
+ bool IsExcluded(std::string_view RelKey, std::span<const std::string> Excludes)
+ {
+ for (const std::string& W : Excludes)
+ {
+ if (MatchWildcard(W, RelKey, /*CaseSensitive*/ true))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ std::vector<std::string> ParseStringArray(CbFieldView Field)
+ {
+ std::vector<std::string> Out;
+ for (CbFieldView Entry : Field.AsArrayView())
+ {
+ Out.emplace_back(Entry.AsString());
+ }
+ return Out;
+ }
+
+ // Built-in exclude wildcards applied unless the hub config supplies an explicit
+ // `excludes` array (an empty array opts out of all defaults). Patterns match the
+ // dehydrate-side relative path (forward-slash form). `*` is path-separator-agnostic
+ // per zenutil/wildcard.h.
+ std::vector<std::string> DefaultExcludes()
+ {
+ return {
+ ".sentry-native/*", // sentry-native crash uploader DB; locked while child runs
+ "state_marker", // root-level liveness marker (zenstorageserver.cpp)
+ ".lock", // FILE_FLAG_DELETE_ON_CLOSE lock; locked while child runs
+ "*.bak", // transient backups produced by atomic file replace
+ "gc/reserve.gc", // GC disk reserve (gc subdir per zenstorageserver.cpp)
+ "auth/*", // encrypted auth state under auth/
+ };
}
///////////////////////////////////////////////////////////////////////
@@ -90,45 +141,110 @@ namespace hydration_impl {
std::atomic<uint64_t> Bytes{0}; // lambda-side: bytes transferred on successful completion
std::atomic<uint64_t> ElapsedUs{0}; // wall time around Work.Wait()
- RwLock ThreadIdsLock;
- std::unordered_set<int> ThreadIds;
+ // Per-request timing gathered inside the work lambdas. RequestCount + RequestTotalUs are
+ // summed across all completed requests. RequestMaxUs is the slowest single request
+ // observed (CAS-updated in EndRequest); avg vs max gap surfaces tail latency without
+ // keeping per-request samples.
+ std::atomic<uint64_t> RequestCount{0};
+ std::atomic<uint64_t> RequestTotalUs{0};
+ std::atomic<uint64_t> RequestMaxUs{0};
+ std::atomic<uint32_t> InFlight{0};
+ std::atomic<uint32_t> InFlightPeak{0};
+
+ // Scheduling latency. PhaseClock starts when the phase begins. FirstScheduleUs /
+ // FirstStartUs are the relative times of the earliest ScheduleWork call and the earliest
+ // worker lambda entry. Their difference is how long requests sat in the pool backlog
+ // before a worker picked one up (pool warm-up / backlog).
+ Stopwatch PhaseClock;
+ std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX};
+ std::atomic<uint64_t> FirstStartUs{UINT64_MAX};
+
+ void RecordScheduled()
+ {
+ uint64_t Now = PhaseClock.GetElapsedTimeUs();
+ uint64_t Existing = FirstScheduleUs.load(std::memory_order_relaxed);
+ while (Now < Existing && !FirstScheduleUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed))
+ {
+ }
+ }
- void RecordThread()
+ // Returns a Stopwatch the caller runs across the actual request; call EndRequest with
+ // the elapsed microseconds when the request completes.
+ Stopwatch BeginRequest()
{
- int Tid = zen::GetCurrentThreadId();
- ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); });
+ uint64_t Now = PhaseClock.GetElapsedTimeUs();
+ uint64_t Existing = FirstStartUs.load(std::memory_order_relaxed);
+ while (Now < Existing && !FirstStartUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed))
+ {
+ }
+ uint32_t Current = InFlight.fetch_add(1, std::memory_order_relaxed) + 1;
+ uint32_t Peak = InFlightPeak.load(std::memory_order_relaxed);
+ while (Current > Peak && !InFlightPeak.compare_exchange_weak(Peak, Current, std::memory_order_relaxed))
+ {
+ }
+ return Stopwatch{};
+ }
+
+ void EndRequest(uint64_t ElapsedUsValue)
+ {
+ InFlight.fetch_sub(1, std::memory_order_relaxed);
+ RequestCount.fetch_add(1, std::memory_order_relaxed);
+ RequestTotalUs.fetch_add(ElapsedUsValue, std::memory_order_relaxed);
+ uint64_t Existing = RequestMaxUs.load(std::memory_order_relaxed);
+ while (ElapsedUsValue > Existing && !RequestMaxUs.compare_exchange_weak(Existing, ElapsedUsValue, std::memory_order_relaxed))
+ {
+ }
}
};
struct DehydrateStatistics
{
PhaseStats Hash;
- PhaseStats Upload;
- PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs
+ PhaseStats Upload; // Loose CAS PUTs
+ PhaseStats Touch; // Mod-time refresh on pre-existing loose CAS entries; shares Upload's ParallelWork
+ PhaseStats PackUpload; // Pack-blob PUTs; shares Upload's ParallelWork
+ PhaseStats PackTouch; // Mod-time refresh on pre-existing pack blobs; shares Upload's ParallelWork
std::atomic<uint64_t> LoadStateUs{0};
std::atomic<uint64_t> DirScanUs{0};
std::atomic<uint64_t> ListExistingUs{0};
- std::atomic<uint64_t> MetadataSaveUs{0};
+ std::atomic<uint64_t> SaveMetadataUs{0};
std::atomic<uint64_t> CleanUs{0};
std::atomic<uint64_t> TotalFiles{0};
std::atomic<uint64_t> TotalBytes{0};
std::atomic<uint64_t> TotalUs{0};
+
+ // Pack phase stats
+ std::atomic<uint64_t> PackCount{0}; // number of packs built
+ std::atomic<uint64_t> PackedFiles{0}; // Files[] entries folded into packs (includes hash-duplicates)
+ std::atomic<uint64_t> PackBytes{0}; // total bytes across all packs
+ std::atomic<uint64_t> PackBuildUs{0}; // hash + write cost across all packs
};
struct HydrateStatistics
{
- PhaseStats Download;
+ PhaseStats Download; // Loose CAS GETs
+ PhaseStats PackDownload; // Pack-blob GETs; shares Download's ParallelWork
std::atomic<uint64_t> LoadMetadataUs{0};
+ std::atomic<uint64_t> CreateDirsUs{0};
+ std::atomic<uint64_t> CreateDirsCount{0}; // unique parent dirs passed to CreateDirectories
std::atomic<uint64_t> CleanUs{0};
- std::atomic<uint64_t> RenameOrCopyUs{0};
- std::atomic<uint64_t> VerifyScanUs{0};
+ std::atomic<uint64_t> FinalizeUs{0};
+ std::atomic<uint64_t> BuildStateUs{0};
std::atomic<uint64_t> TotalFiles{0};
std::atomic<uint64_t> TotalBytes{0};
std::atomic<uint64_t> TotalUs{0};
+
+ // Pack phase stats
+ std::atomic<uint64_t> PackCount{0}; // packs in manifest
+ std::atomic<uint64_t> PackedFiles{0}; // files unpacked into ServerStateDir (per-destination count)
+ std::atomic<uint64_t> PackUnpackUs{0}; // slice + parallel SafeWriteFile cost
+ // Bytes written to disk during the unpack-and-slice phase (sum of slice sizes
+ // touched by SafeWriteFile). Pairs with PackUnpackUs for disk-write throughput.
+ std::atomic<uint64_t> UnpackWriteBytes{0};
};
// Bits-per-second rate computed at microsecond precision. Zero-safe.
@@ -149,9 +265,13 @@ namespace hydration_impl {
public:
virtual ~StorageBase() = default;
- virtual std::string Describe() const = 0;
- virtual void SaveMetadata(const CbObject& Data) = 0;
- virtual CbObject LoadMetadata() = 0;
+ virtual std::string Describe() const = 0;
+ virtual void SaveMetadata(const CbObject& Data) = 0;
+ virtual CbObject LoadMetadata() = 0;
+ // Backend-specific settings that need to be persisted in state.cbo and reapplied
+ // on hydrate. Today only S3 uses this (MultipartChunkSize - the chunk size used at
+ // dehydrate must be carried forward so hydrate uses the same partitioning). File
+ // backend has no such settings and returns / accepts an empty object.
virtual CbObject GetSettings() = 0;
virtual void ParseSettings(const CbObjectView& Settings) = 0;
virtual std::vector<IoHash> List() = 0;
@@ -179,7 +299,7 @@ namespace hydration_impl {
explicit FileStorage(std::filesystem::path ModulePath);
- virtual std::string Describe() const override { return fmt::format("file://{}", m_StoragePath.generic_string()); }
+ virtual std::string Describe() const override { return fmt::format("file://{}"sv, m_StoragePath.generic_string()); }
virtual void SaveMetadata(const CbObject& Data) override;
virtual CbObject LoadMetadata() override;
virtual CbObject GetSettings() override { return {}; }
@@ -209,13 +329,12 @@ namespace hydration_impl {
class S3Storage : public StorageBase
{
public:
- static constexpr std::string_view Prefix = "s3://";
- static constexpr std::string_view Type = "s3";
- static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u;
+ static constexpr std::string_view Prefix = "s3://";
+ static constexpr std::string_view Type = "s3";
S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize);
- virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); }
+ virtual std::string Describe() const override { return fmt::format("s3://{}/{}"sv, m_Client.BucketName(), m_KeyPrefix); }
virtual void SaveMetadata(const CbObject& Data) override;
virtual CbObject LoadMetadata() override;
virtual CbObject GetSettings() override;
@@ -256,6 +375,7 @@ namespace hydration_impl {
void FileStorage::SaveMetadata(const CbObject& Data)
{
+ ZEN_TRACE_CPU("FileStorage::SaveMetadata");
BinaryWriter Output;
SaveCompactBinary(Output, Data);
WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize()));
@@ -263,6 +383,7 @@ namespace hydration_impl {
CbObject FileStorage::LoadMetadata()
{
+ ZEN_TRACE_CPU("FileStorage::LoadMetadata");
if (!IsFile(m_StatePathName))
{
return {};
@@ -277,7 +398,7 @@ namespace hydration_impl {
CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error);
if (Error != CbValidateError::None)
{
- throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error)));
+ throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}"sv, m_StatePathName, ToString(Error)));
}
return Result;
}
@@ -309,13 +430,15 @@ namespace hydration_impl {
Work.ScheduleWork(
WorkerPool,
[this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("FileStorage::Put");
if (!AbortFlag.load())
{
- std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash);
+ Stopwatch Timer = Stats.BeginRequest();
+ std::filesystem::path DestPath = m_CASPath / fmt::format("{}"sv, Hash);
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath));
+ throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestPath));
}
Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
}
@@ -332,13 +455,16 @@ namespace hydration_impl {
Work.ScheduleWork(WorkerPool,
[this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](
std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("FileStorage::Get");
if (!AbortFlag.load())
{
- std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ std::filesystem::path SourcePath = m_CASPath / fmt::format("{}"sv, Hash);
if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec)
{
- throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath));
+ throw std::system_error(Ec,
+ fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestinationPath));
}
Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
}
@@ -365,6 +491,7 @@ namespace hydration_impl {
void S3Storage::SaveMetadata(const CbObject& Data)
{
+ ZEN_TRACE_CPU("S3Storage::SaveMetadata");
BinaryWriter Output;
SaveCompactBinary(Output, Data);
IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize());
@@ -373,12 +500,13 @@ namespace hydration_impl {
S3Result Result = m_Client.PutObject(Key, std::move(Payload));
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, Result.Error);
}
}
CbObject S3Storage::LoadMetadata()
{
+ ZEN_TRACE_CPU("S3Storage::LoadMetadata");
std::string Key = m_KeyPrefix + "/incremental-state.cbo";
S3GetObjectResult Result = m_Client.GetObject(Key);
if (!Result.IsSuccess())
@@ -387,14 +515,14 @@ namespace hydration_impl {
{
return {};
}
- throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, Result.Error);
}
CbValidateError Error;
CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error);
if (Error != CbValidateError::None)
{
- throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error));
+ throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error));
}
return Meta;
}
@@ -402,13 +530,13 @@ namespace hydration_impl {
CbObject S3Storage::GetSettings()
{
CbObjectWriter Writer;
- Writer << "MultipartChunkSize" << m_MultipartChunkSize;
+ Writer << "MultipartChunkSize"sv << m_MultipartChunkSize;
return Writer.Save();
}
void S3Storage::ParseSettings(const CbObjectView& Settings)
{
- m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize);
}
std::vector<IoHash> S3Storage::List()
@@ -417,7 +545,7 @@ namespace hydration_impl {
S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix);
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error);
+ throw zen::runtime_error("Failed to list S3 objects under '{}': {}"sv, CasPrefix, Result.Error);
}
std::vector<IoHash> Hashes;
@@ -448,13 +576,15 @@ namespace hydration_impl {
Work.ScheduleWork(
WorkerPool,
[this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("S3Storage::Put");
if (AbortFlag.load())
{
return;
}
- S3Client& Client = m_Client;
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ S3Client& Client = m_Client;
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
{
@@ -466,7 +596,7 @@ namespace hydration_impl {
m_MultipartChunkSize);
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error);
}
}
else
@@ -475,7 +605,7 @@ namespace hydration_impl {
S3Result Result = Client.PutObject(Key, File.ReadAll());
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error);
}
}
Stats.Bytes.fetch_add(Size, std::memory_order_relaxed);
@@ -489,7 +619,7 @@ namespace hydration_impl {
const std::filesystem::path& DestinationPath,
PhaseStats& Stats)
{
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4)))
{
@@ -515,15 +645,17 @@ namespace hydration_impl {
uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset);
Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
- if (AbortFlag)
+ ZEN_TRACE_CPU("S3Storage::GetRange");
+ if (AbortFlag.load())
{
return;
}
- S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize);
if (!Chunk.IsSuccess())
{
- throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}",
+ throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}"sv,
Key,
Offset,
Offset + ChunkSize - 1,
@@ -541,15 +673,17 @@ namespace hydration_impl {
Work.ScheduleWork(
WorkerPool,
[this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
- if (AbortFlag)
+ ZEN_TRACE_CPU("S3Storage::Get");
+ if (AbortFlag.load())
{
return;
}
- S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir);
if (!Chunk.IsSuccess())
{
- throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error);
+ throw zen::runtime_error("Failed to download '{}' from S3: {}"sv, Key, Chunk.Error);
}
if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
@@ -585,16 +719,18 @@ namespace hydration_impl {
void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats)
{
Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic<bool>& AbortFlag) {
- Stats.RecordThread();
+ ZEN_TRACE_CPU("S3Storage::Touch");
if (AbortFlag.load())
{
return;
}
- std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash);
- S3Result Result = m_Client.Touch(Key);
+ Stopwatch Timer = Stats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); });
+ std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash);
+ S3Result Result = m_Client.Touch(Key);
if (!Result.IsSuccess())
{
- throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error);
+ throw zen::runtime_error("Failed to touch '{}' in S3: {}"sv, Key, Result.Error);
}
});
}
@@ -605,7 +741,7 @@ namespace hydration_impl {
S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix);
if (!ListResult.IsSuccess())
{
- throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error);
+ throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}"sv, ModulePrefix, ListResult.Error);
}
for (const S3ObjectInfo& Obj : ListResult.Objects)
{
@@ -617,7 +753,7 @@ namespace hydration_impl {
S3Result DelResult = m_Client.DeleteObject(Key);
if (!DelResult.IsSuccess())
{
- throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error);
+ throw zen::runtime_error("Failed to delete S3 object '{}': {}"sv, Key, DelResult.Error);
}
});
}
@@ -627,51 +763,141 @@ namespace hydration_impl {
// IncrementalHydrator: the only HydrationStrategyBase implementation.
// Summary emission for hydrate/dehydrate operations.
+ // Queue-wait helper: time between earliest schedule and earliest worker start. UINT64_MAX
+ // sentinels mean the corresponding event never happened (no work scheduled / nothing ran).
+ inline uint64_t QueueWaitUs(uint64_t FirstScheduleUs, uint64_t FirstStartUs)
+ {
+ if (FirstScheduleUs == UINT64_MAX || FirstStartUs == UINT64_MAX || FirstStartUs <= FirstScheduleUs)
+ {
+ return 0;
+ }
+ return FirstStartUs - FirstScheduleUs;
+ }
+
+ inline uint64_t SafeAvg(uint64_t Total, uint64_t Count) { return Count ? Total / Count : 0; }
+
void LogDehydrateSummary(std::string_view Prefix,
const DehydrateStatistics& Stats,
std::string_view ModuleId,
const std::filesystem::path& Source,
std::string_view Target)
{
- const uint64_t HashUs = Stats.Hash.ElapsedUs.load();
- const uint64_t UploadUs = Stats.Upload.ElapsedUs.load();
+ const uint64_t TotalFiles = Stats.TotalFiles.load();
+ const uint64_t HashUs = Stats.Hash.ElapsedUs.load();
+ const uint64_t UploadUs = Stats.Upload.ElapsedUs.load();
+
+ // Hash phase: per-request data (BeginRequest/EndRequest tracks each hash op).
+ // Hash is the first phase to schedule work, so cold-pool warm-up shows up here.
+ const uint64_t HashFiles = Stats.Hash.Files.load();
+ const uint64_t HashBytes = Stats.Hash.Bytes.load();
+ const uint64_t HashReqCount = Stats.Hash.RequestCount.load();
+ const uint64_t HashReqTotalUs = Stats.Hash.RequestTotalUs.load();
+ const uint64_t HashReqMaxUs = Stats.Hash.RequestMaxUs.load();
+ const uint32_t HashPeak = Stats.Hash.InFlightPeak.load();
+ const uint64_t HashQueueUs = QueueWaitUs(Stats.Hash.FirstScheduleUs.load(), Stats.Hash.FirstStartUs.load());
+ // Cache hit rate: every TotalFile not re-hashed was served from the cached state.
+ const uint32_t CacheHitPct = TotalFiles ? gsl::narrow_cast<uint32_t>((TotalFiles - HashFiles) * 100 / TotalFiles) : 0;
+
+ // Upload phase shares one ParallelWork across loose Put (Stats.Upload), pack-blob Put
+ // (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch).
+ // Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and
+ // reported as a single combined "Requests" line.
+ const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() +
+ Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load();
+ const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() +
+ Stats.Touch.RequestTotalUs.load() + Stats.PackTouch.RequestTotalUs.load();
+ const uint64_t UpReqMaxUs = std::max({Stats.Upload.RequestMaxUs.load(),
+ Stats.PackUpload.RequestMaxUs.load(),
+ Stats.Touch.RequestMaxUs.load(),
+ Stats.PackTouch.RequestMaxUs.load()});
+ const uint32_t UpPeak = std::max({Stats.Upload.InFlightPeak.load(),
+ Stats.PackUpload.InFlightPeak.load(),
+ Stats.Touch.InFlightPeak.load(),
+ Stats.PackTouch.InFlightPeak.load()});
+ // Combined first-schedule / first-start across all four phase counters. UINT64_MAX is
+ // the unset sentinel and naturally loses to any real timestamp under std::min, so empty
+ // phases fall through to the others.
+ const uint64_t UpFirstSchedUs = std::min({Stats.Upload.FirstScheduleUs.load(),
+ Stats.PackUpload.FirstScheduleUs.load(),
+ Stats.Touch.FirstScheduleUs.load(),
+ Stats.PackTouch.FirstScheduleUs.load()});
+ const uint64_t UpFirstStartUs = std::min({Stats.Upload.FirstStartUs.load(),
+ Stats.PackUpload.FirstStartUs.load(),
+ Stats.Touch.FirstStartUs.load(),
+ Stats.PackTouch.FirstStartUs.load()});
+ const uint64_t UpQueueUs = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs);
+
+ const uint64_t LooseFiles = Stats.Upload.Files.load();
+ const uint64_t LooseBytes = Stats.Upload.Bytes.load();
+ const uint64_t TouchFiles = Stats.Touch.Files.load();
+ const uint64_t TouchBytes = Stats.Touch.Bytes.load();
+ const uint64_t PackTouchPacks = Stats.PackTouch.Files.load();
+ const uint64_t PackTouchBytes = Stats.PackTouch.Bytes.load();
+
+ const uint64_t PackCount = Stats.PackCount.load();
+ const uint64_t PackedFiles = Stats.PackedFiles.load();
+ const uint64_t PackBytes = Stats.PackBytes.load();
+ const uint64_t PackBuildUs = Stats.PackBuildUs.load();
+ const uint64_t PackUploadFiles = Stats.PackUpload.Files.load();
+ const uint64_t PackUploadBytes = Stats.PackUpload.Bytes.load();
+
ZEN_INFO(
"{} module '{}': {} files ({}) in {}\n"
" Source: {}\n"
" Target: {}\n"
" Load state: {}\n"
" Dir scan: {}\n"
- " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n"
+ " Hash: {} {}/{} files ({}) hashed, {}% cache hit, {}bits/s\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
" List existing: {}\n"
- " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n"
- " Metadata save: {}\n"
+ " Pack: {} {} packs, {} files, {}, {}bits/s\n"
+ " Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
+ " Save metadata: {}\n"
" Clean: {}",
Prefix,
ModuleId,
- ThousandsNum(Stats.TotalFiles.load()),
+ ThousandsNum(TotalFiles),
NiceBytes(Stats.TotalBytes.load()),
- NiceLatencyNs(Stats.TotalUs.load() * 1000),
+ NiceTimeSpanUs(Stats.TotalUs.load()),
Source.generic_string(),
Target,
- NiceLatencyNs(Stats.LoadStateUs.load() * 1000),
- NiceLatencyNs(Stats.DirScanUs.load() * 1000),
- NiceLatencyNs(HashUs * 1000),
- ThousandsNum(Stats.Hash.Files.load()),
- ThousandsNum(Stats.TotalFiles.load()),
- NiceBytes(Stats.Hash.Bytes.load()),
- NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)),
- Stats.Hash.ThreadIds.size(),
- NiceLatencyNs(Stats.ListExistingUs.load() * 1000),
- NiceLatencyNs(UploadUs * 1000),
- ThousandsNum(Stats.Upload.Files.load()),
- ThousandsNum(Stats.TotalFiles.load()),
- NiceBytes(Stats.Upload.Bytes.load()),
- ThousandsNum(Stats.Touch.Files.load()),
- NiceBytes(Stats.Touch.Bytes.load()),
- NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)),
- Stats.Upload.ThreadIds.size(),
- NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000),
- NiceLatencyNs(Stats.CleanUs.load() * 1000));
+ NiceTimeSpanUs(Stats.LoadStateUs.load()),
+ NiceTimeSpanUs(Stats.DirScanUs.load()),
+ NiceTimeSpanUs(HashUs),
+ ThousandsNum(HashFiles),
+ ThousandsNum(TotalFiles),
+ NiceBytes(HashBytes),
+ CacheHitPct,
+ NiceNum(BitsPerSecond(HashBytes, HashUs)),
+ ThousandsNum(HashReqCount),
+ NiceTimeSpanUs(SafeAvg(HashReqTotalUs, HashReqCount)),
+ NiceTimeSpanUs(HashReqMaxUs),
+ HashPeak,
+ NiceTimeSpanUs(HashQueueUs),
+ NiceTimeSpanUs(Stats.ListExistingUs.load()),
+ NiceTimeSpanUs(PackBuildUs),
+ ThousandsNum(PackCount),
+ ThousandsNum(PackedFiles),
+ NiceBytes(PackBytes),
+ NiceNum(BitsPerSecond(PackBytes, PackBuildUs)),
+ NiceTimeSpanUs(UploadUs),
+ ThousandsNum(LooseFiles),
+ NiceBytes(LooseBytes),
+ ThousandsNum(PackUploadFiles),
+ NiceBytes(PackUploadBytes),
+ ThousandsNum(TouchFiles),
+ NiceBytes(TouchBytes),
+ ThousandsNum(PackTouchPacks),
+ NiceBytes(PackTouchBytes),
+ NiceNum(BitsPerSecond(LooseBytes + PackUploadBytes, UploadUs)),
+ ThousandsNum(UpReqCount),
+ NiceTimeSpanUs(SafeAvg(UpReqTotalUs, UpReqCount)),
+ NiceTimeSpanUs(UpReqMaxUs),
+ UpPeak,
+ NiceTimeSpanUs(UpQueueUs),
+ NiceTimeSpanUs(Stats.SaveMetadataUs.load()),
+ NiceTimeSpanUs(Stats.CleanUs.load()));
}
void LogHydrateSummary(std::string_view Prefix,
@@ -681,42 +907,137 @@ namespace hydration_impl {
const std::filesystem::path& Target)
{
const uint64_t DownloadUs = Stats.Download.ElapsedUs.load();
+
+ // Standalone (Stats.Download) and pack (Stats.PackDownload) downloads share one
+ // ParallelWork. Per-request data is collected per PhaseStats by Storage::Get;
+ // reported as a single combined "Requests" line. Multipart GETs may split a pack
+ // into several ranged requests, so DlReqCount can exceed file/blob counts.
+ const uint64_t StandaloneFiles = Stats.Download.Files.load();
+ const uint64_t StandaloneBytes = Stats.Download.Bytes.load();
+ const uint64_t PackDlFiles = Stats.PackDownload.Files.load();
+ const uint64_t PackDlBytes = Stats.PackDownload.Bytes.load();
+ const uint64_t DlReqCount = Stats.Download.RequestCount.load() + Stats.PackDownload.RequestCount.load();
+ const uint64_t DlReqTotalUs = Stats.Download.RequestTotalUs.load() + Stats.PackDownload.RequestTotalUs.load();
+ const uint64_t DlReqMaxUs = std::max(Stats.Download.RequestMaxUs.load(), Stats.PackDownload.RequestMaxUs.load());
+ const uint32_t DlPeak = std::max(Stats.Download.InFlightPeak.load(), Stats.PackDownload.InFlightPeak.load());
+ const uint64_t DlFirstSchedUs = std::min(Stats.Download.FirstScheduleUs.load(), Stats.PackDownload.FirstScheduleUs.load());
+ const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load());
+ const uint64_t QueueUs = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs);
+
+ const uint64_t PackCount = Stats.PackCount.load();
+ const uint64_t PackedFiles = Stats.PackedFiles.load();
+ const uint64_t PackUnpackUs = Stats.PackUnpackUs.load();
+ const uint64_t UnpackWriteBytes = Stats.UnpackWriteBytes.load();
+
+ const uint64_t CreateDirsUs = Stats.CreateDirsUs.load();
+ const uint64_t CreateDirsCount = Stats.CreateDirsCount.load();
+ const uint64_t CreateDirsRate = CreateDirsUs ? (CreateDirsCount * 1'000'000ull / CreateDirsUs) : 0;
+
+ // Standalone and pack downloads share the Download phase elapsed reported below;
+ // the unpack line is its own clock (slice + parallel SafeWriteFile).
ZEN_INFO(
"{} module '{}': {} files ({}) in {}\n"
" Source: {}\n"
" Target: {}\n"
" Load metadata: {}\n"
- " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n"
+ " Create dirs: {} {} dirs, {} dirs/s\n"
+ " Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n"
+ " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n"
+ " Unpack: {} {} packs, {} files ({}), {}bits/s\n"
" Clean: {}\n"
- " Rename/copy: {}\n"
- " Verify scan: {}",
+ " Finalize: {}\n"
+ " Build state: {}",
Prefix,
ModuleId,
ThousandsNum(Stats.TotalFiles.load()),
NiceBytes(Stats.TotalBytes.load()),
- NiceLatencyNs(Stats.TotalUs.load() * 1000),
+ NiceTimeSpanUs(Stats.TotalUs.load()),
Source,
Target.generic_string(),
- NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000),
- NiceLatencyNs(DownloadUs * 1000),
- ThousandsNum(Stats.Download.Files.load()),
- ThousandsNum(Stats.TotalFiles.load()),
- NiceBytes(Stats.Download.Bytes.load()),
- NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)),
- Stats.Download.ThreadIds.size(),
- NiceLatencyNs(Stats.CleanUs.load() * 1000),
- NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000),
- NiceLatencyNs(Stats.VerifyScanUs.load() * 1000));
+ NiceTimeSpanUs(Stats.LoadMetadataUs.load()),
+ NiceTimeSpanUs(CreateDirsUs),
+ ThousandsNum(CreateDirsCount),
+ NiceNum(CreateDirsRate),
+ NiceTimeSpanUs(DownloadUs),
+ ThousandsNum(StandaloneFiles),
+ NiceBytes(StandaloneBytes),
+ ThousandsNum(PackDlFiles),
+ NiceBytes(PackDlBytes),
+ NiceNum(BitsPerSecond(StandaloneBytes + PackDlBytes, DownloadUs)),
+ ThousandsNum(DlReqCount),
+ NiceTimeSpanUs(SafeAvg(DlReqTotalUs, DlReqCount)),
+ NiceTimeSpanUs(DlReqMaxUs),
+ DlPeak,
+ NiceTimeSpanUs(QueueUs),
+ NiceTimeSpanUs(PackUnpackUs),
+ ThousandsNum(PackCount),
+ ThousandsNum(PackedFiles),
+ NiceBytes(UnpackWriteBytes),
+ NiceNum(BitsPerSecond(UnpackWriteBytes, PackUnpackUs)),
+ NiceTimeSpanUs(Stats.CleanUs.load()),
+ NiceTimeSpanUs(Stats.FinalizeUs.load()),
+ NiceTimeSpanUs(Stats.BuildStateUs.load()));
}
///////////////////////////////////////////////////////////////////////
+ // File-manifest entry: one of these per file in a module's state. Lives in
+ // the namespace (rather than nested in IncrementalHydrator) so the helper
+ // functions below can take it by reference.
+
+ struct Entry
+ {
+ std::string RelativePath;
+ uint64_t Size;
+ uint64_t ModTick;
+ IoHash Hash;
+ bool IsPacked = false; // true if content is part of a pack (PackHash valid)
+ // Hash of the pack's concatenated raw bytes (= pack's CAS key). Hydrate downloads
+ // the pack once and slices this entry out at the offset recorded in Pack.Entries[]
+ // for the matching Entry.Hash.
+ IoHash PackHash;
+ };
+
+ ///////////////////////////////////////////////////////////////////////
+ // Pack types: produced by dehydrate's pack phase, consumed by the state
+ // writer; consumed during hydrate to reconstruct slices.
+
+ struct BuiltPackEntry
+ {
+ IoHash Hash;
+ uint64_t Size;
+ };
+
+ struct BuiltPack
+ {
+ IoHash PackHash;
+ uint64_t Size;
+ std::vector<BuiltPackEntry> Entries;
+ };
+
+ struct PackEntryDescriptor
+ {
+ IoHash Hash;
+ uint64_t Size;
+ uint64_t Offset;
+ };
+
+ struct PackDescriptor
+ {
+ uint64_t Size = 0;
+ std::vector<PackEntryDescriptor> Entries;
+ };
+
+ using EntryGroup = std::vector<size_t>;
+ using PackPlan = std::vector<EntryGroup>;
+
+ ///////////////////////////////////////////////////////////////////////
// Holds a per-module StorageBase and threading context; drives the
// hydrate/dehydrate algorithm.
class IncrementalHydrator : public HydrationStrategyBase
{
public:
- IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage);
+ IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage, std::span<const std::string> Excludes);
virtual ~IncrementalHydrator() override;
virtual void Dehydrate(const CbObject& CachedState) override;
@@ -724,16 +1045,9 @@ namespace hydration_impl {
virtual void Obliterate() override;
private:
- struct Entry
- {
- std::filesystem::path RelativePath;
- uint64_t Size;
- uint64_t ModTick;
- IoHash Hash;
- };
-
std::unique_ptr<StorageBase> m_Storage;
HydrationConfig m_Config;
+ std::vector<std::string> m_Excludes;
WorkerThreadPool m_FallbackWorkPool;
std::atomic<bool> m_FallbackAbortFlag{false};
std::atomic<bool> m_FallbackPauseFlag{false};
@@ -743,11 +1057,657 @@ namespace hydration_impl {
};
///////////////////////////////////////////////////////////////////////
+ // Phase helpers used by IncrementalHydrator::Dehydrate and ::Hydrate.
+ // These keep the two big member functions readable as a sequence of
+ // named phases. Helpers take only the data they need and never the
+ // full HydrationConfig or IncrementalHydrator.
+
+ // Removes each path, ignoring errors. Used by both Dehydrate and Hydrate
+ // pack-cleanup guards plus the explicit pre-rename cleanup on Hydrate.
+ void RemoveStagedPackFiles(const std::vector<std::filesystem::path>& Files)
+ {
+ for (const std::filesystem::path& P : Files)
+ {
+ std::error_code Ec;
+ RemoveFile(P, Ec);
+ if (Ec)
+ {
+ ZEN_WARN("Failed to remove staged pack file '{}': {}", P, Ec.message());
+ }
+ }
+ }
+
+ // Collects parent_path() of each input, sorts lex ascending (= ancestor-first),
+ // uniques, then drops any entry that is a strict component-prefix of the next
+ // entry (its descendant's CreateDirectories recursion will create it). Calls
+ // CreateDirectories on the surviving leaves. Use before scheduling parallel
+ // writes so worker threads do not race to create the same parents.
+ size_t CreateParentDirectories(const std::vector<std::filesystem::path>& FilePaths)
+ {
+ if (FilePaths.empty())
+ {
+ return 0;
+ }
+
+ std::vector<std::filesystem::path> Dirs;
+ Dirs.reserve(FilePaths.size());
+ for (const std::filesystem::path& File : FilePaths)
+ {
+ if (File.has_parent_path())
+ {
+ Dirs.push_back(File.parent_path());
+ }
+ }
+ if (Dirs.empty())
+ {
+ return 0;
+ }
+
+ std::sort(Dirs.begin(), Dirs.end());
+ Dirs.erase(std::unique(Dirs.begin(), Dirs.end()), Dirs.end());
+
+ size_t Write = 0;
+ for (size_t Read = 0; Read < Dirs.size(); ++Read)
+ {
+ if (Read + 1 < Dirs.size())
+ {
+ const std::filesystem::path& Cur = Dirs[Read];
+ const std::filesystem::path& Next = Dirs[Read + 1];
+ const auto [ItCur, ItNext] = std::mismatch(Cur.begin(), Cur.end(), Next.begin(), Next.end());
+ if (ItCur == Cur.end() && ItNext != Next.end())
+ {
+ continue; // Cur is component-prefix of Next; descendant will create it
+ }
+ }
+ if (Write != Read)
+ {
+ Dirs[Write] = std::move(Dirs[Read]);
+ }
+ ++Write;
+ }
+ Dirs.resize(Write);
+
+ for (const std::filesystem::path& Dir : Dirs)
+ {
+ CreateDirectories(Dir);
+ }
+ return Dirs.size();
+ }
+
+ // Parses CachedState["Files"sv] into a path-keyed lookup + parallel Entries vector.
+ // Used by Dehydrate to seed its hash cache; Dehydrate ignores PackHash here.
+ void LoadCachedStateEntries(const CbObject& CachedState,
+ std::unordered_map<std::string, size_t>& OutLookup,
+ std::vector<Entry>& OutEntries)
+ {
+ for (CbFieldView FieldView : CachedState["Files"sv].AsArrayView())
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ std::string RelativePath(EntryView["Path"sv].AsString());
+ uint64_t Size = EntryView["Size"sv].AsUInt64();
+ uint64_t ModTick = EntryView["ModTick"sv].AsUInt64();
+ IoHash Hash = EntryView["Hash"sv].AsHash();
+
+ OutLookup.insert_or_assign(RelativePath, OutEntries.size());
+ OutEntries.push_back(Entry{.RelativePath = std::move(RelativePath), .Size = Size, .ModTick = ModTick, .Hash = Hash});
+ }
+ }
+
+ // Computes Out.Hash for a single file. For Oodle-compressed files in a `cas/` subdir
+ // the hash is a meta-hash combining the embedded RawHash with the file size, which
+ // avoids a collision between an uncompressed file and a same-content compressed file.
+ // All other files use a streaming raw hash via BasicFile + IoHashStream (sequential
+ // read, friendlier to the Windows cache manager than mmap).
+ void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out)
+ {
+ if (AbsPath.extension().empty())
+ {
+ std::string_view Rel = Out.RelativePath;
+ std::string_view First = Rel.substr(0, Rel.find('/'));
+ if (First.ends_with("cas"))
+ {
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize);
+ if (Compressed)
+ {
+ IoHashStream Hasher;
+ Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
+ Hasher.Append(&Out.Size, sizeof(Out.Size));
+ Out.Hash = Hasher.GetHash();
+ return;
+ }
+ }
+ }
+
+ BasicFile File(AbsPath, BasicFile::Mode::kRead);
+ IoHashStream Hasher;
+ File.StreamFile([&Hasher](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); });
+ Out.Hash = Hasher.GetHash();
+ }
+
+ // Walks DirContent, fills Entries[], schedules hash work for files whose hash
+ // is not in the StateEntries cache. The caller owns the ParallelWork's Wait()
+ // so other operations (e.g. Storage::List) can overlap with hashing. Files whose
+ // relative path matches any pattern in Excludes are dropped here (the hub-wide
+ // default list - see DefaultExcludes() above - covers transient runtime files
+ // like .lock and .sentry-native; the user can override via HydrationOptions).
+ // Returns the number of accepted (non-filtered) entries; OutTotalBytes accumulates
+ // their sizes.
+ size_t ScanAndScheduleHashWork(const DirectoryContent& DirContent,
+ const std::filesystem::path& ServerStateDir,
+ const std::unordered_map<std::string, size_t>& StateEntryLookup,
+ const std::vector<Entry>& StateEntries,
+ std::span<const std::string> Excludes,
+ std::vector<Entry>& Entries,
+ uint64_t& OutTotalBytes,
+ ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ PhaseStats& HashStats)
+ {
+ size_t TotalFiles = 0;
+ OutTotalBytes = 0;
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
+ {
+ const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ std::string RelKey = RelativePath.generic_string();
+ if (IsExcluded(RelKey, Excludes))
+ {
+ continue;
+ }
+ const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
+
+ Entry& CurrentEntry = Entries[TotalFiles];
+ CurrentEntry.RelativePath = std::move(RelKey);
+ CurrentEntry.Size = DirContent.FileSizes[FileIndex];
+ CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
+
+ bool FoundHash = false;
+ if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath); KnownIt != StateEntryLookup.end())
+ {
+ const Entry& StateEntry = StateEntries[KnownIt->second];
+ if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
+ {
+ CurrentEntry.Hash = StateEntry.Hash;
+ FoundHash = true;
+ }
+ }
+
+ if (!FoundHash)
+ {
+ Work.ScheduleWork(Pool, [AbsPath, EntryIndex = TotalFiles, &Entries, &HashStats](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ Stopwatch Timer = HashStats.BeginRequest();
+ auto GuardEnd = MakeGuard([&] { HashStats.EndRequest(Timer.GetElapsedTimeUs()); });
+ Entry& CurrentEntry = Entries[EntryIndex];
+ HashFileContent(AbsPath, CurrentEntry);
+ HashStats.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
+ });
+ HashStats.Files.fetch_add(1, std::memory_order_relaxed);
+ HashStats.RecordScheduled();
+ }
+ TotalFiles++;
+ OutTotalBytes += CurrentEntry.Size;
+ }
+ return TotalFiles;
+ }
+
+ // Plans pack composition deterministically: groups Entries by content hash for
+ // candidates Size < Threshold, sorts groups by ascending IoHash, bin-packs greedily
+ // up to MaxPackBytes, and discards any pack with fewer than two entries. Sets
+ // `IsPacked = true` on every entry that survives into a published pack so the caller
+ // can immediately distinguish loose-CAS uploads from pack-bound uploads.
+ // Returns one PackPlan per pack to build (empty if no packs are produced).
+ std::vector<PackPlan> PlanPacks(std::vector<Entry>& Entries, uint64_t Threshold, uint64_t MaxPackBytes)
+ {
+ // 1. Group small-file Entries[] indices by content hash. Every index in a group
+ // shares the same bytes, so any one of them sources the pack content; all of
+ // them get tagged IsPacked once the pack hash is known.
+ std::unordered_map<IoHash, EntryGroup, IoHash::Hasher> UniqueMap;
+ for (size_t Index = 0; Index < Entries.size(); ++Index)
+ {
+ if (Entries[Index].Size >= Threshold)
+ {
+ continue;
+ }
+ UniqueMap[Entries[Index].Hash].push_back(Index);
+ }
+
+ // Need at least 2 unique groups for any pack to survive the "discard 1-entry packs" rule.
+ if (UniqueMap.size() < 2)
+ {
+ return {};
+ }
+
+ auto GroupHash = [&](const EntryGroup& G) -> const IoHash& { return Entries[G.front()].Hash; };
+ auto GroupSize = [&](const EntryGroup& G) -> uint64_t { return Entries[G.front()].Size; };
+
+ // 2. Deterministic order: ascending IoHash. Drain the map so the index vectors move.
+ std::vector<EntryGroup> Ordered;
+ Ordered.reserve(UniqueMap.size());
+ for (auto& [h, g] : UniqueMap)
+ {
+ Ordered.push_back(std::move(g));
+ }
+ std::sort(Ordered.begin(), Ordered.end(), [&](const EntryGroup& A, const EntryGroup& B) { return GroupHash(A) < GroupHash(B); });
+
+ // 3. Bin-pack greedily under MaxPackBytes.
+ std::vector<PackPlan> Plans;
+ PackPlan Current;
+ uint64_t CurrentSize = 0;
+ for (EntryGroup& Group : Ordered)
+ {
+ const uint64_t Size = GroupSize(Group);
+ if (Size >= MaxPackBytes)
+ {
+ continue; // fallback to standalone upload
+ }
+ if (CurrentSize + Size > MaxPackBytes && !Current.empty())
+ {
+ if (Current.size() >= 2)
+ {
+ Plans.push_back(std::move(Current));
+ }
+ Current = {};
+ CurrentSize = 0;
+ }
+ Current.push_back(std::move(Group));
+ CurrentSize += Size;
+ }
+ if (Current.size() >= 2)
+ {
+ Plans.push_back(std::move(Current));
+ }
+
+ // Tag entries that survived into a published pack so the loose-upload loop can skip
+ // them. Done after bin-packing so groups discarded by the <2-entry rule are not tagged.
+ for (const PackPlan& Plan : Plans)
+ {
+ for (const EntryGroup& Group : Plan)
+ {
+ for (size_t Idx : Group)
+ {
+ Entries[Idx].IsPacked = true;
+ }
+ }
+ }
+ return Plans;
+ }
+
+ // Reads each source file in Plan, hashes the concatenation, writes raw bytes to
+ // TempPath. Throws on size mismatch (message includes ModuleId for grep). Scratch
+ // is owned by the caller and reused across packs; size must be >= the largest
+ // candidate file (i.e. the pack threshold).
+ BuiltPack BuildPack(const PackPlan& Plan,
+ const std::vector<Entry>& Entries,
+ const std::filesystem::path& ServerStateDir,
+ const std::filesystem::path& TempPath,
+ std::string_view ModuleId,
+ std::vector<uint8_t>& Scratch)
+ {
+ BuiltPack BP;
+ BP.Entries.reserve(Plan.size());
+ IoHashStream Hasher;
+ uint64_t Offset = 0;
+ {
+ BasicFile PackFile(TempPath, BasicFile::Mode::kTruncate);
+ BasicFileWriter Writer(PackFile, /*BufferSize*/ 64 * 1024);
+ for (const EntryGroup& Group : Plan)
+ {
+ // Every Entries[idx] in a group shares the same content hash (= same bytes),
+ // so the first one is a fine source.
+ const Entry& Rep = Entries[Group.front()];
+ std::filesystem::path AbsPath = MakeSafeAbsolutePath(ServerStateDir / Rep.RelativePath);
+ BasicFile Src(AbsPath, BasicFile::Mode::kRead);
+ const uint64_t Size = Src.FileSize();
+ if (Size != Rep.Size || Size > Scratch.size())
+ {
+ throw zen::runtime_error("Pack entry for hash {} (module '{}'): expected {} bytes, file is {} at '{}'"sv,
+ Rep.Hash,
+ ModuleId,
+ Rep.Size,
+ Size,
+ AbsPath);
+ }
+ Src.Read(Scratch.data(), Size, 0);
+ Hasher.Append(Scratch.data(), Size);
+ Writer.Write(Scratch.data(), Size, Offset);
+ Offset += Size;
+ BP.Entries.push_back(BuiltPackEntry{.Hash = Rep.Hash, .Size = Rep.Size});
+ }
+ Writer.Flush();
+ }
+
+ BP.PackHash = Hasher.GetHash();
+ BP.Size = Offset;
+ return BP;
+ }
+
+ // Schedules either a Put (if Hash is not in CAS) or a Touch (if it is). Updates
+ // counters on the matching PhaseStats - Files++ in both cases, Bytes+=Size on the
+ // touch path so touched-bytes accounting tracks size-equivalent work that did not
+ // transfer. Both paths call RecordScheduled so the queue-wait line covers cache-warm
+ // dehydrates (only Touches scheduled). Used for both loose CAS (UploadStats=Stats.Upload,
+ // TouchStats=Stats.Touch) and pack blobs (UploadStats=Stats.PackUpload, TouchStats=
+ // Stats.PackTouch). UploadStats and TouchStats must be distinct PhaseStats so the upload-
+ // throughput metric is not inflated by touched bytes that did not transfer.
+ void ScheduleUploadOrTouch(StorageBase& Storage,
+ ParallelWork& Work,
+ WorkerThreadPool& Pool,
+ const std::unordered_set<IoHash, IoHash::Hasher>& ExistsLookup,
+ const IoHash& Hash,
+ uint64_t Size,
+ const std::filesystem::path& SourcePath,
+ PhaseStats& UploadStats,
+ PhaseStats& TouchStats)
+ {
+ if (ExistsLookup.contains(Hash))
+ {
+ // Refresh the backend's modification time so lifecycle-expiration policies
+ // do not evict CAS entries that are still referenced by this module.
+ Storage.Touch(Work, Pool, Hash, TouchStats);
+ TouchStats.Files.fetch_add(1, std::memory_order_relaxed);
+ TouchStats.Bytes.fetch_add(Size, std::memory_order_relaxed);
+ TouchStats.RecordScheduled();
+ }
+ else
+ {
+ Storage.Put(Work, Pool, Hash, Size, SourcePath, UploadStats);
+ UploadStats.Files.fetch_add(1, std::memory_order_relaxed);
+ UploadStats.RecordScheduled();
+ }
+ }
+
+ // Builds and saves the dehydrate state.cbo: header fields, optional Packs[] array,
+ // and the Files[] array. ModuleId is stored in the manifest.
+ void WriteDehydrateMetadata(StorageBase& Storage,
+ const std::filesystem::path& ServerStateDir,
+ std::string_view ModuleId,
+ uint64_t TotalBytes,
+ uint64_t DehydrateDurationMs,
+ const std::vector<BuiltPack>& BuiltPacks,
+ const std::vector<Entry>& Entries)
+ {
+ UtcTime Now = UtcTime::Now();
+ std::string DehydrateTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z"sv,
+ Now.Tm.tm_year + 1900,
+ Now.Tm.tm_mon + 1,
+ Now.Tm.tm_mday,
+ Now.Tm.tm_hour,
+ Now.Tm.tm_min,
+ Now.Tm.tm_sec,
+ Now.Ms);
+
+ CbObjectWriter Meta;
+ Meta << "SchemaVersion"sv << HydrationSchemaVersion;
+ Meta << "SourceFolder"sv << ServerStateDir.generic_string();
+ Meta << "ModuleId"sv << ModuleId;
+ Meta << "HostName"sv << GetMachineName();
+ Meta << "DehydrateTimeUtc"sv << DehydrateTimeUtc;
+ Meta << "DehydrateDurationMs"sv << DehydrateDurationMs;
+ Meta << "TotalSizeBytes"sv << TotalBytes;
+ Meta << "StorageSettings"sv << Storage.GetSettings();
+
+ if (!BuiltPacks.empty())
+ {
+ Meta.BeginArray("Packs"sv);
+ for (const BuiltPack& BP : BuiltPacks)
+ {
+ Meta.BeginObject();
+ {
+ Meta << "Hash"sv << BP.PackHash;
+ Meta << "Size"sv << BP.Size;
+ Meta.BeginArray("Entries"sv);
+ for (const BuiltPackEntry& BPE : BP.Entries)
+ {
+ Meta.BeginObject();
+ {
+ Meta << "Hash"sv << BPE.Hash;
+ Meta << "Size"sv << BPE.Size;
+ }
+ Meta.EndObject();
+ }
+ Meta.EndArray();
+ }
+ Meta.EndObject();
+ }
+ Meta.EndArray();
+ }
+
+ Meta.BeginArray("Files"sv);
+ for (const Entry& CurrentEntry : Entries)
+ {
+ Meta.BeginObject();
+ {
+ Meta << "Path"sv << CurrentEntry.RelativePath;
+ Meta << "Size"sv << CurrentEntry.Size;
+ Meta << "ModTick"sv << CurrentEntry.ModTick;
+ Meta << "Hash"sv << CurrentEntry.Hash;
+ if (CurrentEntry.IsPacked)
+ {
+ Meta << "PackHash"sv << CurrentEntry.PackHash;
+ }
+ }
+ Meta.EndObject();
+ }
+ Meta.EndArray();
+
+ Storage.SaveMetadata(Meta.Save());
+ }
+
+ // Parses Meta["Files"sv] into Entries[] + path lookup. Reads PackHash and sets
+ // IsPacked when present. Used by Hydrate.
+ void ParseFilesArray(const CbObject& Meta,
+ std::vector<Entry>& OutEntries,
+ std::unordered_map<std::string, size_t>& OutLookup,
+ uint64_t& OutTotalSize)
+ {
+ OutTotalSize = 0;
+ for (CbFieldView FieldView : Meta["Files"sv])
+ {
+ CbObjectView EntryView = FieldView.AsObjectView();
+ if (EntryView)
+ {
+ Entry NewEntry = {.RelativePath{EntryView["Path"sv].AsString()},
+ .Size = EntryView["Size"sv].AsUInt64(),
+ .ModTick = EntryView["ModTick"sv].AsUInt64(),
+ .Hash = EntryView["Hash"sv].AsHash()};
+ IoHash PackHash = EntryView["PackHash"sv].AsHash();
+ if (PackHash != IoHash::Zero)
+ {
+ NewEntry.IsPacked = true;
+ NewEntry.PackHash = PackHash;
+ }
+ OutTotalSize += NewEntry.Size;
+ OutLookup.insert_or_assign(NewEntry.RelativePath, OutEntries.size());
+ OutEntries.emplace_back(std::move(NewEntry));
+ }
+ }
+ }
+
+ // Parses Meta["Packs"sv] into a hash-keyed descriptor map. Each PackDescriptor's
+ // Entries[] gets a prefix-sum offset for O(1) slice lookup at unpack time.
+ std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> ParsePacksArray(const CbObject& Meta)
+ {
+ std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap;
+ for (CbFieldView FieldView : Meta["Packs"sv])
+ {
+ CbObjectView PackView = FieldView.AsObjectView();
+ if (!PackView)
+ {
+ continue;
+ }
+ IoHash PackHash = PackView["Hash"sv].AsHash();
+ PackDescriptor PD;
+ PD.Size = PackView["Size"sv].AsUInt64();
+ uint64_t Offset = 0;
+ for (CbFieldView EF : PackView["Entries"sv])
+ {
+ CbObjectView EV = EF.AsObjectView();
+ if (!EV)
+ {
+ continue;
+ }
+ PackEntryDescriptor E{.Hash = EV["Hash"sv].AsHash(), .Size = EV["Size"sv].AsUInt64(), .Offset = Offset};
+ Offset += E.Size;
+ PD.Entries.push_back(E);
+ }
+ PackMap.emplace(PackHash, std::move(PD));
+ }
+ return PackMap;
+ }
+
+ // For each downloaded pack: read it into a heap buffer, verify size, and slice
+ // into per-entry IoBuffers (zero-copy views). Throws on size mismatch with the
+ // existing message format.
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> BuildHashToSlice(
+ const std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher>& PackMap,
+ const std::filesystem::path& TempDir,
+ std::string_view ModuleId)
+ {
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice;
+ size_t TotalPackEntries = 0;
+ for (const auto& [PackHash, PD] : PackMap)
+ {
+ TotalPackEntries += PD.Entries.size();
+ }
+ HashToSlice.reserve(TotalPackEntries);
+
+ for (const auto& [PackHash, PD] : PackMap)
+ {
+ std::filesystem::path PackPath = TempDir / "packs" / fmt::format("{}.bin"sv, PackHash);
+ // Heap-allocated buffer via direct ReadFile avoids mmap materialization
+ // and page-fault latency during the parallel unpack-write that follows.
+ BasicFile PackFile(PackPath, BasicFile::Mode::kRead);
+ IoBuffer PackBuf = PackFile.ReadAll();
+ if (PackBuf.GetSize() != PD.Size)
+ {
+ throw zen::runtime_error("Pack '{}' size mismatch for module '{}' at '{}': expected {}, got {}"sv,
+ PackHash,
+ ModuleId,
+ PackPath,
+ PD.Size,
+ PackBuf.GetSize());
+ }
+
+ for (const auto& E : PD.Entries)
+ {
+ HashToSlice.emplace(E.Hash, IoBuffer(PackBuf, E.Offset, E.Size));
+ }
+ }
+ return HashToSlice;
+ }
+
+ // Migrates contents of SourceDir into ServerStateDir. Same-volume: top-level rename
+ // per child. Different-volume: full CopyTree fallback. Caller is responsible for
+ // final cleanup of the parent temp directory (which may hold sibling staging dirs
+ // like packs/ that must NOT migrate).
+ void MigrateTempToState(const std::filesystem::path& SourceDir,
+ const std::filesystem::path& ServerStateDir,
+ const HydrationConfig::ThreadingOptions& Threading)
+ {
+ // If the two paths share at least one common component they are on the same drive/volume
+ // and atomic renames will succeed. Otherwise fall back to a full copy.
+ auto [ItSrc, ItState] = std::mismatch(SourceDir.begin(), SourceDir.end(), ServerStateDir.begin(), ServerStateDir.end());
+ if (ItSrc != SourceDir.begin())
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(*Threading.WorkerPool,
+ SourceDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
+ DirContent);
+
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'"sv, AbsPath, Dest));
+ }
+ }
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
+ std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'"sv, AbsPath, Dest));
+ }
+ }
+ }
+ else
+ {
+ // Slow path: source and target are on different filesystems, so rename
+ // would fail. Copy the tree instead.
+ ZEN_DEBUG("SourceDir and ServerStateDir are on different filesystems - using CopyTree");
+ CopyTree(SourceDir, ServerStateDir, {.EnableClone = true});
+ }
+ }
+
+ // Walks ServerStateDir and emits a Files[] cache for the next dehydrate's
+ // hash-shortcut (mirrors Load state on dehydrate). Files on disk that aren't in
+ // EntryLookup (manifest) are skipped with a WARN - typically leftovers from an
+ // earlier crashed hydrate.
+ CbObject BuildHydrateState(const std::filesystem::path& ServerStateDir,
+ const std::unordered_map<std::string, size_t>& EntryLookup,
+ const std::vector<Entry>& Entries,
+ std::string_view ModuleId,
+ const HydrationConfig::ThreadingOptions& Threading)
+ {
+ DirectoryContent DirContent;
+ GetDirectoryContent(*Threading.WorkerPool,
+ ServerStateDir,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
+ DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
+ DirContent);
+
+ CbObjectWriter HydrateState;
+ HydrateState.BeginArray("Files"sv);
+ for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
+ {
+ std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
+ std::string RelKey = RelativePath.generic_string();
+
+ if (auto It = EntryLookup.find(RelKey); It != EntryLookup.end())
+ {
+ HydrateState.BeginObject();
+ {
+ HydrateState << "Path"sv << RelKey;
+ HydrateState << "Size"sv << DirContent.FileSizes[FileIndex];
+ HydrateState << "ModTick"sv << DirContent.FileModificationTicks[FileIndex];
+ HydrateState << "Hash"sv << Entries[It->second].Hash;
+ }
+ HydrateState.EndObject();
+ }
+ else
+ {
+ // File on disk after hydrate but not in the manifest. Can happen when TempDir
+ // contained leftovers from a prior crashed hydrate that survived to the rename
+ // phase. Skip it rather than failing - the manifest is the source of truth for
+ // the cached state; the stray file is harmless and gets caught by the next
+ // dehydrate's directory scan.
+ ZEN_WARN("Hydrate: file '{}' present on disk but missing from manifest for module '{}'; skipping", RelKey, ModuleId);
+ }
+ }
+ HydrateState.EndArray();
+
+ return HydrateState.Save();
+ }
+
+ ///////////////////////////////////////////////////////////////////////
// IncrementalHydrator implementations
- IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage)
+ IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config,
+ std::unique_ptr<StorageBase> Storage,
+ std::span<const std::string> Excludes)
: m_Storage(std::move(Storage))
, m_Config(Config)
+ , m_Excludes(Excludes.begin(), Excludes.end())
, m_FallbackWorkPool(0)
{
if (Config.Threading)
@@ -760,6 +1720,7 @@ namespace hydration_impl {
void IncrementalHydrator::Dehydrate(const CbObject& CachedState)
{
+ ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate");
Stopwatch TotalTimer;
DehydrateStatistics Stats;
const std::string StorageTarget = m_Storage->Describe();
@@ -767,24 +1728,17 @@ namespace hydration_impl {
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
try
{
+ // Load the cache from the previous dehydrate to short-circuit re-hashing of
+ // unchanged files (matched by Path+Size+ModTick).
std::unordered_map<std::string, size_t> StateEntryLookup;
std::vector<Entry> StateEntries;
{
Stopwatch LoadStateTimer;
- for (CbFieldView FieldView : CachedState["Files"].AsArrayView())
- {
- CbObjectView EntryView = FieldView.AsObjectView();
- std::filesystem::path RelativePath(EntryView["Path"].AsString());
- uint64_t Size = EntryView["Size"].AsUInt64();
- uint64_t ModTick = EntryView["ModTick"].AsUInt64();
- IoHash Hash = EntryView["Hash"].AsHash();
-
- StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size());
- StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash});
- }
+ LoadCachedStateEntries(CachedState, StateEntryLookup, StateEntries);
Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs();
}
+ // Scan the server state directory.
DirectoryContent DirContent;
{
Stopwatch DirScanTimer;
@@ -802,101 +1756,29 @@ namespace hydration_impl {
DirContent.Files.size(),
NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0))));
+ // Hash phase: build Entries[] and schedule hash work for files not in the cache.
+ // Storage::List runs in parallel with hashing to populate ExistsLookup before Wait.
std::vector<Entry> Entries;
Entries.resize(DirContent.Files.size());
- uint64_t TotalBytes = 0;
- uint64_t TotalFiles = 0;
-
- std::unordered_set<IoHash> ExistsLookup;
-
+ uint64_t TotalBytes = 0;
+ uint64_t TotalFiles = 0;
+ std::unordered_set<IoHash, IoHash::Hasher> ExistsLookup;
{
+ Stats.Hash.PhaseClock.Reset();
Stopwatch HashTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
- {
- const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]);
- if (AbsPath.filename() == "reserve.gc")
- {
- continue;
- }
- const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
- if (*RelativePath.begin() == ".sentry-native")
- {
- continue;
- }
- if (RelativePath == ".lock")
- {
- continue;
- }
-
- Entry& CurrentEntry = Entries[TotalFiles];
- CurrentEntry.RelativePath = RelativePath;
- CurrentEntry.Size = DirContent.FileSizes[FileIndex];
- CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex];
-
- bool FoundHash = false;
- if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end())
- {
- const Entry& StateEntry = StateEntries[KnownIt->second];
- if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick)
- {
- CurrentEntry.Hash = StateEntry.Hash;
- FoundHash = true;
- }
- }
-
- if (!FoundHash)
- {
- Work.ScheduleWork(*m_Threading.WorkerPool,
- [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic<bool>& AbortFlag) {
- Stats.Hash.RecordThread();
- if (AbortFlag)
- {
- return;
- }
-
- Entry& CurrentEntry = Entries[EntryIndex];
-
- bool FoundHash = false;
- if (AbsPath.extension().empty())
- {
- auto It = CurrentEntry.RelativePath.begin();
- if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas"))
- {
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer Compressed = CompressedBuffer::FromCompressed(
- SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)),
- RawHash,
- RawSize);
- if (Compressed)
- {
- // We compose a meta-hash since taking the RawHash might collide with an
- // existing non-compressed file with the same content The collision is
- // unlikely except if the compressed data is zero bytes causing RawHash
- // to be the same as an empty file.
- IoHashStream Hasher;
- Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash));
- Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size));
- CurrentEntry.Hash = Hasher.GetHash();
- FoundHash = true;
- }
- }
- }
-
- if (!FoundHash)
- {
- CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath));
- }
- Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
- });
- Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed);
- }
- TotalFiles++;
- TotalBytes += CurrentEntry.Size;
- }
+ TotalFiles = ScanAndScheduleHashWork(DirContent,
+ ServerStateDir,
+ StateEntryLookup,
+ StateEntries,
+ m_Excludes,
+ Entries,
+ TotalBytes,
+ Work,
+ *m_Threading.WorkerPool,
+ Stats.Hash);
{
Stopwatch ListTimer;
@@ -906,82 +1788,136 @@ namespace hydration_impl {
}
Work.Wait();
-
Entries.resize(TotalFiles);
Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs();
Stats.TotalFiles = TotalFiles;
Stats.TotalBytes = TotalBytes;
}
- uint64_t UploadDurationMs = 0;
+ // Pack planning + unified upload phase. Plan first so we know which entries are
+ // packed, then run loose-CAS uploads and pack builds inside a single ParallelWork.
+ // Loose uploads are scheduled up front so they execute on the worker pool while
+ // the calling thread runs the serial pack-build loop; each completed pack hands
+ // its upload to the same ParallelWork. One Wait covers everything.
+ std::vector<BuiltPack> BuiltPacks;
+ std::vector<std::filesystem::path> StagedPackFiles;
+ auto PackCleanup = MakeGuard([&] {
+ RemoveStagedPackFiles(StagedPackFiles);
+ // Best-effort drop of the now-empty packs/ subdir so TempDir is clean after
+ // dehydrate. Mirrors the explicit cleanup on the hydrate-side success path.
+ std::error_code Ec;
+ DeleteDirectories(MakeSafeAbsolutePath(m_Config.TempDir) / "packs", Ec);
+ });
+
+ // PlanPacks tags Entries[Idx].IsPacked on every index that survives into a pack,
+ // so the loose-upload loop can skip them. PackHash is set later per-pack as each
+ // pack is built.
+ const std::vector<PackPlan> Pending =
+ m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector<PackPlan>{};
+
+ uint64_t DehydrateDurationMs = 0;
{
+ // Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all
+ // four PhaseClocks to the same baseline so the queue-wait line can combine
+ // their FirstScheduleUs / FirstStartUs across the four PhaseStats.
+ Stats.Upload.PhaseClock.Reset();
+ Stats.PackUpload.PhaseClock.Reset();
+ Stats.Touch.PhaseClock.Reset();
+ Stats.PackTouch.PhaseClock.Reset();
Stopwatch UploadTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ // Schedule loose-CAS uploads first so they begin running while the pack-build
+ // loop below executes serially on this thread.
for (const Entry& CurrentEntry : Entries)
{
- if (!ExistsLookup.contains(CurrentEntry.Hash))
- {
- m_Storage->Put(Work,
- *m_Threading.WorkerPool,
- CurrentEntry.Hash,
- CurrentEntry.Size,
- MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath),
- Stats.Upload);
- Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed);
- }
- else
+ if (CurrentEntry.IsPacked)
{
- // Refresh the backend's modification time so lifecycle-expiration policies
- // do not evict CAS entries that are still referenced by this module.
- m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch);
- Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed);
- Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed);
+ continue; // pack phase covers it
}
+ ScheduleUploadOrTouch(*m_Storage,
+ Work,
+ *m_Threading.WorkerPool,
+ ExistsLookup,
+ CurrentEntry.Hash,
+ CurrentEntry.Size,
+ MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath),
+ Stats.Upload,
+ Stats.Touch);
}
- Work.Wait();
- Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs();
- UploadDurationMs = TotalTimer.GetElapsedTimeMs();
-
- Stopwatch MetadataTimer;
- UtcTime Now = UtcTime::Now();
- std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
- Now.Tm.tm_year + 1900,
- Now.Tm.tm_mon + 1,
- Now.Tm.tm_mday,
- Now.Tm.tm_hour,
- Now.Tm.tm_min,
- Now.Tm.tm_sec,
- Now.Ms);
-
- CbObjectWriter Meta;
- Meta << "SourceFolder" << ServerStateDir.generic_string();
- Meta << "ModuleId" << m_Config.ModuleId;
- Meta << "HostName" << GetMachineName();
- Meta << "UploadTimeUtc" << UploadTimeUtc;
- Meta << "UploadDurationMs" << UploadDurationMs;
- Meta << "TotalSizeBytes" << TotalBytes;
- Meta << "StorageSettings" << m_Storage->GetSettings();
-
- Meta.BeginArray("Files");
- for (const Entry& CurrentEntry : Entries)
+ if (!Pending.empty())
{
- Meta.BeginObject();
+ ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate::Pack");
+ std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ std::filesystem::path PacksDir = TempDir / "packs";
+ CreateDirectories(PacksDir);
+
+ // Reusable scratch for small-file reads. Every pack candidate has Size <
+ // PackThresholdBytes so a single buffer of that size holds any one file.
+ // Build runs serially on the caller's thread - typical modules produce 1-2
+ // packs at ~5 ms each, too small to be worth the parallel-dispatch overhead.
+ std::vector<uint8_t> Scratch(m_Config.PackThresholdBytes);
+
+ for (const PackPlan& Plan : Pending)
{
- Meta << "Path" << CurrentEntry.RelativePath.generic_string();
- Meta << "Size" << CurrentEntry.Size;
- Meta << "ModTick" << CurrentEntry.ModTick;
- Meta << "Hash" << CurrentEntry.Hash;
+ // Pre-register the staging path so PackCleanup removes it even if the
+ // stream-write loop below throws mid-flight.
+ Oid::String_t OidStr;
+ Oid::NewOid().ToString(OidStr);
+ std::filesystem::path PackTempPath = PacksDir / fmt::format("{}.bin"sv, OidStr);
+ StagedPackFiles.push_back(PackTempPath);
+
+ Stopwatch BuildTimer;
+ BuiltPack BP = BuildPack(Plan, Entries, ServerStateDir, PackTempPath, m_Config.ModuleId, Scratch);
+ Stats.PackBuildUs.fetch_add(BuildTimer.GetElapsedTimeUs(), std::memory_order_relaxed);
+
+ // Stamp the pack hash on every matching entry; state.cbo's Files[] reads
+ // PackHash off these entries when emitting per-file PackHash references.
+ uint64_t PackedEntryCount = 0;
+ for (const EntryGroup& Group : Plan)
+ {
+ for (size_t Idx : Group)
+ {
+ Entries[Idx].PackHash = BP.PackHash;
+ }
+ PackedEntryCount += Group.size();
+ }
+
+ Stats.PackCount.fetch_add(1, std::memory_order_relaxed);
+ Stats.PackedFiles.fetch_add(PackedEntryCount, std::memory_order_relaxed);
+ Stats.PackBytes.fetch_add(BP.Size, std::memory_order_relaxed);
+
+ ScheduleUploadOrTouch(*m_Storage,
+ Work,
+ *m_Threading.WorkerPool,
+ ExistsLookup,
+ BP.PackHash,
+ BP.Size,
+ PackTempPath,
+ Stats.PackUpload,
+ Stats.PackTouch);
+
+ BuiltPacks.push_back(std::move(BP));
}
- Meta.EndObject();
}
- Meta.EndArray();
- m_Storage->SaveMetadata(Meta.Save());
- Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs();
+ Work.Wait();
+ // Upload, PackUpload, Touch, and PackTouch share a single ParallelWork. Only
+ // Upload's ElapsedUs is read by the formatter; the others' bytes/requests are
+ // reported against the same Upload phase elapsed.
+ Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs();
+ DehydrateDurationMs = TotalTimer.GetElapsedTimeMs();
}
+ // Persist the new state.cbo with header, Packs[], and Files[].
+ {
+ Stopwatch SaveMetadataTimer;
+ WriteDehydrateMetadata(*m_Storage, ServerStateDir, m_Config.ModuleId, TotalBytes, DehydrateDurationMs, BuiltPacks, Entries);
+ Stats.SaveMetadataUs = SaveMetadataTimer.GetElapsedTimeUs();
+ }
+
+ // Server-state dir contents have been uploaded; wipe them.
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
{
Stopwatch CleanTimer;
@@ -990,7 +1926,7 @@ namespace hydration_impl {
}
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
+ LogDehydrateSummary("Dehydration complete"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
}
catch (const std::exception& Ex)
{
@@ -999,20 +1935,35 @@ namespace hydration_impl {
Ex.what(),
m_Config.ServerStateDir);
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
+ LogDehydrateSummary("Dehydration failed"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget);
}
}
CbObject IncrementalHydrator::Hydrate()
{
+ ZEN_TRACE_CPU("IncrementalHydrator::Hydrate");
Stopwatch TotalTimer;
HydrateStatistics Stats;
const std::string StorageSource = m_Storage->Describe();
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
+ // Hydrated files land in TempDir/state/, pack staging blobs in TempDir/packs/. Keeping
+ // them in sibling subdirectories means MigrateTempToState only needs to hand the state/
+ // subtree across to ServerStateDir; pack staging never has a chance to leak into the
+ // final server state directory.
+ const std::filesystem::path TempStateDir = TempDir / "state";
try
{
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(TempDir);
+ // A prior hydrate may have crashed after downloading but before the rename phase,
+ // leaving stale files in TempDir that would otherwise get migrated into
+ // ServerStateDir and trip the post-rename manifest check.
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ CreateDirectories(TempStateDir);
+
+ // Load metadata; absent metadata means a fresh module - clean state, return.
CbObject Meta;
{
Stopwatch LoadTimer;
@@ -1028,147 +1979,165 @@ namespace hydration_impl {
return CbObject();
}
+ // Schema-version gate: refuse manifests written by a newer hub. Missing field is
+ // treated as version 0 (legacy / pre-versioning) and decoded best-effort - the
+ // optional fields (Packs[], StorageSettings) absent from v0 manifests fall back
+ // to defaults via ParsePacksArray / ParseSettings.
+ const uint32_t SchemaVersion = Meta["SchemaVersion"sv].AsUInt32(0);
+ if (SchemaVersion > HydrationSchemaVersion)
+ {
+ throw zen::runtime_error("State manifest for module '{}' has schema version {} but this hub supports up to {}"sv,
+ m_Config.ModuleId,
+ SchemaVersion,
+ HydrationSchemaVersion);
+ }
+
+ // Parse manifest: Files[] for per-file metadata, Packs[] (optional) for pack
+ // composition. Missing Packs[] = old-format state; treated as all-standalone.
std::unordered_map<std::string, size_t> EntryLookup;
std::vector<Entry> Entries;
uint64_t TotalSize = 0;
-
- for (CbFieldView FieldView : Meta["Files"])
- {
- CbObjectView EntryView = FieldView.AsObjectView();
- if (EntryView)
- {
- Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()),
- .Size = EntryView["Size"].AsUInt64(),
- .ModTick = EntryView["ModTick"].AsUInt64(),
- .Hash = EntryView["Hash"].AsHash()};
- TotalSize += NewEntry.Size;
- EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size());
- Entries.emplace_back(std::move(NewEntry));
- }
- }
+ ParseFilesArray(Meta, Entries, EntryLookup, TotalSize);
+ std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap = ParsePacksArray(Meta);
Stats.TotalFiles = Entries.size();
Stats.TotalBytes = TotalSize;
+ Stats.PackCount = PackMap.size();
- ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files",
+ ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files, {} packs",
m_Config.ModuleId,
m_Config.ServerStateDir,
Entries.size(),
- NiceBytes(TotalSize));
-
- m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView());
+ NiceBytes(TotalSize),
+ PackMap.size());
+
+ // Re-apply storage settings from state.cbo (e.g. S3 multipart chunk size).
+ m_Storage->ParseSettings(Meta["StorageSettings"sv].AsObjectView());
+
+ // Per-entry destination paths under TempStateDir, indexed parallel to Entries[]. Used
+ // by the standalone download dispatch and (for IsPacked entries) by the unpack
+ // dispatch. Pre-creating parents once for the union covers both phases without a second pass.
+ std::vector<std::filesystem::path> EntryPaths;
+ EntryPaths.reserve(Entries.size());
+ for (const Entry& CurrentEntry : Entries)
+ {
+ EntryPaths.push_back(MakeSafeAbsolutePath(TempStateDir / CurrentEntry.RelativePath));
+ }
+ {
+ Stopwatch CreateDirsTimer;
+ auto RecordElapsed = MakeGuard([&] { Stats.CreateDirsUs = CreateDirsTimer.GetElapsedTimeUs(); });
+ Stats.CreateDirsCount = CreateParentDirectories(EntryPaths);
+ }
+ // Download phase: pack GETs first (so unpack can begin sooner), then standalone files.
+ // Both share the same ParallelWork; per-phase byte / request counts stay separate via
+ // PackDownload vs Download stats while the elapsed time is reported once.
{
+ // Download and PackDownload share one ParallelWork; reset both PhaseClocks
+ // to the same baseline so the queue-wait line can combine their FirstScheduleUs
+ // / FirstStartUs across the two PhaseStats.
+ Stats.Download.PhaseClock.Reset();
+ Stats.PackDownload.PhaseClock.Reset();
Stopwatch DownloadTimer;
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
- for (const Entry& CurrentEntry : Entries)
+ const std::filesystem::path PacksDir = TempDir / "packs";
+ if (!PackMap.empty())
+ {
+ CreateDirectories(PacksDir);
+ }
+
+ for (const auto& [PackHash, PD] : PackMap)
+ {
+ std::filesystem::path PackPath = PacksDir / fmt::format("{}.bin"sv, PackHash);
+ m_Storage->Get(Work, *m_Threading.WorkerPool, PackHash, PD.Size, PackPath, Stats.PackDownload);
+ Stats.PackDownload.Files.fetch_add(1, std::memory_order_relaxed);
+ Stats.PackDownload.RecordScheduled();
+ }
+
+ for (size_t I = 0; I < Entries.size(); ++I)
{
- std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath);
- CreateDirectories(Path.parent_path());
- m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download);
+ if (Entries[I].IsPacked)
+ {
+ continue; // handled in the unpack phase below
+ }
+ m_Storage->Get(Work, *m_Threading.WorkerPool, Entries[I].Hash, Entries[I].Size, EntryPaths[I], Stats.Download);
Stats.Download.Files.fetch_add(1, std::memory_order_relaxed);
+ Stats.Download.RecordScheduled();
}
Work.Wait();
Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs();
}
- // Downloaded successfully - swap into ServerStateDir
- ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
+ // Unpack phase: verify each downloaded pack, build hash->slice map, parallel-write.
+ if (!PackMap.empty())
{
- Stopwatch CleanTimer;
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
- Stats.CleanUs = CleanTimer.GetElapsedTimeUs();
- }
+ ZEN_TRACE_CPU("IncrementalHydrator::Hydrate::Unpack");
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice = BuildHashToSlice(PackMap, TempDir, m_Config.ModuleId);
- {
- Stopwatch RenameTimer;
- // If the two paths share at least one common component they are on the same drive/volume
- // and atomic renames will succeed. Otherwise fall back to a full copy.
- auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end());
- if (ItTmp != TempDir.begin())
{
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- TempDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs,
- DirContent);
-
- for (const std::filesystem::path& AbsPath : DirContent.Directories)
+ Stopwatch UnpackTimer;
+ ParallelWork UnpackWork(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+ for (size_t I = 0; I < Entries.size(); ++I)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest);
- if (Ec)
+ if (!Entries[I].IsPacked)
{
- throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest));
+ continue;
}
- }
- for (const std::filesystem::path& AbsPath : DirContent.Files)
- {
- std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename());
- std::error_code Ec = RenameFileWithRetry(AbsPath, Dest);
- if (Ec)
+ auto It = HashToSlice.find(Entries[I].Hash);
+ if (It == HashToSlice.end())
{
- throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest));
+ throw zen::runtime_error("Packed file '{}' references unknown pack content hash '{}' in module '{}'"sv,
+ Entries[I].RelativePath,
+ Entries[I].Hash,
+ m_Config.ModuleId);
}
+ UnpackWork.ScheduleWork(*m_Threading.WorkerPool,
+ [&Stats, &Path = EntryPaths[I], Slice = &It->second](std::atomic<bool>& AbortFlag) {
+ if (AbortFlag.load())
+ {
+ return;
+ }
+ TemporaryFile::SafeWriteFile(Path, *Slice);
+ Stats.UnpackWriteBytes.fetch_add(Slice->GetSize(), std::memory_order_relaxed);
+ Stats.PackedFiles.fetch_add(1, std::memory_order_relaxed);
+ });
}
-
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ UnpackWork.Wait();
+ Stats.PackUnpackUs = UnpackTimer.GetElapsedTimeUs();
}
- else
- {
- // Slow path: TempDir and ServerStateDir are on different filesystems, so rename
- // would fail. Copy the tree instead and clean up the temp files afterwards.
- ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree");
- CopyTree(TempDir, ServerStateDir, {.EnableClone = true});
- ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
- CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
- }
- Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs();
+ // Release the pack buffers (each IoBuffer slice holds a ref to the pack's underlying
+ // heap buffer) before the rename/verify phase runs - avoids keeping ~sum(pack sizes)
+ // bytes alive across those phases.
+ HashToSlice.clear();
}
- CbObject StateObject;
+ // Downloaded successfully - swap TempStateDir contents into ServerStateDir, then
+ // sweep the rest of TempDir (empty TempStateDir, packs/, anything else).
+ ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
{
- Stopwatch VerifyTimer;
- DirectoryContent DirContent;
- GetDirectoryContent(*m_Threading.WorkerPool,
- ServerStateDir,
- DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive |
- DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick,
- DirContent);
-
- CbObjectWriter HydrateState;
- HydrateState.BeginArray("Files");
- for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++)
- {
- std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]);
-
- if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end())
- {
- HydrateState.BeginObject();
- {
- HydrateState << "Path" << RelativePath.generic_string();
- HydrateState << "Size" << DirContent.FileSizes[FileIndex];
- HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex];
- HydrateState << "Hash" << Entries[It->second].Hash;
- }
- HydrateState.EndObject();
- }
- else
- {
- ZEN_ASSERT(false);
- }
- }
- HydrateState.EndArray();
+ Stopwatch CleanTimer;
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
+ Stats.CleanUs = CleanTimer.GetElapsedTimeUs();
+ }
+ {
+ Stopwatch FinalizeTimer;
+ MigrateTempToState(TempStateDir, ServerStateDir, m_Threading);
+ CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
+ Stats.FinalizeUs = FinalizeTimer.GetElapsedTimeUs();
+ }
- StateObject = HydrateState.Save();
- Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs();
+ // Build the cached state that the next Dehydrate will receive (mirrors Load state on dehydrate).
+ CbObject StateObject;
+ {
+ Stopwatch BuildStateTimer;
+ StateObject = BuildHydrateState(ServerStateDir, EntryLookup, Entries, m_Config.ModuleId, m_Threading);
+ Stats.BuildStateUs = BuildStateTimer.GetElapsedTimeUs();
}
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
+ LogHydrateSummary("Hydration complete"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
return StateObject;
}
@@ -1182,25 +2151,42 @@ namespace hydration_impl {
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir);
Stats.TotalUs = TotalTimer.GetElapsedTimeUs();
- LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
+ LogHydrateSummary("Hydration failed"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir);
return {};
}
}
void IncrementalHydrator::Obliterate()
{
+ ZEN_TRACE_CPU("IncrementalHydrator::Obliterate");
const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir);
const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir);
- try
- {
+ auto TryDeleteBackend = [&]() {
ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
m_Storage->Delete(Work, *m_Threading.WorkerPool);
Work.Wait();
+ };
+
+ try
+ {
+ TryDeleteBackend();
}
catch (const std::exception& Ex)
{
- ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what());
+ ZEN_WARN("Obliterate backend delete failed for module '{}' (attempt 1/2): {}. Retrying once.", m_Config.ModuleId, Ex.what());
+ try
+ {
+ TryDeleteBackend();
+ }
+ catch (const std::exception& Ex2)
+ {
+ ZEN_WARN(
+ "Obliterate backend delete failed for module '{}' (attempt 2/2): {}. Proceeding with local cleanup; backend data may "
+ "remain.",
+ m_Config.ModuleId,
+ Ex2.what());
+ }
}
CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir);
@@ -1243,26 +2229,33 @@ private:
uint64_t m_DefaultMultipartChunkSize;
};
+HydrationBase::HydrationBase(const Configuration& Config)
+{
+ using namespace hydration_impl;
+ CbFieldView ExcludesField = Config.Options["excludes"sv];
+ m_Excludes = ExcludesField.HasValue() ? ParseStringArray(ExcludesField) : DefaultExcludes();
+}
+
///////////////////////////////////////////////////////////////////////////
// Implementations
-FileHydration::FileHydration(const Configuration& Config)
+FileHydration::FileHydration(const Configuration& Config) : HydrationBase(Config)
{
if (!Config.TargetSpecification.empty())
{
m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length()));
if (m_StorageRoot.empty())
{
- throw zen::runtime_error("Hydration config 'file' type requires a directory path");
+ throw zen::runtime_error("Hydration config 'file' type requires a directory path"sv);
}
}
else
{
- CbObjectView Settings = Config.Options["settings"].AsObjectView();
- std::string_view Path = Settings["path"].AsString();
+ CbObjectView Settings = Config.Options["settings"sv].AsObjectView();
+ std::string_view Path = Settings["path"sv].AsString();
if (Path.empty())
{
- throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"sv);
}
m_StorageRoot = Utf8ToWide(std::string(Path));
}
@@ -1273,14 +2266,14 @@ std::unique_ptr<HydrationStrategyBase>
FileHydration::CreateHydrator(const HydrationConfig& Config)
{
using namespace hydration_impl;
- return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId));
+ return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId), m_Excludes);
}
-S3Hydration::S3Hydration(const Configuration& Config)
+S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config)
{
using namespace hydration_impl;
- CbObjectView Settings = Config.Options["settings"].AsObjectView();
+ CbObjectView Settings = Config.Options["settings"sv].AsObjectView();
std::string_view Spec;
if (!Config.TargetSpecification.empty())
{
@@ -1289,10 +2282,10 @@ S3Hydration::S3Hydration(const Configuration& Config)
}
else
{
- std::string_view Uri = Settings["uri"].AsString();
+ std::string_view Uri = Settings["uri"sv].AsString();
if (Uri.empty())
{
- throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'");
+ throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"sv);
}
Spec = Uri;
Spec.remove_prefix(S3Storage::Prefix.size());
@@ -1304,10 +2297,10 @@ S3Hydration::S3Hydration(const Configuration& Config)
if (m_Bucket.empty())
{
- throw zen::runtime_error("Incremental S3 hydration config requires a bucket name");
+ throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"sv);
}
- std::string Region = std::string(Settings["region"].AsString());
+ std::string Region = std::string(Settings["region"sv].AsString());
if (Region.empty())
{
Region = GetEnvVariable("AWS_DEFAULT_REGION");
@@ -1322,11 +2315,11 @@ S3Hydration::S3Hydration(const Configuration& Config)
}
m_Region = std::move(Region);
- std::string_view Endpoint = Settings["endpoint"].AsString();
+ std::string_view Endpoint = Settings["endpoint"sv].AsString();
if (!Endpoint.empty())
{
m_Endpoint = std::string(Endpoint);
- m_PathStyle = Settings["path-style"].AsBool();
+ m_PathStyle = Settings["path-style"sv].AsBool();
}
std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID");
@@ -1341,7 +2334,7 @@ S3Hydration::S3Hydration(const Configuration& Config)
m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN");
}
- m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize);
+ m_DefaultMultipartChunkSize = Settings["chunksize"sv].AsUInt64(DefaultMultipartChunkSize);
S3ClientOptions ClientOptions;
ClientOptions.BucketName = m_Bucket;
@@ -1357,6 +2350,11 @@ S3Hydration::S3Hydration(const Configuration& Config)
ClientOptions.Credentials = m_Credentials;
}
ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+ // Retry transient HTTP failures (429 throttle, 503 SlowDown, 5xx, connection errors) at the
+ // HTTP layer. CurlHttpClient::DoWithRetry uses 100*(Attempt+1) ms linear backoff between
+ // attempts. Three retries covers brief S3 rate-limit bursts without holding worker threads
+ // for long under sustained throttle.
+ ClientOptions.HttpSettings.RetryCount = 3;
m_Client = std::make_unique<S3Client>(ClientOptions);
}
@@ -1365,10 +2363,12 @@ std::unique_ptr<HydrationStrategyBase>
S3Hydration::CreateHydrator(const HydrationConfig& Config)
{
using namespace hydration_impl;
- std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId);
+ std::string KeyPrefix =
+ m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId);
return std::make_unique<IncrementalHydrator>(
Config,
- std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize));
+ std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize),
+ m_Excludes);
}
std::unique_ptr<HydrationBase>
@@ -1386,10 +2386,10 @@ InitHydration(const HydrationBase::Configuration& Config)
{
return std::make_unique<S3Hydration>(Config);
}
- throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification);
+ throw zen::runtime_error("Unknown hydration strategy: {}"sv, Config.TargetSpecification);
}
- std::string_view Type = Config.Options["type"].AsString();
+ std::string_view Type = Config.Options["type"sv].AsString();
if (Type == FileStorage::Type)
{
return std::make_unique<FileHydration>(Config);
@@ -1400,9 +2400,9 @@ InitHydration(const HydrationBase::Configuration& Config)
}
if (!Type.empty())
{
- throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ throw zen::runtime_error("Unknown hydration target type '{}'"sv, Type);
}
- throw zen::runtime_error("No hydration target configured");
+ throw zen::runtime_error("No hydration target configured"sv);
}
#if ZEN_WITH_TESTS
@@ -1485,6 +2485,34 @@ namespace {
}
}
+ // Test fixture that centralizes the common scaffolding for file-backed hydration tests:
+ // a scratch temp dir containing the server state, the hydration store, and the hydration
+ // temp dir, plus an initialized FileHydration backend.
+ struct FileHarness
+ {
+ ScopedTemporaryDirectory TempDir;
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ std::unique_ptr<HydrationBase> Hydration;
+
+ FileHarness()
+ {
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationStore);
+ CreateDirectories(HydrationTemp);
+ Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ }
+
+ HydrationConfig MakeConfig(std::string_view ModuleId, HydrationConfig Overrides = {}) const
+ {
+ Overrides.ServerStateDir = ServerStateDir;
+ Overrides.TempDir = HydrationTemp;
+ Overrides.ModuleId = std::string(ModuleId);
+ return Overrides;
+ }
+ };
+
} // namespace
TEST_SUITE_BEGIN("server.hydration");
@@ -1495,140 +2523,386 @@ TEST_SUITE_BEGIN("server.hydration");
TEST_CASE("hydration.file.dehydrate_hydrate")
{
- ScopedTemporaryDirectory TempDir;
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ constexpr auto ModuleId = "testmodule";
+ const auto Config = H.MakeConfig(ModuleId);
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationStore);
- CreateDirectories(HydrationTemp);
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::exists(H.HydrationStore / ModuleId));
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
- const std::string ModuleId = "testmodule";
- auto TestFiles = CreateSmallTestTree(ServerStateDir);
-
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+{
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("testmodule");
- // Dehydrate: copy server state to file store
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- // Verify the module folder exists in the store and ServerStateDir was wiped
- CHECK(std::filesystem::exists(HydrationStore / ModuleId));
- CHECK(std::filesystem::is_empty(ServerStateDir));
+ // Stale file must be wiped by the rehydrate.
+ WriteFile(H.ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ H.Hydration->CreateHydrator(Config)->Hydrate();
- // Hydrate: restore server state from file store
- Hydration->CreateHydrator(Config)->Hydrate();
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "stale.bin"));
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Verify restored contents match the original
- VerifyTree(ServerStateDir, TestFiles);
+TEST_CASE("hydration.file.excluded_files_not_dehydrated")
+{
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+
+ // Files matched by the built-in DefaultExcludes() set in hydration.cpp. Each must be
+ // skipped during dehydrate and not be recreated by hydrate.
+ CreateDirectories(H.ServerStateDir / "gc");
+ WriteFile(H.ServerStateDir / "gc" / "reserve.gc", CreateSemiRandomBlob(64));
+ CreateDirectories(H.ServerStateDir / ".sentry-native");
+ WriteFile(H.ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
+ WriteFile(H.ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
+ WriteFile(H.ServerStateDir / "state_marker", CreateSemiRandomBlob(16));
+ WriteFile(H.ServerStateDir / ".lock", CreateSemiRandomBlob(8));
+ WriteFile(H.ServerStateDir / "snapshot.bak", CreateSemiRandomBlob(48));
+ CreateDirectories(H.ServerStateDir / "auth");
+ WriteFile(H.ServerStateDir / "auth" / "authstate", CreateSemiRandomBlob(96));
+
+ const auto Config = H.MakeConfig("testmodule_excl");
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+
+ CleanDirectory(H.ServerStateDir, true);
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+
+ VerifyTree(H.ServerStateDir, TestFiles);
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "gc" / "reserve.gc"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".sentry-native"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "state_marker"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".lock"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "snapshot.bak"));
+ CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "auth" / "authstate"));
}
-TEST_CASE("hydration.file.hydrate_overwrites_existing_state")
+TEST_CASE("hydration.options.excludes_override")
{
+ // Explicit `excludes` replaces the built-in default list outright; `.lock` is not
+ // in the override list, so it must appear in the manifest. Use the Options-only
+ // path (type + settings.path) so the same Options object also carries the override
+ // `excludes` array.
ScopedTemporaryDirectory TempDir;
-
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ std::filesystem::path ServerStateDir = TempDir.Path() / "state";
+ std::filesystem::path HydrationStore = TempDir.Path() / "store";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "tmp";
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
+ WriteFile(ServerStateDir / "regular.bin", CreateSemiRandomBlob(64));
+ WriteFile(ServerStateDir / ".lock", CreateSemiRandomBlob(8));
- auto TestFiles = CreateSmallTestTree(ServerStateDir);
-
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
-
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"};
+ CbObjectWriter Options;
+ Options << "type"sv
+ << "file"sv;
+ Options.BeginObject("settings"sv);
+ {
+ Options << "path"sv << HydrationStore.generic_string();
+ }
+ Options.EndObject();
+ Options.BeginArray("excludes"sv);
+ {
+ Options << "*.tmp"sv;
+ }
+ Options.EndArray();
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ HydrationBase::Configuration HydrCfg{.Options = Options.Save()};
+ std::unique_ptr<HydrationBase> Hydration = InitHydration(HydrCfg);
+ HydrationConfig PerModuleCfg{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "excl_off"};
+ Hydration->CreateHydrator(PerModuleCfg)->Dehydrate(CbObject());
- // Put a stale file in ServerStateDir to simulate leftover state
- WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
+ const std::filesystem::path StateFile = HydrationStore / "excl_off" / "current-state.cbo";
+ REQUIRE(std::filesystem::exists(StateFile));
- // Hydrate - must wipe stale file and restore original
- Hydration->CreateHydrator(Config)->Hydrate();
+ FileContents Contents = ReadFile(StateFile);
+ REQUIRE(Contents);
+ IoBuffer Payload = Contents.Flatten();
+ CbValidateError Err;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err);
+ REQUIRE_EQ(Err, CbValidateError::None);
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin"));
- VerifyTree(ServerStateDir, TestFiles);
+ bool HasLock = false;
+ for (CbFieldView F : Meta["Files"sv])
+ {
+ if (F.AsObjectView()["Path"sv].AsString() == ".lock")
+ {
+ HasLock = true;
+ break;
+ }
+ }
+ CHECK(HasLock);
}
-TEST_CASE("hydration.file.excluded_files_not_dehydrated")
+// ---------------------------------------------------------------------------
+// FileHydrator obliterate test
+// ---------------------------------------------------------------------------
+
+TEST_CASE("hydration.file.obliterate")
{
- ScopedTemporaryDirectory TempDir;
+ FileHarness H;
+ constexpr std::string_view ModuleId = "obliterate_test"sv;
+ CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig(ModuleId);
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationStore);
- CreateDirectories(HydrationTemp);
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::exists(H.HydrationStore / ModuleId));
- auto TestFiles = CreateSmallTestTree(ServerStateDir);
+ // Put files back in ServerStateDir + TempDir to verify cleanup.
+ CreateSmallTestTree(H.ServerStateDir);
+ WriteFile(H.HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
- // Add files that the dehydrator should skip
- WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64));
- CreateDirectories(ServerStateDir / ".sentry-native");
- WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32));
- WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128));
+ H.Hydration->CreateHydrator(Config)->Obliterate();
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ CHECK_FALSE(std::filesystem::exists(H.HydrationStore / ModuleId));
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+ CHECK(std::filesystem::is_empty(H.HydrationTemp));
+}
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"};
+// ---------------------------------------------------------------------------
+// Pack tests - exercise small-file packing, unpacking, and fallback paths.
+// ---------------------------------------------------------------------------
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+TEST_CASE("hydration.file.pack_roundtrip")
+{
+ // CreateSmallTestTree produces 6 files all < 2 KB -> single pack with every file in it.
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("pack_roundtrip");
+
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Hydrate into a clean directory
- CleanDirectory(ServerStateDir, true);
- Hydration->CreateHydrator(Config)->Hydrate();
+TEST_CASE("hydration.file.pack_disabled_fallback")
+{
+ // PackEnabled=false -> every file is a standalone CAS entry regardless of size.
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("pack_disabled", HydrationConfig{.PackEnabled = false});
+
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Normal files must be restored
- VerifyTree(ServerStateDir, TestFiles);
- // Excluded files must NOT be restored
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc"));
- CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native"));
+TEST_CASE("hydration.file.pack_one_unique_fallback")
+{
+ // Only 1 unique small-file candidate -> no pack (min 2 entries); falls back to standalone.
+ FileHarness H;
+
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles;
+ IoBuffer Small = CreateSemiRandomBlob(128);
+ WriteFile(H.ServerStateDir / "tiny.bin", Small);
+ TestFiles.emplace_back("tiny.bin", std::move(Small));
+
+ IoBuffer Big = CreateSemiRandomBlob(8192);
+ WriteFile(H.ServerStateDir / "big.bin", Big);
+ TestFiles.emplace_back("big.bin", std::move(Big));
+
+ const auto Config = H.MakeConfig("pack_one_unique");
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
}
-// ---------------------------------------------------------------------------
-// FileHydrator obliterate test
-// ---------------------------------------------------------------------------
+TEST_CASE("hydration.file.pack_duplicate_hashes")
+{
+ // 10 files share one hash + 1 distinct file -> pack has 2 unique entries; hydrate writes
+ // all 11 destinations correctly.
+ FileHarness H;
+
+ IoBuffer Shared = CreateSemiRandomBlob(256);
+ IoBuffer Other = CreateSemiRandomBlob(256);
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles;
+ for (int I = 0; I < 10; ++I)
+ {
+ std::filesystem::path Rel = fmt::format("dup_{:02d}.bin"sv, I);
+ WriteFile(H.ServerStateDir / Rel, Shared);
+ TestFiles.emplace_back(Rel, Shared);
+ }
+ WriteFile(H.ServerStateDir / "other.bin", Other);
+ TestFiles.emplace_back("other.bin", Other);
+
+ const auto Config = H.MakeConfig("pack_duplicates");
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
-TEST_CASE("hydration.file.obliterate")
+TEST_CASE("hydration.file.pack_large_dataset")
{
- ScopedTemporaryDirectory TempDir;
+ // Mix of many small files + a few large ones, with a modest MaxPackBytes
+ // to force bin-packing into multiple packs. Verifies ordering, splitting, and the
+ // interaction between packed and standalone uploads.
+ FileHarness H;
+
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles;
+ constexpr int kSmallCount = 100;
+ constexpr int kLargeCount = 3;
+
+ // Varied small-file sizes (256-2048 B) avoid artificial uniformity in the bin-pack.
+ FastRandom Rand{.Seed = 0xcafebabe};
+ for (int I = 0; I < kSmallCount; ++I)
+ {
+ uint64_t Size = 256 + (Rand.Next() % 1793); // [256, 2048]
+ IoBuffer Blob = CreateSemiRandomBlob(Rand, Size);
+ auto Rel = std::filesystem::path(fmt::format("small/group{}/file_{:04d}.bin"sv, I / 25, I));
+ CreateDirectories((H.ServerStateDir / Rel).parent_path());
+ WriteFile(H.ServerStateDir / Rel, Blob);
+ TestFiles.emplace_back(std::move(Rel), std::move(Blob));
+ }
+ for (int I = 0; I < kLargeCount; ++I)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(Rand, 32 * 1024 + I * 4096);
+ auto Rel = std::filesystem::path(fmt::format("large/file_{:02d}.bulk"sv, I));
+ CreateDirectories((H.ServerStateDir / Rel).parent_path());
+ WriteFile(H.ServerStateDir / Rel, Blob);
+ TestFiles.emplace_back(std::move(Rel), std::move(Blob));
+ }
+
+ // Cap each pack at ~32 KB -> 100 small files (~115 KB raw) split across ~4 packs.
+ const auto Config = H.MakeConfig("pack_large", HydrationConfig{.MaxPackBytes = 32 * 1024});
+
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
- std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store";
- std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
- CreateDirectories(ServerStateDir);
- CreateDirectories(HydrationStore);
- CreateDirectories(HydrationTemp);
+TEST_CASE("hydration.file.pack_hash_determinism")
+{
+ // Two independent dehydrate runs over the same content must produce byte-identical state
+ // files (and therefore identical pack hashes). This is what keeps ExistsLookup dedup
+ // working across redeploys.
+ FileHarness H;
+
+ FastRandom Rand{.Seed = 0x12345678};
+ std::vector<std::pair<std::filesystem::path, IoBuffer>> Files;
+ for (int I = 0; I < 40; ++I)
+ {
+ IoBuffer Blob = CreateSemiRandomBlob(Rand, 256 + (I % 7) * 200);
+ auto Rel = std::filesystem::path(fmt::format("tree/leaf_{:02d}.dat"sv, I));
+ CreateDirectories((H.ServerStateDir / Rel).parent_path());
+ WriteFile(H.ServerStateDir / Rel, Blob);
+ Files.emplace_back(std::move(Rel), std::move(Blob));
+ }
+
+ const auto Config = H.MakeConfig("pack_determinism");
+ const std::filesystem::path StateFile = H.HydrationStore / "pack_determinism" / "current-state.cbo";
+
+ // Extract the ordered pack-hash list from state.cbo. Timestamp / duration fields vary
+ // across runs so byte-identity is not achievable; the pack identities are.
+ auto ReadPackHashes = [&]() -> std::vector<IoHash> {
+ FileContents Contents = ReadFile(StateFile);
+ REQUIRE(Contents);
+ IoBuffer Payload = Contents.Flatten();
+ CbValidateError Err;
+ CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err);
+ REQUIRE_EQ(Err, CbValidateError::None);
+ std::vector<IoHash> Hashes;
+ for (CbFieldView F : Meta["Packs"sv])
+ {
+ Hashes.push_back(F.AsObjectView()["Hash"sv].AsHash());
+ }
+ return Hashes;
+ };
- const std::string ModuleId = "obliterate_test";
- CreateSmallTestTree(ServerStateDir);
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ std::vector<IoHash> First = ReadPackHashes();
+ REQUIRE_FALSE(First.empty());
- auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()});
+ // Rehydrate so the tree is back on disk, then dehydrate again with a fresh hydrator.
+ H.Hydration->CreateHydrator(Config)->Hydrate();
+ VerifyTree(H.ServerStateDir, Files);
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+ auto HydrationB = InitHydration({.TargetSpecification = "file://" + H.HydrationStore.string()});
+ HydrationB->CreateHydrator(Config)->Dehydrate(CbObject());
+ std::vector<IoHash> Second = ReadPackHashes();
- // Dehydrate so the backend store has data
- Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
- CHECK(std::filesystem::exists(HydrationStore / ModuleId));
+ REQUIRE_EQ(First.size(), Second.size());
+ for (size_t I = 0; I < First.size(); ++I)
+ {
+ CHECK_EQ(First[I], Second[I]);
+ }
+}
- // Put some files back in ServerStateDir and TempDir to verify cleanup
- CreateSmallTestTree(ServerStateDir);
- WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64));
+TEST_CASE("hydration.file.pack_backward_compat_read")
+{
+ // Hand-craft a state.cbo without any Packs[] / PackHash fields (old format). Hydrate must
+ // treat every file as standalone and roundtrip successfully.
+ FileHarness H;
+ const auto TestFiles = CreateSmallTestTree(H.ServerStateDir);
+ const auto Config = H.MakeConfig("pack_oldformat", HydrationConfig{.PackEnabled = false});
+
+ // Dehydrate with PackEnabled=false -> state.cbo has no Packs[] and no PackHash fields.
+ H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject());
+ CHECK(std::filesystem::is_empty(H.ServerStateDir));
+
+ // Hydrate with PackEnabled=true -> the hydrator must still handle the old-format state.
+ const auto NewConfig = H.MakeConfig("pack_oldformat");
+ H.Hydration->CreateHydrator(NewConfig)->Hydrate();
+ VerifyTree(H.ServerStateDir, TestFiles);
+}
- // Obliterate
- Hydration->CreateHydrator(Config)->Obliterate();
+// ---------------------------------------------------------------------------
+// CreateParentDirectories helper test
+// ---------------------------------------------------------------------------
- // Backend store directory deleted
- CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId));
- // ServerStateDir cleaned
- CHECK(std::filesystem::is_empty(ServerStateDir));
- // TempDir cleaned
- CHECK(std::filesystem::is_empty(HydrationTemp));
+TEST_CASE("hydration.createparentdirectories")
+{
+ ScopedTemporaryDirectory TempDir;
+ const std::filesystem::path Root = TempDir.Path();
+
+ // Edge: empty input.
+ CHECK_EQ(hydration_impl::CreateParentDirectories({}), 0u);
+
+ // Edge: bare filename has no parent_path() -> contributes nothing.
+ std::vector<std::filesystem::path> Bare{"bare.bin"};
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Bare), 0u);
+
+ // Edge: single input. Triggers the Dirs.size() == 1 path that bypasses the prune loop.
+ const std::filesystem::path SingleRoot = Root / "single";
+ std::vector<std::filesystem::path> Single{SingleRoot / "only" / "a.bin"};
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Single), 1u);
+ CHECK(std::filesystem::is_directory(SingleRoot / "only"));
+
+ // Edge: pre-existing dirs must not raise; count still reflects leaf set.
+ const std::filesystem::path PreRoot = Root / "preexisting";
+ CreateDirectories(PreRoot / "pre" / "made");
+ std::vector<std::filesystem::path> Pre{PreRoot / "pre" / "made" / "f.bin"};
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Pre), 1u);
+ CHECK(std::filesystem::is_directory(PreRoot / "pre" / "made"));
+
+ // Generic: ancestor-chain pruning, parent dedup across files in same dir, disjoint
+ // siblings (cannot prune each other), nested-vs-flat coexistence. Expected leaves:
+ // deep/nest/leaf, deep/sibling, flat, lone.
+ const std::filesystem::path MixRoot = Root / "mixed";
+ std::vector<std::filesystem::path> Mix{MixRoot / "deep" / "nest" / "leaf" / "x.bin",
+ MixRoot / "deep" / "nest" / "leaf" / "y.bin", // shares parent with x
+ MixRoot / "deep" / "nest" / "g.bin", // ancestor of leaf -> pruned
+ MixRoot / "deep" / "h.bin", // ancestor of nest -> pruned
+ MixRoot / "deep" / "sibling" / "i.bin", // sibling of nest -> kept
+ MixRoot / "flat" / "j.bin", // top-level sibling -> kept
+ MixRoot / "lone" / "k.bin"}; // disjoint -> kept
+ CHECK_EQ(hydration_impl::CreateParentDirectories(Mix), 4u);
+ CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest" / "leaf"));
+ CHECK(std::filesystem::is_directory(MixRoot / "deep" / "sibling"));
+ CHECK(std::filesystem::is_directory(MixRoot / "flat"));
+ CHECK(std::filesystem::is_directory(MixRoot / "lone"));
+ // Pruned ancestors still exist via CreateDirectories recursion.
+ CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest"));
+ CHECK(std::filesystem::is_directory(MixRoot / "deep"));
}
// ---------------------------------------------------------------------------
@@ -1658,7 +2932,7 @@ TEST_CASE("hydration.file.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- std::string ModuleId = fmt::format("file_concurrent_{}", I);
+ std::string ModuleId = fmt::format("file_concurrent_{}"sv, I);
std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
CreateDirectories(StateDir);
@@ -1817,7 +3091,7 @@ TEST_CASE("hydration.s3.concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- std::string ModuleId = fmt::format("s3_concurrent_{}", I);
+ std::string ModuleId = fmt::format("s3_concurrent_{}"sv, I);
std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state";
std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp";
CreateDirectories(StateDir);
@@ -1890,7 +3164,7 @@ TEST_CASE("hydration.s3.obliterate")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_obliterate";
+ constexpr std::string_view ModuleId = "s3test_obliterate"sv;
HydrationBase::Configuration BaseConfig;
{
@@ -1904,7 +3178,7 @@ TEST_CASE("hydration.s3.obliterate")
}
auto Hydration = InitHydration(BaseConfig);
- HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId};
+ HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)};
// Dehydrate to populate backend
CreateSmallTestTree(ServerStateDir);
@@ -1918,7 +3192,7 @@ TEST_CASE("hydration.s3.obliterate")
Opts.Credentials.AccessKeyId = Minio.RootUser();
Opts.Credentials.SecretAccessKey = Minio.RootPassword();
S3Client Client(Opts);
- return Client.ListObjects(ModuleId + "/");
+ return Client.ListObjects(fmt::format("{}/"sv, ModuleId));
};
// Verify objects exist in S3
@@ -2034,7 +3308,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_performance";
+ constexpr std::string_view ModuleId = "s3test_performance"sv;
CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true});
// auto TestFiles = CreateTestTree(ServerStateDir);
@@ -2054,7 +3328,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip())
HydrationConfig Config{.ServerStateDir = ServerStateDir,
.TempDir = HydrationTemp,
- .ModuleId = ModuleId,
+ .ModuleId = std::string(ModuleId),
.Threading = Threading.Options};
// Dehydrate: upload server state to MinIO
@@ -2091,7 +3365,7 @@ TEST_CASE("hydration.file.incremental")
CreateDirectories(HydrationStore);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "testmodule";
+ constexpr std::string_view ModuleId = "testmodule"sv;
// auto TestFiles = CreateTestTree(ServerStateDir);
TestThreading Threading(4);
@@ -2100,7 +3374,7 @@ TEST_CASE("hydration.file.incremental")
HydrationConfig Config{.ServerStateDir = ServerStateDir,
.TempDir = HydrationTemp,
- .ModuleId = ModuleId,
+ .ModuleId = std::string(ModuleId),
.Threading = Threading.Options};
// Hydrate with no prior state
@@ -2170,7 +3444,7 @@ TEST_CASE("hydration.s3.incremental")
CreateDirectories(ServerStateDir);
CreateDirectories(HydrationTemp);
- const std::string ModuleId = "s3test_incremental";
+ constexpr std::string_view ModuleId = "s3test_incremental"sv;
TestThreading Threading(8);
@@ -2188,7 +3462,7 @@ TEST_CASE("hydration.s3.incremental")
HydrationConfig Config{.ServerStateDir = ServerStateDir,
.TempDir = HydrationTemp,
- .ModuleId = ModuleId,
+ .ModuleId = std::string(ModuleId),
.Threading = Threading.Options};
// Hydrate with no prior state
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index 0455dda91..d9a3dda5b 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -2,6 +2,8 @@
#pragma once
+#include "hydrationdefaults.h"
+
#include <zencore/compactbinary.h>
#include <atomic>
@@ -9,6 +11,7 @@
#include <memory>
#include <optional>
#include <string>
+#include <vector>
namespace zen {
@@ -32,6 +35,20 @@ struct HydrationConfig
// External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread.
std::optional<ThreadingOptions> Threading;
+
+ // When true, small files are concatenated into pack blobs during Dehydrate and sliced
+ // back out during Hydrate. Pack contents are stored raw (no compression); the CAS key
+ // of a pack is the hash of its concatenated raw bytes. Dramatically reduces request
+ // count for modules dominated by tiny metadata files.
+ bool PackEnabled = true;
+
+ // Files strictly smaller than this are candidates for packing.
+ uint64_t PackThresholdBytes = DefaultPackThresholdBytes;
+
+ // Upper bound on the concatenation size of a single pack. Candidates are bin-packed
+ // greedily; a pack that would exceed this is closed and a new one is started. A single
+ // unique candidate larger than this falls back to standalone upload.
+ uint64_t MaxPackBytes = DefaultMaxPackBytes;
};
/**
@@ -46,14 +63,23 @@ struct HydrationStrategyBase
virtual ~HydrationStrategyBase() = default;
// Upload server state to the configured target. ServerStateDir is wiped on success.
- // On failure, ServerStateDir is left intact.
+ // On failure, ServerStateDir is left intact and the failure is logged at WARN; no
+ // exception propagates to the caller. Callers that need to distinguish success from
+ // failure must inspect the log stream or observe downstream effects (e.g. presence of
+ // the metadata file on the backend) - success is not signalled through the API.
virtual void Dehydrate(const CbObject& CachedState) = 0;
- // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate.
- // On failure, ServerStateDir is wiped and an empty CbObject is returned.
+ // Download state from the configured target into ServerStateDir. Returns cached state
+ // for the next Dehydrate. On failure, ServerStateDir is wiped, the failure is logged,
+ // and an empty CbObject is returned (a no-op cache). Callers can check the result for
+ // emptiness as a failure indicator.
virtual CbObject Hydrate() = 0;
- // Delete all stored data for this module from the configured backend, then clean ServerStateDir and TempDir.
+ // Delete all stored data for this module from the configured backend, then clean
+ // ServerStateDir and TempDir. Backend-delete failures are retried once; if the retry
+ // also fails, local cleanup proceeds regardless and the failure is logged at WARN.
+ // Because backend deletion is best-effort, a return from Obliterate does not guarantee
+ // backend data is gone.
virtual void Obliterate() = 0;
};
@@ -73,14 +99,23 @@ public:
{
// Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path")
std::string TargetSpecification;
- // Full config object (mutually exclusive with TargetSpecification)
+ // Full config object (mutually exclusive with TargetSpecification). Backend-specific
+ // settings (e.g. S3 "chunksize") live inside Options["settings"]. The common
+ // `excludes` entry is parsed once by HydrationBase and shared across modules.
CbObject Options;
};
+ // Parses common Options entries (`excludes`) into m_Excludes, applying built-in
+ // defaults when the field is absent. Field present-but-empty `[]` is honored as an
+ // explicit override (no defaults applied).
+ explicit HydrationBase(const Configuration& Config);
virtual ~HydrationBase() = default;
// Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate.
virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0;
+
+protected:
+ std::vector<std::string> m_Excludes;
};
// Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration).
diff --git a/src/zenserver/hub/hydrationdefaults.h b/src/zenserver/hub/hydrationdefaults.h
new file mode 100644
index 000000000..8d9fb6d41
--- /dev/null
+++ b/src/zenserver/hub/hydrationdefaults.h
@@ -0,0 +1,26 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <cstdint>
+
+namespace zen {
+
+// state.cbo schema version. Bump on any incompatible change to the manifest layout.
+// Hydrate refuses to read a manifest with a higher version than this. Manifests written
+// by older binaries omit the field; treated as version 0 and decoded best-effort
+// (missing optional fields like Packs[] / StorageSettings fall back to defaults).
+inline constexpr uint32_t HydrationSchemaVersion = 1;
+
+// Multipart chunk size used by the S3 backend when hydrating/dehydrating large files.
+// Dehydrate writes the chosen size into state.cbo so hydrate replays with the same
+// partitioning. State records without the field fall back to this default.
+inline constexpr uint64_t DefaultMultipartChunkSize = 64u * 1024u * 1024u;
+
+// Files strictly smaller than this are candidates for packing.
+inline constexpr uint64_t DefaultPackThresholdBytes = 256u * 1024u;
+
+// Upper bound on the concatenation size of a single pack. Packs are stored raw (no compression).
+inline constexpr uint64_t DefaultMaxPackBytes = 4u * 1024u * 1024u;
+
+} // namespace zen
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 9d477fb10..8d36e6a46 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -10,6 +10,7 @@
#include <zencore/logging.h>
#include <zencore/string.h>
#include <zencore/timer.h>
+#include <zencore/trace.h>
namespace zen {
@@ -31,6 +32,7 @@ StorageServerInstance::~StorageServerInstance()
void
StorageServerInstance::SpawnServerProcess()
{
+ ZEN_TRACE_CPU("StorageServerInstance::SpawnServerProcess");
Stopwatch SpawnTimer;
ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId);
@@ -103,6 +105,7 @@ StorageServerInstance::ShutdownServerProcess()
{
return;
}
+ ZEN_TRACE_CPU("StorageServerInstance::ShutdownServerProcess");
Stopwatch ShutdownTimer;
// m_ServerInstance.Shutdown() never throws.
m_ServerInstance.Shutdown();
@@ -131,6 +134,7 @@ StorageServerInstance::ProvisionLocked()
return;
}
+ ZEN_TRACE_CPU("StorageServerInstance::ProvisionLocked");
ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir);
try
{
@@ -150,6 +154,7 @@ StorageServerInstance::ProvisionLocked()
void
StorageServerInstance::DeprovisionLocked()
{
+ ZEN_TRACE_CPU("StorageServerInstance::DeprovisionLocked");
ShutdownServerProcess();
// Crashed or Hibernated: process already dead; skip Shutdown.
@@ -169,6 +174,7 @@ StorageServerInstance::DeprovisionLocked()
void
StorageServerInstance::ObliterateLocked()
{
+ ZEN_TRACE_CPU("StorageServerInstance::ObliterateLocked");
ShutdownServerProcess();
std::atomic<bool> AbortFlag{false};
@@ -181,6 +187,7 @@ StorageServerInstance::ObliterateLocked()
void
StorageServerInstance::HibernateLocked()
{
+ ZEN_TRACE_CPU("StorageServerInstance::HibernateLocked");
// Signal server to shut down, but keep data around for later wake
ShutdownServerProcess();
}
@@ -195,6 +202,8 @@ StorageServerInstance::WakeLocked()
return;
}
+ ZEN_TRACE_CPU("StorageServerInstance::WakeLocked");
+
try
{
SpawnServerProcess();
@@ -212,6 +221,12 @@ StorageServerInstance::WakeLocked()
void
StorageServerInstance::Hydrate()
{
+ if (!m_Config.EnableHydration)
+ {
+ ZEN_INFO("Hydration disabled; skipping hydrate for module '{}'", m_ModuleId);
+ return;
+ }
+ ZEN_TRACE_CPU("StorageServerInstance::Hydrate");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
@@ -222,6 +237,12 @@ StorageServerInstance::Hydrate()
void
StorageServerInstance::Dehydrate()
{
+ if (!m_Config.EnableDehydration)
+ {
+ ZEN_INFO("Dehydration disabled; skipping dehydrate for module '{}'", m_ModuleId);
+ return;
+ }
+ ZEN_TRACE_CPU("StorageServerInstance::Dehydrate");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag);
@@ -238,6 +259,9 @@ StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::at
Config.Threading.emplace(
HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag});
}
+ Config.PackEnabled = m_Config.HydrationPackEnabled;
+ Config.PackThresholdBytes = m_Config.HydrationPackThresholdBytes;
+ Config.MaxPackBytes = m_Config.HydrationMaxPackBytes;
return Config;
}
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 21ac1ada3..a2f376a23 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -36,6 +36,11 @@ public:
std::string Trace;
std::string TraceHost;
std::string TraceFile;
+ bool EnableHydration = true;
+ bool EnableDehydration = true;
+ bool HydrationPackEnabled = true;
+ uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes;
+ uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes;
WorkerThreadPool* OptionalWorkerPool = nullptr;
};
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index ebc2cf2f1..27c1c9fc4 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -235,6 +235,44 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
+ "hub-enable-hydration",
+ "Load instance state from the hydration target on provision (default true)",
+ cxxopts::value<bool>(m_ServerOptions.HubEnableHydration)->default_value("true"),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-enable-dehydration",
+ "Save instance state to the hydration target on deprovision (default true)",
+ cxxopts::value<bool>(m_ServerOptions.HubEnableDehydration)->default_value("true"),
+ "");
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-enable-pack",
+ "Concatenate small files into raw CAS pack blobs during dehydrate (default true; see --hub-hydration-pack-threshold-bytes)",
+ cxxopts::value<bool>(m_ServerOptions.HubHydrationPackEnabled)->default_value("true"),
+ "");
+
+ Options.add_option("hub",
+ "",
+ "hub-hydration-pack-threshold-bytes",
+ fmt::format("Files strictly smaller than this are pack candidates (default {})", DefaultPackThresholdBytes),
+ cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationPackThresholdBytes)
+ ->default_value(fmt::format("{}", DefaultPackThresholdBytes)),
+ "<bytes>");
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-max-pack-bytes",
+ fmt::format("Upper bound on a pack's concatenation size (default {})", DefaultMaxPackBytes),
+ cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationMaxPackBytes)->default_value(fmt::format("{}", DefaultMaxPackBytes)),
+ "<bytes>");
+
+ Options.add_option("hub",
+ "",
"hub-watchdog-cycle-interval-ms",
"Interval between watchdog cycles in milliseconds",
cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"),
@@ -367,6 +405,13 @@ ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv);
Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv);
Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv);
+ Options.AddOption("hub.enablehydration"sv, m_ServerOptions.HubEnableHydration, "hub-enable-hydration"sv);
+ Options.AddOption("hub.enabledehydration"sv, m_ServerOptions.HubEnableDehydration, "hub-enable-dehydration"sv);
+ Options.AddOption("hub.hydration.enablepack"sv, m_ServerOptions.HubHydrationPackEnabled, "hub-hydration-enable-pack"sv);
+ Options.AddOption("hub.hydration.packthresholdbytes"sv,
+ m_ServerOptions.HubHydrationPackThresholdBytes,
+ "hub-hydration-pack-threshold-bytes"sv);
+ Options.AddOption("hub.hydration.maxpackbytes"sv, m_ServerOptions.HubHydrationMaxPackBytes, "hub-hydration-max-pack-bytes"sv);
Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv);
Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv,
@@ -453,7 +498,7 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId,
HubInstanceState PreviousState,
HubInstanceState NewState)
{
- ZEN_UNUSED(PreviousState);
+ ZEN_INFO("Module '{}' changed state from {} to {}", ModuleId, zen::ToString(PreviousState), zen::ToString(NewState));
if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating)
{
@@ -661,6 +706,11 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
.InstanceTraceFile = ServerConfig.HubInstanceTraceFile,
.InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
.HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
+ .EnableHydration = ServerConfig.HubEnableHydration,
+ .EnableDehydration = ServerConfig.HubEnableDehydration,
+ .HydrationPackEnabled = ServerConfig.HubHydrationPackEnabled,
+ .HydrationPackThresholdBytes = ServerConfig.HubHydrationPackThresholdBytes,
+ .HydrationMaxPackBytes = ServerConfig.HubHydrationMaxPackBytes,
.WatchDog =
{
.CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index 5e465bb14..6416792a6 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -3,6 +3,7 @@
#pragma once
#include "hubinstancestate.h"
+#include "hydrationdefaults.h"
#include "resourcemetrics.h"
#include "zenserver.h"
@@ -47,7 +48,12 @@ struct ZenHubServerConfig : public ZenServerConfig
uint16_t HubBasePortNumber = 21000;
int HubInstanceLimit = 1000;
bool HubUseJobObject = true;
- std::string HubInstanceHttpClass = "asio";
+ bool HubEnableHydration = true; // Load instance state from hydration target on provision
+ bool HubEnableDehydration = true; // Save instance state to hydration target on deprovision
+ bool HubHydrationPackEnabled = true; // Concatenate small files into raw CAS pack blobs during dehydrate
+ uint64_t HubHydrationPackThresholdBytes = DefaultPackThresholdBytes; // Files strictly smaller than this are pack candidates
+ uint64_t HubHydrationMaxPackBytes = DefaultMaxPackBytes; // Upper bound on a pack's concatenation size
+ std::string HubInstanceHttpClass = "asio";
std::string HubInstanceMalloc;
std::string HubInstanceTrace;
std::string HubInstanceTraceHost;
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index f8bed92da..ab80cfcc7 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -148,11 +148,31 @@ namespace {
return true;
}
+ /// True if the response indicates S3 throttling (503 SlowDown / ServiceUnavailable / 429).
+ /// Code is checked on both the HTTP status and the XML error code so we catch proxies that
+ /// return 200 with a SlowDown body.
+ bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode)
+ {
+ const int Status = static_cast<int>(Response.StatusCode);
+ if (Status == 503 || Status == 429)
+ {
+ return true;
+ }
+ if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" ||
+ ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests")
+ {
+ return true;
+ }
+ return false;
+ }
+
/// Build a human-readable error message for a failed S3 response. When the response body
/// contains an S3 `<Error>` element, the Code and Message fields are included in the string
/// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.)
/// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport
/// message when no XML body is available (HEAD responses, transport errors).
+ /// Also emits a distinct `S3 THROTTLED` warning when the response indicates throttling so
+ /// callers can grep for it without parsing combined error text.
std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response)
{
if (!Response.Error.has_value() && Response.ResponsePayload)
@@ -164,9 +184,21 @@ namespace {
{
ExtendableStringBuilder<256> Decoded;
DecodeXmlEntities(Message, Decoded);
+ if (IsS3Throttled(Response, Code))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'",
+ Prefix,
+ static_cast<int>(Response.StatusCode),
+ Code,
+ Decoded.ToView());
+ }
return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView());
}
}
+ if (IsS3Throttled(Response, {}))
+ {
+ ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode));
+ }
return Response.ErrorMessage(Prefix);
}