diff options
| author | Dan Engelbrecht <[email protected]> | 2026-04-27 11:14:09 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-27 11:14:09 +0200 |
| commit | 753ab4e89b9a5952e50bc77d404198520b362a3a (patch) | |
| tree | 39dbaad8389677981281b8c1585ac846251539f0 | |
| parent | fix crash when scavenging sequences or copying local chunks (#1013) (diff) | |
| download | archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.tar.xz archived-zen-753ab4e89b9a5952e50bc77d404198520b362a3a.zip | |
hydration with pack (#1016)
- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files
- `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true)
- `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB)
- `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB)
- Feature: Hub hydration and dehydration can be disabled per direction
- `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true)
- `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true)
- Feature: Hub hydration accepts a configurable file exclude list via `HydrationOptions` `excludes` (array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: a present field replaces the default outright; explicit `[]` opts out of all defaults.
- Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately
- Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes
- Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer
- Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so hydrate replays the partitioning used at dehydrate; default raised to 64 MiB (was 32 MiB)
- Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup
24 files changed, 5378 insertions, 529 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index cb515b2e2..3288f8bba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,17 @@ ## +- Feature: Hub hydration packs small files into raw CAS pack blobs to reduce request count for modules dominated by tiny metadata files + - `--hub-hydration-enable-pack` (Lua: `hub.hydration.enablepack`, default true) + - `--hub-hydration-pack-threshold-bytes` (Lua: `hub.hydration.packthresholdbytes`, default 256 KiB) + - `--hub-hydration-max-pack-bytes` (Lua: `hub.hydration.maxpackbytes`, default 4 MiB) +- Feature: Hub hydration and dehydration can be disabled per direction + - `--hub-enable-hydration` (Lua: `hub.enablehydration`, default true) + - `--hub-enable-dehydration` (Lua: `hub.enabledehydration`, default true) +- Feature: Hub hydration accepts a configurable file exclude list via `HydrationOptions` `excludes` (array of wildcards). Built-in defaults skip transient runtime files (`.lock`, `.sentry-native/*`, `state_marker`, `*.bak`, `gc/reserve.gc`, `auth/*`) so they no longer participate in dehydrate scans. Override semantics: a present field replaces the default outright; explicit `[]` opts out of all defaults. +- Improvement: Hub hydration completion logs now report per-request average and max latency, peak in-flight workers, queue wait, and hash-cache hit percentage; loose and pack-blob transfers are reported separately +- Improvement: Hub hydration pre-creates unique parent directories before scheduling parallel writes +- Improvement: S3 hydration retries transient HTTP failures (timeouts, 429 throttling, 5xx server errors, connection errors) up to 3 times via the HTTP client retry layer +- Improvement: S3 hydration multipart chunk size is persisted in `state.cbo` per module so hydrate replays the partitioning used at dehydrate; default raised to 64 MiB (was 32 MiB) +- Improvement: Hub hydration `Obliterate` retries backend delete once before falling back to local cleanup - Bugfix: `zen builds download` no longer crashes intermittently when scavenging sequences or copying local chunks ## 5.8.7 diff --git a/VERSION.txt b/VERSION.txt index 6f4fe76b0..c7735a587 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -5.8.8
\ No newline at end of file +5.8.9-pre2
\ No newline at end of file diff --git a/docs/hub.md b/docs/hub.md index 27cf98512..63d90c502 100644 --- a/docs/hub.md +++ b/docs/hub.md @@ -243,11 +243,18 @@ suitable for single-host deployments where instances share locally cached data. `targetspec` and `targetconfig` are mutually exclusive. -| CLI flag | Lua key | Default | Description | -|-----------------------------------|------------------------------|---------------------------|-------------| -| `--hub-hydration-target-spec` | `hub.hydration.targetspec` | _(local path, see above)_ | Shorthand URI for the hydration source. Must use the `file://` prefix for file targets: `file:///absolute/path`. | -| `--hub-hydration-target-config` | `hub.hydration.targetconfig` | _(none)_ | Path to a JSON file specifying the hydration source. Supports `file` and `s3` backends. | -| `--hub-hydration-threads` | `hub.hydration.threads` | `max(cpu/4, 2)` | Thread count for the hydration/dehydration worker pool. Controls parallel file hashing and backend I/O during hydrate/dehydrate. Set to `0` for synchronous operation. | +| CLI flag | Lua key | Default | Description | +|---------------------------------------------|---------------------------------------------|---------------------------|-------------| +| `--hub-hydration-target-spec` | `hub.hydration.targetspec` | _(local path, see above)_ | Shorthand URI for the hydration source. Must use the `file://` prefix for file targets: `file:///absolute/path`. | +| `--hub-hydration-target-config` | `hub.hydration.targetconfig` | _(none)_ | Path to a JSON file specifying the hydration source. Supports `file` and `s3` backends. | +| `--hub-hydration-threads` | `hub.hydration.threads` | `max(cpu/4, 2)` | Thread count for the hydration/dehydration worker pool. Controls parallel file hashing and backend I/O during hydrate/dehydrate. Set to `0` for synchronous operation. | +| `--hub-enable-hydration` | `hub.enablehydration` | `true` | Load instance state from the hydration target on provision. Disable to start every provision from an empty instance directory. | +| `--hub-enable-dehydration` | `hub.enabledehydration` | `true` | Save instance state to the hydration target on deprovision. Disable to run the hydrate-only path (useful for perf testing against a fixed backend snapshot). | +| `--hub-hydration-enable-pack` | `hub.hydration.enablepack` | `true` | Concatenate small files into CAS pack blobs during dehydrate. See [Pack](#pack). | +| `--hub-hydration-pack-threshold-bytes` | `hub.hydration.packthresholdbytes` | `262144` (256 KiB) | Files strictly smaller than this are pack candidates. Larger files are stored as standalone CAS entries. | +| `--hub-hydration-max-pack-bytes` | `hub.hydration.maxpackbytes` | `4194304` (4 MiB) | Upper bound on a single pack's concatenation size. Candidates are bin-packed greedily; packs that would exceed this cap are closed and a new pack is started. A unique candidate larger than the cap falls back to standalone upload. | + +Multipart chunk size is S3-specific and set via the target config (see [Multipart chunking](#multipart-chunking)). **File backend** (`hub.hydration.targetconfig` JSON): @@ -269,24 +276,115 @@ suitable for single-host deployments where instances share locally cached data. "uri": "s3://bucket-name/optional/prefix", "region": "us-east-1", "endpoint": "https://custom-endpoint", - "path-style": false + "path-style": false, + "chunksize": 67108864 } } ``` +Both backends accept the optional top-level `excludes` key to override the built-in +defaults; see [Excludes](#excludes) for the schema and the default list. + S3 settings: -| Field | Required | Description | -|---|---|---| -| `uri` | Yes | S3 URI. Include a prefix path to isolate hub data within a shared bucket. | -| `region` | No | AWS region. Defaults to `us-east-1`; also reads `AWS_DEFAULT_REGION` / `AWS_REGION`. | -| `endpoint` | No | Custom endpoint URL for S3-compatible services (MinIO, Ceph, etc.). | -| `path-style` | No | Use path-style S3 URLs instead of virtual-hosted. Required by some S3-compatible services. Default `false`. | +| Field | Required | Default | Description | +|---|---|---|---| +| `uri` | Yes | - | S3 URI. Include a prefix path to isolate hub data within a shared bucket. | +| `region` | No | `us-east-1` | AWS region. Also reads `AWS_DEFAULT_REGION` / `AWS_REGION`. | +| `endpoint` | No | - | Custom endpoint URL for S3-compatible services (MinIO, Ceph, etc.). | +| `path-style` | No | `false` | Use path-style S3 URLs instead of virtual-hosted. Required by some S3-compatible services. | +| `chunksize` | No | `67108864` (64 MiB) | Multipart chunk size for GET/PUT on files at or above 1.25x this value. See [Multipart chunking](#multipart-chunking). | S3 credentials are read from environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`). If `AWS_ACCESS_KEY_ID` is not set, the hub falls back to IMDS instance credentials. +#### Pack + +When pack is enabled (default), dehydrate concatenates every unique file smaller than +`packthresholdbytes` into one or more CAS pack blobs, rather than uploading each file as +its own CAS entry. Pack contents are stored raw (no compression); each pack's CAS key is +the hash of its concatenated bytes. Files at or above the threshold continue to upload as +standalone CAS entries. + +Pack composition is deterministic: candidates are ordered by content hash and bin-packed +greedily up to `maxpackbytes`. Identical module state produces identical pack hashes +across runs, so `ExistsLookup` deduplicates packs between redeploys just like regular +CAS entries. A pack that would contain fewer than two entries is discarded and its +candidates revert to standalone upload. + +The state manifest records per-pack entries and each file's owning pack, so hydrate +resolves packed files by downloading the pack once and slicing it into the target files. + +The primary win is request count: for modules dominated by tiny metadata files, pack can +collapse dozens of GETs into a single request, which dominates wall time on high-RTT +backends (real S3) more than it matters on localhost MinIO. + +#### Excludes + +The optional top-level `excludes` key on the target config is an array of wildcard +patterns matched against each file's relative path (forward-slash form). Matching +files are dropped at the dehydrate-side directory scan and never enter the manifest. +`*` is path-separator-agnostic, so `auth/*` matches `auth/authstate` and any deeper +path under `auth/`. + +`excludes` uses **override semantics, not additive**: if the field is present in the +target config its contents fully replace the default list. An explicit empty array +(`"excludes": []`) is honoured as "apply no excludes". Field absent from the config +means the built-in default applies. + +Built-in default: + +| Pattern | Why | +|---|---| +| `.sentry-native/*` | Sentry-native crash uploader DB; locked while child runs | +| `state_marker` | Root-level liveness marker re-created by the child | +| `.lock` | Instance lock file (`FILE_FLAG_DELETE_ON_CLOSE`); locked while child runs | +| `*.bak` | Transient backups produced by atomic file replace | +| `gc/reserve.gc` | GC disk reserve under the per-store `gc/` subdirectory | +| `auth/*` | Encrypted auth state (`auth/authstate`) | + +A target config whose `excludes` array reproduces the built-in default (equivalent to +omitting the key from the config): + +```json +{ + "type": "file", + "settings": { + "path": "/data/hydration_storage" + }, + "excludes": [ + ".sentry-native/*", + "state_marker", + ".lock", + "*.bak", + "gc/reserve.gc", + "auth/*" + ] +} +``` + +#### Multipart chunking + +Large files above 1.25x the chunk size are transferred in chunks using S3 range requests +(GET) or multipart uploads (PUT). Smaller files use a single request. + +Chunk size is an S3-specific setting. It has no CLI or Lua surface - it applies only to +S3 backends and is set per-target: +- Target config JSON key: `chunksize` (under `settings`). +- State.cbo persists it per-module as `MultipartChunkSize` (under `StorageSettings`). + This locks each module's chunking to what was used at its last dehydrate. + +The chunk size a module uses is **persisted into state.cbo during dehydrate** so the +next hydrate uses the same partitioning. Changing the target-config `chunksize` affects +only modules with no prior state.cbo; existing modules continue to use the value +recorded at their last dehydrate. State.cbo files without the field fall back to +`DefaultMultipartChunkSize` (64 MiB). + +The default of 64 MiB is tuned for intra-region Nitro EC2 instances (hub and S3 bucket in +the same AWS region). For smaller instance types where per-connection bandwidth is under +~40 MB/s, 32 MiB may give better thread-pool utilisation on multi-hundred-MiB files. + --- ### Upstream Notifications diff --git a/scripts/test_scripts/hub/PERF_SEED_README.md b/scripts/test_scripts/hub/PERF_SEED_README.md new file mode 100644 index 000000000..fb471d4bb --- /dev/null +++ b/scripts/test_scripts/hub/PERF_SEED_README.md @@ -0,0 +1,148 @@ +# Perf-seed workflow + +Three-stage pipeline for running repeatable hub-hydration perf tests against a +local MinIO backend seeded with real module data pulled from production S3. + +## Layout + +All scripts default to a single perf-seed root - currently `E:/Dev/zen-perf-seed/` +in the script defaults, but every path is overridable via CLI flag (see the +per-stage options below). Pick a root with enough free space (snapshots and +preserved CAS dirs can be large) and either pass the matching `--*-dir` flag on +each invocation or change the script defaults to your chosen root. + +Layout under the chosen root (`<perf-seed>/`): + +``` +<perf-seed>/ + hub-a/ Stage A hub data dir (transient) + servers/<moduleid>/ + s3-snapshot/ Preserved production server-state trees (read-only after Stage A) + <moduleid>/ + hubs/ Stage B per-bucket hub data dirs (transient) + hub-b-zen-seed-packed/ + hub-b-zen-seed-unpacked/ + minio-data/ Stage B MinIO data dir (transient, carries every seeded bucket) + minio-seeded-baseline/ Preserved baseline MinIO CAS (read-only after Stage B + preserve) + README.txt + minio-seeded-packed/ Preserved packed MinIO CAS (filled by the pack worktree) + README.txt + hub-perf/ Stage C hub data dir (wiped each run) + minio-run/ Stage C MinIO data dir (wiped + re-copied each run) + perf-runs/ Per-run archive: hub.log, logs/, hub.utrace, summary.json + 20260423-141530_zen-seed-packed/ + 20260423-143112_zen-seed-unpacked/ +``` + +## Prerequisites + +- Debug or release build of zenserver + minio: `xmake -y` +- `pip install boto3` +- AWS CLI v2 with an SSO profile configured (for Stage A only) +- Environment variables (or pass equivalents via CLI flags): + - `ZEN_PERF_S3_URI` - source S3 bucket, e.g. `s3://your-bucket/optional-prefix/` + - `ZEN_PERF_AWS_PROFILE` - AWS SSO profile name with read access to that bucket + - `ZEN_PERF_AWS_REGION` - optional, defaults to `us-east-1` + +## Stage A - snapshot real S3 data + +One-time (or when you want a fresh baseline from production). + +``` +export ZEN_PERF_S3_URI=s3://your-bucket/ +export ZEN_PERF_AWS_PROFILE=your-sso-profile +python scripts/test_scripts/hub/seed_s3_snapshot.py +``` + +Provisions N modules from `$ZEN_PERF_S3_URI`, hibernates them, then copies +`hub-a/servers/<mid>/` to `s3-snapshot/<mid>/`. Triggers `aws sso login` +automatically if the SSO token is missing or expired. + +Module selection ranks all UUID-shaped folders by their +`incremental-state.cbo` `LastModified` (newest first, a proxy for +most-recently-accessed) and takes the top `--module-count`. + +Options: +- `--module-count N` (default 1000) +- `--snapshot-dir PATH` (default `<perf-seed>/s3-snapshot`) +- `--hub-data-dir PATH` (default `<perf-seed>/hub-a`) + +## Stage B - seed MinIO from the snapshot + +One-time per pack-mode (or when `s3-snapshot` changes). + +`seed_minio.py` seeds a **single** bucket per invocation. The pack flag is +hardcoded inside the script (`--hub-hydration-enable-pack=true` near the +top of `_start_hub`). To produce both packed and unpacked baselines for +comparison, invoke the script twice from two separate worktrees - one with +the flag flipped to `false` - and preserve the resulting MinIO data dir +each time. + +``` +# In the pack worktree (flag = true), seeds zen-seed-packed +python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-packed +python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-packed + +# In the no-pack worktree (flag = false), seeds zen-seed-unpacked +python scripts/test_scripts/hub/seed_minio.py --wipe --bucket zen-seed-unpacked +python scripts/test_scripts/hub/preserve_minio_state.py --dest <perf-seed>/minio-seeded-unpacked +``` + +The script provisions every module found under `s3-snapshot/`, hibernates +them, overlays the snapshot on top of the hub's servers dir, then +deprovisions all modules - which runs the dehydrate path and uploads the +content into the bucket. + +`preserve_minio_state.py` copies the resulting `minio-data/` to a +variant-specific preservation dir and writes a README with provenance. + +Options of interest: +- `--bucket NAME` - bucket name (default `zen-seed-packed`). +- `--wipe` removes the per-bucket hub data dir and the shared minio-data + dir before starting. +- `--module-count N` caps the set (0 = every module in snapshot-dir). + +## Stage C - run a perf iteration + +Repeat as often as you want; each run starts from the preserved baseline. + +``` +# Pack-on bucket +python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-packed --trace + +# Pack-off bucket (for comparison) +python scripts/test_scripts/hub/run_minio_perf.py --bucket zen-seed-unpacked --trace +``` + +Steps: +1. Copies `--minio-seeded` (default `minio-seeded-baseline/`) over `minio-run/` so MinIO starts from a known state. +2. Wipes `hub-perf/` (unless `--no-wipe-hub`). +3. Starts MinIO and hub. +4. Provisions all modules, waits for `provisioned`, deprovisions, waits gone. +5. Stops everything cleanly. + +Default mode is `--hub-enable-dehydration=false` so MinIO isn't modified; every +iteration exercises the hydrate-only path against the same baseline CAS. The +`--bucket` flag selects which seeded bucket (and therefore which pack mode) +to exercise. + +Pass `--enable-dehydration` to run a full provision -> deprovision cycle that +includes re-upload (dehydrate) at deprovision time. Use this to measure the +dehydrate phase end-to-end against the seeded baseline. Note the seeded +baseline diverges after a `--enable-dehydration` run - re-copy `--minio-seeded` +or re-run `preserve_minio_state.py` if you want to compare to the pristine state. + +After each run the hub log, structured zenserver logs, any utrace file, and a +`summary.json` with the run's timings are copied into +`perf-runs/<timestamp>_<bucket>/` so Stage C runs can be compared +post-hoc. Override the destination with `--archive-dir PATH`. + +## Resetting between runs + +- **Keep**: `s3-snapshot/`, `minio-seeded-baseline/`, `minio-seeded-packed/`. These are expensive to rebuild. +- **Discard freely**: `hub-a/`, `hubs/`, `hub-perf/`, `minio-data/`, `minio-run/`. + +To force a fresh MinIO seed for one variant: delete the matching +`minio-seeded-<variant>/` and re-run Stage B + preserve (with the matching +`--dest`) in that worktree. To force a fresh S3 snapshot: delete +`s3-snapshot/` and re-run Stage A. diff --git a/scripts/test_scripts/hub/analyze_perf_runs.py b/scripts/test_scripts/hub/analyze_perf_runs.py new file mode 100644 index 000000000..521d9cc1e --- /dev/null +++ b/scripts/test_scripts/hub/analyze_perf_runs.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +"""Parse two hub.log archives (packed vs unpacked) and produce a comparison.""" + +from __future__ import annotations + +import argparse +import re +import sys +from pathlib import Path +from statistics import mean, median + + +_SIZE_UNITS = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4} +_TIME_UNITS = {"ns": 1e-9, "us": 1e-6, "ms": 1e-3, "s": 1.0} + + +def parse_size(s: str) -> float: + # Accept forms: "1.53K", "22.6K", "180M", "1.83G", "25" (bare bytes), "0B". + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)B?", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) or "B" + return float(val) * _SIZE_UNITS.get(unit, 1) + + +def parse_time_s(s: str) -> float: + # Accept forms: "1s", "46ms", "712us", "0ns" + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(ns|us|ms|s)", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) + return float(val) * _TIME_UNITS[unit] + + +def parse_rate_bits(s: str) -> float: + # "30.7Mbits/s" -> bits/sec + m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*([KMGT]?)bits/s", s.strip()) + if not m: + return 0.0 + val, unit = m.group(1), m.group(2) or "" + mult = {"": 1, "K": 1e3, "M": 1e6, "G": 1e9, "T": 1e12}[unit] + return float(val) * mult + + +_RE_COMPLETE = re.compile(r"Hydration complete module '([0-9a-f-]+)': (\d+) files \(([^)]+)\) in (\S+)$") +_RE_PACK = re.compile(r"Pack unpack:\s+(\d+) packs,\s+(\d+) files,\s+(\S+),\s+download (\S+),\s+unpack (\S+)") +_RE_DLPHASE = re.compile(r"Download phase:\s+(\S+)\s+(\d+)/(\d+)\s+\(([^)]+)\) downloaded,\s+(\S+),\s+(\d+) threads") +_RE_REQS = re.compile(r"Requests:\s+(\d+) reqs,\s+avg (\S+)/req,\s+avg (\S+)/req,\s+peak in-flight (\d+),\s+saturation (\d+)%") +_RE_SCHED = re.compile(r"Scheduling:\s+1st schedule \+(\S+),\s+1st start \+(\S+),\s+queue wait (\S+)") +_RE_LOADMETA = re.compile(r"Load metadata:\s+(\S+)") +_RE_RENAME = re.compile(r"Rename/copy:\s+(\S+)") + + +def parse_log(path: Path) -> list[dict]: + modules: list[dict] = [] + text = path.read_text(encoding="utf-8", errors="replace").splitlines() + i = 0 + while i < len(text): + line = text[i] + m = _RE_COMPLETE.search(line) + if not m: + i += 1 + continue + mod = { + "mid": m.group(1), + "files": int(m.group(2)), + "total_bytes": parse_size(m.group(3)), + "wall_s": parse_time_s(m.group(4)), + } + # Read the block that follows (up to ~12 lines). + block = text[i : min(i + 20, len(text))] + joined = "\n".join(block) + pm = _RE_PACK.search(joined) + if pm: + mod["packs"] = int(pm.group(1)) + mod["packed_files"] = int(pm.group(2)) + # Uncompressed format: single size, no integrity rehash. + mod["pack_bytes"] = parse_size(pm.group(3)) + mod["pack_download_s"] = parse_time_s(pm.group(4)) + mod["pack_unpack_s"] = parse_time_s(pm.group(5)) + else: + mod["packs"] = 0 + mod["packed_files"] = 0 + mod["pack_bytes"] = 0.0 + mod["pack_download_s"] = 0.0 + mod["pack_unpack_s"] = 0.0 + + dm = _RE_DLPHASE.search(joined) + if dm: + mod["dl_phase_s"] = parse_time_s(dm.group(1)) + mod["dl_files"] = int(dm.group(2)) + mod["dl_total_files"] = int(dm.group(3)) + mod["dl_bytes"] = parse_size(dm.group(4)) + mod["dl_throughput_bps"] = parse_rate_bits(dm.group(5)) + mod["dl_threads"] = int(dm.group(6)) + rm = _RE_REQS.search(joined) + if rm: + mod["reqs"] = int(rm.group(1)) + mod["req_avg_s"] = parse_time_s(rm.group(2)) + mod["req_avg_throughput_bps"] = parse_rate_bits(rm.group(3)) + mod["req_peak_inflight"] = int(rm.group(4)) + mod["req_saturation_pct"] = int(rm.group(5)) + sm = _RE_SCHED.search(joined) + if sm: + mod["sched_1st_schedule_s"] = parse_time_s(sm.group(1)) + mod["sched_1st_start_s"] = parse_time_s(sm.group(2)) + mod["queue_wait_s"] = parse_time_s(sm.group(3)) + lm = _RE_LOADMETA.search(joined) + if lm: + mod["load_metadata_s"] = parse_time_s(lm.group(1)) + rcm = _RE_RENAME.search(joined) + if rcm: + mod["rename_copy_s"] = parse_time_s(rcm.group(1)) + + modules.append(mod) + i += 1 + + return modules + + +def fmt_bytes(n: float) -> str: + for u in ("B", "KB", "MB", "GB", "TB"): + if n < 1024 or u == "TB": + return f"{n:.1f} {u}" + n /= 1024 + return f"{n:.1f} TB" + + +def fmt_s(s: float) -> str: + if s < 1e-3: + return f"{s*1e6:.0f} us" + if s < 1.0: + return f"{s*1e3:.1f} ms" + return f"{s:.2f} s" + + +def agg(modules: list[dict]) -> dict: + a: dict = {} + a["count"] = len(modules) + a["total_files"] = sum(m["files"] for m in modules) + a["total_bytes"] = sum(m["total_bytes"] for m in modules) + a["total_reqs"] = sum(m.get("reqs", 0) for m in modules) + a["total_packs"] = sum(m.get("packs", 0) for m in modules) + a["total_packed_files"] = sum(m.get("packed_files", 0) for m in modules) + a["total_pack_bytes"] = sum(m.get("pack_bytes", 0.0) for m in modules) + a["total_dl_bytes"] = sum(m.get("dl_bytes", 0.0) for m in modules) + a["total_dl_phase_s"] = sum(m.get("dl_phase_s", 0.0) for m in modules) + a["total_pack_download_s"] = sum(m.get("pack_download_s", 0.0) for m in modules) + a["total_pack_unpack_s"] = sum(m.get("pack_unpack_s", 0.0) for m in modules) + a["total_rename_s"] = sum(m.get("rename_copy_s", 0.0) for m in modules) + a["total_load_meta_s"] = sum(m.get("load_metadata_s", 0.0) for m in modules) + a["total_wall_s"] = sum(m["wall_s"] for m in modules) + + # Per-module medians and percentiles of request-level metrics + req_avgs = [m["req_avg_s"] for m in modules if m.get("reqs", 0) > 0] + a["req_avg_latency_median_s"] = median(req_avgs) if req_avgs else 0.0 + a["req_avg_latency_mean_s"] = mean(req_avgs) if req_avgs else 0.0 + a["req_avg_latency_p95_s"] = ( + sorted(req_avgs)[int(0.95 * len(req_avgs))] if req_avgs else 0.0 + ) + + queue_waits = [m["queue_wait_s"] for m in modules if "queue_wait_s" in m] + a["queue_wait_median_s"] = median(queue_waits) if queue_waits else 0.0 + a["queue_wait_mean_s"] = mean(queue_waits) if queue_waits else 0.0 + a["queue_wait_p95_s"] = ( + sorted(queue_waits)[int(0.95 * len(queue_waits))] if queue_waits else 0.0 + ) + a["queue_wait_max_s"] = max(queue_waits) if queue_waits else 0.0 + + first_starts = [m["sched_1st_start_s"] for m in modules if "sched_1st_start_s" in m] + a["first_start_median_s"] = median(first_starts) if first_starts else 0.0 + a["first_start_p95_s"] = ( + sorted(first_starts)[int(0.95 * len(first_starts))] if first_starts else 0.0 + ) + + wall_times = [m["wall_s"] for m in modules] + a["wall_median_s"] = median(wall_times) if wall_times else 0.0 + a["wall_mean_s"] = mean(wall_times) if wall_times else 0.0 + a["wall_p95_s"] = sorted(wall_times)[int(0.95 * len(wall_times))] if wall_times else 0.0 + a["wall_max_s"] = max(wall_times) if wall_times else 0.0 + + saturations = [m["req_saturation_pct"] for m in modules if "req_saturation_pct" in m] + a["saturation_median_pct"] = median(saturations) if saturations else 0 + a["saturation_mean_pct"] = mean(saturations) if saturations else 0 + + peak_inflights = [m["req_peak_inflight"] for m in modules if "req_peak_inflight" in m] + a["peak_inflight_median"] = median(peak_inflights) if peak_inflights else 0 + a["peak_inflight_max"] = max(peak_inflights) if peak_inflights else 0 + + return a + + +def print_row(label: str, packed_val: str, unpacked_val: str, delta: str = ""): + print(f" {label:<42} {packed_val:>14} {unpacked_val:>14} {delta}") + + +def delta_pct(packed: float, unpacked: float) -> str: + if unpacked == 0: + return "" + d = packed - unpacked + pct = (d / unpacked) * 100.0 + sign = "+" if d >= 0 else "" + return f"{sign}{pct:.1f}%" + + +def main() -> int: + p = argparse.ArgumentParser() + p.add_argument("--packed-log", required=True) + p.add_argument("--unpacked-log", required=True) + args = p.parse_args() + + pm = parse_log(Path(args.packed_log)) + um = parse_log(Path(args.unpacked_log)) + + pa = agg(pm) + ua = agg(um) + + print(f"\nParsed packed: {pa['count']} modules from {args.packed_log}") + print(f"Parsed unpacked: {ua['count']} modules from {args.unpacked_log}\n") + + print(f" {'metric':<42} {'packed':>14} {'unpacked':>14} delta(p vs u)") + print(f" {'-'*42} {'-'*14} {'-'*14} {'-'*12}") + + print("\n-- workload --") + print_row("modules", f"{pa['count']}", f"{ua['count']}") + print_row("total files (entries)", f"{pa['total_files']}", f"{ua['total_files']}") + print_row("total raw data", fmt_bytes(pa["total_bytes"]), fmt_bytes(ua["total_bytes"])) + + print("\n-- S3 requests --") + print_row("total S3 GETs", + f"{pa['total_reqs']}", f"{ua['total_reqs']}", + delta_pct(pa["total_reqs"], ua["total_reqs"])) + print_row("GETs per module (mean)", + f"{pa['total_reqs']/pa['count']:.1f}", + f"{ua['total_reqs']/ua['count']:.1f}", + delta_pct(pa['total_reqs']/pa['count'], ua['total_reqs']/ua['count'])) + print_row("requests saved by packing", + f"{ua['total_reqs'] - pa['total_reqs']}", "-", + f"{((ua['total_reqs'] - pa['total_reqs']) / ua['total_reqs']) * 100:.1f}%") + + print("\n-- payload --") + print_row("total bytes downloaded", + fmt_bytes(pa["total_dl_bytes"]), + fmt_bytes(ua["total_dl_bytes"]), + delta_pct(pa["total_dl_bytes"], ua["total_dl_bytes"])) + print_row(" pack bytes (aggregate)", + fmt_bytes(pa["total_pack_bytes"]), "-") + diff = pa["total_dl_bytes"] - ua["total_dl_bytes"] + direction = "saved by packing" if diff < 0 else "added by packing" + print_row(f"bytes {direction}", fmt_bytes(abs(diff)), "-") + + print("\n-- per-request latency (per-module avg) --") + print_row("median req avg latency", + fmt_s(pa["req_avg_latency_median_s"]), + fmt_s(ua["req_avg_latency_median_s"]), + delta_pct(pa["req_avg_latency_median_s"], ua["req_avg_latency_median_s"])) + print_row("mean req avg latency", + fmt_s(pa["req_avg_latency_mean_s"]), + fmt_s(ua["req_avg_latency_mean_s"]), + delta_pct(pa["req_avg_latency_mean_s"], ua["req_avg_latency_mean_s"])) + print_row("p95 req avg latency", + fmt_s(pa["req_avg_latency_p95_s"]), + fmt_s(ua["req_avg_latency_p95_s"]), + delta_pct(pa["req_avg_latency_p95_s"], ua["req_avg_latency_p95_s"])) + + print("\n-- queue / scheduling --") + print_row("median queue wait (per module)", + fmt_s(pa["queue_wait_median_s"]), + fmt_s(ua["queue_wait_median_s"]), + delta_pct(pa["queue_wait_median_s"], ua["queue_wait_median_s"])) + print_row("mean queue wait", + fmt_s(pa["queue_wait_mean_s"]), + fmt_s(ua["queue_wait_mean_s"]), + delta_pct(pa["queue_wait_mean_s"], ua["queue_wait_mean_s"])) + print_row("p95 queue wait", + fmt_s(pa["queue_wait_p95_s"]), + fmt_s(ua["queue_wait_p95_s"]), + delta_pct(pa["queue_wait_p95_s"], ua["queue_wait_p95_s"])) + print_row("max queue wait", + fmt_s(pa["queue_wait_max_s"]), + fmt_s(ua["queue_wait_max_s"]), + delta_pct(pa["queue_wait_max_s"], ua["queue_wait_max_s"])) + print_row("median 1st-start delay", + fmt_s(pa["first_start_median_s"]), + fmt_s(ua["first_start_median_s"]), + delta_pct(pa["first_start_median_s"], ua["first_start_median_s"])) + + print("\n-- per-module wall time (hydration block) --") + print_row("median", + fmt_s(pa["wall_median_s"]), + fmt_s(ua["wall_median_s"]), + delta_pct(pa["wall_median_s"], ua["wall_median_s"])) + print_row("mean", + fmt_s(pa["wall_mean_s"]), + fmt_s(ua["wall_mean_s"]), + delta_pct(pa["wall_mean_s"], ua["wall_mean_s"])) + print_row("p95", + fmt_s(pa["wall_p95_s"]), + fmt_s(ua["wall_p95_s"]), + delta_pct(pa["wall_p95_s"], ua["wall_p95_s"])) + print_row("max", + fmt_s(pa["wall_max_s"]), + fmt_s(ua["wall_max_s"]), + delta_pct(pa["wall_max_s"], ua["wall_max_s"])) + + print("\n-- thread-pool saturation / concurrency --") + print_row("saturation median %", + f"{pa['saturation_median_pct']}", + f"{ua['saturation_median_pct']}") + print_row("saturation mean %", + f"{pa['saturation_mean_pct']:.1f}", + f"{ua['saturation_mean_pct']:.1f}") + print_row("peak in-flight median", + f"{pa['peak_inflight_median']}", + f"{ua['peak_inflight_median']}") + print_row("peak in-flight max", + f"{pa['peak_inflight_max']}", + f"{ua['peak_inflight_max']}") + + print("\n-- pack-specific costs (packed run only) --") + print(f" pack GETs added: {pa['total_packs']} ({pa['total_packs']/pa['count']:.2f}/module)") + print(f" small files folded: {pa['total_packed_files']} ({pa['total_packed_files']/pa['count']:.1f}/module)") + print(f" pack download time total: {pa['total_pack_download_s']:.1f} s") + print(f" pack unpack total: {pa['total_pack_unpack_s']:.2f} s") + + print("\n-- per-phase totals (sum across modules, wall-clock contribution) --") + print_row("load metadata total", + fmt_s(pa["total_load_meta_s"]), + fmt_s(ua["total_load_meta_s"]), + delta_pct(pa["total_load_meta_s"], ua["total_load_meta_s"])) + print_row("download phase total", + fmt_s(pa["total_dl_phase_s"]), + fmt_s(ua["total_dl_phase_s"]), + delta_pct(pa["total_dl_phase_s"], ua["total_dl_phase_s"])) + print_row("rename/copy total", + fmt_s(pa["total_rename_s"]), + fmt_s(ua["total_rename_s"]), + delta_pct(pa["total_rename_s"], ua["total_rename_s"])) + print_row("hydration wall sum", + fmt_s(pa["total_wall_s"]), + fmt_s(ua["total_wall_s"]), + delta_pct(pa["total_wall_s"], ua["total_wall_s"])) + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/hub_load_test_s3.py b/scripts/test_scripts/hub/hub_load_test_s3.py new file mode 100644 index 000000000..23014409c --- /dev/null +++ b/scripts/test_scripts/hub/hub_load_test_s3.py @@ -0,0 +1,537 @@ +#!/usr/bin/env python3 +"""Hub provision/deprovision sweep against a real S3 bucket. + +Lists the top-level folders in S3_URI (each folder name is a moduleId), +picks 200 of them, then: + 1. fires all provision requests concurrently, + 2. polls until every module reaches 'provisioned', + 3. waits 5 seconds, + 4. fires all deprovision requests concurrently, + 5. polls until every module is deprovisioned, + 6. waits 5 seconds, + 7. shuts down the hub. + +Required environment variables (or pass via CLI flags): + ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/ + ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access + ZEN_PERF_AWS_REGION defaults to us-east-1 + +Credentials come from 'aws sso login --profile <AWS_PROFILE>'. If the SSO +session is missing or expired, the script triggers the login automatically. + +Requirements: + pip install boto3 + aws CLI v2 +""" + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + +_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "") +_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "") +_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1") + + +# --------------------------------------------------------------------------- +# Executable discovery +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + candidates = [ + repo_root / "build" / "windows" / "x64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "linux" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + repo_root / "build" / "macosx" / "x86_64" / "debug" / f"zenserver{_EXE_SUFFIX}", + ] + for c in candidates: + if c.exists(): + return c + + matches = list(repo_root.glob(f"build/**/debug/zenserver{_EXE_SUFFIX}")) + if matches: + return max(matches, key=lambda p: p.stat().st_mtime) + + sys.exit( + "zenserver debug executable not found in build/. " + "Run: xmake config -y -m debug -a x64 && xmake -y\n" + "Or pass --zenserver-dir <dir>." + ) + + +# --------------------------------------------------------------------------- +# AWS SSO + S3 listing +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit( + "[aws] boto3 is required.\n" + "Install it with: pip install boto3" + ) + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI (must start with s3://): {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str) -> list[str]: + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + paginator = s3.get_paginator("list_objects_v2") + module_ids: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + full = cp.get("Prefix", "") + # Strip outer prefix + trailing slash to get the folder name + folder = full[len(prefix_norm):].rstrip("/") + if folder: + module_ids.append(folder) + return module_ids + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-http-threads=8", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[{name}] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + out: dict[str, str] = {} + for m in data.get("modules", []) or []: + mid = m.get("moduleId") + if mid: + out[mid] = m.get("state", "") + return out + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """Fire POST /hub/modules/<id>/<verb> for every module concurrently. + + Returns (accepted_ids, failures) where accepted_ids got 200/202 and + failures is a list of (mid, status, body) for everything else. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + # Already gone - treat as success for deprovision fan-out. + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _wait_for_provisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is 'provisioned' or gone. + + Returns (stuck_ids, last_states). stuck_ids are the ones that did not reach + 'provisioned' within timeout_s; last_states maps all module_ids to their + last-seen state (or empty string if the module was absent from the report). + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + now_done: list[str] = [] + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == "provisioned": + now_done.append(mid) + elif s in ("", "unprovisioned", "crashed"): + # Not useful end-states for "waiting to become provisioned", + # but we don't block forever on them either - let timeout decide. + pass + for mid in now_done: + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[provision] {done}/{len(module_ids)} provisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], dict[str, str]]: + """Poll until every module in module_ids is gone from hub status.""" + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} deprovisioned...", end="\r") + time.sleep(2.0) + + return list(remaining), last_states + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest-s3", + help="Hub --data-dir (default: E:/Dev/hub-loadtest-s3)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=200, + help="Number of modules to sweep (default: 200)") + parser.add_argument("--settle-seconds", type=float, default=5.0, + help="Seconds to wait between provision-complete and deprovision, " + "and between deprovision-complete and shutdown (default: 5.0)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers for provision/deprovision fan-out (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=600.0, + help="Max seconds to wait for provision or deprovision to finish (default: 600)") + parser.add_argument("--trace", + nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <data-dir>/hub.utrace. " + "Optionally pass channel spec (default: 'default')") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI) + aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE) + aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION) + if not s3_uri: + sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/") + if not aws_profile: + sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name") + + data_dir = Path(args.data_dir) + hub_log = data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] S3 URI: {s3_uri}") + print(f"[setup] profile: {aws_profile}") + print(f"[setup] region: {aws_region}") + + session, frozen = _get_session(aws_profile) + print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})") + + bucket, prefix = _parse_s3_uri(s3_uri) + print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...") + module_ids = _list_module_ids(session, bucket, prefix, aws_region) + print(f"[s3] found {len(module_ids)} module folders") + + if not module_ids: + sys.exit("[s3] no module folders found, aborting") + + selected = module_ids[:args.module_count] + if len(selected) < args.module_count: + print(f"[s3] only {len(selected)} folders available (requested {args.module_count})") + + aws_env = { + "AWS_ACCESS_KEY_ID": frozen.access_key, + "AWS_SECRET_ACCESS_KEY": frozen.secret_key, + } + if frozen.token: + aws_env["AWS_SESSION_TOKEN"] = frozen.token + + data_dir.mkdir(parents=True, exist_ok=True) + config_path = data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": {"uri": s3_uri, "region": aws_region}, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + # Read-only S3 profile: skip dehydration to avoid AccessDenied on write-back. + "--hub-enable-dehydration=false", + ] + if args.trace: + trace_path = data_dir / "hub.utrace" + hub_extra_args += [ + f"--trace={args.trace}", + f"--tracefile={trace_path}", + ] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + + try: + hub_instance_limit = max(args.module_count, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, data_dir, args.port, hub_log, + hub_instance_limit, hub_extra_args, aws_env, + ) + _wait_for_hub(hub_proc, args.port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision phase --- + print(f"[provision] firing {len(selected)} provision requests (workers={args.workers})...") + t_pv0 = time.monotonic() + accepted, pv_failures = _fan_out_post(pool, args.port, selected, "provision") + print(f"[provision] accepted={len(accepted)}, rejected/error={len(pv_failures)} " + f"(fan-out {time.monotonic() - t_pv0:.1f}s)") + for mid, status, body in pv_failures[:10]: + print(f"[provision] FAILED {mid}: status={status} body={body}") + if len(pv_failures) > 10: + print(f"[provision] ... and {len(pv_failures) - 10} more failures") + + if hub_proc.poll() is not None: + print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})") + return + + if accepted: + stuck, last_states = _wait_for_provisioned(args.port, accepted, args.poll_timeout) + provisioned_count = len(accepted) - len(stuck) + print() + print(f"[provision] all provision complete: {provisioned_count}/{len(accepted)} reached 'provisioned' " + f"({time.monotonic() - t_pv0:.1f}s total)") + if stuck: + print(f"[provision] WARNING: {len(stuck)} module(s) did not reach 'provisioned' within {args.poll_timeout}s") + for mid in stuck[:10]: + print(f"[provision] stuck {mid}: last state='{last_states.get(mid, '')}'") + else: + print("[provision] nothing provisioned") + return + + print(f"[settle] waiting {args.settle_seconds:.0f}s before deprovision...") + time.sleep(args.settle_seconds) + + # --- Deprovision phase --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t_dp0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.port, accepted, "deprovision") + print(f"[deprovision] accepted={len(dp_accepted)}, rejected/error={len(dp_failures)} " + f"(fan-out {time.monotonic() - t_dp0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[deprovision] FAILED {mid}: status={status} body={body}") + if len(dp_failures) > 10: + print(f"[deprovision] ... and {len(dp_failures) - 10} more failures") + + stuck_dp, last_states_dp = _wait_for_deprovisioned(args.port, dp_accepted, args.poll_timeout) + deprovisioned_count = len(dp_accepted) - len(stuck_dp) + print() + print(f"[deprovision] all deprovision complete: {deprovisioned_count}/{len(dp_accepted)} gone " + f"({time.monotonic() - t_dp0:.1f}s total)") + if stuck_dp: + print(f"[deprovision] WARNING: {len(stuck_dp)} module(s) still present after {args.poll_timeout}s") + for mid in stuck_dp[:10]: + print(f"[deprovision] stuck {mid}: last state='{last_states_dp.get(mid, '')}'") + + print(f"[settle] waiting {args.settle_seconds:.0f}s before shutdown...") + time.sleep(args.settle_seconds) + + print(f"[summary] total elapsed: {time.monotonic() - t_start:.1f}s") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _stop_process(hub_proc, "hub", timeout_s=120.0) + if hub_log_handle is not None: + hub_log_handle.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/test_scripts/hub/perf_configs/hub.lua b/scripts/test_scripts/hub/perf_configs/hub.lua new file mode 100644 index 000000000..f3cf3e697 --- /dev/null +++ b/scripts/test_scripts/hub/perf_configs/hub.lua @@ -0,0 +1,48 @@ +-- Perf-test hub config: mirrors the production hub config in the repo root +-- (hub.lua / instance.lua). The launch script overrides hub.instance.config +-- and the effective concurrency (--corelimit=128) via CLI to simulate an +-- r5n.32xlarge box so the default auto-picks match what prod sees there. + +hub = { + instance = { + baseportnumber = 21000, -- default + limits = { + count = 1100, -- headroom for 1000-module perf runs + memorylimitpercent = 90, -- default: 0 (disabled) + disklimitpercent = 90, -- default: 0 (disabled) + }, + corelimit = 4, -- default: 0 (auto) + provisionthreads = 8, -- default: auto + -- NOTE: hub.instance.config (path to instance lua) is overridden via + -- --hub-instance-config on the CLI. If left here, it would be resolved + -- relative to the hub's CWD at spawn time (NOT this file's dir). + }, + + hydration = { + -- Match production's per-module download pool size. Without this, the + -- default auto-picks hardware_concurrency/4 which on --corelimit=128 + -- would be 32. Prod logs consistently show "16 threads" in Download phase. + threads = 16, + }, + + watchdog = { + cycleintervalms = 5000, -- default: 3000. slower cycle, 1000 instances to scan + cycleprocessingbudgetms = 1000, -- default: 500. more budget per cycle for larger instance count + instancecheckthrottlems = 10, -- default: 5. slight throttle to reduce hub CPU + provisionedinactivitytimeoutseconds = 600, -- default + hibernatedinactivitytimeoutseconds = 1800, -- default + inactivitycheckmarginseconds = 60, -- default + }, +} + +network = { + httpserverthreads = 8, -- default: auto. hub itself needs few threads +} + +server = { + dedicated = true, -- default: false. signals build-farm use, affects thread scaling heuristics +} + +gc = { + enabled = false, -- default: true. hub has no storage, no need for GC +} diff --git a/scripts/test_scripts/hub/perf_configs/instance.lua b/scripts/test_scripts/hub/perf_configs/instance.lua new file mode 100644 index 000000000..1251997db --- /dev/null +++ b/scripts/test_scripts/hub/perf_configs/instance.lua @@ -0,0 +1,15 @@ +network = { + httpserverthreads = 4, -- default: auto +} + +gc = { + enabled = false, -- default: true. hub triggers GC at deprovision, no need for periodic GC +} + +cache = { + bucket = { + memlayer = { + sizethreshold = 0, -- default: 1024. 0 disables memlayer entirely + }, + }, +} diff --git a/scripts/test_scripts/hub/preserve_minio_state.py b/scripts/test_scripts/hub/preserve_minio_state.py new file mode 100644 index 000000000..365e4a542 --- /dev/null +++ b/scripts/test_scripts/hub/preserve_minio_state.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +"""Preserve a freshly-seeded MinIO data directory. + +Run this after seed_minio.py has finished. By default it MOVES --source (the +MinIO --data-dir used during seeding) to --dest (the preservation path) and +writes a README with provenance so future perf runs start from a known +baseline. Move avoids a ~0.5 TB copy on a full 1000-module seed; Stage B +wipes --minio-data-dir on its next invocation anyway. + +Pass --copy to keep --source in place (slower; needs 2x disk). + +Typical invocation: + python preserve_minio_state.py + +Defaults map to the paths recommended by PERF_SEED_README.md. +""" + +from __future__ import annotations + +import argparse +import datetime +import os +import shutil +import stat +import sys +from pathlib import Path + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + def _onerror(func, p, exc_info): + try: + os.chmod(p, stat.S_IWRITE) + func(p) + except Exception: + pass + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + +def _size_of(path: Path) -> tuple[int, int]: + files = 0 + total = 0 + for root, _dirs, names in os.walk(path): + for n in names: + p = Path(root) / n + try: + total += p.stat().st_size + except OSError: + pass + files += 1 + return files, total + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--source", default="E:/Dev/zen-perf-seed/minio-data", + help="Source MinIO data dir (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--dest", default="E:/Dev/zen-perf-seed/minio-seeded-packed", + help="Preservation path (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " + "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") + parser.add_argument("--s3-uri", default=os.environ.get("ZEN_PERF_S3_URI", ""), + help="Source S3 URI recorded in the README (defaults to $ZEN_PERF_S3_URI)") + parser.add_argument("--bucket", default="zen-seed", + help="MinIO bucket name recorded in the README") + parser.add_argument("--module-count", type=int, default=300, + help="Module count recorded in the README") + parser.add_argument("--copy", action="store_true", + help="Copy --source to --dest instead of moving it. Default is move " + "(fast, in-place rename when on the same volume). Use --copy if you " + "want to keep --source intact for another preserve run.") + args = parser.parse_args() + + source = Path(args.source).resolve() + dest = Path(args.dest).resolve() + + if not source.is_dir(): + sys.exit(f"[preserve] source not found: {source}") + + # Dest is wiped and rewritten. Refuse any path that would clobber source. + if dest == source or dest in source.parents or source in dest.parents: + sys.exit(f"[preserve] source ({source}) and dest ({dest}) must be disjoint") + + files, total = _size_of(source) + mode = "copy" if args.copy else "move" + print(f"[preserve] source: {source} -> {files:,} files, {total/1024/1024:.1f} MB") + print(f"[preserve] dest: {dest}") + print(f"[preserve] mode: {mode}") + + if dest.exists(): + print(f"[preserve] removing existing dest {dest}") + _rmtree_robust(dest) + + dest.parent.mkdir(parents=True, exist_ok=True) + if args.copy: + shutil.copytree(source, dest, symlinks=False) + else: + shutil.move(str(source), str(dest)) + + readme = dest / "README.txt" + readme.write_text( + "\n".join([ + "zen-perf-seed preserved MinIO state", + "", + f"Created: {datetime.datetime.now(datetime.timezone.utc).isoformat()}", + f"Source s3 URI: {args.s3_uri}", + f"Bucket: {args.bucket}", + f"Modules: {args.module_count}", + f"Files: {files:,}", + f"Bytes: {total:,}", + "", + "To run a perf iteration: copy this directory onto a fresh MinIO data", + "dir (see scripts/test_scripts/hub/run_minio_perf.py) and point a hub at it.", + "", + ]), + encoding="ascii", + ) + print(f"[preserve] wrote {readme}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/run_minio_perf.py b/scripts/test_scripts/hub/run_minio_perf.py new file mode 100644 index 000000000..b59cff41a --- /dev/null +++ b/scripts/test_scripts/hub/run_minio_perf.py @@ -0,0 +1,614 @@ +#!/usr/bin/env python3 +"""Stage C of the perf-seed workflow: run a perf iteration against the +preserved MinIO state. + +Flow per invocation: + 1. Reset --minio-run by copying --minio-seeded over it (so every iteration + starts from the same canonical state). + 2. Start MinIO against --minio-run. + 3. Start a hub against MinIO. By default --hub-enable-dehydration=false so + the perf run is read-only and the seeded baseline survives intact. + Pass --enable-dehydration to run a full provision -> deprovision cycle + that re-uploads at deprovision time (use this to measure the dehydrate + phase as well as hydrate; the seeded baseline diverges after such a run). + 4. Provision every module found under --snapshot-dir (same set that was + used to seed MinIO), wait for all to reach 'provisioned'. + 5. Deprovision them all, wait for them to be gone. + 6. Stop hub + MinIO cleanly. + +Optional --trace writes a utrace file to --hub-data-dir/hub.utrace. + +This script reuses the lightweight harness helpers from the other stages but +keeps the run fully offline once the seed is preserved. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + for mode in ("release", "debug"): + for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): + p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" + if p.exists(): + return p + sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p}") + return p + + +def _find_minio(zenserver_path: Path) -> Path: + p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"minio executable not found at {p}") + return p + + +def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: + data_dir.mkdir(parents=True, exist_ok=True) + env = os.environ.copy() + env["MINIO_ROOT_USER"] = _MINIO_USER + env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + proc = subprocess.Popen( + [str(minio_exe), "server", str(data_dir), + "--address", f":{port}", + "--console-address", f":{console_port}", + "--quiet"], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + **popen_kwargs, + ) + print(f"[minio] started (pid {proc.pid}) port={port} data={data_dir}") + return proc + + +def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: + deadline = time.monotonic() + timeout_s + url = f"http://localhost:{port}/minio/health/live" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=1): + print("[minio] ready") + return + except Exception: + time.sleep(0.1) + sys.exit(f"[minio] timed out after {timeout_s}s") + + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + # Use the production-like Lua config (limits, thread counts, watchdog timers) + # so perf numbers reflect what prod actually runs. Hub.hydration.threads is + # left unset so the default auto-pick (hardware_concurrency/4) fires; combined + # with --corelimit=128 below, that yields 32 hydration threads on the actual + # prod r5n.32xlarge class, and clamps to the real core count on smaller boxes. + config_dir = Path(__file__).resolve().parent / "perf_configs" + hub_config = config_dir / "hub.lua" + inst_config = config_dir / "instance.lua" + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + f"--config={hub_config}", + f"--hub-instance-config={inst_config}", + # Cap effective hardware concurrency to 128 (r5n.32xlarge). On a dev box + # with fewer physical cores, this clamps to the actual count (see + # LimitHardwareConcurrency in thread.cpp - it's a Min, never inflates). + # When running on the actual prod instance class, this makes the default + # auto-picks (provision/hydration thread counts) behave as prod does. + "--corelimit=128", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen(cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, **popen_kwargs) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] exited unexpectedly (rc={proc.returncode})") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """'zen down --pid' for graceful hub shutdown.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: + if proc.poll() is not None: + return + try: + if sys.platform == "win32": + proc.send_signal(signal.CTRL_BREAK_EVENT) + else: + proc.terminate() + except Exception: + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[minio] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_states(port: int) -> Optional[dict[str, str]]: + # Without Accept: application/json the hub serves compact binary (CBOR) + # which json.loads can't parse. 60s timeout - /hub/status can take a few + # seconds to serialize with 300+ modules in flight. + req = urllib.request.Request( + f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}, + ) + try: + with urllib.request.urlopen(req, timeout=60) as resp: + data = json.loads(resp.read()) + except Exception as e: + # Visibility: prior silent None caused "0/300 provisioned..." forever. + print(f"[status] WARN /hub/status failed: {e}") + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + futures = {mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") for mid in module_ids} + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +def _state_histogram(states: dict[str, str], remaining: set[str]) -> str: + counts: dict[str, int] = {} + for mid in remaining: + s = states.get(mid, "<absent>") + counts[s] = counts.get(s, 0) + 1 + return ", ".join(f"{k}={v}" for k, v in sorted(counts.items(), key=lambda kv: -kv[1])) + + +def _wait_for_provisioned(port: int, ids: list[str], timeout_s: float) -> list[str]: + deadline = time.monotonic() + timeout_s + remaining = set(ids) + last_verbose = 0.0 + while remaining and time.monotonic() < deadline: + states = _hub_states(port) + if states is not None: + for mid in list(remaining): + if states.get(mid) == "provisioned": + remaining.discard(mid) + done = len(ids) - len(remaining) + now = time.monotonic() + if states is not None and (now - last_verbose >= 10.0 or not remaining): + hist = _state_histogram(states, remaining) if remaining else "(all provisioned)" + print(f"[provision] {done}/{len(ids)} provisioned | remaining: {hist}") + last_verbose = now + else: + print(f"[provision] {done}/{len(ids)} provisioned...", end="\r") + time.sleep(2.0) + print() + return list(remaining) + + +def _wait_for_gone(port: int, ids: list[str], timeout_s: float) -> list[str]: + deadline = time.monotonic() + timeout_s + remaining = set(ids) + last_verbose = 0.0 + while remaining and time.monotonic() < deadline: + states = _hub_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + done = len(ids) - len(remaining) + now = time.monotonic() + if states is not None and (now - last_verbose >= 10.0 or not remaining): + hist = _state_histogram(states, remaining) if remaining else "(all gone)" + print(f"[deprovision] {done}/{len(ids)} gone | remaining: {hist}") + last_verbose = now + else: + print(f"[deprovision] {done}/{len(ids)} gone...", end="\r") + time.sleep(2.0) + print() + return list(remaining) + + +def _robust_copytree(src: Path, dst: Path) -> None: + """Windows-friendly directory copy with progress. + + Uses robocopy /MIR to mirror src -> dst and /COPY:DAT /DCOPY:DAT to copy + file data + attributes + timestamps explicitly (not just the defaults). + Safe because dst is always the working dir (minio-run) - never the + preserved baseline. + """ + if sys.platform == "win32": + cmd = [ + "robocopy", str(src), str(dst), + "/MIR", + "/COPY:DAT", "/DCOPY:DAT", + "/NJH", "/NJS", "/NC", "/NDL", "/NFL", "/NP", + "/R:2", "/W:1", + ] + print(f"[reset] robocopy {src} -> {dst}") + rc = subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + if rc >= 8: + sys.exit(f"[reset] robocopy failed rc={rc}") + else: + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False) + + +def _archive_run( + archive_dir: Path, + hub_data_dir: Path, + bucket: str, + summary: dict, +) -> Optional[Path]: + """Copy hub.log, zenserver.log(s), hub.utrace and a summary.json into a + timestamped subdir under archive_dir so successive runs can be compared. + + Returns the archive destination path or None if there was nothing to copy. + """ + try: + ts = time.strftime("%Y%m%d-%H%M%S", time.localtime()) + dest = archive_dir / f"{ts}_{bucket}" + dest.mkdir(parents=True, exist_ok=True) + + copied: list[str] = [] + # Hub stdout/stderr (captured by _start_hub via subprocess stdout=) + hub_log = hub_data_dir / "hub.log" + if hub_log.is_file(): + shutil.copy2(hub_log, dest / "hub.log") + copied.append("hub.log") + # Structured zenserver logs (rotated variants included) + logs_src = hub_data_dir / "logs" + if logs_src.is_dir(): + logs_dst = dest / "logs" + logs_dst.mkdir(exist_ok=True) + for p in logs_src.iterdir(): + if p.is_file(): + shutil.copy2(p, logs_dst / p.name) + copied.append(f"logs/{p.name}") + # Optional trace + utrace = hub_data_dir / "hub.utrace" + if utrace.is_file(): + shutil.copy2(utrace, dest / "hub.utrace") + copied.append("hub.utrace") + + (dest / "summary.json").write_text(json.dumps(summary, indent=2), encoding="ascii") + + print(f"[archive] {dest} ({len(copied)} files)") + return dest + except Exception as e: + print(f"[archive] WARNING: failed to archive run: {e}") + return None + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--minio-seeded", default="E:/Dev/zen-perf-seed/minio-seeded-packed", + help="Preserved MinIO baseline (default: E:/Dev/zen-perf-seed/minio-seeded-packed). " + "Sibling to E:/Dev/zen-perf-seed/minio-seeded-baseline.") + parser.add_argument("--minio-run", default="E:/Dev/zen-perf-seed/minio-run", + help="Working MinIO data dir, wiped and re-copied each run (default: E:/Dev/zen-perf-seed/minio-run)") + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Source of module IDs to run against (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-perf", + help="Hub --data-dir for the perf run (wiped each run if --wipe). Default: E:/Dev/zen-perf-seed/hub-perf") + parser.add_argument("--archive-dir", default="E:/Dev/zen-perf-seed/perf-runs", + help="Where to archive hub.log + zenserver.log + hub.utrace after each run " + "(default: E:/Dev/zen-perf-seed/perf-runs)") + parser.add_argument("--bucket", default="zen-seed-packed", + help="MinIO bucket name to exercise (default: zen-seed-packed). " + "Pack worktree seeds only the packed bucket.") + parser.add_argument("--minio-port", type=int, default=9000) + parser.add_argument("--minio-console-port", type=int, default=9001) + parser.add_argument("--hub-port", type=int, default=8558) + parser.add_argument("--module-count", type=int, default=0, + help="Cap on modules used (0 = all from snapshot-dir)") + parser.add_argument("--workers", type=int, default=50) + parser.add_argument("--poll-timeout", type=float, default=1200.0) + parser.add_argument("--settle-seconds", type=float, default=5.0) + parser.add_argument("--trace", nargs="?", const="default", default=None, metavar="CHANNELS", + help="Enable UE trace on the hub, writing to <hub-data-dir>/hub.utrace (default channels: 'default')") + parser.add_argument("--enable-dehydration", action="store_true", + help="Run with --hub-enable-dehydration=true so the deprovision phase " + "actually re-uploads to MinIO. Default false preserves the seeded " + "baseline; pass this to measure dehydrate cost as well as hydrate.") + parser.add_argument("--no-wipe-hub", action="store_true", + help="Don't wipe the hub data dir before starting (useful to inspect leftover state)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver + minio executables (auto-detected)") + args = parser.parse_args() + + minio_seeded = Path(args.minio_seeded).resolve() + minio_run = Path(args.minio_run).resolve() + snapshot_dir = Path(args.snapshot_dir).resolve() + hub_data_dir = Path(args.hub_data_dir).resolve() + archive_dir = Path(args.archive_dir).resolve() + + if not minio_seeded.is_dir(): + sys.exit(f"[setup] preserved MinIO baseline not found: {minio_seeded}") + if not snapshot_dir.is_dir(): + sys.exit(f"[setup] snapshot dir not found: {snapshot_dir}") + + # Preserved inputs are read-only from this script's perspective. Refuse to + # run if any mutable path could accidentally clobber them. + for label, mutable in (("minio-run", minio_run), ("hub-data-dir", hub_data_dir)): + for ro_label, ro in (("minio-seeded", minio_seeded), ("snapshot-dir", snapshot_dir)): + if mutable == ro or mutable in ro.parents or ro in mutable.parents: + sys.exit(f"[setup] refusing to run: {label}={mutable} overlaps {ro_label}={ro}") + + module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()]) + if args.module_count > 0: + module_ids = module_ids[:args.module_count] + if not module_ids: + sys.exit(f"[setup] no modules in {snapshot_dir}") + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + minio_exe = _find_minio(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] minio: {minio_exe}") + print(f"[setup] minio-seeded: {minio_seeded}") + print(f"[setup] minio-run: {minio_run}") + print(f"[setup] hub-data-dir: {hub_data_dir}") + print(f"[setup] modules: {len(module_ids)}") + + # Reset MinIO run dir from the baseline. + _robust_copytree(minio_seeded, minio_run) + + # Wipe the hub data dir so every run starts from scratch unless the user opts out. + if not args.no_wipe_hub and hub_data_dir.exists(): + print(f"[reset] wiping {hub_data_dir}") + _rmtree_robust(hub_data_dir) + hub_data_dir.mkdir(parents=True, exist_ok=True) + + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": { + "uri": f"s3://{args.bucket}", + "endpoint": f"http://localhost:{args.minio_port}", + "path-style": True, + "region": "us-east-1", + }, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + f"--hub-enable-dehydration={'true' if args.enable_dehydration else 'false'}", + ] + if args.enable_dehydration: + print("[mode] --enable-dehydration=true: deprovision will re-upload to MinIO; baseline will diverge") + if args.trace: + trace_path = hub_data_dir / "hub.utrace" + hub_extra_args += [f"--trace={args.trace}", f"--tracefile={trace_path}"] + print(f"[trace] enabled channels='{args.trace}', file={trace_path}") + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + minio_proc: Optional[subprocess.Popen] = None + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + timings: dict = { + "bucket": args.bucket, + "module_count": len(module_ids), + "build_mode": zenserver_mode, + "provision_fanout_s": None, + "provision_total_s": None, + "deprovision_fanout_s": None, + "deprovision_total_s": None, + "total_s": None, + } + + try: + minio_proc = _start_minio(minio_exe, minio_run, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + + hub_log = hub_data_dir / "hub.log" + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, args.hub_port, hub_log, + hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, args.hub_port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision --- + print(f"[provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, args.hub_port, module_ids, "provision") + fanout_s = time.monotonic() - t0 + timings["provision_fanout_s"] = round(fanout_s, 2) + print(f"[provision] accepted={len(accepted)} failures={len(failures)} (fan-out {fanout_s:.1f}s)") + for mid, status, body in failures[:5]: + print(f"[provision] FAIL {mid}: status={status} body={body}") + if failures: + exit_code = 1 + stuck = _wait_for_provisioned(args.hub_port, accepted, args.poll_timeout) + total_s = time.monotonic() - t0 + timings["provision_total_s"] = round(total_s, 2) + print(f"[provision] all provisioned in {total_s:.1f}s ({len(accepted)-len(stuck)}/{len(accepted)})") + if stuck: + exit_code = 1 + + print(f"[settle] waiting {args.settle_seconds:.0f}s") + time.sleep(args.settle_seconds) + + # --- Deprovision --- + print(f"[deprovision] firing {len(accepted)} deprovision requests...") + t0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, args.hub_port, accepted, "deprovision") + fanout_s = time.monotonic() - t0 + timings["deprovision_fanout_s"] = round(fanout_s, 2) + print(f"[deprovision] accepted={len(dp_accepted)} failures={len(dp_failures)} (fan-out {fanout_s:.1f}s)") + if dp_failures: + exit_code = 1 + stuck_dp = _wait_for_gone(args.hub_port, dp_accepted, args.poll_timeout) + total_s = time.monotonic() - t0 + timings["deprovision_total_s"] = round(total_s, 2) + print(f"[deprovision] all gone in {total_s:.1f}s ({len(dp_accepted)-len(stuck_dp)}/{len(dp_accepted)})") + if stuck_dp: + exit_code = 1 + + print(f"[settle] waiting {args.settle_seconds:.0f}s") + time.sleep(args.settle_seconds) + + elapsed = time.monotonic() - t_start + timings["total_s"] = round(elapsed, 2) + print(f"[summary] total elapsed: {elapsed:.1f}s, exit={exit_code}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + _zen_down_hub(zen_exe, hub_proc) + if hub_log_handle is not None: + hub_log_handle.close() + if minio_proc is not None and minio_proc.poll() is None: + _stop_minio_graceful(minio_proc) + # Archive AFTER the hub has exited so in-flight log writes are flushed. + timings["exit_code"] = exit_code + _archive_run(archive_dir, hub_data_dir, args.bucket, timings) + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/seed_minio.py b/scripts/test_scripts/hub/seed_minio.py new file mode 100644 index 000000000..e0e45c4cb --- /dev/null +++ b/scripts/test_scripts/hub/seed_minio.py @@ -0,0 +1,720 @@ +#!/usr/bin/env python3 +"""Stage B of the perf-seed workflow. + +Replays the snapshot produced by seed_s3_snapshot.py into a single MinIO +bucket so a later perf run can exercise that bucket. Pack mode is hardcoded +in this script (see --hub-hydration-enable-pack flag in _start_hub) and +governs how the hub uploads into MinIO. To compare packed vs unpacked, +invoke this script twice from two separate worktrees - one with the pack +flag flipped to false - producing two preserved MinIO data dirs, then run +run_minio_perf.py against each. + +Flow: + 1. Start a local MinIO server against --minio-data-dir, create the bucket. + 2. Start a hub against MinIO (--bucket as target), dehydration ON, + hibernated-watchdog timeout set huge so hibernated modules are not + auto-deprovisioned before we deprovision them ourselves. + 3. Provision every module discovered under --snapshot-dir (fresh hydrate + against an empty bucket spawns the child on an empty dir). + 4. Hibernate every module (child exits, state dir unlocked). + 5. Overlay the snapshot into <hub-data-dir>/servers/<mid>/. + 6. Deprovision every module. Deprovision from Hibernated runs Dehydrate(), + which scans the overlaid ServerStateDir and uploads content into the + bucket. + 7. Shut the hub down via 'zen down --pid'. + 8. Stop MinIO. + +Preservation of the resulting MinIO data directory is done by a separate +step (see preserve_minio_state.py / PERF_SEED_README.md). +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import signal +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" +_MINIO_USER = "minioadmin" +_MINIO_PASS = "minioadmin" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + + +# --------------------------------------------------------------------------- +# Executable discovery - prefer release, fall back to debug. +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + for mode in ("release", "debug"): + for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): + p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" + if p.exists(): + return p + sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)") + return p + + +def _find_minio(zenserver_path: Path) -> Path: + p = zenserver_path.parent / f"minio{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"minio executable not found at {p}. Build the zen project first.") + return p + + +# --------------------------------------------------------------------------- +# MinIO lifecycle +# --------------------------------------------------------------------------- + +def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen: + data_dir.mkdir(parents=True, exist_ok=True) + env = os.environ.copy() + env["MINIO_ROOT_USER"] = _MINIO_USER + env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + proc = subprocess.Popen( + [str(minio_exe), "server", str(data_dir), + "--address", f":{port}", + "--console-address", f":{console_port}", + "--quiet"], + env=env, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + **popen_kwargs, + ) + print(f"[minio] started (pid {proc.pid}) port={port} console={console_port} data={data_dir}") + return proc + + +def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None: + deadline = time.monotonic() + timeout_s + url = f"http://localhost:{port}/minio/health/live" + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(url, timeout=1): + print("[minio] ready") + return + except Exception: + time.sleep(0.1) + sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s") + + +def _create_minio_bucket(port: int, bucket: str) -> None: + try: + import boto3 # type: ignore[import-not-found] + import botocore.config # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit("[minio] boto3 is required. pip install boto3") + s3 = boto3.client( + "s3", + endpoint_url=f"http://localhost:{port}", + aws_access_key_id=_MINIO_USER, + aws_secret_access_key=_MINIO_PASS, + region_name="us-east-1", + config=botocore.config.Config(signature_version="s3v4"), + ) + try: + s3.create_bucket(Bucket=bucket) + print(f"[minio] created bucket '{bucket}'") + except botocore.exceptions.ClientError as e: + if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"): + print(f"[minio] bucket '{bucket}' already exists") + else: + raise + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + # Seeding is not a perf-measurement path - we want it as fast as the + # host can manage. Let the hub go wide on both provisioning and + # hydration thread pools rather than matching prod limits. + "--hub-instance-provision-threads=64", + "--hub-hydration-threads=64", + # Prevent the watchdog from auto-deprovisioning modules while we're + # still hydrating the tail / in the overlay phase. BOTH timers have to + # be extended - the provisioned one (default 600s) is what bites on + # large-N runs where early provisions go idle waiting for the rest. + "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400", + "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400", + # Explicit - default is true, but make it obvious that Stage B needs + # it since the final deprovision drives the MinIO upload. + "--hub-enable-dehydration=true", + # Pack worktree: turn pack on so dehydrate emits packed CAS. + "--hub-hydration-enable-pack=true", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode})") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """'zen down --pid' so the hub runs its normal shutdown path.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +def _stop_minio_graceful(proc: subprocess.Popen, timeout_s: float = 30.0) -> None: + """MinIO has no external shutdown tool; Ctrl-Break lets it flush.""" + if proc.poll() is not None: + return + try: + if sys.platform == "win32": + proc.send_signal(signal.CTRL_BREAK_EVENT) + else: + proc.terminate() + except Exception: + proc.terminate() + try: + proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[minio] did not exit after {timeout_s}s, killing") + proc.kill() + proc.wait() + + +# --------------------------------------------------------------------------- +# Hub API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """POST concurrently. For 'deprovision' a 404 means the module is already + gone - count it as accepted so the state-poll recognises completion. + """ + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + elif verb == "deprovision" and status == 404: + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +_FAILED_STATES = {"crashed", "unprovisioned"} + + +def _wait_for_state( + port: int, + module_ids: list[str], + target_state: str, + timeout_s: float, + label: str, +) -> tuple[list[str], list[str], dict[str, str]]: + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == target_state: + remaining.discard(mid) + elif s in _FAILED_STATES and target_state not in _FAILED_STATES: + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + print() + return list(remaining), failed, last_states + + +def _wait_for_deprovisioned( + port: int, + module_ids: list[str], + timeout_s: float, +) -> tuple[list[str], list[str], dict[str, str]]: + """Wait until each module is gone from /hub/status or in 'unprovisioned'. + 'crashed' counts as a hard failure - dehydrate never completed for it. + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if mid not in states or s == "unprovisioned": + remaining.discard(mid) + elif s == "crashed": + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[deprovision] {done}/{len(module_ids)} gone ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + print() + return list(remaining), failed, last_states + + +# --------------------------------------------------------------------------- +# Snapshot overlay +# --------------------------------------------------------------------------- + +def _overlay_snapshot(snapshot_root: Path, hub_servers_root: Path, module_ids: list[str]) -> tuple[int, int, int]: + """Replace hub_servers_root/<mid>/* with snapshot_root/<mid>/*. + + snapshot_root is treated as read-only; only hub_servers_root is written to. + """ + files_copied = 0 + bytes_copied = 0 + modules_overlaid = 0 + + for i, mid in enumerate(module_ids, 1): + src = snapshot_root / mid + dst = hub_servers_root / mid + if not src.is_dir(): + print(f"[overlay] WARNING: snapshot missing for {mid}: {src}") + continue + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + modules_overlaid += 1 + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 + if i % 25 == 0 or i == len(module_ids): + print(f"[overlay] {i}/{len(module_ids)} modules overlaid " + f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + return modules_overlaid, files_copied, bytes_copied + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def _seed_one_bucket( + *, + bucket: str, + hub_data_dir: Path, + snapshot_dir: Path, + module_ids: list[str], + minio_port: int, + hub_port: int, + poll_timeout: float, + workers: int, + zenserver_exe: Path, + zen_exe: Path, +) -> tuple[int, dict[str, float]]: + """Run the provision/hibernate/overlay/deprovision cycle against a single + MinIO bucket. Returns (exit_code, timings) where timings maps phase name + to elapsed seconds (provision_s, hibernate_s, overlay_s, deprovision_s, + total_s). + """ + print(f"\n================ seeding {bucket} into hub-dir {hub_data_dir} ================") + + hub_data_dir.mkdir(parents=True, exist_ok=True) + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": { + "uri": f"s3://{bucket}", + "endpoint": f"http://localhost:{minio_port}", + "path-style": True, + "region": "us-east-1", + }, + }), + encoding="ascii", + ) + hub_extra_args = [f"--hub-hydration-target-config={config_path}"] + hub_extra_env = { + "AWS_ACCESS_KEY_ID": _MINIO_USER, + "AWS_SECRET_ACCESS_KEY": _MINIO_PASS, + } + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + hib_accepted: list[str] = [] + stuck_hib: list[str] = [] + failed_hib: list[str] = [] + timings: dict[str, float] = { + "provision_s": 0.0, + "hibernate_s": 0.0, + "overlay_s": 0.0, + "deprovision_s": 0.0, + "total_s": 0.0, + } + + try: + hub_log = hub_data_dir / "hub.log" + hub_instance_limit = max(len(module_ids) + 10, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, hub_port, hub_log, + hub_instance_limit, hub_extra_args, hub_extra_env, + ) + _wait_for_hub(hub_proc, hub_port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=workers) as pool: + # --- Provision --- + print(f"[{bucket}][provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, hub_port, module_ids, "provision") + print(f"[{bucket}][provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in failures[:10]: + print(f"[{bucket}][provision] FAILED {mid}: status={status} body={body}") + exit_code = 1 + if not accepted: + return 1, timings + + stuck, failed, last_states = _wait_for_state(hub_port, accepted, "provisioned", poll_timeout, f"{bucket}/provision") + timings["provision_s"] = time.monotonic() - t0 + prov_done = len(accepted) - len(stuck) - len(failed) + print(f"[{bucket}][provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck " + f"({timings['provision_s']:.1f}s)") + if failed or stuck: + exit_code = 1 + for mid in (failed + stuck)[:10]: + print(f"[{bucket}][provision] not-provisioned {mid}: last state='{last_states.get(mid, '')}'") + accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)] + if not accepted: + return 1, timings + + # --- Hibernate --- + print(f"[{bucket}][hibernate] firing {len(accepted)} hibernate requests...") + t0 = time.monotonic() + hib_accepted, hib_failures = _fan_out_post(pool, hub_port, accepted, "hibernate") + print(f"[{bucket}][hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in hib_failures[:10]: + print(f"[{bucket}][hibernate] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_hib, failed_hib, last_states_hib = _wait_for_state(hub_port, hib_accepted, "hibernated", poll_timeout, f"{bucket}/hibernate") + timings["hibernate_s"] = time.monotonic() - t0 + hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib) + print(f"[{bucket}][hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck " + f"({timings['hibernate_s']:.1f}s)") + if failed_hib or stuck_hib: + exit_code = 1 + for mid in (failed_hib + stuck_hib)[:10]: + print(f"[{bucket}][hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") + + # --- Overlay snapshot onto hub's servers dir --- + to_overlay = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + hub_servers = hub_data_dir / "servers" + print(f"[{bucket}][overlay] copying {len(to_overlay)} snapshot trees from {snapshot_dir} -> {hub_servers}") + t0 = time.monotonic() + modules_overlaid, files_copied, bytes_copied = _overlay_snapshot(snapshot_dir, hub_servers, to_overlay) + timings["overlay_s"] = time.monotonic() - t0 + print(f"[{bucket}][overlay] overlaid {modules_overlaid} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + f"({timings['overlay_s']:.1f}s)") + if modules_overlaid < len(to_overlay): + exit_code = 1 + + # --- Deprovision (triggers dehydrate -> MinIO upload) --- + with ThreadPoolExecutor(max_workers=workers) as pool: + print(f"[{bucket}][deprovision] firing {len(to_overlay)} deprovision requests...") + t0 = time.monotonic() + dp_accepted, dp_failures = _fan_out_post(pool, hub_port, to_overlay, "deprovision") + print(f"[{bucket}][deprovision] accepted={len(dp_accepted)}, failed={len(dp_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in dp_failures[:10]: + print(f"[{bucket}][deprovision] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_dp, failed_dp, last_states_dp = _wait_for_deprovisioned(hub_port, dp_accepted, poll_timeout) + timings["deprovision_s"] = time.monotonic() - t0 + dp_done = len(dp_accepted) - len(stuck_dp) - len(failed_dp) + print(f"[{bucket}][deprovision] complete: {dp_done}/{len(dp_accepted)} gone, {len(failed_dp)} crashed, {len(stuck_dp)} stuck " + f"({timings['deprovision_s']:.1f}s)") + if failed_dp or stuck_dp: + exit_code = 1 + for mid in (failed_dp + stuck_dp)[:10]: + print(f"[{bucket}][deprovision] not-gone {mid}: last state='{last_states_dp.get(mid, '')}'") + + timings["total_s"] = time.monotonic() - t_start + print(f"[{bucket}] bucket elapsed: {timings['total_s']:.1f}s, exit={exit_code}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + print(f"[{bucket}][hub] stopping...") + _zen_down_hub(zen_exe, hub_proc) + if hub_log_handle is not None: + hub_log_handle.close() + + return exit_code, timings + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Source of per-module server-state trees (READ-ONLY) (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--hub-data-root", default="E:/Dev/zen-perf-seed/hubs", + help="Each bucket gets its own hub data dir under this root: <root>/hub-b-<bucket>/ " + "(default: E:/Dev/zen-perf-seed/hubs)") + parser.add_argument("--minio-data-dir", default="E:/Dev/zen-perf-seed/minio-data", + help="MinIO data dir shared by every bucket (default: E:/Dev/zen-perf-seed/minio-data)") + parser.add_argument("--minio-port", type=int, default=9000) + parser.add_argument("--minio-console-port", type=int, default=9001) + parser.add_argument("--hub-port", type=int, default=8558) + parser.add_argument("--bucket", default="zen-seed-packed", + help="Bucket to seed (default: zen-seed-packed). Pack worktree - " + "hub is launched with --hub-hydration-enable-pack=true.") + parser.add_argument("--module-count", type=int, default=0, + help="Cap on modules processed (0 = all modules in snapshot-dir)") + parser.add_argument("--workers", type=int, default=50) + parser.add_argument("--poll-timeout", type=float, default=1800.0, + help="Max seconds to wait for each state transition (default: 1800)") + parser.add_argument("--wipe", action="store_true", + help="Wipe each per-bucket hub data dir and the shared MinIO data dir before starting " + "(never touches --snapshot-dir)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver + minio executables (auto-detected)") + args = parser.parse_args() + + bucket_name: str = args.bucket + + snapshot_dir = Path(args.snapshot_dir).resolve() + hub_data_root = Path(args.hub_data_root).resolve() + minio_data_dir = Path(args.minio_data_dir).resolve() + + hub_data_dir = (hub_data_root / f"hub-b-{bucket_name}").resolve() + + # Safety: snapshot-dir must not overlap any mutable path. + for label, d in [ + ("minio-data-dir", minio_data_dir), + ("hub-data-root", hub_data_root), + (f"hub-b-{bucket_name}", hub_data_dir), + ]: + if snapshot_dir == d or snapshot_dir in d.parents or d == snapshot_dir or d in snapshot_dir.parents: + sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and {label} ({d}) must be disjoint") + + if not snapshot_dir.is_dir(): + sys.exit(f"[setup] snapshot-dir not found: {snapshot_dir}") + + module_ids = sorted([p.name for p in snapshot_dir.iterdir() if p.is_dir()]) + if args.module_count > 0: + module_ids = module_ids[:args.module_count] + if not module_ids: + sys.exit(f"[setup] no module directories found in {snapshot_dir}") + + if args.wipe: + for d in [hub_data_dir, minio_data_dir]: + if d.exists(): + if snapshot_dir.is_relative_to(d): + sys.exit(f"[setup] refusing to wipe {d}: snapshot-dir is under it") + print(f"[setup] wiping {d}") + _rmtree_robust(d) + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + minio_exe = _find_minio(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] zen cli: {zen_exe}") + print(f"[setup] minio: {minio_exe}") + print(f"[setup] snapshot-dir: {snapshot_dir} (read-only)") + print(f"[setup] hub-data-root: {hub_data_root}") + print(f"[setup] minio-data-dir: {minio_data_dir}") + print(f"[setup] modules: {len(module_ids)}") + print(f"[setup] bucket: {bucket_name} (hub-dir {hub_data_dir})") + + minio_proc: Optional[subprocess.Popen] = None + exit_code = 0 + + try: + minio_proc = _start_minio(minio_exe, minio_data_dir, args.minio_port, args.minio_console_port) + _wait_for_minio(args.minio_port) + _create_minio_bucket(args.minio_port, bucket_name) + + t_total = time.monotonic() + rc, timings = _seed_one_bucket( + bucket=bucket_name, + hub_data_dir=hub_data_dir, + snapshot_dir=snapshot_dir, + module_ids=module_ids, + minio_port=args.minio_port, + hub_port=args.hub_port, + poll_timeout=args.poll_timeout, + workers=args.workers, + zenserver_exe=zenserver_exe, + zen_exe=zen_exe, + ) + exit_code = rc + + print(f"\n[summary] stage B total elapsed: {time.monotonic()-t_total:.1f}s, exit={exit_code}") + print(f"[summary] MinIO data dir is now seeded: {minio_data_dir}") + + phases = ["provision_s", "hibernate_s", "overlay_s", "deprovision_s", "total_s"] + labels = ["provision", "hibernate", "overlay", "deprovision (upload)", "total"] + print(f"\n[summary] timings (s):") + for key, lbl in zip(phases, labels): + print(f" {lbl:<22s} {timings.get(key, 0.0):>8.1f}") + + print(f"[summary] next: preserve {minio_data_dir} to E:/Dev/zen-perf-seed/minio-seeded/") + + finally: + if minio_proc is not None and minio_proc.poll() is None: + print("[minio] stopping...") + _stop_minio_graceful(minio_proc) + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/scripts/test_scripts/hub/seed_s3_snapshot.py b/scripts/test_scripts/hub/seed_s3_snapshot.py new file mode 100644 index 000000000..f0bc7b607 --- /dev/null +++ b/scripts/test_scripts/hub/seed_s3_snapshot.py @@ -0,0 +1,635 @@ +#!/usr/bin/env python3 +"""Stage A of the perf-seed workflow. + +Fetches real module data from a configured S3 bucket via the hub hydration +pipeline and preserves the decompressed server-state trees to disk so a +later stage can replay them into a local MinIO backend. + +Flow: + 1. Start a hub against real S3 (read-only SSO credentials), dehydration off, + hibernated-watchdog timeout set huge so modules are not auto-deprovisioned + mid-flow. + 2. Provision N modules (first N moduleIds from the bucket listing, filtered + to UUID-shaped folders). + 3. Wait until every module reaches 'provisioned'. + 4. Hibernate every module (child processes exit, state dir intact and no + remaining writer while we copy). + 5. Wait until every module reaches 'hibernated'. + 6. Copy <hub-data-dir>/servers/<moduleid>/ -> <snapshot-dir>/<moduleid>/ + with the hub still running - hibernate took the writers offline and the + watchdog is muted. + 7. Shut the hub down via 'zen down --pid <hub pid>'. + +Required environment variables (or pass via CLI flags): + ZEN_PERF_S3_URI e.g. s3://your-bucket/optional-prefix/ + ZEN_PERF_AWS_PROFILE AWS SSO profile name configured with read access + ZEN_PERF_AWS_REGION defaults to us-east-1 + +Requirements: + pip install boto3 + aws CLI v2 with the SSO profile configured +""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import shutil +import subprocess +import sys +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional + +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + + +def _rmtree_robust(path) -> None: + """shutil.rmtree with a Windows-friendly retry for read-only files.""" + import os as _os + import stat as _stat + def _onerror(func, p, exc_info): + try: + _os.chmod(p, _stat.S_IWRITE) + func(p) + except Exception: + pass + # onexc was introduced in py3.12; fall back to onerror on older versions. + if sys.version_info >= (3, 12): + shutil.rmtree(path, onexc=lambda func, p, exc: _onerror(func, p, (type(exc), exc, exc.__traceback__))) + else: + shutil.rmtree(path, onerror=_onerror) + + +_DEFAULT_S3_URI = os.environ.get("ZEN_PERF_S3_URI", "") +_DEFAULT_AWS_PROFILE = os.environ.get("ZEN_PERF_AWS_PROFILE", "") +_DEFAULT_AWS_REGION = os.environ.get("ZEN_PERF_AWS_REGION", "us-east-1") + +# Matches UUID-shaped module IDs (UUID-ish with mixed 4-char groups). Filters +# out stray top-level keys (readmes, test scratches) that aren't real modules. +_MODULEID_RE = re.compile(r"^[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}$") + + +# --------------------------------------------------------------------------- +# zenserver binary discovery - prefer release, fall back to debug. +# --------------------------------------------------------------------------- + +def _find_zenserver(override: Optional[str]) -> Path: + if override: + p = Path(override) / f"zenserver{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zenserver not found at {p}") + return p + + script_dir = Path(__file__).resolve().parent + repo_root = script_dir.parent.parent + for mode in ("release", "debug"): + for plat in (("windows", "x64"), ("linux", "x86_64"), ("macosx", "x86_64")): + p = repo_root / "build" / plat[0] / plat[1] / mode / f"zenserver{_EXE_SUFFIX}" + if p.exists(): + return p + sys.exit("zenserver executable not found under build/. Build it or pass --zenserver-dir.") + + +def _find_zen(zenserver_exe: Path) -> Path: + p = zenserver_exe.parent / f"zen{_EXE_SUFFIX}" + if not p.exists(): + sys.exit(f"zen CLI not found at {p} (used for graceful hub shutdown)") + return p + + +# --------------------------------------------------------------------------- +# AWS SSO credential resolution +# --------------------------------------------------------------------------- + +def _require_boto3(): + try: + import boto3 # type: ignore[import-not-found] + import botocore.exceptions # type: ignore[import-not-found] + except ImportError: + sys.exit("[aws] boto3 is required. Install it with: pip install boto3") + return boto3, botocore.exceptions + + +def _sso_login(profile: str) -> None: + print(f"[aws] running 'aws sso login --profile {profile}'...") + rc = subprocess.call(["aws", "sso", "login", "--profile", profile]) + if rc != 0: + sys.exit(f"[aws] 'aws sso login' failed with rc={rc}") + + +def _get_session(profile: str): + boto3, exc_mod = _require_boto3() + + def _load_frozen(): + session = boto3.Session(profile_name=profile) + creds = session.get_credentials() + if creds is None: + return session, None + return session, creds.get_frozen_credentials() + + try: + session, frozen = _load_frozen() + if frozen is not None and frozen.access_key and frozen.secret_key: + return session, frozen + except exc_mod.ProfileNotFound: + sys.exit(f"[aws] profile '{profile}' not found in ~/.aws/config") + except Exception as e: + print(f"[aws] initial credential load failed: {e}") + + _sso_login(profile) + + session, frozen = _load_frozen() + if frozen is None or not frozen.access_key: + sys.exit("[aws] could not resolve credentials after sso login") + return session, frozen + + +def _parse_s3_uri(uri: str) -> tuple[str, str]: + if not uri.startswith("s3://"): + sys.exit(f"[aws] invalid S3 URI: {uri}") + rest = uri[len("s3://"):] + if "/" in rest: + bucket, prefix = rest.split("/", 1) + else: + bucket, prefix = rest, "" + return bucket, prefix + + +def _list_module_ids(session, bucket: str, prefix: str, region: str, limit: int) -> list[str]: + """List UUID-shaped module folders under the bucket and return the `limit` + most recently active ones, ranked by the LastModified of each module's + `incremental-state.cbo` (newest first - these were last dehydrated most + recently, which is the closest proxy for "most recently accessed"). + + Falls back to listing order for any folder whose state file can't be + HEADed (missing / 403 / transient error). + """ + s3 = session.client("s3", region_name=region) + prefix_norm = prefix if (not prefix or prefix.endswith("/")) else prefix + "/" + + # 1. Enumerate every UUID-shaped folder. + paginator = s3.get_paginator("list_objects_v2") + candidates: list[str] = [] + for page in paginator.paginate(Bucket=bucket, Prefix=prefix_norm, Delimiter="/"): + for cp in page.get("CommonPrefixes", []) or []: + folder = cp.get("Prefix", "")[len(prefix_norm):].rstrip("/") + if folder and _MODULEID_RE.match(folder): + candidates.append(folder) + print(f"[s3] {len(candidates)} module folders match UUID shape; ranking by state.cbo LastModified...") + + # 2. HEAD each module's state file in parallel. Missing/failed HEADs land + # at the tail via a sentinel epoch 0 timestamp. + from datetime import datetime, timezone + epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) + + def _state_mtime(mid: str) -> datetime: + key = f"{prefix_norm}{mid}/incremental-state.cbo" + try: + resp = s3.head_object(Bucket=bucket, Key=key) + return resp.get("LastModified", epoch) + except Exception: + return epoch + + with ThreadPoolExecutor(max_workers=50) as pool: + times = list(pool.map(_state_mtime, candidates)) + + # 3. Sort descending (newest first). Folders without a state file sink. + ranked = sorted(zip(candidates, times), key=lambda x: x[1], reverse=True) + missing = sum(1 for _, t in ranked if t == epoch) + if missing: + print(f"[s3] {missing}/{len(ranked)} modules have no incremental-state.cbo (sorted to tail)") + return [mid for mid, _ in ranked[:limit]] + + +# --------------------------------------------------------------------------- +# Hub lifecycle +# --------------------------------------------------------------------------- + +def _start_hub( + zenserver_exe: Path, + data_dir: Path, + port: int, + log_file: Path, + instance_limit: int, + extra_args: list[str], + extra_env: dict[str, str], +) -> tuple[subprocess.Popen, object]: + data_dir.mkdir(parents=True, exist_ok=True) + cmd = [ + str(zenserver_exe), + "hub", + "--enable-execution-history=false", + f"--data-dir={data_dir}", + f"--port={port}", + "--hub-instance-corelimit=4", + "--hub-provision-disk-limit-percent=99", + "--hub-provision-memory-limit-percent=80", + f"--hub-instance-limit={instance_limit}", + # Seeding is not a perf-measurement path - we want it as fast as the + # host can manage. Let the hub go wide on both provisioning and + # hydration thread pools rather than matching prod limits. + "--hub-instance-provision-threads=64", + "--hub-hydration-threads=64", + # With 1000 modules the seeding flow runs for 20+ minutes. Extend BOTH + # watchdog inactivity timers so early-provisioned modules do not get + # auto-deprovisioned while we're still hydrating the tail (hibernated + # default 1800s, provisioned default 600s - the latter is the one that + # bit us on the 1000-module run and dropped 335 modules). + "--hub-watchdog-provisioned-inactivity-timeout-seconds=86400", + "--hub-watchdog-hibernated-inactivity-timeout-seconds=86400", + ] + extra_args + + env = os.environ.copy() + env.update(extra_env) + + popen_kwargs: dict = {} + if sys.platform == "win32": + popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP + log_handle = log_file.open("wb") + try: + proc = subprocess.Popen( + cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT, + **popen_kwargs, + ) + except Exception: + log_handle.close() + raise + print(f"[hub] started (pid {proc.pid}), log: {log_file}") + return proc, log_handle + + +def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None: + deadline = time.monotonic() + timeout_s + req = urllib.request.Request(f"http://localhost:{port}/hub/status", + headers={"Accept": "application/json"}) + while time.monotonic() < deadline: + if proc.poll() is not None: + sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - " + f"is another zenserver already running on port {port}?") + try: + with urllib.request.urlopen(req, timeout=2): + print("[hub] ready") + return + except Exception: + time.sleep(0.2) + sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s") + + +def _zen_down_hub(zen_exe: Path, hub_proc: subprocess.Popen, timeout_s: float = 300.0) -> None: + """Shut the hub down via 'zen down --pid <pid>'. zen down performs a + proper server-initiated shutdown (signals the shutdown event, waits for + the server to drain) rather than a blunt kill.""" + if hub_proc.poll() is not None: + return + pid = hub_proc.pid + print(f"[hub] zen down --pid {pid}") + rc = subprocess.call([str(zen_exe), "down", "--pid", str(pid), "--force"]) + if rc != 0: + print(f"[hub] zen down returned rc={rc}; waiting for exit anyway") + try: + hub_proc.wait(timeout=timeout_s) + except subprocess.TimeoutExpired: + print(f"[hub] did not exit after {timeout_s}s, killing") + hub_proc.kill() + hub_proc.wait() + + +# --------------------------------------------------------------------------- +# Hub HTTP API +# --------------------------------------------------------------------------- + +def _hub_post(port: int, path: str, timeout_s: float = 60.0) -> tuple[int, dict]: + url = f"http://localhost:{port}{path}" + req = urllib.request.Request(url, data=b"{}", method="POST", + headers={"Content-Type": "application/json", + "Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + try: + body = json.loads(resp.read()) + except Exception: + body = {} + return resp.status, body + except urllib.error.HTTPError as e: + try: + body = json.loads(e.read()) + except Exception: + body = {} + return e.code, body + except Exception as e: + return 0, {"error": str(e)} + + +def _hub_module_states(port: int, timeout_s: float = 10.0) -> Optional[dict[str, str]]: + url = f"http://localhost:{port}/hub/status" + req = urllib.request.Request(url, headers={"Accept": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + data = json.loads(resp.read()) + except Exception: + return None + return {m["moduleId"]: m.get("state", "") for m in (data.get("modules") or []) if m.get("moduleId")} + + +def _fan_out_post( + pool: ThreadPoolExecutor, + port: int, + module_ids: list[str], + verb: str, +) -> tuple[list[str], list[tuple[str, int, dict]]]: + """POST /hub/modules/<id>/<verb> concurrently. Returns (accepted, failures).""" + futures = { + mid: pool.submit(_hub_post, port, f"/hub/modules/{mid}/{verb}") + for mid in module_ids + } + accepted: list[str] = [] + failures: list[tuple[str, int, dict]] = [] + for mid, fut in futures.items(): + status, body = fut.result() + if status in (200, 202): + accepted.append(mid) + else: + failures.append((mid, status, body)) + return accepted, failures + + +# Any of these means the module is done transitioning and will not reach the +# target state on its own (without a retry we control). +_FAILED_STATES = {"crashed", "unprovisioned"} + + +def _wait_for_state( + port: int, + module_ids: list[str], + target_state: str, + timeout_s: float, + label: str, +) -> tuple[list[str], list[str], dict[str, str]]: + """Poll hub status until every module hits target_state, fails, or times out. + + Returns (stuck, failed, last_states). 'stuck' = still mid-transition when + we timed out. 'failed' = hit an _FAILED_STATES value such as 'crashed'. + """ + deadline = time.monotonic() + timeout_s + remaining = set(module_ids) + failed: list[str] = [] + last_states: dict[str, str] = {mid: "" for mid in module_ids} + + while remaining and time.monotonic() < deadline: + states = _hub_module_states(port) + if states is not None: + for mid in list(remaining): + s = states.get(mid, "") + last_states[mid] = s + if s == target_state: + remaining.discard(mid) + elif s in _FAILED_STATES and target_state not in _FAILED_STATES: + remaining.discard(mid) + failed.append(mid) + done = len(module_ids) - len(remaining) + print(f"[{label}] {done}/{len(module_ids)} '{target_state}' ({len(failed)} failed)...", end="\r") + time.sleep(2.0) + + print() + return list(remaining), failed, last_states + + +# --------------------------------------------------------------------------- +# Snapshot copy (run AFTER hub shutdown so there's no concurrent writer) +# --------------------------------------------------------------------------- + +def _copy_snapshot(src_root: Path, dst_root: Path, module_ids: list[str]) -> tuple[int, int, int]: + """Copy src_root/<mid>/* to dst_root/<mid>/* for each module. + + Returns (modules_copied, files_copied, bytes_copied). + Replaces only the specific per-module subdirs; never touches siblings. + """ + dst_root.mkdir(parents=True, exist_ok=True) + modules_copied = 0 + files_copied = 0 + bytes_copied = 0 + + for i, mid in enumerate(module_ids, 1): + src = src_root / mid + dst = dst_root / mid + if not src.is_dir(): + print(f"[snapshot] WARNING: source missing for {mid}: {src}") + continue + if dst.exists(): + _rmtree_robust(dst) + shutil.copytree(src, dst, symlinks=False, dirs_exist_ok=False) + modules_copied += 1 + for root, _dirs, files in os.walk(dst): + for f in files: + p = Path(root) / f + try: + bytes_copied += p.stat().st_size + except OSError: + pass + files_copied += 1 + if i % 25 == 0 or i == len(module_ids): + print(f"[snapshot] {i}/{len(module_ids)} modules copied " + f"({files_copied:,} files, {bytes_copied/1024/1024:.1f} MB)") + + return modules_copied, files_copied, bytes_copied + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--hub-data-dir", default="E:/Dev/zen-perf-seed/hub-a", + help="Hub --data-dir (default: E:/Dev/zen-perf-seed/hub-a)") + parser.add_argument("--snapshot-dir", default="E:/Dev/zen-perf-seed/s3-snapshot", + help="Destination for per-module server-state trees (default: E:/Dev/zen-perf-seed/s3-snapshot)") + parser.add_argument("--port", type=int, default=8558, + help="Hub HTTP port (default: 8558)") + parser.add_argument("--module-count", type=int, default=1000, + help="Number of modules to snapshot (default: 300)") + parser.add_argument("--workers", type=int, default=50, + help="Concurrent HTTP workers (default: 50)") + parser.add_argument("--poll-timeout", type=float, default=1800.0, + help="Max seconds to wait for provision or hibernate to finish (default: 1800)") + parser.add_argument("--zenserver-dir", + help="Directory containing zenserver executable (auto-detected by default)") + args = parser.parse_args() + + hub_data_dir = Path(args.hub_data_dir).resolve() + snapshot_dir = Path(args.snapshot_dir).resolve() + + # Safety: snapshot-dir is the preserved output. It must not overlap the + # hub data-dir in either direction so nothing the hub writes can clobber + # the preserved state. + if (snapshot_dir == hub_data_dir + or snapshot_dir in hub_data_dir.parents + or hub_data_dir in snapshot_dir.parents): + sys.exit(f"[setup] snapshot-dir ({snapshot_dir}) and hub-data-dir ({hub_data_dir}) must be disjoint") + + s3_uri = os.environ.get("S3_URI", _DEFAULT_S3_URI) + aws_profile = os.environ.get("AWS_PROFILE", _DEFAULT_AWS_PROFILE) + aws_region = os.environ.get("AWS_REGION", _DEFAULT_AWS_REGION) + if not s3_uri: + sys.exit("[setup] S3 URI not set. Set ZEN_PERF_S3_URI (or S3_URI) to a bucket like s3://your-bucket/") + if not aws_profile: + sys.exit("[setup] AWS profile not set. Set ZEN_PERF_AWS_PROFILE (or AWS_PROFILE) to your SSO profile name") + + hub_log = hub_data_dir / "hub.log" + + zenserver_exe = _find_zenserver(args.zenserver_dir) + zen_exe = _find_zen(zenserver_exe) + zenserver_mode = "release" if "release" in zenserver_exe.parts else ("debug" if "debug" in zenserver_exe.parts else "?") + print(f"[setup] build mode: {zenserver_mode}") + print(f"[setup] zenserver: {zenserver_exe}") + print(f"[setup] zen cli: {zen_exe}") + print(f"[setup] S3 URI: {s3_uri}") + print(f"[setup] profile: {aws_profile}") + print(f"[setup] hub-data-dir: {hub_data_dir}") + print(f"[setup] snapshot-dir: {snapshot_dir}") + + session, frozen = _get_session(aws_profile) + print(f"[aws] credentials resolved (key prefix {frozen.access_key[:6]}..., session-token={'yes' if frozen.token else 'no'})") + + bucket, prefix = _parse_s3_uri(s3_uri) + print(f"[s3] listing folders under bucket='{bucket}' prefix='{prefix}'...") + module_ids = _list_module_ids(session, bucket, prefix, aws_region, args.module_count) + print(f"[s3] selected {len(module_ids)} module folders (UUID-shaped)") + if len(module_ids) < args.module_count: + print(f"[s3] WARNING: asked for {args.module_count} modules, only {len(module_ids)} matched the UUID filter") + + if not module_ids: + sys.exit("[s3] no module folders found, aborting") + + aws_env = { + "AWS_ACCESS_KEY_ID": frozen.access_key, + "AWS_SECRET_ACCESS_KEY": frozen.secret_key, + } + if frozen.token: + aws_env["AWS_SESSION_TOKEN"] = frozen.token + + hub_data_dir.mkdir(parents=True, exist_ok=True) + config_path = hub_data_dir / "hydration_config.json" + config_path.write_text( + json.dumps({ + "type": "s3", + "settings": {"uri": s3_uri, "region": aws_region}, + }), + encoding="ascii", + ) + hub_extra_args = [ + f"--hub-hydration-target-config={config_path}", + "--hub-enable-dehydration=false", + ] + + hub_proc: Optional[subprocess.Popen] = None + hub_log_handle = None + exit_code = 0 + + try: + hub_instance_limit = max(args.module_count + 10, 500) + hub_proc, hub_log_handle = _start_hub( + zenserver_exe, hub_data_dir, args.port, hub_log, + hub_instance_limit, hub_extra_args, aws_env, + ) + _wait_for_hub(hub_proc, args.port) + + t_start = time.monotonic() + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + # --- Provision --- + print(f"[provision] firing {len(module_ids)} provision requests...") + t0 = time.monotonic() + accepted, failures = _fan_out_post(pool, args.port, module_ids, "provision") + print(f"[provision] accepted={len(accepted)}, failed={len(failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in failures[:10]: + print(f"[provision] FAILED {mid}: status={status} body={body}") + if not accepted: + sys.exit("[provision] nothing accepted, aborting") + + stuck, failed, last_states = _wait_for_state(args.port, accepted, "provisioned", args.poll_timeout, "provision") + prov_done = len(accepted) - len(stuck) - len(failed) + print(f"[provision] complete: {prov_done}/{len(accepted)} provisioned, {len(failed)} failed, {len(stuck)} stuck " + f"({time.monotonic()-t0:.1f}s)") + if failed: + for mid in failed[:10]: + print(f"[provision] FAILED {mid}: last state='{last_states.get(mid, '')}'") + exit_code = 1 + if stuck: + for mid in stuck[:10]: + print(f"[provision] STUCK {mid}: last state='{last_states.get(mid, '')}'") + exit_code = 1 + accepted = [m for m in accepted if m not in set(stuck) and m not in set(failed)] + + if not accepted: + sys.exit("[provision] nothing successfully provisioned, aborting") + + # --- Hibernate --- + print(f"[hibernate] firing {len(accepted)} hibernate requests...") + t0 = time.monotonic() + hib_accepted, hib_failures = _fan_out_post(pool, args.port, accepted, "hibernate") + print(f"[hibernate] accepted={len(hib_accepted)}, failed={len(hib_failures)} (fan-out {time.monotonic()-t0:.1f}s)") + for mid, status, body in hib_failures[:10]: + print(f"[hibernate] FAILED {mid}: status={status} body={body}") + exit_code = 1 + + stuck_hib, failed_hib, last_states_hib = _wait_for_state(args.port, hib_accepted, "hibernated", args.poll_timeout, "hibernate") + hib_done = len(hib_accepted) - len(stuck_hib) - len(failed_hib) + print(f"[hibernate] complete: {hib_done}/{len(hib_accepted)} hibernated, {len(failed_hib)} failed, {len(stuck_hib)} stuck " + f"({time.monotonic()-t0:.1f}s)") + if failed_hib or stuck_hib: + exit_code = 1 + for mid in (failed_hib + stuck_hib)[:10]: + print(f"[hibernate] not-hibernated {mid}: last state='{last_states_hib.get(mid, '')}'") + + # --- Copy snapshots while hub is still running. All instances are + # hibernated (no writers), watchdog-hibernated-timeout is 86400s + # (no auto-deprovision), hub is only touching its own metadata + # outside servers/<mid>/. Safe. --- + copy_src = hub_data_dir / "servers" + to_copy = [m for m in hib_accepted if m not in set(stuck_hib) and m not in set(failed_hib)] + print(f"[snapshot] copying {len(to_copy)} module trees from {copy_src} -> {snapshot_dir}") + t0 = time.monotonic() + modules_copied, files_copied, bytes_copied = _copy_snapshot(copy_src, snapshot_dir, to_copy) + print(f"[snapshot] copied {modules_copied} modules, {files_copied:,} files, {bytes_copied/1024/1024:.1f} MB " + f"({time.monotonic()-t0:.1f}s)") + + if modules_copied < len(to_copy): + print(f"[snapshot] WARNING: only {modules_copied}/{len(to_copy)} trees copied") + exit_code = 1 + + # --- Graceful hub shutdown via 'zen down' --- + _zen_down_hub(zen_exe, hub_proc) + hub_proc = None + if hub_log_handle is not None: + hub_log_handle.close() + hub_log_handle = None + + print(f"[summary] stage A total elapsed: {time.monotonic()-t_start:.1f}s, exit={exit_code}") + print(f"[summary] snapshot available at: {snapshot_dir}") + + finally: + if hub_proc is not None and hub_proc.poll() is None: + # Fallback: something aborted before the 'zen down' path ran. + print("[hub] force-terminating (zen down was not invoked)") + hub_proc.terminate() + try: + hub_proc.wait(timeout=60) + except subprocess.TimeoutExpired: + hub_proc.kill() + hub_proc.wait() + if hub_log_handle is not None: + hub_log_handle.close() + + return exit_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/zencore/include/zencore/string.h b/src/zencore/include/zencore/string.h index a1c7a3914..7ca2afc69 100644 --- a/src/zencore/include/zencore/string.h +++ b/src/zencore/include/zencore/string.h @@ -742,6 +742,7 @@ size_t NiceBytesToBuffer(uint64_t Num, std::span<char> Buffer); size_t NiceByteRateToBuffer(uint64_t Num, uint64_t ms, std::span<char> Buffer); size_t NiceLatencyNsToBuffer(uint64_t NanoSeconds, std::span<char> Buffer); size_t NiceTimeSpanMsToBuffer(uint64_t Milliseconds, std::span<char> Buffer); +size_t NiceTimeSpanUsToBuffer(uint64_t Microseconds, std::span<char> Buffer); struct NiceBase { @@ -803,6 +804,11 @@ struct NiceTimeSpanMs : public NiceBase inline NiceTimeSpanMs(uint64_t Milliseconds) { NiceTimeSpanMsToBuffer(Milliseconds, m_Buffer); } }; +struct NiceTimeSpanUs : public NiceBase +{ + inline NiceTimeSpanUs(uint64_t Microseconds) { NiceTimeSpanUsToBuffer(Microseconds, m_Buffer); } +}; + ////////////////////////////////////////////////////////////////////////// inline std::string diff --git a/src/zencore/string.cpp b/src/zencore/string.cpp index 34519b83b..4072aec56 100644 --- a/src/zencore/string.cpp +++ b/src/zencore/string.cpp @@ -482,6 +482,24 @@ NiceTimeSpanMsToBuffer(uint64_t Millis, std::span<char> Buffer) } } +size_t +NiceTimeSpanUsToBuffer(uint64_t Micros, std::span<char> Buffer) +{ + if (Micros < 1000) + { + return snprintf(Buffer.data(), Buffer.size(), "%" PRIu64 "us", Micros); + } + else if (Micros < 10000) + { + return snprintf(Buffer.data(), Buffer.size(), "%.2fms", Micros / 1000.0); + } + else if (Micros < 1000000) + { + return snprintf(Buffer.data(), Buffer.size(), "%.1fms", Micros / 1000.0); + } + return NiceTimeSpanMsToBuffer(Micros / 1000, Buffer); +} + ////////////////////////////////////////////////////////////////////////// template<typename C> @@ -850,6 +868,13 @@ TEST_CASE("niceNum") NiceNumGeneral(100000000000000, Buffer, kNicenumTime); CHECK(StringEquals(Buffer, "100000s")); + + // floor() instead of round(): 999.5us must NOT render as "1000us". + NiceNumGeneral(999500, Buffer, kNicenumTime); + CHECK(StringEquals(Buffer, "999us")); + + NiceNumGeneral(999999, Buffer, kNicenumTime); + CHECK(StringEquals(Buffer, "999us")); } SUBCASE("bytes") @@ -917,6 +942,43 @@ TEST_CASE("niceNum") NiceTimeSpanMsToBuffer(360000000, Buffer); CHECK(StringEquals(Buffer, "100h00m")); } + + SUBCASE("timespan_us") + { + NiceTimeSpanUsToBuffer(0, Buffer); + CHECK(StringEquals(Buffer, "0us")); + + NiceTimeSpanUsToBuffer(1, Buffer); + CHECK(StringEquals(Buffer, "1us")); + + NiceTimeSpanUsToBuffer(999, Buffer); + CHECK(StringEquals(Buffer, "999us")); + + NiceTimeSpanUsToBuffer(1000, Buffer); + CHECK(StringEquals(Buffer, "1.00ms")); + + NiceTimeSpanUsToBuffer(1500, Buffer); + CHECK(StringEquals(Buffer, "1.50ms")); + + NiceTimeSpanUsToBuffer(9999, Buffer); + CHECK(StringEquals(Buffer, "10.00ms")); // %.2f rounds 9.999 -> 10.00 + + NiceTimeSpanUsToBuffer(10000, Buffer); + CHECK(StringEquals(Buffer, "10.0ms")); + + NiceTimeSpanUsToBuffer(999500, Buffer); + CHECK(StringEquals(Buffer, "999.5ms")); + + NiceTimeSpanUsToBuffer(999999, Buffer); + CHECK(StringEquals(Buffer, "1000.0ms")); // boundary just below 1s + + // >=1s delegates to NiceTimeSpanMs. + NiceTimeSpanUsToBuffer(1000000, Buffer); + CHECK(StringEquals(Buffer, "1.00s")); + + NiceTimeSpanUsToBuffer(60000000, Buffer); + CHECK(StringEquals(Buffer, "1m00s")); + } } TEST_CASE("StringBuilder") diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index c03c1a9a0..4ae8d0457 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -11,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/timer.h> +#include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> @@ -248,6 +249,7 @@ Hub::~Hub() void Hub::Shutdown() { + ZEN_TRACE_CPU("Hub::Shutdown"); ZEN_INFO("Hub service shutting down, deprovisioning any current instances"); bool Expected = false; @@ -299,6 +301,7 @@ Hub::Shutdown() Hub::Response Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { + ZEN_TRACE_CPU("Hub::Provision"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; bool IsNewInstance = false; @@ -328,17 +331,22 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, *m_Hydration, - StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), - .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), - .TempDir = m_HydrationTempPath / ModuleId, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath, - .Malloc = m_Config.InstanceMalloc, - .Trace = m_Config.InstanceTrace, - .TraceHost = m_Config.InstanceTraceHost, - .TraceFile = m_Config.InstanceTraceFile, - .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .TempDir = m_HydrationTempPath / ModuleId, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile, + .EnableHydration = m_Config.EnableHydration, + .EnableDehydration = m_Config.EnableDehydration, + .HydrationPackEnabled = m_Config.HydrationPackEnabled, + .HydrationPackThresholdBytes = m_Config.HydrationPackThresholdBytes, + .HydrationMaxPackBytes = m_Config.HydrationMaxPackBytes, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -511,6 +519,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, HubInstanceState OldState, bool IsNewInstance) { + ZEN_TRACE_CPU("Hub::CompleteProvision"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); std::string BaseUri; // TODO? @@ -571,6 +580,7 @@ Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, Hub::Response Hub::Deprovision(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Deprovision"); ZEN_ASSERT(!m_ShutdownFlag.load()); return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) { ZEN_UNUSED(Instance); @@ -581,6 +591,7 @@ Hub::Deprovision(const std::string& ModuleId) Hub::Response Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate) { + ZEN_TRACE_CPU("Hub::InternalDeprovision"); StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; { @@ -694,6 +705,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI Hub::Response Hub::Obliterate(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Obliterate"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; @@ -845,6 +857,7 @@ Hub::Obliterate(const std::string& ModuleId) void Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { + ZEN_TRACE_CPU("Hub::CompleteObliterate"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -871,6 +884,7 @@ Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, siz void Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { + ZEN_TRACE_CPU("Hub::CompleteDeprovision"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -938,6 +952,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si Hub::Response Hub::Hibernate(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Hibernate"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; @@ -1051,6 +1066,7 @@ Hub::Hibernate(const std::string& ModuleId) void Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { + ZEN_TRACE_CPU("Hub::CompleteHibernate"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -1074,6 +1090,7 @@ Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance, size Hub::Response Hub::Wake(const std::string& ModuleId) { + ZEN_TRACE_CPU("Hub::Wake"); ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; @@ -1185,6 +1202,7 @@ Hub::Wake(const std::string& ModuleId) void Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) { + ZEN_TRACE_CPU("Hub::CompleteWake"); const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); @@ -1402,6 +1420,7 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS void Hub::AttemptRecoverInstance(std::string_view ModuleId) { + ZEN_TRACE_CPU("Hub::AttemptRecoverInstance"); StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; { @@ -1506,6 +1525,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, StorageServerInstance::SharedLockedPtr&& LockedInstance, size_t ActiveInstanceIndex) { + ZEN_TRACE_CPU("Hub::CheckInstanceStatus"); const std::string ModuleId(LockedInstance.GetModuleId()); HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); @@ -1645,6 +1665,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, void Hub::UpdateMachineMetrics() { + ZEN_TRACE_CPU("Hub::UpdateMachineMetrics"); try { bool DiskSpaceOk = false; @@ -1696,6 +1717,7 @@ Hub::WatchDog() size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 while (!m_ShutdownFlag.load() && !m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs))) { + ZEN_TRACE_CPU("Hub::WatchDogCycle"); try { UpdateMachineMetrics(); @@ -2755,18 +2777,18 @@ TEST_CASE("hub.instance.inactivity.deprovision") auto PokeInstance = [&](uint16_t Port) { // Make a real storage request to increment the instance's activity sum. // The watchdog detects the changed sum on the next cycle and resets LastActivityTime. - { - HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), - HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)}); - uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - - std::chrono::steady_clock::time_point::min()) - .count(); - IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); - const HttpClient::Response PutResult = - PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key), - IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive")))); - CHECK(PutResult); - } + // Per-attempt connect is kept tight (200ms) so a genuinely dead endpoint fails fast; + // RetryCount=3 absorbs transient localhost-accept slowness on loaded CI runners. + HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), + HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3}); + uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - + std::chrono::steady_clock::time_point::min()) + .count(); + IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); + const HttpClient::Response PutResult = + PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key), + IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive")))); + REQUIRE_MESSAGE(PutResult, "PokeInstance PUT failed - cannot reset activity timer"); }; PokeInstance(IdleInfo.Port); diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 40d046ce0..1ce9bc876 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -74,6 +74,11 @@ public: std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; CbObject HydrationOptions; + bool EnableHydration = true; + bool EnableDehydration = true; + bool HydrationPackEnabled = true; + uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes; + uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes; WatchDogConfiguration WatchDog; diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp index c7f25bab6..2730ba059 100644 --- a/src/zenserver/hub/hydration.cpp +++ b/src/zenserver/hub/hydration.cpp @@ -10,15 +10,20 @@ #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/system.h> #include <zencore/thread.h> #include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/uid.h> #include <zenutil/cloud/imdscredentials.h> #include <zenutil/cloud/s3client.h> #include <zenutil/filesystemutils.h> +#include <zenutil/wildcard.h> #include <numeric> #include <unordered_map> @@ -34,6 +39,8 @@ namespace zen { +using namespace std::literals; + namespace hydration_impl { /// UTC time decomposed to calendar fields with sub-second milliseconds. @@ -76,7 +83,51 @@ namespace hydration_impl { std::atomic<bool>& PauseFlag, const std::filesystem::path& Path) { - CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); + CleanDirectoryResult Result = CleanDirectory(WorkerPool, AbortFlag, PauseFlag, Path, std::vector<std::string>{}, {}, 0); + for (const auto& [FailedPath, Ec] : Result.FailedRemovePaths) + { + ZEN_WARN("Failed to remove '{}' while cleaning '{}': {}", FailedPath, Path, Ec.message()); + } + } + + // Returns true if RelKey matches any wildcard in Excludes. Excluded paths are + // dropped at the dehydrate-side directory scan and never enter the manifest. + bool IsExcluded(std::string_view RelKey, std::span<const std::string> Excludes) + { + for (const std::string& W : Excludes) + { + if (MatchWildcard(W, RelKey, /*CaseSensitive*/ true)) + { + return true; + } + } + return false; + } + + std::vector<std::string> ParseStringArray(CbFieldView Field) + { + std::vector<std::string> Out; + for (CbFieldView Entry : Field.AsArrayView()) + { + Out.emplace_back(Entry.AsString()); + } + return Out; + } + + // Built-in exclude wildcards applied unless the hub config supplies an explicit + // `excludes` array (an empty array opts out of all defaults). Patterns match the + // dehydrate-side relative path (forward-slash form). `*` is path-separator-agnostic + // per zenutil/wildcard.h. + std::vector<std::string> DefaultExcludes() + { + return { + ".sentry-native/*", // sentry-native crash uploader DB; locked while child runs + "state_marker", // root-level liveness marker (zenstorageserver.cpp) + ".lock", // FILE_FLAG_DELETE_ON_CLOSE lock; locked while child runs + "*.bak", // transient backups produced by atomic file replace + "gc/reserve.gc", // GC disk reserve (gc subdir per zenstorageserver.cpp) + "auth/*", // encrypted auth state under auth/ + }; } /////////////////////////////////////////////////////////////////////// @@ -90,45 +141,110 @@ namespace hydration_impl { std::atomic<uint64_t> Bytes{0}; // lambda-side: bytes transferred on successful completion std::atomic<uint64_t> ElapsedUs{0}; // wall time around Work.Wait() - RwLock ThreadIdsLock; - std::unordered_set<int> ThreadIds; + // Per-request timing gathered inside the work lambdas. RequestCount + RequestTotalUs are + // summed across all completed requests. RequestMaxUs is the slowest single request + // observed (CAS-updated in EndRequest); avg vs max gap surfaces tail latency without + // keeping per-request samples. + std::atomic<uint64_t> RequestCount{0}; + std::atomic<uint64_t> RequestTotalUs{0}; + std::atomic<uint64_t> RequestMaxUs{0}; + std::atomic<uint32_t> InFlight{0}; + std::atomic<uint32_t> InFlightPeak{0}; + + // Scheduling latency. PhaseClock starts when the phase begins. FirstScheduleUs / + // FirstStartUs are the relative times of the earliest ScheduleWork call and the earliest + // worker lambda entry. Their difference is how long requests sat in the pool backlog + // before a worker picked one up (pool warm-up / backlog). + Stopwatch PhaseClock; + std::atomic<uint64_t> FirstScheduleUs{UINT64_MAX}; + std::atomic<uint64_t> FirstStartUs{UINT64_MAX}; + + void RecordScheduled() + { + uint64_t Now = PhaseClock.GetElapsedTimeUs(); + uint64_t Existing = FirstScheduleUs.load(std::memory_order_relaxed); + while (Now < Existing && !FirstScheduleUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) + { + } + } - void RecordThread() + // Returns a Stopwatch the caller runs across the actual request; call EndRequest with + // the elapsed microseconds when the request completes. + Stopwatch BeginRequest() { - int Tid = zen::GetCurrentThreadId(); - ThreadIdsLock.WithExclusiveLock([&] { ThreadIds.insert(Tid); }); + uint64_t Now = PhaseClock.GetElapsedTimeUs(); + uint64_t Existing = FirstStartUs.load(std::memory_order_relaxed); + while (Now < Existing && !FirstStartUs.compare_exchange_weak(Existing, Now, std::memory_order_relaxed)) + { + } + uint32_t Current = InFlight.fetch_add(1, std::memory_order_relaxed) + 1; + uint32_t Peak = InFlightPeak.load(std::memory_order_relaxed); + while (Current > Peak && !InFlightPeak.compare_exchange_weak(Peak, Current, std::memory_order_relaxed)) + { + } + return Stopwatch{}; + } + + void EndRequest(uint64_t ElapsedUsValue) + { + InFlight.fetch_sub(1, std::memory_order_relaxed); + RequestCount.fetch_add(1, std::memory_order_relaxed); + RequestTotalUs.fetch_add(ElapsedUsValue, std::memory_order_relaxed); + uint64_t Existing = RequestMaxUs.load(std::memory_order_relaxed); + while (ElapsedUsValue > Existing && !RequestMaxUs.compare_exchange_weak(Existing, ElapsedUsValue, std::memory_order_relaxed)) + { + } } }; struct DehydrateStatistics { PhaseStats Hash; - PhaseStats Upload; - PhaseStats Touch; // Touch shares Upload's ParallelWork / ElapsedUs + PhaseStats Upload; // Loose CAS PUTs + PhaseStats Touch; // Mod-time refresh on pre-existing loose CAS entries; shares Upload's ParallelWork + PhaseStats PackUpload; // Pack-blob PUTs; shares Upload's ParallelWork + PhaseStats PackTouch; // Mod-time refresh on pre-existing pack blobs; shares Upload's ParallelWork std::atomic<uint64_t> LoadStateUs{0}; std::atomic<uint64_t> DirScanUs{0}; std::atomic<uint64_t> ListExistingUs{0}; - std::atomic<uint64_t> MetadataSaveUs{0}; + std::atomic<uint64_t> SaveMetadataUs{0}; std::atomic<uint64_t> CleanUs{0}; std::atomic<uint64_t> TotalFiles{0}; std::atomic<uint64_t> TotalBytes{0}; std::atomic<uint64_t> TotalUs{0}; + + // Pack phase stats + std::atomic<uint64_t> PackCount{0}; // number of packs built + std::atomic<uint64_t> PackedFiles{0}; // Files[] entries folded into packs (includes hash-duplicates) + std::atomic<uint64_t> PackBytes{0}; // total bytes across all packs + std::atomic<uint64_t> PackBuildUs{0}; // hash + write cost across all packs }; struct HydrateStatistics { - PhaseStats Download; + PhaseStats Download; // Loose CAS GETs + PhaseStats PackDownload; // Pack-blob GETs; shares Download's ParallelWork std::atomic<uint64_t> LoadMetadataUs{0}; + std::atomic<uint64_t> CreateDirsUs{0}; + std::atomic<uint64_t> CreateDirsCount{0}; // unique parent dirs passed to CreateDirectories std::atomic<uint64_t> CleanUs{0}; - std::atomic<uint64_t> RenameOrCopyUs{0}; - std::atomic<uint64_t> VerifyScanUs{0}; + std::atomic<uint64_t> FinalizeUs{0}; + std::atomic<uint64_t> BuildStateUs{0}; std::atomic<uint64_t> TotalFiles{0}; std::atomic<uint64_t> TotalBytes{0}; std::atomic<uint64_t> TotalUs{0}; + + // Pack phase stats + std::atomic<uint64_t> PackCount{0}; // packs in manifest + std::atomic<uint64_t> PackedFiles{0}; // files unpacked into ServerStateDir (per-destination count) + std::atomic<uint64_t> PackUnpackUs{0}; // slice + parallel SafeWriteFile cost + // Bytes written to disk during the unpack-and-slice phase (sum of slice sizes + // touched by SafeWriteFile). Pairs with PackUnpackUs for disk-write throughput. + std::atomic<uint64_t> UnpackWriteBytes{0}; }; // Bits-per-second rate computed at microsecond precision. Zero-safe. @@ -149,9 +265,13 @@ namespace hydration_impl { public: virtual ~StorageBase() = default; - virtual std::string Describe() const = 0; - virtual void SaveMetadata(const CbObject& Data) = 0; - virtual CbObject LoadMetadata() = 0; + virtual std::string Describe() const = 0; + virtual void SaveMetadata(const CbObject& Data) = 0; + virtual CbObject LoadMetadata() = 0; + // Backend-specific settings that need to be persisted in state.cbo and reapplied + // on hydrate. Today only S3 uses this (MultipartChunkSize - the chunk size used at + // dehydrate must be carried forward so hydrate uses the same partitioning). File + // backend has no such settings and returns / accepts an empty object. virtual CbObject GetSettings() = 0; virtual void ParseSettings(const CbObjectView& Settings) = 0; virtual std::vector<IoHash> List() = 0; @@ -179,7 +299,7 @@ namespace hydration_impl { explicit FileStorage(std::filesystem::path ModulePath); - virtual std::string Describe() const override { return fmt::format("file://{}", m_StoragePath.generic_string()); } + virtual std::string Describe() const override { return fmt::format("file://{}"sv, m_StoragePath.generic_string()); } virtual void SaveMetadata(const CbObject& Data) override; virtual CbObject LoadMetadata() override; virtual CbObject GetSettings() override { return {}; } @@ -209,13 +329,12 @@ namespace hydration_impl { class S3Storage : public StorageBase { public: - static constexpr std::string_view Prefix = "s3://"; - static constexpr std::string_view Type = "s3"; - static constexpr uint64_t DefaultMultipartChunkSize = 32u * 1024u * 1024u; + static constexpr std::string_view Prefix = "s3://"; + static constexpr std::string_view Type = "s3"; S3Storage(S3Client& Client, std::string KeyPrefix, std::filesystem::path TempDir, uint64_t MultipartChunkSize); - virtual std::string Describe() const override { return fmt::format("s3://{}/{}", m_Client.BucketName(), m_KeyPrefix); } + virtual std::string Describe() const override { return fmt::format("s3://{}/{}"sv, m_Client.BucketName(), m_KeyPrefix); } virtual void SaveMetadata(const CbObject& Data) override; virtual CbObject LoadMetadata() override; virtual CbObject GetSettings() override; @@ -256,6 +375,7 @@ namespace hydration_impl { void FileStorage::SaveMetadata(const CbObject& Data) { + ZEN_TRACE_CPU("FileStorage::SaveMetadata"); BinaryWriter Output; SaveCompactBinary(Output, Data); WriteFile(m_StatePathName, IoBuffer(IoBuffer::Wrap, Output.GetData(), Output.GetSize())); @@ -263,6 +383,7 @@ namespace hydration_impl { CbObject FileStorage::LoadMetadata() { + ZEN_TRACE_CPU("FileStorage::LoadMetadata"); if (!IsFile(m_StatePathName)) { return {}; @@ -277,7 +398,7 @@ namespace hydration_impl { CbObject Result = ValidateAndReadCompactBinaryObject(std::move(Payload), Error); if (Error != CbValidateError::None) { - throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}", m_StatePathName, ToString(Error))); + throw std::runtime_error(fmt::format("Failed to read {} state file. Reason: {}"sv, m_StatePathName, ToString(Error))); } return Result; } @@ -309,13 +430,15 @@ namespace hydration_impl { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("FileStorage::Put"); if (!AbortFlag.load()) { - std::filesystem::path DestPath = m_CASPath / fmt::format("{}", Hash); + Stopwatch Timer = Stats.BeginRequest(); + std::filesystem::path DestPath = m_CASPath / fmt::format("{}"sv, Hash); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); if (std::error_code Ec = CopyFile(SourcePath, DestPath, CopyFileOptions{.EnableClone = true}); Ec) { - throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestPath)); + throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestPath)); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); } @@ -332,13 +455,16 @@ namespace hydration_impl { Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats]( std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("FileStorage::Get"); if (!AbortFlag.load()) { - std::filesystem::path SourcePath = m_CASPath / fmt::format("{}", Hash); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + std::filesystem::path SourcePath = m_CASPath / fmt::format("{}"sv, Hash); if (std::error_code Ec = CopyFile(SourcePath, DestinationPath, CopyFileOptions{.EnableClone = true}); Ec) { - throw std::system_error(Ec, fmt::format("Failed to copy '{}' to '{}'", SourcePath, DestinationPath)); + throw std::system_error(Ec, + fmt::format("Failed to copy '{}' to '{}'"sv, SourcePath, DestinationPath)); } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); } @@ -365,6 +491,7 @@ namespace hydration_impl { void S3Storage::SaveMetadata(const CbObject& Data) { + ZEN_TRACE_CPU("S3Storage::SaveMetadata"); BinaryWriter Output; SaveCompactBinary(Output, Data); IoBuffer Payload(IoBuffer::Clone, Output.GetData(), Output.GetSize()); @@ -373,12 +500,13 @@ namespace hydration_impl { S3Result Result = m_Client.PutObject(Key, std::move(Payload)); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to save incremental metadata to '{}': {}", Key, Result.Error); + throw zen::runtime_error("Failed to save incremental metadata to '{}': {}"sv, Key, Result.Error); } } CbObject S3Storage::LoadMetadata() { + ZEN_TRACE_CPU("S3Storage::LoadMetadata"); std::string Key = m_KeyPrefix + "/incremental-state.cbo"; S3GetObjectResult Result = m_Client.GetObject(Key); if (!Result.IsSuccess()) @@ -387,14 +515,14 @@ namespace hydration_impl { { return {}; } - throw zen::runtime_error("Failed to load incremental metadata from '{}': {}", Key, Result.Error); + throw zen::runtime_error("Failed to load incremental metadata from '{}': {}"sv, Key, Result.Error); } CbValidateError Error; CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Result.Content), Error); if (Error != CbValidateError::None) { - throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}", Key, ToString(Error)); + throw zen::runtime_error("Failed to parse incremental metadata from '{}': {}"sv, Key, ToString(Error)); } return Meta; } @@ -402,13 +530,13 @@ namespace hydration_impl { CbObject S3Storage::GetSettings() { CbObjectWriter Writer; - Writer << "MultipartChunkSize" << m_MultipartChunkSize; + Writer << "MultipartChunkSize"sv << m_MultipartChunkSize; return Writer.Save(); } void S3Storage::ParseSettings(const CbObjectView& Settings) { - m_MultipartChunkSize = Settings["MultipartChunkSize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + m_MultipartChunkSize = Settings["MultipartChunkSize"sv].AsUInt64(DefaultMultipartChunkSize); } std::vector<IoHash> S3Storage::List() @@ -417,7 +545,7 @@ namespace hydration_impl { S3ListObjectsResult Result = m_Client.ListObjects(CasPrefix); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to list S3 objects under '{}': {}", CasPrefix, Result.Error); + throw zen::runtime_error("Failed to list S3 objects under '{}': {}"sv, CasPrefix, Result.Error); } std::vector<IoHash> Hashes; @@ -448,13 +576,15 @@ namespace hydration_impl { Work.ScheduleWork( WorkerPool, [this, Hash = IoHash(Hash), Size, SourcePath = std::filesystem::path(SourcePath), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("S3Storage::Put"); if (AbortFlag.load()) { return; } - S3Client& Client = m_Client; - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + S3Client& Client = m_Client; + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { @@ -466,7 +596,7 @@ namespace hydration_impl { m_MultipartChunkSize); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error); } } else @@ -475,7 +605,7 @@ namespace hydration_impl { S3Result Result = Client.PutObject(Key, File.ReadAll()); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, Result.Error); + throw zen::runtime_error("Failed to upload '{}' to S3: {}"sv, Key, Result.Error); } } Stats.Bytes.fetch_add(Size, std::memory_order_relaxed); @@ -489,7 +619,7 @@ namespace hydration_impl { const std::filesystem::path& DestinationPath, PhaseStats& Stats) { - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash); if (Size >= (m_MultipartChunkSize + (m_MultipartChunkSize / 4))) { @@ -515,15 +645,17 @@ namespace hydration_impl { uint64_t ChunkSize = std::min<uint64_t>(m_MultipartChunkSize, Size - Offset); Work.ScheduleWork(WorkerPool, [this, Key = Key, Offset, ChunkSize, Data, &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); - if (AbortFlag) + ZEN_TRACE_CPU("S3Storage::GetRange"); + if (AbortFlag.load()) { return; } - S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + S3GetObjectResult Chunk = m_Client.GetObjectRange(Key, Offset, ChunkSize); if (!Chunk.IsSuccess()) { - throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}", + throw zen::runtime_error("Failed to download '{}' bytes [{}-{}] from S3: {}"sv, Key, Offset, Offset + ChunkSize - 1, @@ -541,15 +673,17 @@ namespace hydration_impl { Work.ScheduleWork( WorkerPool, [this, Key = Key, Size, DestinationPath = std::filesystem::path(DestinationPath), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); - if (AbortFlag) + ZEN_TRACE_CPU("S3Storage::Get"); + if (AbortFlag.load()) { return; } - S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + S3GetObjectResult Chunk = m_Client.GetObject(Key, m_TempDir); if (!Chunk.IsSuccess()) { - throw zen::runtime_error("Failed to download '{}' from S3: {}", Key, Chunk.Error); + throw zen::runtime_error("Failed to download '{}' from S3: {}"sv, Key, Chunk.Error); } if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef)) @@ -585,16 +719,18 @@ namespace hydration_impl { void S3Storage::Touch(ParallelWork& Work, WorkerThreadPool& WorkerPool, const IoHash& Hash, PhaseStats& Stats) { Work.ScheduleWork(WorkerPool, [this, Hash = IoHash(Hash), &Stats](std::atomic<bool>& AbortFlag) { - Stats.RecordThread(); + ZEN_TRACE_CPU("S3Storage::Touch"); if (AbortFlag.load()) { return; } - std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}", Hash); - S3Result Result = m_Client.Touch(Key); + Stopwatch Timer = Stats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { Stats.EndRequest(Timer.GetElapsedTimeUs()); }); + std::string Key = m_KeyPrefix + "/cas/" + fmt::format("{}"sv, Hash); + S3Result Result = m_Client.Touch(Key); if (!Result.IsSuccess()) { - throw zen::runtime_error("Failed to touch '{}' in S3: {}", Key, Result.Error); + throw zen::runtime_error("Failed to touch '{}' in S3: {}"sv, Key, Result.Error); } }); } @@ -605,7 +741,7 @@ namespace hydration_impl { S3ListObjectsResult ListResult = m_Client.ListObjects(ModulePrefix); if (!ListResult.IsSuccess()) { - throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}", ModulePrefix, ListResult.Error); + throw zen::runtime_error("Failed to list S3 objects for deletion under '{}': {}"sv, ModulePrefix, ListResult.Error); } for (const S3ObjectInfo& Obj : ListResult.Objects) { @@ -617,7 +753,7 @@ namespace hydration_impl { S3Result DelResult = m_Client.DeleteObject(Key); if (!DelResult.IsSuccess()) { - throw zen::runtime_error("Failed to delete S3 object '{}': {}", Key, DelResult.Error); + throw zen::runtime_error("Failed to delete S3 object '{}': {}"sv, Key, DelResult.Error); } }); } @@ -627,51 +763,141 @@ namespace hydration_impl { // IncrementalHydrator: the only HydrationStrategyBase implementation. // Summary emission for hydrate/dehydrate operations. + // Queue-wait helper: time between earliest schedule and earliest worker start. UINT64_MAX + // sentinels mean the corresponding event never happened (no work scheduled / nothing ran). + inline uint64_t QueueWaitUs(uint64_t FirstScheduleUs, uint64_t FirstStartUs) + { + if (FirstScheduleUs == UINT64_MAX || FirstStartUs == UINT64_MAX || FirstStartUs <= FirstScheduleUs) + { + return 0; + } + return FirstStartUs - FirstScheduleUs; + } + + inline uint64_t SafeAvg(uint64_t Total, uint64_t Count) { return Count ? Total / Count : 0; } + void LogDehydrateSummary(std::string_view Prefix, const DehydrateStatistics& Stats, std::string_view ModuleId, const std::filesystem::path& Source, std::string_view Target) { - const uint64_t HashUs = Stats.Hash.ElapsedUs.load(); - const uint64_t UploadUs = Stats.Upload.ElapsedUs.load(); + const uint64_t TotalFiles = Stats.TotalFiles.load(); + const uint64_t HashUs = Stats.Hash.ElapsedUs.load(); + const uint64_t UploadUs = Stats.Upload.ElapsedUs.load(); + + // Hash phase: per-request data (BeginRequest/EndRequest tracks each hash op). + // Hash is the first phase to schedule work, so cold-pool warm-up shows up here. + const uint64_t HashFiles = Stats.Hash.Files.load(); + const uint64_t HashBytes = Stats.Hash.Bytes.load(); + const uint64_t HashReqCount = Stats.Hash.RequestCount.load(); + const uint64_t HashReqTotalUs = Stats.Hash.RequestTotalUs.load(); + const uint64_t HashReqMaxUs = Stats.Hash.RequestMaxUs.load(); + const uint32_t HashPeak = Stats.Hash.InFlightPeak.load(); + const uint64_t HashQueueUs = QueueWaitUs(Stats.Hash.FirstScheduleUs.load(), Stats.Hash.FirstStartUs.load()); + // Cache hit rate: every TotalFile not re-hashed was served from the cached state. + const uint32_t CacheHitPct = TotalFiles ? gsl::narrow_cast<uint32_t>((TotalFiles - HashFiles) * 100 / TotalFiles) : 0; + + // Upload phase shares one ParallelWork across loose Put (Stats.Upload), pack-blob Put + // (Stats.PackUpload), loose Touch (Stats.Touch), and pack-blob Touch (Stats.PackTouch). + // Per-request data is collected per PhaseStats by Storage::Put / Storage::Touch and + // reported as a single combined "Requests" line. + const uint64_t UpReqCount = Stats.Upload.RequestCount.load() + Stats.PackUpload.RequestCount.load() + + Stats.Touch.RequestCount.load() + Stats.PackTouch.RequestCount.load(); + const uint64_t UpReqTotalUs = Stats.Upload.RequestTotalUs.load() + Stats.PackUpload.RequestTotalUs.load() + + Stats.Touch.RequestTotalUs.load() + Stats.PackTouch.RequestTotalUs.load(); + const uint64_t UpReqMaxUs = std::max({Stats.Upload.RequestMaxUs.load(), + Stats.PackUpload.RequestMaxUs.load(), + Stats.Touch.RequestMaxUs.load(), + Stats.PackTouch.RequestMaxUs.load()}); + const uint32_t UpPeak = std::max({Stats.Upload.InFlightPeak.load(), + Stats.PackUpload.InFlightPeak.load(), + Stats.Touch.InFlightPeak.load(), + Stats.PackTouch.InFlightPeak.load()}); + // Combined first-schedule / first-start across all four phase counters. UINT64_MAX is + // the unset sentinel and naturally loses to any real timestamp under std::min, so empty + // phases fall through to the others. + const uint64_t UpFirstSchedUs = std::min({Stats.Upload.FirstScheduleUs.load(), + Stats.PackUpload.FirstScheduleUs.load(), + Stats.Touch.FirstScheduleUs.load(), + Stats.PackTouch.FirstScheduleUs.load()}); + const uint64_t UpFirstStartUs = std::min({Stats.Upload.FirstStartUs.load(), + Stats.PackUpload.FirstStartUs.load(), + Stats.Touch.FirstStartUs.load(), + Stats.PackTouch.FirstStartUs.load()}); + const uint64_t UpQueueUs = QueueWaitUs(UpFirstSchedUs, UpFirstStartUs); + + const uint64_t LooseFiles = Stats.Upload.Files.load(); + const uint64_t LooseBytes = Stats.Upload.Bytes.load(); + const uint64_t TouchFiles = Stats.Touch.Files.load(); + const uint64_t TouchBytes = Stats.Touch.Bytes.load(); + const uint64_t PackTouchPacks = Stats.PackTouch.Files.load(); + const uint64_t PackTouchBytes = Stats.PackTouch.Bytes.load(); + + const uint64_t PackCount = Stats.PackCount.load(); + const uint64_t PackedFiles = Stats.PackedFiles.load(); + const uint64_t PackBytes = Stats.PackBytes.load(); + const uint64_t PackBuildUs = Stats.PackBuildUs.load(); + const uint64_t PackUploadFiles = Stats.PackUpload.Files.load(); + const uint64_t PackUploadBytes = Stats.PackUpload.Bytes.load(); + ZEN_INFO( "{} module '{}': {} files ({}) in {}\n" " Source: {}\n" " Target: {}\n" " Load state: {}\n" " Dir scan: {}\n" - " Hash phase: {} {}/{} ({}) hashed, {}bits/s, {} threads\n" + " Hash: {} {}/{} files ({}) hashed, {}% cache hit, {}bits/s\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" " List existing: {}\n" - " Upload phase: {} {}/{} ({}) uploaded, {} ({}) touched, {}bits/s, {} threads\n" - " Metadata save: {}\n" + " Pack: {} {} packs, {} files, {}, {}bits/s\n" + " Upload: {} loose {} files ({}), packed {} blobs ({}), touched {} loose ({}) + {} packs ({}), {}bits/s\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" + " Save metadata: {}\n" " Clean: {}", Prefix, ModuleId, - ThousandsNum(Stats.TotalFiles.load()), + ThousandsNum(TotalFiles), NiceBytes(Stats.TotalBytes.load()), - NiceLatencyNs(Stats.TotalUs.load() * 1000), + NiceTimeSpanUs(Stats.TotalUs.load()), Source.generic_string(), Target, - NiceLatencyNs(Stats.LoadStateUs.load() * 1000), - NiceLatencyNs(Stats.DirScanUs.load() * 1000), - NiceLatencyNs(HashUs * 1000), - ThousandsNum(Stats.Hash.Files.load()), - ThousandsNum(Stats.TotalFiles.load()), - NiceBytes(Stats.Hash.Bytes.load()), - NiceNum(BitsPerSecond(Stats.Hash.Bytes.load(), HashUs)), - Stats.Hash.ThreadIds.size(), - NiceLatencyNs(Stats.ListExistingUs.load() * 1000), - NiceLatencyNs(UploadUs * 1000), - ThousandsNum(Stats.Upload.Files.load()), - ThousandsNum(Stats.TotalFiles.load()), - NiceBytes(Stats.Upload.Bytes.load()), - ThousandsNum(Stats.Touch.Files.load()), - NiceBytes(Stats.Touch.Bytes.load()), - NiceNum(BitsPerSecond(Stats.Upload.Bytes.load(), UploadUs)), - Stats.Upload.ThreadIds.size(), - NiceLatencyNs(Stats.MetadataSaveUs.load() * 1000), - NiceLatencyNs(Stats.CleanUs.load() * 1000)); + NiceTimeSpanUs(Stats.LoadStateUs.load()), + NiceTimeSpanUs(Stats.DirScanUs.load()), + NiceTimeSpanUs(HashUs), + ThousandsNum(HashFiles), + ThousandsNum(TotalFiles), + NiceBytes(HashBytes), + CacheHitPct, + NiceNum(BitsPerSecond(HashBytes, HashUs)), + ThousandsNum(HashReqCount), + NiceTimeSpanUs(SafeAvg(HashReqTotalUs, HashReqCount)), + NiceTimeSpanUs(HashReqMaxUs), + HashPeak, + NiceTimeSpanUs(HashQueueUs), + NiceTimeSpanUs(Stats.ListExistingUs.load()), + NiceTimeSpanUs(PackBuildUs), + ThousandsNum(PackCount), + ThousandsNum(PackedFiles), + NiceBytes(PackBytes), + NiceNum(BitsPerSecond(PackBytes, PackBuildUs)), + NiceTimeSpanUs(UploadUs), + ThousandsNum(LooseFiles), + NiceBytes(LooseBytes), + ThousandsNum(PackUploadFiles), + NiceBytes(PackUploadBytes), + ThousandsNum(TouchFiles), + NiceBytes(TouchBytes), + ThousandsNum(PackTouchPacks), + NiceBytes(PackTouchBytes), + NiceNum(BitsPerSecond(LooseBytes + PackUploadBytes, UploadUs)), + ThousandsNum(UpReqCount), + NiceTimeSpanUs(SafeAvg(UpReqTotalUs, UpReqCount)), + NiceTimeSpanUs(UpReqMaxUs), + UpPeak, + NiceTimeSpanUs(UpQueueUs), + NiceTimeSpanUs(Stats.SaveMetadataUs.load()), + NiceTimeSpanUs(Stats.CleanUs.load())); } void LogHydrateSummary(std::string_view Prefix, @@ -681,42 +907,137 @@ namespace hydration_impl { const std::filesystem::path& Target) { const uint64_t DownloadUs = Stats.Download.ElapsedUs.load(); + + // Standalone (Stats.Download) and pack (Stats.PackDownload) downloads share one + // ParallelWork. Per-request data is collected per PhaseStats by Storage::Get; + // reported as a single combined "Requests" line. Multipart GETs may split a pack + // into several ranged requests, so DlReqCount can exceed file/blob counts. + const uint64_t StandaloneFiles = Stats.Download.Files.load(); + const uint64_t StandaloneBytes = Stats.Download.Bytes.load(); + const uint64_t PackDlFiles = Stats.PackDownload.Files.load(); + const uint64_t PackDlBytes = Stats.PackDownload.Bytes.load(); + const uint64_t DlReqCount = Stats.Download.RequestCount.load() + Stats.PackDownload.RequestCount.load(); + const uint64_t DlReqTotalUs = Stats.Download.RequestTotalUs.load() + Stats.PackDownload.RequestTotalUs.load(); + const uint64_t DlReqMaxUs = std::max(Stats.Download.RequestMaxUs.load(), Stats.PackDownload.RequestMaxUs.load()); + const uint32_t DlPeak = std::max(Stats.Download.InFlightPeak.load(), Stats.PackDownload.InFlightPeak.load()); + const uint64_t DlFirstSchedUs = std::min(Stats.Download.FirstScheduleUs.load(), Stats.PackDownload.FirstScheduleUs.load()); + const uint64_t DlFirstStartUs = std::min(Stats.Download.FirstStartUs.load(), Stats.PackDownload.FirstStartUs.load()); + const uint64_t QueueUs = QueueWaitUs(DlFirstSchedUs, DlFirstStartUs); + + const uint64_t PackCount = Stats.PackCount.load(); + const uint64_t PackedFiles = Stats.PackedFiles.load(); + const uint64_t PackUnpackUs = Stats.PackUnpackUs.load(); + const uint64_t UnpackWriteBytes = Stats.UnpackWriteBytes.load(); + + const uint64_t CreateDirsUs = Stats.CreateDirsUs.load(); + const uint64_t CreateDirsCount = Stats.CreateDirsCount.load(); + const uint64_t CreateDirsRate = CreateDirsUs ? (CreateDirsCount * 1'000'000ull / CreateDirsUs) : 0; + + // Standalone and pack downloads share the Download phase elapsed reported below; + // the unpack line is its own clock (slice + parallel SafeWriteFile). ZEN_INFO( "{} module '{}': {} files ({}) in {}\n" " Source: {}\n" " Target: {}\n" " Load metadata: {}\n" - " Download phase: {} {}/{} ({}) downloaded, {}bits/s, {} threads\n" + " Create dirs: {} {} dirs, {} dirs/s\n" + " Download: {} loose {} files ({}), packed {} blobs ({}), {}bits/s\n" + " Requests: {} reqs, avg {}/req, max {}/req, peak in-flight {}, queue wait {}\n" + " Unpack: {} {} packs, {} files ({}), {}bits/s\n" " Clean: {}\n" - " Rename/copy: {}\n" - " Verify scan: {}", + " Finalize: {}\n" + " Build state: {}", Prefix, ModuleId, ThousandsNum(Stats.TotalFiles.load()), NiceBytes(Stats.TotalBytes.load()), - NiceLatencyNs(Stats.TotalUs.load() * 1000), + NiceTimeSpanUs(Stats.TotalUs.load()), Source, Target.generic_string(), - NiceLatencyNs(Stats.LoadMetadataUs.load() * 1000), - NiceLatencyNs(DownloadUs * 1000), - ThousandsNum(Stats.Download.Files.load()), - ThousandsNum(Stats.TotalFiles.load()), - NiceBytes(Stats.Download.Bytes.load()), - NiceNum(BitsPerSecond(Stats.Download.Bytes.load(), DownloadUs)), - Stats.Download.ThreadIds.size(), - NiceLatencyNs(Stats.CleanUs.load() * 1000), - NiceLatencyNs(Stats.RenameOrCopyUs.load() * 1000), - NiceLatencyNs(Stats.VerifyScanUs.load() * 1000)); + NiceTimeSpanUs(Stats.LoadMetadataUs.load()), + NiceTimeSpanUs(CreateDirsUs), + ThousandsNum(CreateDirsCount), + NiceNum(CreateDirsRate), + NiceTimeSpanUs(DownloadUs), + ThousandsNum(StandaloneFiles), + NiceBytes(StandaloneBytes), + ThousandsNum(PackDlFiles), + NiceBytes(PackDlBytes), + NiceNum(BitsPerSecond(StandaloneBytes + PackDlBytes, DownloadUs)), + ThousandsNum(DlReqCount), + NiceTimeSpanUs(SafeAvg(DlReqTotalUs, DlReqCount)), + NiceTimeSpanUs(DlReqMaxUs), + DlPeak, + NiceTimeSpanUs(QueueUs), + NiceTimeSpanUs(PackUnpackUs), + ThousandsNum(PackCount), + ThousandsNum(PackedFiles), + NiceBytes(UnpackWriteBytes), + NiceNum(BitsPerSecond(UnpackWriteBytes, PackUnpackUs)), + NiceTimeSpanUs(Stats.CleanUs.load()), + NiceTimeSpanUs(Stats.FinalizeUs.load()), + NiceTimeSpanUs(Stats.BuildStateUs.load())); } /////////////////////////////////////////////////////////////////////// + // File-manifest entry: one of these per file in a module's state. Lives in + // the namespace (rather than nested in IncrementalHydrator) so the helper + // functions below can take it by reference. + + struct Entry + { + std::string RelativePath; + uint64_t Size; + uint64_t ModTick; + IoHash Hash; + bool IsPacked = false; // true if content is part of a pack (PackHash valid) + // Hash of the pack's concatenated raw bytes (= pack's CAS key). Hydrate downloads + // the pack once and slices this entry out at the offset recorded in Pack.Entries[] + // for the matching Entry.Hash. + IoHash PackHash; + }; + + /////////////////////////////////////////////////////////////////////// + // Pack types: produced by dehydrate's pack phase, consumed by the state + // writer; consumed during hydrate to reconstruct slices. + + struct BuiltPackEntry + { + IoHash Hash; + uint64_t Size; + }; + + struct BuiltPack + { + IoHash PackHash; + uint64_t Size; + std::vector<BuiltPackEntry> Entries; + }; + + struct PackEntryDescriptor + { + IoHash Hash; + uint64_t Size; + uint64_t Offset; + }; + + struct PackDescriptor + { + uint64_t Size = 0; + std::vector<PackEntryDescriptor> Entries; + }; + + using EntryGroup = std::vector<size_t>; + using PackPlan = std::vector<EntryGroup>; + + /////////////////////////////////////////////////////////////////////// // Holds a per-module StorageBase and threading context; drives the // hydrate/dehydrate algorithm. class IncrementalHydrator : public HydrationStrategyBase { public: - IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage); + IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage, std::span<const std::string> Excludes); virtual ~IncrementalHydrator() override; virtual void Dehydrate(const CbObject& CachedState) override; @@ -724,16 +1045,9 @@ namespace hydration_impl { virtual void Obliterate() override; private: - struct Entry - { - std::filesystem::path RelativePath; - uint64_t Size; - uint64_t ModTick; - IoHash Hash; - }; - std::unique_ptr<StorageBase> m_Storage; HydrationConfig m_Config; + std::vector<std::string> m_Excludes; WorkerThreadPool m_FallbackWorkPool; std::atomic<bool> m_FallbackAbortFlag{false}; std::atomic<bool> m_FallbackPauseFlag{false}; @@ -743,11 +1057,657 @@ namespace hydration_impl { }; /////////////////////////////////////////////////////////////////////// + // Phase helpers used by IncrementalHydrator::Dehydrate and ::Hydrate. + // These keep the two big member functions readable as a sequence of + // named phases. Helpers take only the data they need and never the + // full HydrationConfig or IncrementalHydrator. + + // Removes each path, ignoring errors. Used by both Dehydrate and Hydrate + // pack-cleanup guards plus the explicit pre-rename cleanup on Hydrate. + void RemoveStagedPackFiles(const std::vector<std::filesystem::path>& Files) + { + for (const std::filesystem::path& P : Files) + { + std::error_code Ec; + RemoveFile(P, Ec); + if (Ec) + { + ZEN_WARN("Failed to remove staged pack file '{}': {}", P, Ec.message()); + } + } + } + + // Collects parent_path() of each input, sorts lex ascending (= ancestor-first), + // uniques, then drops any entry that is a strict component-prefix of the next + // entry (its descendant's CreateDirectories recursion will create it). Calls + // CreateDirectories on the surviving leaves. Use before scheduling parallel + // writes so worker threads do not race to create the same parents. + size_t CreateParentDirectories(const std::vector<std::filesystem::path>& FilePaths) + { + if (FilePaths.empty()) + { + return 0; + } + + std::vector<std::filesystem::path> Dirs; + Dirs.reserve(FilePaths.size()); + for (const std::filesystem::path& File : FilePaths) + { + if (File.has_parent_path()) + { + Dirs.push_back(File.parent_path()); + } + } + if (Dirs.empty()) + { + return 0; + } + + std::sort(Dirs.begin(), Dirs.end()); + Dirs.erase(std::unique(Dirs.begin(), Dirs.end()), Dirs.end()); + + size_t Write = 0; + for (size_t Read = 0; Read < Dirs.size(); ++Read) + { + if (Read + 1 < Dirs.size()) + { + const std::filesystem::path& Cur = Dirs[Read]; + const std::filesystem::path& Next = Dirs[Read + 1]; + const auto [ItCur, ItNext] = std::mismatch(Cur.begin(), Cur.end(), Next.begin(), Next.end()); + if (ItCur == Cur.end() && ItNext != Next.end()) + { + continue; // Cur is component-prefix of Next; descendant will create it + } + } + if (Write != Read) + { + Dirs[Write] = std::move(Dirs[Read]); + } + ++Write; + } + Dirs.resize(Write); + + for (const std::filesystem::path& Dir : Dirs) + { + CreateDirectories(Dir); + } + return Dirs.size(); + } + + // Parses CachedState["Files"sv] into a path-keyed lookup + parallel Entries vector. + // Used by Dehydrate to seed its hash cache; Dehydrate ignores PackHash here. + void LoadCachedStateEntries(const CbObject& CachedState, + std::unordered_map<std::string, size_t>& OutLookup, + std::vector<Entry>& OutEntries) + { + for (CbFieldView FieldView : CachedState["Files"sv].AsArrayView()) + { + CbObjectView EntryView = FieldView.AsObjectView(); + std::string RelativePath(EntryView["Path"sv].AsString()); + uint64_t Size = EntryView["Size"sv].AsUInt64(); + uint64_t ModTick = EntryView["ModTick"sv].AsUInt64(); + IoHash Hash = EntryView["Hash"sv].AsHash(); + + OutLookup.insert_or_assign(RelativePath, OutEntries.size()); + OutEntries.push_back(Entry{.RelativePath = std::move(RelativePath), .Size = Size, .ModTick = ModTick, .Hash = Hash}); + } + } + + // Computes Out.Hash for a single file. For Oodle-compressed files in a `cas/` subdir + // the hash is a meta-hash combining the embedded RawHash with the file size, which + // avoids a collision between an uncompressed file and a same-content compressed file. + // All other files use a streaming raw hash via BasicFile + IoHashStream (sequential + // read, friendlier to the Windows cache manager than mmap). + void HashFileContent(const std::filesystem::path& AbsPath, Entry& Out) + { + if (AbsPath.extension().empty()) + { + std::string_view Rel = Out.RelativePath; + std::string_view First = Rel.substr(0, Rel.find('/')); + if (First.ends_with("cas")) + { + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), RawHash, RawSize); + if (Compressed) + { + IoHashStream Hasher; + Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); + Hasher.Append(&Out.Size, sizeof(Out.Size)); + Out.Hash = Hasher.GetHash(); + return; + } + } + } + + BasicFile File(AbsPath, BasicFile::Mode::kRead); + IoHashStream Hasher; + File.StreamFile([&Hasher](const void* Data, uint64_t Size) { Hasher.Append(Data, Size); }); + Out.Hash = Hasher.GetHash(); + } + + // Walks DirContent, fills Entries[], schedules hash work for files whose hash + // is not in the StateEntries cache. The caller owns the ParallelWork's Wait() + // so other operations (e.g. Storage::List) can overlap with hashing. Files whose + // relative path matches any pattern in Excludes are dropped here (the hub-wide + // default list - see DefaultExcludes() above - covers transient runtime files + // like .lock and .sentry-native; the user can override via HydrationOptions). + // Returns the number of accepted (non-filtered) entries; OutTotalBytes accumulates + // their sizes. + size_t ScanAndScheduleHashWork(const DirectoryContent& DirContent, + const std::filesystem::path& ServerStateDir, + const std::unordered_map<std::string, size_t>& StateEntryLookup, + const std::vector<Entry>& StateEntries, + std::span<const std::string> Excludes, + std::vector<Entry>& Entries, + uint64_t& OutTotalBytes, + ParallelWork& Work, + WorkerThreadPool& Pool, + PhaseStats& HashStats) + { + size_t TotalFiles = 0; + OutTotalBytes = 0; + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + std::string RelKey = RelativePath.generic_string(); + if (IsExcluded(RelKey, Excludes)) + { + continue; + } + const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); + + Entry& CurrentEntry = Entries[TotalFiles]; + CurrentEntry.RelativePath = std::move(RelKey); + CurrentEntry.Size = DirContent.FileSizes[FileIndex]; + CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; + + bool FoundHash = false; + if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath); KnownIt != StateEntryLookup.end()) + { + const Entry& StateEntry = StateEntries[KnownIt->second]; + if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) + { + CurrentEntry.Hash = StateEntry.Hash; + FoundHash = true; + } + } + + if (!FoundHash) + { + Work.ScheduleWork(Pool, [AbsPath, EntryIndex = TotalFiles, &Entries, &HashStats](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + Stopwatch Timer = HashStats.BeginRequest(); + auto GuardEnd = MakeGuard([&] { HashStats.EndRequest(Timer.GetElapsedTimeUs()); }); + Entry& CurrentEntry = Entries[EntryIndex]; + HashFileContent(AbsPath, CurrentEntry); + HashStats.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); + }); + HashStats.Files.fetch_add(1, std::memory_order_relaxed); + HashStats.RecordScheduled(); + } + TotalFiles++; + OutTotalBytes += CurrentEntry.Size; + } + return TotalFiles; + } + + // Plans pack composition deterministically: groups Entries by content hash for + // candidates Size < Threshold, sorts groups by ascending IoHash, bin-packs greedily + // up to MaxPackBytes, and discards any pack with fewer than two entries. Sets + // `IsPacked = true` on every entry that survives into a published pack so the caller + // can immediately distinguish loose-CAS uploads from pack-bound uploads. + // Returns one PackPlan per pack to build (empty if no packs are produced). + std::vector<PackPlan> PlanPacks(std::vector<Entry>& Entries, uint64_t Threshold, uint64_t MaxPackBytes) + { + // 1. Group small-file Entries[] indices by content hash. Every index in a group + // shares the same bytes, so any one of them sources the pack content; all of + // them get tagged IsPacked once the pack hash is known. + std::unordered_map<IoHash, EntryGroup, IoHash::Hasher> UniqueMap; + for (size_t Index = 0; Index < Entries.size(); ++Index) + { + if (Entries[Index].Size >= Threshold) + { + continue; + } + UniqueMap[Entries[Index].Hash].push_back(Index); + } + + // Need at least 2 unique groups for any pack to survive the "discard 1-entry packs" rule. + if (UniqueMap.size() < 2) + { + return {}; + } + + auto GroupHash = [&](const EntryGroup& G) -> const IoHash& { return Entries[G.front()].Hash; }; + auto GroupSize = [&](const EntryGroup& G) -> uint64_t { return Entries[G.front()].Size; }; + + // 2. Deterministic order: ascending IoHash. Drain the map so the index vectors move. + std::vector<EntryGroup> Ordered; + Ordered.reserve(UniqueMap.size()); + for (auto& [h, g] : UniqueMap) + { + Ordered.push_back(std::move(g)); + } + std::sort(Ordered.begin(), Ordered.end(), [&](const EntryGroup& A, const EntryGroup& B) { return GroupHash(A) < GroupHash(B); }); + + // 3. Bin-pack greedily under MaxPackBytes. + std::vector<PackPlan> Plans; + PackPlan Current; + uint64_t CurrentSize = 0; + for (EntryGroup& Group : Ordered) + { + const uint64_t Size = GroupSize(Group); + if (Size >= MaxPackBytes) + { + continue; // fallback to standalone upload + } + if (CurrentSize + Size > MaxPackBytes && !Current.empty()) + { + if (Current.size() >= 2) + { + Plans.push_back(std::move(Current)); + } + Current = {}; + CurrentSize = 0; + } + Current.push_back(std::move(Group)); + CurrentSize += Size; + } + if (Current.size() >= 2) + { + Plans.push_back(std::move(Current)); + } + + // Tag entries that survived into a published pack so the loose-upload loop can skip + // them. Done after bin-packing so groups discarded by the <2-entry rule are not tagged. + for (const PackPlan& Plan : Plans) + { + for (const EntryGroup& Group : Plan) + { + for (size_t Idx : Group) + { + Entries[Idx].IsPacked = true; + } + } + } + return Plans; + } + + // Reads each source file in Plan, hashes the concatenation, writes raw bytes to + // TempPath. Throws on size mismatch (message includes ModuleId for grep). Scratch + // is owned by the caller and reused across packs; size must be >= the largest + // candidate file (i.e. the pack threshold). + BuiltPack BuildPack(const PackPlan& Plan, + const std::vector<Entry>& Entries, + const std::filesystem::path& ServerStateDir, + const std::filesystem::path& TempPath, + std::string_view ModuleId, + std::vector<uint8_t>& Scratch) + { + BuiltPack BP; + BP.Entries.reserve(Plan.size()); + IoHashStream Hasher; + uint64_t Offset = 0; + { + BasicFile PackFile(TempPath, BasicFile::Mode::kTruncate); + BasicFileWriter Writer(PackFile, /*BufferSize*/ 64 * 1024); + for (const EntryGroup& Group : Plan) + { + // Every Entries[idx] in a group shares the same content hash (= same bytes), + // so the first one is a fine source. + const Entry& Rep = Entries[Group.front()]; + std::filesystem::path AbsPath = MakeSafeAbsolutePath(ServerStateDir / Rep.RelativePath); + BasicFile Src(AbsPath, BasicFile::Mode::kRead); + const uint64_t Size = Src.FileSize(); + if (Size != Rep.Size || Size > Scratch.size()) + { + throw zen::runtime_error("Pack entry for hash {} (module '{}'): expected {} bytes, file is {} at '{}'"sv, + Rep.Hash, + ModuleId, + Rep.Size, + Size, + AbsPath); + } + Src.Read(Scratch.data(), Size, 0); + Hasher.Append(Scratch.data(), Size); + Writer.Write(Scratch.data(), Size, Offset); + Offset += Size; + BP.Entries.push_back(BuiltPackEntry{.Hash = Rep.Hash, .Size = Rep.Size}); + } + Writer.Flush(); + } + + BP.PackHash = Hasher.GetHash(); + BP.Size = Offset; + return BP; + } + + // Schedules either a Put (if Hash is not in CAS) or a Touch (if it is). Updates + // counters on the matching PhaseStats - Files++ in both cases, Bytes+=Size on the + // touch path so touched-bytes accounting tracks size-equivalent work that did not + // transfer. Both paths call RecordScheduled so the queue-wait line covers cache-warm + // dehydrates (only Touches scheduled). Used for both loose CAS (UploadStats=Stats.Upload, + // TouchStats=Stats.Touch) and pack blobs (UploadStats=Stats.PackUpload, TouchStats= + // Stats.PackTouch). UploadStats and TouchStats must be distinct PhaseStats so the upload- + // throughput metric is not inflated by touched bytes that did not transfer. + void ScheduleUploadOrTouch(StorageBase& Storage, + ParallelWork& Work, + WorkerThreadPool& Pool, + const std::unordered_set<IoHash, IoHash::Hasher>& ExistsLookup, + const IoHash& Hash, + uint64_t Size, + const std::filesystem::path& SourcePath, + PhaseStats& UploadStats, + PhaseStats& TouchStats) + { + if (ExistsLookup.contains(Hash)) + { + // Refresh the backend's modification time so lifecycle-expiration policies + // do not evict CAS entries that are still referenced by this module. + Storage.Touch(Work, Pool, Hash, TouchStats); + TouchStats.Files.fetch_add(1, std::memory_order_relaxed); + TouchStats.Bytes.fetch_add(Size, std::memory_order_relaxed); + TouchStats.RecordScheduled(); + } + else + { + Storage.Put(Work, Pool, Hash, Size, SourcePath, UploadStats); + UploadStats.Files.fetch_add(1, std::memory_order_relaxed); + UploadStats.RecordScheduled(); + } + } + + // Builds and saves the dehydrate state.cbo: header fields, optional Packs[] array, + // and the Files[] array. ModuleId is stored in the manifest. + void WriteDehydrateMetadata(StorageBase& Storage, + const std::filesystem::path& ServerStateDir, + std::string_view ModuleId, + uint64_t TotalBytes, + uint64_t DehydrateDurationMs, + const std::vector<BuiltPack>& BuiltPacks, + const std::vector<Entry>& Entries) + { + UtcTime Now = UtcTime::Now(); + std::string DehydrateTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z"sv, + Now.Tm.tm_year + 1900, + Now.Tm.tm_mon + 1, + Now.Tm.tm_mday, + Now.Tm.tm_hour, + Now.Tm.tm_min, + Now.Tm.tm_sec, + Now.Ms); + + CbObjectWriter Meta; + Meta << "SchemaVersion"sv << HydrationSchemaVersion; + Meta << "SourceFolder"sv << ServerStateDir.generic_string(); + Meta << "ModuleId"sv << ModuleId; + Meta << "HostName"sv << GetMachineName(); + Meta << "DehydrateTimeUtc"sv << DehydrateTimeUtc; + Meta << "DehydrateDurationMs"sv << DehydrateDurationMs; + Meta << "TotalSizeBytes"sv << TotalBytes; + Meta << "StorageSettings"sv << Storage.GetSettings(); + + if (!BuiltPacks.empty()) + { + Meta.BeginArray("Packs"sv); + for (const BuiltPack& BP : BuiltPacks) + { + Meta.BeginObject(); + { + Meta << "Hash"sv << BP.PackHash; + Meta << "Size"sv << BP.Size; + Meta.BeginArray("Entries"sv); + for (const BuiltPackEntry& BPE : BP.Entries) + { + Meta.BeginObject(); + { + Meta << "Hash"sv << BPE.Hash; + Meta << "Size"sv << BPE.Size; + } + Meta.EndObject(); + } + Meta.EndArray(); + } + Meta.EndObject(); + } + Meta.EndArray(); + } + + Meta.BeginArray("Files"sv); + for (const Entry& CurrentEntry : Entries) + { + Meta.BeginObject(); + { + Meta << "Path"sv << CurrentEntry.RelativePath; + Meta << "Size"sv << CurrentEntry.Size; + Meta << "ModTick"sv << CurrentEntry.ModTick; + Meta << "Hash"sv << CurrentEntry.Hash; + if (CurrentEntry.IsPacked) + { + Meta << "PackHash"sv << CurrentEntry.PackHash; + } + } + Meta.EndObject(); + } + Meta.EndArray(); + + Storage.SaveMetadata(Meta.Save()); + } + + // Parses Meta["Files"sv] into Entries[] + path lookup. Reads PackHash and sets + // IsPacked when present. Used by Hydrate. + void ParseFilesArray(const CbObject& Meta, + std::vector<Entry>& OutEntries, + std::unordered_map<std::string, size_t>& OutLookup, + uint64_t& OutTotalSize) + { + OutTotalSize = 0; + for (CbFieldView FieldView : Meta["Files"sv]) + { + CbObjectView EntryView = FieldView.AsObjectView(); + if (EntryView) + { + Entry NewEntry = {.RelativePath{EntryView["Path"sv].AsString()}, + .Size = EntryView["Size"sv].AsUInt64(), + .ModTick = EntryView["ModTick"sv].AsUInt64(), + .Hash = EntryView["Hash"sv].AsHash()}; + IoHash PackHash = EntryView["PackHash"sv].AsHash(); + if (PackHash != IoHash::Zero) + { + NewEntry.IsPacked = true; + NewEntry.PackHash = PackHash; + } + OutTotalSize += NewEntry.Size; + OutLookup.insert_or_assign(NewEntry.RelativePath, OutEntries.size()); + OutEntries.emplace_back(std::move(NewEntry)); + } + } + } + + // Parses Meta["Packs"sv] into a hash-keyed descriptor map. Each PackDescriptor's + // Entries[] gets a prefix-sum offset for O(1) slice lookup at unpack time. + std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> ParsePacksArray(const CbObject& Meta) + { + std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap; + for (CbFieldView FieldView : Meta["Packs"sv]) + { + CbObjectView PackView = FieldView.AsObjectView(); + if (!PackView) + { + continue; + } + IoHash PackHash = PackView["Hash"sv].AsHash(); + PackDescriptor PD; + PD.Size = PackView["Size"sv].AsUInt64(); + uint64_t Offset = 0; + for (CbFieldView EF : PackView["Entries"sv]) + { + CbObjectView EV = EF.AsObjectView(); + if (!EV) + { + continue; + } + PackEntryDescriptor E{.Hash = EV["Hash"sv].AsHash(), .Size = EV["Size"sv].AsUInt64(), .Offset = Offset}; + Offset += E.Size; + PD.Entries.push_back(E); + } + PackMap.emplace(PackHash, std::move(PD)); + } + return PackMap; + } + + // For each downloaded pack: read it into a heap buffer, verify size, and slice + // into per-entry IoBuffers (zero-copy views). Throws on size mismatch with the + // existing message format. + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> BuildHashToSlice( + const std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher>& PackMap, + const std::filesystem::path& TempDir, + std::string_view ModuleId) + { + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice; + size_t TotalPackEntries = 0; + for (const auto& [PackHash, PD] : PackMap) + { + TotalPackEntries += PD.Entries.size(); + } + HashToSlice.reserve(TotalPackEntries); + + for (const auto& [PackHash, PD] : PackMap) + { + std::filesystem::path PackPath = TempDir / "packs" / fmt::format("{}.bin"sv, PackHash); + // Heap-allocated buffer via direct ReadFile avoids mmap materialization + // and page-fault latency during the parallel unpack-write that follows. + BasicFile PackFile(PackPath, BasicFile::Mode::kRead); + IoBuffer PackBuf = PackFile.ReadAll(); + if (PackBuf.GetSize() != PD.Size) + { + throw zen::runtime_error("Pack '{}' size mismatch for module '{}' at '{}': expected {}, got {}"sv, + PackHash, + ModuleId, + PackPath, + PD.Size, + PackBuf.GetSize()); + } + + for (const auto& E : PD.Entries) + { + HashToSlice.emplace(E.Hash, IoBuffer(PackBuf, E.Offset, E.Size)); + } + } + return HashToSlice; + } + + // Migrates contents of SourceDir into ServerStateDir. Same-volume: top-level rename + // per child. Different-volume: full CopyTree fallback. Caller is responsible for + // final cleanup of the parent temp directory (which may hold sibling staging dirs + // like packs/ that must NOT migrate). + void MigrateTempToState(const std::filesystem::path& SourceDir, + const std::filesystem::path& ServerStateDir, + const HydrationConfig::ThreadingOptions& Threading) + { + // If the two paths share at least one common component they are on the same drive/volume + // and atomic renames will succeed. Otherwise fall back to a full copy. + auto [ItSrc, ItState] = std::mismatch(SourceDir.begin(), SourceDir.end(), ServerStateDir.begin(), ServerStateDir.end()); + if (ItSrc != SourceDir.begin()) + { + DirectoryContent DirContent; + GetDirectoryContent(*Threading.WorkerPool, + SourceDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, + DirContent); + + for (const std::filesystem::path& AbsPath : DirContent.Directories) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'"sv, AbsPath, Dest)); + } + } + for (const std::filesystem::path& AbsPath : DirContent.Files) + { + std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); + std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'"sv, AbsPath, Dest)); + } + } + } + else + { + // Slow path: source and target are on different filesystems, so rename + // would fail. Copy the tree instead. + ZEN_DEBUG("SourceDir and ServerStateDir are on different filesystems - using CopyTree"); + CopyTree(SourceDir, ServerStateDir, {.EnableClone = true}); + } + } + + // Walks ServerStateDir and emits a Files[] cache for the next dehydrate's + // hash-shortcut (mirrors Load state on dehydrate). Files on disk that aren't in + // EntryLookup (manifest) are skipped with a WARN - typically leftovers from an + // earlier crashed hydrate. + CbObject BuildHydrateState(const std::filesystem::path& ServerStateDir, + const std::unordered_map<std::string, size_t>& EntryLookup, + const std::vector<Entry>& Entries, + std::string_view ModuleId, + const HydrationConfig::ThreadingOptions& Threading) + { + DirectoryContent DirContent; + GetDirectoryContent(*Threading.WorkerPool, + ServerStateDir, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | + DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, + DirContent); + + CbObjectWriter HydrateState; + HydrateState.BeginArray("Files"sv); + for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) + { + std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); + std::string RelKey = RelativePath.generic_string(); + + if (auto It = EntryLookup.find(RelKey); It != EntryLookup.end()) + { + HydrateState.BeginObject(); + { + HydrateState << "Path"sv << RelKey; + HydrateState << "Size"sv << DirContent.FileSizes[FileIndex]; + HydrateState << "ModTick"sv << DirContent.FileModificationTicks[FileIndex]; + HydrateState << "Hash"sv << Entries[It->second].Hash; + } + HydrateState.EndObject(); + } + else + { + // File on disk after hydrate but not in the manifest. Can happen when TempDir + // contained leftovers from a prior crashed hydrate that survived to the rename + // phase. Skip it rather than failing - the manifest is the source of truth for + // the cached state; the stray file is harmless and gets caught by the next + // dehydrate's directory scan. + ZEN_WARN("Hydrate: file '{}' present on disk but missing from manifest for module '{}'; skipping", RelKey, ModuleId); + } + } + HydrateState.EndArray(); + + return HydrateState.Save(); + } + + /////////////////////////////////////////////////////////////////////// // IncrementalHydrator implementations - IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, std::unique_ptr<StorageBase> Storage) + IncrementalHydrator::IncrementalHydrator(const HydrationConfig& Config, + std::unique_ptr<StorageBase> Storage, + std::span<const std::string> Excludes) : m_Storage(std::move(Storage)) , m_Config(Config) + , m_Excludes(Excludes.begin(), Excludes.end()) , m_FallbackWorkPool(0) { if (Config.Threading) @@ -760,6 +1720,7 @@ namespace hydration_impl { void IncrementalHydrator::Dehydrate(const CbObject& CachedState) { + ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate"); Stopwatch TotalTimer; DehydrateStatistics Stats; const std::string StorageTarget = m_Storage->Describe(); @@ -767,24 +1728,17 @@ namespace hydration_impl { const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); try { + // Load the cache from the previous dehydrate to short-circuit re-hashing of + // unchanged files (matched by Path+Size+ModTick). std::unordered_map<std::string, size_t> StateEntryLookup; std::vector<Entry> StateEntries; { Stopwatch LoadStateTimer; - for (CbFieldView FieldView : CachedState["Files"].AsArrayView()) - { - CbObjectView EntryView = FieldView.AsObjectView(); - std::filesystem::path RelativePath(EntryView["Path"].AsString()); - uint64_t Size = EntryView["Size"].AsUInt64(); - uint64_t ModTick = EntryView["ModTick"].AsUInt64(); - IoHash Hash = EntryView["Hash"].AsHash(); - - StateEntryLookup.insert_or_assign(RelativePath.generic_string(), StateEntries.size()); - StateEntries.push_back(Entry{.RelativePath = RelativePath, .Size = Size, .ModTick = ModTick, .Hash = Hash}); - } + LoadCachedStateEntries(CachedState, StateEntryLookup, StateEntries); Stats.LoadStateUs = LoadStateTimer.GetElapsedTimeUs(); } + // Scan the server state directory. DirectoryContent DirContent; { Stopwatch DirScanTimer; @@ -802,101 +1756,29 @@ namespace hydration_impl { DirContent.Files.size(), NiceBytes(std::accumulate(DirContent.FileSizes.begin(), DirContent.FileSizes.end(), uint64_t(0)))); + // Hash phase: build Entries[] and schedule hash work for files not in the cache. + // Storage::List runs in parallel with hashing to populate ExistsLookup before Wait. std::vector<Entry> Entries; Entries.resize(DirContent.Files.size()); - uint64_t TotalBytes = 0; - uint64_t TotalFiles = 0; - - std::unordered_set<IoHash> ExistsLookup; - + uint64_t TotalBytes = 0; + uint64_t TotalFiles = 0; + std::unordered_set<IoHash, IoHash::Hasher> ExistsLookup; { + Stats.Hash.PhaseClock.Reset(); Stopwatch HashTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) - { - const std::filesystem::path AbsPath = MakeSafeAbsolutePath(DirContent.Files[FileIndex]); - if (AbsPath.filename() == "reserve.gc") - { - continue; - } - const std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); - if (*RelativePath.begin() == ".sentry-native") - { - continue; - } - if (RelativePath == ".lock") - { - continue; - } - - Entry& CurrentEntry = Entries[TotalFiles]; - CurrentEntry.RelativePath = RelativePath; - CurrentEntry.Size = DirContent.FileSizes[FileIndex]; - CurrentEntry.ModTick = DirContent.FileModificationTicks[FileIndex]; - - bool FoundHash = false; - if (auto KnownIt = StateEntryLookup.find(CurrentEntry.RelativePath.generic_string()); KnownIt != StateEntryLookup.end()) - { - const Entry& StateEntry = StateEntries[KnownIt->second]; - if (StateEntry.Size == CurrentEntry.Size && StateEntry.ModTick == CurrentEntry.ModTick) - { - CurrentEntry.Hash = StateEntry.Hash; - FoundHash = true; - } - } - - if (!FoundHash) - { - Work.ScheduleWork(*m_Threading.WorkerPool, - [AbsPath, EntryIndex = TotalFiles, &Entries, &Stats](std::atomic<bool>& AbortFlag) { - Stats.Hash.RecordThread(); - if (AbortFlag) - { - return; - } - - Entry& CurrentEntry = Entries[EntryIndex]; - - bool FoundHash = false; - if (AbsPath.extension().empty()) - { - auto It = CurrentEntry.RelativePath.begin(); - if (It != CurrentEntry.RelativePath.end() && It->filename().string().ends_with("cas")) - { - IoHash RawHash; - uint64_t RawSize; - CompressedBuffer Compressed = CompressedBuffer::FromCompressed( - SharedBuffer(IoBufferBuilder::MakeFromFile(AbsPath)), - RawHash, - RawSize); - if (Compressed) - { - // We compose a meta-hash since taking the RawHash might collide with an - // existing non-compressed file with the same content The collision is - // unlikely except if the compressed data is zero bytes causing RawHash - // to be the same as an empty file. - IoHashStream Hasher; - Hasher.Append(RawHash.Hash, sizeof(RawHash.Hash)); - Hasher.Append(&CurrentEntry.Size, sizeof(CurrentEntry.Size)); - CurrentEntry.Hash = Hasher.GetHash(); - FoundHash = true; - } - } - } - - if (!FoundHash) - { - CurrentEntry.Hash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(AbsPath)); - } - Stats.Hash.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); - }); - Stats.Hash.Files.fetch_add(1, std::memory_order_relaxed); - } - TotalFiles++; - TotalBytes += CurrentEntry.Size; - } + TotalFiles = ScanAndScheduleHashWork(DirContent, + ServerStateDir, + StateEntryLookup, + StateEntries, + m_Excludes, + Entries, + TotalBytes, + Work, + *m_Threading.WorkerPool, + Stats.Hash); { Stopwatch ListTimer; @@ -906,82 +1788,136 @@ namespace hydration_impl { } Work.Wait(); - Entries.resize(TotalFiles); Stats.Hash.ElapsedUs = HashTimer.GetElapsedTimeUs(); Stats.TotalFiles = TotalFiles; Stats.TotalBytes = TotalBytes; } - uint64_t UploadDurationMs = 0; + // Pack planning + unified upload phase. Plan first so we know which entries are + // packed, then run loose-CAS uploads and pack builds inside a single ParallelWork. + // Loose uploads are scheduled up front so they execute on the worker pool while + // the calling thread runs the serial pack-build loop; each completed pack hands + // its upload to the same ParallelWork. One Wait covers everything. + std::vector<BuiltPack> BuiltPacks; + std::vector<std::filesystem::path> StagedPackFiles; + auto PackCleanup = MakeGuard([&] { + RemoveStagedPackFiles(StagedPackFiles); + // Best-effort drop of the now-empty packs/ subdir so TempDir is clean after + // dehydrate. Mirrors the explicit cleanup on the hydrate-side success path. + std::error_code Ec; + DeleteDirectories(MakeSafeAbsolutePath(m_Config.TempDir) / "packs", Ec); + }); + + // PlanPacks tags Entries[Idx].IsPacked on every index that survives into a pack, + // so the loose-upload loop can skip them. PackHash is set later per-pack as each + // pack is built. + const std::vector<PackPlan> Pending = + m_Config.PackEnabled ? PlanPacks(Entries, m_Config.PackThresholdBytes, m_Config.MaxPackBytes) : std::vector<PackPlan>{}; + + uint64_t DehydrateDurationMs = 0; { + // Upload, PackUpload, Touch, and PackTouch share one ParallelWork; reset all + // four PhaseClocks to the same baseline so the queue-wait line can combine + // their FirstScheduleUs / FirstStartUs across the four PhaseStats. + Stats.Upload.PhaseClock.Reset(); + Stats.PackUpload.PhaseClock.Reset(); + Stats.Touch.PhaseClock.Reset(); + Stats.PackTouch.PhaseClock.Reset(); Stopwatch UploadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + // Schedule loose-CAS uploads first so they begin running while the pack-build + // loop below executes serially on this thread. for (const Entry& CurrentEntry : Entries) { - if (!ExistsLookup.contains(CurrentEntry.Hash)) - { - m_Storage->Put(Work, - *m_Threading.WorkerPool, - CurrentEntry.Hash, - CurrentEntry.Size, - MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), - Stats.Upload); - Stats.Upload.Files.fetch_add(1, std::memory_order_relaxed); - } - else + if (CurrentEntry.IsPacked) { - // Refresh the backend's modification time so lifecycle-expiration policies - // do not evict CAS entries that are still referenced by this module. - m_Storage->Touch(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, Stats.Touch); - Stats.Touch.Files.fetch_add(1, std::memory_order_relaxed); - Stats.Touch.Bytes.fetch_add(CurrentEntry.Size, std::memory_order_relaxed); + continue; // pack phase covers it } + ScheduleUploadOrTouch(*m_Storage, + Work, + *m_Threading.WorkerPool, + ExistsLookup, + CurrentEntry.Hash, + CurrentEntry.Size, + MakeSafeAbsolutePath(ServerStateDir / CurrentEntry.RelativePath), + Stats.Upload, + Stats.Touch); } - Work.Wait(); - Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); - UploadDurationMs = TotalTimer.GetElapsedTimeMs(); - - Stopwatch MetadataTimer; - UtcTime Now = UtcTime::Now(); - std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z", - Now.Tm.tm_year + 1900, - Now.Tm.tm_mon + 1, - Now.Tm.tm_mday, - Now.Tm.tm_hour, - Now.Tm.tm_min, - Now.Tm.tm_sec, - Now.Ms); - - CbObjectWriter Meta; - Meta << "SourceFolder" << ServerStateDir.generic_string(); - Meta << "ModuleId" << m_Config.ModuleId; - Meta << "HostName" << GetMachineName(); - Meta << "UploadTimeUtc" << UploadTimeUtc; - Meta << "UploadDurationMs" << UploadDurationMs; - Meta << "TotalSizeBytes" << TotalBytes; - Meta << "StorageSettings" << m_Storage->GetSettings(); - - Meta.BeginArray("Files"); - for (const Entry& CurrentEntry : Entries) + if (!Pending.empty()) { - Meta.BeginObject(); + ZEN_TRACE_CPU("IncrementalHydrator::Dehydrate::Pack"); + std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + std::filesystem::path PacksDir = TempDir / "packs"; + CreateDirectories(PacksDir); + + // Reusable scratch for small-file reads. Every pack candidate has Size < + // PackThresholdBytes so a single buffer of that size holds any one file. + // Build runs serially on the caller's thread - typical modules produce 1-2 + // packs at ~5 ms each, too small to be worth the parallel-dispatch overhead. + std::vector<uint8_t> Scratch(m_Config.PackThresholdBytes); + + for (const PackPlan& Plan : Pending) { - Meta << "Path" << CurrentEntry.RelativePath.generic_string(); - Meta << "Size" << CurrentEntry.Size; - Meta << "ModTick" << CurrentEntry.ModTick; - Meta << "Hash" << CurrentEntry.Hash; + // Pre-register the staging path so PackCleanup removes it even if the + // stream-write loop below throws mid-flight. + Oid::String_t OidStr; + Oid::NewOid().ToString(OidStr); + std::filesystem::path PackTempPath = PacksDir / fmt::format("{}.bin"sv, OidStr); + StagedPackFiles.push_back(PackTempPath); + + Stopwatch BuildTimer; + BuiltPack BP = BuildPack(Plan, Entries, ServerStateDir, PackTempPath, m_Config.ModuleId, Scratch); + Stats.PackBuildUs.fetch_add(BuildTimer.GetElapsedTimeUs(), std::memory_order_relaxed); + + // Stamp the pack hash on every matching entry; state.cbo's Files[] reads + // PackHash off these entries when emitting per-file PackHash references. + uint64_t PackedEntryCount = 0; + for (const EntryGroup& Group : Plan) + { + for (size_t Idx : Group) + { + Entries[Idx].PackHash = BP.PackHash; + } + PackedEntryCount += Group.size(); + } + + Stats.PackCount.fetch_add(1, std::memory_order_relaxed); + Stats.PackedFiles.fetch_add(PackedEntryCount, std::memory_order_relaxed); + Stats.PackBytes.fetch_add(BP.Size, std::memory_order_relaxed); + + ScheduleUploadOrTouch(*m_Storage, + Work, + *m_Threading.WorkerPool, + ExistsLookup, + BP.PackHash, + BP.Size, + PackTempPath, + Stats.PackUpload, + Stats.PackTouch); + + BuiltPacks.push_back(std::move(BP)); } - Meta.EndObject(); } - Meta.EndArray(); - m_Storage->SaveMetadata(Meta.Save()); - Stats.MetadataSaveUs = MetadataTimer.GetElapsedTimeUs(); + Work.Wait(); + // Upload, PackUpload, Touch, and PackTouch share a single ParallelWork. Only + // Upload's ElapsedUs is read by the formatter; the others' bytes/requests are + // reported against the same Upload phase elapsed. + Stats.Upload.ElapsedUs = UploadTimer.GetElapsedTimeUs(); + DehydrateDurationMs = TotalTimer.GetElapsedTimeMs(); } + // Persist the new state.cbo with header, Packs[], and Files[]. + { + Stopwatch SaveMetadataTimer; + WriteDehydrateMetadata(*m_Storage, ServerStateDir, m_Config.ModuleId, TotalBytes, DehydrateDurationMs, BuiltPacks, Entries); + Stats.SaveMetadataUs = SaveMetadataTimer.GetElapsedTimeUs(); + } + + // Server-state dir contents have been uploaded; wipe them. ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { Stopwatch CleanTimer; @@ -990,7 +1926,7 @@ namespace hydration_impl { } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogDehydrateSummary("Dehydration complete", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); + LogDehydrateSummary("Dehydration complete"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } catch (const std::exception& Ex) { @@ -999,20 +1935,35 @@ namespace hydration_impl { Ex.what(), m_Config.ServerStateDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogDehydrateSummary("Dehydration failed", Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); + LogDehydrateSummary("Dehydration failed"sv, Stats, m_Config.ModuleId, ServerStateDir, StorageTarget); } } CbObject IncrementalHydrator::Hydrate() { + ZEN_TRACE_CPU("IncrementalHydrator::Hydrate"); Stopwatch TotalTimer; HydrateStatistics Stats; const std::string StorageSource = m_Storage->Describe(); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); + // Hydrated files land in TempDir/state/, pack staging blobs in TempDir/packs/. Keeping + // them in sibling subdirectories means MigrateTempToState only needs to hand the state/ + // subtree across to ServerStateDir; pack staging never has a chance to leak into the + // final server state directory. + const std::filesystem::path TempStateDir = TempDir / "state"; try { + CreateDirectories(ServerStateDir); + CreateDirectories(TempDir); + // A prior hydrate may have crashed after downloading but before the rename phase, + // leaving stale files in TempDir that would otherwise get migrated into + // ServerStateDir and trip the post-rename manifest check. + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + CreateDirectories(TempStateDir); + + // Load metadata; absent metadata means a fresh module - clean state, return. CbObject Meta; { Stopwatch LoadTimer; @@ -1028,147 +1979,165 @@ namespace hydration_impl { return CbObject(); } + // Schema-version gate: refuse manifests written by a newer hub. Missing field is + // treated as version 0 (legacy / pre-versioning) and decoded best-effort - the + // optional fields (Packs[], StorageSettings) absent from v0 manifests fall back + // to defaults via ParsePacksArray / ParseSettings. + const uint32_t SchemaVersion = Meta["SchemaVersion"sv].AsUInt32(0); + if (SchemaVersion > HydrationSchemaVersion) + { + throw zen::runtime_error("State manifest for module '{}' has schema version {} but this hub supports up to {}"sv, + m_Config.ModuleId, + SchemaVersion, + HydrationSchemaVersion); + } + + // Parse manifest: Files[] for per-file metadata, Packs[] (optional) for pack + // composition. Missing Packs[] = old-format state; treated as all-standalone. std::unordered_map<std::string, size_t> EntryLookup; std::vector<Entry> Entries; uint64_t TotalSize = 0; - - for (CbFieldView FieldView : Meta["Files"]) - { - CbObjectView EntryView = FieldView.AsObjectView(); - if (EntryView) - { - Entry NewEntry = {.RelativePath = std::filesystem::path(EntryView["Path"].AsString()), - .Size = EntryView["Size"].AsUInt64(), - .ModTick = EntryView["ModTick"].AsUInt64(), - .Hash = EntryView["Hash"].AsHash()}; - TotalSize += NewEntry.Size; - EntryLookup.insert_or_assign(NewEntry.RelativePath.generic_string(), Entries.size()); - Entries.emplace_back(std::move(NewEntry)); - } - } + ParseFilesArray(Meta, Entries, EntryLookup, TotalSize); + std::unordered_map<IoHash, PackDescriptor, IoHash::Hasher> PackMap = ParsePacksArray(Meta); Stats.TotalFiles = Entries.size(); Stats.TotalBytes = TotalSize; + Stats.PackCount = PackMap.size(); - ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files", + ZEN_INFO("Hydrating module '{}' to folder '{}'. {} ({}) files, {} packs", m_Config.ModuleId, m_Config.ServerStateDir, Entries.size(), - NiceBytes(TotalSize)); - - m_Storage->ParseSettings(Meta["StorageSettings"].AsObjectView()); + NiceBytes(TotalSize), + PackMap.size()); + + // Re-apply storage settings from state.cbo (e.g. S3 multipart chunk size). + m_Storage->ParseSettings(Meta["StorageSettings"sv].AsObjectView()); + + // Per-entry destination paths under TempStateDir, indexed parallel to Entries[]. Used + // by the standalone download dispatch and (for IsPacked entries) by the unpack + // dispatch. Pre-creating parents once for the union covers both phases without a second pass. + std::vector<std::filesystem::path> EntryPaths; + EntryPaths.reserve(Entries.size()); + for (const Entry& CurrentEntry : Entries) + { + EntryPaths.push_back(MakeSafeAbsolutePath(TempStateDir / CurrentEntry.RelativePath)); + } + { + Stopwatch CreateDirsTimer; + auto RecordElapsed = MakeGuard([&] { Stats.CreateDirsUs = CreateDirsTimer.GetElapsedTimeUs(); }); + Stats.CreateDirsCount = CreateParentDirectories(EntryPaths); + } + // Download phase: pack GETs first (so unpack can begin sooner), then standalone files. + // Both share the same ParallelWork; per-phase byte / request counts stay separate via + // PackDownload vs Download stats while the elapsed time is reported once. { + // Download and PackDownload share one ParallelWork; reset both PhaseClocks + // to the same baseline so the queue-wait line can combine their FirstScheduleUs + // / FirstStartUs across the two PhaseStats. + Stats.Download.PhaseClock.Reset(); + Stats.PackDownload.PhaseClock.Reset(); Stopwatch DownloadTimer; ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); - for (const Entry& CurrentEntry : Entries) + const std::filesystem::path PacksDir = TempDir / "packs"; + if (!PackMap.empty()) + { + CreateDirectories(PacksDir); + } + + for (const auto& [PackHash, PD] : PackMap) + { + std::filesystem::path PackPath = PacksDir / fmt::format("{}.bin"sv, PackHash); + m_Storage->Get(Work, *m_Threading.WorkerPool, PackHash, PD.Size, PackPath, Stats.PackDownload); + Stats.PackDownload.Files.fetch_add(1, std::memory_order_relaxed); + Stats.PackDownload.RecordScheduled(); + } + + for (size_t I = 0; I < Entries.size(); ++I) { - std::filesystem::path Path = MakeSafeAbsolutePath(TempDir / CurrentEntry.RelativePath); - CreateDirectories(Path.parent_path()); - m_Storage->Get(Work, *m_Threading.WorkerPool, CurrentEntry.Hash, CurrentEntry.Size, Path, Stats.Download); + if (Entries[I].IsPacked) + { + continue; // handled in the unpack phase below + } + m_Storage->Get(Work, *m_Threading.WorkerPool, Entries[I].Hash, Entries[I].Size, EntryPaths[I], Stats.Download); Stats.Download.Files.fetch_add(1, std::memory_order_relaxed); + Stats.Download.RecordScheduled(); } Work.Wait(); Stats.Download.ElapsedUs = DownloadTimer.GetElapsedTimeUs(); } - // Downloaded successfully - swap into ServerStateDir - ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); + // Unpack phase: verify each downloaded pack, build hash->slice map, parallel-write. + if (!PackMap.empty()) { - Stopwatch CleanTimer; - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); - Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); - } + ZEN_TRACE_CPU("IncrementalHydrator::Hydrate::Unpack"); + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> HashToSlice = BuildHashToSlice(PackMap, TempDir, m_Config.ModuleId); - { - Stopwatch RenameTimer; - // If the two paths share at least one common component they are on the same drive/volume - // and atomic renames will succeed. Otherwise fall back to a full copy. - auto [ItTmp, ItState] = std::mismatch(TempDir.begin(), TempDir.end(), ServerStateDir.begin(), ServerStateDir.end()); - if (ItTmp != TempDir.begin()) { - DirectoryContent DirContent; - GetDirectoryContent(*m_Threading.WorkerPool, - TempDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, - DirContent); - - for (const std::filesystem::path& AbsPath : DirContent.Directories) + Stopwatch UnpackTimer; + ParallelWork UnpackWork(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + for (size_t I = 0; I < Entries.size(); ++I) { - std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); - std::error_code Ec = RenameDirectoryWithRetry(AbsPath, Dest); - if (Ec) + if (!Entries[I].IsPacked) { - throw std::system_error(Ec, fmt::format("Failed to rename directory from '{}' to '{}'", AbsPath, Dest)); + continue; } - } - for (const std::filesystem::path& AbsPath : DirContent.Files) - { - std::filesystem::path Dest = MakeSafeAbsolutePath(ServerStateDir / AbsPath.filename()); - std::error_code Ec = RenameFileWithRetry(AbsPath, Dest); - if (Ec) + auto It = HashToSlice.find(Entries[I].Hash); + if (It == HashToSlice.end()) { - throw std::system_error(Ec, fmt::format("Failed to rename file from '{}' to '{}'", AbsPath, Dest)); + throw zen::runtime_error("Packed file '{}' references unknown pack content hash '{}' in module '{}'"sv, + Entries[I].RelativePath, + Entries[I].Hash, + m_Config.ModuleId); } + UnpackWork.ScheduleWork(*m_Threading.WorkerPool, + [&Stats, &Path = EntryPaths[I], Slice = &It->second](std::atomic<bool>& AbortFlag) { + if (AbortFlag.load()) + { + return; + } + TemporaryFile::SafeWriteFile(Path, *Slice); + Stats.UnpackWriteBytes.fetch_add(Slice->GetSize(), std::memory_order_relaxed); + Stats.PackedFiles.fetch_add(1, std::memory_order_relaxed); + }); } - - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + UnpackWork.Wait(); + Stats.PackUnpackUs = UnpackTimer.GetElapsedTimeUs(); } - else - { - // Slow path: TempDir and ServerStateDir are on different filesystems, so rename - // would fail. Copy the tree instead and clean up the temp files afterwards. - ZEN_DEBUG("TempDir and ServerStateDir are on different filesystems - using CopyTree"); - CopyTree(TempDir, ServerStateDir, {.EnableClone = true}); - ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); - CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); - } - Stats.RenameOrCopyUs = RenameTimer.GetElapsedTimeUs(); + // Release the pack buffers (each IoBuffer slice holds a ref to the pack's underlying + // heap buffer) before the rename/verify phase runs - avoids keeping ~sum(pack sizes) + // bytes alive across those phases. + HashToSlice.clear(); } - CbObject StateObject; + // Downloaded successfully - swap TempStateDir contents into ServerStateDir, then + // sweep the rest of TempDir (empty TempStateDir, packs/, anything else). + ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir); { - Stopwatch VerifyTimer; - DirectoryContent DirContent; - GetDirectoryContent(*m_Threading.WorkerPool, - ServerStateDir, - DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | - DirectoryContentFlags::IncludeFileSizes | DirectoryContentFlags::IncludeModificationTick, - DirContent); - - CbObjectWriter HydrateState; - HydrateState.BeginArray("Files"); - for (size_t FileIndex = 0; FileIndex < DirContent.Files.size(); FileIndex++) - { - std::filesystem::path RelativePath = FastRelativePath(ServerStateDir, DirContent.Files[FileIndex]); - - if (auto It = EntryLookup.find(RelativePath.generic_string()); It != EntryLookup.end()) - { - HydrateState.BeginObject(); - { - HydrateState << "Path" << RelativePath.generic_string(); - HydrateState << "Size" << DirContent.FileSizes[FileIndex]; - HydrateState << "ModTick" << DirContent.FileModificationTicks[FileIndex]; - HydrateState << "Hash" << Entries[It->second].Hash; - } - HydrateState.EndObject(); - } - else - { - ZEN_ASSERT(false); - } - } - HydrateState.EndArray(); + Stopwatch CleanTimer; + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); + Stats.CleanUs = CleanTimer.GetElapsedTimeUs(); + } + { + Stopwatch FinalizeTimer; + MigrateTempToState(TempStateDir, ServerStateDir, m_Threading); + CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); + Stats.FinalizeUs = FinalizeTimer.GetElapsedTimeUs(); + } - StateObject = HydrateState.Save(); - Stats.VerifyScanUs = VerifyTimer.GetElapsedTimeUs(); + // Build the cached state that the next Dehydrate will receive (mirrors Load state on dehydrate). + CbObject StateObject; + { + Stopwatch BuildStateTimer; + StateObject = BuildHydrateState(ServerStateDir, EntryLookup, Entries, m_Config.ModuleId, m_Threading); + Stats.BuildStateUs = BuildStateTimer.GetElapsedTimeUs(); } Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogHydrateSummary("Hydration complete", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); + LogHydrateSummary("Hydration complete"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return StateObject; } @@ -1182,25 +2151,42 @@ namespace hydration_impl { ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir); CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, TempDir); Stats.TotalUs = TotalTimer.GetElapsedTimeUs(); - LogHydrateSummary("Hydration failed", Stats, m_Config.ModuleId, StorageSource, ServerStateDir); + LogHydrateSummary("Hydration failed"sv, Stats, m_Config.ModuleId, StorageSource, ServerStateDir); return {}; } } void IncrementalHydrator::Obliterate() { + ZEN_TRACE_CPU("IncrementalHydrator::Obliterate"); const std::filesystem::path ServerStateDir = MakeSafeAbsolutePath(m_Config.ServerStateDir); const std::filesystem::path TempDir = MakeSafeAbsolutePath(m_Config.TempDir); - try - { + auto TryDeleteBackend = [&]() { ParallelWork Work(*m_Threading.AbortFlag, *m_Threading.PauseFlag, WorkerThreadPool::EMode::EnableBacklog); m_Storage->Delete(Work, *m_Threading.WorkerPool); Work.Wait(); + }; + + try + { + TryDeleteBackend(); } catch (const std::exception& Ex) { - ZEN_WARN("Failed to delete backend storage for module '{}': {}. Proceeding with local cleanup.", m_Config.ModuleId, Ex.what()); + ZEN_WARN("Obliterate backend delete failed for module '{}' (attempt 1/2): {}. Retrying once.", m_Config.ModuleId, Ex.what()); + try + { + TryDeleteBackend(); + } + catch (const std::exception& Ex2) + { + ZEN_WARN( + "Obliterate backend delete failed for module '{}' (attempt 2/2): {}. Proceeding with local cleanup; backend data may " + "remain.", + m_Config.ModuleId, + Ex2.what()); + } } CleanDirectory(*m_Threading.WorkerPool, *m_Threading.AbortFlag, *m_Threading.PauseFlag, ServerStateDir); @@ -1243,26 +2229,33 @@ private: uint64_t m_DefaultMultipartChunkSize; }; +HydrationBase::HydrationBase(const Configuration& Config) +{ + using namespace hydration_impl; + CbFieldView ExcludesField = Config.Options["excludes"sv]; + m_Excludes = ExcludesField.HasValue() ? ParseStringArray(ExcludesField) : DefaultExcludes(); +} + /////////////////////////////////////////////////////////////////////////// // Implementations -FileHydration::FileHydration(const Configuration& Config) +FileHydration::FileHydration(const Configuration& Config) : HydrationBase(Config) { if (!Config.TargetSpecification.empty()) { m_StorageRoot = Utf8ToWide(Config.TargetSpecification.substr(hydration_impl::FileStorage::Prefix.length())); if (m_StorageRoot.empty()) { - throw zen::runtime_error("Hydration config 'file' type requires a directory path"); + throw zen::runtime_error("Hydration config 'file' type requires a directory path"sv); } } else { - CbObjectView Settings = Config.Options["settings"].AsObjectView(); - std::string_view Path = Settings["path"].AsString(); + CbObjectView Settings = Config.Options["settings"sv].AsObjectView(); + std::string_view Path = Settings["path"sv].AsString(); if (Path.empty()) { - throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"); + throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'"sv); } m_StorageRoot = Utf8ToWide(std::string(Path)); } @@ -1273,14 +2266,14 @@ std::unique_ptr<HydrationStrategyBase> FileHydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; - return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId)); + return std::make_unique<IncrementalHydrator>(Config, std::make_unique<FileStorage>(m_StorageRoot / Config.ModuleId), m_Excludes); } -S3Hydration::S3Hydration(const Configuration& Config) +S3Hydration::S3Hydration(const Configuration& Config) : HydrationBase(Config) { using namespace hydration_impl; - CbObjectView Settings = Config.Options["settings"].AsObjectView(); + CbObjectView Settings = Config.Options["settings"sv].AsObjectView(); std::string_view Spec; if (!Config.TargetSpecification.empty()) { @@ -1289,10 +2282,10 @@ S3Hydration::S3Hydration(const Configuration& Config) } else { - std::string_view Uri = Settings["uri"].AsString(); + std::string_view Uri = Settings["uri"sv].AsString(); if (Uri.empty()) { - throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"); + throw zen::runtime_error("Incremental S3 hydration config requires 'settings.uri'"sv); } Spec = Uri; Spec.remove_prefix(S3Storage::Prefix.size()); @@ -1304,10 +2297,10 @@ S3Hydration::S3Hydration(const Configuration& Config) if (m_Bucket.empty()) { - throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"); + throw zen::runtime_error("Incremental S3 hydration config requires a bucket name"sv); } - std::string Region = std::string(Settings["region"].AsString()); + std::string Region = std::string(Settings["region"sv].AsString()); if (Region.empty()) { Region = GetEnvVariable("AWS_DEFAULT_REGION"); @@ -1322,11 +2315,11 @@ S3Hydration::S3Hydration(const Configuration& Config) } m_Region = std::move(Region); - std::string_view Endpoint = Settings["endpoint"].AsString(); + std::string_view Endpoint = Settings["endpoint"sv].AsString(); if (!Endpoint.empty()) { m_Endpoint = std::string(Endpoint); - m_PathStyle = Settings["path-style"].AsBool(); + m_PathStyle = Settings["path-style"sv].AsBool(); } std::string AccessKeyId = GetEnvVariable("AWS_ACCESS_KEY_ID"); @@ -1341,7 +2334,7 @@ S3Hydration::S3Hydration(const Configuration& Config) m_Credentials.SessionToken = GetEnvVariable("AWS_SESSION_TOKEN"); } - m_DefaultMultipartChunkSize = Settings["chunksize"].AsUInt64(S3Storage::DefaultMultipartChunkSize); + m_DefaultMultipartChunkSize = Settings["chunksize"sv].AsUInt64(DefaultMultipartChunkSize); S3ClientOptions ClientOptions; ClientOptions.BucketName = m_Bucket; @@ -1357,6 +2350,11 @@ S3Hydration::S3Hydration(const Configuration& Config) ClientOptions.Credentials = m_Credentials; } ClientOptions.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u; + // Retry transient HTTP failures (429 throttle, 503 SlowDown, 5xx, connection errors) at the + // HTTP layer. CurlHttpClient::DoWithRetry uses 100*(Attempt+1) ms linear backoff between + // attempts. Three retries covers brief S3 rate-limit bursts without holding worker threads + // for long under sustained throttle. + ClientOptions.HttpSettings.RetryCount = 3; m_Client = std::make_unique<S3Client>(ClientOptions); } @@ -1365,10 +2363,12 @@ std::unique_ptr<HydrationStrategyBase> S3Hydration::CreateHydrator(const HydrationConfig& Config) { using namespace hydration_impl; - std::string KeyPrefix = m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}", m_KeyPrefixRoot, Config.ModuleId); + std::string KeyPrefix = + m_KeyPrefixRoot.empty() ? std::string(Config.ModuleId) : fmt::format("{}/{}"sv, m_KeyPrefixRoot, Config.ModuleId); return std::make_unique<IncrementalHydrator>( Config, - std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize)); + std::make_unique<S3Storage>(*m_Client, std::move(KeyPrefix), Config.TempDir, m_DefaultMultipartChunkSize), + m_Excludes); } std::unique_ptr<HydrationBase> @@ -1386,10 +2386,10 @@ InitHydration(const HydrationBase::Configuration& Config) { return std::make_unique<S3Hydration>(Config); } - throw zen::runtime_error("Unknown hydration strategy: {}", Config.TargetSpecification); + throw zen::runtime_error("Unknown hydration strategy: {}"sv, Config.TargetSpecification); } - std::string_view Type = Config.Options["type"].AsString(); + std::string_view Type = Config.Options["type"sv].AsString(); if (Type == FileStorage::Type) { return std::make_unique<FileHydration>(Config); @@ -1400,9 +2400,9 @@ InitHydration(const HydrationBase::Configuration& Config) } if (!Type.empty()) { - throw zen::runtime_error("Unknown hydration target type '{}'", Type); + throw zen::runtime_error("Unknown hydration target type '{}'"sv, Type); } - throw zen::runtime_error("No hydration target configured"); + throw zen::runtime_error("No hydration target configured"sv); } #if ZEN_WITH_TESTS @@ -1485,6 +2485,34 @@ namespace { } } + // Test fixture that centralizes the common scaffolding for file-backed hydration tests: + // a scratch temp dir containing the server state, the hydration store, and the hydration + // temp dir, plus an initialized FileHydration backend. + struct FileHarness + { + ScopedTemporaryDirectory TempDir; + std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; + std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + std::unique_ptr<HydrationBase> Hydration; + + FileHarness() + { + CreateDirectories(ServerStateDir); + CreateDirectories(HydrationStore); + CreateDirectories(HydrationTemp); + Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + } + + HydrationConfig MakeConfig(std::string_view ModuleId, HydrationConfig Overrides = {}) const + { + Overrides.ServerStateDir = ServerStateDir; + Overrides.TempDir = HydrationTemp; + Overrides.ModuleId = std::string(ModuleId); + return Overrides; + } + }; + } // namespace TEST_SUITE_BEGIN("server.hydration"); @@ -1495,140 +2523,386 @@ TEST_SUITE_BEGIN("server.hydration"); TEST_CASE("hydration.file.dehydrate_hydrate") { - ScopedTemporaryDirectory TempDir; + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + constexpr auto ModuleId = "testmodule"; + const auto Config = H.MakeConfig(ModuleId); - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationStore); - CreateDirectories(HydrationTemp); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::exists(H.HydrationStore / ModuleId)); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); - const std::string ModuleId = "testmodule"; - auto TestFiles = CreateSmallTestTree(ServerStateDir); - - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; +TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +{ + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("testmodule"); - // Dehydrate: copy server state to file store - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - // Verify the module folder exists in the store and ServerStateDir was wiped - CHECK(std::filesystem::exists(HydrationStore / ModuleId)); - CHECK(std::filesystem::is_empty(ServerStateDir)); + // Stale file must be wiped by the rehydrate. + WriteFile(H.ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + H.Hydration->CreateHydrator(Config)->Hydrate(); - // Hydrate: restore server state from file store - Hydration->CreateHydrator(Config)->Hydrate(); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "stale.bin")); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Verify restored contents match the original - VerifyTree(ServerStateDir, TestFiles); +TEST_CASE("hydration.file.excluded_files_not_dehydrated") +{ + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + + // Files matched by the built-in DefaultExcludes() set in hydration.cpp. Each must be + // skipped during dehydrate and not be recreated by hydrate. + CreateDirectories(H.ServerStateDir / "gc"); + WriteFile(H.ServerStateDir / "gc" / "reserve.gc", CreateSemiRandomBlob(64)); + CreateDirectories(H.ServerStateDir / ".sentry-native"); + WriteFile(H.ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); + WriteFile(H.ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); + WriteFile(H.ServerStateDir / "state_marker", CreateSemiRandomBlob(16)); + WriteFile(H.ServerStateDir / ".lock", CreateSemiRandomBlob(8)); + WriteFile(H.ServerStateDir / "snapshot.bak", CreateSemiRandomBlob(48)); + CreateDirectories(H.ServerStateDir / "auth"); + WriteFile(H.ServerStateDir / "auth" / "authstate", CreateSemiRandomBlob(96)); + + const auto Config = H.MakeConfig("testmodule_excl"); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + + CleanDirectory(H.ServerStateDir, true); + H.Hydration->CreateHydrator(Config)->Hydrate(); + + VerifyTree(H.ServerStateDir, TestFiles); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "gc" / "reserve.gc")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".sentry-native")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "state_marker")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / ".lock")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "snapshot.bak")); + CHECK_FALSE(std::filesystem::exists(H.ServerStateDir / "auth" / "authstate")); } -TEST_CASE("hydration.file.hydrate_overwrites_existing_state") +TEST_CASE("hydration.options.excludes_override") { + // Explicit `excludes` replaces the built-in default list outright; `.lock` is not + // in the override list, so it must appear in the manifest. Use the Options-only + // path (type + settings.path) so the same Options object also carries the override + // `excludes` array. ScopedTemporaryDirectory TempDir; - - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; + std::filesystem::path ServerStateDir = TempDir.Path() / "state"; + std::filesystem::path HydrationStore = TempDir.Path() / "store"; + std::filesystem::path HydrationTemp = TempDir.Path() / "tmp"; CreateDirectories(ServerStateDir); CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); + WriteFile(ServerStateDir / "regular.bin", CreateSemiRandomBlob(64)); + WriteFile(ServerStateDir / ".lock", CreateSemiRandomBlob(8)); - auto TestFiles = CreateSmallTestTree(ServerStateDir); - - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); - - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule"}; + CbObjectWriter Options; + Options << "type"sv + << "file"sv; + Options.BeginObject("settings"sv); + { + Options << "path"sv << HydrationStore.generic_string(); + } + Options.EndObject(); + Options.BeginArray("excludes"sv); + { + Options << "*.tmp"sv; + } + Options.EndArray(); - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + HydrationBase::Configuration HydrCfg{.Options = Options.Save()}; + std::unique_ptr<HydrationBase> Hydration = InitHydration(HydrCfg); + HydrationConfig PerModuleCfg{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "excl_off"}; + Hydration->CreateHydrator(PerModuleCfg)->Dehydrate(CbObject()); - // Put a stale file in ServerStateDir to simulate leftover state - WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256)); + const std::filesystem::path StateFile = HydrationStore / "excl_off" / "current-state.cbo"; + REQUIRE(std::filesystem::exists(StateFile)); - // Hydrate - must wipe stale file and restore original - Hydration->CreateHydrator(Config)->Hydrate(); + FileContents Contents = ReadFile(StateFile); + REQUIRE(Contents); + IoBuffer Payload = Contents.Flatten(); + CbValidateError Err; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err); + REQUIRE_EQ(Err, CbValidateError::None); - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "stale.bin")); - VerifyTree(ServerStateDir, TestFiles); + bool HasLock = false; + for (CbFieldView F : Meta["Files"sv]) + { + if (F.AsObjectView()["Path"sv].AsString() == ".lock") + { + HasLock = true; + break; + } + } + CHECK(HasLock); } -TEST_CASE("hydration.file.excluded_files_not_dehydrated") +// --------------------------------------------------------------------------- +// FileHydrator obliterate test +// --------------------------------------------------------------------------- + +TEST_CASE("hydration.file.obliterate") { - ScopedTemporaryDirectory TempDir; + FileHarness H; + constexpr std::string_view ModuleId = "obliterate_test"sv; + CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig(ModuleId); - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationStore); - CreateDirectories(HydrationTemp); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::exists(H.HydrationStore / ModuleId)); - auto TestFiles = CreateSmallTestTree(ServerStateDir); + // Put files back in ServerStateDir + TempDir to verify cleanup. + CreateSmallTestTree(H.ServerStateDir); + WriteFile(H.HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); - // Add files that the dehydrator should skip - WriteFile(ServerStateDir / "reserve.gc", CreateSemiRandomBlob(64)); - CreateDirectories(ServerStateDir / ".sentry-native"); - WriteFile(ServerStateDir / ".sentry-native" / "db.lock", CreateSemiRandomBlob(32)); - WriteFile(ServerStateDir / ".sentry-native" / "breadcrumb.json", CreateSemiRandomBlob(128)); + H.Hydration->CreateHydrator(Config)->Obliterate(); - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + CHECK_FALSE(std::filesystem::exists(H.HydrationStore / ModuleId)); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + CHECK(std::filesystem::is_empty(H.HydrationTemp)); +} - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = "testmodule_excl"}; +// --------------------------------------------------------------------------- +// Pack tests - exercise small-file packing, unpacking, and fallback paths. +// --------------------------------------------------------------------------- - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); +TEST_CASE("hydration.file.pack_roundtrip") +{ + // CreateSmallTestTree produces 6 files all < 2 KB -> single pack with every file in it. + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("pack_roundtrip"); + + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Hydrate into a clean directory - CleanDirectory(ServerStateDir, true); - Hydration->CreateHydrator(Config)->Hydrate(); +TEST_CASE("hydration.file.pack_disabled_fallback") +{ + // PackEnabled=false -> every file is a standalone CAS entry regardless of size. + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("pack_disabled", HydrationConfig{.PackEnabled = false}); + + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Normal files must be restored - VerifyTree(ServerStateDir, TestFiles); - // Excluded files must NOT be restored - CHECK_FALSE(std::filesystem::exists(ServerStateDir / "reserve.gc")); - CHECK_FALSE(std::filesystem::exists(ServerStateDir / ".sentry-native")); +TEST_CASE("hydration.file.pack_one_unique_fallback") +{ + // Only 1 unique small-file candidate -> no pack (min 2 entries); falls back to standalone. + FileHarness H; + + std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles; + IoBuffer Small = CreateSemiRandomBlob(128); + WriteFile(H.ServerStateDir / "tiny.bin", Small); + TestFiles.emplace_back("tiny.bin", std::move(Small)); + + IoBuffer Big = CreateSemiRandomBlob(8192); + WriteFile(H.ServerStateDir / "big.bin", Big); + TestFiles.emplace_back("big.bin", std::move(Big)); + + const auto Config = H.MakeConfig("pack_one_unique"); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); } -// --------------------------------------------------------------------------- -// FileHydrator obliterate test -// --------------------------------------------------------------------------- +TEST_CASE("hydration.file.pack_duplicate_hashes") +{ + // 10 files share one hash + 1 distinct file -> pack has 2 unique entries; hydrate writes + // all 11 destinations correctly. + FileHarness H; + + IoBuffer Shared = CreateSemiRandomBlob(256); + IoBuffer Other = CreateSemiRandomBlob(256); + std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles; + for (int I = 0; I < 10; ++I) + { + std::filesystem::path Rel = fmt::format("dup_{:02d}.bin"sv, I); + WriteFile(H.ServerStateDir / Rel, Shared); + TestFiles.emplace_back(Rel, Shared); + } + WriteFile(H.ServerStateDir / "other.bin", Other); + TestFiles.emplace_back("other.bin", Other); + + const auto Config = H.MakeConfig("pack_duplicates"); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} -TEST_CASE("hydration.file.obliterate") +TEST_CASE("hydration.file.pack_large_dataset") { - ScopedTemporaryDirectory TempDir; + // Mix of many small files + a few large ones, with a modest MaxPackBytes + // to force bin-packing into multiple packs. Verifies ordering, splitting, and the + // interaction between packed and standalone uploads. + FileHarness H; + + std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles; + constexpr int kSmallCount = 100; + constexpr int kLargeCount = 3; + + // Varied small-file sizes (256-2048 B) avoid artificial uniformity in the bin-pack. + FastRandom Rand{.Seed = 0xcafebabe}; + for (int I = 0; I < kSmallCount; ++I) + { + uint64_t Size = 256 + (Rand.Next() % 1793); // [256, 2048] + IoBuffer Blob = CreateSemiRandomBlob(Rand, Size); + auto Rel = std::filesystem::path(fmt::format("small/group{}/file_{:04d}.bin"sv, I / 25, I)); + CreateDirectories((H.ServerStateDir / Rel).parent_path()); + WriteFile(H.ServerStateDir / Rel, Blob); + TestFiles.emplace_back(std::move(Rel), std::move(Blob)); + } + for (int I = 0; I < kLargeCount; ++I) + { + IoBuffer Blob = CreateSemiRandomBlob(Rand, 32 * 1024 + I * 4096); + auto Rel = std::filesystem::path(fmt::format("large/file_{:02d}.bulk"sv, I)); + CreateDirectories((H.ServerStateDir / Rel).parent_path()); + WriteFile(H.ServerStateDir / Rel, Blob); + TestFiles.emplace_back(std::move(Rel), std::move(Blob)); + } + + // Cap each pack at ~32 KB -> 100 small files (~115 KB raw) split across ~4 packs. + const auto Config = H.MakeConfig("pack_large", HydrationConfig{.MaxPackBytes = 32 * 1024}); + + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - std::filesystem::path ServerStateDir = TempDir.Path() / "server_state"; - std::filesystem::path HydrationStore = TempDir.Path() / "hydration_store"; - std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp"; - CreateDirectories(ServerStateDir); - CreateDirectories(HydrationStore); - CreateDirectories(HydrationTemp); +TEST_CASE("hydration.file.pack_hash_determinism") +{ + // Two independent dehydrate runs over the same content must produce byte-identical state + // files (and therefore identical pack hashes). This is what keeps ExistsLookup dedup + // working across redeploys. + FileHarness H; + + FastRandom Rand{.Seed = 0x12345678}; + std::vector<std::pair<std::filesystem::path, IoBuffer>> Files; + for (int I = 0; I < 40; ++I) + { + IoBuffer Blob = CreateSemiRandomBlob(Rand, 256 + (I % 7) * 200); + auto Rel = std::filesystem::path(fmt::format("tree/leaf_{:02d}.dat"sv, I)); + CreateDirectories((H.ServerStateDir / Rel).parent_path()); + WriteFile(H.ServerStateDir / Rel, Blob); + Files.emplace_back(std::move(Rel), std::move(Blob)); + } + + const auto Config = H.MakeConfig("pack_determinism"); + const std::filesystem::path StateFile = H.HydrationStore / "pack_determinism" / "current-state.cbo"; + + // Extract the ordered pack-hash list from state.cbo. Timestamp / duration fields vary + // across runs so byte-identity is not achievable; the pack identities are. + auto ReadPackHashes = [&]() -> std::vector<IoHash> { + FileContents Contents = ReadFile(StateFile); + REQUIRE(Contents); + IoBuffer Payload = Contents.Flatten(); + CbValidateError Err; + CbObject Meta = ValidateAndReadCompactBinaryObject(std::move(Payload), Err); + REQUIRE_EQ(Err, CbValidateError::None); + std::vector<IoHash> Hashes; + for (CbFieldView F : Meta["Packs"sv]) + { + Hashes.push_back(F.AsObjectView()["Hash"sv].AsHash()); + } + return Hashes; + }; - const std::string ModuleId = "obliterate_test"; - CreateSmallTestTree(ServerStateDir); + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + std::vector<IoHash> First = ReadPackHashes(); + REQUIRE_FALSE(First.empty()); - auto Hydration = InitHydration({.TargetSpecification = "file://" + HydrationStore.string()}); + // Rehydrate so the tree is back on disk, then dehydrate again with a fresh hydrator. + H.Hydration->CreateHydrator(Config)->Hydrate(); + VerifyTree(H.ServerStateDir, Files); - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + auto HydrationB = InitHydration({.TargetSpecification = "file://" + H.HydrationStore.string()}); + HydrationB->CreateHydrator(Config)->Dehydrate(CbObject()); + std::vector<IoHash> Second = ReadPackHashes(); - // Dehydrate so the backend store has data - Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); - CHECK(std::filesystem::exists(HydrationStore / ModuleId)); + REQUIRE_EQ(First.size(), Second.size()); + for (size_t I = 0; I < First.size(); ++I) + { + CHECK_EQ(First[I], Second[I]); + } +} - // Put some files back in ServerStateDir and TempDir to verify cleanup - CreateSmallTestTree(ServerStateDir); - WriteFile(HydrationTemp / "leftover.tmp", CreateSemiRandomBlob(64)); +TEST_CASE("hydration.file.pack_backward_compat_read") +{ + // Hand-craft a state.cbo without any Packs[] / PackHash fields (old format). Hydrate must + // treat every file as standalone and roundtrip successfully. + FileHarness H; + const auto TestFiles = CreateSmallTestTree(H.ServerStateDir); + const auto Config = H.MakeConfig("pack_oldformat", HydrationConfig{.PackEnabled = false}); + + // Dehydrate with PackEnabled=false -> state.cbo has no Packs[] and no PackHash fields. + H.Hydration->CreateHydrator(Config)->Dehydrate(CbObject()); + CHECK(std::filesystem::is_empty(H.ServerStateDir)); + + // Hydrate with PackEnabled=true -> the hydrator must still handle the old-format state. + const auto NewConfig = H.MakeConfig("pack_oldformat"); + H.Hydration->CreateHydrator(NewConfig)->Hydrate(); + VerifyTree(H.ServerStateDir, TestFiles); +} - // Obliterate - Hydration->CreateHydrator(Config)->Obliterate(); +// --------------------------------------------------------------------------- +// CreateParentDirectories helper test +// --------------------------------------------------------------------------- - // Backend store directory deleted - CHECK_FALSE(std::filesystem::exists(HydrationStore / ModuleId)); - // ServerStateDir cleaned - CHECK(std::filesystem::is_empty(ServerStateDir)); - // TempDir cleaned - CHECK(std::filesystem::is_empty(HydrationTemp)); +TEST_CASE("hydration.createparentdirectories") +{ + ScopedTemporaryDirectory TempDir; + const std::filesystem::path Root = TempDir.Path(); + + // Edge: empty input. + CHECK_EQ(hydration_impl::CreateParentDirectories({}), 0u); + + // Edge: bare filename has no parent_path() -> contributes nothing. + std::vector<std::filesystem::path> Bare{"bare.bin"}; + CHECK_EQ(hydration_impl::CreateParentDirectories(Bare), 0u); + + // Edge: single input. Triggers the Dirs.size() == 1 path that bypasses the prune loop. + const std::filesystem::path SingleRoot = Root / "single"; + std::vector<std::filesystem::path> Single{SingleRoot / "only" / "a.bin"}; + CHECK_EQ(hydration_impl::CreateParentDirectories(Single), 1u); + CHECK(std::filesystem::is_directory(SingleRoot / "only")); + + // Edge: pre-existing dirs must not raise; count still reflects leaf set. + const std::filesystem::path PreRoot = Root / "preexisting"; + CreateDirectories(PreRoot / "pre" / "made"); + std::vector<std::filesystem::path> Pre{PreRoot / "pre" / "made" / "f.bin"}; + CHECK_EQ(hydration_impl::CreateParentDirectories(Pre), 1u); + CHECK(std::filesystem::is_directory(PreRoot / "pre" / "made")); + + // Generic: ancestor-chain pruning, parent dedup across files in same dir, disjoint + // siblings (cannot prune each other), nested-vs-flat coexistence. Expected leaves: + // deep/nest/leaf, deep/sibling, flat, lone. + const std::filesystem::path MixRoot = Root / "mixed"; + std::vector<std::filesystem::path> Mix{MixRoot / "deep" / "nest" / "leaf" / "x.bin", + MixRoot / "deep" / "nest" / "leaf" / "y.bin", // shares parent with x + MixRoot / "deep" / "nest" / "g.bin", // ancestor of leaf -> pruned + MixRoot / "deep" / "h.bin", // ancestor of nest -> pruned + MixRoot / "deep" / "sibling" / "i.bin", // sibling of nest -> kept + MixRoot / "flat" / "j.bin", // top-level sibling -> kept + MixRoot / "lone" / "k.bin"}; // disjoint -> kept + CHECK_EQ(hydration_impl::CreateParentDirectories(Mix), 4u); + CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest" / "leaf")); + CHECK(std::filesystem::is_directory(MixRoot / "deep" / "sibling")); + CHECK(std::filesystem::is_directory(MixRoot / "flat")); + CHECK(std::filesystem::is_directory(MixRoot / "lone")); + // Pruned ancestors still exist via CreateDirectories recursion. + CHECK(std::filesystem::is_directory(MixRoot / "deep" / "nest")); + CHECK(std::filesystem::is_directory(MixRoot / "deep")); } // --------------------------------------------------------------------------- @@ -1658,7 +2932,7 @@ TEST_CASE("hydration.file.concurrent") for (int I = 0; I < kModuleCount; ++I) { - std::string ModuleId = fmt::format("file_concurrent_{}", I); + std::string ModuleId = fmt::format("file_concurrent_{}"sv, I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); @@ -1817,7 +3091,7 @@ TEST_CASE("hydration.s3.concurrent") for (int I = 0; I < kModuleCount; ++I) { - std::string ModuleId = fmt::format("s3_concurrent_{}", I); + std::string ModuleId = fmt::format("s3_concurrent_{}"sv, I); std::filesystem::path StateDir = TempDir.Path() / ModuleId / "state"; std::filesystem::path TempPath = TempDir.Path() / ModuleId / "temp"; CreateDirectories(StateDir); @@ -1890,7 +3164,7 @@ TEST_CASE("hydration.s3.obliterate") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_obliterate"; + constexpr std::string_view ModuleId = "s3test_obliterate"sv; HydrationBase::Configuration BaseConfig; { @@ -1904,7 +3178,7 @@ TEST_CASE("hydration.s3.obliterate") } auto Hydration = InitHydration(BaseConfig); - HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = ModuleId}; + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, .ModuleId = std::string(ModuleId)}; // Dehydrate to populate backend CreateSmallTestTree(ServerStateDir); @@ -1918,7 +3192,7 @@ TEST_CASE("hydration.s3.obliterate") Opts.Credentials.AccessKeyId = Minio.RootUser(); Opts.Credentials.SecretAccessKey = Minio.RootPassword(); S3Client Client(Opts); - return Client.ListObjects(ModuleId + "/"); + return Client.ListObjects(fmt::format("{}/"sv, ModuleId)); }; // Verify objects exist in S3 @@ -2034,7 +3308,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_performance"; + constexpr std::string_view ModuleId = "s3test_performance"sv; CopyTree("E:\\Dev\\hub\\brainrot\\20260402-225355-508", ServerStateDir, {.EnableClone = true}); // auto TestFiles = CreateTestTree(ServerStateDir); @@ -2054,7 +3328,7 @@ TEST_CASE("hydration.s3.dehydrate_hydrate.performance" * doctest::skip()) HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, - .ModuleId = ModuleId, + .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Dehydrate: upload server state to MinIO @@ -2091,7 +3365,7 @@ TEST_CASE("hydration.file.incremental") CreateDirectories(HydrationStore); CreateDirectories(HydrationTemp); - const std::string ModuleId = "testmodule"; + constexpr std::string_view ModuleId = "testmodule"sv; // auto TestFiles = CreateTestTree(ServerStateDir); TestThreading Threading(4); @@ -2100,7 +3374,7 @@ TEST_CASE("hydration.file.incremental") HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, - .ModuleId = ModuleId, + .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Hydrate with no prior state @@ -2170,7 +3444,7 @@ TEST_CASE("hydration.s3.incremental") CreateDirectories(ServerStateDir); CreateDirectories(HydrationTemp); - const std::string ModuleId = "s3test_incremental"; + constexpr std::string_view ModuleId = "s3test_incremental"sv; TestThreading Threading(8); @@ -2188,7 +3462,7 @@ TEST_CASE("hydration.s3.incremental") HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = HydrationTemp, - .ModuleId = ModuleId, + .ModuleId = std::string(ModuleId), .Threading = Threading.Options}; // Hydrate with no prior state diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h index 0455dda91..d9a3dda5b 100644 --- a/src/zenserver/hub/hydration.h +++ b/src/zenserver/hub/hydration.h @@ -2,6 +2,8 @@ #pragma once +#include "hydrationdefaults.h" + #include <zencore/compactbinary.h> #include <atomic> @@ -9,6 +11,7 @@ #include <memory> #include <optional> #include <string> +#include <vector> namespace zen { @@ -32,6 +35,20 @@ struct HydrationConfig // External threading for parallel I/O and hashing. If not set, work runs inline on the caller's thread. std::optional<ThreadingOptions> Threading; + + // When true, small files are concatenated into pack blobs during Dehydrate and sliced + // back out during Hydrate. Pack contents are stored raw (no compression); the CAS key + // of a pack is the hash of its concatenated raw bytes. Dramatically reduces request + // count for modules dominated by tiny metadata files. + bool PackEnabled = true; + + // Files strictly smaller than this are candidates for packing. + uint64_t PackThresholdBytes = DefaultPackThresholdBytes; + + // Upper bound on the concatenation size of a single pack. Candidates are bin-packed + // greedily; a pack that would exceed this is closed and a new one is started. A single + // unique candidate larger than this falls back to standalone upload. + uint64_t MaxPackBytes = DefaultMaxPackBytes; }; /** @@ -46,14 +63,23 @@ struct HydrationStrategyBase virtual ~HydrationStrategyBase() = default; // Upload server state to the configured target. ServerStateDir is wiped on success. - // On failure, ServerStateDir is left intact. + // On failure, ServerStateDir is left intact and the failure is logged at WARN; no + // exception propagates to the caller. Callers that need to distinguish success from + // failure must inspect the log stream or observe downstream effects (e.g. presence of + // the metadata file on the backend) - success is not signalled through the API. virtual void Dehydrate(const CbObject& CachedState) = 0; - // Download state from the configured target into ServerStateDir. Returns cached state for the next Dehydrate. - // On failure, ServerStateDir is wiped and an empty CbObject is returned. + // Download state from the configured target into ServerStateDir. Returns cached state + // for the next Dehydrate. On failure, ServerStateDir is wiped, the failure is logged, + // and an empty CbObject is returned (a no-op cache). Callers can check the result for + // emptiness as a failure indicator. virtual CbObject Hydrate() = 0; - // Delete all stored data for this module from the configured backend, then clean ServerStateDir and TempDir. + // Delete all stored data for this module from the configured backend, then clean + // ServerStateDir and TempDir. Backend-delete failures are retried once; if the retry + // also fails, local cleanup proceeds regardless and the failure is logged at WARN. + // Because backend deletion is best-effort, a return from Obliterate does not guarantee + // backend data is gone. virtual void Obliterate() = 0; }; @@ -73,14 +99,23 @@ public: { // Back-end specific target specification (e.g. "s3://bucket/prefix", "file:///path") std::string TargetSpecification; - // Full config object (mutually exclusive with TargetSpecification) + // Full config object (mutually exclusive with TargetSpecification). Backend-specific + // settings (e.g. S3 "chunksize") live inside Options["settings"]. The common + // `excludes` entry is parsed once by HydrationBase and shared across modules. CbObject Options; }; + // Parses common Options entries (`excludes`) into m_Excludes, applying built-in + // defaults when the field is absent. Field present-but-empty `[]` is honored as an + // explicit override (no defaults applied). + explicit HydrationBase(const Configuration& Config); virtual ~HydrationBase() = default; // Create a configured per-module hydrator, ready to call Hydrate/Dehydrate/Obliterate. virtual std::unique_ptr<HydrationStrategyBase> CreateHydrator(const HydrationConfig& Config) = 0; + +protected: + std::vector<std::string> m_Excludes; }; // Factory: parses Config and returns the concrete backend (FileHydration or S3Hydration). diff --git a/src/zenserver/hub/hydrationdefaults.h b/src/zenserver/hub/hydrationdefaults.h new file mode 100644 index 000000000..8d9fb6d41 --- /dev/null +++ b/src/zenserver/hub/hydrationdefaults.h @@ -0,0 +1,26 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <cstdint> + +namespace zen { + +// state.cbo schema version. Bump on any incompatible change to the manifest layout. +// Hydrate refuses to read a manifest with a higher version than this. Manifests written +// by older binaries omit the field; treated as version 0 and decoded best-effort +// (missing optional fields like Packs[] / StorageSettings fall back to defaults). +inline constexpr uint32_t HydrationSchemaVersion = 1; + +// Multipart chunk size used by the S3 backend when hydrating/dehydrating large files. +// Dehydrate writes the chosen size into state.cbo so hydrate replays with the same +// partitioning. State records without the field fall back to this default. +inline constexpr uint64_t DefaultMultipartChunkSize = 64u * 1024u * 1024u; + +// Files strictly smaller than this are candidates for packing. +inline constexpr uint64_t DefaultPackThresholdBytes = 256u * 1024u; + +// Upper bound on the concatenation size of a single pack. Packs are stored raw (no compression). +inline constexpr uint64_t DefaultMaxPackBytes = 4u * 1024u * 1024u; + +} // namespace zen diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index 9d477fb10..8d36e6a46 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -10,6 +10,7 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> +#include <zencore/trace.h> namespace zen { @@ -31,6 +32,7 @@ StorageServerInstance::~StorageServerInstance() void StorageServerInstance::SpawnServerProcess() { + ZEN_TRACE_CPU("StorageServerInstance::SpawnServerProcess"); Stopwatch SpawnTimer; ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); @@ -103,6 +105,7 @@ StorageServerInstance::ShutdownServerProcess() { return; } + ZEN_TRACE_CPU("StorageServerInstance::ShutdownServerProcess"); Stopwatch ShutdownTimer; // m_ServerInstance.Shutdown() never throws. m_ServerInstance.Shutdown(); @@ -131,6 +134,7 @@ StorageServerInstance::ProvisionLocked() return; } + ZEN_TRACE_CPU("StorageServerInstance::ProvisionLocked"); ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_Config.StateDir); try { @@ -150,6 +154,7 @@ StorageServerInstance::ProvisionLocked() void StorageServerInstance::DeprovisionLocked() { + ZEN_TRACE_CPU("StorageServerInstance::DeprovisionLocked"); ShutdownServerProcess(); // Crashed or Hibernated: process already dead; skip Shutdown. @@ -169,6 +174,7 @@ StorageServerInstance::DeprovisionLocked() void StorageServerInstance::ObliterateLocked() { + ZEN_TRACE_CPU("StorageServerInstance::ObliterateLocked"); ShutdownServerProcess(); std::atomic<bool> AbortFlag{false}; @@ -181,6 +187,7 @@ StorageServerInstance::ObliterateLocked() void StorageServerInstance::HibernateLocked() { + ZEN_TRACE_CPU("StorageServerInstance::HibernateLocked"); // Signal server to shut down, but keep data around for later wake ShutdownServerProcess(); } @@ -195,6 +202,8 @@ StorageServerInstance::WakeLocked() return; } + ZEN_TRACE_CPU("StorageServerInstance::WakeLocked"); + try { SpawnServerProcess(); @@ -212,6 +221,12 @@ StorageServerInstance::WakeLocked() void StorageServerInstance::Hydrate() { + if (!m_Config.EnableHydration) + { + ZEN_INFO("Hydration disabled; skipping hydrate for module '{}'", m_ModuleId); + return; + } + ZEN_TRACE_CPU("StorageServerInstance::Hydrate"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); @@ -222,6 +237,12 @@ StorageServerInstance::Hydrate() void StorageServerInstance::Dehydrate() { + if (!m_Config.EnableDehydration) + { + ZEN_INFO("Dehydration disabled; skipping dehydrate for module '{}'", m_ModuleId); + return; + } + ZEN_TRACE_CPU("StorageServerInstance::Dehydrate"); std::atomic<bool> AbortFlag{false}; std::atomic<bool> PauseFlag{false}; HydrationConfig Config = MakeHydrationConfig(AbortFlag, PauseFlag); @@ -238,6 +259,9 @@ StorageServerInstance::MakeHydrationConfig(std::atomic<bool>& AbortFlag, std::at Config.Threading.emplace( HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalWorkerPool, .AbortFlag = &AbortFlag, .PauseFlag = &PauseFlag}); } + Config.PackEnabled = m_Config.HydrationPackEnabled; + Config.PackThresholdBytes = m_Config.HydrationPackThresholdBytes; + Config.MaxPackBytes = m_Config.HydrationMaxPackBytes; return Config; } diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index 21ac1ada3..a2f376a23 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -36,6 +36,11 @@ public: std::string Trace; std::string TraceHost; std::string TraceFile; + bool EnableHydration = true; + bool EnableDehydration = true; + bool HydrationPackEnabled = true; + uint64_t HydrationPackThresholdBytes = DefaultPackThresholdBytes; + uint64_t HydrationMaxPackBytes = DefaultMaxPackBytes; WorkerThreadPool* OptionalWorkerPool = nullptr; }; diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index ebc2cf2f1..27c1c9fc4 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -235,6 +235,44 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "hub-enable-hydration", + "Load instance state from the hydration target on provision (default true)", + cxxopts::value<bool>(m_ServerOptions.HubEnableHydration)->default_value("true"), + ""); + + Options.add_option("hub", + "", + "hub-enable-dehydration", + "Save instance state to the hydration target on deprovision (default true)", + cxxopts::value<bool>(m_ServerOptions.HubEnableDehydration)->default_value("true"), + ""); + + Options.add_option( + "hub", + "", + "hub-hydration-enable-pack", + "Concatenate small files into raw CAS pack blobs during dehydrate (default true; see --hub-hydration-pack-threshold-bytes)", + cxxopts::value<bool>(m_ServerOptions.HubHydrationPackEnabled)->default_value("true"), + ""); + + Options.add_option("hub", + "", + "hub-hydration-pack-threshold-bytes", + fmt::format("Files strictly smaller than this are pack candidates (default {})", DefaultPackThresholdBytes), + cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationPackThresholdBytes) + ->default_value(fmt::format("{}", DefaultPackThresholdBytes)), + "<bytes>"); + + Options.add_option( + "hub", + "", + "hub-hydration-max-pack-bytes", + fmt::format("Upper bound on a pack's concatenation size (default {})", DefaultMaxPackBytes), + cxxopts::value<uint64_t>(m_ServerOptions.HubHydrationMaxPackBytes)->default_value(fmt::format("{}", DefaultMaxPackBytes)), + "<bytes>"); + + Options.add_option("hub", + "", "hub-watchdog-cycle-interval-ms", "Interval between watchdog cycles in milliseconds", cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"), @@ -367,6 +405,13 @@ ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv); Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv); Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv); + Options.AddOption("hub.enablehydration"sv, m_ServerOptions.HubEnableHydration, "hub-enable-hydration"sv); + Options.AddOption("hub.enabledehydration"sv, m_ServerOptions.HubEnableDehydration, "hub-enable-dehydration"sv); + Options.AddOption("hub.hydration.enablepack"sv, m_ServerOptions.HubHydrationPackEnabled, "hub-hydration-enable-pack"sv); + Options.AddOption("hub.hydration.packthresholdbytes"sv, + m_ServerOptions.HubHydrationPackThresholdBytes, + "hub-hydration-pack-threshold-bytes"sv); + Options.AddOption("hub.hydration.maxpackbytes"sv, m_ServerOptions.HubHydrationMaxPackBytes, "hub-hydration-max-pack-bytes"sv); Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv); Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv, @@ -453,7 +498,7 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, HubInstanceState PreviousState, HubInstanceState NewState) { - ZEN_UNUSED(PreviousState); + ZEN_INFO("Module '{}' changed state from {} to {}", ModuleId, zen::ToString(PreviousState), zen::ToString(NewState)); if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating) { @@ -661,6 +706,11 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) .InstanceTraceFile = ServerConfig.HubInstanceTraceFile, .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, + .EnableHydration = ServerConfig.HubEnableHydration, + .EnableDehydration = ServerConfig.HubEnableDehydration, + .HydrationPackEnabled = ServerConfig.HubHydrationPackEnabled, + .HydrationPackThresholdBytes = ServerConfig.HubHydrationPackThresholdBytes, + .HydrationMaxPackBytes = ServerConfig.HubHydrationMaxPackBytes, .WatchDog = { .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs), diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index 5e465bb14..6416792a6 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -3,6 +3,7 @@ #pragma once #include "hubinstancestate.h" +#include "hydrationdefaults.h" #include "resourcemetrics.h" #include "zenserver.h" @@ -47,7 +48,12 @@ struct ZenHubServerConfig : public ZenServerConfig uint16_t HubBasePortNumber = 21000; int HubInstanceLimit = 1000; bool HubUseJobObject = true; - std::string HubInstanceHttpClass = "asio"; + bool HubEnableHydration = true; // Load instance state from hydration target on provision + bool HubEnableDehydration = true; // Save instance state to hydration target on deprovision + bool HubHydrationPackEnabled = true; // Concatenate small files into raw CAS pack blobs during dehydrate + uint64_t HubHydrationPackThresholdBytes = DefaultPackThresholdBytes; // Files strictly smaller than this are pack candidates + uint64_t HubHydrationMaxPackBytes = DefaultMaxPackBytes; // Upper bound on a pack's concatenation size + std::string HubInstanceHttpClass = "asio"; std::string HubInstanceMalloc; std::string HubInstanceTrace; std::string HubInstanceTraceHost; diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp index f8bed92da..ab80cfcc7 100644 --- a/src/zenutil/cloud/s3client.cpp +++ b/src/zenutil/cloud/s3client.cpp @@ -148,11 +148,31 @@ namespace { return true; } + /// True if the response indicates S3 throttling (503 SlowDown / ServiceUnavailable / 429). + /// Code is checked on both the HTTP status and the XML error code so we catch proxies that + /// return 200 with a SlowDown body. + bool IsS3Throttled(const HttpClient::Response& Response, std::string_view ErrorCode) + { + const int Status = static_cast<int>(Response.StatusCode); + if (Status == 503 || Status == 429) + { + return true; + } + if (ErrorCode == "SlowDown" || ErrorCode == "ServiceUnavailable" || ErrorCode == "ThrottlingException" || + ErrorCode == "RequestLimitExceeded" || ErrorCode == "TooManyRequests") + { + return true; + } + return false; + } + /// Build a human-readable error message for a failed S3 response. When the response body /// contains an S3 `<Error>` element, the Code and Message fields are included in the string /// so transient 4xx/5xx failures (SignatureDoesNotMatch, AuthorizationHeaderMalformed, etc.) /// show up in logs instead of being swallowed. Falls back to the generic HTTP/transport /// message when no XML body is available (HEAD responses, transport errors). + /// Also emits a distinct `S3 THROTTLED` warning when the response indicates throttling so + /// callers can grep for it without parsing combined error text. std::string S3ErrorMessage(std::string_view Prefix, const HttpClient::Response& Response) { if (!Response.Error.has_value() && Response.ResponsePayload) @@ -164,9 +184,21 @@ namespace { { ExtendableStringBuilder<256> Decoded; DecodeXmlEntities(Message, Decoded); + if (IsS3Throttled(Response, Code)) + { + ZEN_WARN("S3 THROTTLED [{}] status={} code='{}' message='{}'", + Prefix, + static_cast<int>(Response.StatusCode), + Code, + Decoded.ToView()); + } return fmt::format("{}: HTTP status ({}) {} - {}", Prefix, static_cast<int>(Response.StatusCode), Code, Decoded.ToView()); } } + if (IsS3Throttled(Response, {})) + { + ZEN_WARN("S3 THROTTLED [{}] status={} (no XML body)", Prefix, static_cast<int>(Response.StatusCode)); + } return Response.ErrorMessage(Prefix); } |