aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-30 13:58:14 +0200
committerGitHub Enterprise <[email protected]>2026-03-30 13:58:14 +0200
commit6d75696d11aab547bb34ea22ec10fcdc594e5a44 (patch)
tree047db726b2c4cfc05fca433561fe09f635ae88a8
parenthub resource limits (#900) (diff)
downloadzen-6d75696d11aab547bb34ea22ec10fcdc594e5a44.tar.xz
zen-6d75696d11aab547bb34ea22ec10fcdc594e5a44.zip
hub s3 hydrate improvements (#902)
- Feature: Added `--hub-hydration-target-config` option to specify the hydration target via a JSON config file (mutually exclusive with `--hub-hydration-target-spec`); supports `file` and `s3` types with structured settings ```json { "type": "file", "settings": { "path": "/path/to/hydration/storage" } } ``` ```json { "type": "s3", "settings": { "uri": "s3://bucket[/prefix]", "region": "us-east-1", "endpoint": "http://localhost:9000", "path-style": true } } ``` - Improvement: Hub hydration dehydration skips the `.sentry-native` directory - Bugfix: Fixed `MakeSafeAbsolutePathInPlace` when a UNC prefix is present but path uses mixed delimiters
-rw-r--r--CHANGELOG.md22
-rw-r--r--scripts/test_scripts/hub_load_test.py969
-rw-r--r--scripts/test_scripts/hub_provision_perf_test.py501
-rw-r--r--src/zencore/filesystem.cpp4
-rw-r--r--src/zens3-testbed/main.cpp2
-rw-r--r--src/zenserver/hub/hub.cpp9
-rw-r--r--src/zenserver/hub/hub.h3
-rw-r--r--src/zenserver/hub/hydration.cpp399
-rw-r--r--src/zenserver/hub/hydration.h10
-rw-r--r--src/zenserver/hub/storageserverinstance.cpp6
-rw-r--r--src/zenserver/hub/storageserverinstance.h2
-rw-r--r--src/zenserver/hub/zenhubserver.cpp43
-rw-r--r--src/zenserver/hub/zenhubserver.h1
-rw-r--r--src/zenutil/cloud/s3client.cpp28
-rw-r--r--src/zenutil/include/zenutil/cloud/s3client.h8
15 files changed, 1890 insertions, 117 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index caeb88845..e182e6d85 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,26 @@
##
- Feature: Hub dashboard now shows a Resources tile with disk and memory usage against configured limits
- Feature: Hub module listing now shows state-change timestamps and duration for each instance
+- Feature: Added `--hub-hydration-target-config` option to specify the hydration target via a JSON config file (mutually exclusive with `--hub-hydration-target-spec`); supports `file` and `s3` types with structured settings
+ ```json
+ {
+ "type": "file",
+ "settings": {
+ "path": "/path/to/hydration/storage"
+ }
+ }
+ ```
+ ```json
+ {
+ "type": "s3",
+ "settings": {
+ "uri": "s3://bucket[/prefix]",
+ "region": "us-east-1",
+ "endpoint": "http://localhost:9000",
+ "path-style": true
+ }
+ }
+ ```
- Improvement: Hub provisioning rejects new instances when disk or memory usage exceeds configurable thresholds; limits are disabled by default (0 = no limit)
- `--hub-provision-disk-limit-bytes` - Reject provisioning when used disk exceeds this many bytes
- `--hub-provision-disk-limit-percent` - Reject provisioning when used disk exceeds this percentage of total disk
@@ -8,8 +28,10 @@
- `--hub-provision-memory-limit-percent` - Reject provisioning when used memory exceeds this percentage of total RAM
- Improvement: Hub process metrics are now tracked atomically per active instance slot, eliminating per-query process handle lookups
- Improvement: Hub, Build Store, and Workspaces service stats sections in the dashboard are now collapsible
+- Improvement: Hub hydration dehydration skips the `.sentry-native` directory
- Bugfix: Fix an issue when a file goes from being internally stored to externally referenced and it could get into an inconsistent state
- Bugfix: Hub watchdog loop did not check `m_ShutdownFlag`, causing it to spin indefinitely on shutdown
+- Bugfix: Fixed `MakeSafeAbsolutePathInPlace` when a UNC prefix is present but path uses mixed delimiters
## 5.8.0
- Feature: Hub watchdog automatically deprovisions inactive provisioned and hibernated instances
diff --git a/scripts/test_scripts/hub_load_test.py b/scripts/test_scripts/hub_load_test.py
new file mode 100644
index 000000000..7bff1eb37
--- /dev/null
+++ b/scripts/test_scripts/hub_load_test.py
@@ -0,0 +1,969 @@
+#!/usr/bin/env python3
+"""Hub sustained load test.
+
+Keeps ~N modules concurrently provisioned from a pool of 1000 predefined
+module names, writing and reading data to each instance, then either letting
+the hub watchdog deprovision idle ones or explicitly deprovisioning them.
+Runs indefinitely until Ctrl-C.
+
+Optional --s3: starts a local MinIO server and configures the hub to use it
+as the de/hydration backend.
+
+Requirements:
+ pip install boto3 (only needed with --s3)
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import queue
+import random
+import subprocess
+import sys
+import threading
+import time
+import urllib.error
+import urllib.request
+import webbrowser
+from concurrent.futures import Future, ThreadPoolExecutor, wait as futures_wait
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Optional
+
+# Platform/executable and MinIO credential constants shared by all helpers.
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+# Namespace/bucket path segments used in instance PUT/GET URLs.
+_NAMESPACE = "loadtest"
+_BUCKET = "bucket"
+
+# Key sizes to use for activity writes (bytes) - biased toward smaller.
+# Blobs are pre-generated at startup; one per size, shared across all requests.
+_KEY_SIZES = [512, 512, 2048, 2048, 8192, 32768]
+_BLOBS: list[bytes] = []  # populated by _init_blobs()
+
+
+def _init_blobs() -> None:
+    """Populate _BLOBS with one random blob per _KEY_SIZES entry.
+
+    Duplicate sizes share the same bytes object, so the list length matches
+    _KEY_SIZES and random.choice(_BLOBS) preserves the size bias.
+    """
+    seen: set[int] = set()
+    for size in _KEY_SIZES:
+        if size not in seen:
+            seen.add(size)
+            _BLOBS.append(os.urandom(size))
+        else:
+            # reuse the already-generated blob for this size
+            _BLOBS.append(next(b for b in _BLOBS if len(b) == size))
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
+def _find_zenserver(override: Optional[str]) -> Path:
+    """Locate the zenserver executable.
+
+    Uses --zenserver-dir when given (exits if absent there), otherwise probes
+    the conventional per-platform release build paths, then falls back to the
+    newest release build found anywhere under build/. Exits with guidance if
+    nothing is found.
+    """
+    if override:
+        p = Path(override) / f"zenserver{_EXE_SUFFIX}"
+        if not p.exists():
+            sys.exit(f"zenserver not found at {p}")
+        return p
+
+    script_dir = Path(__file__).resolve().parent
+    repo_root = script_dir.parent.parent
+    candidates = [
+        repo_root / "build" / "windows" / "x64" / "release" / f"zenserver{_EXE_SUFFIX}",
+        repo_root / "build" / "linux" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
+        repo_root / "build" / "macosx" / "x86_64" / "release" / f"zenserver{_EXE_SUFFIX}",
+    ]
+    for c in candidates:
+        if c.exists():
+            return c
+
+    # Fallback: any release build, most recently modified wins
+    matches = list(repo_root.glob(f"build/**/release/zenserver{_EXE_SUFFIX}"))
+    if matches:
+        return max(matches, key=lambda p: p.stat().st_mtime)
+
+    sys.exit(
+        "zenserver executable not found in build/. "
+        "Run: xmake config -y -m release -a x64 && xmake -y\n"
+        "Or pass --zenserver-dir <dir>."
+    )
+
+
+def _find_minio(zenserver_path: Path) -> Path:
+    """Locate the minio executable next to zenserver; exit with guidance if missing."""
+    p = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
+    if not p.exists():
+        sys.exit(
+            f"minio executable not found at {p}. "
+            "Build with: xmake config -y -m release -a x64 && xmake -y"
+        )
+    return p
+
+
+# ---------------------------------------------------------------------------
+# MinIO
+# ---------------------------------------------------------------------------
+
+def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
+    """Launch a local MinIO server with default credentials; returns the process.
+
+    Output is discarded; on Windows the process gets its own process group so
+    Ctrl-C in this script does not propagate to it.
+    """
+    minio_data = data_dir / "minio"
+    minio_data.mkdir(parents=True, exist_ok=True)
+    env = os.environ.copy()
+    env["MINIO_ROOT_USER"] = _MINIO_USER
+    env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS
+    popen_kwargs: dict = {}
+    if sys.platform == "win32":
+        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+    proc = subprocess.Popen(
+        [str(minio_exe), "server", str(minio_data),
+         "--address", f":{port}",
+         "--console-address", f":{console_port}",
+         "--quiet"],
+        env=env,
+        stdout=subprocess.DEVNULL,
+        stderr=subprocess.DEVNULL,
+        **popen_kwargs,
+    )
+    print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
+    return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+    """Poll MinIO's health endpoint until it responds; exit on timeout."""
+    deadline = time.monotonic() + timeout_s
+    url = f"http://localhost:{port}/minio/health/live"
+    while time.monotonic() < deadline:
+        try:
+            with urllib.request.urlopen(url, timeout=1):
+                print("[minio] ready")
+                return
+        except Exception:
+            time.sleep(0.1)
+    sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
+def _create_minio_bucket(port: int, bucket: str) -> None:
+    """Create the hydration bucket on the local MinIO via boto3 (idempotent).
+
+    boto3 is imported lazily so the dependency is only needed with --s3;
+    already-existing buckets are treated as success.
+    """
+    try:
+        import boto3
+        import botocore.config
+        import botocore.exceptions
+    except ImportError:
+        sys.exit(
+            "[minio] boto3 is required for --s3.\n"
+            "Install it with: pip install boto3"
+        )
+    s3 = boto3.client(
+        "s3",
+        endpoint_url=f"http://localhost:{port}",
+        aws_access_key_id=_MINIO_USER,
+        aws_secret_access_key=_MINIO_PASS,
+        region_name="us-east-1",
+        config=botocore.config.Config(signature_version="s3v4"),
+    )
+    try:
+        s3.create_bucket(Bucket=bucket)
+        print(f"[minio] created bucket '{bucket}'")
+    except botocore.exceptions.ClientError as e:
+        if e.response["Error"]["Code"] in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
+            print(f"[minio] bucket '{bucket}' already exists")
+        else:
+            raise
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+    zenserver_exe: Path,
+    data_dir: Path,
+    port: int,
+    log_file: Path,
+    idle_timeout: int,
+    extra_args: list[str],
+    extra_env: Optional[dict[str, str]],
+) -> tuple[subprocess.Popen, object]:
+    """Launch the zenserver hub with load-test tuning flags.
+
+    Returns (process, open log file handle); the caller owns closing the
+    handle after the process exits. stdout/stderr both go to log_file.
+    """
+    data_dir.mkdir(parents=True, exist_ok=True)
+    cmd = [
+        str(zenserver_exe),
+        "hub",
+        f"--data-dir={data_dir}",
+        f"--port={port}",
+        "--hub-instance-http-threads=8",
+        "--hub-instance-corelimit=4",
+        "--hub-provision-disk-limit-percent=99",
+        "--hub-provision-memory-limit-percent=80",
+        "--hub-instance-limit=500",
+        f"--hub-watchdog-provisioned-inactivity-timeout-seconds={idle_timeout}",
+        "--hub-watchdog-inactivity-check-margin-seconds=5",
+        "--hub-watchdog-cycle-interval-ms=2000",
+        "--hub-watchdog-cycle-processing-budget-ms=3000",
+        "--hub-watchdog-activity-check-connect-timeout-ms=20",
+        "--hub-watchdog-activity-check-request-timeout-ms=50",
+    ] + extra_args
+
+    env = os.environ.copy()
+    if extra_env:
+        env.update(extra_env)
+
+    popen_kwargs: dict = {}
+    if sys.platform == "win32":
+        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
+    log_handle = log_file.open("wb")
+    try:
+        proc = subprocess.Popen(
+            cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT,
+            **popen_kwargs,
+        )
+    except Exception:
+        # Don't leak the log handle if spawning fails
+        log_handle.close()
+        raise
+    print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+    return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+    """Poll /hub/status until the hub responds; exit if it dies or times out."""
+    deadline = time.monotonic() + timeout_s
+    req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+                                 headers={"Accept": "application/json"})
+    while time.monotonic() < deadline:
+        # An early exit usually means a port clash with another zenserver
+        if proc.poll() is not None:
+            sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+                     f"is another zenserver already running on port {port}?")
+        try:
+            with urllib.request.urlopen(req, timeout=2):
+                print("[hub] ready")
+                return
+        except Exception:
+            time.sleep(0.2)
+    sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
+    """Terminate a child process gracefully, escalating to kill after timeout_s."""
+    if proc.poll() is not None:
+        return
+    proc.terminate()
+    try:
+        proc.wait(timeout=timeout_s)
+    except subprocess.TimeoutExpired:
+        print(f"[{name}] did not exit after {timeout_s}s, killing")
+        proc.kill()
+        proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Module state
+# ---------------------------------------------------------------------------
+
+@dataclass
+class ModuleState:
+    """Client-side bookkeeping for one module in the pool (guarded by state_lock)."""
+    state: str = "idle" # idle|provisioning|active|deprovisioning|watchdog-pending
+    base_uri: Optional[str] = None
+    written_keys: list[str] = field(default_factory=list)
+    activity_rounds_left: int = 0
+    explicit_deprovision: bool = False
+    watchdog_pending_since: float = 0.0
+    generation: int = 0 # incremented on each provision; queue entries carry this to discard stale entries
+
+
+# ---------------------------------------------------------------------------
+# Counters
+# ---------------------------------------------------------------------------
+
+@dataclass
+class Counters:
+    """Thread-safe aggregate counters shared by all worker tasks."""
+    provisions: int = 0
+    deprovisions_explicit: int = 0
+    deprovisions_watchdog: int = 0
+    provision_rejected: int = 0
+    activity_rounds: int = 0
+    writes: int = 0
+    reads: int = 0
+    errors: int = 0
+    last_reject_time: float = 0.0
+    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)
+
+    def inc(self, name: str, n: int = 1) -> None:
+        # Add n to the named counter under the lock
+        with self._lock:
+            setattr(self, name, getattr(self, name) + n)
+
+    def record_reject(self) -> None:
+        # Count a 409 and remember when, so the orchestrator can back off
+        with self._lock:
+            self.provision_rejected += 1
+            self.last_reject_time = time.monotonic()
+
+    def snapshot(self) -> dict:
+        # Consistent copy of public counters (underscore fields excluded)
+        with self._lock:
+            return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
+
+
+# ---------------------------------------------------------------------------
+# Hub API helpers (urllib, no external deps)
+# ---------------------------------------------------------------------------
+
+def _hub_post(port: int, path: str, timeout_s: float = 30.0) -> tuple[int, dict]:
+    """POST an empty JSON body to the hub; return (status, parsed body).
+
+    HTTP errors return their status with a best-effort body; transport
+    failures return (0, {}) so callers can treat 0 as "unreachable".
+    """
+    url = f"http://localhost:{port}{path}"
+    req = urllib.request.Request(url, data=b"{}", method="POST",
+                                 headers={"Content-Type": "application/json",
+                                          "Accept": "application/json"})
+    try:
+        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+            body = json.loads(resp.read())
+            return resp.status, body
+    except urllib.error.HTTPError as e:
+        try:
+            body = json.loads(e.read())
+        except Exception:
+            body = {}
+        return e.code, body
+    except Exception:
+        return 0, {}
+
+
+def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
+    """Fetch the hub's module list from /hub/status; None if unreachable."""
+    try:
+        req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+                                     headers={"Accept": "application/json"})
+        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+            data = json.loads(resp.read())
+            return data.get("modules", [])
+    except Exception:
+        return None
+
+
+def _instance_put(base_uri: str, key: str, data: bytes, timeout_s: float = 10.0) -> int:
+    """PUT a blob to an instance under the test namespace/bucket; return HTTP status (0 = unreachable)."""
+    url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}"
+    req = urllib.request.Request(url, data=data, method="PUT",
+                                 headers={"Content-Type": "application/octet-stream",
+                                          "Accept": "application/json"})
+    try:
+        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+            return resp.status
+    except urllib.error.HTTPError as e:
+        return e.code
+    except Exception:
+        return 0
+
+
+def _instance_get(base_uri: str, key: str, timeout_s: float = 10.0) -> int:
+    """GET (and discard) a blob from an instance; return HTTP status (0 = unreachable)."""
+    url = f"{base_uri}/z$/{_NAMESPACE}/{_BUCKET}/{key}"
+    req = urllib.request.Request(url, headers={"Accept": "application/octet-stream"})
+    try:
+        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+            # Drain the body so the connection can be reused cleanly
+            resp.read()
+            return resp.status
+    except urllib.error.HTTPError as e:
+        return e.code
+    except Exception:
+        return 0
+
+
+# ---------------------------------------------------------------------------
+# Worker tasks (run in thread pool - no sleeping)
+# ---------------------------------------------------------------------------
+
+def _task_provision(
+    module_id: str,
+    port: int,
+    state_map: dict[str, ModuleState],
+    state_lock: threading.Lock,
+    activity_queue: queue.PriorityQueue,
+    counters: Counters,
+    explicit_deprovision_rate: float,
+    stop_event: threading.Event,
+) -> None:
+    """Provision one module and, on immediate success, schedule its first burst.
+
+    200 => instance ready now; 202 => async provision, the orchestrator's
+    status poll activates it later; 409 => hub rejected (resource limits);
+    anything else counts as an error.
+    """
+    status, body = _hub_post(port, f"/hub/modules/{module_id}/provision")
+
+    generation = 0
+    schedule_burst = False
+    with state_lock:
+        mod = state_map[module_id]
+        if mod.state != "provisioning":
+            # Shutdown changed state while provision was in-flight; leave it alone
+            return
+        if status in (200, 202):
+            instance_port = body.get("port")
+            base_uri = f"http://localhost:{instance_port}" if instance_port else None
+            if status == 200:
+                # Instance is immediately ready
+                mod.generation += 1
+                generation = mod.generation
+                mod.state = "active"
+                mod.base_uri = base_uri
+                mod.written_keys = []
+                mod.activity_rounds_left = random.randint(5, 20)
+                mod.explicit_deprovision = random.random() < explicit_deprovision_rate
+                schedule_burst = not stop_event.is_set()
+            else:
+                # 202: async provision - instance is still starting up.
+                # Stay in "provisioning"; the hub status poll activates it when ready.
+                # _shutdown_deprovision_all also covers "provisioning", so shutdown is safe.
+                mod.state = "provisioning"
+                mod.base_uri = base_uri
+        elif status == 409:
+            mod.state = "idle"
+            counters.record_reject()
+            return
+        else:
+            mod.state = "idle"
+            counters.inc("errors")
+            return
+
+    counters.inc("provisions")
+    if schedule_burst:
+        activity_queue.put((time.monotonic() + random.uniform(0.1, 0.3), generation, module_id))
+
+
+def _task_deprovision(
+    module_id: str,
+    port: int,
+    state_map: dict[str, ModuleState],
+    state_lock: threading.Lock,
+    counters: Counters,
+    retries: int = 5,
+) -> None:
+    """Deprovision one module, retrying on 409 (busy); always reset local state to idle.
+
+    200/202/404 count as success (404 = already gone); other statuses are errors.
+    """
+    succeeded = False
+    for attempt in range(retries + 1):
+        status, _ = _hub_post(port, f"/hub/modules/{module_id}/deprovision")
+        if status in (200, 202, 404):
+            succeeded = True
+            break
+        if status == 409 and attempt < retries:
+            time.sleep(0.2)
+            continue
+        counters.inc("errors")
+        break
+
+    with state_lock:
+        state_map[module_id].state = "idle"
+        state_map[module_id].base_uri = None
+        state_map[module_id].written_keys = []
+
+    if succeeded:
+        counters.inc("deprovisions_explicit")
+
+
+def _task_activity_burst(
+    module_id: str,
+    generation: int,
+    state_map: dict[str, ModuleState],
+    state_lock: threading.Lock,
+    activity_queue: queue.PriorityQueue,
+    counters: Counters,
+    pool: ThreadPoolExecutor,
+    port: int,
+) -> None:
+    """Run one write/read burst against a module's instance, then reschedule.
+
+    Stale generations (module re-provisioned since the burst was queued) are
+    discarded. When the module's rounds run out it is either explicitly
+    deprovisioned or handed to the hub watchdog.
+    """
+    with state_lock:
+        mod = state_map[module_id]
+        if mod.state != "active" or mod.generation != generation:
+            return
+        base_uri = mod.base_uri
+        existing_keys = list(mod.written_keys)
+
+    if not base_uri:
+        with state_lock:
+            state_map[module_id].state = "idle"
+        return
+
+    # Write 3-8 new keys per burst
+    num_writes = random.randint(3, 8)
+    new_keys: list[str] = []
+    write_errors = 0
+    for _ in range(num_writes):
+        key = f"{random.getrandbits(160):040x}"
+        data = random.choice(_BLOBS)
+        status = _instance_put(base_uri, key, data)
+        if status in (200, 201, 204):
+            new_keys.append(key)
+            counters.inc("writes")
+        else:
+            write_errors += 1
+            counters.inc("errors")
+
+    if write_errors > 0 and not new_keys and not existing_keys:
+        # Instance unreachable - likely watchdog fired while we were scheduled
+        with state_lock:
+            mod = state_map[module_id]
+            if mod.generation == generation:
+                mod.state = "idle"
+                mod.base_uri = None
+                mod.written_keys = []
+        return
+
+    # Read back 2-5 random keys (capped by how many exist) from all known keys
+    all_keys = existing_keys + new_keys
+    num_reads = min(random.randint(2, 5), len(all_keys))
+    for key in random.sample(all_keys, num_reads):
+        status = _instance_get(base_uri, key)
+        if status == 200:
+            counters.inc("reads")
+        elif status in (404, 0):
+            # Key may not exist yet or instance gone; not fatal
+            pass
+        else:
+            counters.inc("errors")
+
+    counters.inc("activity_rounds")
+
+    next_generation = 0
+    with state_lock:
+        mod = state_map[module_id]
+        if mod.state != "active" or mod.generation != generation:
+            return
+        mod.written_keys = (existing_keys + new_keys)[-200:]  # cap list size
+        mod.activity_rounds_left -= 1
+
+        if mod.activity_rounds_left <= 0:
+            if mod.explicit_deprovision:
+                mod.state = "deprovisioning"
+                mod.base_uri = None
+                try:
+                    pool.submit(
+                        _task_deprovision,
+                        module_id, port, state_map, state_lock, counters,
+                    )
+                except RuntimeError:
+                    pass  # pool shutting down; hub watchdog will clean up
+            else:
+                mod.state = "watchdog-pending"
+                mod.watchdog_pending_since = time.monotonic()
+            return
+
+        next_generation = mod.generation  # capture under lock before releasing
+
+    # Schedule next burst soon for semi-continuous activity
+    next_time = time.monotonic() + random.uniform(0.2, 0.8)
+    activity_queue.put((next_time, next_generation, module_id))
+
+
+# ---------------------------------------------------------------------------
+# Orchestrator
+# ---------------------------------------------------------------------------
+
+def _orchestrate(
+    port: int,
+    state_map: dict[str, ModuleState],
+    state_lock: threading.Lock,
+    activity_queue: queue.PriorityQueue,
+    counters: Counters,
+    pool: ThreadPoolExecutor,
+    stop_event: threading.Event,
+    target_active: int,
+    explicit_deprovision_rate: float,
+    idle_timeout: int,
+) -> None:
+    """Main control loop (runs on its own thread until stop_event is set).
+
+    Each iteration: dispatch due activity bursts from the priority queue,
+    top up provisions toward target_active (with 409 backoff), and every
+    5s poll /hub/status to activate async provisions and to detect or
+    force deprovision of watchdog-pending modules.
+    """
+    last_status_poll = 0.0
+    provision_backoff = False
+
+    while not stop_event.is_set():
+        now = time.monotonic()
+
+        # Drain activity queue for due bursts
+        while True:
+            try:
+                next_time, generation, module_id = activity_queue.get_nowait()
+            except queue.Empty:
+                break
+            if next_time <= now:
+                try:
+                    pool.submit(
+                        _task_activity_burst,
+                        module_id, generation, state_map, state_lock,
+                        activity_queue, counters, pool, port,
+                    )
+                except RuntimeError:
+                    break  # pool shutting down
+            else:
+                # Earliest entry not due yet; put it back and stop draining
+                activity_queue.put((next_time, generation, module_id))
+                break
+
+        # Activate/deactivate backoff based on recent 409 rejections
+        last_reject = counters.last_reject_time
+        if last_reject > 0 and now - last_reject < 5.0:
+            provision_backoff = True
+        elif provision_backoff and now - last_reject >= 5.0:
+            provision_backoff = False
+
+        # Count states and submit provision tasks if needed
+        if not provision_backoff:
+            with state_lock:
+                idle_ids = [mid for mid, m in state_map.items() if m.state == "idle"]
+                working_count = sum(
+                    1 for m in state_map.values()
+                    if m.state in ("active", "provisioning", "deprovisioning")
+                )
+                watchdog_count = sum(
+                    1 for m in state_map.values()
+                    if m.state == "watchdog-pending"
+                )
+
+            # Provision enough to keep working_count at target, but cap total
+            # in-flight (working + watchdog-pending) at 2x target to prevent
+            # runaway accumulation when all modules cycle to watchdog-pending.
+            inflight_cap = target_active * 2
+            deficit = min(
+                target_active - working_count,
+                inflight_cap - working_count - watchdog_count,
+            )
+            to_provision = idle_ids[:max(0, deficit)]
+            for mid in to_provision:
+                # Re-check under the lock: state may have changed since the scan
+                with state_lock:
+                    if state_map[mid].state != "idle":
+                        continue
+                    state_map[mid].state = "provisioning"
+                try:
+                    pool.submit(
+                        _task_provision,
+                        mid, port, state_map, state_lock,
+                        activity_queue, counters, explicit_deprovision_rate,
+                        stop_event,
+                    )
+                except RuntimeError:
+                    # Pool shutting down; roll back the optimistic state change
+                    with state_lock:
+                        state_map[mid].state = "idle"
+
+        # Poll hub status to detect async provision completions and watchdog-fired modules
+        if now - last_status_poll >= 5.0:
+            last_status_poll = now
+            modules_status = _hub_status(port)
+            if modules_status is not None:
+                hub_ids = {m["moduleId"]: m.get("state", "") for m in modules_status}
+                # Hub watchdog fires at ~idle_timeout from last activity. However, if the
+                # watchdog's previous visit predates the last burst, it sees a changed
+                # activity sum and resets LastActivityTime, delaying deprovision by ~173s.
+                # Explicitly deprovision after idle_timeout + 15s to avoid waiting for that.
+                timeout_threshold = idle_timeout + 15
+                to_deprovision_explicitly: list[str] = []
+                with state_lock:
+                    for mid, mod in state_map.items():
+                        if mod.state == "provisioning" and mod.base_uri is not None:
+                            # Waiting for async (202) provision to complete
+                            hub_state = hub_ids.get(mid, "")
+                            if hub_state == "provisioned":
+                                mod.generation += 1
+                                mod.state = "active"
+                                mod.written_keys = []
+                                mod.activity_rounds_left = random.randint(5, 20)
+                                mod.explicit_deprovision = random.random() < explicit_deprovision_rate
+                                activity_queue.put(
+                                    (time.monotonic() + random.uniform(0.1, 0.3), mod.generation, mid)
+                                )
+                            elif mid not in hub_ids:
+                                # Provision failed or was rolled back
+                                mod.state = "idle"
+                                mod.base_uri = None
+                        elif mod.state == "watchdog-pending":
+                            hub_state = hub_ids.get(mid, "")
+                            gone = mid not in hub_ids
+                            timed_out = (now - mod.watchdog_pending_since) > timeout_threshold
+                            if gone or hub_state in ("unprovisioned", "deprovisioning"):
+                                mod.state = "idle"
+                                mod.base_uri = None
+                                mod.written_keys = []
+                                counters.inc("deprovisions_watchdog")
+                            elif timed_out:
+                                mod.state = "deprovisioning"
+                                to_deprovision_explicitly.append(mid)
+                # Submit outside the lock; _task_deprovision re-acquires it
+                for mid in to_deprovision_explicitly:
+                    try:
+                        pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters)
+                    except RuntimeError:
+                        with state_lock:
+                            if state_map[mid].state == "deprovisioning":
+                                state_map[mid].state = "idle"
+
+        stop_event.wait(timeout=0.05)
+
+
+# ---------------------------------------------------------------------------
+# Stats display
+# ---------------------------------------------------------------------------
+
+def _stats_thread(
+    state_map: dict[str, ModuleState],
+    state_lock: threading.Lock,
+    counters: Counters,
+    stop_event: threading.Event,
+    interval_s: float = 5.0,
+) -> None:
+    """Print a periodic stats block (state counts, totals, per-minute rates).
+
+    On a TTY the previous block is overwritten in place using ANSI cursor
+    movement; otherwise blocks are appended.
+    """
+    is_tty = sys.stdout.isatty()
+    prev_lines = 0
+    t0 = time.monotonic()
+    prev_snap: Optional[dict] = None
+    prev_t = t0
+
+    while not stop_event.is_set():
+        stop_event.wait(timeout=interval_s)
+        now = time.monotonic()
+        elapsed = now - t0
+        dt = now - prev_t
+
+        snap = counters.snapshot()
+        with state_lock:
+            states: dict[str, int] = {}
+            for m in state_map.values():
+                states[m.state] = states.get(m.state, 0) + 1
+
+        def rate(key: str) -> float:
+            # Per-minute delta vs the previous snapshot
+            if prev_snap is None or dt <= 0:
+                return 0.0
+            return (snap[key] - prev_snap[key]) / dt * 60.0
+
+        lines = [
+            f"[{time.strftime('%H:%M:%S')}] elapsed={elapsed:.0f}s",
+            f"  modules: idle={states.get('idle', 0)} "
+            f"provisioning={states.get('provisioning', 0)} "
+            f"active={states.get('active', 0)} "
+            f"watchdog-pending={states.get('watchdog-pending', 0)} "
+            f"deprovisioning={states.get('deprovisioning', 0)}",
+            f"  totals: provisions={snap['provisions']} "
+            f"deprov-explicit={snap['deprovisions_explicit']} "
+            f"deprov-watchdog={snap['deprovisions_watchdog']} "
+            f"rejected={snap['provision_rejected']} "
+            f"errors={snap['errors']}",
+            f"  data: writes={snap['writes']} reads={snap['reads']} "
+            f"rounds={snap['activity_rounds']}",
+            f"  rates/min: provisions={rate('provisions'):.1f} "
+            f"deprov={rate('deprovisions_explicit') + rate('deprovisions_watchdog'):.1f} "
+            f"writes={rate('writes'):.1f} reads={rate('reads'):.1f}",
+        ]
+
+        if is_tty and prev_lines > 0:
+            # Move cursor up to overwrite previous block
+            sys.stdout.write(f"\033[{prev_lines}A\033[J")
+
+        sys.stdout.write("\n".join(lines) + "\n")
+        sys.stdout.flush()
+        prev_lines = len(lines)
+        prev_snap = snap
+        prev_t = now
+
+
+# ---------------------------------------------------------------------------
+# Shutdown
+# ---------------------------------------------------------------------------
+
+def _wait_for_hub_idle(port: int, hub_proc: subprocess.Popen, timeout_s: float = 120.0) -> None:
+    """Wait until the hub reports no transitioning instances (dehydration done).
+
+    Returns early if the hub process exits; logs a warning (does not fail)
+    if the hub is still transitioning when the timeout expires.
+    """
+    _STABLE = {"provisioned", "hibernated", "crashed", "unprovisioned"}
+    print(f"[shutdown] waiting for hub dehydration (up to {timeout_s:.0f}s)...")
+    deadline = time.monotonic() + timeout_s
+    while time.monotonic() < deadline:
+        if hub_proc.poll() is not None:
+            print("[shutdown] hub process has exited")
+            return
+        modules = _hub_status(port, timeout_s=5.0)
+        if modules is None:
+            # Hub not responding. If it has exited, we're done. If it's still alive
+            # it may be saturated with S3 uploads - keep waiting rather than assuming done.
+            if hub_proc.poll() is not None:
+                print("[shutdown] hub process has exited")
+                return
+            time.sleep(1.0)
+            continue
+        transitioning = [m for m in modules if m.get("state") not in _STABLE]
+        remaining = len(modules)
+        if not transitioning:
+            if remaining:
+                print(f"[shutdown] hub idle ({remaining} instances in stable state)")
+            else:
+                print("[shutdown] hub idle (no instances remaining)")
+            return
+        print(f"[shutdown] {len(transitioning)} instance(s) still dehydrating...", end="\r")
+        time.sleep(1.0)
+    print(f"\n[shutdown] WARNING: hub did not become idle within {timeout_s:.0f}s")
+
+
+def _shutdown_deprovision_all(
+    port: int,
+    state_map: dict[str, ModuleState],
+    state_lock: threading.Lock,
+    counters: Counters,
+    workers: int,
+    timeout_s: float = 60.0,
+) -> None:
+    """Deprovision every non-idle module in parallel during shutdown.
+
+    Uses a dedicated thread pool (the main pool may already be shutting
+    down) and waits up to timeout_s for all deprovision tasks to finish.
+    """
+    with state_lock:
+        to_deprovision = [
+            mid for mid, m in state_map.items()
+            if m.state in ("active", "watchdog-pending", "provisioning")
+        ]
+        for mid in to_deprovision:
+            state_map[mid].state = "deprovisioning"
+
+    if not to_deprovision:
+        return
+
+    print(f"\n[shutdown] deprovisioning {len(to_deprovision)} active modules...")
+    pool = ThreadPoolExecutor(max_workers=min(workers, len(to_deprovision)))
+    futures: list[Future] = [
+        pool.submit(_task_deprovision, mid, port, state_map, state_lock, counters)
+        for mid in to_deprovision
+    ]
+    # Non-blocking shutdown: workers keep running, we bound the wait below
+    pool.shutdown(wait=False)
+
+    done_set, not_done_set = futures_wait(futures, timeout=timeout_s)
+    if not_done_set:
+        print(f"[shutdown] WARNING: {len(not_done_set)} deprovision tasks did not complete within {timeout_s}s")
+    else:
+        print(f"[shutdown] all modules deprovisioned")
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+    """Entry point: parse args, start optional MinIO and the hub, run the load
+    loop until Ctrl-C (or hub exit), then perform an orderly shutdown."""
+    parser = argparse.ArgumentParser(description=__doc__,
+                                     formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument("--data-dir", default="E:/Dev/hub-loadtest",
+                        help="Hub --data-dir (default: E:/Dev/hub-loadtest)")
+    parser.add_argument("--port", type=int, default=8558,
+                        help="Hub HTTP port (default: 8558)")
+    parser.add_argument("--workers", type=int, default=50,
+                        help="Thread pool size for HTTP calls (default: 50)")
+    parser.add_argument("--active-modules", type=int, default=100,
+                        help="Target number of concurrently provisioned modules (default: 100)")
+    parser.add_argument("--module-count", type=int, default=1000,
+                        help="Total predefined module name pool size (default: 1000)")
+    parser.add_argument("--idle-timeout", type=int, default=90,
+                        help="Hub watchdog inactivity timeout in seconds (default: 90)")
+    parser.add_argument("--explicit-deprovision-rate", type=float, default=0.3,
+                        help="Fraction of modules explicitly deprovisioned (default: 0.3)")
+    parser.add_argument("--zenserver-dir",
+                        help="Directory containing zenserver executable (auto-detected by default)")
+    parser.add_argument("--s3", action="store_true",
+                        help="Use local MinIO for hub de/hydration instead of filesystem")
+    parser.add_argument("--minio-port", type=int, default=9000,
+                        help="MinIO S3 API port (default: 9000)")
+    parser.add_argument("--minio-console-port", type=int, default=9001,
+                        help="MinIO web console port (default: 9001)")
+    parser.add_argument("--s3-bucket", default="zen-load-test",
+                        help="S3 bucket name for MinIO hydration (default: zen-load-test)")
+    parser.add_argument("--no-browser", action="store_true",
+                        help="Skip opening browser tabs")
+    args = parser.parse_args()
+
+    if args.active_modules > args.module_count:
+        sys.exit(
+            f"--active-modules ({args.active_modules}) must not exceed "
+            f"--module-count ({args.module_count})"
+        )
+
+    _init_blobs()
+
+    data_dir = Path(args.data_dir)
+    hub_log = data_dir / "hub.log"
+
+    zenserver_exe = _find_zenserver(args.zenserver_dir)
+    print(f"[setup] zenserver: {zenserver_exe}")
+
+    # Shared state for the orchestrator, worker pool, and stats thread
+    module_names = [f"load-test-module-{i:04d}" for i in range(args.module_count)]
+    state_map: dict[str, ModuleState] = {mid: ModuleState() for mid in module_names}
+    state_lock = threading.Lock()
+    activity_queue: queue.PriorityQueue = queue.PriorityQueue()
+    counters = Counters()
+    stop_event = threading.Event()
+
+    minio_proc: Optional[subprocess.Popen] = None
+    hub_proc: Optional[subprocess.Popen] = None
+    hub_log_handle = None
+
+    hub_extra_args: list[str] = []
+    hub_extra_env: Optional[dict[str, str]] = None
+
+    try:
+        if args.s3:
+            # Bring up MinIO and point the hub's hydration target at it
+            minio_exe = _find_minio(zenserver_exe)
+            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
+            _wait_for_minio(args.minio_port)
+            _create_minio_bucket(args.minio_port, args.s3_bucket)
+            if not args.no_browser:
+                webbrowser.open(f"http://localhost:{args.minio_console_port}")
+            config_json = {
+                "type": "s3",
+                "settings": {
+                    "uri": f"s3://{args.s3_bucket}",
+                    "endpoint": f"http://localhost:{args.minio_port}",
+                    "path-style": True,
+                    "region": "us-east-1",
+                },
+            }
+            data_dir.mkdir(parents=True, exist_ok=True)
+            config_path = data_dir / "hydration_config.json"
+            config_path.write_text(json.dumps(config_json), encoding="ascii")
+            hub_extra_args = [f"--hub-hydration-target-config={config_path}"]
+            hub_extra_env = {
+                "AWS_ACCESS_KEY_ID": _MINIO_USER,
+                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+            }
+
+        hub_proc, hub_log_handle = _start_hub(
+            zenserver_exe, data_dir, args.port,
+            hub_log, args.idle_timeout,
+            hub_extra_args, hub_extra_env,
+        )
+        _wait_for_hub(hub_proc, args.port)
+        if not args.no_browser:
+            webbrowser.open(f"http://localhost:{args.port}/dashboard")
+
+        print(f"[load-test] starting: pool={args.module_count} modules, "
+              f"target={args.active_modules} active, "
+              f"idle-timeout={args.idle_timeout}s, "
+              f"explicit-deprovision={args.explicit_deprovision_rate:.0%}")
+        print("[load-test] press Ctrl-C to stop")
+
+        with ThreadPoolExecutor(max_workers=args.workers) as pool:
+            stats_t = threading.Thread(
+                target=_stats_thread,
+                args=(state_map, state_lock, counters, stop_event),
+                daemon=True,
+            )
+            stats_t.start()
+
+            orch_t = threading.Thread(
+                target=_orchestrate,
+                args=(
+                    args.port, state_map, state_lock,
+                    activity_queue, counters, pool,
+                    stop_event, args.active_modules,
+                    args.explicit_deprovision_rate, args.idle_timeout,
+                ),
+                daemon=True,
+            )
+            orch_t.start()
+
+            try:
+                # Main thread just watches for Ctrl-C or unexpected hub death
+                while not stop_event.is_set():
+                    time.sleep(0.5)
+                    if hub_proc.poll() is not None:
+                        print(f"\n[hub] process exited unexpectedly (rc={hub_proc.returncode})")
+                        break
+            except KeyboardInterrupt:
+                print("\n[load-test] Ctrl-C received, shutting down...")
+
+            stop_event.set()
+            orch_t.join(timeout=3.0)
+            stats_t.join(timeout=3.0)
+
+            # Orderly shutdown: deprovision everything, wait for dehydration,
+            # then stop the hub (and MinIO if we started it)
+            _shutdown_deprovision_all(
+                args.port, state_map, state_lock, counters,
+                args.workers, timeout_s=60.0,
+            )
+            _wait_for_hub_idle(args.port, hub_proc, timeout_s=max(120.0, args.active_modules * 1.0))
+
+            _stop_process(hub_proc, "hub", timeout_s=120.0)
+            if hub_log_handle is not None:
+                hub_log_handle.close()
+                hub_log_handle = None
+            if minio_proc is not None:
+                _stop_process(minio_proc, "minio")
+                minio_proc = None
+
+    finally:
+        # Safety net: only reached if an exception occurred before normal shutdown
+        if hub_proc is not None and hub_proc.poll() is None:
+            _stop_process(hub_proc, "hub")
+        if hub_log_handle is not None:
+            hub_log_handle.close()
+        if minio_proc is not None:
+            _stop_process(minio_proc, "minio")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/test_scripts/hub_provision_perf_test.py b/scripts/test_scripts/hub_provision_perf_test.py
new file mode 100644
index 000000000..5b264ad62
--- /dev/null
+++ b/scripts/test_scripts/hub_provision_perf_test.py
@@ -0,0 +1,501 @@
+#!/usr/bin/env python3
+"""Hub provisioning performance test.
+
+Floods a zenserver hub with concurrent provision requests until the hub rejects
+further provisioning (HTTP 409), then deprovisions all instances and exits.
+A WPR ETW trace runs for the entire test and produces a .etl file for analysis
+in WPA or PerfView (CPU sampling + context-switch events for lock contention).
+
+Optional --s3: starts a local MinIO server and configures the hub to use it
+as the de/hydration backend instead of the default local filesystem.
+
+Requirements:
+ pip install boto3 (only needed with --s3)
+WPR (wpr.exe) must be available (ships with Windows). Running as Administrator
+is required for WPR to collect ETW traces.
+"""
+
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+import uuid
+import webbrowser
+from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, as_completed, wait
+from pathlib import Path
+from typing import Optional
+
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+_STABLE_STATES = {"provisioned", "hibernated", "crashed", "unprovisioned"}
+_MINIO_USER = "minioadmin"
+_MINIO_PASS = "minioadmin"
+
+
+# ---------------------------------------------------------------------------
+# Executable discovery
+# ---------------------------------------------------------------------------
+
def _find_zenserver(override: Optional[str]) -> Path:
    """Locate the zenserver executable.

    An explicit --zenserver-dir override wins; otherwise probe the known
    per-platform release build directories, then fall back to the newest
    match anywhere under build/**/release. Exits the process when nothing
    is found.
    """
    exe_name = f"zenserver{_EXE_SUFFIX}"

    if override:
        candidate = Path(override) / exe_name
        if not candidate.exists():
            sys.exit(f"zenserver not found at {candidate}")
        return candidate

    repo_root = Path(__file__).resolve().parent.parent
    repo_root = repo_root.parent  # scripts/test_scripts -> repo root

    # Well-known release output locations, checked in order.
    for platform_dir in ("windows/x64", "linux/x86_64", "macosx/x86_64"):
        candidate = repo_root / "build" / platform_dir / "release" / exe_name
        if candidate.exists():
            return candidate

    # Fall back to any release build, newest first.
    found = sorted(
        repo_root.glob(f"build/**/release/{exe_name}"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    if found:
        return found[0]

    sys.exit(
        "zenserver executable not found in build/. "
        "Run: xmake config -y -m release -a x64 && xmake -y\n"
        "Or pass --zenserver-dir <dir>."
    )
+
+
def _find_minio(zenserver_path: Path) -> Path:
    """Locate the minio executable expected alongside zenserver; exit if absent."""
    candidate = zenserver_path.parent / f"minio{_EXE_SUFFIX}"
    if candidate.exists():
        return candidate
    sys.exit(
        f"minio executable not found at {candidate}. "
        "Build with: xmake config -y -m release -a x64 && xmake -y"
    )
+
+
+# ---------------------------------------------------------------------------
+# WPR (Windows Performance Recorder)
+# ---------------------------------------------------------------------------
+
+def _is_elevated() -> bool:
+ if sys.platform != "win32":
+ return False
+ try:
+ import ctypes
+ return bool(ctypes.windll.shell32.IsUserAnAdmin())
+ except Exception:
+ return False
+
+
+def _wpr_start(trace_file: Path) -> bool:
+ if sys.platform != "win32":
+ print("[wpr] WPR is Windows-only; skipping ETW trace.")
+ return False
+ if not _is_elevated():
+ print("[wpr] skipping ETW trace - re-run from an elevated (Administrator) prompt to collect traces.")
+ return False
+ result = subprocess.run(
+ ["wpr.exe", "-start", "CPU", "-filemode"],
+ capture_output=True, text=True
+ )
+ if result.returncode != 0:
+ print(f"[wpr] WARNING: wpr -start failed (code {result.returncode}):\n{result.stderr.strip()}")
+ return False
+ print(f"[wpr] ETW trace started (CPU profile, file mode)")
+ return True
+
+
+def _wpr_stop(trace_file: Path) -> None:
+ if sys.platform != "win32":
+ return
+ result = subprocess.run(
+ ["wpr.exe", "-stop", str(trace_file)],
+ capture_output=True, text=True
+ )
+ if result.returncode != 0:
+ print(f"[wpr] WARNING: wpr -stop failed (code {result.returncode}):\n{result.stderr.strip()}")
+ else:
+ print(f"[wpr] ETW trace saved: {trace_file}")
+
+
+# ---------------------------------------------------------------------------
+# MinIO
+# ---------------------------------------------------------------------------
+
def _start_minio(minio_exe: Path, data_dir: Path, port: int, console_port: int) -> subprocess.Popen:
    """Launch a local MinIO server with its storage under data_dir/minio.

    Credentials come from the module-level _MINIO_USER/_MINIO_PASS constants;
    MinIO's own output is discarded.
    """
    storage_dir = data_dir / "minio"
    storage_dir.mkdir(parents=True, exist_ok=True)

    minio_env = dict(os.environ)
    minio_env["MINIO_ROOT_USER"] = _MINIO_USER
    minio_env["MINIO_ROOT_PASSWORD"] = _MINIO_PASS

    cmd = [
        str(minio_exe), "server", str(storage_dir),
        "--address", f":{port}",
        "--console-address", f":{console_port}",
        "--quiet",
    ]
    proc = subprocess.Popen(
        cmd,
        env=minio_env,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    print(f"[minio] started (pid {proc.pid}) on port {port}, console on port {console_port}")
    return proc
+
+
+def _wait_for_minio(port: int, timeout_s: float = 30.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ url = f"http://localhost:{port}/minio/health/live"
+ while time.monotonic() < deadline:
+ try:
+ with urllib.request.urlopen(url, timeout=1):
+ print("[minio] ready")
+ return
+ except Exception:
+ time.sleep(0.1)
+ sys.exit(f"[minio] timed out waiting for readiness after {timeout_s}s")
+
+
def _create_minio_bucket(port: int, bucket: str) -> None:
    """Create the hydration bucket on the local MinIO server (idempotent).

    Exits with an install hint when boto3 is missing; an already-existing
    bucket is not an error.
    """
    try:
        import boto3
        import botocore.config
        import botocore.exceptions
    except ImportError:
        sys.exit(
            "[minio] boto3 is required for --s3.\n"
            "Install it with: pip install boto3"
        )
    client = boto3.client(
        "s3",
        endpoint_url=f"http://localhost:{port}",
        aws_access_key_id=_MINIO_USER,
        aws_secret_access_key=_MINIO_PASS,
        region_name="us-east-1",
        config=botocore.config.Config(signature_version="s3v4"),
    )
    try:
        client.create_bucket(Bucket=bucket)
    except botocore.exceptions.ClientError as err:
        if err.response["Error"]["Code"] not in ("BucketAlreadyOwnedByYou", "BucketAlreadyExists"):
            raise
        print(f"[minio] bucket '{bucket}' already exists")
    else:
        print(f"[minio] created bucket '{bucket}'")
+
+
+# ---------------------------------------------------------------------------
+# Hub lifecycle
+# ---------------------------------------------------------------------------
+
+def _start_hub(
+ zenserver_exe: Path,
+ data_dir: Path,
+ port: int,
+ log_file: Path,
+ extra_args: list[str],
+ extra_env: Optional[dict[str, str]],
+) -> tuple[subprocess.Popen, object]:
+ data_dir.mkdir(parents=True, exist_ok=True)
+ cmd = [
+ str(zenserver_exe),
+ "hub",
+ f"--data-dir={data_dir}",
+ f"--port={port}",
+ "--hub-instance-http-threads=8",
+ "--hub-instance-corelimit=4",
+ "--hub-provision-disk-limit-percent=99",
+ "--hub-provision-memory-limit-percent=80",
+ "--hub-instance-limit=100",
+ ] + extra_args
+
+ env = os.environ.copy()
+ if extra_env:
+ env.update(extra_env)
+
+ log_handle = log_file.open("wb")
+ try:
+ proc = subprocess.Popen(
+ cmd, env=env, stdout=log_handle, stderr=subprocess.STDOUT
+ )
+ except Exception:
+ log_handle.close()
+ raise
+ print(f"[hub] started (pid {proc.pid}), log: {log_file}")
+ return proc, log_handle
+
+
+def _wait_for_hub(proc: subprocess.Popen, port: int, timeout_s: float = 100.0) -> None:
+ deadline = time.monotonic() + timeout_s
+ req = urllib.request.Request(f"http://localhost:{port}/hub/status",
+ headers={"Accept": "application/json"})
+ while time.monotonic() < deadline:
+ if proc.poll() is not None:
+ sys.exit(f"[hub] process exited unexpectedly (rc={proc.returncode}) - "
+ f"is another zenserver already running on port {port}?")
+ try:
+ with urllib.request.urlopen(req, timeout=2):
+ print("[hub] ready")
+ return
+ except Exception:
+ time.sleep(0.2)
+ sys.exit(f"[hub] timed out waiting for readiness after {timeout_s}s")
+
+
+def _stop_process(proc: subprocess.Popen, name: str, timeout_s: float = 10.0) -> None:
+ if proc.poll() is not None:
+ return
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout_s)
+ except subprocess.TimeoutExpired:
+ print(f"[{name}] did not exit after {timeout_s}s, killing")
+ proc.kill()
+ proc.wait()
+
+
+# ---------------------------------------------------------------------------
+# Hub HTTP helpers
+# ---------------------------------------------------------------------------
+
+def _hub_url(port: int, path: str) -> str:
+ return f"http://localhost:{port}{path}"
+
+
def _provision_one(port: int) -> tuple[str, int]:
    """POST a provision request for a freshly generated module id.

    Returns (module_id, http_status); status 0 means the hub was unreachable
    or the request failed locally.
    """
    module_id = str(uuid.uuid4())
    req = urllib.request.Request(
        _hub_url(port, f"/hub/modules/{module_id}/provision"),
        data=b"{}",
        method="POST",
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            status = resp.status
    except urllib.error.HTTPError as err:
        status = err.code
    except Exception:
        status = 0
    return module_id, status
+
+
def _deprovision_one(port: int, module_id: str, retries: int = 5) -> int:
    """POST a deprovision request, retrying on HTTP 409 (instance busy).

    Makes at most retries+1 attempts with a 0.2s pause between 409s.
    Returns the final HTTP status, or 0 when the hub was unreachable.
    """
    req = urllib.request.Request(
        _hub_url(port, f"/hub/modules/{module_id}/deprovision"),
        data=b"{}",
        method="POST",
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
    )
    attempt = 0
    while True:
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status
        except urllib.error.HTTPError as err:
            if err.code != 409 or attempt >= retries:
                return err.code
            attempt += 1
            time.sleep(0.2)
        except Exception:
            return 0
+
+
def _hub_status(port: int, timeout_s: float = 5.0) -> Optional[list[dict]]:
    """Fetch the hub's module list from /hub/status.

    Returns None when the hub is unreachable or the response is malformed.
    """
    req = urllib.request.Request(_hub_url(port, "/hub/status"),
                                 headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            payload = json.loads(resp.read())
        return payload.get("modules", [])
    except Exception:
        return None
+
+
+# ---------------------------------------------------------------------------
+# Test phases
+# ---------------------------------------------------------------------------
+
def _flood_provision(port: int, workers: int) -> tuple[list[str], float, float]:
    """Flood the hub with provision requests until it rejects or dies.

    Keeps `workers` requests in flight at all times; after the first 409
    (hub full) or status 0 (hub unreachable) no new requests are submitted,
    and the remaining in-flight ones are drained.

    Returns (provisioned_module_ids, time_to_first_rejection_s, wall_clock_s).
    When no rejection was observed, the middle value equals the wall clock.
    """
    halted = False
    provisioned: list[str] = []
    rejection_time: Optional[float] = None
    start = time.monotonic()

    with ThreadPoolExecutor(max_workers=workers) as pool:
        in_flight: set[Future] = {pool.submit(_provision_one, port) for _ in range(workers)}

        while in_flight:
            completed, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)

            for fut in completed:
                module_id, status = fut.result()
                if status in (200, 202):
                    provisioned.append(module_id)
                elif status == 409:
                    if not halted:
                        halted = True
                        rejection_time = time.monotonic() - start
                        print(f"\n[flood] hub rejected provisioning after "
                              f"{len(provisioned)} instances "
                              f"({rejection_time:.2f}s)")
                elif status == 0:
                    if not halted:
                        halted = True
                        print(f"\n[flood] hub unreachable - stopping flood "
                              f"({len(provisioned)} instances so far)")
                else:
                    print(f"[flood] unexpected status {status} for {module_id}")

                # Keep the pipeline full until the hub pushes back.
                if not halted:
                    in_flight.add(pool.submit(_provision_one, port))

    elapsed = time.monotonic() - start
    return provisioned, rejection_time or elapsed, elapsed
+
+
def _wait_stable(port: int, timeout_s: float = 20.0) -> None:
    """Poll the hub until every module reports a stable state, or timeout.

    Stable states are defined by the module-level _STABLE_STATES set;
    an unreachable hub is treated as "still transitioning" and retried.
    """
    print("[hub] waiting for all instances to reach stable state...")
    start = time.monotonic()
    deadline = start + timeout_s
    while time.monotonic() < deadline:
        modules = _hub_status(port, timeout_s=5.0)
        if modules is None:
            time.sleep(0.5)
            continue
        transitioning = [m for m in modules if m.get("state") not in _STABLE_STATES]
        elapsed = time.monotonic() - start
        print(f"[hub] {elapsed:.1f}s: {len(modules) - len(transitioning)}/{len(modules)} stable", end="\r")
        if not transitioning:
            print(f"\n[hub] all {len(modules)} instances in stable state")
            return
        time.sleep(0.5)
    print(f"\n[hub] WARNING: timed out waiting for stable states after {timeout_s}s")
+
+
def _deprovision_all(port: int, module_ids: list[str], workers: int) -> None:
    """Deprovision all tracked modules plus any extras the hub reports.

    The hub is queried once so that instances provisioned outside this run
    (in any state other than 'unprovisioned') are cleaned up too; if the hub
    is unreachable only the tracked ids are deprovisioned.
    """
    snapshot = _hub_status(port, timeout_s=60.0)
    if snapshot is None:
        print("[deprovision] WARNING: could not reach hub to enumerate extra modules - "
              "only deprovisioning tracked instances")
    extras = {m["moduleId"] for m in (snapshot or [])
              if m.get("state") not in ("unprovisioned",)} - set(module_ids)
    targets = list(module_ids) + list(extras)

    print(f"[deprovision] deprovisioning {len(targets)} instances...")
    start = time.monotonic()
    failures = 0
    with ThreadPoolExecutor(max_workers=workers) as pool:
        pending = {pool.submit(_deprovision_one, port, mid): mid for mid in targets}
        for fut in as_completed(pending):
            code = fut.result()
            if code not in (200, 202, 409):
                failures += 1
                print(f"[deprovision] module {pending[fut]}: unexpected status {code}")
    elapsed = time.monotonic() - start
    print(f"[deprovision] done in {elapsed:.2f}s ({failures} errors)")
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main() -> None:
+    """Run the provisioning flood test end to end.
+
+    Phases: parse args -> (optionally) start MinIO -> start WPR trace ->
+    start hub -> flood provision until rejection -> wait for stable states ->
+    deprovision everything -> wait for dehydration -> stop trace/processes.
+    """
+    parser = argparse.ArgumentParser(description=__doc__,
+                                     formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument("--data-dir", default="E:/Dev/hub-perftest",
+                        help="Hub --data-dir (default: E:/Dev/hub-perftest)")
+    parser.add_argument("--port", type=int, default=8558,
+                        help="Hub HTTP port (default: 8558)")
+    parser.add_argument("--workers", type=int, default=20,
+                        help="Concurrent provisioning threads (default: 20)")
+    parser.add_argument("--trace-file", default="hub_perf_trace.etl",
+                        help="WPR output .etl path (default: hub_perf_trace.etl)")
+    parser.add_argument("--zenserver-dir",
+                        help="Directory containing zenserver executable (auto-detected by default)")
+    parser.add_argument("--s3", action="store_true",
+                        help="Use local MinIO for hub de/hydration instead of filesystem")
+    parser.add_argument("--minio-port", type=int, default=9000,
+                        help="MinIO S3 API port when --s3 is used (default: 9000)")
+    parser.add_argument("--minio-console-port", type=int, default=9001,
+                        help="MinIO web console port when --s3 is used (default: 9001)")
+    parser.add_argument("--s3-bucket", default="zen-hydration-test",
+                        help="S3 bucket name for MinIO hydration (default: zen-hydration-test)")
+    args = parser.parse_args()
+
+    data_dir = Path(args.data_dir)
+    trace_file = Path(args.trace_file).resolve()
+    hub_log = data_dir / "hub.log"
+
+    zenserver_exe = _find_zenserver(args.zenserver_dir)
+    print(f"[setup] zenserver: {zenserver_exe}")
+
+    # Resources owned by this function; released in the finally block.
+    minio_proc: Optional[subprocess.Popen] = None
+    hub_proc: Optional[subprocess.Popen] = None
+    hub_log_handle = None
+    wpr_started = False
+
+    hub_extra_args: list[str] = []
+    hub_extra_env: Optional[dict[str, str]] = None
+
+    try:
+        # Optional S3 backend: bring up MinIO, create the bucket, and point
+        # the hub at it via a --hub-hydration-target-config JSON file.
+        if args.s3:
+            minio_exe = _find_minio(zenserver_exe)
+            minio_proc = _start_minio(minio_exe, data_dir, args.minio_port, args.minio_console_port)
+            _wait_for_minio(args.minio_port)
+            _create_minio_bucket(args.minio_port, args.s3_bucket)
+            webbrowser.open(f"http://localhost:{args.minio_console_port}")
+            config_json = {
+                "type": "s3",
+                "settings": {
+                    "uri": f"s3://{args.s3_bucket}",
+                    "endpoint": f"http://localhost:{args.minio_port}",
+                    "path-style": True,
+                    "region": "us-east-1",
+                },
+            }
+            data_dir.mkdir(parents=True, exist_ok=True)
+            config_path = data_dir / "hydration_config.json"
+            config_path.write_text(json.dumps(config_json), encoding="ascii")
+            hub_extra_args = [
+                f"--hub-hydration-target-config={config_path}",
+            ]
+            # The hub picks up S3 credentials from the environment.
+            hub_extra_env = {
+                "AWS_ACCESS_KEY_ID": _MINIO_USER,
+                "AWS_SECRET_ACCESS_KEY": _MINIO_PASS,
+            }
+
+        # Start the ETW trace before the hub so startup is captured too.
+        wpr_started = _wpr_start(trace_file)
+
+        hub_proc, hub_log_handle = _start_hub(
+            zenserver_exe, data_dir, args.port,
+            hub_log, hub_extra_args, hub_extra_env
+        )
+        _wait_for_hub(hub_proc, args.port)
+        webbrowser.open(f"http://localhost:{args.port}/dashboard")
+
+        # Flood until the hub answers 409 (or becomes unreachable).
+        provisioned_ids, time_to_rejection, wall_clock = _flood_provision(args.port, args.workers)
+
+        print(f"\n[results] provisioned : {len(provisioned_ids)}")
+        print(f"[results] time to 409 : {time_to_rejection:.3f}s")
+        print(f"[results] wall clock : {wall_clock:.3f}s")
+        if time_to_rejection > 0:
+            print(f"[results] rate : {len(provisioned_ids) / time_to_rejection:.1f} provisions/s")
+
+        # Let in-flight provisions settle before tearing everything down,
+        # then wait again so dehydration finishes (scales with instance count).
+        _wait_stable(args.port, timeout_s=120.0)
+        _deprovision_all(args.port, provisioned_ids, args.workers)
+        dehydration_timeout_s = max(60.0, len(provisioned_ids) * 0.5)
+        _wait_stable(args.port, timeout_s=dehydration_timeout_s)
+
+    finally:
+        # Always stop the trace and child processes, even on error/Ctrl-C.
+        if wpr_started:
+            _wpr_stop(trace_file)
+        if hub_proc is not None:
+            _stop_process(hub_proc, "hub")
+        if hub_log_handle is not None:
+            hub_log_handle.close()
+        if minio_proc is not None:
+            _stop_process(minio_proc, "minio")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 0d361801f..416312cae 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -3275,11 +3275,11 @@ MakeSafeAbsolutePathInPlace(std::filesystem::path& Path)
{
if (!Path.empty())
{
- std::filesystem::path AbsolutePath = std::filesystem::absolute(Path).make_preferred();
+ Path = std::filesystem::absolute(Path).make_preferred();
#if ZEN_PLATFORM_WINDOWS
const std::string_view Prefix = "\\\\?\\";
const std::u8string PrefixU8(Prefix.begin(), Prefix.end());
- std::u8string PathString = AbsolutePath.u8string();
+ std::u8string PathString = Path.u8string();
if (!PathString.empty() && !PathString.starts_with(PrefixU8))
{
PathString.insert(0, PrefixU8);
diff --git a/src/zens3-testbed/main.cpp b/src/zens3-testbed/main.cpp
index 4cd6b411f..1543c4d7c 100644
--- a/src/zens3-testbed/main.cpp
+++ b/src/zens3-testbed/main.cpp
@@ -110,7 +110,7 @@ CreateClient(const cxxopts::ParseResult& Args)
if (Args.count("timeout"))
{
- Options.Timeout = std::chrono::milliseconds(Args["timeout"].as<int>() * 1000);
+ Options.HttpSettings.Timeout = std::chrono::milliseconds(Args["timeout"].as<int>() * 1000);
}
return S3Client(Options);
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index bf846d68e..76c7a8f6d 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -182,7 +182,11 @@ Hub::Hub(const Configuration& Config,
, m_ActiveInstances(Config.InstanceLimit)
, m_FreeActiveInstanceIndexes(Config.InstanceLimit)
{
- if (m_Config.HydrationTargetSpecification.empty())
+ if (!m_Config.HydrationTargetSpecification.empty())
+ {
+ m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
+ }
+ else if (!m_Config.HydrationOptions)
{
std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage");
ZEN_INFO("using file hydration path: '{}'", FileHydrationPath);
@@ -190,7 +194,7 @@ Hub::Hub(const Configuration& Config,
}
else
{
- m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification;
+ m_HydrationOptions = m_Config.HydrationOptions;
}
m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp");
@@ -322,6 +326,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex),
.HydrationTempPath = m_HydrationTempPath,
.HydrationTargetSpecification = m_HydrationTargetSpecification,
+ .HydrationOptions = m_HydrationOptions,
.HttpThreadCount = m_Config.InstanceHttpThreadCount,
.CoreLimit = m_Config.InstanceCoreLimit,
.ConfigPath = m_Config.InstanceConfigPath},
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 9895f7068..ac3e680ae 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -6,6 +6,7 @@
#include "resourcemetrics.h"
#include "storageserverinstance.h"
+#include <zencore/compactbinary.h>
#include <zencore/filesystem.h>
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
@@ -67,6 +68,7 @@ public:
int InstanceCoreLimit = 0; // Automatic
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
+ CbObject HydrationOptions;
WatchDogConfiguration WatchDog;
@@ -181,6 +183,7 @@ private:
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
std::string m_HydrationTargetSpecification;
+ CbObject m_HydrationOptions;
std::filesystem::path m_HydrationTempPath;
#if ZEN_PLATFORM_WINDOWS
diff --git a/src/zenserver/hub/hydration.cpp b/src/zenserver/hub/hydration.cpp
index 541127590..ed16bfe56 100644
--- a/src/zenserver/hub/hydration.cpp
+++ b/src/zenserver/hub/hydration.cpp
@@ -10,6 +10,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/system.h>
+#include <zencore/timer.h>
#include <zenutil/cloud/imdscredentials.h>
#include <zenutil/cloud/s3client.h>
@@ -60,6 +61,7 @@ namespace {
///////////////////////////////////////////////////////////////////////////
constexpr std::string_view FileHydratorPrefix = "file://";
+constexpr std::string_view FileHydratorType = "file";
struct FileHydrator : public HydrationStrategyBase
{
@@ -77,7 +79,21 @@ FileHydrator::Configure(const HydrationConfig& Config)
{
m_Config = Config;
- std::filesystem::path ConfigPath(Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length())));
+ std::filesystem::path ConfigPath;
+ if (!m_Config.TargetSpecification.empty())
+ {
+ ConfigPath = Utf8ToWide(m_Config.TargetSpecification.substr(FileHydratorPrefix.length()));
+ }
+ else
+ {
+ CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
+ std::string_view Path = Settings["path"].AsString();
+ if (Path.empty())
+ {
+ throw zen::runtime_error("Hydration config 'file' type requires 'settings.path'");
+ }
+ ConfigPath = Utf8ToWide(std::string(Path));
+ }
MakeSafeAbsolutePathInPlace(ConfigPath);
if (!std::filesystem::exists(ConfigPath))
@@ -95,6 +111,8 @@ FileHydrator::Hydrate()
{
ZEN_INFO("Hydrating state from '{}' to '{}'", m_StorageModuleRootDir, m_Config.ServerStateDir);
+ Stopwatch Timer;
+
// Ensure target is clean
ZEN_DEBUG("Wiping server state at '{}'", m_Config.ServerStateDir);
const bool ForceRemoveReadOnlyFiles = true;
@@ -120,6 +138,10 @@ FileHydrator::Hydrate()
ZEN_DEBUG("Cleaning server state '{}'", m_Config.ServerStateDir);
CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
}
+ else
+ {
+ ZEN_INFO("Hydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
}
void
@@ -127,6 +149,8 @@ FileHydrator::Dehydrate()
{
ZEN_INFO("Dehydrating state from '{}' to '{}'", m_Config.ServerStateDir, m_StorageModuleRootDir);
+ Stopwatch Timer;
+
const std::filesystem::path TargetDir = m_StorageModuleRootDir;
// Ensure target is clean. This could be replaced with an atomic copy at a later date
@@ -141,7 +165,23 @@ FileHydrator::Dehydrate()
try
{
ZEN_DEBUG("Copying '{}' to '{}'", m_Config.ServerStateDir, TargetDir);
- CopyTree(m_Config.ServerStateDir, TargetDir, {.EnableClone = true});
+ for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.ServerStateDir))
+ {
+ if (Entry.path().filename() == ".sentry-native")
+ {
+ continue;
+ }
+ std::filesystem::path Dest = TargetDir / Entry.path().filename();
+ if (Entry.is_directory())
+ {
+ CreateDirectories(Dest);
+ CopyTree(Entry.path(), Dest, {.EnableClone = true});
+ }
+ else
+ {
+ CopyFile(Entry.path(), Dest, {.EnableClone = true});
+ }
+ }
}
catch (std::exception& Ex)
{
@@ -159,11 +199,17 @@ FileHydrator::Dehydrate()
ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+
+ if (CopySuccess)
+ {
+ ZEN_INFO("Dehydration complete in {}", NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
}
///////////////////////////////////////////////////////////////////////////
constexpr std::string_view S3HydratorPrefix = "s3://";
+constexpr std::string_view S3HydratorType = "s3";
struct S3Hydrator : public HydrationStrategyBase
{
@@ -182,6 +228,8 @@ private:
std::string m_Region;
SigV4Credentials m_Credentials;
Ref<ImdsCredentialProvider> m_CredentialProvider;
+
+ static constexpr uint64_t MultipartChunkSize = 8 * 1024 * 1024;
};
void
@@ -189,8 +237,23 @@ S3Hydrator::Configure(const HydrationConfig& Config)
{
m_Config = Config;
- std::string_view Spec = m_Config.TargetSpecification;
- Spec.remove_prefix(S3HydratorPrefix.size());
+ CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
+ std::string_view Spec;
+ if (!m_Config.TargetSpecification.empty())
+ {
+ Spec = m_Config.TargetSpecification;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+ }
+ else
+ {
+ std::string_view Uri = Settings["uri"].AsString();
+ if (Uri.empty())
+ {
+ throw zen::runtime_error("Hydration config 's3' type requires 'settings.uri'");
+ }
+ Spec = Uri;
+ Spec.remove_prefix(S3HydratorPrefix.size());
+ }
size_t SlashPos = Spec.find('/');
std::string UserPrefix = SlashPos != std::string_view::npos ? std::string(Spec.substr(SlashPos + 1)) : std::string{};
@@ -199,7 +262,11 @@ S3Hydrator::Configure(const HydrationConfig& Config)
ZEN_ASSERT(!m_Bucket.empty());
- std::string Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ std::string Region = std::string(Settings["region"].AsString());
+ if (Region.empty())
+ {
+ Region = GetEnvVariable("AWS_DEFAULT_REGION");
+ }
if (Region.empty())
{
Region = GetEnvVariable("AWS_REGION");
@@ -230,10 +297,12 @@ S3Hydrator::CreateS3Client() const
Options.BucketName = m_Bucket;
Options.Region = m_Region;
- if (!m_Config.S3Endpoint.empty())
+ CbObjectView Settings = m_Config.Options["settings"].AsObjectView();
+ std::string_view Endpoint = Settings["endpoint"].AsString();
+ if (!Endpoint.empty())
{
- Options.Endpoint = m_Config.S3Endpoint;
- Options.PathStyle = m_Config.S3PathStyle;
+ Options.Endpoint = std::string(Endpoint);
+ Options.PathStyle = Settings["path-style"].AsBool();
}
if (m_CredentialProvider)
@@ -245,6 +314,8 @@ S3Hydrator::CreateS3Client() const
Options.Credentials = m_Credentials;
}
+ Options.HttpSettings.MaximumInMemoryDownloadSize = 16u * 1024u;
+
return S3Client(Options);
}
@@ -275,11 +346,11 @@ S3Hydrator::Dehydrate()
try
{
- S3Client Client = CreateS3Client();
- std::string FolderName = BuildTimestampFolderName();
- uint64_t TotalBytes = 0;
- uint32_t FileCount = 0;
- std::chrono::steady_clock::time_point UploadStart = std::chrono::steady_clock::now();
+ S3Client Client = CreateS3Client();
+ std::string FolderName = BuildTimestampFolderName();
+ uint64_t TotalBytes = 0;
+ uint32_t FileCount = 0;
+ Stopwatch Timer;
DirectoryContent DirContent;
GetDirectoryContent(m_Config.ServerStateDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive, DirContent);
@@ -295,13 +366,20 @@ S3Hydrator::Dehydrate()
AbsPath.string(),
m_Config.ServerStateDir.string());
}
+ if (*RelPath.begin() == ".sentry-native")
+ {
+ continue;
+ }
std::string Key = MakeObjectKey(FolderName, RelPath);
BasicFile File(AbsPath, BasicFile::Mode::kRead);
uint64_t FileSize = File.FileSize();
- S3Result UploadResult =
- Client.PutObjectMultipart(Key, FileSize, [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); });
+ S3Result UploadResult = Client.PutObjectMultipart(
+ Key,
+ FileSize,
+ [&File](uint64_t Offset, uint64_t Size) { return File.ReadRange(Offset, Size); },
+ MultipartChunkSize);
if (!UploadResult.IsSuccess())
{
throw zen::runtime_error("Failed to upload '{}' to S3: {}", Key, UploadResult.Error);
@@ -312,8 +390,7 @@ S3Hydrator::Dehydrate()
}
// Write current-state.json
- int64_t UploadDurationMs =
- std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - UploadStart).count();
+ uint64_t UploadDurationMs = Timer.GetElapsedTimeMs();
UtcTime Now = UtcTime::Now();
std::string UploadTimeUtc = fmt::format("{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}Z",
@@ -346,7 +423,7 @@ S3Hydrator::Dehydrate()
throw zen::runtime_error("Failed to write current-state.json to '{}': {}", MetaKey, MetaUploadResult.Error);
}
- ZEN_INFO("Dehydration complete: {} files, {} bytes, {} ms", FileCount, TotalBytes, UploadDurationMs);
+ ZEN_INFO("Dehydration complete: {} files, {}, {}", FileCount, NiceBytes(TotalBytes), NiceTimeSpanMs(UploadDurationMs));
}
catch (std::exception& Ex)
{
@@ -361,6 +438,7 @@ S3Hydrator::Hydrate()
{
ZEN_INFO("Hydrating state from s3://{}/{} to '{}'", m_Bucket, m_KeyPrefix, m_Config.ServerStateDir);
+ Stopwatch Timer;
const bool ForceRemoveReadOnlyFiles = true;
// Clean temp dir before starting in case of leftover state from a previous failed hydration
@@ -374,19 +452,17 @@ S3Hydrator::Hydrate()
S3Client Client = CreateS3Client();
std::string MetaKey = m_KeyPrefix + "/current-state.json";
- S3HeadObjectResult HeadResult = Client.HeadObject(MetaKey);
- if (HeadResult.Status == HeadObjectResult::NotFound)
- {
- throw zen::runtime_error("No state found in S3 at '{}'", MetaKey);
- }
- if (!HeadResult.IsSuccess())
- {
- throw zen::runtime_error("Failed to check for state in S3 at '{}': {}", MetaKey, HeadResult.Error);
- }
-
S3GetObjectResult MetaResult = Client.GetObject(MetaKey);
if (!MetaResult.IsSuccess())
{
+ if (MetaResult.Error == S3GetObjectResult::NotFoundErrorText)
+ {
+ ZEN_INFO("No state found in S3 at {}", MetaKey);
+
+ ZEN_DEBUG("Wiping server state '{}'", m_Config.ServerStateDir);
+ CleanDirectory(m_Config.ServerStateDir, ForceRemoveReadOnlyFiles);
+ return;
+ }
throw zen::runtime_error("Failed to read current-state.json from '{}': {}", MetaKey, MetaResult.Error);
}
@@ -426,17 +502,17 @@ S3Hydrator::Hydrate()
std::filesystem::path DestPath = MakeSafeAbsolutePath(m_Config.TempDir / std::filesystem::path(RelKey));
CreateDirectories(DestPath.parent_path());
- BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate);
- DestFile.SetFileSize(Obj.Size);
-
- if (Obj.Size > 0)
+ if (Obj.Size > MultipartChunkSize)
{
+ BasicFile DestFile(DestPath, BasicFile::Mode::kTruncate);
+ DestFile.SetFileSize(Obj.Size);
+
BasicFileWriter Writer(DestFile, 64 * 1024);
uint64_t Offset = 0;
while (Offset < Obj.Size)
{
- uint64_t ChunkSize = std::min<uint64_t>(8 * 1024 * 1024, Obj.Size - Offset);
+ uint64_t ChunkSize = std::min<uint64_t>(MultipartChunkSize, Obj.Size - Offset);
S3GetObjectResult Chunk = Client.GetObjectRange(Obj.Key, Offset, ChunkSize);
if (!Chunk.IsSuccess())
{
@@ -453,6 +529,34 @@ S3Hydrator::Hydrate()
Writer.Flush();
}
+ else
+ {
+ S3GetObjectResult Chunk = Client.GetObject(Obj.Key, m_Config.TempDir);
+ if (!Chunk.IsSuccess())
+ {
+ throw zen::runtime_error("Failed to download '{}' from S3: {}", Obj.Key, Chunk.Error);
+ }
+
+ if (IoBufferFileReference FileRef; Chunk.Content.GetFileReference(FileRef))
+ {
+ std::error_code Ec;
+ std::filesystem::path ChunkPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (Ec)
+ {
+ WriteFile(DestPath, Chunk.Content);
+ }
+ else
+ {
+ Chunk.Content.SetDeleteOnClose(false);
+ Chunk.Content = {};
+ RenameFile(ChunkPath, DestPath, Ec);
+ }
+ }
+ else
+ {
+ WriteFile(DestPath, Chunk.Content);
+ }
+ }
}
// Downloaded successfully - swap into ServerStateDir
@@ -465,19 +569,20 @@ S3Hydrator::Hydrate()
std::mismatch(m_Config.TempDir.begin(), m_Config.TempDir.end(), m_Config.ServerStateDir.begin(), m_Config.ServerStateDir.end());
if (ItTmp != m_Config.TempDir.begin())
{
- // Fast path: atomic renames - no data copying needed
- for (const std::filesystem::directory_entry& Entry : std::filesystem::directory_iterator(m_Config.TempDir))
+ DirectoryContent DirContent;
+ GetDirectoryContent(m_Config.TempDir, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::IncludeDirs, DirContent);
+
+ for (const std::filesystem::path& AbsPath : DirContent.Directories)
{
- std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / Entry.path().filename());
- if (Entry.is_directory())
- {
- RenameDirectory(Entry.path(), Dest);
- }
- else
- {
- RenameFile(Entry.path(), Dest);
- }
+ std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename());
+ RenameDirectory(AbsPath, Dest);
+ }
+ for (const std::filesystem::path& AbsPath : DirContent.Files)
+ {
+ std::filesystem::path Dest = MakeSafeAbsolutePath(m_Config.ServerStateDir / AbsPath.filename());
+ RenameFile(AbsPath, Dest);
}
+
ZEN_DEBUG("Cleaning temp dir '{}'", m_Config.TempDir);
CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
}
@@ -491,7 +596,7 @@ S3Hydrator::Hydrate()
CleanDirectory(m_Config.TempDir, ForceRemoveReadOnlyFiles);
}
- ZEN_INFO("Hydration complete from folder '{}'", FolderName);
+ ZEN_INFO("Hydration complete from folder '{}' in {}", FolderName, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
catch (std::exception& Ex)
{
@@ -513,19 +618,41 @@ S3Hydrator::Hydrate()
std::unique_ptr<HydrationStrategyBase>
CreateHydrator(const HydrationConfig& Config)
{
- if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
+ if (!Config.TargetSpecification.empty())
+ {
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, FileHydratorPrefix.length()), FileHydratorPrefix) == 0)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
+ if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
+ Hydrator->Configure(Config);
+ return Hydrator;
+ }
+ throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
+ }
+
+ std::string_view Type = Config.Options["type"].AsString();
+ if (Type == FileHydratorType)
{
std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<FileHydrator>();
Hydrator->Configure(Config);
return Hydrator;
}
- if (StrCaseCompare(Config.TargetSpecification.substr(0, S3HydratorPrefix.length()), S3HydratorPrefix) == 0)
+ if (Type == S3HydratorType)
{
std::unique_ptr<HydrationStrategyBase> Hydrator = std::make_unique<S3Hydrator>();
Hydrator->Configure(Config);
return Hydrator;
}
- throw std::runtime_error(fmt::format("Unknown hydration strategy: {}", Config.TargetSpecification));
+ if (!Type.empty())
+ {
+ throw zen::runtime_error("Unknown hydration target type '{}'", Type);
+ }
+ throw zen::runtime_error("No hydration target configured");
}
#if ZEN_WITH_TESTS
@@ -607,6 +734,12 @@ namespace {
AddFile("file_a.bin", CreateSemiRandomBlob(1024));
AddFile("subdir/file_b.bin", CreateSemiRandomBlob(2048));
AddFile("subdir/nested/file_c.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_d.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_e.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/file_f.bin", CreateSemiRandomBlob(512));
+ AddFile("subdir/nested/medium.bulk", CreateSemiRandomBlob(256u * 1024u));
+ AddFile("subdir/nested/big.bulk", CreateSemiRandomBlob(512u * 1024u));
+ AddFile("subdir/nested/huge.bulk", CreateSemiRandomBlob(9u * 1024u * 1024u));
return Files;
}
@@ -844,12 +977,16 @@ TEST_CASE("hydration.s3.dehydrate_hydrate")
auto TestFiles = CreateTestTree(ServerStateDir);
HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "s3://zen-hydration-test";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
// Dehydrate: upload server state to MinIO
{
@@ -902,12 +1039,18 @@ TEST_CASE("hydration.s3.current_state_json_selects_latest_folder")
const std::string ModuleId = "s3test_folder_select";
HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = ModuleId;
- Config.TargetSpecification = "s3://zen-hydration-test";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
// v1: dehydrate without a marker file
CreateTestTree(ServerStateDir);
@@ -972,13 +1115,19 @@ TEST_CASE("hydration.s3.module_isolation")
CreateDirectories(TempPath);
ModuleData Data;
- Data.Config.ServerStateDir = StateDir;
- Data.Config.TempDir = TempPath;
- Data.Config.ModuleId = ModuleId;
- Data.Config.TargetSpecification = "s3://zen-hydration-test";
- Data.Config.S3Endpoint = Minio.Endpoint();
- Data.Config.S3PathStyle = true;
- Data.Files = CreateTestTree(StateDir);
+ Data.Config.ServerStateDir = StateDir;
+ Data.Config.TempDir = TempPath;
+ Data.Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Data.Config.Options = std::move(Root).AsObject();
+ }
+ Data.Files = CreateTestTree(StateDir);
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Data.Config);
Hydrator->Dehydrate();
@@ -1015,7 +1164,8 @@ TEST_CASE("hydration.s3.concurrent")
ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
- constexpr int kModuleCount = 4;
+ constexpr int kModuleCount = 16;
+ constexpr int kThreadCount = 4;
ScopedTemporaryDirectory TempDir;
@@ -1034,18 +1184,24 @@ TEST_CASE("hydration.s3.concurrent")
CreateDirectories(StateDir);
CreateDirectories(TempPath);
- Modules[I].Config.ServerStateDir = StateDir;
- Modules[I].Config.TempDir = TempPath;
- Modules[I].Config.ModuleId = ModuleId;
- Modules[I].Config.TargetSpecification = "s3://zen-hydration-test";
- Modules[I].Config.S3Endpoint = Minio.Endpoint();
- Modules[I].Config.S3PathStyle = true;
- Modules[I].Files = CreateTestTree(StateDir);
+ Modules[I].Config.ServerStateDir = StateDir;
+ Modules[I].Config.TempDir = TempPath;
+ Modules[I].Config.ModuleId = ModuleId;
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Modules[I].Config.Options = std::move(Root).AsObject();
+ }
+ Modules[I].Files = CreateTestTree(StateDir);
}
// Concurrent dehydrate
{
- WorkerThreadPool Pool(kModuleCount, "hydration_s3_dehy");
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3_dehy");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
@@ -1063,7 +1219,7 @@ TEST_CASE("hydration.s3.concurrent")
// Concurrent hydrate
{
- WorkerThreadPool Pool(kModuleCount, "hydration_s3_hy");
+ WorkerThreadPool Pool(kThreadCount, "hydration_s3_hy");
std::atomic<bool> AbortFlag{false};
std::atomic<bool> PauseFlag{false};
ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
@@ -1116,12 +1272,18 @@ TEST_CASE("hydration.s3.no_prior_state")
WriteFile(ServerStateDir / "stale.bin", CreateSemiRandomBlob(256));
HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_no_prior";
- Config.TargetSpecification = "s3://zen-hydration-test";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_no_prior";
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
Hydrator->Hydrate();
@@ -1159,12 +1321,71 @@ TEST_CASE("hydration.s3.path_prefix")
std::vector<std::pair<std::filesystem::path, IoBuffer>> TestFiles = CreateTestTree(ServerStateDir);
HydrationConfig Config;
- Config.ServerStateDir = ServerStateDir;
- Config.TempDir = HydrationTemp;
- Config.ModuleId = "s3test_prefix";
- Config.TargetSpecification = "s3://zen-hydration-test/team/project";
- Config.S3Endpoint = Minio.Endpoint();
- Config.S3PathStyle = true;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_prefix";
+ {
+ std::string ConfigJson =
+ fmt::format(R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test/team/project","endpoint":"{}","path-style":true}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Dehydrate();
+ }
+
+ CleanDirectory(ServerStateDir, true);
+
+ {
+ std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
+ Hydrator->Hydrate();
+ }
+
+ VerifyTree(ServerStateDir, TestFiles);
+}
+
+TEST_CASE("hydration.s3.options_region_override")
+{
+ // Verify that 'region' in Options["settings"] takes precedence over AWS_DEFAULT_REGION env var.
+ // AWS_DEFAULT_REGION is set to a bogus value; hydration must succeed using the region from Options.
+
+ MinioProcessOptions MinioOpts;
+ MinioOpts.Port = 19016;
+ MinioProcess Minio(MinioOpts);
+ Minio.SpawnMinioServer();
+ Minio.CreateBucket("zen-hydration-test");
+
+ ScopedEnvVar EnvAccessKey("AWS_ACCESS_KEY_ID", Minio.RootUser());
+ ScopedEnvVar EnvSecretKey("AWS_SECRET_ACCESS_KEY", Minio.RootPassword());
+ ScopedEnvVar EnvRegion("AWS_DEFAULT_REGION", "wrong-region");
+
+ ScopedTemporaryDirectory TempDir;
+
+ std::filesystem::path ServerStateDir = TempDir.Path() / "server_state";
+ std::filesystem::path HydrationTemp = TempDir.Path() / "hydration_temp";
+ CreateDirectories(ServerStateDir);
+ CreateDirectories(HydrationTemp);
+
+ auto TestFiles = CreateTestTree(ServerStateDir);
+
+ HydrationConfig Config;
+ Config.ServerStateDir = ServerStateDir;
+ Config.TempDir = HydrationTemp;
+ Config.ModuleId = "s3test_region_override";
+ {
+ std::string ConfigJson = fmt::format(
+ R"({{"type":"s3","settings":{{"uri":"s3://zen-hydration-test","endpoint":"{}","path-style":true,"region":"us-east-1"}}}})",
+ Minio.Endpoint());
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(ConfigJson, ParseError);
+ ZEN_ASSERT(ParseError.empty() && Root.IsObject());
+ Config.Options = std::move(Root).AsObject();
+ }
{
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
diff --git a/src/zenserver/hub/hydration.h b/src/zenserver/hub/hydration.h
index d29ffe5c0..19a96c248 100644
--- a/src/zenserver/hub/hydration.h
+++ b/src/zenserver/hub/hydration.h
@@ -2,6 +2,8 @@
#pragma once
+#include <zencore/compactbinary.h>
+
#include <filesystem>
namespace zen {
@@ -16,12 +18,8 @@ struct HydrationConfig
std::string ModuleId;
// Back-end specific target specification (e.g. S3 bucket, file path, etc)
std::string TargetSpecification;
-
- // Optional S3 endpoint override (e.g. "http://localhost:9000" for MinIO).
- std::string S3Endpoint;
- // Use path-style S3 URLs (endpoint/bucket/key) instead of virtual-hosted-style
- // (bucket.endpoint/key). Required for MinIO and other non-AWS endpoints.
- bool S3PathStyle = false;
+ // Full config object when using --hub-hydration-target-config (mutually exclusive with TargetSpecification)
+ CbObject Options;
};
/**
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 802606f6a..0c9354990 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -157,7 +157,8 @@ StorageServerInstance::Hydrate()
HydrationConfig Config{.ServerStateDir = m_BaseDir,
.TempDir = m_TempDir,
.ModuleId = m_ModuleId,
- .TargetSpecification = m_Config.HydrationTargetSpecification};
+ .TargetSpecification = m_Config.HydrationTargetSpecification,
+ .Options = m_Config.HydrationOptions};
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
@@ -170,7 +171,8 @@ StorageServerInstance::Dehydrate()
HydrationConfig Config{.ServerStateDir = m_BaseDir,
.TempDir = m_TempDir,
.ModuleId = m_ModuleId,
- .TargetSpecification = m_Config.HydrationTargetSpecification};
+ .TargetSpecification = m_Config.HydrationTargetSpecification,
+ .Options = m_Config.HydrationOptions};
std::unique_ptr<HydrationStrategyBase> Hydrator = CreateHydrator(Config);
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 33646c375..1b0078d87 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zencore/compactbinary.h>
#include <zenutil/zenserverprocess.h>
#include <atomic>
@@ -24,6 +25,7 @@ public:
uint16_t BasePort;
std::filesystem::path HydrationTempPath;
std::string HydrationTargetSpecification;
+ CbObject HydrationOptions;
uint32_t HttpThreadCount = 0; // Automatic
int CoreLimit = 0; // Automatic
std::filesystem::path ConfigPath;
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 2d0d5398b..499586abc 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -6,8 +6,10 @@
#include "httphubservice.h"
#include "hub.h"
+#include <zencore/compactbinary.h>
#include <zencore/config.h>
#include <zencore/except.h>
+#include <zencore/except_fmt.h>
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/memory/llm.h>
@@ -142,6 +144,14 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value(m_ServerOptions.HydrationTargetSpecification),
"<hydration-target-spec>");
+ Options.add_option("hub",
+ "",
+ "hub-hydration-target-config",
+ "Path to JSON file specifying the hydration target (mutually exclusive with "
+ "--hub-hydration-target-spec). Supported types: 'file', 's3'.",
+ cxxopts::value(m_ServerOptions.HydrationTargetConfigPath),
+ "<path>");
+
#if ZEN_PLATFORM_WINDOWS
Options.add_option("hub",
"",
@@ -269,6 +279,16 @@ ZenHubServerConfigurator::ValidateOptions()
m_ServerOptions.HubProvisionMemoryLimitPercent),
{});
}
+ if (!m_ServerOptions.HydrationTargetSpecification.empty() && !m_ServerOptions.HydrationTargetConfigPath.empty())
+ {
+ throw OptionParseException("'--hub-hydration-target-spec' and '--hub-hydration-target-config' are mutually exclusive", {});
+ }
+ if (!m_ServerOptions.HydrationTargetConfigPath.empty() && !std::filesystem::exists(m_ServerOptions.HydrationTargetConfigPath))
+ {
+ throw OptionParseException(
+ fmt::format("'--hub-hydration-target-config': file not found: '{}'", m_ServerOptions.HydrationTargetConfigPath.string()),
+ {});
+ }
}
///////////////////////////////////////////////////////////////////////////
@@ -479,6 +499,29 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
},
.ResourceLimits = ResolveLimits(ServerConfig)};
+ if (!ServerConfig.HydrationTargetConfigPath.empty())
+ {
+ FileContents Contents = ReadFile(ServerConfig.HydrationTargetConfigPath);
+ if (!Contents)
+ {
+ throw zen::runtime_error("Failed to read hydration config '{}': {}",
+ ServerConfig.HydrationTargetConfigPath.string(),
+ Contents.ErrorCode.message());
+ }
+ IoBuffer Buffer(Contents.Flatten());
+ std::string_view JsonText(static_cast<const char*>(Buffer.GetData()), Buffer.GetSize());
+
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(JsonText, ParseError);
+ if (!ParseError.empty() || !Root.IsObject())
+ {
+ throw zen::runtime_error("Failed to parse hydration config '{}': {}",
+ ServerConfig.HydrationTargetConfigPath.string(),
+ ParseError.empty() ? "root must be a JSON object" : ParseError);
+ }
+ HubConfig.HydrationOptions = std::move(Root).AsObject();
+ }
+
m_Hub = std::make_unique<Hub>(
std::move(HubConfig),
ZenServerEnvironment(ZenServerEnvironment::Hub,
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index 9660e9a49..b976c52b3 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -49,6 +49,7 @@ struct ZenHubServerConfig : public ZenServerConfig
int HubInstanceCoreLimit = 0; // Automatic
std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
std::string HydrationTargetSpecification; // hydration/dehydration target specification
+ std::filesystem::path HydrationTargetConfigPath; // path to JSON config file (mutually exclusive with HydrationTargetSpecification)
ZenHubWatchdogConfig WatchdogConfig;
uint64_t HubProvisionDiskLimitBytes = 0;
uint32_t HubProvisionDiskLimitPercent = 0;
diff --git a/src/zenutil/cloud/s3client.cpp b/src/zenutil/cloud/s3client.cpp
index c3404699d..d9fde05d9 100644
--- a/src/zenutil/cloud/s3client.cpp
+++ b/src/zenutil/cloud/s3client.cpp
@@ -137,6 +137,8 @@ namespace {
} // namespace
+std::string_view S3GetObjectResult::NotFoundErrorText = "Not found";
+
S3Client::S3Client(const S3ClientOptions& Options)
: m_Log(logging::Get("s3"))
, m_BucketName(Options.BucketName)
@@ -145,13 +147,7 @@ S3Client::S3Client(const S3ClientOptions& Options)
, m_PathStyle(Options.PathStyle)
, m_Credentials(Options.Credentials)
, m_CredentialProvider(Options.CredentialProvider)
-, m_HttpClient(BuildEndpoint(),
- HttpClientSettings{
- .LogCategory = "s3",
- .ConnectTimeout = Options.ConnectTimeout,
- .Timeout = Options.Timeout,
- .RetryCount = Options.RetryCount,
- })
+, m_HttpClient(BuildEndpoint(), Options.HttpSettings)
{
m_Host = BuildHostHeader();
ZEN_INFO("S3 client configured for bucket '{}' in region '{}' (endpoint: {}, {})",
@@ -347,15 +343,20 @@ S3Client::PutObject(std::string_view Key, IoBuffer Content)
}
S3GetObjectResult
-S3Client::GetObject(std::string_view Key)
+S3Client::GetObject(std::string_view Key, const std::filesystem::path& TempFilePath)
{
std::string Path = KeyToPath(Key);
HttpClient::KeyValueMap Headers = SignRequest("GET", Path, "", EmptyPayloadHash);
- HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
+ HttpClient::Response Response = m_HttpClient.Download(Path, TempFilePath, Headers);
if (!Response.IsSuccess())
{
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
+ }
+
std::string Err = Response.ErrorMessage("S3 GET failed");
ZEN_WARN("S3 GET '{}' failed: {}", Key, Err);
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
@@ -377,6 +378,11 @@ S3Client::GetObjectRange(std::string_view Key, uint64_t RangeStart, uint64_t Ran
HttpClient::Response Response = m_HttpClient.Get(Path, Headers);
if (!Response.IsSuccess())
{
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ return S3GetObjectResult{S3Result{.Error = std::string(S3GetObjectResult::NotFoundErrorText)}, {}};
+ }
+
std::string Err = Response.ErrorMessage("S3 GET range failed");
ZEN_WARN("S3 GET range '{}' [{}-{}] failed: {}", Key, RangeStart, RangeStart + RangeSize - 1, Err);
return S3GetObjectResult{S3Result{std::move(Err)}, {}};
@@ -749,7 +755,7 @@ S3Client::PutObjectMultipart(std::string_view Key,
return PutObject(Key, TotalSize > 0 ? FetchRange(0, TotalSize) : IoBuffer{});
}
- ZEN_INFO("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
+ ZEN_DEBUG("S3 multipart upload '{}': {} bytes in ~{} parts", Key, TotalSize, (TotalSize + PartSize - 1) / PartSize);
S3CreateMultipartUploadResult InitResult = CreateMultipartUpload(Key);
if (!InitResult)
@@ -797,7 +803,7 @@ S3Client::PutObjectMultipart(std::string_view Key,
throw;
}
- ZEN_INFO("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
+ ZEN_DEBUG("S3 multipart upload '{}' completed ({} parts, {} bytes)", Key, PartETags.size(), TotalSize);
return {};
}
diff --git a/src/zenutil/include/zenutil/cloud/s3client.h b/src/zenutil/include/zenutil/cloud/s3client.h
index bd30aa8a2..f1f0df0e4 100644
--- a/src/zenutil/include/zenutil/cloud/s3client.h
+++ b/src/zenutil/include/zenutil/cloud/s3client.h
@@ -35,9 +35,7 @@ struct S3ClientOptions
/// Overrides the static Credentials field.
Ref<ImdsCredentialProvider> CredentialProvider;
- std::chrono::milliseconds ConnectTimeout{5000};
- std::chrono::milliseconds Timeout{};
- uint8_t RetryCount = 3;
+ HttpClientSettings HttpSettings = {.LogCategory = "s3", .ConnectTimeout = std::chrono::milliseconds(5000), .RetryCount = 3};
};
struct S3ObjectInfo
@@ -70,6 +68,8 @@ struct S3GetObjectResult : S3Result
IoBuffer Content;
std::string_view AsText() const { return std::string_view(reinterpret_cast<const char*>(Content.GetData()), Content.GetSize()); }
+
+ static std::string_view NotFoundErrorText;
};
/// Result of HeadObject - carries object metadata and existence status.
@@ -119,7 +119,7 @@ public:
S3Result PutObject(std::string_view Key, IoBuffer Content);
/// Download an object from S3
- S3GetObjectResult GetObject(std::string_view Key);
+ S3GetObjectResult GetObject(std::string_view Key, const std::filesystem::path& TempFilePath = {});
/// Download a byte range of an object from S3
/// @param RangeStart First byte offset (inclusive)