aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-09 13:08:00 +0100
committerGitHub Enterprise <[email protected]>2026-03-09 13:08:00 +0100
commitf9d8cbcb3573b47b639b7bd73d3a4eed17653d71 (patch)
treedd295b2e929f050f292a6415fd0330da24b683a4
parentadded auto-detection logic for console colour output (#817) (diff)
downloadzen-f9d8cbcb3573b47b639b7bd73d3a4eed17653d71.tar.xz
zen-f9d8cbcb3573b47b639b7bd73d3a4eed17653d71.zip
add fallback for zencache multirange (#816)
* clean up BuildStorageResolveResult to allow capabilities * add check for multirange request capability * add MaxRangeCountPerRequest capabilities * project export tests * add InMemoryBuildStorageCache * progress and logging improvements * fix ElapsedSeconds calculations in fileremoteprojectstore.cpp * oplogs/builds test script
-rw-r--r--scripts/test_scripts/builds-download-upload-test.py196
-rw-r--r--scripts/test_scripts/metadatas/AndroidClient.json9
-rw-r--r--scripts/test_scripts/metadatas/IOSClient.json9
-rw-r--r--scripts/test_scripts/metadatas/LinuxServer.json9
-rw-r--r--scripts/test_scripts/metadatas/PS4Client.json9
-rw-r--r--scripts/test_scripts/metadatas/Switch2Client.json9
-rw-r--r--scripts/test_scripts/metadatas/SwitchClient.json9
-rw-r--r--scripts/test_scripts/metadatas/WindowsClient.json9
-rw-r--r--scripts/test_scripts/metadatas/XB1Client.json9
-rw-r--r--scripts/test_scripts/oplog-import-export-test.py228
-rw-r--r--src/zen/cmds/builds_cmd.cpp139
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp26
-rw-r--r--src/zenhttp/include/zenhttp/formatters.h2
-rw-r--r--src/zenremotestore/builds/buildstoragecache.cpp209
-rw-r--r--src/zenremotestore/builds/buildstorageoperations.cpp391
-rw-r--r--src/zenremotestore/builds/buildstorageutil.cpp97
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorage.h2
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h10
-rw-r--r--src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h36
-rw-r--r--src/zenremotestore/include/zenremotestore/chunking/chunkblock.h1
-rw-r--r--src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h3
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h22
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h68
-rw-r--r--src/zenremotestore/jupiter/jupiterhost.cpp11
-rw-r--r--src/zenremotestore/projectstore/buildsremoteprojectstore.cpp314
-rw-r--r--src/zenremotestore/projectstore/fileremoteprojectstore.cpp301
-rw-r--r--src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp81
-rw-r--r--src/zenremotestore/projectstore/projectstoreoperations.cpp14
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp1694
-rw-r--r--src/zenremotestore/projectstore/zenremoteprojectstore.cpp164
-rw-r--r--src/zenserver/storage/buildstore/httpbuildstore.cpp13
-rw-r--r--src/zenserver/storage/buildstore/httpbuildstore.h2
-rw-r--r--src/zenserver/storage/projectstore/httpprojectstore.cpp263
33 files changed, 2922 insertions, 1437 deletions
diff --git a/scripts/test_scripts/builds-download-upload-test.py b/scripts/test_scripts/builds-download-upload-test.py
new file mode 100644
index 000000000..f03528d98
--- /dev/null
+++ b/scripts/test_scripts/builds-download-upload-test.py
@@ -0,0 +1,196 @@
+#!/usr/bin/env python3
+"""Test script for builds download/upload operations."""
+
+from __future__ import annotations
+
+import argparse
+import platform
+import subprocess
+import sys
+from pathlib import Path
+from typing import NamedTuple
+
+_PLATFORM = "windows" if sys.platform == "win32" else "macosx" if sys.platform == "darwin" else "linux"
+_ARCH = "x64" if sys.platform == "win32" else platform.machine().lower()
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+
+class Build(NamedTuple):
+ name: str
+ bucket: str
+ id: str
+
+
+BUILDS = [
+ Build("XB1Client", "fortnitegame.staged-build.fortnite-main.xb1-client", "09a7616c1a388dfe6056aa57"),
+ Build("WindowsClient", "fortnitegame.staged-build.fortnite-main.windows-client", "09a762c81e2cf213142d0ce5"),
+ Build("SwitchClient", "fortnitegame.staged-build.fortnite-main.switch-client", "09a75bf9c3ce75bce09f644f"),
+ Build("LinuxServer", "fortnitegame.staged-build.fortnite-main.linux-server", "09a750ac155eb3e3b62e87e0"),
+ Build("Switch2Client", "fortnitegame.staged-build.fortnite-main.switch2-client", "09a78f3df07b289691ec5710"),
+ Build("PS4Client", "fortnitegame.staged-build.fortnite-main.ps4-client", "09a76ea92ad301d4724fafad"),
+ Build("IOSClient", "fortnitegame.staged-build.fortnite-main.ios-client", "09a7816fa26c23362fef0c5d"),
+ Build("AndroidClient", "fortnitegame.staged-build.fortnite-main.android-client", "09a76725f1620d62c6be06e4"),
+]
+
+ZEN_EXE: Path = Path(f"./build/{_PLATFORM}/{_ARCH}/release/zen{_EXE_SUFFIX}")
+ZEN_METADATA_DIR: Path = Path(__file__).resolve().parent / "metadatas"
+
+ZEN_PORT = 8558
+ZEN_CACHE_PORT = 8559
+ZEN_CACHE = f"http://127.0.0.1:{ZEN_CACHE_PORT}"
+ZEN_PARTIAL_REQUEST_MODE = "true"
+
+SERVER_ARGS: tuple[str, ...] = (
+ "--http", "asio",
+ "--gc-cache-duration-seconds", "1209600",
+ "--gc-interval-seconds", "21600",
+ "--gc-low-diskspace-threshold", "2147483648",
+ "--cache-bucket-limit-overwrites",
+)
+
+
+def run(cmd: list[str | Path]) -> None:
+ try:
+ subprocess.run(cmd, check=True)
+ except FileNotFoundError:
+ sys.exit(f"error: executable not found: {cmd[0]}")
+ except subprocess.CalledProcessError as e:
+ sys.exit(f"error: command failed with exit code {e.returncode}:\n {' '.join(str(x) for x in e.cmd)}")
+
+
+def stop_server(label: str, port: int) -> None:
+ """Stop a zen server. Tolerates failures so it is safe to call from finally blocks."""
+ print(f"--------- stopping {label}")
+ try:
+ subprocess.run([ZEN_EXE, "down", "--port", str(port)])
+ except OSError as e:
+ print(f"warning: could not stop {label}: {e}", file=sys.stderr)
+ print()
+
+
+def start_server(label: str, data_dir: Path, port: int, extra_args: list[str] | None = None) -> None:
+ print(f"--------- starting {label} {data_dir}")
+ run([
+ ZEN_EXE, "up", "--port", str(port), "--show-console", "--",
+ f"--data-dir={data_dir}",
+ *SERVER_ARGS,
+ *(extra_args or []),
+ ])
+ print()
+
+
+def wipe_or_create(label: str, path: Path) -> None:
+ if path.exists():
+ print(f"--------- cleaning {label} {path}")
+ run([ZEN_EXE, "wipe", "-y", path])
+ else:
+ print(f"--------- creating {label} {path}")
+ path.mkdir(parents=True, exist_ok=True)
+ print()
+
+
+def check_prerequisites() -> None:
+ if not ZEN_EXE.is_file():
+ sys.exit(f"error: zen executable not found: {ZEN_EXE}")
+ if not ZEN_METADATA_DIR.is_dir():
+ sys.exit(f"error: metadata directory not found: {ZEN_METADATA_DIR}")
+ for build in BUILDS:
+ metadata = ZEN_METADATA_DIR / f"{build.name}.json"
+ if not metadata.is_file():
+ sys.exit(f"error: metadata file not found: {metadata}")
+
+
+def main() -> None:
+ global ZEN_EXE
+
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "positional_path",
+ nargs="?",
+ default=None,
+ type=Path,
+ metavar="DATA_PATH",
+ help="root path for all data directories (positional shorthand for --data-path)",
+ )
+ parser.add_argument(
+ "zen_exe_positional",
+ nargs="?",
+ default=None,
+ type=Path,
+ metavar="ZEN_EXE_PATH",
+ help="path to zen executable (positional shorthand for --zen-exe-path)",
+ )
+ parser.add_argument(
+ "--data-path",
+ default=Path(Path(__file__).stem + "_datadir"),
+ type=Path,
+ metavar="PATH",
+ help=f"root path for all data directories (default: {Path(__file__).stem}_datadir)",
+ )
+ parser.add_argument(
+ "--zen-exe-path",
+ default=ZEN_EXE,
+ type=Path,
+ metavar="PATH",
+ help=f"path to zen executable (default: {ZEN_EXE})",
+ )
+ args = parser.parse_args()
+
+ data_path = args.positional_path
+ if data_path is None:
+ data_path = args.data_path
+
+ ZEN_EXE = args.zen_exe_positional
+ if ZEN_EXE is None:
+ ZEN_EXE = args.zen_exe_path
+ zen_system_dir = data_path / "system"
+ zen_download_dir = data_path / "Download"
+ zen_cache_data_dir = data_path / "ZenBuildsCache"
+ zen_upload_dir = data_path / "Upload"
+ zen_chunk_cache_path = data_path / "ChunkCache"
+
+ check_prerequisites()
+
+ start_server("cache zenserver", zen_cache_data_dir, ZEN_CACHE_PORT, ["--buildstore-enabled"])
+ try:
+ wipe_or_create("download folder", zen_download_dir)
+ wipe_or_create("system folder", zen_system_dir)
+
+ for build in BUILDS:
+ print(f"--------- importing {build.name} build")
+ run([
+ ZEN_EXE, "builds", "download",
+ "--host", "https://jupiter.devtools.epicgames.com",
+ "--namespace", "fortnite.oplog",
+ "--bucket", build.bucket,
+ "--build-id", build.id,
+ "--local-path", zen_download_dir / build.name,
+ f"--zen-cache-host={ZEN_CACHE}",
+ f"--allow-partial-block-requests={ZEN_PARTIAL_REQUEST_MODE}",
+ "--verify",
+ "--system-dir", zen_system_dir,
+ ])
+ print()
+
+ wipe_or_create("upload folder", zen_upload_dir)
+
+ for build in BUILDS:
+ print(f"--------- exporting {build.name} build")
+ run([
+ ZEN_EXE, "builds", "upload",
+ "--storage-path", zen_upload_dir,
+ "--build-id", build.id,
+ "--local-path", zen_download_dir / build.name,
+ "--verify",
+ "--system-dir", zen_system_dir,
+ "--metadata-path", str(ZEN_METADATA_DIR / f"{build.name}.json"),
+ "--create-build",
+ "--chunking-cache-path", zen_chunk_cache_path,
+ ])
+ print()
+ finally:
+ stop_server("cache zenserver", ZEN_CACHE_PORT)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/test_scripts/metadatas/AndroidClient.json b/scripts/test_scripts/metadatas/AndroidClient.json
new file mode 100644
index 000000000..378d0454d
--- /dev/null
+++ b/scripts/test_scripts/metadatas/AndroidClient.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 AndroidClient",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "Android",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/IOSClient.json b/scripts/test_scripts/metadatas/IOSClient.json
new file mode 100644
index 000000000..fb0f9a342
--- /dev/null
+++ b/scripts/test_scripts/metadatas/IOSClient.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 IOSClient",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "IOS",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/LinuxServer.json b/scripts/test_scripts/metadatas/LinuxServer.json
new file mode 100644
index 000000000..02ae2d970
--- /dev/null
+++ b/scripts/test_scripts/metadatas/LinuxServer.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 LinuxServer",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "Linux",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/PS4Client.json b/scripts/test_scripts/metadatas/PS4Client.json
new file mode 100644
index 000000000..6e49e3e5e
--- /dev/null
+++ b/scripts/test_scripts/metadatas/PS4Client.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 PS4Client",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "PS4",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/Switch2Client.json b/scripts/test_scripts/metadatas/Switch2Client.json
new file mode 100644
index 000000000..41732e7bc
--- /dev/null
+++ b/scripts/test_scripts/metadatas/Switch2Client.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 Switch2Client",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "Switch2",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/SwitchClient.json b/scripts/test_scripts/metadatas/SwitchClient.json
new file mode 100644
index 000000000..49362f23e
--- /dev/null
+++ b/scripts/test_scripts/metadatas/SwitchClient.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 SwitchClient",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "Switch",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/WindowsClient.json b/scripts/test_scripts/metadatas/WindowsClient.json
new file mode 100644
index 000000000..c7af270c2
--- /dev/null
+++ b/scripts/test_scripts/metadatas/WindowsClient.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 Windows Client",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "Windows",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/metadatas/XB1Client.json b/scripts/test_scripts/metadatas/XB1Client.json
new file mode 100644
index 000000000..36fb45801
--- /dev/null
+++ b/scripts/test_scripts/metadatas/XB1Client.json
@@ -0,0 +1,9 @@
+{
+ "name": "++Fortnite+Main-CL-50966326 XB1Client",
+ "branch": "ZenBuildTest2",
+ "baselineBranch": "ZenBuildTest2",
+ "platform": "XB1",
+ "project": "Fortnite",
+ "changelist": 50966326,
+ "buildType": "staged-build"
+}
diff --git a/scripts/test_scripts/oplog-import-export-test.py b/scripts/test_scripts/oplog-import-export-test.py
new file mode 100644
index 000000000..51593d5a9
--- /dev/null
+++ b/scripts/test_scripts/oplog-import-export-test.py
@@ -0,0 +1,228 @@
+#!/usr/bin/env python3
+"""Test script for oplog import/export operations."""
+
+from __future__ import annotations
+
+import argparse
+import platform
+import subprocess
+import sys
+from pathlib import Path
+from typing import NamedTuple
+
+_PLATFORM = "windows" if sys.platform == "win32" else "macosx" if sys.platform == "darwin" else "linux"
+_ARCH = "x64" if sys.platform == "win32" else platform.machine().lower()
+_EXE_SUFFIX = ".exe" if sys.platform == "win32" else ""
+
+
+class Build(NamedTuple):
+ name: str
+ bucket: str
+ id: str
+
+
+BUILDS = [
+ Build("XB1Client", "fortnitegame.oplog.fortnite-main.xb1client", "09a75f7f3b7517653dcdaaa4"),
+ Build("WindowsClient", "fortnitegame.oplog.fortnite-main.windowsclient", "09a75d977ef944ecfd0eddfd"),
+ Build("SwitchClient", "fortnitegame.oplog.fortnite-main.switchclient", "09a74d03b3598ec94cfd2644"),
+ Build("XSXClient", "fortnitegame.oplog.fortnite-main.xsxclient", "09a76c2bbd6cd78f4d40d9ea"),
+ Build("Switch2Client", "fortnitegame.oplog.fortnite-main.switch2client", "09a7686b3d9faa78fb24a38f"),
+ Build("PS4Client", "fortnitegame.oplog.fortnite-main.ps4client", "09a75b72d1c260ed26020140"),
+ Build("LinuxServer", "fortnitegame.oplog.fortnite-main.linuxserver", "09a747f5e0ee83a04be013e6"),
+ Build("IOSClient", "fortnitegame.oplog.fortnite-main.iosclient", "09a75f677e883325a209148c"),
+ Build("Android_ASTCClient", "fortnitegame.oplog.fortnite-main.android_astcclient", "09a7422c08c6f37becc7d37f"),
+]
+
+ZEN_EXE: Path = Path(f"./build/{_PLATFORM}/{_ARCH}/release/zen{_EXE_SUFFIX}")
+
+ZEN_PORT = 8558
+ZEN_CACHE_PORT = 8559
+ZEN_CACHE = f"http://127.0.0.1:{ZEN_CACHE_PORT}"
+ZEN_CACHE_POPULATE = "true"
+ZEN_PARTIAL_REQUEST_MODE = "true"
+
+SERVER_ARGS: tuple[str, ...] = (
+ "--http", "asio",
+ "--gc-cache-duration-seconds", "1209600",
+ "--gc-interval-seconds", "21600",
+ "--gc-low-diskspace-threshold", "2147483648",
+ "--cache-bucket-limit-overwrites",
+)
+
+
+def run(cmd: list[str | Path]) -> None:
+ try:
+ subprocess.run(cmd, check=True)
+ except FileNotFoundError:
+ sys.exit(f"error: executable not found: {cmd[0]}")
+ except subprocess.CalledProcessError as e:
+ sys.exit(f"error: command failed with exit code {e.returncode}:\n {' '.join(str(x) for x in e.cmd)}")
+
+
+def stop_server(label: str, port: int) -> None:
+ """Stop a zen server. Tolerates failures so it is safe to call from finally blocks."""
+ print(f"--------- stopping {label}")
+ try:
+ subprocess.run([ZEN_EXE, "down", "--port", str(port)])
+ except OSError as e:
+ print(f"warning: could not stop {label}: {e}", file=sys.stderr)
+ print()
+
+
+def start_server(label: str, data_dir: Path, port: int, extra_args: list[str] | None = None) -> None:
+ print(f"--------- starting {label} {data_dir}")
+ run([
+ ZEN_EXE, "up", "--port", str(port), "--show-console", "--",
+ f"--data-dir={data_dir}",
+ *SERVER_ARGS,
+ *(extra_args or []),
+ ])
+ print()
+
+
+def wipe_or_create(label: str, path: Path) -> None:
+ if path.exists():
+ print(f"--------- cleaning {label} {path}")
+ run([ZEN_EXE, "wipe", "-y", path])
+ else:
+ print(f"--------- creating {label} {path}")
+ path.mkdir(parents=True, exist_ok=True)
+ print()
+
+
+def check_prerequisites() -> None:
+ if not ZEN_EXE.is_file():
+ sys.exit(f"error: zen executable not found: {ZEN_EXE}")
+
+
+def setup_project(port: int) -> None:
+ """Create the FortniteGame project and all oplogs on the server at the given port."""
+ print("--------- creating FortniteGame project")
+ run([ZEN_EXE, "project-create", f"--hosturl=127.0.0.1:{port}", "FortniteGame", "--force-update"])
+ print()
+
+ for build in BUILDS:
+ print(f"--------- creating {build.name} oplog")
+ run([ZEN_EXE, "oplog-create", f"--hosturl=127.0.0.1:{port}", "FortniteGame", build.name, "--force-update"])
+ print()
+
+
+def main() -> None:
+ global ZEN_EXE
+
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "positional_path",
+ nargs="?",
+ default=None,
+ type=Path,
+ metavar="DATA_PATH",
+ help="root path for all data directories (positional shorthand for --data-path)",
+ )
+ parser.add_argument(
+ "zen_exe_positional",
+ nargs="?",
+ default=None,
+ type=Path,
+ metavar="ZEN_EXE_PATH",
+ help="path to zen executable (positional shorthand for --zen-exe-path)",
+ )
+ parser.add_argument(
+ "--data-path",
+ default=Path(Path(__file__).stem + "_datadir"),
+ type=Path,
+ metavar="PATH",
+ help=f"root path for all data directories (default: {Path(__file__).stem}_datadir)",
+ )
+ parser.add_argument(
+ "--zen-exe-path",
+ default=ZEN_EXE,
+ type=Path,
+ metavar="PATH",
+ help=f"path to zen executable (default: {ZEN_EXE})",
+ )
+ args = parser.parse_args()
+
+ data_path = args.positional_path
+ if data_path is None:
+ data_path = args.data_path
+
+ ZEN_EXE = args.zen_exe_positional
+ if ZEN_EXE is None:
+ ZEN_EXE = args.zen_exe_path
+ zen_data_dir = data_path / "DDC" / "OplogsZen"
+ zen_cache_data_dir = data_path / "DDC" / "ZenBuildsCache"
+ zen_import_data_dir = data_path / "DDC" / "OplogsZenImport"
+ export_dir = data_path / "Export" / "FortniteGame"
+
+ check_prerequisites()
+
+ start_server("cache zenserver", zen_cache_data_dir, ZEN_CACHE_PORT, ["--buildstore-enabled"])
+ try:
+ wipe_or_create("zenserver data", zen_data_dir)
+ start_server("zenserver", zen_data_dir, ZEN_PORT)
+ try:
+ setup_project(ZEN_PORT)
+
+ for build in BUILDS:
+ print(f"--------- importing {build.name} oplog")
+ run([
+ ZEN_EXE, "oplog-import",
+ f"--hosturl=127.0.0.1:{ZEN_PORT}",
+ "FortniteGame", build.name,
+ "--clean",
+ "--builds", "https://jupiter.devtools.epicgames.com",
+ "--namespace", "fortnite.oplog",
+ "--bucket", build.bucket,
+ "--builds-id", build.id,
+ f"--zen-cache-host={ZEN_CACHE}",
+ f"--zen-cache-upload={ZEN_CACHE_POPULATE}",
+ f"--allow-partial-block-requests={ZEN_PARTIAL_REQUEST_MODE}",
+ ])
+ print()
+
+ print(f"--------- validating {build.name} oplog")
+ run([ZEN_EXE, "oplog-validate", f"--hosturl=127.0.0.1:{ZEN_PORT}", "FortniteGame", build.name])
+ print()
+
+ wipe_or_create("export folder", export_dir)
+
+ for build in BUILDS:
+ print(f"--------- exporting {build.name} oplog")
+ run([
+ ZEN_EXE, "oplog-export",
+ f"--hosturl=127.0.0.1:{ZEN_PORT}",
+ "FortniteGame", build.name,
+ "--file", export_dir,
+ "--forcetempblocks",
+ ])
+ print()
+ finally:
+ stop_server("zenserver", ZEN_PORT)
+
+ wipe_or_create("alternate zenserver data", zen_import_data_dir)
+ start_server("import zenserver", zen_import_data_dir, ZEN_PORT)
+ try:
+ setup_project(ZEN_PORT)
+
+ for build in BUILDS:
+ print(f"--------- importing {build.name} oplog")
+ run([
+ ZEN_EXE, "oplog-import",
+ f"--hosturl=127.0.0.1:{ZEN_PORT}",
+ "FortniteGame", build.name,
+ "--file", export_dir,
+ ])
+ print()
+
+ print(f"--------- validating {build.name} oplog")
+ run([ZEN_EXE, "oplog-validate", f"--hosturl=127.0.0.1:{ZEN_PORT}", "FortniteGame", build.name])
+ print()
+ finally:
+ stop_server("alternative zenserver", ZEN_PORT)
+ finally:
+ stop_server("cache zenserver", ZEN_CACHE_PORT)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index e5cbafbea..b4b4df7c9 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -1594,7 +1594,7 @@ namespace builds_impl {
}
}
}
- if (Storage.BuildCacheStorage)
+ if (Storage.CacheStorage)
{
if (SB.Size() > 0)
{
@@ -1649,9 +1649,9 @@ namespace builds_impl {
}
if (Options.PrimeCacheOnly)
{
- if (Storage.BuildCacheStorage)
+ if (Storage.CacheStorage)
{
- Storage.BuildCacheStorage->Flush(5000, [](intptr_t Remaining) {
+ Storage.CacheStorage->Flush(5000, [](intptr_t Remaining) {
if (!IsQuiet)
{
if (Remaining == 0)
@@ -2826,47 +2826,47 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
BuildStorageResolveResult ResolveRes =
ResolveBuildStorage(*Output, ClientSettings, m_Host, m_OverrideHost, m_ZenCacheHost, ZenCacheResolveMode::All, m_Verbose);
- if (!ResolveRes.HostUrl.empty())
+ if (!ResolveRes.Cloud.Address.empty())
{
- ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2;
+ ClientSettings.AssumeHttp2 = ResolveRes.Cloud.AssumeHttp2;
Result.BuildStorageHttp =
- std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings, []() { return AbortFlag.load(); });
+ std::make_unique<HttpClient>(ResolveRes.Cloud.Address, ClientSettings, []() { return AbortFlag.load(); });
- Result.BuildStorage = CreateJupiterBuildStorage(Log(),
+ Result.BuildStorage = CreateJupiterBuildStorage(Log(),
*Result.BuildStorageHttp,
StorageStats,
m_Namespace,
m_Bucket,
m_AllowRedirect,
TempPath / "storage");
- Result.StorageName = ResolveRes.HostName;
+ Result.BuildStorageHost = ResolveRes.Cloud;
- uint64_t HostLatencyNs = ResolveRes.HostLatencySec >= 0 ? uint64_t(ResolveRes.HostLatencySec * 1000000000.0) : 0;
+ uint64_t HostLatencyNs = ResolveRes.Cloud.LatencySec >= 0 ? uint64_t(ResolveRes.Cloud.LatencySec * 1000000000.0) : 0;
- StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'. Latency: {}",
- ResolveRes.HostName,
- (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl),
- Result.BuildStorageHttp->GetSessionId(),
- m_Namespace,
- m_Bucket,
- NiceLatencyNs(HostLatencyNs));
- Result.BuildStorageLatencySec = ResolveRes.HostLatencySec;
+ StorageDescription =
+ fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'. Latency: {}",
+ ResolveRes.Cloud.Name,
+ (ResolveRes.Cloud.Address == ResolveRes.Cloud.Name) ? "" : fmt::format(" {}", ResolveRes.Cloud.Address),
+ Result.BuildStorageHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket,
+ NiceLatencyNs(HostLatencyNs));
- if (!ResolveRes.CacheUrl.empty())
+ if (!ResolveRes.Cache.Address.empty())
{
Result.CacheHttp = std::make_unique<HttpClient>(
- ResolveRes.CacheUrl,
+ ResolveRes.Cache.Address,
HttpClientSettings{
.LogCategory = "httpcacheclient",
.ConnectTimeout = std::chrono::milliseconds{3000},
.Timeout = std::chrono::milliseconds{30000},
- .AssumeHttp2 = ResolveRes.CacheAssumeHttp2,
+ .AssumeHttp2 = ResolveRes.Cache.AssumeHttp2,
.AllowResume = true,
.RetryCount = 0,
.Verbose = m_VerboseHttp,
.MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)},
[]() { return AbortFlag.load(); });
- Result.BuildCacheStorage =
+ Result.CacheStorage =
CreateZenBuildStorageCache(*Result.CacheHttp,
StorageCacheStats,
m_Namespace,
@@ -2874,19 +2874,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
TempPath / "zencache",
BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background)
: GetTinyWorkerPool(EWorkloadType::Background));
- Result.CacheName = ResolveRes.CacheName;
+ Result.CacheHost = ResolveRes.Cache;
- uint64_t CacheLatencyNs = ResolveRes.CacheLatencySec >= 0 ? uint64_t(ResolveRes.CacheLatencySec * 1000000000.0) : 0;
+ uint64_t CacheLatencyNs = ResolveRes.Cache.LatencySec >= 0 ? uint64_t(ResolveRes.Cache.LatencySec * 1000000000.0) : 0;
CacheDescription =
fmt::format("Zen {}{}. SessionId: '{}'. Latency: {}",
- ResolveRes.CacheName,
- (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl),
+ ResolveRes.Cache.Name,
+ (ResolveRes.Cache.Address == ResolveRes.Cache.Name) ? "" : fmt::format(" {}", ResolveRes.Cache.Address),
Result.CacheHttp->GetSessionId(),
NiceLatencyNs(CacheLatencyNs));
- Result.CacheLatencySec = ResolveRes.CacheLatencySec;
-
if (!m_Namespace.empty())
{
CacheDescription += fmt::format(". Namespace '{}'", m_Namespace);
@@ -2902,41 +2900,56 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
StorageDescription = fmt::format("folder {}", m_StoragePath);
Result.BuildStorage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec);
- Result.StorageName = fmt::format("Disk {}", m_StoragePath.stem());
+
+ Result.BuildStorageHost = BuildStorageResolveResult::Host{.Address = m_StoragePath.generic_string(),
+ .Name = "Disk",
+ .LatencySec = 1.0 / 100000, // 1 us
+ .Caps = {.MaxRangeCountPerRequest = 2048u}};
if (!m_ZenCacheHost.empty())
{
- Result.CacheHttp = std::make_unique<HttpClient>(
- m_ZenCacheHost,
- HttpClientSettings{
- .LogCategory = "httpcacheclient",
- .ConnectTimeout = std::chrono::milliseconds{3000},
- .Timeout = std::chrono::milliseconds{30000},
- .AssumeHttp2 = m_AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 0,
- .Verbose = m_VerboseHttp,
- .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)},
- []() { return AbortFlag.load(); });
- Result.BuildCacheStorage =
- CreateZenBuildStorageCache(*Result.CacheHttp,
- StorageCacheStats,
- m_Namespace,
- m_Bucket,
- TempPath / "zencache",
- BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background)
- : GetTinyWorkerPool(EWorkloadType::Background));
- Result.CacheName = m_ZenCacheHost;
-
- CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'", Result.CacheName, "", Result.CacheHttp->GetSessionId());
- ;
- if (!m_Namespace.empty())
- {
- CacheDescription += fmt::format(". Namespace '{}'", m_Namespace);
- }
- if (!m_Bucket.empty())
+ ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(m_ZenCacheHost, m_AssumeHttp2, m_VerboseHttp);
+
+ if (TestResult.Success)
{
- CacheDescription += fmt::format(" Bucket '{}'", m_Bucket);
+ Result.CacheHttp = std::make_unique<HttpClient>(
+ m_ZenCacheHost,
+ HttpClientSettings{
+ .LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = m_AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0,
+ .Verbose = m_VerboseHttp,
+ .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)},
+ []() { return AbortFlag.load(); });
+
+ Result.CacheStorage =
+ CreateZenBuildStorageCache(*Result.CacheHttp,
+ StorageCacheStats,
+ m_Namespace,
+ m_Bucket,
+ TempPath / "zencache",
+ BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background));
+ Result.CacheHost =
+ BuildStorageResolveResult::Host{.Address = m_ZenCacheHost,
+ .Name = m_ZenCacheHost,
+ .AssumeHttp2 = m_AssumeHttp2,
+ .LatencySec = TestResult.LatencySeconds,
+ .Caps = {.MaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest}};
+
+ CacheDescription = fmt::format("Zen {}. SessionId: '{}'", Result.CacheHost.Name, Result.CacheHttp->GetSessionId());
+
+ if (!m_Namespace.empty())
+ {
+ CacheDescription += fmt::format(". Namespace '{}'", m_Namespace);
+ }
+ if (!m_Bucket.empty())
+ {
+ CacheDescription += fmt::format(" Bucket '{}'", m_Bucket);
+ }
}
}
}
@@ -2948,7 +2961,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (!IsQuiet)
{
ZEN_CONSOLE("Remote: {}", StorageDescription);
- if (!Result.CacheName.empty())
+ if (!Result.CacheHost.Name.empty())
{
ZEN_CONSOLE("Cache : {}", CacheDescription);
}
@@ -3489,7 +3502,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
"Requests: {}\n"
"Avg Request Time: {}\n"
"Avg I/O Time: {}",
- Storage.StorageName,
+ Storage.BuildStorageHost.Name,
NiceBytes(StorageStats.TotalBytesRead.load()),
NiceBytes(StorageStats.TotalBytesWritten.load()),
StorageStats.TotalRequestCount.load(),
@@ -3810,12 +3823,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (!IsQuiet)
{
- if (Storage.BuildCacheStorage)
+ if (Storage.CacheStorage)
{
- ZEN_CONSOLE("Uploaded {} ({}) blobs",
+ ZEN_CONSOLE("Uploaded {} ({}) blobs to {}",
StorageCacheStats.PutBlobCount.load(),
NiceBytes(StorageCacheStats.PutBlobByteCount),
- Storage.CacheName);
+ Storage.CacheHost.Name);
}
}
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index dfc6c1650..5ff591b54 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -2602,38 +2602,37 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
StorageInstance Storage;
- ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2;
+ ClientSettings.AssumeHttp2 = ResolveRes.Cloud.AssumeHttp2;
ClientSettings.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u;
- Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings);
- Storage.BuildStorageLatencySec = ResolveRes.HostLatencySec;
+ Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.Cloud.Address, ClientSettings);
+ Storage.BuildStorageHost = ResolveRes.Cloud;
BuildStorageCache::Statistics StorageCacheStats;
std::atomic<bool> AbortFlag(false);
- if (!ResolveRes.CacheUrl.empty())
+ if (!ResolveRes.Cache.Address.empty())
{
Storage.CacheHttp = std::make_unique<HttpClient>(
- ResolveRes.CacheUrl,
+ ResolveRes.Cache.Address,
HttpClientSettings{
.LogCategory = "httpcacheclient",
.ConnectTimeout = std::chrono::milliseconds{3000},
.Timeout = std::chrono::milliseconds{30000},
- .AssumeHttp2 = ResolveRes.CacheAssumeHttp2,
+ .AssumeHttp2 = ResolveRes.Cache.AssumeHttp2,
.AllowResume = true,
.RetryCount = 0,
.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u},
[&AbortFlag]() { return AbortFlag.load(); });
- Storage.CacheName = ResolveRes.CacheName;
- Storage.CacheLatencySec = ResolveRes.CacheLatencySec;
+ Storage.CacheHost = ResolveRes.Cache;
}
if (!m_Quiet)
{
std::string StorageDescription =
fmt::format("Cloud {}{}. SessionId {}. Namespace '{}', Bucket '{}'",
- ResolveRes.HostName,
- (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl),
+ ResolveRes.Cloud.Name,
+ (ResolveRes.Cloud.Address == ResolveRes.Cloud.Name) ? "" : fmt::format(" {}", ResolveRes.Cloud.Address),
Storage.BuildStorageHttp->GetSessionId(),
m_Namespace,
m_Bucket);
@@ -2644,8 +2643,8 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
{
std::string CacheDescription =
fmt::format("Zen {}{}. SessionId {}. Namespace '{}', Bucket '{}'",
- ResolveRes.CacheName,
- (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl),
+ ResolveRes.Cache.Name,
+ (ResolveRes.Cache.Address == ResolveRes.Cache.Name) ? "" : fmt::format(" {}", ResolveRes.Cache.Address),
Storage.CacheHttp->GetSessionId(),
m_Namespace,
m_Bucket);
@@ -2661,11 +2660,10 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
Storage.BuildStorage =
CreateJupiterBuildStorage(Log(), *Storage.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath);
- Storage.StorageName = ResolveRes.HostName;
if (Storage.CacheHttp)
{
- Storage.BuildCacheStorage = CreateZenBuildStorageCache(
+ Storage.CacheStorage = CreateZenBuildStorageCache(
*Storage.CacheHttp,
StorageCacheStats,
m_Namespace,
diff --git a/src/zenhttp/include/zenhttp/formatters.h b/src/zenhttp/include/zenhttp/formatters.h
index addb00cb8..57ab01158 100644
--- a/src/zenhttp/include/zenhttp/formatters.h
+++ b/src/zenhttp/include/zenhttp/formatters.h
@@ -73,7 +73,7 @@ struct fmt::formatter<zen::HttpClient::Response>
if (Response.IsSuccess())
{
return fmt::format_to(Ctx.out(),
- "OK: Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s",
+ "OK: Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}",
ToString(Response.StatusCode),
Response.UploadedBytes,
Response.DownloadedBytes,
diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp
index 53d33bd7e..00765903d 100644
--- a/src/zenremotestore/builds/buildstoragecache.cpp
+++ b/src/zenremotestore/builds/buildstoragecache.cpp
@@ -528,6 +528,192 @@ CreateZenBuildStorageCache(HttpClient& HttpClient,
return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BackgroundWorkerPool);
}
+#if ZEN_WITH_TESTS
+
+class InMemoryBuildStorageCache : public BuildStorageCache
+{
+public:
+ // MaxRangeSupported == 0 : no range requests are accepted, always return full blob
+ // MaxRangeSupported == 1 : single range is supported, multi range returns full blob
+ // MaxRangeSupported > 1 : multirange is supported up to MaxRangeSupported, more ranges returns empty blob (bad request)
+ explicit InMemoryBuildStorageCache(uint64_t MaxRangeSupported,
+ BuildStorageCache::Statistics& Stats,
+ double LatencySec = 0.0,
+ double DelayPerKBSec = 0.0)
+ : m_MaxRangeSupported(MaxRangeSupported)
+ , m_Stats(Stats)
+ , m_LatencySec(LatencySec)
+ , m_DelayPerKBSec(DelayPerKBSec)
+ {
+ }
+ void PutBuildBlob(const Oid&, const IoHash& RawHash, ZenContentType, const CompositeBuffer& Payload) override
+ {
+ IoBuffer Buf = Payload.Flatten().AsIoBuffer();
+ Buf.MakeOwned();
+ const uint64_t SentBytes = Buf.Size();
+ uint64_t ReceivedBytes = 0;
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer.GetElapsedTimeUs(), ReceivedBytes, SentBytes); });
+ {
+ std::lock_guard Lock(m_Mutex);
+ m_Entries[RawHash] = std::move(Buf);
+ }
+ m_Stats.PutBlobCount.fetch_add(1);
+ m_Stats.PutBlobByteCount.fetch_add(SentBytes);
+ }
+
+ IoBuffer GetBuildBlob(const Oid&, const IoHash& RawHash, uint64_t RangeOffset = 0, uint64_t RangeBytes = (uint64_t)-1) override
+ {
+ uint64_t SentBytes = 0;
+ uint64_t ReceivedBytes = 0;
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer.GetElapsedTimeUs(), ReceivedBytes, SentBytes); });
+ IoBuffer FullPayload;
+ {
+ std::lock_guard Lock(m_Mutex);
+ auto It = m_Entries.find(RawHash);
+ if (It == m_Entries.end())
+ {
+ return {};
+ }
+ FullPayload = It->second;
+ }
+
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ if (m_MaxRangeSupported == 0)
+ {
+ ReceivedBytes = FullPayload.Size();
+ return FullPayload;
+ }
+ else
+ {
+ ReceivedBytes = (RangeBytes == (uint64_t)-1) ? FullPayload.Size() - RangeOffset : RangeBytes;
+ return IoBuffer(FullPayload, RangeOffset, RangeBytes);
+ }
+ }
+ else
+ {
+ ReceivedBytes = FullPayload.Size();
+ return FullPayload;
+ }
+ }
+
+ BuildBlobRanges GetBuildBlobRanges(const Oid&, const IoHash& RawHash, std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
+ {
+ ZEN_ASSERT(!Ranges.empty());
+ uint64_t SentBytes = 0;
+ uint64_t ReceivedBytes = 0;
+ SimulateLatency(SentBytes, 0);
+ auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
+ Stopwatch ExecutionTimer;
+ auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer.GetElapsedTimeUs(), ReceivedBytes, SentBytes); });
+ if (m_MaxRangeSupported > 1 && Ranges.size() > m_MaxRangeSupported)
+ {
+ return {};
+ }
+ IoBuffer FullPayload;
+ {
+ std::lock_guard Lock(m_Mutex);
+ auto It = m_Entries.find(RawHash);
+ if (It == m_Entries.end())
+ {
+ return {};
+ }
+ FullPayload = It->second;
+ }
+
+ if (Ranges.size() > m_MaxRangeSupported)
+ {
+ // An empty Ranges signals to the caller: "full buffer given, use it for all requested ranges".
+ ReceivedBytes = FullPayload.Size();
+ return {.PayloadBuffer = FullPayload};
+ }
+ else
+ {
+ uint64_t PayloadStart = Ranges.front().first;
+ uint64_t PayloadSize = Ranges.back().first + Ranges.back().second - PayloadStart;
+ IoBuffer RangeBuffer = IoBuffer(FullPayload, PayloadStart, PayloadSize);
+ std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges;
+ PayloadRanges.reserve(Ranges.size());
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ PayloadRanges.push_back(std::make_pair(Range.first - PayloadStart, Range.second));
+ }
+ ReceivedBytes = PayloadSize;
+ return {.PayloadBuffer = RangeBuffer, .Ranges = std::move(PayloadRanges)};
+ }
+ }
+
+ void PutBlobMetadatas(const Oid&, std::span<const IoHash>, std::span<const CbObject>) override {}
+
+ std::vector<CbObject> GetBlobMetadatas(const Oid&, std::span<const IoHash> Hashes) override
+ {
+ return std::vector<CbObject>(Hashes.size());
+ }
+
+ std::vector<BlobExistsResult> BlobsExists(const Oid&, std::span<const IoHash> Hashes) override
+ {
+ std::lock_guard Lock(m_Mutex);
+ std::vector<BlobExistsResult> Result;
+ Result.reserve(Hashes.size());
+ for (const IoHash& Hash : Hashes)
+ {
+ auto It = m_Entries.find(Hash);
+ Result.push_back({.HasBody = (It != m_Entries.end() && It->second)});
+ }
+ return Result;
+ }
+
+ void Flush(int32_t, std::function<bool(intptr_t)>&&) override {}
+
+private:
+ void AddStatistic(uint64_t ElapsedTimeUs, uint64_t ReceivedBytes, uint64_t SentBytes)
+ {
+ m_Stats.TotalBytesWritten += SentBytes;
+ m_Stats.TotalBytesRead += ReceivedBytes;
+ m_Stats.TotalExecutionTimeUs += ElapsedTimeUs;
+ m_Stats.TotalRequestCount++;
+ SetAtomicMax(m_Stats.PeakSentBytes, SentBytes);
+ SetAtomicMax(m_Stats.PeakReceivedBytes, ReceivedBytes);
+ if (ElapsedTimeUs > 0)
+ {
+ SetAtomicMax(m_Stats.PeakBytesPerSec, (ReceivedBytes + SentBytes) * 1000000 / ElapsedTimeUs);
+ }
+ }
+
+ void SimulateLatency(uint64_t SendBytes, uint64_t ReceiveBytes)
+ {
+ double SleepSec = m_LatencySec;
+ if (m_DelayPerKBSec > 0.0)
+ {
+ SleepSec += m_DelayPerKBSec * (double(SendBytes + ReceiveBytes) / 1024u);
+ }
+ if (SleepSec > 0)
+ {
+ Sleep(int(SleepSec * 1000));
+ }
+ }
+
+ uint64_t m_MaxRangeSupported = 0;
+ BuildStorageCache::Statistics& m_Stats;
+ const double m_LatencySec = 0.0;
+ const double m_DelayPerKBSec = 0.0;
+ std::mutex m_Mutex;
+ std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Entries;
+};
+
+std::unique_ptr<BuildStorageCache>
+CreateInMemoryBuildStorageCache(uint64_t MaxRangeSupported, BuildStorageCache::Statistics& Stats, double LatencySec, double DelayPerKBSec)
+{
+ return std::make_unique<InMemoryBuildStorageCache>(MaxRangeSupported, Stats, LatencySec, DelayPerKBSec);
+}
+#endif // ZEN_WITH_TESTS
+
ZenCacheEndpointTestResult
TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose)
{
@@ -542,15 +728,28 @@ TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const boo
HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds");
if (TestResponse.IsSuccess())
{
- LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health");
+ uint64_t MaxRangeCountPerRequest = 1;
+ CbObject StatusResponse = TestResponse.AsObject();
+ if (StatusResponse["ok"].AsBool())
+ {
+ MaxRangeCountPerRequest = StatusResponse["capabilities"].AsObjectView()["maxrangecountperrequest"].AsUInt64(1);
+
+ LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health");
+
+ if (!LatencyResult.Success)
+ {
+ return {.Success = false, .FailureReason = LatencyResult.FailureReason};
+ }
- if (!LatencyResult.Success)
+ return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds, .MaxRangeCountPerRequest = MaxRangeCountPerRequest};
+ }
+ else
{
- return {.Success = false, .FailureReason = LatencyResult.FailureReason};
+ return {.Success = false,
+ .FailureReason = fmt::format("ZenCache endpoint {}/status/builds did not respond with \"ok\"", BaseUrl)};
}
- return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds};
}
return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
-};
+}
} // namespace zen
diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp
index 43a4937f0..f4b167b73 100644
--- a/src/zenremotestore/builds/buildstorageoperations.cpp
+++ b/src/zenremotestore/builds/buildstorageoperations.cpp
@@ -887,11 +887,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
ChunkBlockAnalyser BlockAnalyser(
m_LogOutput,
m_BlockDescriptions,
- ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
- .IsVerbose = m_Options.IsVerbose,
- .HostLatencySec = m_Storage.BuildStorageLatencySec,
- .HostHighSpeedLatencySec = m_Storage.CacheLatencySec,
- .HostMaxRangeCountPerRequest = BuildStorageBase::MaxRangeCountPerRequest});
+ ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet,
+ .IsVerbose = m_Options.IsVerbose,
+ .HostLatencySec = m_Storage.BuildStorageHost.LatencySec,
+ .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec,
+ .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest,
+ .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest});
std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded(
m_RemoteLookup.ChunkHashToChunkIndex,
@@ -974,7 +975,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
}
- if (m_Storage.BuildCacheStorage)
+ if (m_Storage.CacheStorage)
{
ZEN_TRACE_CPU("BlobCacheExistCheck");
Stopwatch Timer;
@@ -993,7 +994,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+ m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes);
if (CacheExistsResult.size() == BlobHashes.size())
{
@@ -1018,32 +1019,50 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
}
std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes;
+
if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
}
else
{
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+
+ switch (m_Options.PartialBlockRequestMode)
+ {
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ case EPartialBlockRequestMode::All:
+ CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ default:
+ ZEN_ASSERT(false);
+ break;
+ }
+
BlockPartialDownloadModes.reserve(m_BlockDescriptions.size());
for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++)
{
const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash);
- if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All)
- {
- BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
- }
- else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
- {
- BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
- }
- else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
- {
- BlockPartialDownloadModes.push_back(BlockExistInCache
- ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
- }
+ BlockPartialDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode);
}
}
@@ -1527,20 +1546,20 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState)
IoBuffer BlockBuffer;
const bool ExistsInCache =
- m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
+ m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
if (ExistsInCache)
{
- BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
+ BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
}
if (!BlockBuffer)
{
BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash);
- if (BlockBuffer && m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (BlockBuffer && m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BlockBuffer)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BlockBuffer)));
}
}
if (!BlockBuffer)
@@ -3103,10 +3122,10 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex];
// FilteredDownloadedBytesPerSecond.Start();
IoBuffer BuildBlob;
- const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash);
if (ExistsInCache)
{
- BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
+ BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash);
}
if (BuildBlob)
{
@@ -3134,12 +3153,12 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
m_DownloadStats.DownloadedChunkCount++;
m_DownloadStats.RequestsCompleteCount++;
- if (Payload && m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
}
OnDownloaded(std::move(Payload));
@@ -3148,12 +3167,12 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde
else
{
BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash);
- if (BuildBlob && m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- ChunkHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(BuildBlob)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ ChunkHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(BuildBlob)));
}
if (!BuildBlob)
{
@@ -3273,34 +3292,7 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
}
- if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash))
- {
- BuildStorageCache::BuildBlobRanges RangeBuffers =
- m_Storage.BuildCacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, Ranges);
- if (RangeBuffers.PayloadBuffer)
- {
- if (!m_AbortFlag)
- {
- if (RangeBuffers.Ranges.size() != Ranges.size())
- {
- throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
- Ranges.size(),
- BlockDescription.BlockHash,
- RangeBuffers.Ranges.size()));
- }
-
- std::vector<std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths = std::move(RangeBuffers.Ranges);
- ProcessDownload(BlockDescription,
- std::move(RangeBuffers.PayloadBuffer),
- BlockRangeStartIndex,
- BlockOffsetAndLengths,
- OnDownloaded);
- }
- return;
- }
- }
-
- const size_t MaxRangesPerRequestToJupiter = BuildStorageBase::MaxRangeCountPerRequest;
+ const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash);
size_t SubBlockRangeCount = BlockRangeCount;
size_t SubRangeCountComplete = 0;
@@ -3311,30 +3303,101 @@ BuildsOperationUpdateFolder::DownloadPartialBlock(
{
break;
}
- size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter);
+
+ // First try to get subrange from cache.
+ // If not successful, try to get the ranges from the build store and adapt SubRangeCount...
+
size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete;
+ if (ExistsInCache)
+ {
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest);
+
+ if (SubRangeCount == 1)
+ {
+ // Legacy single-range path, prefer that for max compatibility
+
+ const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete];
+ IoBuffer PayloadBuffer =
+ m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (PayloadBuffer)
+ {
+ ProcessDownload(BlockDescription,
+ std::move(PayloadBuffer),
+ SubRangeStartIndex,
+ std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)},
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ else
+ {
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ BuildStorageCache::BuildBlobRanges RangeBuffers =
+ m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ if (m_AbortFlag)
+ {
+ break;
+ }
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (RangeBuffers.Ranges.empty())
+ {
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount),
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ else if (RangeBuffers.Ranges.size() == SubRangeCount)
+ {
+ ProcessDownload(BlockDescription,
+ std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangeBuffers.Ranges,
+ OnDownloaded);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ }
+ }
+
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest);
auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
BuildStorageBase::BuildBlobRanges RangeBuffers =
m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges);
+ if (m_AbortFlag)
+ {
+ break;
+ }
if (RangeBuffers.PayloadBuffer)
{
- if (m_AbortFlag)
- {
- break;
- }
if (RangeBuffers.Ranges.empty())
{
// Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
// Upload to cache (if enabled) and use the whole payload for the remaining ranges
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlockDescription.BlockHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer}));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer}));
+ if (m_AbortFlag)
+ {
+ break;
+ }
}
SubRangeCount = Ranges.size() - SubRangeCountComplete;
@@ -4932,12 +4995,12 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash;
const uint64_t CompressedBlockSize = Payload.GetCompressedSize();
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlockHash,
- ZenContentType::kCompressedBinary,
- Payload.GetCompressed());
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlockHash,
+ ZenContentType::kCompressedBinary,
+ Payload.GetCompressed());
}
m_Storage.BuildStorage->PutBuildBlob(m_BuildId,
@@ -4955,11 +5018,11 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent&
OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size());
}
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
}
bool MetadataSucceeded =
@@ -5803,11 +5866,11 @@ BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController
{
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
}
bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
if (MetadataSucceeded)
@@ -6001,9 +6064,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
const CbObject BlockMetaData =
BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]);
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
}
m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload);
if (m_Options.IsVerbose)
@@ -6017,11 +6080,11 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
UploadedBlockSize += PayloadSize;
TempUploadStats.BlocksBytes += PayloadSize;
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId,
- std::vector<IoHash>({BlockHash}),
- std::vector<CbObject>({BlockMetaData}));
+ m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId,
+ std::vector<IoHash>({BlockHash}),
+ std::vector<CbObject>({BlockMetaData}));
}
bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData);
if (MetadataSucceeded)
@@ -6084,9 +6147,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co
const uint64_t PayloadSize = Payload.GetSize();
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload);
}
if (PayloadSize >= LargeAttachmentSize)
@@ -6830,14 +6893,14 @@ BuildsOperationPrimeCache::Execute()
std::vector<IoHash> BlobsToDownload;
BlobsToDownload.reserve(BuildBlobs.size());
- if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload)
+ if (m_Storage.CacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload)
{
ZEN_TRACE_CPU("BlobCacheExistCheck");
Stopwatch Timer;
const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end());
const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes);
+ m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes);
if (CacheExistsResult.size() == BlobHashes.size())
{
@@ -6884,33 +6947,33 @@ BuildsOperationPrimeCache::Execute()
for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++)
{
- Work.ScheduleWork(
- m_NetworkPool,
- [this,
- &Work,
- &BlobsToDownload,
- BlobCount,
- &LooseChunkRawSizes,
- &CompletedDownloadCount,
- &FilteredDownloadedBytesPerSecond,
- &MultipartAttachmentCount,
- BlobIndex](std::atomic<bool>&) {
- if (!m_AbortFlag)
- {
- const IoHash& BlobHash = BlobsToDownload[BlobIndex];
+ Work.ScheduleWork(m_NetworkPool,
+ [this,
+ &Work,
+ &BlobsToDownload,
+ BlobCount,
+ &LooseChunkRawSizes,
+ &CompletedDownloadCount,
+ &FilteredDownloadedBytesPerSecond,
+ &MultipartAttachmentCount,
+ BlobIndex](std::atomic<bool>&) {
+ if (!m_AbortFlag)
+ {
+ const IoHash& BlobHash = BlobsToDownload[BlobIndex];
- bool IsLargeBlob = false;
+ bool IsLargeBlob = false;
- if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end())
- {
- IsLargeBlob = It->second >= m_Options.LargeAttachmentSize;
- }
+ if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end())
+ {
+ IsLargeBlob = It->second >= m_Options.LargeAttachmentSize;
+ }
- FilteredDownloadedBytesPerSecond.Start();
+ FilteredDownloadedBytesPerSecond.Start();
- if (IsLargeBlob)
- {
- DownloadLargeBlob(*m_Storage.BuildStorage,
+ if (IsLargeBlob)
+ {
+ DownloadLargeBlob(
+ *m_Storage.BuildStorage,
m_TempPath,
m_BuildId,
BlobHash,
@@ -6926,12 +6989,12 @@ BuildsOperationPrimeCache::Execute()
if (!m_AbortFlag)
{
- if (Payload && m_Storage.BuildCacheStorage)
+ if (Payload && m_Storage.CacheStorage)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlobHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(Payload)));
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlobHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(Payload)));
}
}
CompletedDownloadCount++;
@@ -6940,32 +7003,32 @@ BuildsOperationPrimeCache::Execute()
FilteredDownloadedBytesPerSecond.Stop();
}
});
- }
- else
- {
- IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash);
- m_DownloadStats.DownloadedBlockCount++;
- m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
- m_DownloadStats.RequestsCompleteCount++;
+ }
+ else
+ {
+ IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash);
+ m_DownloadStats.DownloadedBlockCount++;
+ m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize();
+ m_DownloadStats.RequestsCompleteCount++;
- if (!m_AbortFlag)
- {
- if (Payload && m_Storage.BuildCacheStorage)
- {
- m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId,
- BlobHash,
- ZenContentType::kCompressedBinary,
- CompositeBuffer(SharedBuffer(std::move(Payload))));
- }
- }
- CompletedDownloadCount++;
- if (CompletedDownloadCount == BlobCount)
- {
- FilteredDownloadedBytesPerSecond.Stop();
- }
- }
- }
- });
+ if (!m_AbortFlag)
+ {
+ if (Payload && m_Storage.CacheStorage)
+ {
+ m_Storage.CacheStorage->PutBuildBlob(m_BuildId,
+ BlobHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(SharedBuffer(std::move(Payload))));
+ }
+ }
+ CompletedDownloadCount++;
+ if (CompletedDownloadCount == BlobCount)
+ {
+ FilteredDownloadedBytesPerSecond.Stop();
+ }
+ }
+ }
+ });
}
Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
@@ -6977,10 +7040,10 @@ BuildsOperationPrimeCache::Execute()
std::string DownloadRateString = (CompletedDownloadCount == BlobCount)
? ""
: fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8));
- std::string UploadDetails = m_Storage.BuildCacheStorage ? fmt::format(" {} ({}) uploaded.",
- m_StorageCacheStats.PutBlobCount.load(),
- NiceBytes(m_StorageCacheStats.PutBlobByteCount.load()))
- : "";
+ std::string UploadDetails = m_Storage.CacheStorage ? fmt::format(" {} ({}) uploaded.",
+ m_StorageCacheStats.PutBlobCount.load(),
+ NiceBytes(m_StorageCacheStats.PutBlobByteCount.load()))
+ : "";
std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}",
CompletedDownloadCount.load(),
@@ -7005,13 +7068,13 @@ BuildsOperationPrimeCache::Execute()
return;
}
- if (m_Storage.BuildCacheStorage)
+ if (m_Storage.CacheStorage)
{
- m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool {
+ m_Storage.CacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool {
ZEN_UNUSED(Remaining);
if (!m_Options.IsQuiet)
{
- ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName);
+ ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheHost.Name);
}
return !m_AbortFlag;
});
@@ -7221,7 +7284,7 @@ GetRemoteContent(OperationLogOutput& Output,
bool AttemptFallback = false;
OutBlockDescriptions = GetBlockDescriptions(Output,
*Storage.BuildStorage,
- Storage.BuildCacheStorage.get(),
+ Storage.CacheStorage.get(),
BuildId,
BlockRawHashes,
AttemptFallback,
diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp
index d65f18b9a..2ae726e29 100644
--- a/src/zenremotestore/builds/buildstorageutil.cpp
+++ b/src/zenremotestore/builds/buildstorageutil.cpp
@@ -63,13 +63,15 @@ ResolveBuildStorage(OperationLogOutput& Output,
std::string HostUrl;
std::string HostName;
- double HostLatencySec = -1.0;
+ double HostLatencySec = -1.0;
+ uint64_t HostMaxRangeCountPerRequest = 1;
std::string CacheUrl;
std::string CacheName;
- bool HostAssumeHttp2 = ClientSettings.AssumeHttp2;
- bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2;
- double CacheLatencySec = -1.0;
+ bool HostAssumeHttp2 = ClientSettings.AssumeHttp2;
+ bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2;
+ double CacheLatencySec = -1.0;
+ uint64_t CacheMaxRangeCountPerRequest = 1;
JupiterServerDiscovery DiscoveryResponse;
const std::string_view DiscoveryHost = Host.empty() ? OverrideHost : Host;
@@ -100,9 +102,10 @@ ResolveBuildStorage(OperationLogOutput& Output,
{
ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", OverrideHost);
}
- HostUrl = OverrideHost;
- HostName = GetHostNameFromUrl(OverrideHost);
- HostLatencySec = TestResult.LatencySeconds;
+ HostUrl = OverrideHost;
+ HostName = GetHostNameFromUrl(OverrideHost);
+ HostLatencySec = TestResult.LatencySeconds;
+ HostMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest;
}
else
{
@@ -137,10 +140,11 @@ ResolveBuildStorage(OperationLogOutput& Output,
ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl);
}
- HostUrl = ServerEndpoint.BaseUrl;
- HostAssumeHttp2 = ServerEndpoint.AssumeHttp2;
- HostName = ServerEndpoint.Name;
- HostLatencySec = TestResult.LatencySeconds;
+ HostUrl = ServerEndpoint.BaseUrl;
+ HostAssumeHttp2 = ServerEndpoint.AssumeHttp2;
+ HostName = ServerEndpoint.Name;
+ HostLatencySec = TestResult.LatencySeconds;
+ HostMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest;
break;
}
else
@@ -184,10 +188,11 @@ ResolveBuildStorage(OperationLogOutput& Output,
ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl);
}
- CacheUrl = CacheEndpoint.BaseUrl;
- CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2;
- CacheName = CacheEndpoint.Name;
- CacheLatencySec = TestResult.LatencySeconds;
+ CacheUrl = CacheEndpoint.BaseUrl;
+ CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2;
+ CacheName = CacheEndpoint.Name;
+ CacheLatencySec = TestResult.LatencySeconds;
+ CacheMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest;
break;
}
}
@@ -225,9 +230,10 @@ ResolveBuildStorage(OperationLogOutput& Output,
if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(ZenCacheHost, /*AssumeHttp2*/ false, ClientSettings.Verbose);
TestResult.Success)
{
- CacheUrl = ZenCacheHost;
- CacheName = GetHostNameFromUrl(ZenCacheHost);
- CacheLatencySec = TestResult.LatencySeconds;
+ CacheUrl = ZenCacheHost;
+ CacheName = GetHostNameFromUrl(ZenCacheHost);
+ CacheLatencySec = TestResult.LatencySeconds;
+ CacheMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest;
}
else
{
@@ -235,15 +241,34 @@ ResolveBuildStorage(OperationLogOutput& Output,
}
}
- return BuildStorageResolveResult{.HostUrl = HostUrl,
- .HostName = HostName,
- .HostAssumeHttp2 = HostAssumeHttp2,
- .HostLatencySec = HostLatencySec,
+ return BuildStorageResolveResult{
+ .Cloud = {.Address = HostUrl,
+ .Name = HostName,
+ .AssumeHttp2 = HostAssumeHttp2,
+ .LatencySec = HostLatencySec,
+ .Caps = BuildStorageResolveResult::Capabilities{.MaxRangeCountPerRequest = HostMaxRangeCountPerRequest}},
+ .Cache = {.Address = CacheUrl,
+ .Name = CacheName,
+ .AssumeHttp2 = CacheAssumeHttp2,
+ .LatencySec = CacheLatencySec,
+ .Caps = BuildStorageResolveResult::Capabilities{.MaxRangeCountPerRequest = CacheMaxRangeCountPerRequest}}};
+}
- .CacheUrl = CacheUrl,
- .CacheName = CacheName,
- .CacheAssumeHttp2 = CacheAssumeHttp2,
- .CacheLatencySec = CacheLatencySec};
+std::vector<ChunkBlockDescription>
+ParseBlockMetadatas(std::span<const CbObject> BlockMetadatas)
+{
+ std::vector<ChunkBlockDescription> UnorderedList;
+ UnorderedList.reserve(BlockMetadatas.size());
+ for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < BlockMetadatas.size(); CacheBlockMetadataIndex++)
+ {
+ const CbObject& CacheBlockMetadata = BlockMetadatas[CacheBlockMetadataIndex];
+ ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata);
+ if (Description.BlockHash != IoHash::Zero)
+ {
+ UnorderedList.emplace_back(std::move(Description));
+ }
+ }
+ return UnorderedList;
}
std::vector<ChunkBlockDescription>
@@ -263,25 +288,15 @@ GetBlockDescriptions(OperationLogOutput& Output,
if (OptionalCacheStorage && !BlockRawHashes.empty())
{
std::vector<CbObject> CacheBlockMetadatas = OptionalCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes);
- UnorderedList.reserve(CacheBlockMetadatas.size());
- for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); CacheBlockMetadataIndex++)
+ if (!CacheBlockMetadatas.empty())
{
- const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex];
- ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata);
- if (Description.BlockHash == IoHash::Zero)
- {
- ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote cache, skipping block");
- }
- else
+ UnorderedList = ParseBlockMetadatas(CacheBlockMetadatas);
+ for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
{
- UnorderedList.emplace_back(std::move(Description));
+ const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
}
}
- for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
- {
- const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
- BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
- }
}
if (UnorderedList.size() < BlockRawHashes.size())
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h
index ce3da41c1..da8437a58 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h
@@ -58,8 +58,6 @@ public:
uint64_t RangeOffset = 0,
uint64_t RangeBytes = (uint64_t)-1) = 0;
- static constexpr size_t MaxRangeCountPerRequest = 128u;
-
struct BuildBlobRanges
{
IoBuffer PayloadBuffer;
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
index 67c93480b..24702df0f 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h
@@ -69,11 +69,19 @@ std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& H
const std::filesystem::path& TempFolderPath,
WorkerThreadPool& BackgroundWorkerPool);
+#if ZEN_WITH_TESTS
+std::unique_ptr<BuildStorageCache> CreateInMemoryBuildStorageCache(uint64_t MaxRangeSupported,
+ BuildStorageCache::Statistics& Stats,
+ double LatencySec = 0.0,
+ double DelayPerKBSec = 0.0);
+#endif // ZEN_WITH_TESTS
+
struct ZenCacheEndpointTestResult
{
bool Success = false;
std::string FailureReason;
- double LatencySeconds = -1.0;
+ double LatencySeconds = -1.0;
+ uint64_t MaxRangeCountPerRequest = 1;
};
ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose);
diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
index 764a24e61..7306188ca 100644
--- a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
+++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h
@@ -14,15 +14,20 @@ class BuildStorageCache;
struct BuildStorageResolveResult
{
- std::string HostUrl;
- std::string HostName;
- bool HostAssumeHttp2 = false;
- double HostLatencySec = -1.0;
-
- std::string CacheUrl;
- std::string CacheName;
- bool CacheAssumeHttp2 = false;
- double CacheLatencySec = -1.0;
+ struct Capabilities
+ {
+ uint64_t MaxRangeCountPerRequest = 1;
+ };
+ struct Host
+ {
+ std::string Address;
+ std::string Name;
+ bool AssumeHttp2 = false;
+ double LatencySec = -1.0;
+ Capabilities Caps;
+ };
+ Host Cloud;
+ Host Cache;
};
enum class ZenCacheResolveMode
@@ -52,14 +57,13 @@ std::vector<ChunkBlockDescription> GetBlockDescriptions(OperationLogOutput& Out
struct StorageInstance
{
- std::unique_ptr<HttpClient> BuildStorageHttp;
- std::unique_ptr<BuildStorageBase> BuildStorage;
- std::string StorageName;
- double BuildStorageLatencySec = -1.0;
+ BuildStorageResolveResult::Host BuildStorageHost;
+ std::unique_ptr<HttpClient> BuildStorageHttp;
+ std::unique_ptr<BuildStorageBase> BuildStorage;
+
+ BuildStorageResolveResult::Host CacheHost;
std::unique_ptr<HttpClient> CacheHttp;
- std::unique_ptr<BuildStorageCache> BuildCacheStorage;
- std::string CacheName;
- double CacheLatencySec = -1.0;
+ std::unique_ptr<BuildStorageCache> CacheStorage;
};
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
index 20b6fd371..c085f10e7 100644
--- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
+++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h
@@ -31,6 +31,7 @@ struct ChunkBlockDescription : public ThinChunkBlockDescription
std::vector<ChunkBlockDescription> ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject);
ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockObject);
+std::vector<ChunkBlockDescription> ParseBlockMetadatas(std::span<const CbObject> BlockMetadatas);
CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData);
ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash);
typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc;
diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
index 7bbf40dfa..bb41f9efc 100644
--- a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
+++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h
@@ -28,7 +28,8 @@ struct JupiterEndpointTestResult
{
bool Success = false;
std::string FailureReason;
- double LatencySeconds = -1.0;
+ double LatencySeconds = -1.0;
+ uint64_t MaxRangeCountPerRequest = 1;
};
JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose);
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h
index 66dfcc62d..c058e1c1f 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h
@@ -2,6 +2,7 @@
#pragma once
+#include <zenhttp/httpclient.h>
#include <zenremotestore/projectstore/remoteprojectstore.h>
namespace zen {
@@ -10,9 +11,6 @@ class AuthMgr;
struct BuildsRemoteStoreOptions : RemoteStoreOptions
{
- std::string Host;
- std::string OverrideHost;
- std::string ZenHost;
std::string Namespace;
std::string Bucket;
Oid BuildId;
@@ -22,20 +20,16 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions
std::filesystem::path OidcExePath;
bool ForceDisableBlocks = false;
bool ForceDisableTempBlocks = false;
- bool AssumeHttp2 = false;
- bool PopulateCache = true;
IoBuffer MetaData;
size_t MaximumInMemoryDownloadSize = 1024u * 1024u;
};
-std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(LoggerRef InLog,
- const BuildsRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath,
- bool Quiet,
- bool Unattended,
- bool Hidden,
- WorkerThreadPool& CacheBackgroundWorkerPool,
- double& OutHostLatencySec,
- double& OutCacheLatencySec);
+struct BuildStorageResolveResult;
+
+std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(LoggerRef InLog,
+ const BuildStorageResolveResult& ResolveResult,
+ std::function<HttpClientAccessToken()>&& TokenProvider,
+ const BuildsRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath);
} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
index 42786d0f2..084d975a2 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
@@ -5,6 +5,7 @@
#include <zencore/jobqueue.h>
#include <zenstore/projectstore.h>
+#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkblock.h>
#include <zenremotestore/partialblockrequestmode.h>
@@ -79,11 +80,6 @@ public:
std::vector<ChunkBlockDescription> Blocks;
};
- struct AttachmentExistsInCacheResult : public Result
- {
- std::vector<bool> HasBody;
- };
-
struct LoadAttachmentRangesResult : public Result
{
IoBuffer Bytes;
@@ -128,28 +124,17 @@ public:
virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0;
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0;
- virtual LoadContainerResult LoadContainer() = 0;
- virtual GetKnownBlocksResult GetKnownBlocks() = 0;
- virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) = 0;
- virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) = 0;
-
- enum ESourceMode
- {
- kAny,
- kCacheOnly,
- kHostOnly
- };
+ virtual LoadContainerResult LoadContainer() = 0;
+ virtual GetKnownBlocksResult GetKnownBlocks() = 0;
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes,
+ BuildStorageCache* OptionalCache,
+ const Oid& CacheBuildId) = 0;
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode = ESourceMode::kAny) = 0;
-
- static constexpr size_t MaxRangeCountPerRequest = 128u;
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0;
virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
- std::span<const std::pair<uint64_t, uint64_t>> Ranges,
- ESourceMode SourceMode = ESourceMode::kAny) = 0;
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode = ESourceMode::kAny) = 0;
-
- virtual void Flush() = 0;
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) = 0;
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0;
};
struct RemoteStoreOptions
@@ -211,18 +196,29 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore,
bool IgnoreMissingAttachments,
JobContext* OptionalContext);
-RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- EPartialBlockRequestMode PartialBlockRequestMode,
- double HostLatencySec,
- double CacheLatencySec,
- JobContext* OptionalContext);
+struct LoadOplogContext
+{
+ CidStore& ChunkStore;
+ RemoteProjectStore& RemoteStore;
+ BuildStorageCache* OptionalCache = nullptr;
+ Oid CacheBuildId = Oid::Zero;
+ BuildStorageCache::Statistics* OptionalCacheStats = nullptr;
+ ProjectStore::Oplog& Oplog;
+ WorkerThreadPool& NetworkWorkerPool;
+ WorkerThreadPool& WorkerPool;
+ bool ForceDownload = false;
+ bool IgnoreMissingAttachments = false;
+ bool CleanOplog = false;
+ EPartialBlockRequestMode PartialBlockRequestMode = EPartialBlockRequestMode::All;
+ bool PopulateCache = false;
+ double StoreLatencySec = -1.0;
+ uint64_t StoreMaxRangeCountPerRequest = 1;
+ double CacheLatencySec = -1.0;
+ uint64_t CacheMaxRangeCountPerRequest = 1;
+ JobContext* OptionalJobContext = nullptr;
+};
+
+RemoteProjectStore::Result LoadOplog(LoadOplogContext&& Context);
std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
diff --git a/src/zenremotestore/jupiter/jupiterhost.cpp b/src/zenremotestore/jupiter/jupiterhost.cpp
index 2583cfc84..4479c8b97 100644
--- a/src/zenremotestore/jupiter/jupiterhost.cpp
+++ b/src/zenremotestore/jupiter/jupiterhost.cpp
@@ -59,13 +59,22 @@ TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool
HttpClient::Response TestResponse = TestHttpClient.Get("/health/live");
if (TestResponse.IsSuccess())
{
+ // TODO: dan.engelbrecht 20260305 - replace this naive nginx detection with proper capabilites end point once it exists in Jupiter
+ uint64_t MaxRangeCountPerRequest = 1;
+ if (auto It = TestResponse.Header.Entries.find("Server"); It != TestResponse.Header.Entries.end())
+ {
+ if (StrCaseCompare(It->second.c_str(), "nginx", 5) == 0)
+ {
+ MaxRangeCountPerRequest = 128u;
+ }
+ }
LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health/ready");
if (!LatencyResult.Success)
{
return {.Success = false, .FailureReason = LatencyResult.FailureReason};
}
- return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds};
+ return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds, .MaxRangeCountPerRequest = MaxRangeCountPerRequest};
}
return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
}
diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
index 3400cdbf5..2282a31dd 100644
--- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp
@@ -7,8 +7,6 @@
#include <zencore/fmtutils.h>
#include <zencore/scopeguard.h>
-#include <zenhttp/httpclientauth.h>
-#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/builds/buildstorageutil.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
#include <zenremotestore/operationlogoutput.h>
@@ -26,18 +24,14 @@ class BuildsRemoteStore : public RemoteProjectStore
public:
BuildsRemoteStore(LoggerRef InLog,
const HttpClientSettings& ClientSettings,
- HttpClientSettings* OptionalCacheClientSettings,
std::string_view HostUrl,
- std::string_view CacheUrl,
const std::filesystem::path& TempFilePath,
- WorkerThreadPool& CacheBackgroundWorkerPool,
std::string_view Namespace,
std::string_view Bucket,
const Oid& BuildId,
const IoBuffer& MetaData,
bool ForceDisableBlocks,
- bool ForceDisableTempBlocks,
- bool PopulateCache)
+ bool ForceDisableTempBlocks)
: m_Log(InLog)
, m_BuildStorageHttp(HostUrl, ClientSettings)
, m_BuildStorage(CreateJupiterBuildStorage(Log(),
@@ -53,20 +47,8 @@ public:
, m_MetaData(MetaData)
, m_EnableBlocks(!ForceDisableBlocks)
, m_UseTempBlocks(!ForceDisableTempBlocks)
- , m_PopulateCache(PopulateCache)
{
m_MetaData.MakeOwned();
- if (OptionalCacheClientSettings)
- {
- ZEN_ASSERT(!CacheUrl.empty());
- m_BuildCacheStorageHttp = std::make_unique<HttpClient>(CacheUrl, *OptionalCacheClientSettings);
- m_BuildCacheStorage = CreateZenBuildStorageCache(*m_BuildCacheStorageHttp,
- m_StorageCacheStats,
- Namespace,
- Bucket,
- TempFilePath,
- CacheBackgroundWorkerPool);
- }
}
virtual RemoteStoreInfo GetInfo() const override
@@ -75,9 +57,8 @@ public:
.UseTempBlockFiles = m_UseTempBlocks,
.AllowChunking = true,
.ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId),
- .Description = fmt::format("[cloud] {}{}. SessionId: {}. {}/{}/{}"sv,
+ .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}"sv,
m_BuildStorageHttp.GetBaseUri(),
- m_BuildCacheStorage ? fmt::format(" (Cache: {})", m_BuildCacheStorageHttp->GetBaseUri()) : ""sv,
m_BuildStorageHttp.GetSessionId(),
m_Namespace,
m_Bucket,
@@ -86,15 +67,13 @@ public:
virtual Stats GetStats() const override
{
- return {
- .m_SentBytes = m_BuildStorageStats.TotalBytesWritten.load() + m_StorageCacheStats.TotalBytesWritten.load(),
- .m_ReceivedBytes = m_BuildStorageStats.TotalBytesRead.load() + m_StorageCacheStats.TotalBytesRead.load(),
- .m_RequestTimeNS = m_BuildStorageStats.TotalRequestTimeUs.load() * 1000 + m_StorageCacheStats.TotalRequestTimeUs.load() * 1000,
- .m_RequestCount = m_BuildStorageStats.TotalRequestCount.load() + m_StorageCacheStats.TotalRequestCount.load(),
- .m_PeakSentBytes = Max(m_BuildStorageStats.PeakSentBytes.load(), m_StorageCacheStats.PeakSentBytes.load()),
- .m_PeakReceivedBytes = Max(m_BuildStorageStats.PeakReceivedBytes.load(), m_StorageCacheStats.PeakReceivedBytes.load()),
- .m_PeakBytesPerSec = Max(m_BuildStorageStats.PeakBytesPerSec.load(), m_StorageCacheStats.PeakBytesPerSec.load()),
- };
+ return {.m_SentBytes = m_BuildStorageStats.TotalBytesWritten.load(),
+ .m_ReceivedBytes = m_BuildStorageStats.TotalBytesRead.load(),
+ .m_RequestTimeNS = m_BuildStorageStats.TotalRequestTimeUs.load() * 1000,
+ .m_RequestCount = m_BuildStorageStats.TotalRequestCount.load(),
+ .m_PeakSentBytes = m_BuildStorageStats.PeakSentBytes.load(),
+ .m_PeakReceivedBytes = m_BuildStorageStats.PeakReceivedBytes.load(),
+ .m_PeakBytesPerSec = m_BuildStorageStats.PeakBytesPerSec.load()};
}
virtual bool GetExtendedStats(ExtendedStats& OutStats) const override
@@ -109,11 +88,6 @@ public:
}
Result = true;
}
- if (m_BuildCacheStorage)
- {
- OutStats.m_ReceivedBytesPerSource.insert_or_assign("Cache", m_StorageCacheStats.TotalBytesRead);
- Result = true;
- }
return Result;
}
@@ -462,11 +436,14 @@ public:
return Result;
}
- virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes,
+ BuildStorageCache* OptionalCache,
+ const Oid& CacheBuildId) override
{
std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(Log()));
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
+ ZEN_ASSERT(OptionalCache == nullptr || CacheBuildId == m_BuildId);
GetBlockDescriptionsResult Result;
Stopwatch Timer;
@@ -476,7 +453,7 @@ public:
{
Result.Blocks = zen::GetBlockDescriptions(*Output,
*m_BuildStorage,
- m_BuildCacheStorage.get(),
+ OptionalCache,
m_BuildId,
BlockHashes,
/*AttemptFallback*/ false,
@@ -506,49 +483,7 @@ public:
return Result;
}
- virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
- {
- AttachmentExistsInCacheResult Result;
- Stopwatch Timer;
- auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
- try
- {
- const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
- m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes);
-
- if (CacheExistsResult.size() == RawHashes.size())
- {
- Result.HasBody.reserve(CacheExistsResult.size());
- for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++)
- {
- Result.HasBody.push_back(CacheExistsResult[BlobIndex].HasBody);
- }
- }
- }
- catch (const HttpClientError& Ex)
- {
- Result.ErrorCode = MakeErrorCode(Ex);
- Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'",
- m_BuildStorageHttp.GetBaseUri(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- catch (const std::exception& Ex)
- {
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'",
- m_BuildStorageHttp.GetBaseUri(),
- m_Namespace,
- m_Bucket,
- m_BuildId,
- Ex.what());
- }
- return Result;
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero);
@@ -558,25 +493,7 @@ public:
try
{
- if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly)
- {
- IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash);
- if (CachedBlob)
- {
- Result.Bytes = std::move(CachedBlob);
- }
- }
- if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly)
- {
- Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
- if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache)
- {
- m_BuildCacheStorage->PutBuildBlob(m_BuildId,
- RawHash,
- Result.Bytes.GetContentType(),
- CompositeBuffer(SharedBuffer(Result.Bytes)));
- }
- }
+ Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash);
}
catch (const HttpClientError& Ex)
{
@@ -605,45 +522,20 @@ public:
}
virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
- std::span<const std::pair<uint64_t, uint64_t>> Ranges,
- ESourceMode SourceMode) override
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
{
+ ZEN_ASSERT(!Ranges.empty());
LoadAttachmentRangesResult Result;
Stopwatch Timer;
auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; });
try
{
- if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly)
+ BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges);
+ if (BlobRanges.PayloadBuffer)
{
- BuildStorageCache::BuildBlobRanges BlobRanges = m_BuildCacheStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges);
- if (BlobRanges.PayloadBuffer)
- {
- Result.Bytes = std::move(BlobRanges.PayloadBuffer);
- Result.Ranges = std::move(BlobRanges.Ranges);
- }
- }
- if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly)
- {
- BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges);
- if (BlobRanges.PayloadBuffer)
- {
- Result.Bytes = std::move(BlobRanges.PayloadBuffer);
- Result.Ranges = std::move(BlobRanges.Ranges);
-
- if (Result.Ranges.empty())
- {
- // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3/Replicated
- // Upload to cache (if enabled)
- if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache)
- {
- m_BuildCacheStorage->PutBuildBlob(m_BuildId,
- RawHash,
- Result.Bytes.GetContentType(),
- CompositeBuffer(SharedBuffer(Result.Bytes)));
- }
- }
- }
+ Result.Bytes = std::move(BlobRanges.PayloadBuffer);
+ Result.Ranges = std::move(BlobRanges.Ranges);
}
}
catch (const HttpClientError& Ex)
@@ -674,7 +566,7 @@ public:
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
LoadAttachmentsResult Result;
Stopwatch Timer;
@@ -682,67 +574,20 @@ public:
std::vector<IoHash> AttachmentsLeftToFind = RawHashes;
- if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly)
- {
- std::vector<BuildStorageCache::BlobExistsResult> ExistCheck = m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes);
- if (ExistCheck.size() == RawHashes.size())
- {
- AttachmentsLeftToFind.clear();
- for (size_t BlobIndex = 0; BlobIndex < RawHashes.size(); BlobIndex++)
- {
- const IoHash& Hash = RawHashes[BlobIndex];
- const BuildStorageCache::BlobExistsResult& BlobExists = ExistCheck[BlobIndex];
- if (BlobExists.HasBody)
- {
- IoBuffer CachedPayload = m_BuildCacheStorage->GetBuildBlob(m_BuildId, Hash);
- if (CachedPayload)
- {
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Hash,
- CompressedBuffer::FromCompressedNoValidate(std::move(CachedPayload))});
- }
- else
- {
- AttachmentsLeftToFind.push_back(Hash);
- }
- }
- else
- {
- AttachmentsLeftToFind.push_back(Hash);
- }
- }
- }
- }
-
for (const IoHash& Hash : AttachmentsLeftToFind)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode);
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
if (ChunkResult.ErrorCode)
{
return LoadAttachmentsResult{ChunkResult};
}
ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
- if (m_BuildCacheStorage && ChunkResult.Bytes && m_PopulateCache)
- {
- m_BuildCacheStorage->PutBuildBlob(m_BuildId,
- Hash,
- ChunkResult.Bytes.GetContentType(),
- CompositeBuffer(SharedBuffer(ChunkResult.Bytes)));
- }
Result.Chunks.emplace_back(
std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
}
return Result;
}
- virtual void Flush() override
- {
- if (m_BuildCacheStorage)
- {
- m_BuildCacheStorage->Flush(100, [](intptr_t) { return false; });
- }
- }
-
private:
static int MakeErrorCode(const HttpClientError& Ex)
{
@@ -759,10 +604,6 @@ private:
HttpClient m_BuildStorageHttp;
std::unique_ptr<BuildStorageBase> m_BuildStorage;
- BuildStorageCache::Statistics m_StorageCacheStats;
- std::unique_ptr<HttpClient> m_BuildCacheStorageHttp;
- std::unique_ptr<BuildStorageCache> m_BuildCacheStorage;
-
const std::string m_Namespace;
const std::string m_Bucket;
const Oid m_BuildId;
@@ -771,125 +612,34 @@ private:
const bool m_EnableBlocks = true;
const bool m_UseTempBlocks = true;
const bool m_AllowRedirect = false;
- const bool m_PopulateCache = true;
};
std::shared_ptr<RemoteProjectStore>
-CreateJupiterBuildsRemoteStore(LoggerRef InLog,
- const BuildsRemoteStoreOptions& Options,
- const std::filesystem::path& TempFilePath,
- bool Quiet,
- bool Unattended,
- bool Hidden,
- WorkerThreadPool& CacheBackgroundWorkerPool,
- double& OutHostLatencySec,
- double& OutCacheLatencySec)
+CreateJupiterBuildsRemoteStore(LoggerRef InLog,
+ const BuildStorageResolveResult& ResolveResult,
+ std::function<HttpClientAccessToken()>&& TokenProvider,
+ const BuildsRemoteStoreOptions& Options,
+ const std::filesystem::path& TempFilePath)
{
- std::string Host = Options.Host;
- if (!Host.empty() && Host.find("://"sv) == std::string::npos)
- {
- // Assume https URL
- Host = fmt::format("https://{}"sv, Host);
- }
- std::string OverrideUrl = Options.OverrideHost;
- if (!OverrideUrl.empty() && OverrideUrl.find("://"sv) == std::string::npos)
- {
- // Assume https URL
- OverrideUrl = fmt::format("https://{}"sv, OverrideUrl);
- }
- std::string ZenHost = Options.ZenHost;
- if (!ZenHost.empty() && ZenHost.find("://"sv) == std::string::npos)
- {
- // Assume https URL
- ZenHost = fmt::format("https://{}"sv, ZenHost);
- }
-
- // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
- // 2) Access token as parameter in request
- // 3) Environment variable (different win vs linux/mac)
- // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider
-
- std::function<HttpClientAccessToken()> TokenProvider;
- if (!Options.OpenIdProvider.empty())
- {
- TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider);
- }
- else if (!Options.AccessToken.empty())
- {
- TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken);
- }
- else if (!Options.OidcExePath.empty())
- {
- if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath,
- Host.empty() ? OverrideUrl : Host,
- Quiet,
- Unattended,
- Hidden);
- TokenProviderMaybe)
- {
- TokenProvider = TokenProviderMaybe.value();
- }
- }
-
- if (!TokenProvider)
- {
- TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager);
- }
-
- BuildStorageResolveResult ResolveRes;
- {
- HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
- .AccessTokenProvider = TokenProvider,
- .AssumeHttp2 = Options.AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 2};
-
- std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(InLog));
-
- ResolveRes =
- ResolveBuildStorage(*Output, ClientSettings, Host, OverrideUrl, ZenHost, ZenCacheResolveMode::Discovery, /*Verbose*/ false);
- }
-
HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
.ConnectTimeout = std::chrono::milliseconds(3000),
.Timeout = std::chrono::milliseconds(1800000),
.AccessTokenProvider = std::move(TokenProvider),
- .AssumeHttp2 = ResolveRes.HostAssumeHttp2,
+ .AssumeHttp2 = ResolveResult.Cloud.AssumeHttp2,
.AllowResume = true,
.RetryCount = 4,
.MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize};
- std::unique_ptr<HttpClientSettings> CacheClientSettings;
-
- if (!ResolveRes.CacheUrl.empty())
- {
- CacheClientSettings =
- std::make_unique<HttpClientSettings>(HttpClientSettings{.LogCategory = "httpcacheclient",
- .ConnectTimeout = std::chrono::milliseconds{3000},
- .Timeout = std::chrono::milliseconds{30000},
- .AssumeHttp2 = ResolveRes.CacheAssumeHttp2,
- .AllowResume = true,
- .RetryCount = 0,
- .MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize});
- }
-
std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(InLog,
ClientSettings,
- CacheClientSettings.get(),
- ResolveRes.HostUrl,
- ResolveRes.CacheUrl,
+ ResolveResult.Cloud.Address,
TempFilePath,
- CacheBackgroundWorkerPool,
Options.Namespace,
Options.Bucket,
Options.BuildId,
Options.MetaData,
Options.ForceDisableBlocks,
- Options.ForceDisableTempBlocks,
- Options.PopulateCache);
-
- OutHostLatencySec = ResolveRes.HostLatencySec;
- OutCacheLatencySec = ResolveRes.CacheLatencySec;
+ Options.ForceDisableTempBlocks);
return RemoteStore;
}
diff --git a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
index f950fd46c..bb21de12c 100644
--- a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp
@@ -7,8 +7,12 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/builds/buildstoragecache.h>
+
+#include <numeric>
namespace zen {
@@ -74,9 +78,11 @@ public:
virtual SaveResult SaveContainer(const IoBuffer& Payload) override
{
- Stopwatch Timer;
SaveResult Result;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
+
{
CbObject ContainerObject = LoadCompactBinaryObject(Payload);
@@ -87,6 +93,10 @@ public:
{
Result.Needs.insert(AttachmentHash);
}
+ else if (std::filesystem::path AttachmentMetaPath = GetAttachmentMetaPath(AttachmentHash); IsFile(AttachmentMetaPath))
+ {
+ BasicFile TouchIt(AttachmentMetaPath, BasicFile::Mode::kWrite);
+ }
});
}
@@ -112,14 +122,18 @@ public:
Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what());
}
AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
- virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override
+ virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload,
+ const IoHash& RawHash,
+ ChunkBlockDescription&& BlockDescription) override
{
- Stopwatch Timer;
- SaveAttachmentResult Result;
+ SaveAttachmentResult Result;
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
+
std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
if (!IsFile(ChunkPath))
{
@@ -142,14 +156,33 @@ public:
Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what());
}
}
+ if (!Result.ErrorCode && BlockDescription.BlockHash != IoHash::Zero)
+ {
+ try
+ {
+ std::filesystem::path MetaPath = GetAttachmentMetaPath(RawHash);
+ CbObject MetaData = BuildChunkBlockDescription(BlockDescription, {});
+ SharedBuffer MetaBuffer = MetaData.GetBuffer();
+ BasicFile MetaFile;
+ MetaFile.Open(MetaPath, BasicFile::Mode::kTruncate);
+ MetaFile.Write(MetaBuffer.GetView(), 0);
+ }
+ catch (const std::exception& Ex)
+ {
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("Failed saving block description to '{}'. Reason: {}", RawHash, Ex.what());
+ }
+ }
AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
{
+ SaveAttachmentsResult Result;
+
Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
for (const SharedBuffer& Chunk : Chunks)
{
@@ -157,12 +190,10 @@ public:
SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {});
if (ChunkResult.ErrorCode)
{
- ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return SaveAttachmentsResult{ChunkResult};
+ Result = SaveAttachmentsResult{ChunkResult};
+ break;
}
}
- SaveAttachmentsResult Result;
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
@@ -172,21 +203,60 @@ public:
virtual GetKnownBlocksResult GetKnownBlocks() override
{
+ Stopwatch Timer;
if (m_OptionalBaseName.empty())
{
- return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
+ size_t MaxBlockCount = 10000;
+
+ GetKnownBlocksResult Result;
+
+ DirectoryContent Content;
+ GetDirectoryContent(
+ m_OutputPath,
+ DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeModificationTick,
+ Content);
+ std::vector<size_t> RecentOrder(Content.Files.size());
+ std::iota(RecentOrder.begin(), RecentOrder.end(), 0u);
+ std::sort(RecentOrder.begin(), RecentOrder.end(), [&Content](size_t Lhs, size_t Rhs) {
+ return Content.FileModificationTicks[Lhs] > Content.FileModificationTicks[Rhs];
+ });
+
+ for (size_t FileIndex : RecentOrder)
+ {
+ std::filesystem::path MetaPath = Content.Files[FileIndex];
+ if (MetaPath.extension() == MetaExtension)
+ {
+ IoBuffer MetaFile = ReadFile(MetaPath).Flatten();
+ CbValidateError Err;
+ CbObject ValidatedObject = ValidateAndReadCompactBinaryObject(std::move(MetaFile), Err);
+ if (Err == CbValidateError::None)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(ValidatedObject);
+ if (Description.BlockHash != IoHash::Zero)
+ {
+ Result.Blocks.emplace_back(std::move(Description));
+ if (Result.Blocks.size() == MaxBlockCount)
+ {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ return Result;
}
LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName);
if (LoadResult.ErrorCode)
{
return GetKnownBlocksResult{LoadResult};
}
- Stopwatch Timer;
std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject);
if (BlockHashes.empty())
{
return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
- .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}};
}
std::vector<IoHash> ExistingBlockHashes;
for (const IoHash& RawHash : BlockHashes)
@@ -200,15 +270,15 @@ public:
if (ExistingBlockHashes.empty())
{
return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent),
- .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
+ .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}};
}
std::vector<ThinChunkBlockDescription> ThinKnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes);
- const size_t KnowBlockCount = ThinKnownBlocks.size();
+ const size_t KnownBlockCount = ThinKnownBlocks.size();
- GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}};
- Result.Blocks.resize(KnowBlockCount);
- for (size_t BlockIndex = 0; BlockIndex < KnowBlockCount; BlockIndex++)
+ GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}};
+ Result.Blocks.resize(KnownBlockCount);
+ for (size_t BlockIndex = 0; BlockIndex < KnownBlockCount; BlockIndex++)
{
Result.Blocks[BlockIndex].BlockHash = ThinKnownBlocks[BlockIndex].BlockHash;
Result.Blocks[BlockIndex].ChunkRawHashes = std::move(ThinKnownBlocks[BlockIndex].ChunkRawHashes);
@@ -217,87 +287,141 @@ public:
return Result;
}
- virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes,
+ BuildStorageCache* OptionalCache,
+ const Oid& CacheBuildId) override
{
- ZEN_UNUSED(BlockHashes);
- return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}};
- }
+ GetBlockDescriptionsResult Result;
- virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
- {
- return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
- }
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
- {
- Stopwatch Timer;
- LoadAttachmentResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
+ Result.Blocks.reserve(BlockHashes.size());
+
+ uint64_t ByteCount = 0;
+
+ std::vector<ChunkBlockDescription> UnorderedList;
{
- std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (!IsFile(ChunkPath))
+ if (OptionalCache)
{
- Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
- Result.Reason =
- fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
+ std::vector<CbObject> CacheBlockMetadatas = OptionalCache->GetBlobMetadatas(CacheBuildId, BlockHashes);
+ for (const CbObject& BlockObject : CacheBlockMetadatas)
+ {
+ ByteCount += BlockObject.GetSize();
+ }
+ UnorderedList = ParseBlockMetadatas(CacheBlockMetadatas);
}
+
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup;
+ BlockDescriptionLookup.reserve(BlockHashes.size());
+ for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++)
{
- BasicFile ChunkFile;
- ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
- Result.Bytes = ChunkFile.ReadAll();
+ const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex];
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex);
+ }
+
+ if (UnorderedList.size() < BlockHashes.size())
+ {
+ for (const IoHash& RawHash : BlockHashes)
+ {
+ if (!BlockDescriptionLookup.contains(RawHash))
+ {
+ std::filesystem::path MetaPath = GetAttachmentMetaPath(RawHash);
+ IoBuffer MetaFile = ReadFile(MetaPath).Flatten();
+ ByteCount += MetaFile.GetSize();
+ CbValidateError Err;
+ CbObject ValidatedObject = ValidateAndReadCompactBinaryObject(std::move(MetaFile), Err);
+ if (Err == CbValidateError::None)
+ {
+ ChunkBlockDescription Description = ParseChunkBlockDescription(ValidatedObject);
+ if (Description.BlockHash != IoHash::Zero)
+ {
+ BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size());
+ UnorderedList.emplace_back(std::move(Description));
+ }
+ }
+ }
+ }
+ }
+
+ Result.Blocks.reserve(UnorderedList.size());
+ for (const IoHash& RawHash : BlockHashes)
+ {
+ if (auto It = BlockDescriptionLookup.find(RawHash); It != BlockDescriptionLookup.end())
+ {
+ Result.Blocks.emplace_back(std::move(UnorderedList[It->second]));
+ }
}
}
+ AddStats(0, ByteCount, Timer.GetElapsedTimeUs() * 1000);
+ return Result;
+ }
+
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
+ {
+ LoadAttachmentResult Result;
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
+
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!IsFile(ChunkPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
+ return Result;
+ }
+ {
+ BasicFile ChunkFile;
+ ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
+ Result.Bytes = ChunkFile.ReadAll();
+ }
AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
- std::span<const std::pair<uint64_t, uint64_t>> Ranges,
- ESourceMode SourceMode) override
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
{
- Stopwatch Timer;
+ ZEN_ASSERT(!Ranges.empty());
LoadAttachmentRangesResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
- {
- std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
- if (!IsFile(ChunkPath))
- {
- Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
- Result.Reason =
- fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- return Result;
- }
- {
- BasicFile ChunkFile;
- ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead);
- uint64_t Start = Ranges.front().first;
- uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first;
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
- Result.Bytes = ChunkFile.ReadRange(Start, Length);
- Result.Ranges.reserve(Ranges.size());
- for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
- {
- Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second));
- }
+ std::filesystem::path ChunkPath = GetAttachmentPath(RawHash);
+ if (!IsFile(ChunkPath))
+ {
+ Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
+ Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string());
+ return Result;
+ }
+ {
+ uint64_t Start = Ranges.front().first;
+ uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first;
+ Result.Bytes = IoBufferBuilder::MakeFromFile(ChunkPath, Start, Length);
+ Result.Ranges.reserve(Ranges.size());
+ for (const std::pair<uint64_t, uint64_t>& Range : Ranges)
+ {
+ Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second));
}
}
- AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000);
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ AddStats(0,
+ std::accumulate(Result.Ranges.begin(),
+ Result.Ranges.end(),
+ uint64_t(0),
+ [](uint64_t Current, const std::pair<uint64_t, uint64_t>& Value) { return Current + Value.second; }),
+ Timer.GetElapsedTimeUs() * 1000);
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
Stopwatch Timer;
LoadAttachmentsResult Result;
for (const IoHash& Hash : RawHashes)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode);
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
if (ChunkResult.ErrorCode)
{
ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
@@ -310,20 +434,20 @@ public:
return Result;
}
- virtual void Flush() override {}
-
private:
LoadContainerResult LoadContainer(const std::string& Name)
{
- Stopwatch Timer;
- LoadContainerResult Result;
+ LoadContainerResult Result;
+
+ Stopwatch Timer;
+ auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; });
+
std::filesystem::path SourcePath = m_OutputPath;
SourcePath.append(Name);
if (!IsFile(SourcePath))
{
Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound);
Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string());
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
IoBuffer ContainerPayload;
@@ -337,18 +461,16 @@ private:
if (Result.ContainerObject = ValidateAndReadCompactBinaryObject(std::move(ContainerPayload), ValidateResult);
ValidateResult != CbValidateError::None || !Result.ContainerObject)
{
- Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
- Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')",
- SourcePath.string(),
- ToString(ValidateResult));
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
+ Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
+ Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')",
+ SourcePath.string(),
+ ToString(ValidateResult));
return Result;
}
- Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
return Result;
}
- std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const
+ std::filesystem::path GetAttachmentBasePath(const IoHash& RawHash) const
{
ExtendablePathBuilder<128> ShardedPath;
ShardedPath.Append(m_OutputPath.c_str());
@@ -367,6 +489,19 @@ private:
return ShardedPath.ToPath();
}
+ static constexpr std::string_view BlobExtension = ".blob";
+ static constexpr std::string_view MetaExtension = ".meta";
+
+ std::filesystem::path GetAttachmentPath(const IoHash& RawHash)
+ {
+ return GetAttachmentBasePath(RawHash).replace_extension(BlobExtension);
+ }
+
+ std::filesystem::path GetAttachmentMetaPath(const IoHash& RawHash)
+ {
+ return GetAttachmentBasePath(RawHash).replace_extension(MetaExtension);
+ }
+
void AddStats(uint64_t UploadedBytes, uint64_t DownloadedBytes, uint64_t ElapsedNS)
{
m_SentBytes.fetch_add(UploadedBytes);
diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
index 514484f30..5b456cb4c 100644
--- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp
@@ -212,73 +212,64 @@ public:
return Result;
}
- virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes,
+ BuildStorageCache* OptionalCache,
+ const Oid& CacheBuildId) override
{
- ZEN_UNUSED(BlockHashes);
+ ZEN_UNUSED(BlockHashes, OptionalCache, CacheBuildId);
return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}};
}
- virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
- {
- return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
LoadAttachmentResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
- AddStats(GetResult);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ AddStats(GetResult);
- Result = {ConvertResult(GetResult), std::move(GetResult.Response)};
- if (GetResult.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- RawHash,
- Result.Reason);
- }
+ Result = {ConvertResult(GetResult), std::move(GetResult.Response)};
+ if (GetResult.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ RawHash,
+ Result.Reason);
}
return Result;
}
virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
- std::span<const std::pair<uint64_t, uint64_t>> Ranges,
- ESourceMode SourceMode) override
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
{
+ ZEN_ASSERT(!Ranges.empty());
LoadAttachmentRangesResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
- {
- JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
- JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
- AddStats(GetResult);
+ JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect);
+ JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
+ AddStats(GetResult);
- Result = LoadAttachmentRangesResult{ConvertResult(GetResult), std::move(GetResult.Response)};
- if (GetResult.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
- m_JupiterClient->ServiceUrl(),
- m_Namespace,
- RawHash,
- Result.Reason);
- }
- else
- {
- Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end());
- }
+ Result = LoadAttachmentRangesResult{ConvertResult(GetResult), std::move(GetResult.Response)};
+ if (GetResult.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
+ m_JupiterClient->ServiceUrl(),
+ m_Namespace,
+ RawHash,
+ Result.Reason);
+ }
+ else
+ {
+ Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end());
}
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
LoadAttachmentsResult Result;
for (const IoHash& Hash : RawHashes)
{
- LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode);
+ LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
if (ChunkResult.ErrorCode)
{
return LoadAttachmentsResult{ChunkResult};
@@ -290,8 +281,6 @@ public:
return Result;
}
- virtual void Flush() override {}
-
private:
LoadContainerResult LoadContainer(const IoHash& Key)
{
diff --git a/src/zenremotestore/projectstore/projectstoreoperations.cpp b/src/zenremotestore/projectstore/projectstoreoperations.cpp
index becac3d4c..36dc4d868 100644
--- a/src/zenremotestore/projectstore/projectstoreoperations.cpp
+++ b/src/zenremotestore/projectstore/projectstoreoperations.cpp
@@ -426,19 +426,19 @@ ProjectStoreOperationDownloadAttachments::Execute()
auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) {
IoBuffer Payload;
- if (m_Storage.BuildCacheStorage)
+ if (m_Storage.CacheStorage)
{
- Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
+ Payload = m_Storage.CacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
}
if (!Payload)
{
Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
- if (m_Storage.BuildCacheStorage && m_Options.PopulateCache)
+ if (m_Storage.CacheStorage && m_Options.PopulateCache)
{
- m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(),
- RawHash,
- Payload.GetContentType(),
- CompositeBuffer(SharedBuffer(Payload)));
+ m_Storage.CacheStorage->PutBuildBlob(m_State.GetBuildId(),
+ RawHash,
+ Payload.GetContentType(),
+ CompositeBuffer(SharedBuffer(Payload)));
}
}
uint64_t PayloadSize = Payload.GetSize();
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index c8c5f201d..d5c6286a8 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/chunking/chunkedcontent.h>
#include <zenremotestore/chunking/chunkedfile.h>
#include <zenremotestore/operationlogoutput.h>
@@ -124,14 +125,17 @@ namespace remotestore_impl {
return OptionalContext->IsCancelled();
}
- std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS)
+ std::string GetStats(const RemoteProjectStore::Stats& Stats,
+ const BuildStorageCache::Statistics* OptionalCacheStats,
+ uint64_t ElapsedWallTimeMS)
{
- return fmt::format(
- "Sent: {} ({}bits/s) Recv: {} ({}bits/s)",
- NiceBytes(Stats.m_SentBytes),
- NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u),
- NiceBytes(Stats.m_ReceivedBytes),
- NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u));
+ uint64_t SentBytes = Stats.m_SentBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesWritten.load() : 0);
+ uint64_t ReceivedBytes = Stats.m_ReceivedBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesRead.load() : 0);
+ return fmt::format("Sent: {} ({}bits/s) Recv: {} ({}bits/s)",
+ NiceBytes(SentBytes),
+ NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u),
+ NiceBytes(ReceivedBytes),
+ NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u));
}
void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats)
@@ -269,12 +273,7 @@ namespace remotestore_impl {
JobContext* m_OptionalContext;
};
- void DownloadAndSaveBlockChunks(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
+ void DownloadAndSaveBlockChunks(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
AsyncRemoteResult& RemoteResult,
@@ -285,10 +284,8 @@ namespace remotestore_impl {
std::vector<uint32_t>&& NeededChunkIndexes)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &WorkerPool,
+ Context.NetworkWorkerPool.ScheduleWork(
+ [&Context,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&RemoteResult,
@@ -296,9 +293,7 @@ namespace remotestore_impl {
NeededChunkIndexes = std::move(NeededChunkIndexes),
&Info,
&LoadAttachmentsTimer,
- &DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext]() {
+ &DownloadStartMS]() {
ZEN_TRACE_CPU("DownloadBlockChunks");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -317,16 +312,16 @@ namespace remotestore_impl {
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks);
+ RemoteProjectStore::LoadAttachmentsResult Result = Context.RemoteStore.LoadAttachments(Chunks);
if (Result.ErrorCode)
{
- ReportMessage(OptionalContext,
+ ReportMessage(Context.OptionalJobContext,
fmt::format("Failed to load attachments with {} chunks ({}): {}",
Chunks.size(),
RemoteResult.GetError(),
RemoteResult.GetErrorReason()));
Info.MissingAttachmentCount.fetch_add(1);
- if (IgnoreMissingAttachments)
+ if (Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
@@ -338,7 +333,7 @@ namespace remotestore_impl {
uint64_t ChunkSize = It.second.GetCompressedSize();
Info.AttachmentBytesDownloaded.fetch_add(ChunkSize);
}
- remotestore_impl::ReportMessage(OptionalContext,
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Loaded {} bulk attachments in {}",
Chunks.size(),
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000))));
@@ -347,8 +342,8 @@ namespace remotestore_impl {
return;
}
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() {
+ Context.WorkerPool.ScheduleWork(
+ [&AttachmentsWriteLatch, &RemoteResult, &Info, &Context, Chunks = std::move(Result.Chunks)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -369,7 +364,9 @@ namespace remotestore_impl {
WriteRawHashes.push_back(It.first);
}
std::vector<CidStore::InsertResult> InsertResults =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly);
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers,
+ WriteRawHashes,
+ CidStore::InsertMode::kCopyOnly);
for (size_t Index = 0; Index < InsertResults.size(); Index++)
{
@@ -400,12 +397,7 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
- void DownloadAndSaveBlock(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
+ void DownloadAndSaveBlock(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
AsyncRemoteResult& RemoteResult,
@@ -418,19 +410,14 @@ namespace remotestore_impl {
uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
+ Context.NetworkWorkerPool.ScheduleWork(
[&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ &Context,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
RetriesLeft,
BlockHash = IoHash(BlockHash),
&AllNeededPartialChunkHashesLookup,
@@ -446,52 +433,65 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash);
- if (BlockResult.ErrorCode)
+
+ IoBuffer BlobBuffer;
+ if (Context.OptionalCache)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to download block attachment {} ({}): {}",
- BlockHash,
- RemoteResult.GetError(),
- RemoteResult.GetErrorReason()));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
- {
- RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- }
- return;
+ BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, BlockHash);
}
- if (RemoteResult.IsError())
+
+ if (!BlobBuffer)
{
- return;
+ RemoteProjectStore::LoadAttachmentResult BlockResult = Context.RemoteStore.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode)
+ {
+ ReportMessage(Context.OptionalJobContext,
+ fmt::format("Failed to download block attachment {} ({}): {}",
+ BlockHash,
+ BlockResult.Reason,
+ BlockResult.Text));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!Context.IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
+ }
+ return;
+ }
+ if (RemoteResult.IsError())
+ {
+ return;
+ }
+ BlobBuffer = std::move(BlockResult.Bytes);
+ ZEN_DEBUG("Loaded block attachment '{}' in {} ({})",
+ BlockHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlobBuffer.Size()));
+ if (Context.OptionalCache && Context.PopulateCache)
+ {
+ Context.OptionalCache->PutBuildBlob(Context.CacheBuildId,
+ BlockHash,
+ BlobBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlobBuffer)));
+ }
}
- uint64_t BlockSize = BlockResult.Bytes.GetSize();
+ uint64_t BlockSize = BlobBuffer.GetSize();
Info.AttachmentBlocksDownloaded.fetch_add(1);
- ZEN_DEBUG("Loaded block attachment '{}' in {} ({})",
- BlockHash,
- NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)),
- NiceBytes(BlockSize));
Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize);
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
+ Context.WorkerPool.ScheduleWork(
[&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ &Context,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
RetriesLeft,
BlockHash = IoHash(BlockHash),
&AllNeededPartialChunkHashesLookup,
ChunkDownloadedFlags,
- Bytes = std::move(BlockResult.Bytes)]() {
+ Bytes = std::move(BlobBuffer)]() {
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
if (RemoteResult.IsError())
{
@@ -569,7 +569,7 @@ namespace remotestore_impl {
if (!WriteAttachmentBuffers.empty())
{
std::vector<CidStore::InsertResult> Results =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (size_t Index = 0; Index < Results.size(); Index++)
{
const CidStore::InsertResult& Result = Results[Index];
@@ -598,14 +598,9 @@ namespace remotestore_impl {
{
if (RetriesLeft > 0)
{
- ReportMessage(OptionalContext, fmt::format("{}, retrying download", ErrorString));
-
- return DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString));
+
+ return DownloadAndSaveBlock(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -619,7 +614,7 @@ namespace remotestore_impl {
}
else
{
- ReportMessage(OptionalContext, ErrorString);
+ ReportMessage(Context.OptionalJobContext, ErrorString);
RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {});
return;
}
@@ -637,28 +632,29 @@ namespace remotestore_impl {
catch (const std::exception& Ex)
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError),
- fmt::format("Failed to block attachment {}", BlockHash),
+ fmt::format("Failed to download block attachment {}", BlockHash),
Ex.what());
}
},
WorkerThreadPool::EMode::EnableBacklog);
};
- bool DownloadPartialBlock(RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- AsyncRemoteResult& RemoteResult,
- DownloadInfo& Info,
- double& DownloadTimeSeconds,
- const ChunkBlockDescription& BlockDescription,
- bool BlockExistsInCache,
- std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
- size_t BlockRangeIndexStart,
- size_t BlockRangeCount,
+ void DownloadPartialBlock(LoadOplogContext& Context,
+ AsyncRemoteResult& RemoteResult,
+ DownloadInfo& Info,
+ double& DownloadTimeSeconds,
+ const ChunkBlockDescription& BlockDescription,
+ bool BlockExistsInCache,
+ std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors,
+ size_t BlockRangeIndexStart,
+ size_t BlockRangeCount,
std::function<void(IoBuffer&& Buffer,
size_t BlockRangeStartIndex,
std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded)
{
+ ZEN_ASSERT(Context.StoreMaxRangeCountPerRequest != 0);
+ ZEN_ASSERT(BlockExistsInCache == false || Context.CacheMaxRangeCountPerRequest != 0);
+
std::vector<std::pair<uint64_t, uint64_t>> Ranges;
Ranges.reserve(BlockRangeDescriptors.size());
for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++)
@@ -667,65 +663,104 @@ namespace remotestore_impl {
Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength));
}
- if (BlockExistsInCache)
- {
- RemoteProjectStore::LoadAttachmentRangesResult BlockResult =
- RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, Ranges, RemoteProjectStore::ESourceMode::kCacheOnly);
- DownloadTimeSeconds += BlockResult.ElapsedSeconds;
- if (RemoteResult.IsError())
- {
- return false;
- }
- if (!BlockResult.ErrorCode && BlockResult.Bytes)
- {
- if (BlockResult.Ranges.size() != Ranges.size())
- {
- throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
- Ranges.size(),
- BlockDescription.BlockHash,
- BlockResult.Ranges.size()));
- }
- OnDownloaded(std::move(BlockResult.Bytes), BlockRangeIndexStart, BlockResult.Ranges);
- return true;
- }
- }
-
- const size_t MaxRangesPerRequestToJupiter = RemoteProjectStore::MaxRangeCountPerRequest;
-
size_t SubBlockRangeCount = BlockRangeCount;
size_t SubRangeCountComplete = 0;
std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges);
+
while (SubRangeCountComplete < SubBlockRangeCount)
{
if (RemoteResult.IsError())
{
break;
}
- size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter);
+
size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete;
+ if (BlockExistsInCache)
+ {
+ ZEN_ASSERT(Context.OptionalCache);
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.CacheMaxRangeCountPerRequest);
+
+ if (SubRangeCount == 1)
+ {
+ // Legacy single-range path, prefer that for max compatibility
+
+ const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete];
+ Stopwatch CacheTimer;
+ IoBuffer PayloadBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId,
+ BlockDescription.BlockHash,
+ SubRange.first,
+ SubRange.second);
+ DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0;
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ if (PayloadBuffer)
+ {
+ OnDownloaded(std::move(PayloadBuffer),
+ SubRangeStartIndex,
+ std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)});
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ else
+ {
+ auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
+
+ Stopwatch CacheTimer;
+ BuildStorageCache::BuildBlobRanges RangeBuffers =
+ Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges);
+ DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0;
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ if (RangeBuffers.PayloadBuffer)
+ {
+ if (RangeBuffers.Ranges.empty())
+ {
+ SubRangeCount = Ranges.size() - SubRangeCountComplete;
+ OnDownloaded(std::move(RangeBuffers.PayloadBuffer),
+ SubRangeStartIndex,
+ RangesSpan.subspan(SubRangeCountComplete, SubRangeCount));
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ else if (RangeBuffers.Ranges.size() == SubRangeCount)
+ {
+ OnDownloaded(std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges);
+ SubRangeCountComplete += SubRangeCount;
+ continue;
+ }
+ }
+ }
+ }
+
+ size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.StoreMaxRangeCountPerRequest);
auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount);
RemoteProjectStore::LoadAttachmentRangesResult BlockResult =
- RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges, RemoteProjectStore::ESourceMode::kHostOnly);
+ Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges);
DownloadTimeSeconds += BlockResult.ElapsedSeconds;
if (RemoteResult.IsError())
{
- return false;
+ break;
}
if (BlockResult.ErrorCode || !BlockResult.Bytes)
{
- ReportMessage(OptionalContext,
+ ReportMessage(Context.OptionalJobContext,
fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}",
SubRanges.size(),
BlockDescription.BlockHash,
BlockResult.ErrorCode,
BlockResult.Reason));
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text);
- return false;
+ break;
}
}
else
@@ -734,6 +769,18 @@ namespace remotestore_impl {
{
// Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3
// Use the whole payload for the remaining ranges
+
+ if (Context.OptionalCache && Context.PopulateCache)
+ {
+ Context.OptionalCache->PutBuildBlob(Context.CacheBuildId,
+ BlockDescription.BlockHash,
+ ZenContentType::kCompressedBinary,
+ CompositeBuffer(std::vector<IoBuffer>{BlockResult.Bytes}));
+ if (RemoteResult.IsError())
+ {
+ break;
+ }
+ }
SubRangeCount = Ranges.size() - SubRangeCountComplete;
OnDownloaded(std::move(BlockResult.Bytes),
SubRangeStartIndex,
@@ -743,10 +790,13 @@ namespace remotestore_impl {
{
if (BlockResult.Ranges.size() != SubRanges.size())
{
- throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges",
- SubRanges.size(),
- BlockDescription.BlockHash,
- BlockResult.Ranges.size()));
+ RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
+ fmt::format("Range response for block {} contains {} ranges, expected {} ranges",
+ BlockDescription.BlockHash,
+ BlockResult.Ranges.size(),
+ SubRanges.size()),
+ "");
+ break;
}
OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges);
}
@@ -754,15 +804,9 @@ namespace remotestore_impl {
SubRangeCountComplete += SubRangeCount;
}
- return true;
}
- void DownloadAndSavePartialBlock(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
+ void DownloadAndSavePartialBlock(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
AsyncRemoteResult& RemoteResult,
@@ -779,19 +823,14 @@ namespace remotestore_impl {
uint32_t RetriesLeft)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
+ Context.NetworkWorkerPool.ScheduleWork(
[&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ &Context,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
BlockDescription,
BlockExistsInCache,
BlockRangeDescriptors,
@@ -811,10 +850,8 @@ namespace remotestore_impl {
double DownloadElapsedSeconds = 0;
uint64_t DownloadedBytes = 0;
- bool Success = DownloadPartialBlock(
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
+ DownloadPartialBlock(
+ Context,
RemoteResult,
Info,
DownloadElapsedSeconds,
@@ -833,19 +870,14 @@ namespace remotestore_impl {
Info.AttachmentBlocksRangesDownloaded++;
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
+ Context.WorkerPool.ScheduleWork(
[&AttachmentsWriteLatch,
- &ChunkStore,
- &RemoteStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ &Context,
&AttachmentsDownloadLatch,
&RemoteResult,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- IgnoreMissingAttachments,
- OptionalContext,
BlockDescription,
BlockExistsInCache,
BlockRangeDescriptors,
@@ -945,14 +977,9 @@ namespace remotestore_impl {
{
if (RetriesLeft > 0)
{
- ReportMessage(OptionalContext,
+ ReportMessage(Context.OptionalJobContext,
fmt::format("{}, retrying download", ErrorString));
- return DownloadAndSavePartialBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ return DownloadAndSavePartialBlock(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -969,9 +996,9 @@ namespace remotestore_impl {
RetriesLeft - 1);
}
- ReportMessage(OptionalContext, ErrorString);
+ ReportMessage(Context.OptionalJobContext, ErrorString);
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound),
"Malformed chunk block",
@@ -998,7 +1025,7 @@ namespace remotestore_impl {
if (!WriteAttachmentBuffers.empty())
{
std::vector<CidStore::InsertResult> Results =
- ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
+ Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes);
for (size_t Index = 0; Index < Results.size(); Index++)
{
const CidStore::InsertResult& Result = Results[Index];
@@ -1037,7 +1064,7 @@ namespace remotestore_impl {
},
WorkerThreadPool::EMode::EnableBacklog);
});
- if (Success)
+ if (!RemoteResult.IsError())
{
ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})",
BlockRangeCount,
@@ -1056,12 +1083,7 @@ namespace remotestore_impl {
WorkerThreadPool::EMode::EnableBacklog);
};
- void DownloadAndSaveAttachment(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- bool IgnoreMissingAttachments,
- JobContext* OptionalContext,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
+ void DownloadAndSaveAttachment(LoadOplogContext& Context,
Latch& AttachmentsDownloadLatch,
Latch& AttachmentsWriteLatch,
AsyncRemoteResult& RemoteResult,
@@ -1071,19 +1093,15 @@ namespace remotestore_impl {
const IoHash& RawHash)
{
AttachmentsDownloadLatch.AddCount(1);
- NetworkWorkerPool.ScheduleWork(
- [&RemoteStore,
- &ChunkStore,
- &WorkerPool,
+ Context.NetworkWorkerPool.ScheduleWork(
+ [&Context,
&RemoteResult,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
RawHash,
&LoadAttachmentsTimer,
&DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext]() {
+ &Info]() {
ZEN_TRACE_CPU("DownloadAttachment");
auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); });
@@ -1095,43 +1113,52 @@ namespace remotestore_impl {
{
uint64_t Unset = (std::uint64_t)-1;
DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs());
- RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash);
- if (AttachmentResult.ErrorCode)
+ IoBuffer BlobBuffer;
+ if (Context.OptionalCache)
{
- ReportMessage(OptionalContext,
- fmt::format("Failed to download large attachment {}: '{}', error code : {}",
- RawHash,
- AttachmentResult.Reason,
- AttachmentResult.ErrorCode));
- Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, RawHash);
+ }
+ if (!BlobBuffer)
+ {
+ RemoteProjectStore::LoadAttachmentResult AttachmentResult = Context.RemoteStore.LoadAttachment(RawHash);
+ if (AttachmentResult.ErrorCode)
{
- RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ ReportMessage(Context.OptionalJobContext,
+ fmt::format("Failed to download large attachment {}: '{}', error code : {}",
+ RawHash,
+ AttachmentResult.Reason,
+ AttachmentResult.ErrorCode));
+ Info.MissingAttachmentCount.fetch_add(1);
+ if (!Context.IgnoreMissingAttachments)
+ {
+ RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text);
+ }
+ return;
+ }
+ BlobBuffer = std::move(AttachmentResult.Bytes);
+ ZEN_DEBUG("Loaded large attachment '{}' in {} ({})",
+ RawHash,
+ NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
+ NiceBytes(BlobBuffer.GetSize()));
+ if (Context.OptionalCache && Context.PopulateCache)
+ {
+ Context.OptionalCache->PutBuildBlob(Context.CacheBuildId,
+ RawHash,
+ BlobBuffer.GetContentType(),
+ CompositeBuffer(SharedBuffer(BlobBuffer)));
}
- return;
}
- uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize();
- ZEN_DEBUG("Loaded large attachment '{}' in {} ({})",
- RawHash,
- NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)),
- NiceBytes(AttachmentSize));
- Info.AttachmentsDownloaded.fetch_add(1);
if (RemoteResult.IsError())
{
return;
}
+ uint64_t AttachmentSize = BlobBuffer.GetSize();
+ Info.AttachmentsDownloaded.fetch_add(1);
Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize);
AttachmentsWriteLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&AttachmentsWriteLatch,
- &RemoteResult,
- &Info,
- &ChunkStore,
- RawHash,
- AttachmentSize,
- Bytes = std::move(AttachmentResult.Bytes),
- OptionalContext]() {
+ Context.WorkerPool.ScheduleWork(
+ [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() {
ZEN_TRACE_CPU("WriteAttachment");
auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); });
@@ -1141,7 +1168,7 @@ namespace remotestore_impl {
}
try
{
- CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash);
+ CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash);
if (InsertResult.New)
{
Info.AttachmentBytesStored.fetch_add(AttachmentSize);
@@ -1557,7 +1584,9 @@ namespace remotestore_impl {
uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs();
ReportProgress(OptionalContext,
"Saving attachments"sv,
- fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
+ fmt::format("{} remaining... {}",
+ Remaining,
+ GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)),
AttachmentsToSave,
Remaining);
}
@@ -1566,7 +1595,7 @@ namespace remotestore_impl {
{
ReportProgress(OptionalContext,
"Saving attachments"sv,
- fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)),
+ fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)),
AttachmentsToSave,
0);
}
@@ -1577,7 +1606,7 @@ namespace remotestore_impl {
LargeAttachmentCountToUpload,
BulkAttachmentCountToUpload,
NiceTimeSpanMs(ElapsedTimeMS),
- GetStats(RemoteStore.GetStats(), ElapsedTimeMS)));
+ GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)));
}
} // namespace remotestore_impl
@@ -2186,31 +2215,36 @@ BuildContainer(CidStore& ChunkStore,
}
ResolveAttachmentsLatch.CountDown();
- while (!ResolveAttachmentsLatch.Wait(1000))
{
- ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining();
+ while (!ResolveAttachmentsLatch.Wait(1000))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
- while (!ResolveAttachmentsLatch.Wait(1000))
+ ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining();
+ if (remotestore_impl::IsCancelled(OptionalContext))
{
- Remaining = ResolveAttachmentsLatch.Remaining();
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("Aborting, {} attachments remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
+ while (!ResolveAttachmentsLatch.Wait(1000))
+ {
+ Remaining = ResolveAttachmentsLatch.Remaining();
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("Aborting, {} attachments remaining...", Remaining),
+ UploadAttachments.size(),
+ Remaining);
+ }
+ remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
+ return {};
}
- remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0);
- return {};
+ AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Resolving attachments"sv,
+ fmt::format("{} remaining...", Remaining),
+ AttachmentCountToUseForProgress,
+ Remaining);
}
- remotestore_impl::ReportProgress(OptionalContext,
- "Resolving attachments"sv,
- fmt::format("{} remaining...", Remaining),
- UploadAttachments.size(),
- Remaining);
}
if (UploadAttachments.size() > 0)
{
@@ -2598,12 +2632,14 @@ BuildContainer(CidStore& ChunkStore,
0);
}
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}",
- ChunkAssembleCount,
- TotalOpCount,
- GeneratedBlockCount,
- NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}",
+ ChunkAssembleCount,
+ TotalOpCount,
+ GeneratedBlockCount,
+ LargeChunkHashes.size(),
+ NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs()))));
if (remotestore_impl::IsCancelled(OptionalContext))
{
@@ -3155,17 +3191,18 @@ SaveOplog(CidStore& ChunkStore,
remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
- RemoteStoreInfo.ContainerName,
- RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksUploaded.load(),
- NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
- Info.AttachmentsUploaded.load(),
- NiceBytes(Info.AttachmentBytesUploaded.load()),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ remotestore_impl::ReportMessage(
+ OptionalContext,
+ fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}",
+ RemoteStoreInfo.ContainerName,
+ RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksUploaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesUploaded.load()),
+ Info.AttachmentsUploaded.load(),
+ NiceBytes(Info.AttachmentBytesUploaded.load()),
+ remotestore_impl::GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, TransferWallTimeMS)));
return Result;
};
@@ -3234,6 +3271,11 @@ ParseOplogContainer(
OpCount - OpsCompleteCount);
}
}
+ remotestore_impl::ReportProgress(OptionalContext,
+ "Scanning oplog"sv,
+ fmt::format("{} attachments found", NeededAttachments.size()),
+ OpCount,
+ OpCount - OpsCompleteCount);
}
{
std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end());
@@ -3406,22 +3448,11 @@ SaveOplogContainer(
}
RemoteProjectStore::Result
-LoadOplog(CidStore& ChunkStore,
- RemoteProjectStore& RemoteStore,
- ProjectStore::Oplog& Oplog,
- WorkerThreadPool& NetworkWorkerPool,
- WorkerThreadPool& WorkerPool,
- bool ForceDownload,
- bool IgnoreMissingAttachments,
- bool CleanOplog,
- EPartialBlockRequestMode PartialBlockRequestMode,
- double HostLatencySec,
- double CacheLatencySec,
- JobContext* OptionalContext)
+LoadOplog(LoadOplogContext&& Context)
{
using namespace std::literals;
- std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext));
+ std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(Context.OptionalJobContext));
remotestore_impl::DownloadInfo Info;
@@ -3430,25 +3461,25 @@ LoadOplog(CidStore& ChunkStore,
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
uint64_t BlockCountToDownload = 0;
- RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo();
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
+ RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = Context.RemoteStore.GetInfo();
+ remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName));
uint64_t TransferWallTimeMS = 0;
Stopwatch LoadContainerTimer;
- RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer();
+ RemoteProjectStore::LoadContainerResult LoadContainerResult = Context.RemoteStore.LoadContainer();
TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs();
if (LoadContainerResult.ErrorCode)
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode));
return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode,
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
.Reason = LoadContainerResult.Reason,
.Text = LoadContainerResult.Text};
}
- remotestore_impl::ReportMessage(OptionalContext,
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Loaded container in {} ({})",
NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)),
NiceBytes(LoadContainerResult.ContainerObject.GetSize())));
@@ -3462,12 +3493,12 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch LoadAttachmentsTimer;
std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1;
- auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) {
- if (ForceDownload)
+ auto HasAttachment = [&Context](const IoHash& RawHash) {
+ if (Context.ForceDownload)
{
return false;
}
- if (ChunkStore.ContainsChunk(RawHash))
+ if (Context.ChunkStore.ContainsChunk(RawHash))
{
return true;
}
@@ -3482,10 +3513,7 @@ LoadOplog(CidStore& ChunkStore,
std::vector<NeededBlockDownload> NeededBlockDownloads;
- auto OnNeedBlock = [&RemoteStore,
- &ChunkStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ auto OnNeedBlock = [&Context,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&AttachmentCount,
@@ -3494,9 +3522,8 @@ LoadOplog(CidStore& ChunkStore,
&Info,
&LoadAttachmentsTimer,
&DownloadStartMS,
- &NeededBlockDownloads,
- IgnoreMissingAttachments,
- OptionalContext](ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) {
+ &NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription,
+ std::vector<uint32_t>&& NeededChunkIndexes) {
if (RemoteResult.IsError())
{
return;
@@ -3506,12 +3533,7 @@ LoadOplog(CidStore& ChunkStore,
AttachmentCount.fetch_add(1);
if (ThinBlockDescription.BlockHash == IoHash::Zero)
{
- DownloadAndSaveBlockChunks(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ DownloadAndSaveBlockChunks(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -3528,11 +3550,7 @@ LoadOplog(CidStore& ChunkStore,
}
};
- auto OnNeedAttachment = [&RemoteStore,
- &Oplog,
- &ChunkStore,
- &NetworkWorkerPool,
- &WorkerPool,
+ auto OnNeedAttachment = [&Context,
&AttachmentsDownloadLatch,
&AttachmentsWriteLatch,
&RemoteResult,
@@ -3540,9 +3558,7 @@ LoadOplog(CidStore& ChunkStore,
&AttachmentCount,
&LoadAttachmentsTimer,
&DownloadStartMS,
- &Info,
- IgnoreMissingAttachments,
- OptionalContext](const IoHash& RawHash) {
+ &Info](const IoHash& RawHash) {
if (!Attachments.insert(RawHash).second)
{
return;
@@ -3552,12 +3568,7 @@ LoadOplog(CidStore& ChunkStore,
return;
}
AttachmentCount.fetch_add(1);
- DownloadAndSaveAttachment(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ DownloadAndSaveAttachment(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -3570,11 +3581,11 @@ LoadOplog(CidStore& ChunkStore,
std::vector<ChunkedInfo> FilesToDechunk;
auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); };
- auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); };
+ auto OnReferencedAttachments = [&Context](std::span<IoHash> RawHashes) { Context.Oplog.CaptureAddedAttachments(RawHashes); };
// Make sure we retain any attachments we download before writing the oplog
- Oplog.EnableUpdateCapture();
- auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); });
+ Context.Oplog.EnableUpdateCapture();
+ auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); });
CbObject OplogSection;
RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject,
@@ -3584,12 +3595,12 @@ LoadOplog(CidStore& ChunkStore,
OnNeedAttachment,
OnChunkedAttachment,
OplogSection,
- OptionalContext);
+ Context.OptionalJobContext);
if (Result.ErrorCode != 0)
{
RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text);
}
- remotestore_impl::ReportMessage(OptionalContext,
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download",
NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
Attachments.size(),
@@ -3613,11 +3624,12 @@ LoadOplog(CidStore& ChunkStore,
std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false);
ChunkBlockAnalyser::BlockResult PartialBlocksResult;
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size()));
+ remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size()));
- RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes);
+ RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions =
+ Context.RemoteStore.GetBlockDescriptions(BlockHashes, Context.OptionalCache, Context.CacheBuildId);
- remotestore_impl::ReportMessage(OptionalContext,
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
fmt::format("GetBlockDescriptions took {}. Found {} blocks",
NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)),
BlockDescriptions.Blocks.size()));
@@ -3636,12 +3648,7 @@ LoadOplog(CidStore& ChunkStore,
if (FindIt == BlockDescriptions.Blocks.end())
{
// Fall back to full download as we can't get enough information about the block
- DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ DownloadAndSaveBlock(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -3678,56 +3685,86 @@ LoadOplog(CidStore& ChunkStore,
if (!AllNeededChunkHashes.empty())
{
std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes;
- std::vector<bool> BlockExistsInCache;
+ std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false);
- if (PartialBlockRequestMode == EPartialBlockRequestMode::Off)
+ if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off)
{
PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
}
else
{
- RemoteProjectStore::AttachmentExistsInCacheResult CacheExistsResult =
- RemoteStore.AttachmentExistsInCache(BlocksWithDescription);
- if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size())
+ if (Context.OptionalCache)
{
- BlockExistsInCache.resize(BlocksWithDescription.size(), false);
+ std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult =
+ Context.OptionalCache->BlobsExists(Context.CacheBuildId, BlocksWithDescription);
+ if (CacheExistsResult.size() == BlocksWithDescription.size())
+ {
+ for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++)
+ {
+ BlockExistsInCache[BlobIndex] = CacheExistsResult[BlobIndex].HasBody;
+ }
+ }
+ uint64_t FoundBlocks =
+ std::accumulate(BlockExistsInCache.begin(),
+ BlockExistsInCache.end(),
+ uint64_t(0u),
+ [](uint64_t Current, bool Exists) -> uint64_t { return Current + (Exists ? 1 : 0); });
+ if (FoundBlocks > 0)
+ {
+ remotestore_impl::ReportMessage(
+ Context.OptionalJobContext,
+ fmt::format("Found {} out of {} blocks in cache", FoundBlocks, BlockExistsInCache.size()));
+ }
}
- else
+
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+
+ switch (Context.PartialBlockRequestMode)
{
- BlockExistsInCache = std::move(CacheExistsResult.HasBody);
+ case EPartialBlockRequestMode::Off:
+ break;
+ case EPartialBlockRequestMode::ZenCacheOnly:
+ CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off;
+ break;
+ case EPartialBlockRequestMode::Mixed:
+ CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
+ case EPartialBlockRequestMode::All:
+ CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange;
+ CloudPartialDownloadMode = Context.StoreMaxRangeCountPerRequest > 1
+ ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange
+ : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange;
+ break;
}
PartialBlockDownloadModes.reserve(BlocksWithDescription.size());
-
- for (bool ExistsInCache : BlockExistsInCache)
+ for (uint32_t BlockIndex = 0; BlockIndex < BlocksWithDescription.size(); BlockIndex++)
{
- if (PartialBlockRequestMode == EPartialBlockRequestMode::All)
- {
- PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange);
- }
- else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly)
- {
- PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off);
- }
- else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed)
- {
- PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed
- : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange);
- }
+ const bool BlockExistInCache = BlockExistsInCache[BlockIndex];
+ PartialBlockDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode);
}
}
ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size());
+
ChunkBlockAnalyser PartialAnalyser(
*LogOutput,
BlockDescriptions.Blocks,
- ChunkBlockAnalyser::Options{.IsQuiet = false,
- .IsVerbose = false,
- .HostLatencySec = HostLatencySec,
- .HostHighSpeedLatencySec = CacheLatencySec,
- .HostMaxRangeCountPerRequest = RemoteProjectStore::MaxRangeCountPerRequest});
+ ChunkBlockAnalyser::Options{.IsQuiet = false,
+ .IsVerbose = false,
+ .HostLatencySec = Context.StoreLatencySec,
+ .HostHighSpeedLatencySec = Context.CacheLatencySec,
+ .HostMaxRangeCountPerRequest = Context.StoreMaxRangeCountPerRequest,
+ .HostHighSpeedMaxRangeCountPerRequest = Context.CacheMaxRangeCountPerRequest});
std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks =
PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup,
@@ -3736,12 +3773,7 @@ LoadOplog(CidStore& ChunkStore,
PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes);
for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes)
{
- DownloadAndSaveBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ DownloadAndSaveBlock(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -3765,12 +3797,7 @@ LoadOplog(CidStore& ChunkStore,
RangeCount++;
}
- DownloadAndSavePartialBlock(ChunkStore,
- RemoteStore,
- IgnoreMissingAttachments,
- OptionalContext,
- NetworkWorkerPool,
- WorkerPool,
+ DownloadAndSavePartialBlock(Context,
AttachmentsDownloadLatch,
AttachmentsWriteLatch,
RemoteResult,
@@ -3791,38 +3818,44 @@ LoadOplog(CidStore& ChunkStore,
}
AttachmentsDownloadLatch.CountDown();
- while (!AttachmentsDownloadLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ ptrdiff_t AttachmentCountToUseForProgress = AttachmentsDownloadLatch.Remaining();
+ while (!AttachmentsDownloadLatch.Wait(1000))
{
- if (!RemoteResult.IsError())
+ ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining();
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
+ }
+ uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
+ if (DownloadStartMS != (uint64_t)-1)
+ {
+ PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
}
- }
- uint64_t PartialTransferWallTimeMS = TransferWallTimeMS;
- if (DownloadStartMS != (uint64_t)-1)
- {
- PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load();
- }
-
- uint64_t AttachmentsDownloaded =
- Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
- uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() +
- Info.AttachmentBytesDownloaded.load();
- remotestore_impl::ReportProgress(OptionalContext,
- "Loading attachments"sv,
- fmt::format("{} ({}) downloaded, {} ({}) stored, {} remaining. {}",
- AttachmentsDownloaded,
- NiceBytes(AttachmentBytesDownloaded),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Remaining,
- remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)),
- AttachmentCount.load(),
- Remaining);
+ uint64_t AttachmentsDownloaded =
+ Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load();
+ uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() +
+ Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
+
+ AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+ remotestore_impl::ReportProgress(
+ Context.OptionalJobContext,
+ "Loading attachments"sv,
+ fmt::format(
+ "{} ({}) downloaded, {} ({}) stored, {} remaining. {}",
+ AttachmentsDownloaded,
+ NiceBytes(AttachmentBytesDownloaded),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Remaining,
+ remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)),
+ AttachmentCountToUseForProgress,
+ Remaining);
+ }
}
if (DownloadStartMS != (uint64_t)-1)
{
@@ -3831,58 +3864,58 @@ LoadOplog(CidStore& ChunkStore,
if (AttachmentCount.load() > 0)
{
- remotestore_impl::ReportProgress(OptionalContext,
- "Loading attachments"sv,
- fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)),
- AttachmentCount.load(),
- 0);
+ remotestore_impl::ReportProgress(
+ Context.OptionalJobContext,
+ "Loading attachments"sv,
+ fmt::format("{}", remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)),
+ AttachmentCount.load(),
+ 0);
}
AttachmentsWriteLatch.CountDown();
- while (!AttachmentsWriteLatch.Wait(1000))
{
- ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ ptrdiff_t AttachmentCountToUseForProgress = AttachmentsWriteLatch.Remaining();
+ while (!AttachmentsWriteLatch.Wait(1000))
{
- if (!RemoteResult.IsError())
+ ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining();
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
- RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ if (!RemoteResult.IsError())
+ {
+ RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
+ }
}
+ AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
+ "Writing attachments"sv,
+ fmt::format("{} ({}), {} remaining.",
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Remaining),
+ AttachmentCountToUseForProgress,
+ Remaining);
}
- remotestore_impl::ReportProgress(
- OptionalContext,
- "Writing attachments"sv,
- fmt::format("{} ({}), {} remaining.", Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Remaining),
- AttachmentCount.load(),
- Remaining);
}
if (AttachmentCount.load() > 0)
{
- remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext, "Writing attachments", ""sv, AttachmentCount.load(), 0);
}
if (Result.ErrorCode == 0)
{
if (!FilesToDechunk.empty())
{
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
+ remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size()));
Latch DechunkLatch(1);
- std::filesystem::path TempFilePath = Oplog.TempPath();
+ std::filesystem::path TempFilePath = Context.Oplog.TempPath();
for (const ChunkedInfo& Chunked : FilesToDechunk)
{
std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString();
DechunkLatch.AddCount(1);
- WorkerPool.ScheduleWork(
- [&ChunkStore,
- &DechunkLatch,
- TempFileName,
- &Chunked,
- &RemoteResult,
- IgnoreMissingAttachments,
- &Info,
- OptionalContext]() {
+ Context.WorkerPool.ScheduleWork(
+ [&Context, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, &Info]() {
ZEN_TRACE_CPU("DechunkAttachment");
auto _ = MakeGuard([&DechunkLatch, &TempFileName] {
@@ -3916,16 +3949,16 @@ LoadOplog(CidStore& ChunkStore,
for (std::uint32_t SequenceIndex : Chunked.ChunkSequence)
{
const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex];
- IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash);
+ IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash);
if (!Chunk)
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash));
// We only add 1 as the resulting missing count will be 1 for the dechunked file
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(
gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -3943,7 +3976,7 @@ LoadOplog(CidStore& ChunkStore,
if (RawHash != ChunkHash)
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}",
RawHash,
ChunkHash,
@@ -3951,7 +3984,7 @@ LoadOplog(CidStore& ChunkStore,
// We only add 1 as the resulting missing count will be 1 for the dechunked file
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(
gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -3988,14 +4021,14 @@ LoadOplog(CidStore& ChunkStore,
}))
{
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Failed to decompress chunk {} for chunked attachment {}",
ChunkHash,
Chunked.RawHash));
// We only add 1 as the resulting missing count will be 1 for the dechunked file
Info.MissingAttachmentCount.fetch_add(1);
- if (!IgnoreMissingAttachments)
+ if (!Context.IgnoreMissingAttachments)
{
RemoteResult.SetError(
gsl::narrow<int>(HttpResponseCode::NotFound),
@@ -4019,18 +4052,17 @@ LoadOplog(CidStore& ChunkStore,
}
uint64_t TmpBufferSize = TmpBuffer.GetSize();
CidStore::InsertResult InsertResult =
- ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
+ Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace);
if (InsertResult.New)
{
Info.AttachmentBytesStored.fetch_add(TmpBufferSize);
Info.AttachmentsStored.fetch_add(1);
}
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Dechunked attachment {} ({}) in {}",
- Chunked.RawHash,
- NiceBytes(Chunked.RawSize),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs())));
+ ZEN_INFO("Dechunked attachment {} ({}) in {}",
+ Chunked.RawHash,
+ NiceBytes(Chunked.RawSize),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
catch (const std::exception& Ex)
{
@@ -4046,54 +4078,58 @@ LoadOplog(CidStore& ChunkStore,
while (!DechunkLatch.Wait(1000))
{
ptrdiff_t Remaining = DechunkLatch.Remaining();
- if (remotestore_impl::IsCancelled(OptionalContext))
+ if (remotestore_impl::IsCancelled(Context.OptionalJobContext))
{
if (!RemoteResult.IsError())
{
RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", "");
remotestore_impl::ReportMessage(
- OptionalContext,
+ Context.OptionalJobContext,
fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason()));
}
}
- remotestore_impl::ReportProgress(OptionalContext,
+ remotestore_impl::ReportProgress(Context.OptionalJobContext,
"Dechunking attachments"sv,
fmt::format("{} remaining...", Remaining),
FilesToDechunk.size(),
Remaining);
}
- remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
+ remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0);
}
Result = RemoteResult.ConvertResult();
}
if (Result.ErrorCode == 0)
{
- if (CleanOplog)
+ if (Context.CleanOplog)
{
- RemoteStore.Flush();
- if (!Oplog.Reset())
+ if (Context.OptionalCache)
+ {
+ Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; });
+ }
+ if (!Context.Oplog.Reset())
{
Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError),
.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0,
- .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())};
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
+ .Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId())};
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
+ fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason));
}
}
if (Result.ErrorCode == 0)
{
- remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext);
+ remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext);
}
}
Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0;
- remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats());
+ remotestore_impl::LogRemoteStoreStatsDetails(Context.RemoteStore.GetStats());
{
std::string DownloadDetails;
RemoteProjectStore::ExtendedStats ExtendedStats;
- if (RemoteStore.GetExtendedStats(ExtendedStats))
+ if (Context.RemoteStore.GetExtendedStats(ExtendedStats))
{
if (!ExtendedStats.m_ReceivedBytesPerSource.empty())
{
@@ -4112,7 +4148,8 @@ LoadOplog(CidStore& ChunkStore,
Total += It.second;
}
- remotestore_impl::ReportMessage(OptionalContext, fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView()));
+ remotestore_impl::ReportMessage(Context.OptionalJobContext,
+ fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView()));
}
}
}
@@ -4122,25 +4159,26 @@ LoadOplog(CidStore& ChunkStore,
uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() +
Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load();
- remotestore_impl::ReportMessage(OptionalContext,
- fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} "
- "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}",
- RemoteStoreInfo.ContainerName,
- Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
- NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
- NiceBytes(Info.OplogSizeBytes),
- Info.AttachmentBlocksDownloaded.load(),
- NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
- Info.AttachmentBlocksRangesDownloaded.load(),
- NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()),
- Info.AttachmentsDownloaded.load(),
- NiceBytes(Info.AttachmentBytesDownloaded.load()),
- TotalDownloads,
- NiceBytes(TotalBytesDownloaded),
- Info.AttachmentsStored.load(),
- NiceBytes(Info.AttachmentBytesStored.load()),
- Info.MissingAttachmentCount.load(),
- remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)));
+ remotestore_impl::ReportMessage(
+ Context.OptionalJobContext,
+ fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} "
+ "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}",
+ RemoteStoreInfo.ContainerName,
+ Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE",
+ NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)),
+ NiceBytes(Info.OplogSizeBytes),
+ Info.AttachmentBlocksDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockBytesDownloaded.load()),
+ Info.AttachmentBlocksRangesDownloaded.load(),
+ NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()),
+ Info.AttachmentsDownloaded.load(),
+ NiceBytes(Info.AttachmentBytesDownloaded.load()),
+ TotalDownloads,
+ NiceBytes(TotalBytesDownloaded),
+ Info.AttachmentsStored.load(),
+ NiceBytes(Info.AttachmentBytesStored.load()),
+ Info.MissingAttachmentCount.load(),
+ remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)));
return Result;
}
@@ -4237,6 +4275,26 @@ namespace projectstore_testutils {
return Result;
}
+ class TestJobContext : public JobContext
+ {
+ public:
+ explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {}
+ virtual bool IsCancelled() const { return false; }
+ virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); }
+ virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount)
+ {
+ ZEN_INFO("Job {}: Op '{}'{} {}/{}",
+ m_OpIndex,
+ CurrentOp,
+ Details.empty() ? "" : fmt::format(" {}", Details),
+ TotalCount - RemainingCount,
+ TotalCount);
+ }
+
+ private:
+ int& m_OpIndex;
+ };
+
} // namespace projectstore_testutils
TEST_SUITE_BEGIN("remotestore.projectstore");
@@ -4334,66 +4392,708 @@ TEST_CASE_TEMPLATE("project.store.export",
false,
nullptr);
- CHECK(ExportResult.ErrorCode == 0);
+ REQUIRE(ExportResult.ErrorCode == 0);
Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {});
CHECK(OplogImport);
- RemoteProjectStore::Result ImportResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- EPartialBlockRequestMode::Mixed,
- /*HostLatencySec*/ -1.0,
- /*CacheLatencySec*/ -1.0,
- nullptr);
+ int OpJobIndex = 0;
+ TestJobContext OpJobContext(OpJobIndex);
+
+ RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ false,
- EPartialBlockRequestMode::Mixed,
- /*HostLatencySec*/ -1.0,
- /*CacheLatencySec*/ -1.0,
- nullptr);
+ OpJobIndex++;
+
+ RemoteProjectStore::Result ImportForceResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportForceResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ false,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- EPartialBlockRequestMode::Mixed,
- /*HostLatencySec*/ -1.0,
- /*CacheLatencySec*/ -1.0,
- nullptr);
+ OpJobIndex++;
+
+ RemoteProjectStore::Result ImportCleanResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = true,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportCleanResult.ErrorCode == 0);
-
- RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore,
- *RemoteStore,
- *OplogImport,
- NetworkPool,
- WorkerPool,
- /*Force*/ true,
- /*IgnoreMissingAttachments*/ false,
- /*CleanOplog*/ true,
- EPartialBlockRequestMode::Mixed,
- /*HostLatencySec*/ -1.0,
- /*CacheLatencySec*/ -1.0,
- nullptr);
+ OpJobIndex++;
+
+ RemoteProjectStore::Result ImportForceCleanResult =
+ LoadOplog(LoadOplogContext{.ChunkStore = CidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = Oid::Zero,
+ .Oplog = *OplogImport,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = true,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = true,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed,
+ .OptionalJobContext = &OpJobContext});
CHECK(ImportForceCleanResult.ErrorCode == 0);
+ OpJobIndex++;
+}
+
+// Common oplog setup used by the two tests below.
+// Returns a FileRemoteStore backed by ExportDir that has been populated with a SaveOplog call.
+// Keeps the test data identical to project.store.export so the two test suites exercise the same blocks/attachments.
+static RemoteProjectStore::Result
+SetupExportStore(CidStore& CidStore,
+ ProjectStore::Project& Project,
+ WorkerThreadPool& NetworkPool,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& ExportDir,
+ std::shared_ptr<RemoteProjectStore>& OutRemoteStore)
+{
+ using namespace projectstore_testutils;
+ using namespace std::literals;
+
+ Ref<ProjectStore::Oplog> Oplog = Project.NewOplog("oplog_export", {});
+ if (!Oplog)
+ {
+ return RemoteProjectStore::Result{.ErrorCode = -1};
+ }
+
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {}));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77})));
+ Oplog->AppendNewOplogEntry(
+ CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122})));
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None)));
+
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024,
+ .MaxChunksPerBlock = 1000,
+ .MaxChunkEmbedSize = 32 * 1024u,
+ .ChunkFileSizeLimit = 64u * 1024u},
+ /*.FolderPath =*/ExportDir,
+ /*.Name =*/std::string("oplog_export"),
+ /*.OptionalBaseName =*/std::string(),
+ /*.ForceDisableBlocks =*/false,
+ /*.ForceEnableTempBlocks =*/false};
+
+ OutRemoteStore = CreateFileRemoteStore(Log(), Options);
+ return SaveOplog(CidStore,
+ *OutRemoteStore,
+ Project,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ Options.MaxBlockSize,
+ Options.MaxChunksPerBlock,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ /*EmbedLooseFiles*/ true,
+ /*ForceUpload*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*OptionalContext*/ nullptr);
+}
+
+// Creates an export store with a single oplog entry that packs six 512 KB chunks into one
+// ~3 MB block (MaxBlockSize = 8 MB). The resulting block slack (~1.5 MB) far exceeds the
+// 512 KB threshold that ChunkBlockAnalyser requires before it will consider partial-block
+// downloads instead of full-block downloads.
+//
+// This function is self-contained: it creates its own GcManager, CidStore, ProjectStore and
+// Project internally so that each call is independent of any outer test context. After
+// SaveOplog returns, all persistent data lives on disk inside ExportDir and the caller can
+// freely query OutRemoteStore without holding any references to the internal context.
+static RemoteProjectStore::Result
+SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& ExportDir,
+ std::shared_ptr<RemoteProjectStore>& OutRemoteStore)
+{
+ using namespace projectstore_testutils;
+ using namespace std::literals;
+
+ // Self-contained CAS and project store. Subdirectories of ExportDir keep everything
+ // together without relying on the outer TEST_CASE's ExportCidStore / ExportProject.
+ GcManager LocalGc;
+ CidStore LocalCidStore(LocalGc);
+ CidStoreConfiguration LocalCidConfig = {.RootDirectory = ExportDir / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096};
+ LocalCidStore.Initialize(LocalCidConfig);
+
+ std::filesystem::path LocalProjectBasePath = ExportDir / "proj";
+ ProjectStore LocalProjectStore(LocalCidStore, LocalProjectBasePath, LocalGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> LocalProject(LocalProjectStore.NewProject(LocalProjectBasePath / "p"sv,
+ "p"sv,
+ (ExportDir / "root").string(),
+ (ExportDir / "engine").string(),
+ (ExportDir / "game").string(),
+ (ExportDir / "game" / "game.uproject").string()));
+
+ Ref<ProjectStore::Oplog> Oplog = LocalProject->NewOplog("oplog_partial_block", {});
+ if (!Oplog)
+ {
+ return RemoteProjectStore::Result{.ErrorCode = -1};
+ }
+
+ // Six 512 KB chunks with OodleCompressionLevel::None so the compressed size stays large
+ // and the block genuinely exceeds the 512 KB slack threshold.
+ Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(
+ Oid::NewOid(),
+ CreateAttachments(std::initializer_list<size_t>{512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u},
+ OodleCompressionLevel::None)));
+
+ // MaxChunkEmbedSize must be larger than the compressed size of each 512 KB chunk
+ // (OodleCompressionLevel::None → compressed ≈ raw ≈ 512 KB). With the legacy
+ // 32 KB limit all six chunks would become loose large attachments and no block would
+ // be created, so we use the production default of 1.5 MB instead.
+ FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u,
+ .MaxChunksPerBlock = 1000,
+ .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize,
+ .ChunkFileSizeLimit = 64u * 1024u * 1024u},
+ /*.FolderPath =*/ExportDir,
+ /*.Name =*/std::string("oplog_partial_block"),
+ /*.OptionalBaseName =*/std::string(),
+ /*.ForceDisableBlocks =*/false,
+ /*.ForceEnableTempBlocks =*/false};
+ OutRemoteStore = CreateFileRemoteStore(Log(), Options);
+ return SaveOplog(LocalCidStore,
+ *OutRemoteStore,
+ *LocalProject,
+ *Oplog,
+ NetworkPool,
+ WorkerPool,
+ Options.MaxBlockSize,
+ Options.MaxChunksPerBlock,
+ Options.MaxChunkEmbedSize,
+ Options.ChunkFileSizeLimit,
+ /*EmbedLooseFiles*/ true,
+ /*ForceUpload*/ false,
+ /*IgnoreMissingAttachments*/ false,
+ /*OptionalContext*/ nullptr);
+}
+
+// Returns the first block hash that has at least MinChunkCount chunks, or a zero IoHash
+// if no qualifying block exists in Store.
+static IoHash
+FindBlockWithMultipleChunks(RemoteProjectStore& Store, size_t MinChunkCount)
+{
+ RemoteProjectStore::LoadContainerResult ContainerResult = Store.LoadContainer();
+ if (ContainerResult.ErrorCode != 0)
+ {
+ return {};
+ }
+ std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject);
+ if (BlockHashes.empty())
+ {
+ return {};
+ }
+ RemoteProjectStore::GetBlockDescriptionsResult Descriptions = Store.GetBlockDescriptions(BlockHashes, nullptr, Oid{});
+ if (Descriptions.ErrorCode != 0)
+ {
+ return {};
+ }
+ for (const ChunkBlockDescription& Desc : Descriptions.Blocks)
+ {
+ if (Desc.ChunkRawHashes.size() >= MinChunkCount)
+ {
+ return Desc.BlockHash;
+ }
+ }
+ return {};
+}
+
+// Loads BlockHash from Source and inserts every even-indexed chunk (0, 2, 4, …) into
+// TargetCidStore. Odd-indexed chunks are left absent so that when an import is run
+// against the same block, HasAttachment returns false for three non-adjacent positions
+// — the minimum needed to exercise the multi-range partial-block download paths.
+static void
+SeedCidStoreWithAlternateChunks(CidStore& TargetCidStore, RemoteProjectStore& Source, const IoHash& BlockHash)
+{
+ RemoteProjectStore::LoadAttachmentResult BlockResult = Source.LoadAttachment(BlockHash);
+ if (BlockResult.ErrorCode != 0 || !BlockResult.Bytes)
+ {
+ return;
+ }
+
+ IoHash RawHash;
+ uint64_t RawSize;
+ CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(BlockResult.Bytes), RawHash, RawSize);
+ if (!Compressed)
+ {
+ return;
+ }
+ CompositeBuffer BlockPayload = Compressed.DecompressToComposite();
+ if (!BlockPayload)
+ {
+ return;
+ }
+
+ uint32_t ChunkIndex = 0;
+ uint64_t HeaderSize = 0;
+ IterateChunkBlock(
+ BlockPayload.Flatten(),
+ [&TargetCidStore, &ChunkIndex](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) {
+ if (ChunkIndex % 2 == 0)
+ {
+ IoBuffer ChunkData = Chunk.GetCompressed().Flatten().AsIoBuffer();
+ TargetCidStore.AddChunk(ChunkData, AttachmentHash);
+ }
+ ++ChunkIndex;
+ },
+ HeaderSize);
+}
+
+TEST_CASE("project.store.import.context_settings")
+{
+ using namespace std::literals;
+ using namespace projectstore_testutils;
+
+ ScopedTemporaryDirectory TempDir;
+ ScopedTemporaryDirectory ExportDir;
+
+ std::filesystem::path RootDir = TempDir.Path() / "root";
+ std::filesystem::path EngineRootDir = TempDir.Path() / "engine";
+ std::filesystem::path ProjectRootDir = TempDir.Path() / "game";
+ std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject";
+
+ // Export-side CAS and project store: used only by SetupExportStore to build the remote store
+ // payload. Kept separate from the import side so the two CAS instances are disjoint.
+ GcManager ExportGc;
+ CidStore ExportCidStore(ExportGc);
+ CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ ExportCidStore.Initialize(ExportCidConfig);
+
+ std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore";
+ ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+
+ uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u);
+ uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u;
+ WorkerThreadPool WorkerPool(WorkerCount);
+ WorkerThreadPool NetworkPool(NetworkWorkerCount);
+
+ std::shared_ptr<RemoteProjectStore> RemoteStore;
+ RemoteProjectStore::Result ExportResult =
+ SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path(), RemoteStore);
+ REQUIRE(ExportResult.ErrorCode == 0);
+
+ // Import-side CAS and project store: starts empty, mirroring a fresh machine that has never
+ // downloaded the data. HasAttachment() therefore returns false for every chunk, so the import
+ // genuinely contacts the remote store without needing ForceDownload on the populate pass.
+ GcManager ImportGc;
+ CidStore ImportCidStore(ImportGc);
+ CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ ImportCidStore.Initialize(ImportCidConfig);
+
+ std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore";
+ ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{});
+ Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv,
+ "proj1"sv,
+ RootDir.string(),
+ EngineRootDir.string(),
+ ProjectRootDir.string(),
+ ProjectFilePath.string()));
+
+ const Oid CacheBuildId = Oid::NewOid();
+ BuildStorageCache::Statistics CacheStats;
+ std::unique_ptr<BuildStorageCache> Cache = CreateInMemoryBuildStorageCache(256u, CacheStats);
+ auto ResetCacheStats = [&]() {
+ CacheStats.TotalBytesRead = 0;
+ CacheStats.TotalBytesWritten = 0;
+ CacheStats.TotalRequestCount = 0;
+ CacheStats.TotalRequestTimeUs = 0;
+ CacheStats.TotalExecutionTimeUs = 0;
+ CacheStats.PeakSentBytes = 0;
+ CacheStats.PeakReceivedBytes = 0;
+ CacheStats.PeakBytesPerSec = 0;
+ CacheStats.PutBlobCount = 0;
+ CacheStats.PutBlobByteCount = 0;
+ };
+
+ int OpJobIndex = 0;
+
+ TestJobContext OpJobContext(OpJobIndex);
+
+ // Helper: run a LoadOplog against the import-side CAS/project with the given context knobs.
+ // Each call creates a fresh oplog so repeated calls within one SUBCASE don't short-circuit on
+ // already-present data.
+ auto DoImport = [&](BuildStorageCache* OptCache,
+ EPartialBlockRequestMode Mode,
+ double StoreLatency,
+ uint64_t StoreRanges,
+ double CacheLatency,
+ uint64_t CacheRanges,
+ bool PopulateCache,
+ bool ForceDownload) -> RemoteProjectStore::Result {
+ Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {});
+ return LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *RemoteStore,
+ .OptionalCache = OptCache,
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *ImportOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = ForceDownload,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = Mode,
+ .PopulateCache = PopulateCache,
+ .StoreLatencySec = StoreLatency,
+ .StoreMaxRangeCountPerRequest = StoreRanges,
+ .CacheLatencySec = CacheLatency,
+ .CacheMaxRangeCountPerRequest = CacheRanges,
+ .OptionalJobContext = &OpJobContext});
+ };
+
+ // Shorthand: Mode=All, low latency, 128 ranges for both store and cache.
+ auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) {
+ return DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force);
+ };
+
+ SUBCASE("mode_off_no_cache")
+ {
+ // Baseline: no partial block requests, no cache.
+ RemoteProjectStore::Result R =
+ DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("mode_all_multirange_cloud_no_cache")
+ {
+ // StoreMaxRangeCountPerRequest > 1 → MultiRange cloud path.
+ RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("mode_all_singlerange_cloud_no_cache")
+ {
+ // StoreMaxRangeCountPerRequest == 1 → SingleRange cloud path.
+ RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("mode_mixed_high_latency_no_cache")
+ {
+ // High store latency encourages range merging; Mixed uses SingleRange for cloud, Off for cache.
+ RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false);
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("cache_populate_and_hit")
+ {
+ // First import: ImportCidStore is empty so all blocks are downloaded from the remote store
+ // and written to the cache.
+ RemoteProjectStore::Result PopulateResult = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(PopulateResult.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount > 0);
+
+ // Re-import with ForceDownload=true: all chunks are now in ImportCidStore but Force overrides
+ // HasAttachment() so the download logic re-runs and serves blocks from the cache instead of
+ // the remote store.
+ ResetCacheStats();
+ RemoteProjectStore::Result HitResult = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true);
+ CHECK(HitResult.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount == 0);
+ // TotalRequestCount covers both full-blob cache hits and partial-range cache hits.
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("cache_no_populate_flag")
+ {
+ // Cache is provided but PopulateCache=false: blocks are downloaded to ImportCidStore but
+ // nothing should be written to the cache.
+ RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount == 0);
+ }
+
+ SUBCASE("mode_zencacheonly_cache_multirange")
+ {
+ // Pre-populate the cache via a plain import, then re-import with ZenCacheOnly +
+ // CacheMaxRangeCountPerRequest=128. With 100% of chunks needed, all blocks go to
+ // FullBlockIndexes and GetBuildBlob (full blob) is called from the cache.
+ // CacheMaxRangeCountPerRequest > 1 would route partial downloads through GetBuildBlobRanges
+ // if the analyser ever emits BlockRanges entries.
+ RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(Populate.ErrorCode == 0);
+ ResetCacheStats();
+
+ RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("mode_zencacheonly_cache_singlerange")
+ {
+ // Pre-populate the cache, then re-import with ZenCacheOnly + CacheMaxRangeCountPerRequest=1.
+ // With 100% of chunks needed the analyser sends all blocks to FullBlockIndexes (full-block
+ // download path), which calls GetBuildBlob with no range offset — a full-blob cache hit.
+ // The single-range vs multi-range distinction only matters for the partial-block (BlockRanges)
+ // path, which is not reached when all chunks are needed.
+ RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(Populate.ErrorCode == 0);
+ ResetCacheStats();
+
+ RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("mode_all_cache_and_cloud_multirange")
+ {
+ // Pre-populate cache; All mode uses multi-range for both the cache and cloud paths.
+ RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false);
+ CHECK(Populate.ErrorCode == 0);
+ ResetCacheStats();
+
+ RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true);
+ CHECK(R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("partial_block_cloud_multirange")
+ {
+ // Export store with 6 × 512 KB chunks packed into one ~3 MB block.
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ // Seeding even-indexed chunks (0, 2, 4) leaves odd ones (1, 3, 5) absent in
+ // ImportCidStore. Three non-adjacent needed positions → three BlockRangeDescriptors.
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+ SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash);
+
+ // StoreMaxRangeCountPerRequest=128 → all three ranges sent in one LoadAttachmentRanges call.
+ Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *PartialOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = -1.0,
+ .CacheMaxRangeCountPerRequest = 0u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("partial_block_cloud_singlerange")
+ {
+ // Same block layout as partial_block_cloud_multirange but StoreMaxRangeCountPerRequest=1.
+ // DownloadPartialBlock issues one LoadAttachmentRanges call per range.
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+ SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash);
+
+ Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = nullptr,
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *PartialOplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 1u,
+ .CacheLatencySec = -1.0,
+ .CacheMaxRangeCountPerRequest = 0u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(R.ErrorCode == 0);
+ }
+
+ SUBCASE("partial_block_cache_multirange")
+ {
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+
+ // Phase 1: ImportCidStore starts empty → full block download from remote → PutBuildBlob
+ // populates the cache.
+ {
+ Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase1Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = true,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 128u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase1R.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount > 0);
+ }
+ ResetCacheStats();
+
+ // Phase 2: fresh CidStore with only even-indexed chunks seeded.
+ // HasAttachment returns false for odd chunks (1, 3, 5) → three BlockRangeDescriptors.
+ // Block is in cache from Phase 1 → cache partial path.
+ // CacheMaxRangeCountPerRequest=128 → SubRangeCount=3 > 1 → GetBuildBlobRanges.
+ GcManager Phase2Gc;
+ CidStore Phase2CidStore(Phase2Gc);
+ CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ Phase2CidStore.Initialize(Phase2CidConfig);
+ SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash);
+
+ Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase2Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 128u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase2R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
+
+ SUBCASE("partial_block_cache_singlerange")
+ {
+ ScopedTemporaryDirectory PartialExportDir;
+ std::shared_ptr<RemoteProjectStore> PartialRemoteStore;
+ RemoteProjectStore::Result ExportR =
+ SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore);
+ REQUIRE(ExportR.ErrorCode == 0);
+
+ IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u);
+ CHECK(BlockHash != IoHash::Zero);
+
+ // Phase 1: full block download from remote into cache.
+ {
+ Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase1Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::All,
+ .PopulateCache = true,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 128u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase1R.ErrorCode == 0);
+ CHECK(CacheStats.PutBlobCount > 0);
+ }
+ ResetCacheStats();
+
+ // Phase 2: fresh CidStore with only even-indexed chunks seeded.
+ // CacheMaxRangeCountPerRequest=1 → SubRangeCount=Min(3,1)=1 → GetBuildBlob with range
+ // offset (single-range legacy cache path), called once per needed chunk range.
+ GcManager Phase2Gc;
+ CidStore Phase2CidStore(Phase2Gc);
+ CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas_single",
+ .TinyValueThreshold = 1024,
+ .HugeValueThreshold = 4096};
+ Phase2CidStore.Initialize(Phase2CidConfig);
+ SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash);
+
+ Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {});
+ RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore,
+ .RemoteStore = *PartialRemoteStore,
+ .OptionalCache = Cache.get(),
+ .CacheBuildId = CacheBuildId,
+ .Oplog = *Phase2Oplog,
+ .NetworkWorkerPool = NetworkPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = false,
+ .IgnoreMissingAttachments = false,
+ .CleanOplog = false,
+ .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly,
+ .PopulateCache = false,
+ .StoreLatencySec = 0.001,
+ .StoreMaxRangeCountPerRequest = 128u,
+ .CacheLatencySec = 0.001,
+ .CacheMaxRangeCountPerRequest = 1u,
+ .OptionalJobContext = &OpJobContext});
+ CHECK(Phase2R.ErrorCode == 0);
+ CHECK(CacheStats.TotalRequestCount > 0);
+ }
}
TEST_SUITE_END();
diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
index ef82c45e0..115d6438d 100644
--- a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp
@@ -157,59 +157,56 @@ public:
return Result;
}
- virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override
+ virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
{
LoadAttachmentsResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
- {
- std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog);
- CbObject Request;
+ CbObject Request;
+ {
+ CbObjectWriter RequestWriter;
+ RequestWriter.AddString("method"sv, "getchunks"sv);
+ RequestWriter.BeginObject("Request"sv);
{
- CbObjectWriter RequestWriter;
- RequestWriter.AddString("method"sv, "getchunks"sv);
- RequestWriter.BeginObject("Request"sv);
+ RequestWriter.BeginArray("Chunks"sv);
{
- RequestWriter.BeginArray("Chunks"sv);
+ for (const IoHash& RawHash : RawHashes)
{
- for (const IoHash& RawHash : RawHashes)
+ RequestWriter.BeginObject();
{
- RequestWriter.BeginObject();
- {
- RequestWriter.AddHash("RawHash", RawHash);
- }
- RequestWriter.EndObject();
+ RequestWriter.AddHash("RawHash", RawHash);
}
+ RequestWriter.EndObject();
}
- RequestWriter.EndArray(); // "chunks"
}
- RequestWriter.EndObject();
- Request = RequestWriter.Save();
+            RequestWriter.EndArray(); // "Chunks"
}
+ RequestWriter.EndObject();
+ Request = RequestWriter.Save();
+ }
- HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
- AddStats(Response);
+ HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage));
+ AddStats(Response);
- Result = LoadAttachmentsResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
- RawHashes.size(),
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- Result.Reason);
- }
- else
+ Result = LoadAttachmentsResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'",
+ RawHashes.size(),
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ Result.Reason);
+ }
+ else
+ {
+ CbPackage Package = Response.AsPackage();
+ std::span<const CbAttachment> Attachments = Package.GetAttachments();
+ Result.Chunks.reserve(Attachments.size());
+ for (const CbAttachment& Attachment : Attachments)
{
- CbPackage Package = Response.AsPackage();
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- Result.Chunks.reserve(Attachments.size());
- for (const CbAttachment& Attachment : Attachments)
- {
- Result.Chunks.emplace_back(
- std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
- }
+ Result.Chunks.emplace_back(
+ std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()});
}
}
return Result;
@@ -253,75 +250,64 @@ public:
return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
}
- virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override
+ virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes,
+ BuildStorageCache* OptionalCache,
+ const Oid& CacheBuildId) override
{
- ZEN_UNUSED(BlockHashes);
+ ZEN_UNUSED(BlockHashes, OptionalCache, CacheBuildId);
return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}};
}
- virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override
- {
- return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)};
- }
-
- virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override
+ virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
{
LoadAttachmentResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
- {
- std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
- HttpClient::Response Response =
- m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
- AddStats(Response);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ AddStats(Response);
- Result = LoadAttachmentResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- RawHash,
- Result.Reason);
- }
- Result.Bytes = Response.ResponsePayload;
- Result.Bytes.MakeOwned();
+ Result = LoadAttachmentResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ RawHash,
+ Result.Reason);
}
+ Result.Bytes = Response.ResponsePayload;
+ Result.Bytes.MakeOwned();
return Result;
}
virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash,
- std::span<const std::pair<uint64_t, uint64_t>> Ranges,
- ESourceMode SourceMode) override
+ std::span<const std::pair<uint64_t, uint64_t>> Ranges) override
{
+ ZEN_ASSERT(!Ranges.empty());
LoadAttachmentRangesResult Result;
- if (SourceMode != ESourceMode::kCacheOnly)
- {
- std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
- HttpClient::Response Response =
- m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
- AddStats(Response);
+ std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash);
+ HttpClient::Response Response =
+ m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary));
+ AddStats(Response);
- Result = LoadAttachmentRangesResult{ConvertResult(Response)};
- if (Result.ErrorCode)
- {
- Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
- m_ProjectStoreUrl,
- m_Project,
- m_Oplog,
- RawHash,
- Result.Reason);
- }
- else
- {
- Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end());
- }
+ Result = LoadAttachmentRangesResult{ConvertResult(Response)};
+ if (Result.ErrorCode)
+ {
+ Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'",
+ m_ProjectStoreUrl,
+ m_Project,
+ m_Oplog,
+ RawHash,
+ Result.Reason);
+ }
+ else
+ {
+ Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end());
}
return Result;
}
- virtual void Flush() override {}
-
private:
void AddStats(const HttpClient::Response& Result)
{
diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp
index 459e044eb..38d97765b 100644
--- a/src/zenserver/storage/buildstore/httpbuildstore.cpp
+++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp
@@ -177,6 +177,14 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req)
uint64_t RangeLength = RangeView["length"sv].AsUInt64();
OffsetAndLengthPairs.push_back(std::make_pair(RangeOffset, RangeLength));
}
+ if (OffsetAndLengthPairs.size() > MaxRangeCountPerRequestSupported)
+ {
+ return ServerRequest.WriteResponse(HttpResponseCode::BadRequest,
+ HttpContentType::kText,
+ fmt::format("Number of ranges ({}) for blob request exceeds maximum range count {}",
+ OffsetAndLengthPairs.size(),
+ MaxRangeCountPerRequestSupported));
+ }
}
if (OffsetAndLengthPairs.empty())
{
@@ -661,6 +669,11 @@ HttpBuildStoreService::HandleStatusRequest(HttpServerRequest& Request)
ZEN_TRACE_CPU("HttpBuildStoreService::Status");
CbObjectWriter Cbo;
Cbo << "ok" << true;
+ Cbo.BeginObject("capabilities");
+ {
+ Cbo << "maxrangecountperrequest" << MaxRangeCountPerRequestSupported;
+ }
+ Cbo.EndObject(); // capabilities
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}
diff --git a/src/zenserver/storage/buildstore/httpbuildstore.h b/src/zenserver/storage/buildstore/httpbuildstore.h
index e10986411..5fa7cd642 100644
--- a/src/zenserver/storage/buildstore/httpbuildstore.h
+++ b/src/zenserver/storage/buildstore/httpbuildstore.h
@@ -45,6 +45,8 @@ private:
inline LoggerRef Log() { return m_Log; }
+ static constexpr uint32_t MaxRangeCountPerRequestSupported = 256u;
+
LoggerRef m_Log;
void PutBlobRequest(HttpRouterRequest& Req);
diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp
index 2b5474d00..0ec6faea3 100644
--- a/src/zenserver/storage/projectstore/httpprojectstore.cpp
+++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp
@@ -13,7 +13,12 @@
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
#include <zencore/trace.h>
+#include <zenhttp/httpclientauth.h>
#include <zenhttp/packageformat.h>
+#include <zenremotestore/builds/buildstoragecache.h>
+#include <zenremotestore/builds/buildstorageutil.h>
+#include <zenremotestore/jupiter/jupiterhost.h>
+#include <zenremotestore/operationlogoutput.h>
#include <zenremotestore/projectstore/buildsremoteprojectstore.h>
#include <zenremotestore/projectstore/fileremoteprojectstore.h>
#include <zenremotestore/projectstore/jupiterremoteprojectstore.h>
@@ -244,8 +249,22 @@ namespace {
{
std::shared_ptr<RemoteProjectStore> Store;
std::string Description;
- double HostLatencySec = -1.0;
- double CacheLatencySec = -1.0;
+ double LatencySec = -1.0;
+ uint64_t MaxRangeCountPerRequest = 1;
+
+ struct Cache
+ {
+ std::unique_ptr<HttpClient> Http;
+ std::unique_ptr<BuildStorageCache> Cache;
+ Oid BuildsId = Oid::Zero;
+ std::string Description;
+ double LatencySec = -1.0;
+ uint64_t MaxRangeCountPerRequest = 1;
+ BuildStorageCache::Statistics Stats;
+ bool Populate = false;
+ };
+
+ std::unique_ptr<Cache> OptionalCache;
};
CreateRemoteStoreResult CreateRemoteStore(LoggerRef InLog,
@@ -262,9 +281,7 @@ namespace {
using namespace std::literals;
- std::shared_ptr<RemoteProjectStore> RemoteStore;
- double HostLatencySec = -1.0;
- double CacheLatencySec = -1.0;
+ CreateRemoteStoreResult Result;
if (CbObjectView File = Params["file"sv].AsObjectView(); File)
{
@@ -282,6 +299,9 @@ namespace {
bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false);
bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false);
+ Result.LatencySec = 0;
+ Result.MaxRangeCountPerRequest = 1;
+
FileRemoteStoreOptions Options = {
RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
FolderPath,
@@ -289,7 +309,7 @@ namespace {
std::string(OptionalBaseName),
ForceDisableBlocks,
ForceEnableTempBlocks};
- RemoteStore = CreateFileRemoteStore(Log(), Options);
+ Result.Store = CreateFileRemoteStore(Log(), Options);
}
if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud)
@@ -367,21 +387,32 @@ namespace {
bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false);
bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false);
- JupiterRemoteStoreOptions Options = {
- RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Url,
- std::string(Namespace),
- std::string(Bucket),
- Key,
- BaseKey,
- std::string(OpenIdProvider),
- AccessToken,
- AuthManager,
- OidcExePath,
- ForceDisableBlocks,
- ForceDisableTempBlocks,
- AssumeHttp2};
- RemoteStore = CreateJupiterRemoteStore(Log(), Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true);
+ if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(Url, AssumeHttp2, /*Verbose*/ false); TestResult.Success)
+ {
+ Result.LatencySec = TestResult.LatencySeconds;
+ Result.MaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest;
+
+ JupiterRemoteStoreOptions Options = {
+ RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
+ Url,
+ std::string(Namespace),
+ std::string(Bucket),
+ Key,
+ BaseKey,
+ std::string(OpenIdProvider),
+ AccessToken,
+ AuthManager,
+ OidcExePath,
+ ForceDisableBlocks,
+ ForceDisableTempBlocks,
+ AssumeHttp2};
+ Result.Store =
+ CreateJupiterRemoteStore(Log(), Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true);
+ }
+ else
+ {
+ return {nullptr, fmt::format("Unable to connect to jupiter host '{}'", Url)};
+ }
}
if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen)
@@ -397,12 +428,13 @@ namespace {
{
return {nullptr, "Missing oplog"};
}
+
ZenRemoteStoreOptions Options = {
RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
std::string(Url),
std::string(Project),
std::string(Oplog)};
- RemoteStore = CreateZenRemoteStore(Log(), Options, TempFilePath);
+ Result.Store = CreateZenRemoteStore(Log(), Options, TempFilePath);
}
if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds)
@@ -475,11 +507,76 @@ namespace {
MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView();
IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize());
+ auto EnsureHttps = [](const std::string& Host, std::string_view PreferredProtocol) {
+ if (!Host.empty() && Host.find("://"sv) == std::string::npos)
+ {
+ // Assume https URL
+ return fmt::format("{}://{}"sv, PreferredProtocol, Host);
+ }
+ return Host;
+ };
+
+ Host = EnsureHttps(Host, "https");
+ OverrideHost = EnsureHttps(OverrideHost, "https");
+ ZenHost = EnsureHttps(ZenHost, "http");
+
+ std::function<HttpClientAccessToken()> TokenProvider;
+ if (!OpenIdProvider.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromOpenIdProvider(AuthManager, OpenIdProvider);
+ }
+ else if (!AccessToken.empty())
+ {
+ TokenProvider = httpclientauth::CreateFromStaticToken(AccessToken);
+ }
+ else if (!OidcExePath.empty())
+ {
+ if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath,
+ Host.empty() ? OverrideHost : Host,
+ /*Quiet*/ false,
+ /*Unattended*/ false,
+ /*Hidden*/ true);
+ TokenProviderMaybe)
+ {
+ TokenProvider = TokenProviderMaybe.value();
+ }
+ }
+
+ if (!TokenProvider)
+ {
+ TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(AuthManager);
+ }
+
+ BuildStorageResolveResult ResolveResult;
+ {
+ HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
+ .AccessTokenProvider = TokenProvider,
+ .AssumeHttp2 = AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 2};
+
+ std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(Log()));
+
+ try
+ {
+ ResolveResult = ResolveBuildStorage(*Output,
+ ClientSettings,
+ Host,
+ OverrideHost,
+ ZenHost,
+ ZenCacheResolveMode::Discovery,
+ /*Verbose*/ false);
+ }
+ catch (const std::exception& Ex)
+ {
+ return {nullptr, fmt::format("Failed resolving storage host and cache. Reason: '{}'", Ex.what())};
+ }
+ }
+ Result.LatencySec = ResolveResult.Cloud.LatencySec;
+ Result.MaxRangeCountPerRequest = ResolveResult.Cloud.Caps.MaxRangeCountPerRequest;
+
BuildsRemoteStoreOptions Options = {
RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize},
- Host,
- OverrideHost,
- ZenHost,
std::string(Namespace),
std::string(Bucket),
BuildId,
@@ -489,30 +586,43 @@ namespace {
OidcExePath,
ForceDisableBlocks,
ForceDisableTempBlocks,
- AssumeHttp2,
- PopulateCache,
MetaData,
MaximumInMemoryDownloadSize};
- RemoteStore = CreateJupiterBuildsRemoteStore(Log(),
- Options,
- TempFilePath,
- /*Quiet*/ false,
- /*Unattended*/ false,
- /*Hidden*/ true,
- GetTinyWorkerPool(EWorkloadType::Background),
- HostLatencySec,
- CacheLatencySec);
+ Result.Store = CreateJupiterBuildsRemoteStore(Log(), ResolveResult, std::move(TokenProvider), Options, TempFilePath);
+
+ if (!ResolveResult.Cache.Address.empty())
+ {
+ Result.OptionalCache = std::make_unique<CreateRemoteStoreResult::Cache>();
+
+ HttpClientSettings CacheClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = ResolveResult.Cache.AssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0,
+ .MaximumInMemoryDownloadSize = MaximumInMemoryDownloadSize};
+
+ Result.OptionalCache->Http = std::make_unique<HttpClient>(ResolveResult.Cache.Address, CacheClientSettings);
+ Result.OptionalCache->Cache = CreateZenBuildStorageCache(*Result.OptionalCache->Http,
+ Result.OptionalCache->Stats,
+ Namespace,
+ Bucket,
+ TempFilePath,
+ GetTinyWorkerPool(EWorkloadType::Background));
+ Result.OptionalCache->BuildsId = BuildId;
+ Result.OptionalCache->LatencySec = ResolveResult.Cache.LatencySec;
+ Result.OptionalCache->MaxRangeCountPerRequest = ResolveResult.Cache.Caps.MaxRangeCountPerRequest;
+ Result.OptionalCache->Populate = PopulateCache;
+ Result.OptionalCache->Description =
+ fmt::format("[zenserver] {} namespace {} bucket {}", ResolveResult.Cache.Address, Namespace, Bucket);
+ }
}
-
- if (!RemoteStore)
+ if (!Result.Store)
{
return {nullptr, "Unknown remote store type"};
}
- return CreateRemoteStoreResult{.Store = std::move(RemoteStore),
- .Description = "",
- .HostLatencySec = HostLatencySec,
- .CacheLatencySec = CacheLatencySec};
+ return Result;
}
std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result)
@@ -2679,38 +2789,36 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
EPartialBlockRequestMode PartialBlockRequestMode =
PartialBlockRequestModeFromString(Params["partialblockrequestmode"sv].AsString("true"));
- CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(),
- Params,
- m_AuthMgr,
- MaxBlockSize,
- MaxChunkEmbedSize,
- GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory),
- Oplog->TempPath());
+ std::shared_ptr<CreateRemoteStoreResult> RemoteStoreResult =
+ std::make_shared<CreateRemoteStoreResult>(CreateRemoteStore(Log(),
+ Params,
+ m_AuthMgr,
+ MaxBlockSize,
+ MaxChunkEmbedSize,
+ GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory),
+ Oplog->TempPath()));
- if (RemoteStoreResult.Store == nullptr)
+ if (RemoteStoreResult->Store == nullptr)
{
- return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description);
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult->Description);
}
- std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store);
- RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
JobId JobId = m_JobQueue.QueueJob(
fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()),
[this,
- ChunkStore = &m_CidStore,
- ActualRemoteStore = std::move(RemoteStore),
+ RemoteStoreResult = std::move(RemoteStoreResult),
Oplog,
Force,
IgnoreMissingAttachments,
CleanOplog,
PartialBlockRequestMode,
- HostLatencySec = RemoteStoreResult.HostLatencySec,
- CacheLatencySec = RemoteStoreResult.CacheLatencySec,
BoostWorkerCount](JobContext& Context) {
- Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}",
- Oplog->GetOuterProjectIdentifier(),
- Oplog->OplogId(),
- ActualRemoteStore->GetInfo().Description));
+ Context.ReportMessage(
+ fmt::format("Loading oplog '{}/{}'\n Host: {}\n Cache: {}",
+ Oplog->GetOuterProjectIdentifier(),
+ Oplog->OplogId(),
+ RemoteStoreResult->Store->GetInfo().Description,
+ RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Description : "<none>"));
Ref<TransferThreadWorkers> Workers = GetThreadWorkers(BoostWorkerCount, /*SingleThreaded*/ false);
@@ -2718,19 +2826,26 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req)
WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool();
Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo()));
-
- RemoteProjectStore::Result Result = LoadOplog(m_CidStore,
- *ActualRemoteStore,
- *Oplog,
- NetworkWorkerPool,
- WorkerPool,
- Force,
- IgnoreMissingAttachments,
- CleanOplog,
- PartialBlockRequestMode,
- HostLatencySec,
- CacheLatencySec,
- &Context);
+ RemoteProjectStore::Result Result = LoadOplog(LoadOplogContext{
+ .ChunkStore = m_CidStore,
+ .RemoteStore = *RemoteStoreResult->Store,
+ .OptionalCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Cache.get() : nullptr,
+ .CacheBuildId = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->BuildsId : Oid::Zero,
+ .OptionalCacheStats = RemoteStoreResult->OptionalCache ? &RemoteStoreResult->OptionalCache->Stats : nullptr,
+ .Oplog = *Oplog,
+ .NetworkWorkerPool = NetworkWorkerPool,
+ .WorkerPool = WorkerPool,
+ .ForceDownload = Force,
+ .IgnoreMissingAttachments = IgnoreMissingAttachments,
+ .CleanOplog = CleanOplog,
+ .PartialBlockRequestMode = PartialBlockRequestMode,
+ .PopulateCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Populate : false,
+ .StoreLatencySec = RemoteStoreResult->LatencySec,
+ .StoreMaxRangeCountPerRequest = RemoteStoreResult->MaxRangeCountPerRequest,
+ .CacheLatencySec = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->LatencySec : -1.0,
+ .CacheMaxRangeCountPerRequest =
+ RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->MaxRangeCountPerRequest : 0,
+ .OptionalJobContext = &Context});
auto Response = ConvertResult(Result);
ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second);
if (!IsHttpSuccessCode(Response.first))