diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-09 13:08:00 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-09 13:08:00 +0100 |
| commit | f9d8cbcb3573b47b639b7bd73d3a4eed17653d71 (patch) | |
| tree | dd295b2e929f050f292a6415fd0330da24b683a4 | |
| parent | added auto-detection logic for console colour output (#817) (diff) | |
| download | zen-f9d8cbcb3573b47b639b7bd73d3a4eed17653d71.tar.xz zen-f9d8cbcb3573b47b639b7bd73d3a4eed17653d71.zip | |
add fallback for zencache multirange (#816)
* clean up BuildStorageResolveResult to allow capabilities
* add check for multirange request capability
* add MaxRangeCountPerRequest capabilities
* project export tests
* add InMemoryBuildStorageCache
* progress and logging improvements
* fix ElapsedSeconds calculations in fileremoteprojectstore.cpp
* oplogs/builds test script
33 files changed, 2922 insertions, 1437 deletions
diff --git a/scripts/test_scripts/builds-download-upload-test.py b/scripts/test_scripts/builds-download-upload-test.py new file mode 100644 index 000000000..f03528d98 --- /dev/null +++ b/scripts/test_scripts/builds-download-upload-test.py @@ -0,0 +1,196 @@ +#!/usr/bin/env python3 +"""Test script for builds download/upload operations.""" + +from __future__ import annotations + +import argparse +import platform +import subprocess +import sys +from pathlib import Path +from typing import NamedTuple + +_PLATFORM = "windows" if sys.platform == "win32" else "macosx" if sys.platform == "darwin" else "linux" +_ARCH = "x64" if sys.platform == "win32" else platform.machine().lower() +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + + +class Build(NamedTuple): + name: str + bucket: str + id: str + + +BUILDS = [ + Build("XB1Client", "fortnitegame.staged-build.fortnite-main.xb1-client", "09a7616c1a388dfe6056aa57"), + Build("WindowsClient", "fortnitegame.staged-build.fortnite-main.windows-client", "09a762c81e2cf213142d0ce5"), + Build("SwitchClient", "fortnitegame.staged-build.fortnite-main.switch-client", "09a75bf9c3ce75bce09f644f"), + Build("LinuxServer", "fortnitegame.staged-build.fortnite-main.linux-server", "09a750ac155eb3e3b62e87e0"), + Build("Switch2Client", "fortnitegame.staged-build.fortnite-main.switch2-client", "09a78f3df07b289691ec5710"), + Build("PS4Client", "fortnitegame.staged-build.fortnite-main.ps4-client", "09a76ea92ad301d4724fafad"), + Build("IOSClient", "fortnitegame.staged-build.fortnite-main.ios-client", "09a7816fa26c23362fef0c5d"), + Build("AndroidClient", "fortnitegame.staged-build.fortnite-main.android-client", "09a76725f1620d62c6be06e4"), +] + +ZEN_EXE: Path = Path(f"./build/{_PLATFORM}/{_ARCH}/release/zen{_EXE_SUFFIX}") +ZEN_METADATA_DIR: Path = Path(__file__).resolve().parent / "metadatas" + +ZEN_PORT = 8558 +ZEN_CACHE_PORT = 8559 +ZEN_CACHE = f"http://127.0.0.1:{ZEN_CACHE_PORT}" +ZEN_PARTIAL_REQUEST_MODE = "true" + +SERVER_ARGS: tuple[str, ...] = ( + "--http", "asio", + "--gc-cache-duration-seconds", "1209600", + "--gc-interval-seconds", "21600", + "--gc-low-diskspace-threshold", "2147483648", + "--cache-bucket-limit-overwrites", +) + + +def run(cmd: list[str | Path]) -> None: + try: + subprocess.run(cmd, check=True) + except FileNotFoundError: + sys.exit(f"error: executable not found: {cmd[0]}") + except subprocess.CalledProcessError as e: + sys.exit(f"error: command failed with exit code {e.returncode}:\n {' '.join(str(x) for x in e.cmd)}") + + +def stop_server(label: str, port: int) -> None: + """Stop a zen server. Tolerates failures so it is safe to call from finally blocks.""" + print(f"--------- stopping {label}") + try: + subprocess.run([ZEN_EXE, "down", "--port", str(port)]) + except OSError as e: + print(f"warning: could not stop {label}: {e}", file=sys.stderr) + print() + + +def start_server(label: str, data_dir: Path, port: int, extra_args: list[str] | None = None) -> None: + print(f"--------- starting {label} {data_dir}") + run([ + ZEN_EXE, "up", "--port", str(port), "--show-console", "--", + f"--data-dir={data_dir}", + *SERVER_ARGS, + *(extra_args or []), + ]) + print() + + +def wipe_or_create(label: str, path: Path) -> None: + if path.exists(): + print(f"--------- cleaning {label} {path}") + run([ZEN_EXE, "wipe", "-y", path]) + else: + print(f"--------- creating {label} {path}") + path.mkdir(parents=True, exist_ok=True) + print() + + +def check_prerequisites() -> None: + if not ZEN_EXE.is_file(): + sys.exit(f"error: zen executable not found: {ZEN_EXE}") + if not ZEN_METADATA_DIR.is_dir(): + sys.exit(f"error: metadata directory not found: {ZEN_METADATA_DIR}") + for build in BUILDS: + metadata = ZEN_METADATA_DIR / f"{build.name}.json" + if not metadata.is_file(): + sys.exit(f"error: metadata file not found: {metadata}") + + +def main() -> None: + global ZEN_EXE + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "positional_path", + nargs="?", + default=None, + type=Path, + metavar="DATA_PATH", + help="root path for all data directories (positional shorthand for --data-path)", + ) + parser.add_argument( + "zen_exe_positional", + nargs="?", + default=None, + type=Path, + metavar="ZEN_EXE_PATH", + help="path to zen executable (positional shorthand for --zen-exe-path)", + ) + parser.add_argument( + "--data-path", + default=Path(Path(__file__).stem + "_datadir"), + type=Path, + metavar="PATH", + help=f"root path for all data directories (default: {Path(__file__).stem}_datadir)", + ) + parser.add_argument( + "--zen-exe-path", + default=ZEN_EXE, + type=Path, + metavar="PATH", + help=f"path to zen executable (default: {ZEN_EXE})", + ) + args = parser.parse_args() + + data_path = args.positional_path + if data_path is None: + data_path = args.data_path + + ZEN_EXE = args.zen_exe_positional + if ZEN_EXE is None: + ZEN_EXE = args.zen_exe_path + zen_system_dir = data_path / "system" + zen_download_dir = data_path / "Download" + zen_cache_data_dir = data_path / "ZenBuildsCache" + zen_upload_dir = data_path / "Upload" + zen_chunk_cache_path = data_path / "ChunkCache" + + check_prerequisites() + + start_server("cache zenserver", zen_cache_data_dir, ZEN_CACHE_PORT, ["--buildstore-enabled"]) + try: + wipe_or_create("download folder", zen_download_dir) + wipe_or_create("system folder", zen_system_dir) + + for build in BUILDS: + print(f"--------- importing {build.name} build") + run([ + ZEN_EXE, "builds", "download", + "--host", "https://jupiter.devtools.epicgames.com", + "--namespace", "fortnite.oplog", + "--bucket", build.bucket, + "--build-id", build.id, + "--local-path", zen_download_dir / build.name, + f"--zen-cache-host={ZEN_CACHE}", + f"--allow-partial-block-requests={ZEN_PARTIAL_REQUEST_MODE}", + "--verify", + "--system-dir", zen_system_dir, + ]) + print() + + wipe_or_create("upload folder", zen_upload_dir) + + for build in BUILDS: + print(f"--------- exporting {build.name} build") + run([ + ZEN_EXE, "builds", "upload", + "--storage-path", zen_upload_dir, + "--build-id", build.id, + "--local-path", zen_download_dir / build.name, + "--verify", + "--system-dir", zen_system_dir, + "--metadata-path", str(ZEN_METADATA_DIR / f"{build.name}.json"), + "--create-build", + "--chunking-cache-path", zen_chunk_cache_path, + ]) + print() + finally: + stop_server("cache zenserver", ZEN_CACHE_PORT) + + +if __name__ == "__main__": + main() diff --git a/scripts/test_scripts/metadatas/AndroidClient.json b/scripts/test_scripts/metadatas/AndroidClient.json new file mode 100644 index 000000000..378d0454d --- /dev/null +++ b/scripts/test_scripts/metadatas/AndroidClient.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 AndroidClient", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "Android", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/IOSClient.json b/scripts/test_scripts/metadatas/IOSClient.json new file mode 100644 index 000000000..fb0f9a342 --- /dev/null +++ b/scripts/test_scripts/metadatas/IOSClient.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 IOSClient", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "IOS", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/LinuxServer.json b/scripts/test_scripts/metadatas/LinuxServer.json new file mode 100644 index 000000000..02ae2d970 --- /dev/null +++ b/scripts/test_scripts/metadatas/LinuxServer.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 LinuxServer", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "Linux", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/PS4Client.json b/scripts/test_scripts/metadatas/PS4Client.json new file mode 100644 index 000000000..6e49e3e5e --- /dev/null +++ b/scripts/test_scripts/metadatas/PS4Client.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 PS4Client", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "PS4", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/Switch2Client.json b/scripts/test_scripts/metadatas/Switch2Client.json new file mode 100644 index 000000000..41732e7bc --- /dev/null +++ b/scripts/test_scripts/metadatas/Switch2Client.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 Switch2Client", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "Switch2", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/SwitchClient.json b/scripts/test_scripts/metadatas/SwitchClient.json new file mode 100644 index 000000000..49362f23e --- /dev/null +++ b/scripts/test_scripts/metadatas/SwitchClient.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 SwitchClient", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "Switch", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/WindowsClient.json b/scripts/test_scripts/metadatas/WindowsClient.json new file mode 100644 index 000000000..c7af270c2 --- /dev/null +++ b/scripts/test_scripts/metadatas/WindowsClient.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 Windows Client", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "Windows", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/metadatas/XB1Client.json b/scripts/test_scripts/metadatas/XB1Client.json new file mode 100644 index 000000000..36fb45801 --- /dev/null +++ b/scripts/test_scripts/metadatas/XB1Client.json @@ -0,0 +1,9 @@ +{ + "name": "++Fortnite+Main-CL-50966326 XB1Client", + "branch": "ZenBuildTest2", + "baselineBranch": "ZenBuildTest2", + "platform": "XB1", + "project": "Fortnite", + "changelist": 50966326, + "buildType": "staged-build" +} diff --git a/scripts/test_scripts/oplog-import-export-test.py b/scripts/test_scripts/oplog-import-export-test.py new file mode 100644 index 000000000..51593d5a9 --- /dev/null +++ b/scripts/test_scripts/oplog-import-export-test.py @@ -0,0 +1,228 @@ +#!/usr/bin/env python3 +"""Test script for oplog import/export operations.""" + +from __future__ import annotations + +import argparse +import platform +import subprocess +import sys +from pathlib import Path +from typing import NamedTuple + +_PLATFORM = "windows" if sys.platform == "win32" else "macosx" if sys.platform == "darwin" else "linux" +_ARCH = "x64" if sys.platform == "win32" else platform.machine().lower() +_EXE_SUFFIX = ".exe" if sys.platform == "win32" else "" + + +class Build(NamedTuple): + name: str + bucket: str + id: str + + +BUILDS = [ + Build("XB1Client", "fortnitegame.oplog.fortnite-main.xb1client", "09a75f7f3b7517653dcdaaa4"), + Build("WindowsClient", "fortnitegame.oplog.fortnite-main.windowsclient", "09a75d977ef944ecfd0eddfd"), + Build("SwitchClient", "fortnitegame.oplog.fortnite-main.switchclient", "09a74d03b3598ec94cfd2644"), + Build("XSXClient", "fortnitegame.oplog.fortnite-main.xsxclient", "09a76c2bbd6cd78f4d40d9ea"), + Build("Switch2Client", "fortnitegame.oplog.fortnite-main.switch2client", "09a7686b3d9faa78fb24a38f"), + Build("PS4Client", "fortnitegame.oplog.fortnite-main.ps4client", "09a75b72d1c260ed26020140"), + Build("LinuxServer", "fortnitegame.oplog.fortnite-main.linuxserver", "09a747f5e0ee83a04be013e6"), + Build("IOSClient", "fortnitegame.oplog.fortnite-main.iosclient", "09a75f677e883325a209148c"), + Build("Android_ASTCClient", "fortnitegame.oplog.fortnite-main.android_astcclient", "09a7422c08c6f37becc7d37f"), +] + +ZEN_EXE: Path = Path(f"./build/{_PLATFORM}/{_ARCH}/release/zen{_EXE_SUFFIX}") + +ZEN_PORT = 8558 +ZEN_CACHE_PORT = 8559 +ZEN_CACHE = f"http://127.0.0.1:{ZEN_CACHE_PORT}" +ZEN_CACHE_POPULATE = "true" +ZEN_PARTIAL_REQUEST_MODE = "true" + +SERVER_ARGS: tuple[str, ...] = ( + "--http", "asio", + "--gc-cache-duration-seconds", "1209600", + "--gc-interval-seconds", "21600", + "--gc-low-diskspace-threshold", "2147483648", + "--cache-bucket-limit-overwrites", +) + + +def run(cmd: list[str | Path]) -> None: + try: + subprocess.run(cmd, check=True) + except FileNotFoundError: + sys.exit(f"error: executable not found: {cmd[0]}") + except subprocess.CalledProcessError as e: + sys.exit(f"error: command failed with exit code {e.returncode}:\n {' '.join(str(x) for x in e.cmd)}") + + +def stop_server(label: str, port: int) -> None: + """Stop a zen server. Tolerates failures so it is safe to call from finally blocks.""" + print(f"--------- stopping {label}") + try: + subprocess.run([ZEN_EXE, "down", "--port", str(port)]) + except OSError as e: + print(f"warning: could not stop {label}: {e}", file=sys.stderr) + print() + + +def start_server(label: str, data_dir: Path, port: int, extra_args: list[str] | None = None) -> None: + print(f"--------- starting {label} {data_dir}") + run([ + ZEN_EXE, "up", "--port", str(port), "--show-console", "--", + f"--data-dir={data_dir}", + *SERVER_ARGS, + *(extra_args or []), + ]) + print() + + +def wipe_or_create(label: str, path: Path) -> None: + if path.exists(): + print(f"--------- cleaning {label} {path}") + run([ZEN_EXE, "wipe", "-y", path]) + else: + print(f"--------- creating {label} {path}") + path.mkdir(parents=True, exist_ok=True) + print() + + +def check_prerequisites() -> None: + if not ZEN_EXE.is_file(): + sys.exit(f"error: zen executable not found: {ZEN_EXE}") + + +def setup_project(port: int) -> None: + """Create the FortniteGame project and all oplogs on the server at the given port.""" + print("--------- creating FortniteGame project") + run([ZEN_EXE, "project-create", f"--hosturl=127.0.0.1:{port}", "FortniteGame", "--force-update"]) + print() + + for build in BUILDS: + print(f"--------- creating {build.name} oplog") + run([ZEN_EXE, "oplog-create", f"--hosturl=127.0.0.1:{port}", "FortniteGame", build.name, "--force-update"]) + print() + + +def main() -> None: + global ZEN_EXE + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "positional_path", + nargs="?", + default=None, + type=Path, + metavar="DATA_PATH", + help="root path for all data directories (positional shorthand for --data-path)", + ) + parser.add_argument( + "zen_exe_positional", + nargs="?", + default=None, + type=Path, + metavar="ZEN_EXE_PATH", + help="path to zen executable (positional shorthand for --zen-exe-path)", + ) + parser.add_argument( + "--data-path", + default=Path(Path(__file__).stem + "_datadir"), + type=Path, + metavar="PATH", + help=f"root path for all data directories (default: {Path(__file__).stem}_datadir)", + ) + parser.add_argument( + "--zen-exe-path", + default=ZEN_EXE, + type=Path, + metavar="PATH", + help=f"path to zen executable (default: {ZEN_EXE})", + ) + args = parser.parse_args() + + data_path = args.positional_path + if data_path is None: + data_path = args.data_path + + ZEN_EXE = args.zen_exe_positional + if ZEN_EXE is None: + ZEN_EXE = args.zen_exe_path + zen_data_dir = data_path / "DDC" / "OplogsZen" + zen_cache_data_dir = data_path / "DDC" / "ZenBuildsCache" + zen_import_data_dir = data_path / "DDC" / "OplogsZenImport" + export_dir = data_path / "Export" / "FortniteGame" + + check_prerequisites() + + start_server("cache zenserver", zen_cache_data_dir, ZEN_CACHE_PORT, ["--buildstore-enabled"]) + try: + wipe_or_create("zenserver data", zen_data_dir) + start_server("zenserver", zen_data_dir, ZEN_PORT) + try: + setup_project(ZEN_PORT) + + for build in BUILDS: + print(f"--------- importing {build.name} oplog") + run([ + ZEN_EXE, "oplog-import", + f"--hosturl=127.0.0.1:{ZEN_PORT}", + "FortniteGame", build.name, + "--clean", + "--builds", "https://jupiter.devtools.epicgames.com", + "--namespace", "fortnite.oplog", + "--bucket", build.bucket, + "--builds-id", build.id, + f"--zen-cache-host={ZEN_CACHE}", + f"--zen-cache-upload={ZEN_CACHE_POPULATE}", + f"--allow-partial-block-requests={ZEN_PARTIAL_REQUEST_MODE}", + ]) + print() + + print(f"--------- validating {build.name} oplog") + run([ZEN_EXE, "oplog-validate", f"--hosturl=127.0.0.1:{ZEN_PORT}", "FortniteGame", build.name]) + print() + + wipe_or_create("export folder", export_dir) + + for build in BUILDS: + print(f"--------- exporting {build.name} oplog") + run([ + ZEN_EXE, "oplog-export", + f"--hosturl=127.0.0.1:{ZEN_PORT}", + "FortniteGame", build.name, + "--file", export_dir, + "--forcetempblocks", + ]) + print() + finally: + stop_server("zenserver", ZEN_PORT) + + wipe_or_create("alternate zenserver data", zen_import_data_dir) + start_server("import zenserver", zen_import_data_dir, ZEN_PORT) + try: + setup_project(ZEN_PORT) + + for build in BUILDS: + print(f"--------- importing {build.name} oplog") + run([ + ZEN_EXE, "oplog-import", + f"--hosturl=127.0.0.1:{ZEN_PORT}", + "FortniteGame", build.name, + "--file", export_dir, + ]) + print() + + print(f"--------- validating {build.name} oplog") + run([ZEN_EXE, "oplog-validate", f"--hosturl=127.0.0.1:{ZEN_PORT}", "FortniteGame", build.name]) + print() + finally: + stop_server("alternative zenserver", ZEN_PORT) + finally: + stop_server("cache zenserver", ZEN_CACHE_PORT) + + +if __name__ == "__main__": + main() diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index e5cbafbea..b4b4df7c9 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -1594,7 +1594,7 @@ namespace builds_impl { } } } - if (Storage.BuildCacheStorage) + if (Storage.CacheStorage) { if (SB.Size() > 0) { @@ -1649,9 +1649,9 @@ namespace builds_impl { } if (Options.PrimeCacheOnly) { - if (Storage.BuildCacheStorage) + if (Storage.CacheStorage) { - Storage.BuildCacheStorage->Flush(5000, [](intptr_t Remaining) { + Storage.CacheStorage->Flush(5000, [](intptr_t Remaining) { if (!IsQuiet) { if (Remaining == 0) @@ -2826,47 +2826,47 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildStorageResolveResult ResolveRes = ResolveBuildStorage(*Output, ClientSettings, m_Host, m_OverrideHost, m_ZenCacheHost, ZenCacheResolveMode::All, m_Verbose); - if (!ResolveRes.HostUrl.empty()) + if (!ResolveRes.Cloud.Address.empty()) { - ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2; + ClientSettings.AssumeHttp2 = ResolveRes.Cloud.AssumeHttp2; Result.BuildStorageHttp = - std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings, []() { return AbortFlag.load(); }); + std::make_unique<HttpClient>(ResolveRes.Cloud.Address, ClientSettings, []() { return AbortFlag.load(); }); - Result.BuildStorage = CreateJupiterBuildStorage(Log(), + Result.BuildStorage = CreateJupiterBuildStorage(Log(), *Result.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, TempPath / "storage"); - Result.StorageName = ResolveRes.HostName; + Result.BuildStorageHost = ResolveRes.Cloud; - uint64_t HostLatencyNs = ResolveRes.HostLatencySec >= 0 ? uint64_t(ResolveRes.HostLatencySec * 1000000000.0) : 0; + uint64_t HostLatencyNs = ResolveRes.Cloud.LatencySec >= 0 ? uint64_t(ResolveRes.Cloud.LatencySec * 1000000000.0) : 0; - StorageDescription = fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'. Latency: {}", - ResolveRes.HostName, - (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl), - Result.BuildStorageHttp->GetSessionId(), - m_Namespace, - m_Bucket, - NiceLatencyNs(HostLatencyNs)); - Result.BuildStorageLatencySec = ResolveRes.HostLatencySec; + StorageDescription = + fmt::format("Cloud {}{}. SessionId: '{}'. Namespace '{}', Bucket '{}'. Latency: {}", + ResolveRes.Cloud.Name, + (ResolveRes.Cloud.Address == ResolveRes.Cloud.Name) ? "" : fmt::format(" {}", ResolveRes.Cloud.Address), + Result.BuildStorageHttp->GetSessionId(), + m_Namespace, + m_Bucket, + NiceLatencyNs(HostLatencyNs)); - if (!ResolveRes.CacheUrl.empty()) + if (!ResolveRes.Cache.Address.empty()) { Result.CacheHttp = std::make_unique<HttpClient>( - ResolveRes.CacheUrl, + ResolveRes.Cache.Address, HttpClientSettings{ .LogCategory = "httpcacheclient", .ConnectTimeout = std::chrono::milliseconds{3000}, .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AssumeHttp2 = ResolveRes.Cache.AssumeHttp2, .AllowResume = true, .RetryCount = 0, .Verbose = m_VerboseHttp, .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)}, []() { return AbortFlag.load(); }); - Result.BuildCacheStorage = + Result.CacheStorage = CreateZenBuildStorageCache(*Result.CacheHttp, StorageCacheStats, m_Namespace, @@ -2874,19 +2874,17 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) TempPath / "zencache", BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background)); - Result.CacheName = ResolveRes.CacheName; + Result.CacheHost = ResolveRes.Cache; - uint64_t CacheLatencyNs = ResolveRes.CacheLatencySec >= 0 ? uint64_t(ResolveRes.CacheLatencySec * 1000000000.0) : 0; + uint64_t CacheLatencyNs = ResolveRes.Cache.LatencySec >= 0 ? uint64_t(ResolveRes.Cache.LatencySec * 1000000000.0) : 0; CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'. Latency: {}", - ResolveRes.CacheName, - (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl), + ResolveRes.Cache.Name, + (ResolveRes.Cache.Address == ResolveRes.Cache.Name) ? "" : fmt::format(" {}", ResolveRes.Cache.Address), Result.CacheHttp->GetSessionId(), NiceLatencyNs(CacheLatencyNs)); - Result.CacheLatencySec = ResolveRes.CacheLatencySec; - if (!m_Namespace.empty()) { CacheDescription += fmt::format(". Namespace '{}'", m_Namespace); @@ -2902,41 +2900,56 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { StorageDescription = fmt::format("folder {}", m_StoragePath); Result.BuildStorage = CreateFileBuildStorage(m_StoragePath, StorageStats, false, DefaultLatency, DefaultDelayPerKBSec); - Result.StorageName = fmt::format("Disk {}", m_StoragePath.stem()); + + Result.BuildStorageHost = BuildStorageResolveResult::Host{.Address = m_StoragePath.generic_string(), + .Name = "Disk", + .LatencySec = 1.0 / 100000, // 1 us + .Caps = {.MaxRangeCountPerRequest = 2048u}}; if (!m_ZenCacheHost.empty()) { - Result.CacheHttp = std::make_unique<HttpClient>( - m_ZenCacheHost, - HttpClientSettings{ - .LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = m_AssumeHttp2, - .AllowResume = true, - .RetryCount = 0, - .Verbose = m_VerboseHttp, - .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)}, - []() { return AbortFlag.load(); }); - Result.BuildCacheStorage = - CreateZenBuildStorageCache(*Result.CacheHttp, - StorageCacheStats, - m_Namespace, - m_Bucket, - TempPath / "zencache", - BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background) - : GetTinyWorkerPool(EWorkloadType::Background)); - Result.CacheName = m_ZenCacheHost; - - CacheDescription = fmt::format("Zen {}{}. SessionId: '{}'", Result.CacheName, "", Result.CacheHttp->GetSessionId()); - ; - if (!m_Namespace.empty()) - { - CacheDescription += fmt::format(". Namespace '{}'", m_Namespace); - } - if (!m_Bucket.empty()) + ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(m_ZenCacheHost, m_AssumeHttp2, m_VerboseHttp); + + if (TestResult.Success) { - CacheDescription += fmt::format(" Bucket '{}'", m_Bucket); + Result.CacheHttp = std::make_unique<HttpClient>( + m_ZenCacheHost, + HttpClientSettings{ + .LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = m_AssumeHttp2, + .AllowResume = true, + .RetryCount = 0, + .Verbose = m_VerboseHttp, + .MaximumInMemoryDownloadSize = GetMaxMemoryBufferSize(DefaultMaxChunkBlockSize, m_BoostWorkerMemory)}, + []() { return AbortFlag.load(); }); + + Result.CacheStorage = + CreateZenBuildStorageCache(*Result.CacheHttp, + StorageCacheStats, + m_Namespace, + m_Bucket, + TempPath / "zencache", + BoostCacheBackgroundWorkerPool ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)); + Result.CacheHost = + BuildStorageResolveResult::Host{.Address = m_ZenCacheHost, + .Name = m_ZenCacheHost, + .AssumeHttp2 = m_AssumeHttp2, + .LatencySec = TestResult.LatencySeconds, + .Caps = {.MaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest}}; + + CacheDescription = fmt::format("Zen {}. SessionId: '{}'", Result.CacheHost.Name, Result.CacheHttp->GetSessionId()); + + if (!m_Namespace.empty()) + { + CacheDescription += fmt::format(". Namespace '{}'", m_Namespace); + } + if (!m_Bucket.empty()) + { + CacheDescription += fmt::format(" Bucket '{}'", m_Bucket); + } } } } @@ -2948,7 +2961,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (!IsQuiet) { ZEN_CONSOLE("Remote: {}", StorageDescription); - if (!Result.CacheName.empty()) + if (!Result.CacheHost.Name.empty()) { ZEN_CONSOLE("Cache : {}", CacheDescription); } @@ -3489,7 +3502,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) "Requests: {}\n" "Avg Request Time: {}\n" "Avg I/O Time: {}", - Storage.StorageName, + Storage.BuildStorageHost.Name, NiceBytes(StorageStats.TotalBytesRead.load()), NiceBytes(StorageStats.TotalBytesWritten.load()), StorageStats.TotalRequestCount.load(), @@ -3810,12 +3823,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (!IsQuiet) { - if (Storage.BuildCacheStorage) + if (Storage.CacheStorage) { - ZEN_CONSOLE("Uploaded {} ({}) blobs", + ZEN_CONSOLE("Uploaded {} ({}) blobs to {}", StorageCacheStats.PutBlobCount.load(), NiceBytes(StorageCacheStats.PutBlobByteCount), - Storage.CacheName); + Storage.CacheHost.Name); } } diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index dfc6c1650..5ff591b54 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -2602,38 +2602,37 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a StorageInstance Storage; - ClientSettings.AssumeHttp2 = ResolveRes.HostAssumeHttp2; + ClientSettings.AssumeHttp2 = ResolveRes.Cloud.AssumeHttp2; ClientSettings.MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u; - Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings); - Storage.BuildStorageLatencySec = ResolveRes.HostLatencySec; + Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.Cloud.Address, ClientSettings); + Storage.BuildStorageHost = ResolveRes.Cloud; BuildStorageCache::Statistics StorageCacheStats; std::atomic<bool> AbortFlag(false); - if (!ResolveRes.CacheUrl.empty()) + if (!ResolveRes.Cache.Address.empty()) { Storage.CacheHttp = std::make_unique<HttpClient>( - ResolveRes.CacheUrl, + ResolveRes.Cache.Address, HttpClientSettings{ .LogCategory = "httpcacheclient", .ConnectTimeout = std::chrono::milliseconds{3000}, .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AssumeHttp2 = ResolveRes.Cache.AssumeHttp2, .AllowResume = true, .RetryCount = 0, .MaximumInMemoryDownloadSize = m_BoostWorkerMemory ? RemoteStoreOptions::DefaultMaxBlockSize : 1024u * 1024u}, [&AbortFlag]() { return AbortFlag.load(); }); - Storage.CacheName = ResolveRes.CacheName; - Storage.CacheLatencySec = ResolveRes.CacheLatencySec; + Storage.CacheHost = ResolveRes.Cache; } if (!m_Quiet) { std::string StorageDescription = fmt::format("Cloud {}{}. SessionId {}. Namespace '{}', Bucket '{}'", - ResolveRes.HostName, - (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl), + ResolveRes.Cloud.Name, + (ResolveRes.Cloud.Address == ResolveRes.Cloud.Name) ? "" : fmt::format(" {}", ResolveRes.Cloud.Address), Storage.BuildStorageHttp->GetSessionId(), m_Namespace, m_Bucket); @@ -2644,8 +2643,8 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a { std::string CacheDescription = fmt::format("Zen {}{}. SessionId {}. Namespace '{}', Bucket '{}'", - ResolveRes.CacheName, - (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl), + ResolveRes.Cache.Name, + (ResolveRes.Cache.Address == ResolveRes.Cache.Name) ? "" : fmt::format(" {}", ResolveRes.Cache.Address), Storage.CacheHttp->GetSessionId(), m_Namespace, m_Bucket); @@ -2661,11 +2660,10 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a Storage.BuildStorage = CreateJupiterBuildStorage(Log(), *Storage.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath); - Storage.StorageName = ResolveRes.HostName; if (Storage.CacheHttp) { - Storage.BuildCacheStorage = CreateZenBuildStorageCache( + Storage.CacheStorage = CreateZenBuildStorageCache( *Storage.CacheHttp, StorageCacheStats, m_Namespace, diff --git a/src/zenhttp/include/zenhttp/formatters.h b/src/zenhttp/include/zenhttp/formatters.h index addb00cb8..57ab01158 100644 --- a/src/zenhttp/include/zenhttp/formatters.h +++ b/src/zenhttp/include/zenhttp/formatters.h @@ -73,7 +73,7 @@ struct fmt::formatter<zen::HttpClient::Response> if (Response.IsSuccess()) { return fmt::format_to(Ctx.out(), - "OK: Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", + "OK: Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}", ToString(Response.StatusCode), Response.UploadedBytes, Response.DownloadedBytes, diff --git a/src/zenremotestore/builds/buildstoragecache.cpp b/src/zenremotestore/builds/buildstoragecache.cpp index 53d33bd7e..00765903d 100644 --- a/src/zenremotestore/builds/buildstoragecache.cpp +++ b/src/zenremotestore/builds/buildstoragecache.cpp @@ -528,6 +528,192 @@ CreateZenBuildStorageCache(HttpClient& HttpClient, return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BackgroundWorkerPool); } +#if ZEN_WITH_TESTS + +class InMemoryBuildStorageCache : public BuildStorageCache +{ +public: + // MaxRangeSupported == 0 : no range requests are accepted, always return full blob + // MaxRangeSupported == 1 : single range is supported, multi range returns full blob + // MaxRangeSupported > 1 : multirange is supported up to MaxRangeSupported, more ranges returns empty blob (bad request) + explicit InMemoryBuildStorageCache(uint64_t MaxRangeSupported, + BuildStorageCache::Statistics& Stats, + double LatencySec = 0.0, + double DelayPerKBSec = 0.0) + : m_MaxRangeSupported(MaxRangeSupported) + , m_Stats(Stats) + , m_LatencySec(LatencySec) + , m_DelayPerKBSec(DelayPerKBSec) + { + } + void PutBuildBlob(const Oid&, const IoHash& RawHash, ZenContentType, const CompositeBuffer& Payload) override + { + IoBuffer Buf = Payload.Flatten().AsIoBuffer(); + Buf.MakeOwned(); + const uint64_t SentBytes = Buf.Size(); + uint64_t ReceivedBytes = 0; + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer.GetElapsedTimeUs(), ReceivedBytes, SentBytes); }); + { + std::lock_guard Lock(m_Mutex); + m_Entries[RawHash] = std::move(Buf); + } + m_Stats.PutBlobCount.fetch_add(1); + m_Stats.PutBlobByteCount.fetch_add(SentBytes); + } + + IoBuffer GetBuildBlob(const Oid&, const IoHash& RawHash, uint64_t RangeOffset = 0, uint64_t RangeBytes = (uint64_t)-1) override + { + uint64_t SentBytes = 0; + uint64_t ReceivedBytes = 0; + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer.GetElapsedTimeUs(), ReceivedBytes, SentBytes); }); + IoBuffer FullPayload; + { + std::lock_guard Lock(m_Mutex); + auto It = m_Entries.find(RawHash); + if (It == m_Entries.end()) + { + return {}; + } + FullPayload = It->second; + } + + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + if (m_MaxRangeSupported == 0) + { + ReceivedBytes = FullPayload.Size(); + return FullPayload; + } + else + { + ReceivedBytes = (RangeBytes == (uint64_t)-1) ? FullPayload.Size() - RangeOffset : RangeBytes; + return IoBuffer(FullPayload, RangeOffset, RangeBytes); + } + } + else + { + ReceivedBytes = FullPayload.Size(); + return FullPayload; + } + } + + BuildBlobRanges GetBuildBlobRanges(const Oid&, const IoHash& RawHash, std::span<const std::pair<uint64_t, uint64_t>> Ranges) override + { + ZEN_ASSERT(!Ranges.empty()); + uint64_t SentBytes = 0; + uint64_t ReceivedBytes = 0; + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer.GetElapsedTimeUs(), ReceivedBytes, SentBytes); }); + if (m_MaxRangeSupported > 1 && Ranges.size() > m_MaxRangeSupported) + { + return {}; + } + IoBuffer FullPayload; + { + std::lock_guard Lock(m_Mutex); + auto It = m_Entries.find(RawHash); + if (It == m_Entries.end()) + { + return {}; + } + FullPayload = It->second; + } + + if (Ranges.size() > m_MaxRangeSupported) + { + // An empty Ranges signals to the caller: "full buffer given, use it for all requested ranges". + ReceivedBytes = FullPayload.Size(); + return {.PayloadBuffer = FullPayload}; + } + else + { + uint64_t PayloadStart = Ranges.front().first; + uint64_t PayloadSize = Ranges.back().first + Ranges.back().second - PayloadStart; + IoBuffer RangeBuffer = IoBuffer(FullPayload, PayloadStart, PayloadSize); + std::vector<std::pair<uint64_t, uint64_t>> PayloadRanges; + PayloadRanges.reserve(Ranges.size()); + for (const std::pair<uint64_t, uint64_t>& Range : Ranges) + { + PayloadRanges.push_back(std::make_pair(Range.first - PayloadStart, Range.second)); + } + ReceivedBytes = PayloadSize; + return {.PayloadBuffer = RangeBuffer, .Ranges = std::move(PayloadRanges)}; + } + } + + void PutBlobMetadatas(const Oid&, std::span<const IoHash>, std::span<const CbObject>) override {} + + std::vector<CbObject> GetBlobMetadatas(const Oid&, std::span<const IoHash> Hashes) override + { + return std::vector<CbObject>(Hashes.size()); + } + + std::vector<BlobExistsResult> BlobsExists(const Oid&, std::span<const IoHash> Hashes) override + { + std::lock_guard Lock(m_Mutex); + std::vector<BlobExistsResult> Result; + Result.reserve(Hashes.size()); + for (const IoHash& Hash : Hashes) + { + auto It = m_Entries.find(Hash); + Result.push_back({.HasBody = (It != m_Entries.end() && It->second)}); + } + return Result; + } + + void Flush(int32_t, std::function<bool(intptr_t)>&&) override {} + +private: + void AddStatistic(uint64_t ElapsedTimeUs, uint64_t ReceivedBytes, uint64_t SentBytes) + { + m_Stats.TotalBytesWritten += SentBytes; + m_Stats.TotalBytesRead += ReceivedBytes; + m_Stats.TotalExecutionTimeUs += ElapsedTimeUs; + m_Stats.TotalRequestCount++; + SetAtomicMax(m_Stats.PeakSentBytes, SentBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, ReceivedBytes); + if (ElapsedTimeUs > 0) + { + SetAtomicMax(m_Stats.PeakBytesPerSec, (ReceivedBytes + SentBytes) * 1000000 / ElapsedTimeUs); + } + } + + void SimulateLatency(uint64_t SendBytes, uint64_t ReceiveBytes) + { + double SleepSec = m_LatencySec; + if (m_DelayPerKBSec > 0.0) + { + SleepSec += m_DelayPerKBSec * (double(SendBytes + ReceiveBytes) / 1024u); + } + if (SleepSec > 0) + { + Sleep(int(SleepSec * 1000)); + } + } + + uint64_t m_MaxRangeSupported = 0; + BuildStorageCache::Statistics& m_Stats; + const double m_LatencySec = 0.0; + const double m_DelayPerKBSec = 0.0; + std::mutex m_Mutex; + std::unordered_map<IoHash, IoBuffer, IoHash::Hasher> m_Entries; +}; + +std::unique_ptr<BuildStorageCache> +CreateInMemoryBuildStorageCache(uint64_t MaxRangeSupported, BuildStorageCache::Statistics& Stats, double LatencySec, double DelayPerKBSec) +{ + return std::make_unique<InMemoryBuildStorageCache>(MaxRangeSupported, Stats, LatencySec, DelayPerKBSec); +} +#endif // ZEN_WITH_TESTS + ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose) { @@ -542,15 +728,28 @@ TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const boo HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); if (TestResponse.IsSuccess()) { - LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health"); + uint64_t MaxRangeCountPerRequest = 1; + CbObject StatusResponse = TestResponse.AsObject(); + if (StatusResponse["ok"].AsBool()) + { + MaxRangeCountPerRequest = StatusResponse["capabilities"].AsObjectView()["maxrangecountperrequest"].AsUInt64(1); + + LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health"); + + if (!LatencyResult.Success) + { + return {.Success = false, .FailureReason = LatencyResult.FailureReason}; + } - if (!LatencyResult.Success) + return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds, .MaxRangeCountPerRequest = MaxRangeCountPerRequest}; + } + else { - return {.Success = false, .FailureReason = LatencyResult.FailureReason}; + return {.Success = false, + .FailureReason = fmt::format("ZenCache endpoint {}/status/builds did not respond with \"ok\"", BaseUrl)}; } - return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds}; } return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; -}; +} } // namespace zen diff --git a/src/zenremotestore/builds/buildstorageoperations.cpp b/src/zenremotestore/builds/buildstorageoperations.cpp index 43a4937f0..f4b167b73 100644 --- a/src/zenremotestore/builds/buildstorageoperations.cpp +++ b/src/zenremotestore/builds/buildstorageoperations.cpp @@ -887,11 +887,12 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) ChunkBlockAnalyser BlockAnalyser( m_LogOutput, m_BlockDescriptions, - ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, - .IsVerbose = m_Options.IsVerbose, - .HostLatencySec = m_Storage.BuildStorageLatencySec, - .HostHighSpeedLatencySec = m_Storage.CacheLatencySec, - .HostMaxRangeCountPerRequest = BuildStorageBase::MaxRangeCountPerRequest}); + ChunkBlockAnalyser::Options{.IsQuiet = m_Options.IsQuiet, + .IsVerbose = m_Options.IsVerbose, + .HostLatencySec = m_Storage.BuildStorageHost.LatencySec, + .HostHighSpeedLatencySec = m_Storage.CacheHost.LatencySec, + .HostMaxRangeCountPerRequest = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest, + .HostHighSpeedMaxRangeCountPerRequest = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest}); std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = BlockAnalyser.GetNeeded( m_RemoteLookup.ChunkHashToChunkIndex, @@ -974,7 +975,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } } - if (m_Storage.BuildCacheStorage) + if (m_Storage.CacheStorage) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; @@ -993,7 +994,7 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); if (CacheExistsResult.size() == BlobHashes.size()) { @@ -1018,32 +1019,50 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) } std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> BlockPartialDownloadModes; + if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { BlockPartialDownloadModes.resize(m_BlockDescriptions.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else { + ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + + switch (m_Options.PartialBlockRequestMode) + { + case EPartialBlockRequestMode::Off: + break; + case EPartialBlockRequestMode::ZenCacheOnly: + CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + break; + case EPartialBlockRequestMode::Mixed: + CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + case EPartialBlockRequestMode::All: + CachePartialDownloadMode = m_Storage.CacheHost.Caps.MaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + default: + ZEN_ASSERT(false); + break; + } + BlockPartialDownloadModes.reserve(m_BlockDescriptions.size()); for (uint32_t BlockIndex = 0; BlockIndex < m_BlockDescriptions.size(); BlockIndex++) { const bool BlockExistInCache = ExistsResult.ExistingBlobs.contains(m_BlockDescriptions[BlockIndex].BlockHash); - if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::All) - { - BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact - : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); - } - else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) - { - BlockPartialDownloadModes.push_back(BlockExistInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact - : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); - } - else if (m_Options.PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) - { - BlockPartialDownloadModes.push_back(BlockExistInCache - ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); - } + BlockPartialDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode); } } @@ -1527,20 +1546,20 @@ BuildsOperationUpdateFolder::Execute(FolderContent& OutLocalFolderState) IoBuffer BlockBuffer; const bool ExistsInCache = - m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); + m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); if (ExistsInCache) { - BlockBuffer = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); + BlockBuffer = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); } if (!BlockBuffer) { BlockBuffer = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash); - if (BlockBuffer && m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (BlockBuffer && m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BlockBuffer))); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(BlockBuffer))); } } if (!BlockBuffer) @@ -3103,10 +3122,10 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde const IoHash& ChunkHash = m_RemoteContent.ChunkedContent.ChunkHashes[RemoteChunkIndex]; // FilteredDownloadedBytesPerSecond.Start(); IoBuffer BuildBlob; - const bool ExistsInCache = m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(ChunkHash); if (ExistsInCache) { - BuildBlob = m_Storage.BuildCacheStorage->GetBuildBlob(m_BuildId, ChunkHash); + BuildBlob = m_Storage.CacheStorage->GetBuildBlob(m_BuildId, ChunkHash); } if (BuildBlob) { @@ -3134,12 +3153,12 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde m_DownloadStats.DownloadedChunkCount++; m_DownloadStats.RequestsCompleteCount++; - if (Payload && m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (Payload && m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); } OnDownloaded(std::move(Payload)); @@ -3148,12 +3167,12 @@ BuildsOperationUpdateFolder::DownloadBuildBlob(uint32_t RemoteChunkInde else { BuildBlob = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, ChunkHash); - if (BuildBlob && m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (BuildBlob && m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - ChunkHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(BuildBlob))); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + ChunkHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(BuildBlob))); } if (!BuildBlob) { @@ -3273,34 +3292,7 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); } - if (m_Storage.BuildCacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash)) - { - BuildStorageCache::BuildBlobRanges RangeBuffers = - m_Storage.BuildCacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, Ranges); - if (RangeBuffers.PayloadBuffer) - { - if (!m_AbortFlag) - { - if (RangeBuffers.Ranges.size() != Ranges.size()) - { - throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", - Ranges.size(), - BlockDescription.BlockHash, - RangeBuffers.Ranges.size())); - } - - std::vector<std::pair<uint64_t, uint64_t>> BlockOffsetAndLengths = std::move(RangeBuffers.Ranges); - ProcessDownload(BlockDescription, - std::move(RangeBuffers.PayloadBuffer), - BlockRangeStartIndex, - BlockOffsetAndLengths, - OnDownloaded); - } - return; - } - } - - const size_t MaxRangesPerRequestToJupiter = BuildStorageBase::MaxRangeCountPerRequest; + const bool ExistsInCache = m_Storage.CacheStorage && ExistsResult.ExistingBlobs.contains(BlockDescription.BlockHash); size_t SubBlockRangeCount = BlockRangeCount; size_t SubRangeCountComplete = 0; @@ -3311,30 +3303,101 @@ BuildsOperationUpdateFolder::DownloadPartialBlock( { break; } - size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter); + + // First try to get subrange from cache. + // If not successful, try to get the ranges from the build store and adapt SubRangeCount... + size_t SubRangeStartIndex = BlockRangeStartIndex + SubRangeCountComplete; + if (ExistsInCache) + { + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.CacheHost.Caps.MaxRangeCountPerRequest); + + if (SubRangeCount == 1) + { + // Legacy single-range path, prefer that for max compatibility + + const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete]; + IoBuffer PayloadBuffer = + m_Storage.CacheStorage->GetBuildBlob(m_BuildId, BlockDescription.BlockHash, SubRange.first, SubRange.second); + if (m_AbortFlag) + { + break; + } + if (PayloadBuffer) + { + ProcessDownload(BlockDescription, + std::move(PayloadBuffer), + SubRangeStartIndex, + std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)}, + OnDownloaded); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + else + { + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + BuildStorageCache::BuildBlobRanges RangeBuffers = + m_Storage.CacheStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); + if (m_AbortFlag) + { + break; + } + if (RangeBuffers.PayloadBuffer) + { + if (RangeBuffers.Ranges.empty()) + { + SubRangeCount = Ranges.size() - SubRangeCountComplete; + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount), + OnDownloaded); + SubRangeCountComplete += SubRangeCount; + continue; + } + else if (RangeBuffers.Ranges.size() == SubRangeCount) + { + ProcessDownload(BlockDescription, + std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangeBuffers.Ranges, + OnDownloaded); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + } + } + + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, m_Storage.BuildStorageHost.Caps.MaxRangeCountPerRequest); auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); BuildStorageBase::BuildBlobRanges RangeBuffers = m_Storage.BuildStorage->GetBuildBlobRanges(m_BuildId, BlockDescription.BlockHash, SubRanges); + if (m_AbortFlag) + { + break; + } if (RangeBuffers.PayloadBuffer) { - if (m_AbortFlag) - { - break; - } if (RangeBuffers.Ranges.empty()) { // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Upload to cache (if enabled) and use the whole payload for the remaining ranges - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - BlockDescription.BlockHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(std::vector<IoBuffer>{RangeBuffers.PayloadBuffer})); + if (m_AbortFlag) + { + break; + } } SubRangeCount = Ranges.size() - SubRangeCountComplete; @@ -4932,12 +4995,12 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& const IoHash& BlockHash = OutBlocks.BlockDescriptions[BlockIndex].BlockHash; const uint64_t CompressedBlockSize = Payload.GetCompressedSize(); - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - BlockHash, - ZenContentType::kCompressedBinary, - Payload.GetCompressed()); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlockHash, + ZenContentType::kCompressedBinary, + Payload.GetCompressed()); } m_Storage.BuildStorage->PutBuildBlob(m_BuildId, @@ -4955,11 +5018,11 @@ BuildsOperationUploadFolder::GenerateBuildBlocks(const ChunkedFolderContent& OutBlocks.BlockDescriptions[BlockIndex].ChunkRawHashes.size()); } - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); + m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); } bool MetadataSucceeded = @@ -5803,11 +5866,11 @@ BuildsOperationUploadFolder::UploadBuildPart(ChunkingController& ChunkController { const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); + m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); } bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); if (MetadataSucceeded) @@ -6001,9 +6064,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co const CbObject BlockMetaData = BuildChunkBlockDescription(NewBlocks.BlockDescriptions[BlockIndex], NewBlocks.BlockMetaDatas[BlockIndex]); - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); } m_Storage.BuildStorage->PutBuildBlob(m_BuildId, BlockHash, ZenContentType::kCompressedBinary, Payload); if (m_Options.IsVerbose) @@ -6017,11 +6080,11 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co UploadedBlockSize += PayloadSize; TempUploadStats.BlocksBytes += PayloadSize; - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBlobMetadatas(m_BuildId, - std::vector<IoHash>({BlockHash}), - std::vector<CbObject>({BlockMetaData})); + m_Storage.CacheStorage->PutBlobMetadatas(m_BuildId, + std::vector<IoHash>({BlockHash}), + std::vector<CbObject>({BlockMetaData})); } bool MetadataSucceeded = m_Storage.BuildStorage->PutBlockMetadata(m_BuildId, BlockHash, BlockMetaData); if (MetadataSucceeded) @@ -6084,9 +6147,9 @@ BuildsOperationUploadFolder::UploadPartBlobs(const ChunkedFolderContent& Co const uint64_t PayloadSize = Payload.GetSize(); - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, RawHash, ZenContentType::kCompressedBinary, Payload); } if (PayloadSize >= LargeAttachmentSize) @@ -6830,14 +6893,14 @@ BuildsOperationPrimeCache::Execute() std::vector<IoHash> BlobsToDownload; BlobsToDownload.reserve(BuildBlobs.size()); - if (m_Storage.BuildCacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) + if (m_Storage.CacheStorage && !BuildBlobs.empty() && !m_Options.ForceUpload) { ZEN_TRACE_CPU("BlobCacheExistCheck"); Stopwatch Timer; const std::vector<IoHash> BlobHashes(BuildBlobs.begin(), BuildBlobs.end()); const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_Storage.BuildCacheStorage->BlobsExists(m_BuildId, BlobHashes); + m_Storage.CacheStorage->BlobsExists(m_BuildId, BlobHashes); if (CacheExistsResult.size() == BlobHashes.size()) { @@ -6884,33 +6947,33 @@ BuildsOperationPrimeCache::Execute() for (size_t BlobIndex = 0; BlobIndex < BlobCount; BlobIndex++) { - Work.ScheduleWork( - m_NetworkPool, - [this, - &Work, - &BlobsToDownload, - BlobCount, - &LooseChunkRawSizes, - &CompletedDownloadCount, - &FilteredDownloadedBytesPerSecond, - &MultipartAttachmentCount, - BlobIndex](std::atomic<bool>&) { - if (!m_AbortFlag) - { - const IoHash& BlobHash = BlobsToDownload[BlobIndex]; + Work.ScheduleWork(m_NetworkPool, + [this, + &Work, + &BlobsToDownload, + BlobCount, + &LooseChunkRawSizes, + &CompletedDownloadCount, + &FilteredDownloadedBytesPerSecond, + &MultipartAttachmentCount, + BlobIndex](std::atomic<bool>&) { + if (!m_AbortFlag) + { + const IoHash& BlobHash = BlobsToDownload[BlobIndex]; - bool IsLargeBlob = false; + bool IsLargeBlob = false; - if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end()) - { - IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; - } + if (auto It = LooseChunkRawSizes.find(BlobHash); It != LooseChunkRawSizes.end()) + { + IsLargeBlob = It->second >= m_Options.LargeAttachmentSize; + } - FilteredDownloadedBytesPerSecond.Start(); + FilteredDownloadedBytesPerSecond.Start(); - if (IsLargeBlob) - { - DownloadLargeBlob(*m_Storage.BuildStorage, + if (IsLargeBlob) + { + DownloadLargeBlob( + *m_Storage.BuildStorage, m_TempPath, m_BuildId, BlobHash, @@ -6926,12 +6989,12 @@ BuildsOperationPrimeCache::Execute() if (!m_AbortFlag) { - if (Payload && m_Storage.BuildCacheStorage) + if (Payload && m_Storage.CacheStorage) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - BlobHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(Payload))); + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlobHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(Payload))); } } CompletedDownloadCount++; @@ -6940,32 +7003,32 @@ BuildsOperationPrimeCache::Execute() FilteredDownloadedBytesPerSecond.Stop(); } }); - } - else - { - IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); - m_DownloadStats.DownloadedBlockCount++; - m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); - m_DownloadStats.RequestsCompleteCount++; + } + else + { + IoBuffer Payload = m_Storage.BuildStorage->GetBuildBlob(m_BuildId, BlobHash); + m_DownloadStats.DownloadedBlockCount++; + m_DownloadStats.DownloadedBlockByteCount += Payload.GetSize(); + m_DownloadStats.RequestsCompleteCount++; - if (!m_AbortFlag) - { - if (Payload && m_Storage.BuildCacheStorage) - { - m_Storage.BuildCacheStorage->PutBuildBlob(m_BuildId, - BlobHash, - ZenContentType::kCompressedBinary, - CompositeBuffer(SharedBuffer(std::move(Payload)))); - } - } - CompletedDownloadCount++; - if (CompletedDownloadCount == BlobCount) - { - FilteredDownloadedBytesPerSecond.Stop(); - } - } - } - }); + if (!m_AbortFlag) + { + if (Payload && m_Storage.CacheStorage) + { + m_Storage.CacheStorage->PutBuildBlob(m_BuildId, + BlobHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(SharedBuffer(std::move(Payload)))); + } + } + CompletedDownloadCount++; + if (CompletedDownloadCount == BlobCount) + { + FilteredDownloadedBytesPerSecond.Stop(); + } + } + } + }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { @@ -6977,10 +7040,10 @@ BuildsOperationPrimeCache::Execute() std::string DownloadRateString = (CompletedDownloadCount == BlobCount) ? "" : fmt::format(" {}bits/s", NiceNum(FilteredDownloadedBytesPerSecond.GetCurrent() * 8)); - std::string UploadDetails = m_Storage.BuildCacheStorage ? fmt::format(" {} ({}) uploaded.", - m_StorageCacheStats.PutBlobCount.load(), - NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) - : ""; + std::string UploadDetails = m_Storage.CacheStorage ? fmt::format(" {} ({}) uploaded.", + m_StorageCacheStats.PutBlobCount.load(), + NiceBytes(m_StorageCacheStats.PutBlobByteCount.load())) + : ""; std::string Details = fmt::format("{}/{} ({}{}) downloaded.{}", CompletedDownloadCount.load(), @@ -7005,13 +7068,13 @@ BuildsOperationPrimeCache::Execute() return; } - if (m_Storage.BuildCacheStorage) + if (m_Storage.CacheStorage) { - m_Storage.BuildCacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { + m_Storage.CacheStorage->Flush(m_LogOutput.GetProgressUpdateDelayMS(), [this](intptr_t Remaining) -> bool { ZEN_UNUSED(Remaining); if (!m_Options.IsQuiet) { - ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheName); + ZEN_OPERATION_LOG_INFO(m_LogOutput, "Waiting for {} blobs to finish upload to '{}'", Remaining, m_Storage.CacheHost.Name); } return !m_AbortFlag; }); @@ -7221,7 +7284,7 @@ GetRemoteContent(OperationLogOutput& Output, bool AttemptFallback = false; OutBlockDescriptions = GetBlockDescriptions(Output, *Storage.BuildStorage, - Storage.BuildCacheStorage.get(), + Storage.CacheStorage.get(), BuildId, BlockRawHashes, AttemptFallback, diff --git a/src/zenremotestore/builds/buildstorageutil.cpp b/src/zenremotestore/builds/buildstorageutil.cpp index d65f18b9a..2ae726e29 100644 --- a/src/zenremotestore/builds/buildstorageutil.cpp +++ b/src/zenremotestore/builds/buildstorageutil.cpp @@ -63,13 +63,15 @@ ResolveBuildStorage(OperationLogOutput& Output, std::string HostUrl; std::string HostName; - double HostLatencySec = -1.0; + double HostLatencySec = -1.0; + uint64_t HostMaxRangeCountPerRequest = 1; std::string CacheUrl; std::string CacheName; - bool HostAssumeHttp2 = ClientSettings.AssumeHttp2; - bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2; - double CacheLatencySec = -1.0; + bool HostAssumeHttp2 = ClientSettings.AssumeHttp2; + bool CacheAssumeHttp2 = ClientSettings.AssumeHttp2; + double CacheLatencySec = -1.0; + uint64_t CacheMaxRangeCountPerRequest = 1; JupiterServerDiscovery DiscoveryResponse; const std::string_view DiscoveryHost = Host.empty() ? OverrideHost : Host; @@ -100,9 +102,10 @@ ResolveBuildStorage(OperationLogOutput& Output, { ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", OverrideHost); } - HostUrl = OverrideHost; - HostName = GetHostNameFromUrl(OverrideHost); - HostLatencySec = TestResult.LatencySeconds; + HostUrl = OverrideHost; + HostName = GetHostNameFromUrl(OverrideHost); + HostLatencySec = TestResult.LatencySeconds; + HostMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest; } else { @@ -137,10 +140,11 @@ ResolveBuildStorage(OperationLogOutput& Output, ZEN_OPERATION_LOG_INFO(Output, "Server endpoint at '{}/api/v1/status/servers' succeeded", ServerEndpoint.BaseUrl); } - HostUrl = ServerEndpoint.BaseUrl; - HostAssumeHttp2 = ServerEndpoint.AssumeHttp2; - HostName = ServerEndpoint.Name; - HostLatencySec = TestResult.LatencySeconds; + HostUrl = ServerEndpoint.BaseUrl; + HostAssumeHttp2 = ServerEndpoint.AssumeHttp2; + HostName = ServerEndpoint.Name; + HostLatencySec = TestResult.LatencySeconds; + HostMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest; break; } else @@ -184,10 +188,11 @@ ResolveBuildStorage(OperationLogOutput& Output, ZEN_OPERATION_LOG_INFO(Output, "Cache endpoint at '{}/status/builds' succeeded", CacheEndpoint.BaseUrl); } - CacheUrl = CacheEndpoint.BaseUrl; - CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2; - CacheName = CacheEndpoint.Name; - CacheLatencySec = TestResult.LatencySeconds; + CacheUrl = CacheEndpoint.BaseUrl; + CacheAssumeHttp2 = CacheEndpoint.AssumeHttp2; + CacheName = CacheEndpoint.Name; + CacheLatencySec = TestResult.LatencySeconds; + CacheMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest; break; } } @@ -225,9 +230,10 @@ ResolveBuildStorage(OperationLogOutput& Output, if (ZenCacheEndpointTestResult TestResult = TestZenCacheEndpoint(ZenCacheHost, /*AssumeHttp2*/ false, ClientSettings.Verbose); TestResult.Success) { - CacheUrl = ZenCacheHost; - CacheName = GetHostNameFromUrl(ZenCacheHost); - CacheLatencySec = TestResult.LatencySeconds; + CacheUrl = ZenCacheHost; + CacheName = GetHostNameFromUrl(ZenCacheHost); + CacheLatencySec = TestResult.LatencySeconds; + CacheMaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest; } else { @@ -235,15 +241,34 @@ ResolveBuildStorage(OperationLogOutput& Output, } } - return BuildStorageResolveResult{.HostUrl = HostUrl, - .HostName = HostName, - .HostAssumeHttp2 = HostAssumeHttp2, - .HostLatencySec = HostLatencySec, + return BuildStorageResolveResult{ + .Cloud = {.Address = HostUrl, + .Name = HostName, + .AssumeHttp2 = HostAssumeHttp2, + .LatencySec = HostLatencySec, + .Caps = BuildStorageResolveResult::Capabilities{.MaxRangeCountPerRequest = HostMaxRangeCountPerRequest}}, + .Cache = {.Address = CacheUrl, + .Name = CacheName, + .AssumeHttp2 = CacheAssumeHttp2, + .LatencySec = CacheLatencySec, + .Caps = BuildStorageResolveResult::Capabilities{.MaxRangeCountPerRequest = CacheMaxRangeCountPerRequest}}}; +} - .CacheUrl = CacheUrl, - .CacheName = CacheName, - .CacheAssumeHttp2 = CacheAssumeHttp2, - .CacheLatencySec = CacheLatencySec}; +std::vector<ChunkBlockDescription> +ParseBlockMetadatas(std::span<const CbObject> BlockMetadatas) +{ + std::vector<ChunkBlockDescription> UnorderedList; + UnorderedList.reserve(BlockMetadatas.size()); + for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < BlockMetadatas.size(); CacheBlockMetadataIndex++) + { + const CbObject& CacheBlockMetadata = BlockMetadatas[CacheBlockMetadataIndex]; + ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); + if (Description.BlockHash != IoHash::Zero) + { + UnorderedList.emplace_back(std::move(Description)); + } + } + return UnorderedList; } std::vector<ChunkBlockDescription> @@ -263,25 +288,15 @@ GetBlockDescriptions(OperationLogOutput& Output, if (OptionalCacheStorage && !BlockRawHashes.empty()) { std::vector<CbObject> CacheBlockMetadatas = OptionalCacheStorage->GetBlobMetadatas(BuildId, BlockRawHashes); - UnorderedList.reserve(CacheBlockMetadatas.size()); - for (size_t CacheBlockMetadataIndex = 0; CacheBlockMetadataIndex < CacheBlockMetadatas.size(); CacheBlockMetadataIndex++) + if (!CacheBlockMetadatas.empty()) { - const CbObject& CacheBlockMetadata = CacheBlockMetadatas[CacheBlockMetadataIndex]; - ChunkBlockDescription Description = ParseChunkBlockDescription(CacheBlockMetadata); - if (Description.BlockHash == IoHash::Zero) - { - ZEN_OPERATION_LOG_WARN(Output, "Unexpected/invalid block metadata received from remote cache, skipping block"); - } - else + UnorderedList = ParseBlockMetadatas(CacheBlockMetadatas); + for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) { - UnorderedList.emplace_back(std::move(Description)); + const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); } } - for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) - { - const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; - BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); - } } if (UnorderedList.size() < BlockRawHashes.size()) diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h index ce3da41c1..da8437a58 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorage.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorage.h @@ -58,8 +58,6 @@ public: uint64_t RangeOffset = 0, uint64_t RangeBytes = (uint64_t)-1) = 0; - static constexpr size_t MaxRangeCountPerRequest = 128u; - struct BuildBlobRanges { IoBuffer PayloadBuffer; diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h index 67c93480b..24702df0f 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstoragecache.h @@ -69,11 +69,19 @@ std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& H const std::filesystem::path& TempFolderPath, WorkerThreadPool& BackgroundWorkerPool); +#if ZEN_WITH_TESTS +std::unique_ptr<BuildStorageCache> CreateInMemoryBuildStorageCache(uint64_t MaxRangeSupported, + BuildStorageCache::Statistics& Stats, + double LatencySec = 0.0, + double DelayPerKBSec = 0.0); +#endif // ZEN_WITH_TESTS + struct ZenCacheEndpointTestResult { bool Success = false; std::string FailureReason; - double LatencySeconds = -1.0; + double LatencySeconds = -1.0; + uint64_t MaxRangeCountPerRequest = 1; }; ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose); diff --git a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h index 764a24e61..7306188ca 100644 --- a/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h +++ b/src/zenremotestore/include/zenremotestore/builds/buildstorageutil.h @@ -14,15 +14,20 @@ class BuildStorageCache; struct BuildStorageResolveResult { - std::string HostUrl; - std::string HostName; - bool HostAssumeHttp2 = false; - double HostLatencySec = -1.0; - - std::string CacheUrl; - std::string CacheName; - bool CacheAssumeHttp2 = false; - double CacheLatencySec = -1.0; + struct Capabilities + { + uint64_t MaxRangeCountPerRequest = 1; + }; + struct Host + { + std::string Address; + std::string Name; + bool AssumeHttp2 = false; + double LatencySec = -1.0; + Capabilities Caps; + }; + Host Cloud; + Host Cache; }; enum class ZenCacheResolveMode @@ -52,14 +57,13 @@ std::vector<ChunkBlockDescription> GetBlockDescriptions(OperationLogOutput& Out struct StorageInstance { - std::unique_ptr<HttpClient> BuildStorageHttp; - std::unique_ptr<BuildStorageBase> BuildStorage; - std::string StorageName; - double BuildStorageLatencySec = -1.0; + BuildStorageResolveResult::Host BuildStorageHost; + std::unique_ptr<HttpClient> BuildStorageHttp; + std::unique_ptr<BuildStorageBase> BuildStorage; + + BuildStorageResolveResult::Host CacheHost; std::unique_ptr<HttpClient> CacheHttp; - std::unique_ptr<BuildStorageCache> BuildCacheStorage; - std::string CacheName; - double CacheLatencySec = -1.0; + std::unique_ptr<BuildStorageCache> CacheStorage; }; } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h index 20b6fd371..c085f10e7 100644 --- a/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h +++ b/src/zenremotestore/include/zenremotestore/chunking/chunkblock.h @@ -31,6 +31,7 @@ struct ChunkBlockDescription : public ThinChunkBlockDescription std::vector<ChunkBlockDescription> ParseChunkBlockDescriptionList(const CbObjectView& BlocksObject); ChunkBlockDescription ParseChunkBlockDescription(const CbObjectView& BlockObject); +std::vector<ChunkBlockDescription> ParseBlockMetadatas(std::span<const CbObject> BlockMetadatas); CbObject BuildChunkBlockDescription(const ChunkBlockDescription& Block, CbObjectView MetaData); ChunkBlockDescription GetChunkBlockDescription(const SharedBuffer& BlockPayload, const IoHash& RawHash); typedef std::function<std::pair<uint64_t, CompressedBuffer>(const IoHash& RawHash)> FetchChunkFunc; diff --git a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h index 7bbf40dfa..bb41f9efc 100644 --- a/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h +++ b/src/zenremotestore/include/zenremotestore/jupiter/jupiterhost.h @@ -28,7 +28,8 @@ struct JupiterEndpointTestResult { bool Success = false; std::string FailureReason; - double LatencySeconds = -1.0; + double LatencySeconds = -1.0; + uint64_t MaxRangeCountPerRequest = 1; }; JupiterEndpointTestResult TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpVerbose); diff --git a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h index 66dfcc62d..c058e1c1f 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/buildsremoteprojectstore.h @@ -2,6 +2,7 @@ #pragma once +#include <zenhttp/httpclient.h> #include <zenremotestore/projectstore/remoteprojectstore.h> namespace zen { @@ -10,9 +11,6 @@ class AuthMgr; struct BuildsRemoteStoreOptions : RemoteStoreOptions { - std::string Host; - std::string OverrideHost; - std::string ZenHost; std::string Namespace; std::string Bucket; Oid BuildId; @@ -22,20 +20,16 @@ struct BuildsRemoteStoreOptions : RemoteStoreOptions std::filesystem::path OidcExePath; bool ForceDisableBlocks = false; bool ForceDisableTempBlocks = false; - bool AssumeHttp2 = false; - bool PopulateCache = true; IoBuffer MetaData; size_t MaximumInMemoryDownloadSize = 1024u * 1024u; }; -std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(LoggerRef InLog, - const BuildsRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet, - bool Unattended, - bool Hidden, - WorkerThreadPool& CacheBackgroundWorkerPool, - double& OutHostLatencySec, - double& OutCacheLatencySec); +struct BuildStorageResolveResult; + +std::shared_ptr<RemoteProjectStore> CreateJupiterBuildsRemoteStore(LoggerRef InLog, + const BuildStorageResolveResult& ResolveResult, + std::function<HttpClientAccessToken()>&& TokenProvider, + const BuildsRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath); } // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 42786d0f2..084d975a2 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -5,6 +5,7 @@ #include <zencore/jobqueue.h> #include <zenstore/projectstore.h> +#include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/chunking/chunkblock.h> #include <zenremotestore/partialblockrequestmode.h> @@ -79,11 +80,6 @@ public: std::vector<ChunkBlockDescription> Blocks; }; - struct AttachmentExistsInCacheResult : public Result - { - std::vector<bool> HasBody; - }; - struct LoadAttachmentRangesResult : public Result { IoBuffer Bytes; @@ -128,28 +124,17 @@ public: virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) = 0; virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Payloads) = 0; - virtual LoadContainerResult LoadContainer() = 0; - virtual GetKnownBlocksResult GetKnownBlocks() = 0; - virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) = 0; - virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) = 0; - - enum ESourceMode - { - kAny, - kCacheOnly, - kHostOnly - }; + virtual LoadContainerResult LoadContainer() = 0; + virtual GetKnownBlocksResult GetKnownBlocks() = 0; + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes, + BuildStorageCache* OptionalCache, + const Oid& CacheBuildId) = 0; - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode = ESourceMode::kAny) = 0; - - static constexpr size_t MaxRangeCountPerRequest = 128u; + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) = 0; virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, - std::span<const std::pair<uint64_t, uint64_t>> Ranges, - ESourceMode SourceMode = ESourceMode::kAny) = 0; - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode = ESourceMode::kAny) = 0; - - virtual void Flush() = 0; + std::span<const std::pair<uint64_t, uint64_t>> Ranges) = 0; + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) = 0; }; struct RemoteStoreOptions @@ -211,18 +196,29 @@ RemoteProjectStore::Result SaveOplog(CidStore& ChunkStore, bool IgnoreMissingAttachments, JobContext* OptionalContext); -RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - EPartialBlockRequestMode PartialBlockRequestMode, - double HostLatencySec, - double CacheLatencySec, - JobContext* OptionalContext); +struct LoadOplogContext +{ + CidStore& ChunkStore; + RemoteProjectStore& RemoteStore; + BuildStorageCache* OptionalCache = nullptr; + Oid CacheBuildId = Oid::Zero; + BuildStorageCache::Statistics* OptionalCacheStats = nullptr; + ProjectStore::Oplog& Oplog; + WorkerThreadPool& NetworkWorkerPool; + WorkerThreadPool& WorkerPool; + bool ForceDownload = false; + bool IgnoreMissingAttachments = false; + bool CleanOplog = false; + EPartialBlockRequestMode PartialBlockRequestMode = EPartialBlockRequestMode::All; + bool PopulateCache = false; + double StoreLatencySec = -1.0; + uint64_t StoreMaxRangeCountPerRequest = 1; + double CacheLatencySec = -1.0; + uint64_t CacheMaxRangeCountPerRequest = 1; + JobContext* OptionalJobContext = nullptr; +}; + +RemoteProjectStore::Result LoadOplog(LoadOplogContext&& Context); std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); diff --git a/src/zenremotestore/jupiter/jupiterhost.cpp b/src/zenremotestore/jupiter/jupiterhost.cpp index 2583cfc84..4479c8b97 100644 --- a/src/zenremotestore/jupiter/jupiterhost.cpp +++ b/src/zenremotestore/jupiter/jupiterhost.cpp @@ -59,13 +59,22 @@ TestJupiterEndpoint(std::string_view BaseUrl, const bool AssumeHttp2, const bool HttpClient::Response TestResponse = TestHttpClient.Get("/health/live"); if (TestResponse.IsSuccess()) { + // TODO: dan.engelbrecht 20260305 - replace this naive nginx detection with proper capabilites end point once it exists in Jupiter + uint64_t MaxRangeCountPerRequest = 1; + if (auto It = TestResponse.Header.Entries.find("Server"); It != TestResponse.Header.Entries.end()) + { + if (StrCaseCompare(It->second.c_str(), "nginx", 5) == 0) + { + MaxRangeCountPerRequest = 128u; + } + } LatencyTestResult LatencyResult = MeasureLatency(TestHttpClient, "/health/ready"); if (!LatencyResult.Success) { return {.Success = false, .FailureReason = LatencyResult.FailureReason}; } - return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds}; + return {.Success = true, .LatencySeconds = LatencyResult.LatencySeconds, .MaxRangeCountPerRequest = MaxRangeCountPerRequest}; } return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; } diff --git a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp index 3400cdbf5..2282a31dd 100644 --- a/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/buildsremoteprojectstore.cpp @@ -7,8 +7,6 @@ #include <zencore/fmtutils.h> #include <zencore/scopeguard.h> -#include <zenhttp/httpclientauth.h> -#include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/builds/buildstorageutil.h> #include <zenremotestore/builds/jupiterbuildstorage.h> #include <zenremotestore/operationlogoutput.h> @@ -26,18 +24,14 @@ class BuildsRemoteStore : public RemoteProjectStore public: BuildsRemoteStore(LoggerRef InLog, const HttpClientSettings& ClientSettings, - HttpClientSettings* OptionalCacheClientSettings, std::string_view HostUrl, - std::string_view CacheUrl, const std::filesystem::path& TempFilePath, - WorkerThreadPool& CacheBackgroundWorkerPool, std::string_view Namespace, std::string_view Bucket, const Oid& BuildId, const IoBuffer& MetaData, bool ForceDisableBlocks, - bool ForceDisableTempBlocks, - bool PopulateCache) + bool ForceDisableTempBlocks) : m_Log(InLog) , m_BuildStorageHttp(HostUrl, ClientSettings) , m_BuildStorage(CreateJupiterBuildStorage(Log(), @@ -53,20 +47,8 @@ public: , m_MetaData(MetaData) , m_EnableBlocks(!ForceDisableBlocks) , m_UseTempBlocks(!ForceDisableTempBlocks) - , m_PopulateCache(PopulateCache) { m_MetaData.MakeOwned(); - if (OptionalCacheClientSettings) - { - ZEN_ASSERT(!CacheUrl.empty()); - m_BuildCacheStorageHttp = std::make_unique<HttpClient>(CacheUrl, *OptionalCacheClientSettings); - m_BuildCacheStorage = CreateZenBuildStorageCache(*m_BuildCacheStorageHttp, - m_StorageCacheStats, - Namespace, - Bucket, - TempFilePath, - CacheBackgroundWorkerPool); - } } virtual RemoteStoreInfo GetInfo() const override @@ -75,9 +57,8 @@ public: .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_BuildId), - .Description = fmt::format("[cloud] {}{}. SessionId: {}. {}/{}/{}"sv, + .Description = fmt::format("[cloud] {}. SessionId: {}. {}/{}/{}"sv, m_BuildStorageHttp.GetBaseUri(), - m_BuildCacheStorage ? fmt::format(" (Cache: {})", m_BuildCacheStorageHttp->GetBaseUri()) : ""sv, m_BuildStorageHttp.GetSessionId(), m_Namespace, m_Bucket, @@ -86,15 +67,13 @@ public: virtual Stats GetStats() const override { - return { - .m_SentBytes = m_BuildStorageStats.TotalBytesWritten.load() + m_StorageCacheStats.TotalBytesWritten.load(), - .m_ReceivedBytes = m_BuildStorageStats.TotalBytesRead.load() + m_StorageCacheStats.TotalBytesRead.load(), - .m_RequestTimeNS = m_BuildStorageStats.TotalRequestTimeUs.load() * 1000 + m_StorageCacheStats.TotalRequestTimeUs.load() * 1000, - .m_RequestCount = m_BuildStorageStats.TotalRequestCount.load() + m_StorageCacheStats.TotalRequestCount.load(), - .m_PeakSentBytes = Max(m_BuildStorageStats.PeakSentBytes.load(), m_StorageCacheStats.PeakSentBytes.load()), - .m_PeakReceivedBytes = Max(m_BuildStorageStats.PeakReceivedBytes.load(), m_StorageCacheStats.PeakReceivedBytes.load()), - .m_PeakBytesPerSec = Max(m_BuildStorageStats.PeakBytesPerSec.load(), m_StorageCacheStats.PeakBytesPerSec.load()), - }; + return {.m_SentBytes = m_BuildStorageStats.TotalBytesWritten.load(), + .m_ReceivedBytes = m_BuildStorageStats.TotalBytesRead.load(), + .m_RequestTimeNS = m_BuildStorageStats.TotalRequestTimeUs.load() * 1000, + .m_RequestCount = m_BuildStorageStats.TotalRequestCount.load(), + .m_PeakSentBytes = m_BuildStorageStats.PeakSentBytes.load(), + .m_PeakReceivedBytes = m_BuildStorageStats.PeakReceivedBytes.load(), + .m_PeakBytesPerSec = m_BuildStorageStats.PeakBytesPerSec.load()}; } virtual bool GetExtendedStats(ExtendedStats& OutStats) const override @@ -109,11 +88,6 @@ public: } Result = true; } - if (m_BuildCacheStorage) - { - OutStats.m_ReceivedBytesPerSource.insert_or_assign("Cache", m_StorageCacheStats.TotalBytesRead); - Result = true; - } return Result; } @@ -462,11 +436,14 @@ public: return Result; } - virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes, + BuildStorageCache* OptionalCache, + const Oid& CacheBuildId) override { std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(Log())); ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); + ZEN_ASSERT(OptionalCache == nullptr || CacheBuildId == m_BuildId); GetBlockDescriptionsResult Result; Stopwatch Timer; @@ -476,7 +453,7 @@ public: { Result.Blocks = zen::GetBlockDescriptions(*Output, *m_BuildStorage, - m_BuildCacheStorage.get(), + OptionalCache, m_BuildId, BlockHashes, /*AttemptFallback*/ false, @@ -506,49 +483,7 @@ public: return Result; } - virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override - { - AttachmentExistsInCacheResult Result; - Stopwatch Timer; - auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); - try - { - const std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = - m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes); - - if (CacheExistsResult.size() == RawHashes.size()) - { - Result.HasBody.reserve(CacheExistsResult.size()); - for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++) - { - Result.HasBody.push_back(CacheExistsResult[BlobIndex].HasBody); - } - } - } - catch (const HttpClientError& Ex) - { - Result.ErrorCode = MakeErrorCode(Ex); - Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'", - m_BuildStorageHttp.GetBaseUri(), - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - catch (const std::exception& Ex) - { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("Remote cache: Failed finding known blobs for {}/{}/{}/{}. Reason: '{}'", - m_BuildStorageHttp.GetBaseUri(), - m_Namespace, - m_Bucket, - m_BuildId, - Ex.what()); - } - return Result; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { ZEN_ASSERT(m_OplogBuildPartId != Oid::Zero); @@ -558,25 +493,7 @@ public: try { - if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly) - { - IoBuffer CachedBlob = m_BuildCacheStorage->GetBuildBlob(m_BuildId, RawHash); - if (CachedBlob) - { - Result.Bytes = std::move(CachedBlob); - } - } - if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly) - { - Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); - if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache) - { - m_BuildCacheStorage->PutBuildBlob(m_BuildId, - RawHash, - Result.Bytes.GetContentType(), - CompositeBuffer(SharedBuffer(Result.Bytes))); - } - } + Result.Bytes = m_BuildStorage->GetBuildBlob(m_BuildId, RawHash); } catch (const HttpClientError& Ex) { @@ -605,45 +522,20 @@ public: } virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, - std::span<const std::pair<uint64_t, uint64_t>> Ranges, - ESourceMode SourceMode) override + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override { + ZEN_ASSERT(!Ranges.empty()); LoadAttachmentRangesResult Result; Stopwatch Timer; auto _ = MakeGuard([&Timer, &Result]() { Result.ElapsedSeconds = Timer.GetElapsedTimeUs() / 1000000.0; }); try { - if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly) + BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges); + if (BlobRanges.PayloadBuffer) { - BuildStorageCache::BuildBlobRanges BlobRanges = m_BuildCacheStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges); - if (BlobRanges.PayloadBuffer) - { - Result.Bytes = std::move(BlobRanges.PayloadBuffer); - Result.Ranges = std::move(BlobRanges.Ranges); - } - } - if (!Result.Bytes && SourceMode != ESourceMode::kCacheOnly) - { - BuildStorageBase::BuildBlobRanges BlobRanges = m_BuildStorage->GetBuildBlobRanges(m_BuildId, RawHash, Ranges); - if (BlobRanges.PayloadBuffer) - { - Result.Bytes = std::move(BlobRanges.PayloadBuffer); - Result.Ranges = std::move(BlobRanges.Ranges); - - if (Result.Ranges.empty()) - { - // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3/Replicated - // Upload to cache (if enabled) - if (m_BuildCacheStorage && Result.Bytes && m_PopulateCache) - { - m_BuildCacheStorage->PutBuildBlob(m_BuildId, - RawHash, - Result.Bytes.GetContentType(), - CompositeBuffer(SharedBuffer(Result.Bytes))); - } - } - } + Result.Bytes = std::move(BlobRanges.PayloadBuffer); + Result.Ranges = std::move(BlobRanges.Ranges); } } catch (const HttpClientError& Ex) @@ -674,7 +566,7 @@ public: return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { LoadAttachmentsResult Result; Stopwatch Timer; @@ -682,67 +574,20 @@ public: std::vector<IoHash> AttachmentsLeftToFind = RawHashes; - if (m_BuildCacheStorage && SourceMode != ESourceMode::kHostOnly) - { - std::vector<BuildStorageCache::BlobExistsResult> ExistCheck = m_BuildCacheStorage->BlobsExists(m_BuildId, RawHashes); - if (ExistCheck.size() == RawHashes.size()) - { - AttachmentsLeftToFind.clear(); - for (size_t BlobIndex = 0; BlobIndex < RawHashes.size(); BlobIndex++) - { - const IoHash& Hash = RawHashes[BlobIndex]; - const BuildStorageCache::BlobExistsResult& BlobExists = ExistCheck[BlobIndex]; - if (BlobExists.HasBody) - { - IoBuffer CachedPayload = m_BuildCacheStorage->GetBuildBlob(m_BuildId, Hash); - if (CachedPayload) - { - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Hash, - CompressedBuffer::FromCompressedNoValidate(std::move(CachedPayload))}); - } - else - { - AttachmentsLeftToFind.push_back(Hash); - } - } - else - { - AttachmentsLeftToFind.push_back(Hash); - } - } - } - } - for (const IoHash& Hash : AttachmentsLeftToFind) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000))); - if (m_BuildCacheStorage && ChunkResult.Bytes && m_PopulateCache) - { - m_BuildCacheStorage->PutBuildBlob(m_BuildId, - Hash, - ChunkResult.Bytes.GetContentType(), - CompositeBuffer(SharedBuffer(ChunkResult.Bytes))); - } Result.Chunks.emplace_back( std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); } return Result; } - virtual void Flush() override - { - if (m_BuildCacheStorage) - { - m_BuildCacheStorage->Flush(100, [](intptr_t) { return false; }); - } - } - private: static int MakeErrorCode(const HttpClientError& Ex) { @@ -759,10 +604,6 @@ private: HttpClient m_BuildStorageHttp; std::unique_ptr<BuildStorageBase> m_BuildStorage; - BuildStorageCache::Statistics m_StorageCacheStats; - std::unique_ptr<HttpClient> m_BuildCacheStorageHttp; - std::unique_ptr<BuildStorageCache> m_BuildCacheStorage; - const std::string m_Namespace; const std::string m_Bucket; const Oid m_BuildId; @@ -771,125 +612,34 @@ private: const bool m_EnableBlocks = true; const bool m_UseTempBlocks = true; const bool m_AllowRedirect = false; - const bool m_PopulateCache = true; }; std::shared_ptr<RemoteProjectStore> -CreateJupiterBuildsRemoteStore(LoggerRef InLog, - const BuildsRemoteStoreOptions& Options, - const std::filesystem::path& TempFilePath, - bool Quiet, - bool Unattended, - bool Hidden, - WorkerThreadPool& CacheBackgroundWorkerPool, - double& OutHostLatencySec, - double& OutCacheLatencySec) +CreateJupiterBuildsRemoteStore(LoggerRef InLog, + const BuildStorageResolveResult& ResolveResult, + std::function<HttpClientAccessToken()>&& TokenProvider, + const BuildsRemoteStoreOptions& Options, + const std::filesystem::path& TempFilePath) { - std::string Host = Options.Host; - if (!Host.empty() && Host.find("://"sv) == std::string::npos) - { - // Assume https URL - Host = fmt::format("https://{}"sv, Host); - } - std::string OverrideUrl = Options.OverrideHost; - if (!OverrideUrl.empty() && OverrideUrl.find("://"sv) == std::string::npos) - { - // Assume https URL - OverrideUrl = fmt::format("https://{}"sv, OverrideUrl); - } - std::string ZenHost = Options.ZenHost; - if (!ZenHost.empty() && ZenHost.find("://"sv) == std::string::npos) - { - // Assume https URL - ZenHost = fmt::format("https://{}"sv, ZenHost); - } - - // 1) openid-provider if given (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - // 2) Access token as parameter in request - // 3) Environment variable (different win vs linux/mac) - // 4) Default openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider - - std::function<HttpClientAccessToken()> TokenProvider; - if (!Options.OpenIdProvider.empty()) - { - TokenProvider = httpclientauth::CreateFromOpenIdProvider(Options.AuthManager, Options.OpenIdProvider); - } - else if (!Options.AccessToken.empty()) - { - TokenProvider = httpclientauth::CreateFromStaticToken(Options.AccessToken); - } - else if (!Options.OidcExePath.empty()) - { - if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(Options.OidcExePath, - Host.empty() ? OverrideUrl : Host, - Quiet, - Unattended, - Hidden); - TokenProviderMaybe) - { - TokenProvider = TokenProviderMaybe.value(); - } - } - - if (!TokenProvider) - { - TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(Options.AuthManager); - } - - BuildStorageResolveResult ResolveRes; - { - HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", - .AccessTokenProvider = TokenProvider, - .AssumeHttp2 = Options.AssumeHttp2, - .AllowResume = true, - .RetryCount = 2}; - - std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(InLog)); - - ResolveRes = - ResolveBuildStorage(*Output, ClientSettings, Host, OverrideUrl, ZenHost, ZenCacheResolveMode::Discovery, /*Verbose*/ false); - } - HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", .ConnectTimeout = std::chrono::milliseconds(3000), .Timeout = std::chrono::milliseconds(1800000), .AccessTokenProvider = std::move(TokenProvider), - .AssumeHttp2 = ResolveRes.HostAssumeHttp2, + .AssumeHttp2 = ResolveResult.Cloud.AssumeHttp2, .AllowResume = true, .RetryCount = 4, .MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize}; - std::unique_ptr<HttpClientSettings> CacheClientSettings; - - if (!ResolveRes.CacheUrl.empty()) - { - CacheClientSettings = - std::make_unique<HttpClientSettings>(HttpClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{3000}, - .Timeout = std::chrono::milliseconds{30000}, - .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, - .AllowResume = true, - .RetryCount = 0, - .MaximumInMemoryDownloadSize = Options.MaximumInMemoryDownloadSize}); - } - std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<BuildsRemoteStore>(InLog, ClientSettings, - CacheClientSettings.get(), - ResolveRes.HostUrl, - ResolveRes.CacheUrl, + ResolveResult.Cloud.Address, TempFilePath, - CacheBackgroundWorkerPool, Options.Namespace, Options.Bucket, Options.BuildId, Options.MetaData, Options.ForceDisableBlocks, - Options.ForceDisableTempBlocks, - Options.PopulateCache); - - OutHostLatencySec = ResolveRes.HostLatencySec; - OutCacheLatencySec = ResolveRes.CacheLatencySec; + Options.ForceDisableTempBlocks); return RemoteStore; } diff --git a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp index f950fd46c..bb21de12c 100644 --- a/src/zenremotestore/projectstore/fileremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/fileremoteprojectstore.cpp @@ -7,8 +7,12 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zenhttp/httpcommon.h> +#include <zenremotestore/builds/buildstoragecache.h> + +#include <numeric> namespace zen { @@ -74,9 +78,11 @@ public: virtual SaveResult SaveContainer(const IoBuffer& Payload) override { - Stopwatch Timer; SaveResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); + { CbObject ContainerObject = LoadCompactBinaryObject(Payload); @@ -87,6 +93,10 @@ public: { Result.Needs.insert(AttachmentHash); } + else if (std::filesystem::path AttachmentMetaPath = GetAttachmentMetaPath(AttachmentHash); IsFile(AttachmentMetaPath)) + { + BasicFile TouchIt(AttachmentMetaPath, BasicFile::Mode::kWrite); + } }); } @@ -112,14 +122,18 @@ public: Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what()); } AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } - virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override + virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, + const IoHash& RawHash, + ChunkBlockDescription&& BlockDescription) override { - Stopwatch Timer; - SaveAttachmentResult Result; + SaveAttachmentResult Result; + + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (!IsFile(ChunkPath)) { @@ -142,14 +156,33 @@ public: Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what()); } } + if (!Result.ErrorCode && BlockDescription.BlockHash != IoHash::Zero) + { + try + { + std::filesystem::path MetaPath = GetAttachmentMetaPath(RawHash); + CbObject MetaData = BuildChunkBlockDescription(BlockDescription, {}); + SharedBuffer MetaBuffer = MetaData.GetBuffer(); + BasicFile MetaFile; + MetaFile.Open(MetaPath, BasicFile::Mode::kTruncate); + MetaFile.Write(MetaBuffer.GetView(), 0); + } + catch (const std::exception& Ex) + { + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("Failed saving block description to '{}'. Reason: {}", RawHash, Ex.what()); + } + } AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override { + SaveAttachmentsResult Result; + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); for (const SharedBuffer& Chunk : Chunks) { @@ -157,12 +190,10 @@ public: SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { - ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return SaveAttachmentsResult{ChunkResult}; + Result = SaveAttachmentsResult{ChunkResult}; + break; } } - SaveAttachmentsResult Result; - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } @@ -172,21 +203,60 @@ public: virtual GetKnownBlocksResult GetKnownBlocks() override { + Stopwatch Timer; if (m_OptionalBaseName.empty()) { - return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; + size_t MaxBlockCount = 10000; + + GetKnownBlocksResult Result; + + DirectoryContent Content; + GetDirectoryContent( + m_OutputPath, + DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeModificationTick, + Content); + std::vector<size_t> RecentOrder(Content.Files.size()); + std::iota(RecentOrder.begin(), RecentOrder.end(), 0u); + std::sort(RecentOrder.begin(), RecentOrder.end(), [&Content](size_t Lhs, size_t Rhs) { + return Content.FileModificationTicks[Lhs] > Content.FileModificationTicks[Rhs]; + }); + + for (size_t FileIndex : RecentOrder) + { + std::filesystem::path MetaPath = Content.Files[FileIndex]; + if (MetaPath.extension() == MetaExtension) + { + IoBuffer MetaFile = ReadFile(MetaPath).Flatten(); + CbValidateError Err; + CbObject ValidatedObject = ValidateAndReadCompactBinaryObject(std::move(MetaFile), Err); + if (Err == CbValidateError::None) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(ValidatedObject); + if (Description.BlockHash != IoHash::Zero) + { + Result.Blocks.emplace_back(std::move(Description)); + if (Result.Blocks.size() == MaxBlockCount) + { + break; + } + } + } + } + } + + Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + return Result; } LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName); if (LoadResult.ErrorCode) { return GetKnownBlocksResult{LoadResult}; } - Stopwatch Timer; std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); if (BlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), - .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; + .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}}; } std::vector<IoHash> ExistingBlockHashes; for (const IoHash& RawHash : BlockHashes) @@ -200,15 +270,15 @@ public: if (ExistingBlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent), - .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; + .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}}; } std::vector<ThinChunkBlockDescription> ThinKnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); - const size_t KnowBlockCount = ThinKnownBlocks.size(); + const size_t KnownBlockCount = ThinKnownBlocks.size(); - GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; - Result.Blocks.resize(KnowBlockCount); - for (size_t BlockIndex = 0; BlockIndex < KnowBlockCount; BlockIndex++) + GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}}; + Result.Blocks.resize(KnownBlockCount); + for (size_t BlockIndex = 0; BlockIndex < KnownBlockCount; BlockIndex++) { Result.Blocks[BlockIndex].BlockHash = ThinKnownBlocks[BlockIndex].BlockHash; Result.Blocks[BlockIndex].ChunkRawHashes = std::move(ThinKnownBlocks[BlockIndex].ChunkRawHashes); @@ -217,87 +287,141 @@ public: return Result; } - virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes, + BuildStorageCache* OptionalCache, + const Oid& CacheBuildId) override { - ZEN_UNUSED(BlockHashes); - return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; - } + GetBlockDescriptionsResult Result; - virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override - { - return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)}; - } + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override - { - Stopwatch Timer; - LoadAttachmentResult Result; - if (SourceMode != ESourceMode::kCacheOnly) + Result.Blocks.reserve(BlockHashes.size()); + + uint64_t ByteCount = 0; + + std::vector<ChunkBlockDescription> UnorderedList; { - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!IsFile(ChunkPath)) + if (OptionalCache) { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = - fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; + std::vector<CbObject> CacheBlockMetadatas = OptionalCache->GetBlobMetadatas(CacheBuildId, BlockHashes); + for (const CbObject& BlockObject : CacheBlockMetadatas) + { + ByteCount += BlockObject.GetSize(); + } + UnorderedList = ParseBlockMetadatas(CacheBlockMetadatas); } + + tsl::robin_map<IoHash, size_t, IoHash::Hasher> BlockDescriptionLookup; + BlockDescriptionLookup.reserve(BlockHashes.size()); + for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) { - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); - Result.Bytes = ChunkFile.ReadAll(); + const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); + } + + if (UnorderedList.size() < BlockHashes.size()) + { + for (const IoHash& RawHash : BlockHashes) + { + if (!BlockDescriptionLookup.contains(RawHash)) + { + std::filesystem::path MetaPath = GetAttachmentMetaPath(RawHash); + IoBuffer MetaFile = ReadFile(MetaPath).Flatten(); + ByteCount += MetaFile.GetSize(); + CbValidateError Err; + CbObject ValidatedObject = ValidateAndReadCompactBinaryObject(std::move(MetaFile), Err); + if (Err == CbValidateError::None) + { + ChunkBlockDescription Description = ParseChunkBlockDescription(ValidatedObject); + if (Description.BlockHash != IoHash::Zero) + { + BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size()); + UnorderedList.emplace_back(std::move(Description)); + } + } + } + } + } + + Result.Blocks.reserve(UnorderedList.size()); + for (const IoHash& RawHash : BlockHashes) + { + if (auto It = BlockDescriptionLookup.find(RawHash); It != BlockDescriptionLookup.end()) + { + Result.Blocks.emplace_back(std::move(UnorderedList[It->second])); + } } } + AddStats(0, ByteCount, Timer.GetElapsedTimeUs() * 1000); + return Result; + } + + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override + { + LoadAttachmentResult Result; + + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); + + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!IsFile(ChunkPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); + return Result; + } + { + BasicFile ChunkFile; + ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); + Result.Bytes = ChunkFile.ReadAll(); + } AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, - std::span<const std::pair<uint64_t, uint64_t>> Ranges, - ESourceMode SourceMode) override + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override { - Stopwatch Timer; + ZEN_ASSERT(!Ranges.empty()); LoadAttachmentRangesResult Result; - if (SourceMode != ESourceMode::kCacheOnly) - { - std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); - if (!IsFile(ChunkPath)) - { - Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); - Result.Reason = - fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - return Result; - } - { - BasicFile ChunkFile; - ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); - uint64_t Start = Ranges.front().first; - uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first; + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); - Result.Bytes = ChunkFile.ReadRange(Start, Length); - Result.Ranges.reserve(Ranges.size()); - for (const std::pair<uint64_t, uint64_t>& Range : Ranges) - { - Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second)); - } + std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); + if (!IsFile(ChunkPath)) + { + Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); + Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); + return Result; + } + { + uint64_t Start = Ranges.front().first; + uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first; + Result.Bytes = IoBufferBuilder::MakeFromFile(ChunkPath, Start, Length); + Result.Ranges.reserve(Ranges.size()); + for (const std::pair<uint64_t, uint64_t>& Range : Ranges) + { + Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second)); } } - AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + AddStats(0, + std::accumulate(Result.Ranges.begin(), + Result.Ranges.end(), + uint64_t(0), + [](uint64_t Current, const std::pair<uint64_t, uint64_t>& Value) { return Current + Value.second; }), + Timer.GetElapsedTimeUs() * 1000); return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { Stopwatch Timer; LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; @@ -310,20 +434,20 @@ public: return Result; } - virtual void Flush() override {} - private: LoadContainerResult LoadContainer(const std::string& Name) { - Stopwatch Timer; - LoadContainerResult Result; + LoadContainerResult Result; + + Stopwatch Timer; + auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); + std::filesystem::path SourcePath = m_OutputPath; SourcePath.append(Name); if (!IsFile(SourcePath)) { Result.ErrorCode = gsl::narrow<int>(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string()); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } IoBuffer ContainerPayload; @@ -337,18 +461,16 @@ private: if (Result.ContainerObject = ValidateAndReadCompactBinaryObject(std::move(ContainerPayload), ValidateResult); ValidateResult != CbValidateError::None || !Result.ContainerObject) { - Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); - Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')", - SourcePath.string(), - ToString(ValidateResult)); - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; + Result.ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError); + Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')", + SourcePath.string(), + ToString(ValidateResult)); return Result; } - Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } - std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const + std::filesystem::path GetAttachmentBasePath(const IoHash& RawHash) const { ExtendablePathBuilder<128> ShardedPath; ShardedPath.Append(m_OutputPath.c_str()); @@ -367,6 +489,19 @@ private: return ShardedPath.ToPath(); } + static constexpr std::string_view BlobExtension = ".blob"; + static constexpr std::string_view MetaExtension = ".meta"; + + std::filesystem::path GetAttachmentPath(const IoHash& RawHash) + { + return GetAttachmentBasePath(RawHash).replace_extension(BlobExtension); + } + + std::filesystem::path GetAttachmentMetaPath(const IoHash& RawHash) + { + return GetAttachmentBasePath(RawHash).replace_extension(MetaExtension); + } + void AddStats(uint64_t UploadedBytes, uint64_t DownloadedBytes, uint64_t ElapsedNS) { m_SentBytes.fetch_add(UploadedBytes); diff --git a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp index 514484f30..5b456cb4c 100644 --- a/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/jupiterremoteprojectstore.cpp @@ -212,73 +212,64 @@ public: return Result; } - virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes, + BuildStorageCache* OptionalCache, + const Oid& CacheBuildId) override { - ZEN_UNUSED(BlockHashes); + ZEN_UNUSED(BlockHashes, OptionalCache, CacheBuildId); return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; } - virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override - { - return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)}; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { LoadAttachmentResult Result; - if (SourceMode != ESourceMode::kCacheOnly) - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); - AddStats(GetResult); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + AddStats(GetResult); - Result = {ConvertResult(GetResult), std::move(GetResult.Response)}; - if (GetResult.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - RawHash, - Result.Reason); - } + Result = {ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + RawHash, + Result.Reason); } return Result; } virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, - std::span<const std::pair<uint64_t, uint64_t>> Ranges, - ESourceMode SourceMode) override + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override { + ZEN_ASSERT(!Ranges.empty()); LoadAttachmentRangesResult Result; - if (SourceMode != ESourceMode::kCacheOnly) - { - JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); - JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); - AddStats(GetResult); + JupiterSession Session(m_JupiterClient->Logger(), m_JupiterClient->Client(), m_AllowRedirect); + JupiterResult GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath); + AddStats(GetResult); - Result = LoadAttachmentRangesResult{ConvertResult(GetResult), std::move(GetResult.Response)}; - if (GetResult.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", - m_JupiterClient->ServiceUrl(), - m_Namespace, - RawHash, - Result.Reason); - } - else - { - Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end()); - } + Result = LoadAttachmentRangesResult{ConvertResult(GetResult), std::move(GetResult.Response)}; + if (GetResult.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'", + m_JupiterClient->ServiceUrl(), + m_Namespace, + RawHash, + Result.Reason); + } + else + { + Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end()); } return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { - LoadAttachmentResult ChunkResult = LoadAttachment(Hash, SourceMode); + LoadAttachmentResult ChunkResult = LoadAttachment(Hash); if (ChunkResult.ErrorCode) { return LoadAttachmentsResult{ChunkResult}; @@ -290,8 +281,6 @@ public: return Result; } - virtual void Flush() override {} - private: LoadContainerResult LoadContainer(const IoHash& Key) { diff --git a/src/zenremotestore/projectstore/projectstoreoperations.cpp b/src/zenremotestore/projectstore/projectstoreoperations.cpp index becac3d4c..36dc4d868 100644 --- a/src/zenremotestore/projectstore/projectstoreoperations.cpp +++ b/src/zenremotestore/projectstore/projectstoreoperations.cpp @@ -426,19 +426,19 @@ ProjectStoreOperationDownloadAttachments::Execute() auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) { IoBuffer Payload; - if (m_Storage.BuildCacheStorage) + if (m_Storage.CacheStorage) { - Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); + Payload = m_Storage.CacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); } if (!Payload) { Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); - if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) + if (m_Storage.CacheStorage && m_Options.PopulateCache) { - m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(), - RawHash, - Payload.GetContentType(), - CompositeBuffer(SharedBuffer(Payload))); + m_Storage.CacheStorage->PutBuildBlob(m_State.GetBuildId(), + RawHash, + Payload.GetContentType(), + CompositeBuffer(SharedBuffer(Payload))); } } uint64_t PayloadSize = Payload.GetSize(); diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index c8c5f201d..d5c6286a8 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpcommon.h> +#include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/chunking/chunkedcontent.h> #include <zenremotestore/chunking/chunkedfile.h> #include <zenremotestore/operationlogoutput.h> @@ -124,14 +125,17 @@ namespace remotestore_impl { return OptionalContext->IsCancelled(); } - std::string GetStats(const RemoteProjectStore::Stats& Stats, uint64_t ElapsedWallTimeMS) + std::string GetStats(const RemoteProjectStore::Stats& Stats, + const BuildStorageCache::Statistics* OptionalCacheStats, + uint64_t ElapsedWallTimeMS) { - return fmt::format( - "Sent: {} ({}bits/s) Recv: {} ({}bits/s)", - NiceBytes(Stats.m_SentBytes), - NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u), - NiceBytes(Stats.m_ReceivedBytes), - NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((Stats.m_ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u)); + uint64_t SentBytes = Stats.m_SentBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesWritten.load() : 0); + uint64_t ReceivedBytes = Stats.m_ReceivedBytes + (OptionalCacheStats ? OptionalCacheStats->TotalBytesRead.load() : 0); + return fmt::format("Sent: {} ({}bits/s) Recv: {} ({}bits/s)", + NiceBytes(SentBytes), + NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((SentBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u), + NiceBytes(ReceivedBytes), + NiceNum(ElapsedWallTimeMS > 0u ? static_cast<uint64_t>((ReceivedBytes * 8 * 1000) / ElapsedWallTimeMS) : 0u)); } void LogRemoteStoreStatsDetails(const RemoteProjectStore::Stats& Stats) @@ -269,12 +273,7 @@ namespace remotestore_impl { JobContext* m_OptionalContext; }; - void DownloadAndSaveBlockChunks(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, + void DownloadAndSaveBlockChunks(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, AsyncRemoteResult& RemoteResult, @@ -285,10 +284,8 @@ namespace remotestore_impl { std::vector<uint32_t>&& NeededChunkIndexes) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &WorkerPool, + Context.NetworkWorkerPool.ScheduleWork( + [&Context, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &RemoteResult, @@ -296,9 +293,7 @@ namespace remotestore_impl { NeededChunkIndexes = std::move(NeededChunkIndexes), &Info, &LoadAttachmentsTimer, - &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext]() { + &DownloadStartMS]() { ZEN_TRACE_CPU("DownloadBlockChunks"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -317,16 +312,16 @@ namespace remotestore_impl { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentsResult Result = RemoteStore.LoadAttachments(Chunks); + RemoteProjectStore::LoadAttachmentsResult Result = Context.RemoteStore.LoadAttachments(Chunks); if (Result.ErrorCode) { - ReportMessage(OptionalContext, + ReportMessage(Context.OptionalJobContext, fmt::format("Failed to load attachments with {} chunks ({}): {}", Chunks.size(), RemoteResult.GetError(), RemoteResult.GetErrorReason())); Info.MissingAttachmentCount.fetch_add(1); - if (IgnoreMissingAttachments) + if (Context.IgnoreMissingAttachments) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } @@ -338,7 +333,7 @@ namespace remotestore_impl { uint64_t ChunkSize = It.second.GetCompressedSize(); Info.AttachmentBytesDownloaded.fetch_add(ChunkSize); } - remotestore_impl::ReportMessage(OptionalContext, + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loaded {} bulk attachments in {}", Chunks.size(), NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000)))); @@ -347,8 +342,8 @@ namespace remotestore_impl { return; } AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, &RemoteResult, &Info, &ChunkStore, Chunks = std::move(Result.Chunks)]() { + Context.WorkerPool.ScheduleWork( + [&AttachmentsWriteLatch, &RemoteResult, &Info, &Context, Chunks = std::move(Result.Chunks)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -369,7 +364,9 @@ namespace remotestore_impl { WriteRawHashes.push_back(It.first); } std::vector<CidStore::InsertResult> InsertResults = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes, CidStore::InsertMode::kCopyOnly); + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, + WriteRawHashes, + CidStore::InsertMode::kCopyOnly); for (size_t Index = 0; Index < InsertResults.size(); Index++) { @@ -400,12 +397,7 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; - void DownloadAndSaveBlock(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, + void DownloadAndSaveBlock(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, AsyncRemoteResult& RemoteResult, @@ -418,19 +410,14 @@ namespace remotestore_impl { uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( + Context.NetworkWorkerPool.ScheduleWork( [&AttachmentsDownloadLatch, &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, + &Context, &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, RetriesLeft, BlockHash = IoHash(BlockHash), &AllNeededPartialChunkHashesLookup, @@ -446,52 +433,65 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult BlockResult = RemoteStore.LoadAttachment(BlockHash); - if (BlockResult.ErrorCode) + + IoBuffer BlobBuffer; + if (Context.OptionalCache) { - ReportMessage(OptionalContext, - fmt::format("Failed to download block attachment {} ({}): {}", - BlockHash, - RemoteResult.GetError(), - RemoteResult.GetErrorReason())); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) - { - RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - } - return; + BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, BlockHash); } - if (RemoteResult.IsError()) + + if (!BlobBuffer) { - return; + RemoteProjectStore::LoadAttachmentResult BlockResult = Context.RemoteStore.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode) + { + ReportMessage(Context.OptionalJobContext, + fmt::format("Failed to download block attachment {} ({}): {}", + BlockHash, + BlockResult.Reason, + BlockResult.Text)); + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); + } + return; + } + if (RemoteResult.IsError()) + { + return; + } + BlobBuffer = std::move(BlockResult.Bytes); + ZEN_DEBUG("Loaded block attachment '{}' in {} ({})", + BlockHash, + NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), + NiceBytes(BlobBuffer.Size())); + if (Context.OptionalCache && Context.PopulateCache) + { + Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, + BlockHash, + BlobBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlobBuffer))); + } } - uint64_t BlockSize = BlockResult.Bytes.GetSize(); + uint64_t BlockSize = BlobBuffer.GetSize(); Info.AttachmentBlocksDownloaded.fetch_add(1); - ZEN_DEBUG("Loaded block attachment '{}' in {} ({})", - BlockHash, - NiceTimeSpanMs(static_cast<uint64_t>(BlockResult.ElapsedSeconds * 1000)), - NiceBytes(BlockSize)); Info.AttachmentBlockBytesDownloaded.fetch_add(BlockSize); AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( + Context.WorkerPool.ScheduleWork( [&AttachmentsDownloadLatch, &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, + &Context, &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, RetriesLeft, BlockHash = IoHash(BlockHash), &AllNeededPartialChunkHashesLookup, ChunkDownloadedFlags, - Bytes = std::move(BlockResult.Bytes)]() { + Bytes = std::move(BlobBuffer)]() { auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); if (RemoteResult.IsError()) { @@ -569,7 +569,7 @@ namespace remotestore_impl { if (!WriteAttachmentBuffers.empty()) { std::vector<CidStore::InsertResult> Results = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < Results.size(); Index++) { const CidStore::InsertResult& Result = Results[Index]; @@ -598,14 +598,9 @@ namespace remotestore_impl { { if (RetriesLeft > 0) { - ReportMessage(OptionalContext, fmt::format("{}, retrying download", ErrorString)); - - return DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); + + return DownloadAndSaveBlock(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -619,7 +614,7 @@ namespace remotestore_impl { } else { - ReportMessage(OptionalContext, ErrorString); + ReportMessage(Context.OptionalJobContext, ErrorString); RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), ErrorString, {}); return; } @@ -637,28 +632,29 @@ namespace remotestore_impl { catch (const std::exception& Ex) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::InternalServerError), - fmt::format("Failed to block attachment {}", BlockHash), + fmt::format("Failed to download block attachment {}", BlockHash), Ex.what()); } }, WorkerThreadPool::EMode::EnableBacklog); }; - bool DownloadPartialBlock(RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - AsyncRemoteResult& RemoteResult, - DownloadInfo& Info, - double& DownloadTimeSeconds, - const ChunkBlockDescription& BlockDescription, - bool BlockExistsInCache, - std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, - size_t BlockRangeIndexStart, - size_t BlockRangeCount, + void DownloadPartialBlock(LoadOplogContext& Context, + AsyncRemoteResult& RemoteResult, + DownloadInfo& Info, + double& DownloadTimeSeconds, + const ChunkBlockDescription& BlockDescription, + bool BlockExistsInCache, + std::span<const ChunkBlockAnalyser::BlockRangeDescriptor> BlockRangeDescriptors, + size_t BlockRangeIndexStart, + size_t BlockRangeCount, std::function<void(IoBuffer&& Buffer, size_t BlockRangeStartIndex, std::span<const std::pair<uint64_t, uint64_t>> OffsetAndLengths)>&& OnDownloaded) { + ZEN_ASSERT(Context.StoreMaxRangeCountPerRequest != 0); + ZEN_ASSERT(BlockExistsInCache == false || Context.CacheMaxRangeCountPerRequest != 0); + std::vector<std::pair<uint64_t, uint64_t>> Ranges; Ranges.reserve(BlockRangeDescriptors.size()); for (size_t BlockRangeIndex = BlockRangeIndexStart; BlockRangeIndex < BlockRangeIndexStart + BlockRangeCount; BlockRangeIndex++) @@ -667,65 +663,104 @@ namespace remotestore_impl { Ranges.push_back(std::make_pair(BlockRange.RangeStart, BlockRange.RangeLength)); } - if (BlockExistsInCache) - { - RemoteProjectStore::LoadAttachmentRangesResult BlockResult = - RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, Ranges, RemoteProjectStore::ESourceMode::kCacheOnly); - DownloadTimeSeconds += BlockResult.ElapsedSeconds; - if (RemoteResult.IsError()) - { - return false; - } - if (!BlockResult.ErrorCode && BlockResult.Bytes) - { - if (BlockResult.Ranges.size() != Ranges.size()) - { - throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", - Ranges.size(), - BlockDescription.BlockHash, - BlockResult.Ranges.size())); - } - OnDownloaded(std::move(BlockResult.Bytes), BlockRangeIndexStart, BlockResult.Ranges); - return true; - } - } - - const size_t MaxRangesPerRequestToJupiter = RemoteProjectStore::MaxRangeCountPerRequest; - size_t SubBlockRangeCount = BlockRangeCount; size_t SubRangeCountComplete = 0; std::span<const std::pair<uint64_t, uint64_t>> RangesSpan(Ranges); + while (SubRangeCountComplete < SubBlockRangeCount) { if (RemoteResult.IsError()) { break; } - size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, MaxRangesPerRequestToJupiter); + size_t SubRangeStartIndex = BlockRangeIndexStart + SubRangeCountComplete; + if (BlockExistsInCache) + { + ZEN_ASSERT(Context.OptionalCache); + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.CacheMaxRangeCountPerRequest); + + if (SubRangeCount == 1) + { + // Legacy single-range path, prefer that for max compatibility + + const std::pair<uint64_t, uint64_t> SubRange = RangesSpan[SubRangeCountComplete]; + Stopwatch CacheTimer; + IoBuffer PayloadBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, + BlockDescription.BlockHash, + SubRange.first, + SubRange.second); + DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; + if (RemoteResult.IsError()) + { + break; + } + if (PayloadBuffer) + { + OnDownloaded(std::move(PayloadBuffer), + SubRangeStartIndex, + std::vector<std::pair<uint64_t, uint64_t>>{std::make_pair(0u, SubRange.second)}); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + else + { + auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); + + Stopwatch CacheTimer; + BuildStorageCache::BuildBlobRanges RangeBuffers = + Context.OptionalCache->GetBuildBlobRanges(Context.CacheBuildId, BlockDescription.BlockHash, SubRanges); + DownloadTimeSeconds += CacheTimer.GetElapsedTimeMs() / 1000.0; + if (RemoteResult.IsError()) + { + break; + } + if (RangeBuffers.PayloadBuffer) + { + if (RangeBuffers.Ranges.empty()) + { + SubRangeCount = Ranges.size() - SubRangeCountComplete; + OnDownloaded(std::move(RangeBuffers.PayloadBuffer), + SubRangeStartIndex, + RangesSpan.subspan(SubRangeCountComplete, SubRangeCount)); + SubRangeCountComplete += SubRangeCount; + continue; + } + else if (RangeBuffers.Ranges.size() == SubRangeCount) + { + OnDownloaded(std::move(RangeBuffers.PayloadBuffer), SubRangeStartIndex, RangeBuffers.Ranges); + SubRangeCountComplete += SubRangeCount; + continue; + } + } + } + } + + size_t SubRangeCount = Min(BlockRangeCount - SubRangeCountComplete, Context.StoreMaxRangeCountPerRequest); auto SubRanges = RangesSpan.subspan(SubRangeCountComplete, SubRangeCount); RemoteProjectStore::LoadAttachmentRangesResult BlockResult = - RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges, RemoteProjectStore::ESourceMode::kHostOnly); + Context.RemoteStore.LoadAttachmentRanges(BlockDescription.BlockHash, SubRanges); DownloadTimeSeconds += BlockResult.ElapsedSeconds; if (RemoteResult.IsError()) { - return false; + break; } if (BlockResult.ErrorCode || !BlockResult.Bytes) { - ReportMessage(OptionalContext, + ReportMessage(Context.OptionalJobContext, fmt::format("Failed to download {} ranges from block attachment '{}' ({}): {}", SubRanges.size(), BlockDescription.BlockHash, BlockResult.ErrorCode, BlockResult.Reason)); Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError(BlockResult.ErrorCode, BlockResult.Reason, BlockResult.Text); - return false; + break; } } else @@ -734,6 +769,18 @@ namespace remotestore_impl { { // Jupiter will ignore the ranges and send the whole payload if it fetches the payload from S3 // Use the whole payload for the remaining ranges + + if (Context.OptionalCache && Context.PopulateCache) + { + Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, + BlockDescription.BlockHash, + ZenContentType::kCompressedBinary, + CompositeBuffer(std::vector<IoBuffer>{BlockResult.Bytes})); + if (RemoteResult.IsError()) + { + break; + } + } SubRangeCount = Ranges.size() - SubRangeCountComplete; OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, @@ -743,10 +790,13 @@ namespace remotestore_impl { { if (BlockResult.Ranges.size() != SubRanges.size()) { - throw std::runtime_error(fmt::format("Fetching {} ranges from {} resulted in {} ranges", - SubRanges.size(), - BlockDescription.BlockHash, - BlockResult.Ranges.size())); + RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::InternalServerError), + fmt::format("Range response for block {} contains {} ranges, expected {} ranges", + BlockDescription.BlockHash, + BlockResult.Ranges.size(), + SubRanges.size()), + ""); + break; } OnDownloaded(std::move(BlockResult.Bytes), SubRangeStartIndex, BlockResult.Ranges); } @@ -754,15 +804,9 @@ namespace remotestore_impl { SubRangeCountComplete += SubRangeCount; } - return true; } - void DownloadAndSavePartialBlock(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, + void DownloadAndSavePartialBlock(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, AsyncRemoteResult& RemoteResult, @@ -779,19 +823,14 @@ namespace remotestore_impl { uint32_t RetriesLeft) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( + Context.NetworkWorkerPool.ScheduleWork( [&AttachmentsDownloadLatch, &AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, + &Context, &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, BlockDescription, BlockExistsInCache, BlockRangeDescriptors, @@ -811,10 +850,8 @@ namespace remotestore_impl { double DownloadElapsedSeconds = 0; uint64_t DownloadedBytes = 0; - bool Success = DownloadPartialBlock( - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, + DownloadPartialBlock( + Context, RemoteResult, Info, DownloadElapsedSeconds, @@ -833,19 +870,14 @@ namespace remotestore_impl { Info.AttachmentBlocksRangesDownloaded++; AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( + Context.WorkerPool.ScheduleWork( [&AttachmentsWriteLatch, - &ChunkStore, - &RemoteStore, - &NetworkWorkerPool, - &WorkerPool, + &Context, &AttachmentsDownloadLatch, &RemoteResult, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - IgnoreMissingAttachments, - OptionalContext, BlockDescription, BlockExistsInCache, BlockRangeDescriptors, @@ -945,14 +977,9 @@ namespace remotestore_impl { { if (RetriesLeft > 0) { - ReportMessage(OptionalContext, + ReportMessage(Context.OptionalJobContext, fmt::format("{}, retrying download", ErrorString)); - return DownloadAndSavePartialBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + return DownloadAndSavePartialBlock(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -969,9 +996,9 @@ namespace remotestore_impl { RetriesLeft - 1); } - ReportMessage(OptionalContext, ErrorString); + ReportMessage(Context.OptionalJobContext, ErrorString); Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError(gsl::narrow<int32_t>(HttpResponseCode::NotFound), "Malformed chunk block", @@ -998,7 +1025,7 @@ namespace remotestore_impl { if (!WriteAttachmentBuffers.empty()) { std::vector<CidStore::InsertResult> Results = - ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); + Context.ChunkStore.AddChunks(WriteAttachmentBuffers, WriteRawHashes); for (size_t Index = 0; Index < Results.size(); Index++) { const CidStore::InsertResult& Result = Results[Index]; @@ -1037,7 +1064,7 @@ namespace remotestore_impl { }, WorkerThreadPool::EMode::EnableBacklog); }); - if (Success) + if (!RemoteResult.IsError()) { ZEN_DEBUG("Loaded {} ranges from block attachment '{}' in {} ({})", BlockRangeCount, @@ -1056,12 +1083,7 @@ namespace remotestore_impl { WorkerThreadPool::EMode::EnableBacklog); }; - void DownloadAndSaveAttachment(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - bool IgnoreMissingAttachments, - JobContext* OptionalContext, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, + void DownloadAndSaveAttachment(LoadOplogContext& Context, Latch& AttachmentsDownloadLatch, Latch& AttachmentsWriteLatch, AsyncRemoteResult& RemoteResult, @@ -1071,19 +1093,15 @@ namespace remotestore_impl { const IoHash& RawHash) { AttachmentsDownloadLatch.AddCount(1); - NetworkWorkerPool.ScheduleWork( - [&RemoteStore, - &ChunkStore, - &WorkerPool, + Context.NetworkWorkerPool.ScheduleWork( + [&Context, &RemoteResult, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, RawHash, &LoadAttachmentsTimer, &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext]() { + &Info]() { ZEN_TRACE_CPU("DownloadAttachment"); auto _ = MakeGuard([&AttachmentsDownloadLatch] { AttachmentsDownloadLatch.CountDown(); }); @@ -1095,43 +1113,52 @@ namespace remotestore_impl { { uint64_t Unset = (std::uint64_t)-1; DownloadStartMS.compare_exchange_strong(Unset, LoadAttachmentsTimer.GetElapsedTimeMs()); - RemoteProjectStore::LoadAttachmentResult AttachmentResult = RemoteStore.LoadAttachment(RawHash); - if (AttachmentResult.ErrorCode) + IoBuffer BlobBuffer; + if (Context.OptionalCache) { - ReportMessage(OptionalContext, - fmt::format("Failed to download large attachment {}: '{}', error code : {}", - RawHash, - AttachmentResult.Reason, - AttachmentResult.ErrorCode)); - Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + BlobBuffer = Context.OptionalCache->GetBuildBlob(Context.CacheBuildId, RawHash); + } + if (!BlobBuffer) + { + RemoteProjectStore::LoadAttachmentResult AttachmentResult = Context.RemoteStore.LoadAttachment(RawHash); + if (AttachmentResult.ErrorCode) { - RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + ReportMessage(Context.OptionalJobContext, + fmt::format("Failed to download large attachment {}: '{}', error code : {}", + RawHash, + AttachmentResult.Reason, + AttachmentResult.ErrorCode)); + Info.MissingAttachmentCount.fetch_add(1); + if (!Context.IgnoreMissingAttachments) + { + RemoteResult.SetError(AttachmentResult.ErrorCode, AttachmentResult.Reason, AttachmentResult.Text); + } + return; + } + BlobBuffer = std::move(AttachmentResult.Bytes); + ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", + RawHash, + NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), + NiceBytes(BlobBuffer.GetSize())); + if (Context.OptionalCache && Context.PopulateCache) + { + Context.OptionalCache->PutBuildBlob(Context.CacheBuildId, + RawHash, + BlobBuffer.GetContentType(), + CompositeBuffer(SharedBuffer(BlobBuffer))); } - return; } - uint64_t AttachmentSize = AttachmentResult.Bytes.GetSize(); - ZEN_DEBUG("Loaded large attachment '{}' in {} ({})", - RawHash, - NiceTimeSpanMs(static_cast<uint64_t>(AttachmentResult.ElapsedSeconds * 1000)), - NiceBytes(AttachmentSize)); - Info.AttachmentsDownloaded.fetch_add(1); if (RemoteResult.IsError()) { return; } + uint64_t AttachmentSize = BlobBuffer.GetSize(); + Info.AttachmentsDownloaded.fetch_add(1); Info.AttachmentBytesDownloaded.fetch_add(AttachmentSize); AttachmentsWriteLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&AttachmentsWriteLatch, - &RemoteResult, - &Info, - &ChunkStore, - RawHash, - AttachmentSize, - Bytes = std::move(AttachmentResult.Bytes), - OptionalContext]() { + Context.WorkerPool.ScheduleWork( + [&Context, &AttachmentsWriteLatch, &RemoteResult, &Info, RawHash, AttachmentSize, Bytes = std::move(BlobBuffer)]() { ZEN_TRACE_CPU("WriteAttachment"); auto _ = MakeGuard([&AttachmentsWriteLatch] { AttachmentsWriteLatch.CountDown(); }); @@ -1141,7 +1168,7 @@ namespace remotestore_impl { } try { - CidStore::InsertResult InsertResult = ChunkStore.AddChunk(Bytes, RawHash); + CidStore::InsertResult InsertResult = Context.ChunkStore.AddChunk(Bytes, RawHash); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(AttachmentSize); @@ -1557,7 +1584,9 @@ namespace remotestore_impl { uint64_t PartialTransferWallTimeMS = Timer.GetElapsedTimeMs(); ReportProgress(OptionalContext, "Saving attachments"sv, - fmt::format("{} remaining... {}", Remaining, GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), + fmt::format("{} remaining... {}", + Remaining, + GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, PartialTransferWallTimeMS)), AttachmentsToSave, Remaining); } @@ -1566,7 +1595,7 @@ namespace remotestore_impl { { ReportProgress(OptionalContext, "Saving attachments"sv, - fmt::format("{}", GetStats(RemoteStore.GetStats(), ElapsedTimeMS)), + fmt::format("{}", GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS)), AttachmentsToSave, 0); } @@ -1577,7 +1606,7 @@ namespace remotestore_impl { LargeAttachmentCountToUpload, BulkAttachmentCountToUpload, NiceTimeSpanMs(ElapsedTimeMS), - GetStats(RemoteStore.GetStats(), ElapsedTimeMS))); + GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, ElapsedTimeMS))); } } // namespace remotestore_impl @@ -2186,31 +2215,36 @@ BuildContainer(CidStore& ChunkStore, } ResolveAttachmentsLatch.CountDown(); - while (!ResolveAttachmentsLatch.Wait(1000)) { - ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + ptrdiff_t AttachmentCountToUseForProgress = ResolveAttachmentsLatch.Remaining(); + while (!ResolveAttachmentsLatch.Wait(1000)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); - while (!ResolveAttachmentsLatch.Wait(1000)) + ptrdiff_t Remaining = ResolveAttachmentsLatch.Remaining(); + if (remotestore_impl::IsCancelled(OptionalContext)) { - Remaining = ResolveAttachmentsLatch.Remaining(); - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("Aborting, {} attachments remaining...", Remaining), - UploadAttachments.size(), - Remaining); + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); + while (!ResolveAttachmentsLatch.Wait(1000)) + { + Remaining = ResolveAttachmentsLatch.Remaining(); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("Aborting, {} attachments remaining...", Remaining), + UploadAttachments.size(), + Remaining); + } + remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); + return {}; } - remotestore_impl::ReportProgress(OptionalContext, "Resolving attachments"sv, "Aborted"sv, UploadAttachments.size(), 0); - return {}; + AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + remotestore_impl::ReportProgress(OptionalContext, + "Resolving attachments"sv, + fmt::format("{} remaining...", Remaining), + AttachmentCountToUseForProgress, + Remaining); } - remotestore_impl::ReportProgress(OptionalContext, - "Resolving attachments"sv, - fmt::format("{} remaining...", Remaining), - UploadAttachments.size(), - Remaining); } if (UploadAttachments.size() > 0) { @@ -2598,12 +2632,14 @@ BuildContainer(CidStore& ChunkStore, 0); } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and in {}", - ChunkAssembleCount, - TotalOpCount, - GeneratedBlockCount, - NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Built oplog and collected {} attachments from {} ops into {} blocks and {} loose attachments in {}", + ChunkAssembleCount, + TotalOpCount, + GeneratedBlockCount, + LargeChunkHashes.size(), + NiceTimeSpanMs(static_cast<uint64_t>(Timer.GetElapsedTimeMs())))); if (remotestore_impl::IsCancelled(OptionalContext)) { @@ -3155,17 +3191,18 @@ SaveOplog(CidStore& ChunkStore, remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", - RemoteStoreInfo.ContainerName, - RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksUploaded.load(), - NiceBytes(Info.AttachmentBlockBytesUploaded.load()), - Info.AttachmentsUploaded.load(), - NiceBytes(Info.AttachmentBytesUploaded.load()), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + remotestore_impl::ReportMessage( + OptionalContext, + fmt::format("Saved oplog '{}' {} in {} ({}), Blocks: {} ({}), Attachments: {} ({}) {}", + RemoteStoreInfo.ContainerName, + RemoteResult.GetError() == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksUploaded.load(), + NiceBytes(Info.AttachmentBlockBytesUploaded.load()), + Info.AttachmentsUploaded.load(), + NiceBytes(Info.AttachmentBytesUploaded.load()), + remotestore_impl::GetStats(RemoteStore.GetStats(), /*OptionalCacheStats*/ nullptr, TransferWallTimeMS))); return Result; }; @@ -3234,6 +3271,11 @@ ParseOplogContainer( OpCount - OpsCompleteCount); } } + remotestore_impl::ReportProgress(OptionalContext, + "Scanning oplog"sv, + fmt::format("{} attachments found", NeededAttachments.size()), + OpCount, + OpCount - OpsCompleteCount); } { std::vector<IoHash> ReferencedAttachments(NeededAttachments.begin(), NeededAttachments.end()); @@ -3406,22 +3448,11 @@ SaveOplogContainer( } RemoteProjectStore::Result -LoadOplog(CidStore& ChunkStore, - RemoteProjectStore& RemoteStore, - ProjectStore::Oplog& Oplog, - WorkerThreadPool& NetworkWorkerPool, - WorkerThreadPool& WorkerPool, - bool ForceDownload, - bool IgnoreMissingAttachments, - bool CleanOplog, - EPartialBlockRequestMode PartialBlockRequestMode, - double HostLatencySec, - double CacheLatencySec, - JobContext* OptionalContext) +LoadOplog(LoadOplogContext&& Context) { using namespace std::literals; - std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(OptionalContext)); + std::unique_ptr<OperationLogOutput> LogOutput(std::make_unique<remotestore_impl::JobContextLogOutput>(Context.OptionalJobContext)); remotestore_impl::DownloadInfo Info; @@ -3430,25 +3461,25 @@ LoadOplog(CidStore& ChunkStore, std::unordered_set<IoHash, IoHash::Hasher> Attachments; uint64_t BlockCountToDownload = 0; - RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = RemoteStore.GetInfo(); - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); + RemoteProjectStore::RemoteStoreInfo RemoteStoreInfo = Context.RemoteStore.GetInfo(); + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loading oplog container '{}'", RemoteStoreInfo.ContainerName)); uint64_t TransferWallTimeMS = 0; Stopwatch LoadContainerTimer; - RemoteProjectStore::LoadContainerResult LoadContainerResult = RemoteStore.LoadContainer(); + RemoteProjectStore::LoadContainerResult LoadContainerResult = Context.RemoteStore.LoadContainer(); TransferWallTimeMS += LoadContainerTimer.GetElapsedTimeMs(); if (LoadContainerResult.ErrorCode) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Failed to load oplog container: '{}', error code: {}", LoadContainerResult.Reason, LoadContainerResult.ErrorCode)); return RemoteProjectStore::Result{.ErrorCode = LoadContainerResult.ErrorCode, .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, .Reason = LoadContainerResult.Reason, .Text = LoadContainerResult.Text}; } - remotestore_impl::ReportMessage(OptionalContext, + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Loaded container in {} ({})", NiceTimeSpanMs(static_cast<uint64_t>(LoadContainerResult.ElapsedSeconds * 1000)), NiceBytes(LoadContainerResult.ContainerObject.GetSize()))); @@ -3462,12 +3493,12 @@ LoadOplog(CidStore& ChunkStore, Stopwatch LoadAttachmentsTimer; std::atomic_uint64_t DownloadStartMS = (std::uint64_t)-1; - auto HasAttachment = [&Oplog, &ChunkStore, ForceDownload](const IoHash& RawHash) { - if (ForceDownload) + auto HasAttachment = [&Context](const IoHash& RawHash) { + if (Context.ForceDownload) { return false; } - if (ChunkStore.ContainsChunk(RawHash)) + if (Context.ChunkStore.ContainsChunk(RawHash)) { return true; } @@ -3482,10 +3513,7 @@ LoadOplog(CidStore& ChunkStore, std::vector<NeededBlockDownload> NeededBlockDownloads; - auto OnNeedBlock = [&RemoteStore, - &ChunkStore, - &NetworkWorkerPool, - &WorkerPool, + auto OnNeedBlock = [&Context, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &AttachmentCount, @@ -3494,9 +3522,8 @@ LoadOplog(CidStore& ChunkStore, &Info, &LoadAttachmentsTimer, &DownloadStartMS, - &NeededBlockDownloads, - IgnoreMissingAttachments, - OptionalContext](ThinChunkBlockDescription&& ThinBlockDescription, std::vector<uint32_t>&& NeededChunkIndexes) { + &NeededBlockDownloads](ThinChunkBlockDescription&& ThinBlockDescription, + std::vector<uint32_t>&& NeededChunkIndexes) { if (RemoteResult.IsError()) { return; @@ -3506,12 +3533,7 @@ LoadOplog(CidStore& ChunkStore, AttachmentCount.fetch_add(1); if (ThinBlockDescription.BlockHash == IoHash::Zero) { - DownloadAndSaveBlockChunks(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + DownloadAndSaveBlockChunks(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -3528,11 +3550,7 @@ LoadOplog(CidStore& ChunkStore, } }; - auto OnNeedAttachment = [&RemoteStore, - &Oplog, - &ChunkStore, - &NetworkWorkerPool, - &WorkerPool, + auto OnNeedAttachment = [&Context, &AttachmentsDownloadLatch, &AttachmentsWriteLatch, &RemoteResult, @@ -3540,9 +3558,7 @@ LoadOplog(CidStore& ChunkStore, &AttachmentCount, &LoadAttachmentsTimer, &DownloadStartMS, - &Info, - IgnoreMissingAttachments, - OptionalContext](const IoHash& RawHash) { + &Info](const IoHash& RawHash) { if (!Attachments.insert(RawHash).second) { return; @@ -3552,12 +3568,7 @@ LoadOplog(CidStore& ChunkStore, return; } AttachmentCount.fetch_add(1); - DownloadAndSaveAttachment(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + DownloadAndSaveAttachment(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -3570,11 +3581,11 @@ LoadOplog(CidStore& ChunkStore, std::vector<ChunkedInfo> FilesToDechunk; auto OnChunkedAttachment = [&FilesToDechunk](const ChunkedInfo& Chunked) { FilesToDechunk.push_back(Chunked); }; - auto OnReferencedAttachments = [&Oplog](std::span<IoHash> RawHashes) { Oplog.CaptureAddedAttachments(RawHashes); }; + auto OnReferencedAttachments = [&Context](std::span<IoHash> RawHashes) { Context.Oplog.CaptureAddedAttachments(RawHashes); }; // Make sure we retain any attachments we download before writing the oplog - Oplog.EnableUpdateCapture(); - auto _ = MakeGuard([&Oplog]() { Oplog.DisableUpdateCapture(); }); + Context.Oplog.EnableUpdateCapture(); + auto _ = MakeGuard([&Context]() { Context.Oplog.DisableUpdateCapture(); }); CbObject OplogSection; RemoteProjectStore::Result Result = ParseOplogContainer(LoadContainerResult.ContainerObject, @@ -3584,12 +3595,12 @@ LoadOplog(CidStore& ChunkStore, OnNeedAttachment, OnChunkedAttachment, OplogSection, - OptionalContext); + Context.OptionalJobContext); if (Result.ErrorCode != 0) { RemoteResult.SetError(Result.ErrorCode, Result.Reason, Result.Text); } - remotestore_impl::ReportMessage(OptionalContext, + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Parsed oplog in {}, found {} attachments, {} blocks and {} chunked files to download", NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), Attachments.size(), @@ -3613,11 +3624,12 @@ LoadOplog(CidStore& ChunkStore, std::vector<bool> DownloadedViaLegacyChunkFlag(AllNeededChunkHashes.size(), false); ChunkBlockAnalyser::BlockResult PartialBlocksResult; - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size())); + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Fetching descriptions for {} blocks", BlockHashes.size())); - RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = RemoteStore.GetBlockDescriptions(BlockHashes); + RemoteProjectStore::GetBlockDescriptionsResult BlockDescriptions = + Context.RemoteStore.GetBlockDescriptions(BlockHashes, Context.OptionalCache, Context.CacheBuildId); - remotestore_impl::ReportMessage(OptionalContext, + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("GetBlockDescriptions took {}. Found {} blocks", NiceTimeSpanMs(uint64_t(BlockDescriptions.ElapsedSeconds * 1000)), BlockDescriptions.Blocks.size())); @@ -3636,12 +3648,7 @@ LoadOplog(CidStore& ChunkStore, if (FindIt == BlockDescriptions.Blocks.end()) { // Fall back to full download as we can't get enough information about the block - DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + DownloadAndSaveBlock(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -3678,56 +3685,86 @@ LoadOplog(CidStore& ChunkStore, if (!AllNeededChunkHashes.empty()) { std::vector<ChunkBlockAnalyser::EPartialBlockDownloadMode> PartialBlockDownloadModes; - std::vector<bool> BlockExistsInCache; + std::vector<bool> BlockExistsInCache(BlocksWithDescription.size(), false); - if (PartialBlockRequestMode == EPartialBlockRequestMode::Off) + if (Context.PartialBlockRequestMode == EPartialBlockRequestMode::Off) { PartialBlockDownloadModes.resize(BlocksWithDescription.size(), ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); } else { - RemoteProjectStore::AttachmentExistsInCacheResult CacheExistsResult = - RemoteStore.AttachmentExistsInCache(BlocksWithDescription); - if (CacheExistsResult.ErrorCode != 0 || CacheExistsResult.HasBody.size() != BlocksWithDescription.size()) + if (Context.OptionalCache) { - BlockExistsInCache.resize(BlocksWithDescription.size(), false); + std::vector<BuildStorageCache::BlobExistsResult> CacheExistsResult = + Context.OptionalCache->BlobsExists(Context.CacheBuildId, BlocksWithDescription); + if (CacheExistsResult.size() == BlocksWithDescription.size()) + { + for (size_t BlobIndex = 0; BlobIndex < CacheExistsResult.size(); BlobIndex++) + { + BlockExistsInCache[BlobIndex] = CacheExistsResult[BlobIndex].HasBody; + } + } + uint64_t FoundBlocks = + std::accumulate(BlockExistsInCache.begin(), + BlockExistsInCache.end(), + uint64_t(0u), + [](uint64_t Current, bool Exists) -> uint64_t { return Current + (Exists ? 1 : 0); }); + if (FoundBlocks > 0) + { + remotestore_impl::ReportMessage( + Context.OptionalJobContext, + fmt::format("Found {} out of {} blocks in cache", FoundBlocks, BlockExistsInCache.size())); + } } - else + + ChunkBlockAnalyser::EPartialBlockDownloadMode CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + ChunkBlockAnalyser::EPartialBlockDownloadMode CachePartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + + switch (Context.PartialBlockRequestMode) { - BlockExistsInCache = std::move(CacheExistsResult.HasBody); + case EPartialBlockRequestMode::Off: + break; + case EPartialBlockRequestMode::ZenCacheOnly: + CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::Off; + break; + case EPartialBlockRequestMode::Mixed: + CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; + case EPartialBlockRequestMode::All: + CachePartialDownloadMode = Context.CacheMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed + : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange; + CloudPartialDownloadMode = Context.StoreMaxRangeCountPerRequest > 1 + ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange + : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange; + break; } PartialBlockDownloadModes.reserve(BlocksWithDescription.size()); - - for (bool ExistsInCache : BlockExistsInCache) + for (uint32_t BlockIndex = 0; BlockIndex < BlocksWithDescription.size(); BlockIndex++) { - if (PartialBlockRequestMode == EPartialBlockRequestMode::All) - { - PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact - : ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRange); - } - else if (PartialBlockRequestMode == EPartialBlockRequestMode::ZenCacheOnly) - { - PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::Exact - : ChunkBlockAnalyser::EPartialBlockDownloadMode::Off); - } - else if (PartialBlockRequestMode == EPartialBlockRequestMode::Mixed) - { - PartialBlockDownloadModes.push_back(ExistsInCache ? ChunkBlockAnalyser::EPartialBlockDownloadMode::MultiRangeHighSpeed - : ChunkBlockAnalyser::EPartialBlockDownloadMode::SingleRange); - } + const bool BlockExistInCache = BlockExistsInCache[BlockIndex]; + PartialBlockDownloadModes.push_back(BlockExistInCache ? CachePartialDownloadMode : CloudPartialDownloadMode); } } ZEN_ASSERT(PartialBlockDownloadModes.size() == BlocksWithDescription.size()); + ChunkBlockAnalyser PartialAnalyser( *LogOutput, BlockDescriptions.Blocks, - ChunkBlockAnalyser::Options{.IsQuiet = false, - .IsVerbose = false, - .HostLatencySec = HostLatencySec, - .HostHighSpeedLatencySec = CacheLatencySec, - .HostMaxRangeCountPerRequest = RemoteProjectStore::MaxRangeCountPerRequest}); + ChunkBlockAnalyser::Options{.IsQuiet = false, + .IsVerbose = false, + .HostLatencySec = Context.StoreLatencySec, + .HostHighSpeedLatencySec = Context.CacheLatencySec, + .HostMaxRangeCountPerRequest = Context.StoreMaxRangeCountPerRequest, + .HostHighSpeedMaxRangeCountPerRequest = Context.CacheMaxRangeCountPerRequest}); std::vector<ChunkBlockAnalyser::NeededBlock> NeededBlocks = PartialAnalyser.GetNeeded(AllNeededPartialChunkHashesLookup, @@ -3736,12 +3773,7 @@ LoadOplog(CidStore& ChunkStore, PartialBlocksResult = PartialAnalyser.CalculatePartialBlockDownloads(NeededBlocks, PartialBlockDownloadModes); for (uint32_t FullBlockIndex : PartialBlocksResult.FullBlockIndexes) { - DownloadAndSaveBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + DownloadAndSaveBlock(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -3765,12 +3797,7 @@ LoadOplog(CidStore& ChunkStore, RangeCount++; } - DownloadAndSavePartialBlock(ChunkStore, - RemoteStore, - IgnoreMissingAttachments, - OptionalContext, - NetworkWorkerPool, - WorkerPool, + DownloadAndSavePartialBlock(Context, AttachmentsDownloadLatch, AttachmentsWriteLatch, RemoteResult, @@ -3791,38 +3818,44 @@ LoadOplog(CidStore& ChunkStore, } AttachmentsDownloadLatch.CountDown(); - while (!AttachmentsDownloadLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + ptrdiff_t AttachmentCountToUseForProgress = AttachmentsDownloadLatch.Remaining(); + while (!AttachmentsDownloadLatch.Wait(1000)) { - if (!RemoteResult.IsError()) + ptrdiff_t Remaining = AttachmentsDownloadLatch.Remaining(); + if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } + } + uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; + if (DownloadStartMS != (uint64_t)-1) + { + PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); } - } - uint64_t PartialTransferWallTimeMS = TransferWallTimeMS; - if (DownloadStartMS != (uint64_t)-1) - { - PartialTransferWallTimeMS += LoadAttachmentsTimer.GetElapsedTimeMs() - DownloadStartMS.load(); - } - - uint64_t AttachmentsDownloaded = - Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); - uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + - Info.AttachmentBytesDownloaded.load(); - remotestore_impl::ReportProgress(OptionalContext, - "Loading attachments"sv, - fmt::format("{} ({}) downloaded, {} ({}) stored, {} remaining. {}", - AttachmentsDownloaded, - NiceBytes(AttachmentBytesDownloaded), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Remaining, - remotestore_impl::GetStats(RemoteStore.GetStats(), PartialTransferWallTimeMS)), - AttachmentCount.load(), - Remaining); + uint64_t AttachmentsDownloaded = + Info.AttachmentBlocksDownloaded.load() + Info.AttachmentBlocksRangesDownloaded.load() + Info.AttachmentsDownloaded.load(); + uint64_t AttachmentBytesDownloaded = Info.AttachmentBlockBytesDownloaded.load() + + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); + + AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + remotestore_impl::ReportProgress( + Context.OptionalJobContext, + "Loading attachments"sv, + fmt::format( + "{} ({}) downloaded, {} ({}) stored, {} remaining. {}", + AttachmentsDownloaded, + NiceBytes(AttachmentBytesDownloaded), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Remaining, + remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, PartialTransferWallTimeMS)), + AttachmentCountToUseForProgress, + Remaining); + } } if (DownloadStartMS != (uint64_t)-1) { @@ -3831,58 +3864,58 @@ LoadOplog(CidStore& ChunkStore, if (AttachmentCount.load() > 0) { - remotestore_impl::ReportProgress(OptionalContext, - "Loading attachments"sv, - fmt::format("{}", remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS)), - AttachmentCount.load(), - 0); + remotestore_impl::ReportProgress( + Context.OptionalJobContext, + "Loading attachments"sv, + fmt::format("{}", remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS)), + AttachmentCount.load(), + 0); } AttachmentsWriteLatch.CountDown(); - while (!AttachmentsWriteLatch.Wait(1000)) { - ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + ptrdiff_t AttachmentCountToUseForProgress = AttachmentsWriteLatch.Remaining(); + while (!AttachmentsWriteLatch.Wait(1000)) { - if (!RemoteResult.IsError()) + ptrdiff_t Remaining = AttachmentsWriteLatch.Remaining(); + if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { - RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + if (!RemoteResult.IsError()) + { + RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); + } } + AttachmentCountToUseForProgress = Max(Remaining, AttachmentCountToUseForProgress); + remotestore_impl::ReportProgress(Context.OptionalJobContext, + "Writing attachments"sv, + fmt::format("{} ({}), {} remaining.", + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Remaining), + AttachmentCountToUseForProgress, + Remaining); } - remotestore_impl::ReportProgress( - OptionalContext, - "Writing attachments"sv, - fmt::format("{} ({}), {} remaining.", Info.AttachmentsStored.load(), NiceBytes(Info.AttachmentBytesStored.load()), Remaining), - AttachmentCount.load(), - Remaining); } if (AttachmentCount.load() > 0) { - remotestore_impl::ReportProgress(OptionalContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); + remotestore_impl::ReportProgress(Context.OptionalJobContext, "Writing attachments", ""sv, AttachmentCount.load(), 0); } if (Result.ErrorCode == 0) { if (!FilesToDechunk.empty()) { - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); + remotestore_impl::ReportMessage(Context.OptionalJobContext, fmt::format("Dechunking {} attachments", FilesToDechunk.size())); Latch DechunkLatch(1); - std::filesystem::path TempFilePath = Oplog.TempPath(); + std::filesystem::path TempFilePath = Context.Oplog.TempPath(); for (const ChunkedInfo& Chunked : FilesToDechunk) { std::filesystem::path TempFileName = TempFilePath / Chunked.RawHash.ToHexString(); DechunkLatch.AddCount(1); - WorkerPool.ScheduleWork( - [&ChunkStore, - &DechunkLatch, - TempFileName, - &Chunked, - &RemoteResult, - IgnoreMissingAttachments, - &Info, - OptionalContext]() { + Context.WorkerPool.ScheduleWork( + [&Context, &DechunkLatch, TempFileName, &Chunked, &RemoteResult, &Info]() { ZEN_TRACE_CPU("DechunkAttachment"); auto _ = MakeGuard([&DechunkLatch, &TempFileName] { @@ -3916,16 +3949,16 @@ LoadOplog(CidStore& ChunkStore, for (std::uint32_t SequenceIndex : Chunked.ChunkSequence) { const IoHash& ChunkHash = Chunked.ChunkHashes[SequenceIndex]; - IoBuffer Chunk = ChunkStore.FindChunkByCid(ChunkHash); + IoBuffer Chunk = Context.ChunkStore.FindChunkByCid(ChunkHash); if (!Chunk) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Missing chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow<int>(HttpResponseCode::NotFound), @@ -3943,7 +3976,7 @@ LoadOplog(CidStore& ChunkStore, if (RawHash != ChunkHash) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Mismatching raw hash {} for chunk {} for chunked attachment {}", RawHash, ChunkHash, @@ -3951,7 +3984,7 @@ LoadOplog(CidStore& ChunkStore, // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow<int>(HttpResponseCode::NotFound), @@ -3988,14 +4021,14 @@ LoadOplog(CidStore& ChunkStore, })) { remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Failed to decompress chunk {} for chunked attachment {}", ChunkHash, Chunked.RawHash)); // We only add 1 as the resulting missing count will be 1 for the dechunked file Info.MissingAttachmentCount.fetch_add(1); - if (!IgnoreMissingAttachments) + if (!Context.IgnoreMissingAttachments) { RemoteResult.SetError( gsl::narrow<int>(HttpResponseCode::NotFound), @@ -4019,18 +4052,17 @@ LoadOplog(CidStore& ChunkStore, } uint64_t TmpBufferSize = TmpBuffer.GetSize(); CidStore::InsertResult InsertResult = - ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); + Context.ChunkStore.AddChunk(TmpBuffer, Chunked.RawHash, CidStore::InsertMode::kMayBeMovedInPlace); if (InsertResult.New) { Info.AttachmentBytesStored.fetch_add(TmpBufferSize); Info.AttachmentsStored.fetch_add(1); } - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Dechunked attachment {} ({}) in {}", - Chunked.RawHash, - NiceBytes(Chunked.RawSize), - NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + ZEN_INFO("Dechunked attachment {} ({}) in {}", + Chunked.RawHash, + NiceBytes(Chunked.RawSize), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } catch (const std::exception& Ex) { @@ -4046,54 +4078,58 @@ LoadOplog(CidStore& ChunkStore, while (!DechunkLatch.Wait(1000)) { ptrdiff_t Remaining = DechunkLatch.Remaining(); - if (remotestore_impl::IsCancelled(OptionalContext)) + if (remotestore_impl::IsCancelled(Context.OptionalJobContext)) { if (!RemoteResult.IsError()) { RemoteResult.SetError(gsl::narrow<int>(HttpResponseCode::OK), "Operation cancelled", ""); remotestore_impl::ReportMessage( - OptionalContext, + Context.OptionalJobContext, fmt::format("Aborting ({}): {}", RemoteResult.GetError(), RemoteResult.GetErrorReason())); } } - remotestore_impl::ReportProgress(OptionalContext, + remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, fmt::format("{} remaining...", Remaining), FilesToDechunk.size(), Remaining); } - remotestore_impl::ReportProgress(OptionalContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); + remotestore_impl::ReportProgress(Context.OptionalJobContext, "Dechunking attachments"sv, ""sv, FilesToDechunk.size(), 0); } Result = RemoteResult.ConvertResult(); } if (Result.ErrorCode == 0) { - if (CleanOplog) + if (Context.CleanOplog) { - RemoteStore.Flush(); - if (!Oplog.Reset()) + if (Context.OptionalCache) + { + Context.OptionalCache->Flush(100, [](intptr_t) { return /*DontWaitForPendingOperation*/ false; }); + } + if (!Context.Oplog.Reset()) { Result = RemoteProjectStore::Result{.ErrorCode = gsl::narrow<int>(HttpResponseCode::InternalServerError), .ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0, - .Reason = fmt::format("Failed to clean existing oplog '{}'", Oplog.OplogId())}; - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); + .Reason = fmt::format("Failed to clean existing oplog '{}'", Context.Oplog.OplogId())}; + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("Aborting ({}): {}", Result.ErrorCode, Result.Reason)); } } if (Result.ErrorCode == 0) { - remotestore_impl::WriteOplogSection(Oplog, OplogSection, OptionalContext); + remotestore_impl::WriteOplogSection(Context.Oplog, OplogSection, Context.OptionalJobContext); } } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; - remotestore_impl::LogRemoteStoreStatsDetails(RemoteStore.GetStats()); + remotestore_impl::LogRemoteStoreStatsDetails(Context.RemoteStore.GetStats()); { std::string DownloadDetails; RemoteProjectStore::ExtendedStats ExtendedStats; - if (RemoteStore.GetExtendedStats(ExtendedStats)) + if (Context.RemoteStore.GetExtendedStats(ExtendedStats)) { if (!ExtendedStats.m_ReceivedBytesPerSource.empty()) { @@ -4112,7 +4148,8 @@ LoadOplog(CidStore& ChunkStore, Total += It.second; } - remotestore_impl::ReportMessage(OptionalContext, fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView())); + remotestore_impl::ReportMessage(Context.OptionalJobContext, + fmt::format("Downloaded {} ({})", NiceBytes(Total), SB.ToView())); } } } @@ -4122,25 +4159,26 @@ LoadOplog(CidStore& ChunkStore, uint64_t TotalBytesDownloaded = Info.OplogSizeBytes + Info.AttachmentBlockBytesDownloaded.load() + Info.AttachmentBlockRangeBytesDownloaded.load() + Info.AttachmentBytesDownloaded.load(); - remotestore_impl::ReportMessage(OptionalContext, - fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " - "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", - RemoteStoreInfo.ContainerName, - Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", - NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), - NiceBytes(Info.OplogSizeBytes), - Info.AttachmentBlocksDownloaded.load(), - NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), - Info.AttachmentBlocksRangesDownloaded.load(), - NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()), - Info.AttachmentsDownloaded.load(), - NiceBytes(Info.AttachmentBytesDownloaded.load()), - TotalDownloads, - NiceBytes(TotalBytesDownloaded), - Info.AttachmentsStored.load(), - NiceBytes(Info.AttachmentBytesStored.load()), - Info.MissingAttachmentCount.load(), - remotestore_impl::GetStats(RemoteStore.GetStats(), TransferWallTimeMS))); + remotestore_impl::ReportMessage( + Context.OptionalJobContext, + fmt::format("Loaded oplog '{}' {} in {} ({}), Blocks: {} ({}), BlockRanges: {} ({}), Attachments: {} " + "({}), Total: {} ({}), Stored: {} ({}), Missing: {} {}", + RemoteStoreInfo.ContainerName, + Result.ErrorCode == 0 ? "SUCCESS" : "FAILURE", + NiceTimeSpanMs(static_cast<uint64_t>(Result.ElapsedSeconds * 1000.0)), + NiceBytes(Info.OplogSizeBytes), + Info.AttachmentBlocksDownloaded.load(), + NiceBytes(Info.AttachmentBlockBytesDownloaded.load()), + Info.AttachmentBlocksRangesDownloaded.load(), + NiceBytes(Info.AttachmentBlockRangeBytesDownloaded.load()), + Info.AttachmentsDownloaded.load(), + NiceBytes(Info.AttachmentBytesDownloaded.load()), + TotalDownloads, + NiceBytes(TotalBytesDownloaded), + Info.AttachmentsStored.load(), + NiceBytes(Info.AttachmentBytesStored.load()), + Info.MissingAttachmentCount.load(), + remotestore_impl::GetStats(Context.RemoteStore.GetStats(), Context.OptionalCacheStats, TransferWallTimeMS))); return Result; } @@ -4237,6 +4275,26 @@ namespace projectstore_testutils { return Result; } + class TestJobContext : public JobContext + { + public: + explicit TestJobContext(int& OpIndex) : m_OpIndex(OpIndex) {} + virtual bool IsCancelled() const { return false; } + virtual void ReportMessage(std::string_view Message) { ZEN_INFO("Job {}: {}", m_OpIndex, Message); } + virtual void ReportProgress(std::string_view CurrentOp, std::string_view Details, ptrdiff_t TotalCount, ptrdiff_t RemainingCount) + { + ZEN_INFO("Job {}: Op '{}'{} {}/{}", + m_OpIndex, + CurrentOp, + Details.empty() ? "" : fmt::format(" {}", Details), + TotalCount - RemainingCount, + TotalCount); + } + + private: + int& m_OpIndex; + }; + } // namespace projectstore_testutils TEST_SUITE_BEGIN("remotestore.projectstore"); @@ -4334,66 +4392,708 @@ TEST_CASE_TEMPLATE("project.store.export", false, nullptr); - CHECK(ExportResult.ErrorCode == 0); + REQUIRE(ExportResult.ErrorCode == 0); Ref<ProjectStore::Oplog> OplogImport = Project->NewOplog("oplog2", {}); CHECK(OplogImport); - RemoteProjectStore::Result ImportResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - EPartialBlockRequestMode::Mixed, - /*HostLatencySec*/ -1.0, - /*CacheLatencySec*/ -1.0, - nullptr); + int OpJobIndex = 0; + TestJobContext OpJobContext(OpJobIndex); + + RemoteProjectStore::Result ImportResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ false, - EPartialBlockRequestMode::Mixed, - /*HostLatencySec*/ -1.0, - /*CacheLatencySec*/ -1.0, - nullptr); + OpJobIndex++; + + RemoteProjectStore::Result ImportForceResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportForceResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ false, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - EPartialBlockRequestMode::Mixed, - /*HostLatencySec*/ -1.0, - /*CacheLatencySec*/ -1.0, - nullptr); + OpJobIndex++; + + RemoteProjectStore::Result ImportCleanResult = LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportCleanResult.ErrorCode == 0); - - RemoteProjectStore::Result ImportForceCleanResult = LoadOplog(CidStore, - *RemoteStore, - *OplogImport, - NetworkPool, - WorkerPool, - /*Force*/ true, - /*IgnoreMissingAttachments*/ false, - /*CleanOplog*/ true, - EPartialBlockRequestMode::Mixed, - /*HostLatencySec*/ -1.0, - /*CacheLatencySec*/ -1.0, - nullptr); + OpJobIndex++; + + RemoteProjectStore::Result ImportForceCleanResult = + LoadOplog(LoadOplogContext{.ChunkStore = CidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = Oid::Zero, + .Oplog = *OplogImport, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = true, + .IgnoreMissingAttachments = false, + .CleanOplog = true, + .PartialBlockRequestMode = EPartialBlockRequestMode::Mixed, + .OptionalJobContext = &OpJobContext}); CHECK(ImportForceCleanResult.ErrorCode == 0); + OpJobIndex++; +} + +// Common oplog setup used by the two tests below. +// Returns a FileRemoteStore backed by ExportDir that has been populated with a SaveOplog call. +// Keeps the test data identical to project.store.export so the two test suites exercise the same blocks/attachments. +static RemoteProjectStore::Result +SetupExportStore(CidStore& CidStore, + ProjectStore::Project& Project, + WorkerThreadPool& NetworkPool, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& ExportDir, + std::shared_ptr<RemoteProjectStore>& OutRemoteStore) +{ + using namespace projectstore_testutils; + using namespace std::literals; + + Ref<ProjectStore::Oplog> Oplog = Project.NewOplog("oplog_export", {}); + if (!Oplog) + { + return RemoteProjectStore::Result{.ErrorCode = -1}; + } + + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), {})); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{77}))); + Oplog->AppendNewOplogEntry( + CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{7123, 583, 690, 99}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage(Oid::NewOid(), CreateAttachments(std::initializer_list<size_t>{55, 122}))); + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( + Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{256u * 1024u, 92u * 1024u}, OodleCompressionLevel::None))); + + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 64u * 1024, + .MaxChunksPerBlock = 1000, + .MaxChunkEmbedSize = 32 * 1024u, + .ChunkFileSizeLimit = 64u * 1024u}, + /*.FolderPath =*/ExportDir, + /*.Name =*/std::string("oplog_export"), + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + + OutRemoteStore = CreateFileRemoteStore(Log(), Options); + return SaveOplog(CidStore, + *OutRemoteStore, + Project, + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); +} + +// Creates an export store with a single oplog entry that packs six 512 KB chunks into one +// ~3 MB block (MaxBlockSize = 8 MB). The resulting block slack (~1.5 MB) far exceeds the +// 512 KB threshold that ChunkBlockAnalyser requires before it will consider partial-block +// downloads instead of full-block downloads. +// +// This function is self-contained: it creates its own GcManager, CidStore, ProjectStore and +// Project internally so that each call is independent of any outer test context. After +// SaveOplog returns, all persistent data lives on disk inside ExportDir and the caller can +// freely query OutRemoteStore without holding any references to the internal context. +static RemoteProjectStore::Result +SetupPartialBlockExportStore(WorkerThreadPool& NetworkPool, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& ExportDir, + std::shared_ptr<RemoteProjectStore>& OutRemoteStore) +{ + using namespace projectstore_testutils; + using namespace std::literals; + + // Self-contained CAS and project store. Subdirectories of ExportDir keep everything + // together without relying on the outer TEST_CASE's ExportCidStore / ExportProject. + GcManager LocalGc; + CidStore LocalCidStore(LocalGc); + CidStoreConfiguration LocalCidConfig = {.RootDirectory = ExportDir / "cas", .TinyValueThreshold = 1024, .HugeValueThreshold = 4096}; + LocalCidStore.Initialize(LocalCidConfig); + + std::filesystem::path LocalProjectBasePath = ExportDir / "proj"; + ProjectStore LocalProjectStore(LocalCidStore, LocalProjectBasePath, LocalGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> LocalProject(LocalProjectStore.NewProject(LocalProjectBasePath / "p"sv, + "p"sv, + (ExportDir / "root").string(), + (ExportDir / "engine").string(), + (ExportDir / "game").string(), + (ExportDir / "game" / "game.uproject").string())); + + Ref<ProjectStore::Oplog> Oplog = LocalProject->NewOplog("oplog_partial_block", {}); + if (!Oplog) + { + return RemoteProjectStore::Result{.ErrorCode = -1}; + } + + // Six 512 KB chunks with OodleCompressionLevel::None so the compressed size stays large + // and the block genuinely exceeds the 512 KB slack threshold. + Oplog->AppendNewOplogEntry(CreateBulkDataOplogPackage( + Oid::NewOid(), + CreateAttachments(std::initializer_list<size_t>{512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u, 512u * 1024u}, + OodleCompressionLevel::None))); + + // MaxChunkEmbedSize must be larger than the compressed size of each 512 KB chunk + // (OodleCompressionLevel::None → compressed ≈ raw ≈ 512 KB). With the legacy + // 32 KB limit all six chunks would become loose large attachments and no block would + // be created, so we use the production default of 1.5 MB instead. + FileRemoteStoreOptions Options = {RemoteStoreOptions{.MaxBlockSize = 8u * 1024u * 1024u, + .MaxChunksPerBlock = 1000, + .MaxChunkEmbedSize = RemoteStoreOptions::DefaultMaxChunkEmbedSize, + .ChunkFileSizeLimit = 64u * 1024u * 1024u}, + /*.FolderPath =*/ExportDir, + /*.Name =*/std::string("oplog_partial_block"), + /*.OptionalBaseName =*/std::string(), + /*.ForceDisableBlocks =*/false, + /*.ForceEnableTempBlocks =*/false}; + OutRemoteStore = CreateFileRemoteStore(Log(), Options); + return SaveOplog(LocalCidStore, + *OutRemoteStore, + *LocalProject, + *Oplog, + NetworkPool, + WorkerPool, + Options.MaxBlockSize, + Options.MaxChunksPerBlock, + Options.MaxChunkEmbedSize, + Options.ChunkFileSizeLimit, + /*EmbedLooseFiles*/ true, + /*ForceUpload*/ false, + /*IgnoreMissingAttachments*/ false, + /*OptionalContext*/ nullptr); +} + +// Returns the first block hash that has at least MinChunkCount chunks, or a zero IoHash +// if no qualifying block exists in Store. +static IoHash +FindBlockWithMultipleChunks(RemoteProjectStore& Store, size_t MinChunkCount) +{ + RemoteProjectStore::LoadContainerResult ContainerResult = Store.LoadContainer(); + if (ContainerResult.ErrorCode != 0) + { + return {}; + } + std::vector<IoHash> BlockHashes = GetBlockHashesFromOplog(ContainerResult.ContainerObject); + if (BlockHashes.empty()) + { + return {}; + } + RemoteProjectStore::GetBlockDescriptionsResult Descriptions = Store.GetBlockDescriptions(BlockHashes, nullptr, Oid{}); + if (Descriptions.ErrorCode != 0) + { + return {}; + } + for (const ChunkBlockDescription& Desc : Descriptions.Blocks) + { + if (Desc.ChunkRawHashes.size() >= MinChunkCount) + { + return Desc.BlockHash; + } + } + return {}; +} + +// Loads BlockHash from Source and inserts every even-indexed chunk (0, 2, 4, …) into +// TargetCidStore. Odd-indexed chunks are left absent so that when an import is run +// against the same block, HasAttachment returns false for three non-adjacent positions +// — the minimum needed to exercise the multi-range partial-block download paths. +static void +SeedCidStoreWithAlternateChunks(CidStore& TargetCidStore, RemoteProjectStore& Source, const IoHash& BlockHash) +{ + RemoteProjectStore::LoadAttachmentResult BlockResult = Source.LoadAttachment(BlockHash); + if (BlockResult.ErrorCode != 0 || !BlockResult.Bytes) + { + return; + } + + IoHash RawHash; + uint64_t RawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(BlockResult.Bytes), RawHash, RawSize); + if (!Compressed) + { + return; + } + CompositeBuffer BlockPayload = Compressed.DecompressToComposite(); + if (!BlockPayload) + { + return; + } + + uint32_t ChunkIndex = 0; + uint64_t HeaderSize = 0; + IterateChunkBlock( + BlockPayload.Flatten(), + [&TargetCidStore, &ChunkIndex](CompressedBuffer&& Chunk, const IoHash& AttachmentHash) { + if (ChunkIndex % 2 == 0) + { + IoBuffer ChunkData = Chunk.GetCompressed().Flatten().AsIoBuffer(); + TargetCidStore.AddChunk(ChunkData, AttachmentHash); + } + ++ChunkIndex; + }, + HeaderSize); +} + +TEST_CASE("project.store.import.context_settings") +{ + using namespace std::literals; + using namespace projectstore_testutils; + + ScopedTemporaryDirectory TempDir; + ScopedTemporaryDirectory ExportDir; + + std::filesystem::path RootDir = TempDir.Path() / "root"; + std::filesystem::path EngineRootDir = TempDir.Path() / "engine"; + std::filesystem::path ProjectRootDir = TempDir.Path() / "game"; + std::filesystem::path ProjectFilePath = TempDir.Path() / "game" / "game.uproject"; + + // Export-side CAS and project store: used only by SetupExportStore to build the remote store + // payload. Kept separate from the import side so the two CAS instances are disjoint. + GcManager ExportGc; + CidStore ExportCidStore(ExportGc); + CidStoreConfiguration ExportCidConfig = {.RootDirectory = TempDir.Path() / "export_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ExportCidStore.Initialize(ExportCidConfig); + + std::filesystem::path ExportBasePath = TempDir.Path() / "export_projectstore"; + ProjectStore ExportProjectStore(ExportCidStore, ExportBasePath, ExportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ExportProject(ExportProjectStore.NewProject(ExportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); + uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; + WorkerThreadPool WorkerPool(WorkerCount); + WorkerThreadPool NetworkPool(NetworkWorkerCount); + + std::shared_ptr<RemoteProjectStore> RemoteStore; + RemoteProjectStore::Result ExportResult = + SetupExportStore(ExportCidStore, *ExportProject, NetworkPool, WorkerPool, ExportDir.Path(), RemoteStore); + REQUIRE(ExportResult.ErrorCode == 0); + + // Import-side CAS and project store: starts empty, mirroring a fresh machine that has never + // downloaded the data. HasAttachment() therefore returns false for every chunk, so the import + // genuinely contacts the remote store without needing ForceDownload on the populate pass. + GcManager ImportGc; + CidStore ImportCidStore(ImportGc); + CidStoreConfiguration ImportCidConfig = {.RootDirectory = TempDir.Path() / "import_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + ImportCidStore.Initialize(ImportCidConfig); + + std::filesystem::path ImportBasePath = TempDir.Path() / "import_projectstore"; + ProjectStore ImportProjectStore(ImportCidStore, ImportBasePath, ImportGc, ProjectStore::Configuration{}); + Ref<ProjectStore::Project> ImportProject(ImportProjectStore.NewProject(ImportBasePath / "proj1"sv, + "proj1"sv, + RootDir.string(), + EngineRootDir.string(), + ProjectRootDir.string(), + ProjectFilePath.string())); + + const Oid CacheBuildId = Oid::NewOid(); + BuildStorageCache::Statistics CacheStats; + std::unique_ptr<BuildStorageCache> Cache = CreateInMemoryBuildStorageCache(256u, CacheStats); + auto ResetCacheStats = [&]() { + CacheStats.TotalBytesRead = 0; + CacheStats.TotalBytesWritten = 0; + CacheStats.TotalRequestCount = 0; + CacheStats.TotalRequestTimeUs = 0; + CacheStats.TotalExecutionTimeUs = 0; + CacheStats.PeakSentBytes = 0; + CacheStats.PeakReceivedBytes = 0; + CacheStats.PeakBytesPerSec = 0; + CacheStats.PutBlobCount = 0; + CacheStats.PutBlobByteCount = 0; + }; + + int OpJobIndex = 0; + + TestJobContext OpJobContext(OpJobIndex); + + // Helper: run a LoadOplog against the import-side CAS/project with the given context knobs. + // Each call creates a fresh oplog so repeated calls within one SUBCASE don't short-circuit on + // already-present data. + auto DoImport = [&](BuildStorageCache* OptCache, + EPartialBlockRequestMode Mode, + double StoreLatency, + uint64_t StoreRanges, + double CacheLatency, + uint64_t CacheRanges, + bool PopulateCache, + bool ForceDownload) -> RemoteProjectStore::Result { + Ref<ProjectStore::Oplog> ImportOplog = ImportProject->NewOplog(fmt::format("import_{}", OpJobIndex++), {}); + return LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *RemoteStore, + .OptionalCache = OptCache, + .CacheBuildId = CacheBuildId, + .Oplog = *ImportOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = ForceDownload, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = Mode, + .PopulateCache = PopulateCache, + .StoreLatencySec = StoreLatency, + .StoreMaxRangeCountPerRequest = StoreRanges, + .CacheLatencySec = CacheLatency, + .CacheMaxRangeCountPerRequest = CacheRanges, + .OptionalJobContext = &OpJobContext}); + }; + + // Shorthand: Mode=All, low latency, 128 ranges for both store and cache. + auto ImportAll = [&](BuildStorageCache* OptCache, bool Populate, bool Force) { + return DoImport(OptCache, EPartialBlockRequestMode::All, 0.001, 128u, 0.001, 128u, Populate, Force); + }; + + SUBCASE("mode_off_no_cache") + { + // Baseline: no partial block requests, no cache. + RemoteProjectStore::Result R = + DoImport(nullptr, EPartialBlockRequestMode::Off, -1.0, (uint64_t)-1, -1.0, (uint64_t)-1, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("mode_all_multirange_cloud_no_cache") + { + // StoreMaxRangeCountPerRequest > 1 → MultiRange cloud path. + RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 128u, -1.0, 0u, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("mode_all_singlerange_cloud_no_cache") + { + // StoreMaxRangeCountPerRequest == 1 → SingleRange cloud path. + RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::All, 0.001, 1u, -1.0, 0u, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("mode_mixed_high_latency_no_cache") + { + // High store latency encourages range merging; Mixed uses SingleRange for cloud, Off for cache. + RemoteProjectStore::Result R = DoImport(nullptr, EPartialBlockRequestMode::Mixed, 0.1, 128u, -1.0, 0u, false, false); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("cache_populate_and_hit") + { + // First import: ImportCidStore is empty so all blocks are downloaded from the remote store + // and written to the cache. + RemoteProjectStore::Result PopulateResult = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(PopulateResult.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount > 0); + + // Re-import with ForceDownload=true: all chunks are now in ImportCidStore but Force overrides + // HasAttachment() so the download logic re-runs and serves blocks from the cache instead of + // the remote store. + ResetCacheStats(); + RemoteProjectStore::Result HitResult = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); + CHECK(HitResult.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount == 0); + // TotalRequestCount covers both full-blob cache hits and partial-range cache hits. + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("cache_no_populate_flag") + { + // Cache is provided but PopulateCache=false: blocks are downloaded to ImportCidStore but + // nothing should be written to the cache. + RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/false); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount == 0); + } + + SUBCASE("mode_zencacheonly_cache_multirange") + { + // Pre-populate the cache via a plain import, then re-import with ZenCacheOnly + + // CacheMaxRangeCountPerRequest=128. With 100% of chunks needed, all blocks go to + // FullBlockIndexes and GetBuildBlob (full blob) is called from the cache. + // CacheMaxRangeCountPerRequest > 1 would route partial downloads through GetBuildBlobRanges + // if the analyser ever emits BlockRanges entries. + RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(Populate.ErrorCode == 0); + ResetCacheStats(); + + RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 128u, false, true); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("mode_zencacheonly_cache_singlerange") + { + // Pre-populate the cache, then re-import with ZenCacheOnly + CacheMaxRangeCountPerRequest=1. + // With 100% of chunks needed the analyser sends all blocks to FullBlockIndexes (full-block + // download path), which calls GetBuildBlob with no range offset — a full-blob cache hit. + // The single-range vs multi-range distinction only matters for the partial-block (BlockRanges) + // path, which is not reached when all chunks are needed. + RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(Populate.ErrorCode == 0); + ResetCacheStats(); + + RemoteProjectStore::Result R = DoImport(Cache.get(), EPartialBlockRequestMode::ZenCacheOnly, 0.1, 128u, 0.001, 1u, false, true); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("mode_all_cache_and_cloud_multirange") + { + // Pre-populate cache; All mode uses multi-range for both the cache and cloud paths. + RemoteProjectStore::Result Populate = ImportAll(Cache.get(), /*PopulateCache=*/true, /*Force=*/false); + CHECK(Populate.ErrorCode == 0); + ResetCacheStats(); + + RemoteProjectStore::Result R = ImportAll(Cache.get(), /*PopulateCache=*/false, /*Force=*/true); + CHECK(R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("partial_block_cloud_multirange") + { + // Export store with 6 × 512 KB chunks packed into one ~3 MB block. + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + // Seeding even-indexed chunks (0, 2, 4) leaves odd ones (1, 3, 5) absent in + // ImportCidStore. Three non-adjacent needed positions → three BlockRangeDescriptors. + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); + + // StoreMaxRangeCountPerRequest=128 → all three ranges sent in one LoadAttachmentRanges call. + Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_multi_{}", OpJobIndex++), {}); + RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = CacheBuildId, + .Oplog = *PartialOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = -1.0, + .CacheMaxRangeCountPerRequest = 0u, + .OptionalJobContext = &OpJobContext}); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("partial_block_cloud_singlerange") + { + // Same block layout as partial_block_cloud_multirange but StoreMaxRangeCountPerRequest=1. + // DownloadPartialBlock issues one LoadAttachmentRanges call per range. + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + SeedCidStoreWithAlternateChunks(ImportCidStore, *PartialRemoteStore, BlockHash); + + Ref<ProjectStore::Oplog> PartialOplog = ImportProject->NewOplog(fmt::format("partial_cloud_single_{}", OpJobIndex++), {}); + RemoteProjectStore::Result R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = nullptr, + .CacheBuildId = CacheBuildId, + .Oplog = *PartialOplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 1u, + .CacheLatencySec = -1.0, + .CacheMaxRangeCountPerRequest = 0u, + .OptionalJobContext = &OpJobContext}); + CHECK(R.ErrorCode == 0); + } + + SUBCASE("partial_block_cache_multirange") + { + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + + // Phase 1: ImportCidStore starts empty → full block download from remote → PutBuildBlob + // populates the cache. + { + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p1_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = true, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase1R.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount > 0); + } + ResetCacheStats(); + + // Phase 2: fresh CidStore with only even-indexed chunks seeded. + // HasAttachment returns false for odd chunks (1, 3, 5) → three BlockRangeDescriptors. + // Block is in cache from Phase 1 → cache partial path. + // CacheMaxRangeCountPerRequest=128 → SubRangeCount=3 > 1 → GetBuildBlobRanges. + GcManager Phase2Gc; + CidStore Phase2CidStore(Phase2Gc); + CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + Phase2CidStore.Initialize(Phase2CidConfig); + SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); + + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_multi_p2_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase2R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } + + SUBCASE("partial_block_cache_singlerange") + { + ScopedTemporaryDirectory PartialExportDir; + std::shared_ptr<RemoteProjectStore> PartialRemoteStore; + RemoteProjectStore::Result ExportR = + SetupPartialBlockExportStore(NetworkPool, WorkerPool, PartialExportDir.Path(), PartialRemoteStore); + REQUIRE(ExportR.ErrorCode == 0); + + IoHash BlockHash = FindBlockWithMultipleChunks(*PartialRemoteStore, 4u); + CHECK(BlockHash != IoHash::Zero); + + // Phase 1: full block download from remote into cache. + { + Ref<ProjectStore::Oplog> Phase1Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p1_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase1R = LoadOplog(LoadOplogContext{.ChunkStore = ImportCidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase1Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::All, + .PopulateCache = true, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 128u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase1R.ErrorCode == 0); + CHECK(CacheStats.PutBlobCount > 0); + } + ResetCacheStats(); + + // Phase 2: fresh CidStore with only even-indexed chunks seeded. + // CacheMaxRangeCountPerRequest=1 → SubRangeCount=Min(3,1)=1 → GetBuildBlob with range + // offset (single-range legacy cache path), called once per needed chunk range. + GcManager Phase2Gc; + CidStore Phase2CidStore(Phase2Gc); + CidStoreConfiguration Phase2CidConfig = {.RootDirectory = TempDir.Path() / "partial_cas_single", + .TinyValueThreshold = 1024, + .HugeValueThreshold = 4096}; + Phase2CidStore.Initialize(Phase2CidConfig); + SeedCidStoreWithAlternateChunks(Phase2CidStore, *PartialRemoteStore, BlockHash); + + Ref<ProjectStore::Oplog> Phase2Oplog = ImportProject->NewOplog(fmt::format("partial_cache_single_p2_{}", OpJobIndex++), {}); + RemoteProjectStore::Result Phase2R = LoadOplog(LoadOplogContext{.ChunkStore = Phase2CidStore, + .RemoteStore = *PartialRemoteStore, + .OptionalCache = Cache.get(), + .CacheBuildId = CacheBuildId, + .Oplog = *Phase2Oplog, + .NetworkWorkerPool = NetworkPool, + .WorkerPool = WorkerPool, + .ForceDownload = false, + .IgnoreMissingAttachments = false, + .CleanOplog = false, + .PartialBlockRequestMode = EPartialBlockRequestMode::ZenCacheOnly, + .PopulateCache = false, + .StoreLatencySec = 0.001, + .StoreMaxRangeCountPerRequest = 128u, + .CacheLatencySec = 0.001, + .CacheMaxRangeCountPerRequest = 1u, + .OptionalJobContext = &OpJobContext}); + CHECK(Phase2R.ErrorCode == 0); + CHECK(CacheStats.TotalRequestCount > 0); + } } TEST_SUITE_END(); diff --git a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp index ef82c45e0..115d6438d 100644 --- a/src/zenremotestore/projectstore/zenremoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/zenremoteprojectstore.cpp @@ -157,59 +157,56 @@ public: return Result; } - virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes, ESourceMode SourceMode) override + virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override { LoadAttachmentsResult Result; - if (SourceMode != ESourceMode::kCacheOnly) - { - std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); + std::string LoadRequest = fmt::format("/{}/oplog/{}/rpc"sv, m_Project, m_Oplog); - CbObject Request; + CbObject Request; + { + CbObjectWriter RequestWriter; + RequestWriter.AddString("method"sv, "getchunks"sv); + RequestWriter.BeginObject("Request"sv); { - CbObjectWriter RequestWriter; - RequestWriter.AddString("method"sv, "getchunks"sv); - RequestWriter.BeginObject("Request"sv); + RequestWriter.BeginArray("Chunks"sv); { - RequestWriter.BeginArray("Chunks"sv); + for (const IoHash& RawHash : RawHashes) { - for (const IoHash& RawHash : RawHashes) + RequestWriter.BeginObject(); { - RequestWriter.BeginObject(); - { - RequestWriter.AddHash("RawHash", RawHash); - } - RequestWriter.EndObject(); + RequestWriter.AddHash("RawHash", RawHash); } + RequestWriter.EndObject(); } - RequestWriter.EndArray(); // "chunks" } - RequestWriter.EndObject(); - Request = RequestWriter.Save(); + RequestWriter.EndArray(); // "chunks" } + RequestWriter.EndObject(); + Request = RequestWriter.Save(); + } - HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); - AddStats(Response); + HttpClient::Response Response = m_Client.Post(LoadRequest, Request, HttpClient::Accept(ZenContentType::kCbPackage)); + AddStats(Response); - Result = LoadAttachmentsResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", - RawHashes.size(), - m_ProjectStoreUrl, - m_Project, - m_Oplog, - Result.Reason); - } - else + Result = LoadAttachmentsResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching {} oplog attachments from {}/{}/{}. Reason: '{}'", + RawHashes.size(), + m_ProjectStoreUrl, + m_Project, + m_Oplog, + Result.Reason); + } + else + { + CbPackage Package = Response.AsPackage(); + std::span<const CbAttachment> Attachments = Package.GetAttachments(); + Result.Chunks.reserve(Attachments.size()); + for (const CbAttachment& Attachment : Attachments) { - CbPackage Package = Response.AsPackage(); - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - Result.Chunks.reserve(Attachments.size()); - for (const CbAttachment& Attachment : Attachments) - { - Result.Chunks.emplace_back( - std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); - } + Result.Chunks.emplace_back( + std::pair<IoHash, CompressedBuffer>{Attachment.GetHash(), Attachment.AsCompressedBinary().MakeOwned()}); } } return Result; @@ -253,75 +250,64 @@ public: return GetKnownBlocksResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}}; } - virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes) override + virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span<const IoHash> BlockHashes, + BuildStorageCache* OptionalCache, + const Oid& CacheBuildId) override { - ZEN_UNUSED(BlockHashes); + ZEN_UNUSED(BlockHashes, OptionalCache, CacheBuildId); return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; } - virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span<const IoHash> RawHashes) override - { - return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector<bool>(RawHashes.size(), false)}; - } - - virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, ESourceMode SourceMode) override + virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { LoadAttachmentResult Result; - if (SourceMode != ESourceMode::kCacheOnly) - { - std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); - HttpClient::Response Response = - m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); - AddStats(Response); + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + AddStats(Response); - Result = LoadAttachmentResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - RawHash, - Result.Reason); - } - Result.Bytes = Response.ResponsePayload; - Result.Bytes.MakeOwned(); + Result = LoadAttachmentResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + RawHash, + Result.Reason); } + Result.Bytes = Response.ResponsePayload; + Result.Bytes.MakeOwned(); return Result; } virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, - std::span<const std::pair<uint64_t, uint64_t>> Ranges, - ESourceMode SourceMode) override + std::span<const std::pair<uint64_t, uint64_t>> Ranges) override { + ZEN_ASSERT(!Ranges.empty()); LoadAttachmentRangesResult Result; - if (SourceMode != ESourceMode::kCacheOnly) - { - std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); - HttpClient::Response Response = - m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); - AddStats(Response); + std::string LoadRequest = fmt::format("/{}/oplog/{}/{}"sv, m_Project, m_Oplog, RawHash); + HttpClient::Response Response = + m_Client.Download(LoadRequest, m_TempFilePath, HttpClient::Accept(ZenContentType::kCompressedBinary)); + AddStats(Response); - Result = LoadAttachmentRangesResult{ConvertResult(Response)}; - if (Result.ErrorCode) - { - Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", - m_ProjectStoreUrl, - m_Project, - m_Oplog, - RawHash, - Result.Reason); - } - else - { - Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end()); - } + Result = LoadAttachmentRangesResult{ConvertResult(Response)}; + if (Result.ErrorCode) + { + Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}/{}. Reason: '{}'", + m_ProjectStoreUrl, + m_Project, + m_Oplog, + RawHash, + Result.Reason); + } + else + { + Result.Ranges = std::vector<std::pair<uint64_t, uint64_t>>(Ranges.begin(), Ranges.end()); } return Result; } - virtual void Flush() override {} - private: void AddStats(const HttpClient::Response& Result) { diff --git a/src/zenserver/storage/buildstore/httpbuildstore.cpp b/src/zenserver/storage/buildstore/httpbuildstore.cpp index 459e044eb..38d97765b 100644 --- a/src/zenserver/storage/buildstore/httpbuildstore.cpp +++ b/src/zenserver/storage/buildstore/httpbuildstore.cpp @@ -177,6 +177,14 @@ HttpBuildStoreService::GetBlobRequest(HttpRouterRequest& Req) uint64_t RangeLength = RangeView["length"sv].AsUInt64(); OffsetAndLengthPairs.push_back(std::make_pair(RangeOffset, RangeLength)); } + if (OffsetAndLengthPairs.size() > MaxRangeCountPerRequestSupported) + { + return ServerRequest.WriteResponse(HttpResponseCode::BadRequest, + HttpContentType::kText, + fmt::format("Number of ranges ({}) for blob request exceeds maximum range count {}", + OffsetAndLengthPairs.size(), + MaxRangeCountPerRequestSupported)); + } } if (OffsetAndLengthPairs.empty()) { @@ -661,6 +669,11 @@ HttpBuildStoreService::HandleStatusRequest(HttpServerRequest& Request) ZEN_TRACE_CPU("HttpBuildStoreService::Status"); CbObjectWriter Cbo; Cbo << "ok" << true; + Cbo.BeginObject("capabilities"); + { + Cbo << "maxrangecountperrequest" << MaxRangeCountPerRequestSupported; + } + Cbo.EndObject(); // capabilities Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } diff --git a/src/zenserver/storage/buildstore/httpbuildstore.h b/src/zenserver/storage/buildstore/httpbuildstore.h index e10986411..5fa7cd642 100644 --- a/src/zenserver/storage/buildstore/httpbuildstore.h +++ b/src/zenserver/storage/buildstore/httpbuildstore.h @@ -45,6 +45,8 @@ private: inline LoggerRef Log() { return m_Log; } + static constexpr uint32_t MaxRangeCountPerRequestSupported = 256u; + LoggerRef m_Log; void PutBlobRequest(HttpRouterRequest& Req); diff --git a/src/zenserver/storage/projectstore/httpprojectstore.cpp b/src/zenserver/storage/projectstore/httpprojectstore.cpp index 2b5474d00..0ec6faea3 100644 --- a/src/zenserver/storage/projectstore/httpprojectstore.cpp +++ b/src/zenserver/storage/projectstore/httpprojectstore.cpp @@ -13,7 +13,12 @@ #include <zencore/scopeguard.h> #include <zencore/stream.h> #include <zencore/trace.h> +#include <zenhttp/httpclientauth.h> #include <zenhttp/packageformat.h> +#include <zenremotestore/builds/buildstoragecache.h> +#include <zenremotestore/builds/buildstorageutil.h> +#include <zenremotestore/jupiter/jupiterhost.h> +#include <zenremotestore/operationlogoutput.h> #include <zenremotestore/projectstore/buildsremoteprojectstore.h> #include <zenremotestore/projectstore/fileremoteprojectstore.h> #include <zenremotestore/projectstore/jupiterremoteprojectstore.h> @@ -244,8 +249,22 @@ namespace { { std::shared_ptr<RemoteProjectStore> Store; std::string Description; - double HostLatencySec = -1.0; - double CacheLatencySec = -1.0; + double LatencySec = -1.0; + uint64_t MaxRangeCountPerRequest = 1; + + struct Cache + { + std::unique_ptr<HttpClient> Http; + std::unique_ptr<BuildStorageCache> Cache; + Oid BuildsId = Oid::Zero; + std::string Description; + double LatencySec = -1.0; + uint64_t MaxRangeCountPerRequest = 1; + BuildStorageCache::Statistics Stats; + bool Populate = false; + }; + + std::unique_ptr<Cache> OptionalCache; }; CreateRemoteStoreResult CreateRemoteStore(LoggerRef InLog, @@ -262,9 +281,7 @@ namespace { using namespace std::literals; - std::shared_ptr<RemoteProjectStore> RemoteStore; - double HostLatencySec = -1.0; - double CacheLatencySec = -1.0; + CreateRemoteStoreResult Result; if (CbObjectView File = Params["file"sv].AsObjectView(); File) { @@ -282,6 +299,9 @@ namespace { bool ForceDisableBlocks = File["disableblocks"sv].AsBool(false); bool ForceEnableTempBlocks = File["enabletempblocks"sv].AsBool(false); + Result.LatencySec = 0; + Result.MaxRangeCountPerRequest = 1; + FileRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, FolderPath, @@ -289,7 +309,7 @@ namespace { std::string(OptionalBaseName), ForceDisableBlocks, ForceEnableTempBlocks}; - RemoteStore = CreateFileRemoteStore(Log(), Options); + Result.Store = CreateFileRemoteStore(Log(), Options); } if (CbObjectView Cloud = Params["cloud"sv].AsObjectView(); Cloud) @@ -367,21 +387,32 @@ namespace { bool ForceDisableTempBlocks = Cloud["disabletempblocks"sv].AsBool(false); bool AssumeHttp2 = Cloud["assumehttp2"sv].AsBool(false); - JupiterRemoteStoreOptions Options = { - RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Url, - std::string(Namespace), - std::string(Bucket), - Key, - BaseKey, - std::string(OpenIdProvider), - AccessToken, - AuthManager, - OidcExePath, - ForceDisableBlocks, - ForceDisableTempBlocks, - AssumeHttp2}; - RemoteStore = CreateJupiterRemoteStore(Log(), Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); + if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(Url, AssumeHttp2, /*Verbose*/ false); TestResult.Success) + { + Result.LatencySec = TestResult.LatencySeconds; + Result.MaxRangeCountPerRequest = TestResult.MaxRangeCountPerRequest; + + JupiterRemoteStoreOptions Options = { + RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, + Url, + std::string(Namespace), + std::string(Bucket), + Key, + BaseKey, + std::string(OpenIdProvider), + AccessToken, + AuthManager, + OidcExePath, + ForceDisableBlocks, + ForceDisableTempBlocks, + AssumeHttp2}; + Result.Store = + CreateJupiterRemoteStore(Log(), Options, TempFilePath, /*Quiet*/ false, /*Unattended*/ false, /*Hidden*/ true); + } + else + { + return {nullptr, fmt::format("Unable to connect to jupiter host '{}'", Url)}; + } } if (CbObjectView Zen = Params["zen"sv].AsObjectView(); Zen) @@ -397,12 +428,13 @@ namespace { { return {nullptr, "Missing oplog"}; } + ZenRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, std::string(Url), std::string(Project), std::string(Oplog)}; - RemoteStore = CreateZenRemoteStore(Log(), Options, TempFilePath); + Result.Store = CreateZenRemoteStore(Log(), Options, TempFilePath); } if (CbObjectView Builds = Params["builds"sv].AsObjectView(); Builds) @@ -475,11 +507,76 @@ namespace { MemoryView MetaDataSection = Builds["metadata"sv].AsBinaryView(); IoBuffer MetaData(IoBuffer::Wrap, MetaDataSection.GetData(), MetaDataSection.GetSize()); + auto EnsureHttps = [](const std::string& Host, std::string_view PreferredProtocol) { + if (!Host.empty() && Host.find("://"sv) == std::string::npos) + { + // Assume https URL + return fmt::format("{}://{}"sv, PreferredProtocol, Host); + } + return Host; + }; + + Host = EnsureHttps(Host, "https"); + OverrideHost = EnsureHttps(OverrideHost, "https"); + ZenHost = EnsureHttps(ZenHost, "http"); + + std::function<HttpClientAccessToken()> TokenProvider; + if (!OpenIdProvider.empty()) + { + TokenProvider = httpclientauth::CreateFromOpenIdProvider(AuthManager, OpenIdProvider); + } + else if (!AccessToken.empty()) + { + TokenProvider = httpclientauth::CreateFromStaticToken(AccessToken); + } + else if (!OidcExePath.empty()) + { + if (auto TokenProviderMaybe = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath, + Host.empty() ? OverrideHost : Host, + /*Quiet*/ false, + /*Unattended*/ false, + /*Hidden*/ true); + TokenProviderMaybe) + { + TokenProvider = TokenProviderMaybe.value(); + } + } + + if (!TokenProvider) + { + TokenProvider = httpclientauth::CreateFromDefaultOpenIdProvider(AuthManager); + } + + BuildStorageResolveResult ResolveResult; + { + HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", + .AccessTokenProvider = TokenProvider, + .AssumeHttp2 = AssumeHttp2, + .AllowResume = true, + .RetryCount = 2}; + + std::unique_ptr<OperationLogOutput> Output(CreateStandardLogOutput(Log())); + + try + { + ResolveResult = ResolveBuildStorage(*Output, + ClientSettings, + Host, + OverrideHost, + ZenHost, + ZenCacheResolveMode::Discovery, + /*Verbose*/ false); + } + catch (const std::exception& Ex) + { + return {nullptr, fmt::format("Failed resolving storage host and cache. Reason: '{}'", Ex.what())}; + } + } + Result.LatencySec = ResolveResult.Cloud.LatencySec; + Result.MaxRangeCountPerRequest = ResolveResult.Cloud.Caps.MaxRangeCountPerRequest; + BuildsRemoteStoreOptions Options = { RemoteStoreOptions{.MaxBlockSize = MaxBlockSize, .MaxChunksPerBlock = 1000, .MaxChunkEmbedSize = MaxChunkEmbedSize}, - Host, - OverrideHost, - ZenHost, std::string(Namespace), std::string(Bucket), BuildId, @@ -489,30 +586,43 @@ namespace { OidcExePath, ForceDisableBlocks, ForceDisableTempBlocks, - AssumeHttp2, - PopulateCache, MetaData, MaximumInMemoryDownloadSize}; - RemoteStore = CreateJupiterBuildsRemoteStore(Log(), - Options, - TempFilePath, - /*Quiet*/ false, - /*Unattended*/ false, - /*Hidden*/ true, - GetTinyWorkerPool(EWorkloadType::Background), - HostLatencySec, - CacheLatencySec); + Result.Store = CreateJupiterBuildsRemoteStore(Log(), ResolveResult, std::move(TokenProvider), Options, TempFilePath); + + if (!ResolveResult.Cache.Address.empty()) + { + Result.OptionalCache = std::make_unique<CreateRemoteStoreResult::Cache>(); + + HttpClientSettings CacheClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = ResolveResult.Cache.AssumeHttp2, + .AllowResume = true, + .RetryCount = 0, + .MaximumInMemoryDownloadSize = MaximumInMemoryDownloadSize}; + + Result.OptionalCache->Http = std::make_unique<HttpClient>(ResolveResult.Cache.Address, CacheClientSettings); + Result.OptionalCache->Cache = CreateZenBuildStorageCache(*Result.OptionalCache->Http, + Result.OptionalCache->Stats, + Namespace, + Bucket, + TempFilePath, + GetTinyWorkerPool(EWorkloadType::Background)); + Result.OptionalCache->BuildsId = BuildId; + Result.OptionalCache->LatencySec = ResolveResult.Cache.LatencySec; + Result.OptionalCache->MaxRangeCountPerRequest = ResolveResult.Cache.Caps.MaxRangeCountPerRequest; + Result.OptionalCache->Populate = PopulateCache; + Result.OptionalCache->Description = + fmt::format("[zenserver] {} namespace {} bucket {}", ResolveResult.Cache.Address, Namespace, Bucket); + } } - - if (!RemoteStore) + if (!Result.Store) { return {nullptr, "Unknown remote store type"}; } - return CreateRemoteStoreResult{.Store = std::move(RemoteStore), - .Description = "", - .HostLatencySec = HostLatencySec, - .CacheLatencySec = CacheLatencySec}; + return Result; } std::pair<HttpResponseCode, std::string> ConvertResult(const RemoteProjectStore::Result& Result) @@ -2679,38 +2789,36 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) EPartialBlockRequestMode PartialBlockRequestMode = PartialBlockRequestModeFromString(Params["partialblockrequestmode"sv].AsString("true")); - CreateRemoteStoreResult RemoteStoreResult = CreateRemoteStore(Log(), - Params, - m_AuthMgr, - MaxBlockSize, - MaxChunkEmbedSize, - GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory), - Oplog->TempPath()); + std::shared_ptr<CreateRemoteStoreResult> RemoteStoreResult = + std::make_shared<CreateRemoteStoreResult>(CreateRemoteStore(Log(), + Params, + m_AuthMgr, + MaxBlockSize, + MaxChunkEmbedSize, + GetMaxMemoryBufferSize(MaxBlockSize, BoostWorkerMemory), + Oplog->TempPath())); - if (RemoteStoreResult.Store == nullptr) + if (RemoteStoreResult->Store == nullptr) { - return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult.Description); + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, RemoteStoreResult->Description); } - std::shared_ptr<RemoteProjectStore> RemoteStore = std::move(RemoteStoreResult.Store); - RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); JobId JobId = m_JobQueue.QueueJob( fmt::format("Import oplog '{}/{}'", Project->Identifier, Oplog->OplogId()), [this, - ChunkStore = &m_CidStore, - ActualRemoteStore = std::move(RemoteStore), + RemoteStoreResult = std::move(RemoteStoreResult), Oplog, Force, IgnoreMissingAttachments, CleanOplog, PartialBlockRequestMode, - HostLatencySec = RemoteStoreResult.HostLatencySec, - CacheLatencySec = RemoteStoreResult.CacheLatencySec, BoostWorkerCount](JobContext& Context) { - Context.ReportMessage(fmt::format("Loading oplog '{}/{}' from {}", - Oplog->GetOuterProjectIdentifier(), - Oplog->OplogId(), - ActualRemoteStore->GetInfo().Description)); + Context.ReportMessage( + fmt::format("Loading oplog '{}/{}'\n Host: {}\n Cache: {}", + Oplog->GetOuterProjectIdentifier(), + Oplog->OplogId(), + RemoteStoreResult->Store->GetInfo().Description, + RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Description : "<none>")); Ref<TransferThreadWorkers> Workers = GetThreadWorkers(BoostWorkerCount, /*SingleThreaded*/ false); @@ -2718,19 +2826,26 @@ HttpProjectService::HandleRpcRequest(HttpRouterRequest& Req) WorkerThreadPool& NetworkWorkerPool = Workers->GetNetworkPool(); Context.ReportMessage(fmt::format("{}", Workers->GetWorkersInfo())); - - RemoteProjectStore::Result Result = LoadOplog(m_CidStore, - *ActualRemoteStore, - *Oplog, - NetworkWorkerPool, - WorkerPool, - Force, - IgnoreMissingAttachments, - CleanOplog, - PartialBlockRequestMode, - HostLatencySec, - CacheLatencySec, - &Context); + RemoteProjectStore::Result Result = LoadOplog(LoadOplogContext{ + .ChunkStore = m_CidStore, + .RemoteStore = *RemoteStoreResult->Store, + .OptionalCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Cache.get() : nullptr, + .CacheBuildId = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->BuildsId : Oid::Zero, + .OptionalCacheStats = RemoteStoreResult->OptionalCache ? &RemoteStoreResult->OptionalCache->Stats : nullptr, + .Oplog = *Oplog, + .NetworkWorkerPool = NetworkWorkerPool, + .WorkerPool = WorkerPool, + .ForceDownload = Force, + .IgnoreMissingAttachments = IgnoreMissingAttachments, + .CleanOplog = CleanOplog, + .PartialBlockRequestMode = PartialBlockRequestMode, + .PopulateCache = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->Populate : false, + .StoreLatencySec = RemoteStoreResult->LatencySec, + .StoreMaxRangeCountPerRequest = RemoteStoreResult->MaxRangeCountPerRequest, + .CacheLatencySec = RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->LatencySec : -1.0, + .CacheMaxRangeCountPerRequest = + RemoteStoreResult->OptionalCache ? RemoteStoreResult->OptionalCache->MaxRangeCountPerRequest : 0, + .OptionalJobContext = &Context}); auto Response = ConvertResult(Result); ZEN_INFO("LoadOplog: Status: {} '{}'", ToString(Response.first), Response.second); if (!IsHttpSuccessCode(Response.first)) |