aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2021-11-18 14:33:44 +0100
committerStefan Boberg <[email protected]>2021-11-18 14:33:44 +0100
commite53df312f3c4dcef19add9cd26afc324557b1f5a (patch)
treea3d7b59f29e484d48edffb2a26bbb0dd2d95533d
parentgc: implemented timestamped snapshot persistence (diff)
parentChange error code for failed upsteam apply (diff)
downloadzen-e53df312f3c4dcef19add9cd26afc324557b1f5a.tar.xz
zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.zip
merge from main
-rw-r--r--thirdparty/BLAKE3/.github/workflows/build_b3sum.py (renamed from 3rdparty/BLAKE3/.github/workflows/build_b3sum.py)0
-rw-r--r--thirdparty/BLAKE3/.github/workflows/ci.yml (renamed from 3rdparty/BLAKE3/.github/workflows/ci.yml)0
-rw-r--r--thirdparty/BLAKE3/.github/workflows/tag.yml (renamed from 3rdparty/BLAKE3/.github/workflows/tag.yml)0
-rw-r--r--thirdparty/BLAKE3/.github/workflows/upload_github_release_asset.py (renamed from 3rdparty/BLAKE3/.github/workflows/upload_github_release_asset.py)0
-rw-r--r--thirdparty/BLAKE3/.gitignore (renamed from 3rdparty/BLAKE3/.gitignore)0
-rw-r--r--thirdparty/BLAKE3/CONTRIBUTING.md (renamed from 3rdparty/BLAKE3/CONTRIBUTING.md)0
-rw-r--r--thirdparty/BLAKE3/Cargo.toml (renamed from 3rdparty/BLAKE3/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/LICENSE (renamed from 3rdparty/BLAKE3/LICENSE)0
-rw-r--r--thirdparty/BLAKE3/README.md (renamed from 3rdparty/BLAKE3/README.md)0
-rw-r--r--thirdparty/BLAKE3/b3sum/Cargo.toml (renamed from 3rdparty/BLAKE3/b3sum/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/b3sum/README.md (renamed from 3rdparty/BLAKE3/b3sum/README.md)0
-rw-r--r--thirdparty/BLAKE3/b3sum/src/main.rs (renamed from 3rdparty/BLAKE3/b3sum/src/main.rs)0
-rw-r--r--thirdparty/BLAKE3/b3sum/src/unit_tests.rs (renamed from 3rdparty/BLAKE3/b3sum/src/unit_tests.rs)0
-rw-r--r--thirdparty/BLAKE3/b3sum/tests/cli_tests.rs (renamed from 3rdparty/BLAKE3/b3sum/tests/cli_tests.rs)0
-rw-r--r--thirdparty/BLAKE3/b3sum/what_does_check_do.md (renamed from 3rdparty/BLAKE3/b3sum/what_does_check_do.md)0
-rw-r--r--thirdparty/BLAKE3/benches/bench.rs (renamed from 3rdparty/BLAKE3/benches/bench.rs)0
-rw-r--r--thirdparty/BLAKE3/build.rs (renamed from 3rdparty/BLAKE3/build.rs)0
-rw-r--r--thirdparty/BLAKE3/c/.gitignore (renamed from 3rdparty/BLAKE3/c/.gitignore)0
-rw-r--r--thirdparty/BLAKE3/c/Makefile.testing (renamed from 3rdparty/BLAKE3/c/Makefile.testing)0
-rw-r--r--thirdparty/BLAKE3/c/README.md (renamed from 3rdparty/BLAKE3/c/README.md)0
-rw-r--r--thirdparty/BLAKE3/c/blake3.c (renamed from 3rdparty/BLAKE3/c/blake3.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3.h (renamed from 3rdparty/BLAKE3/c/blake3.h)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx2.c (renamed from 3rdparty/BLAKE3/c/blake3_avx2.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx512.c (renamed from 3rdparty/BLAKE3/c/blake3_avx512.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/README.md (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/README.md)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_dispatch.c (renamed from 3rdparty/BLAKE3/c/blake3_dispatch.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_impl.h (renamed from 3rdparty/BLAKE3/c/blake3_impl.h)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_neon.c (renamed from 3rdparty/BLAKE3/c/blake3_neon.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_portable.c (renamed from 3rdparty/BLAKE3/c/blake3_portable.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse2.c (renamed from 3rdparty/BLAKE3/c/blake3_sse2.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse41.c (renamed from 3rdparty/BLAKE3/c/blake3_sse41.c)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S)0
-rw-r--r--thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm)0
-rw-r--r--thirdparty/BLAKE3/c/example.c (renamed from 3rdparty/BLAKE3/c/example.c)0
-rw-r--r--thirdparty/BLAKE3/c/main.c (renamed from 3rdparty/BLAKE3/c/main.c)0
-rw-r--r--thirdparty/BLAKE3/c/test.py (renamed from 3rdparty/BLAKE3/c/test.py)0
-rw-r--r--thirdparty/BLAKE3/lib/Linux_x64/libblake3.a (renamed from 3rdparty/BLAKE3/lib/Linux_x64/libblake3.a)bin94190 -> 94190 bytes
-rw-r--r--thirdparty/BLAKE3/lib/Win64/BLAKE3.lib (renamed from 3rdparty/BLAKE3/lib/Win64/BLAKE3.lib)bin172346 -> 172346 bytes
-rw-r--r--thirdparty/BLAKE3/media/B3.svg (renamed from 3rdparty/BLAKE3/media/B3.svg)0
-rw-r--r--thirdparty/BLAKE3/media/BLAKE3.svg (renamed from 3rdparty/BLAKE3/media/BLAKE3.svg)0
-rw-r--r--thirdparty/BLAKE3/media/speed.svg (renamed from 3rdparty/BLAKE3/media/speed.svg)0
-rw-r--r--thirdparty/BLAKE3/reference_impl/Cargo.toml (renamed from 3rdparty/BLAKE3/reference_impl/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/reference_impl/README.md (renamed from 3rdparty/BLAKE3/reference_impl/README.md)0
-rw-r--r--thirdparty/BLAKE3/reference_impl/reference_impl.rs (renamed from 3rdparty/BLAKE3/reference_impl/reference_impl.rs)0
-rw-r--r--thirdparty/BLAKE3/src/ffi_avx2.rs (renamed from 3rdparty/BLAKE3/src/ffi_avx2.rs)0
-rw-r--r--thirdparty/BLAKE3/src/ffi_avx512.rs (renamed from 3rdparty/BLAKE3/src/ffi_avx512.rs)0
-rw-r--r--thirdparty/BLAKE3/src/ffi_neon.rs (renamed from 3rdparty/BLAKE3/src/ffi_neon.rs)0
-rw-r--r--thirdparty/BLAKE3/src/ffi_sse2.rs (renamed from 3rdparty/BLAKE3/src/ffi_sse2.rs)0
-rw-r--r--thirdparty/BLAKE3/src/ffi_sse41.rs (renamed from 3rdparty/BLAKE3/src/ffi_sse41.rs)0
-rw-r--r--thirdparty/BLAKE3/src/guts.rs (renamed from 3rdparty/BLAKE3/src/guts.rs)0
-rw-r--r--thirdparty/BLAKE3/src/join.rs (renamed from 3rdparty/BLAKE3/src/join.rs)0
-rw-r--r--thirdparty/BLAKE3/src/lib.rs (renamed from 3rdparty/BLAKE3/src/lib.rs)0
-rw-r--r--thirdparty/BLAKE3/src/platform.rs (renamed from 3rdparty/BLAKE3/src/platform.rs)0
-rw-r--r--thirdparty/BLAKE3/src/portable.rs (renamed from 3rdparty/BLAKE3/src/portable.rs)0
-rw-r--r--thirdparty/BLAKE3/src/rust_avx2.rs (renamed from 3rdparty/BLAKE3/src/rust_avx2.rs)0
-rw-r--r--thirdparty/BLAKE3/src/rust_sse2.rs (renamed from 3rdparty/BLAKE3/src/rust_sse2.rs)0
-rw-r--r--thirdparty/BLAKE3/src/rust_sse41.rs (renamed from 3rdparty/BLAKE3/src/rust_sse41.rs)0
-rw-r--r--thirdparty/BLAKE3/src/test.rs (renamed from 3rdparty/BLAKE3/src/test.rs)0
-rw-r--r--thirdparty/BLAKE3/src/traits.rs (renamed from 3rdparty/BLAKE3/src/traits.rs)0
-rw-r--r--thirdparty/BLAKE3/test_vectors/Cargo.toml (renamed from 3rdparty/BLAKE3/test_vectors/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/test_vectors/cross_test.sh (renamed from 3rdparty/BLAKE3/test_vectors/cross_test.sh)0
-rw-r--r--thirdparty/BLAKE3/test_vectors/src/lib.rs (renamed from 3rdparty/BLAKE3/test_vectors/src/lib.rs)0
-rw-r--r--thirdparty/BLAKE3/test_vectors/test_vectors.json (renamed from 3rdparty/BLAKE3/test_vectors/test_vectors.json)0
-rw-r--r--thirdparty/BLAKE3/tools/compiler_version/Cargo.toml (renamed from 3rdparty/BLAKE3/tools/compiler_version/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/tools/compiler_version/build.rs (renamed from 3rdparty/BLAKE3/tools/compiler_version/build.rs)0
-rw-r--r--thirdparty/BLAKE3/tools/compiler_version/src/main.rs (renamed from 3rdparty/BLAKE3/tools/compiler_version/src/main.rs)0
-rw-r--r--thirdparty/BLAKE3/tools/instruction_set_support/Cargo.toml (renamed from 3rdparty/BLAKE3/tools/instruction_set_support/Cargo.toml)0
-rw-r--r--thirdparty/BLAKE3/tools/instruction_set_support/src/main.rs (renamed from 3rdparty/BLAKE3/tools/instruction_set_support/src/main.rs)0
-rw-r--r--thirdparty/Oodle/include/oodle2.h (renamed from 3rdparty/Oodle/include/oodle2.h)0
-rw-r--r--thirdparty/Oodle/include/oodle2base.h (renamed from 3rdparty/Oodle/include/oodle2base.h)0
-rwxr-xr-xthirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a (renamed from 3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a)bin2593542 -> 2593542 bytes
-rwxr-xr-xthirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 (renamed from 3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8)bin1905079 -> 1905079 bytes
-rw-r--r--thirdparty/Oodle/lib/Win64/oo2core_win64.lib (renamed from 3rdparty/Oodle/lib/Win64/oo2core_win64.lib)bin8776756 -> 8776756 bytes
-rw-r--r--thirdparty/licenses/README.md3
-rw-r--r--thirdparty/utfcpp/.circleci/config.yml (renamed from 3rdparty/utfcpp/.circleci/config.yml)0
-rw-r--r--thirdparty/utfcpp/.gitignore (renamed from 3rdparty/utfcpp/.gitignore)0
-rw-r--r--thirdparty/utfcpp/.gitmodules (renamed from 3rdparty/utfcpp/.gitmodules)0
-rw-r--r--thirdparty/utfcpp/CMakeLists.txt (renamed from 3rdparty/utfcpp/CMakeLists.txt)0
-rw-r--r--thirdparty/utfcpp/LICENSE (renamed from 3rdparty/utfcpp/LICENSE)0
-rw-r--r--thirdparty/utfcpp/README.md (renamed from 3rdparty/utfcpp/README.md)0
-rw-r--r--thirdparty/utfcpp/samples/docsample.cpp (renamed from 3rdparty/utfcpp/samples/docsample.cpp)0
-rw-r--r--thirdparty/utfcpp/source/utf8.h (renamed from 3rdparty/utfcpp/source/utf8.h)0
-rw-r--r--thirdparty/utfcpp/source/utf8/checked.h (renamed from 3rdparty/utfcpp/source/utf8/checked.h)0
-rw-r--r--thirdparty/utfcpp/source/utf8/core.h (renamed from 3rdparty/utfcpp/source/utf8/core.h)0
-rw-r--r--thirdparty/utfcpp/source/utf8/cpp11.h (renamed from 3rdparty/utfcpp/source/utf8/cpp11.h)0
-rw-r--r--thirdparty/utfcpp/source/utf8/unchecked.h (renamed from 3rdparty/utfcpp/source/utf8/unchecked.h)0
-rw-r--r--thirdparty/utfcpp/tests/CMakeLists.txt (renamed from 3rdparty/utfcpp/tests/CMakeLists.txt)0
-rw-r--r--thirdparty/utfcpp/tests/docker/Dockerfile (renamed from 3rdparty/utfcpp/tests/docker/Dockerfile)0
-rw-r--r--thirdparty/utfcpp/tests/negative.cpp (renamed from 3rdparty/utfcpp/tests/negative.cpp)0
-rw-r--r--thirdparty/utfcpp/tests/test_checked_api.cpp (renamed from 3rdparty/utfcpp/tests/test_checked_api.cpp)0
-rw-r--r--thirdparty/utfcpp/tests/test_checked_iterator.cpp (renamed from 3rdparty/utfcpp/tests/test_checked_iterator.cpp)0
-rw-r--r--thirdparty/utfcpp/tests/test_cpp11.cpp (renamed from 3rdparty/utfcpp/tests/test_cpp11.cpp)0
-rw-r--r--thirdparty/utfcpp/tests/test_data/utf8_invalid.txt (renamed from 3rdparty/utfcpp/tests/test_data/utf8_invalid.txt)bin20010 -> 20010 bytes
-rw-r--r--thirdparty/utfcpp/tests/test_unchecked_api.cpp (renamed from 3rdparty/utfcpp/tests/test_unchecked_api.cpp)0
-rw-r--r--thirdparty/utfcpp/tests/test_unchecked_iterator.cpp (renamed from 3rdparty/utfcpp/tests/test_unchecked_iterator.cpp)0
-rw-r--r--zen.sln2
-rw-r--r--zencore/blake3.cpp2
-rw-r--r--zencore/compactbinary.cpp18
-rw-r--r--zencore/compress.cpp15
-rw-r--r--zencore/include/zencore/iobuffer.h1
-rw-r--r--zencore/memory.cpp4
-rw-r--r--zencore/xmake.lua6
-rw-r--r--zencore/zencore.vcxproj4
-rw-r--r--zenfs_common.props2
-rw-r--r--zenserver-test/zenserver-test.cpp402
-rw-r--r--zenserver/cache/structuredcache.cpp642
-rw-r--r--zenserver/cache/structuredcache.h5
-rw-r--r--zenserver/cache/structuredcachestore.cpp66
-rw-r--r--zenserver/cache/structuredcachestore.h85
-rw-r--r--zenserver/compute/apply.cpp107
-rw-r--r--zenserver/compute/apply.h20
-rw-r--r--zenserver/config.cpp16
-rw-r--r--zenserver/config.h8
-rw-r--r--zenserver/diag/formatters.h20
-rw-r--r--zenserver/upstream/jupiter.cpp264
-rw-r--r--zenserver/upstream/jupiter.h56
-rw-r--r--zenserver/upstream/upstreamapply.cpp1559
-rw-r--r--zenserver/upstream/upstreamapply.h172
-rw-r--r--zenserver/upstream/upstreamcache.cpp678
-rw-r--r--zenserver/upstream/upstreamcache.h74
-rw-r--r--zenserver/upstream/zen.cpp58
-rw-r--r--zenserver/upstream/zen.h19
-rw-r--r--zenserver/zenserver.cpp39
-rw-r--r--zenserver/zenserver.vcxproj2
-rw-r--r--zenserver/zenserver.vcxproj.filters6
-rw-r--r--zenutil/cache/cachekey.cpp9
-rw-r--r--zenutil/cache/cachepolicy.cpp167
-rw-r--r--zenutil/include/zenutil/cache/cache.h6
-rw-r--r--zenutil/include/zenutil/cache/cachekey.h83
-rw-r--r--zenutil/include/zenutil/cache/cachepolicy.h112
-rw-r--r--zenutil/zenutil.vcxproj5
-rw-r--r--zenutil/zenutil.vcxproj.filters20
148 files changed, 4193 insertions, 564 deletions
diff --git a/3rdparty/BLAKE3/.github/workflows/build_b3sum.py b/thirdparty/BLAKE3/.github/workflows/build_b3sum.py
index e487daf97..e487daf97 100644
--- a/3rdparty/BLAKE3/.github/workflows/build_b3sum.py
+++ b/thirdparty/BLAKE3/.github/workflows/build_b3sum.py
diff --git a/3rdparty/BLAKE3/.github/workflows/ci.yml b/thirdparty/BLAKE3/.github/workflows/ci.yml
index 464a411d5..464a411d5 100644
--- a/3rdparty/BLAKE3/.github/workflows/ci.yml
+++ b/thirdparty/BLAKE3/.github/workflows/ci.yml
diff --git a/3rdparty/BLAKE3/.github/workflows/tag.yml b/thirdparty/BLAKE3/.github/workflows/tag.yml
index 577d4f312..577d4f312 100644
--- a/3rdparty/BLAKE3/.github/workflows/tag.yml
+++ b/thirdparty/BLAKE3/.github/workflows/tag.yml
diff --git a/3rdparty/BLAKE3/.github/workflows/upload_github_release_asset.py b/thirdparty/BLAKE3/.github/workflows/upload_github_release_asset.py
index c1cbf518b..c1cbf518b 100644
--- a/3rdparty/BLAKE3/.github/workflows/upload_github_release_asset.py
+++ b/thirdparty/BLAKE3/.github/workflows/upload_github_release_asset.py
diff --git a/3rdparty/BLAKE3/.gitignore b/thirdparty/BLAKE3/.gitignore
index fa8d85ac5..fa8d85ac5 100644
--- a/3rdparty/BLAKE3/.gitignore
+++ b/thirdparty/BLAKE3/.gitignore
diff --git a/3rdparty/BLAKE3/CONTRIBUTING.md b/thirdparty/BLAKE3/CONTRIBUTING.md
index 3a605f255..3a605f255 100644
--- a/3rdparty/BLAKE3/CONTRIBUTING.md
+++ b/thirdparty/BLAKE3/CONTRIBUTING.md
diff --git a/3rdparty/BLAKE3/Cargo.toml b/thirdparty/BLAKE3/Cargo.toml
index 3df0fd279..3df0fd279 100644
--- a/3rdparty/BLAKE3/Cargo.toml
+++ b/thirdparty/BLAKE3/Cargo.toml
diff --git a/3rdparty/BLAKE3/LICENSE b/thirdparty/BLAKE3/LICENSE
index f5892efc3..f5892efc3 100644
--- a/3rdparty/BLAKE3/LICENSE
+++ b/thirdparty/BLAKE3/LICENSE
diff --git a/3rdparty/BLAKE3/README.md b/thirdparty/BLAKE3/README.md
index 360183668..360183668 100644
--- a/3rdparty/BLAKE3/README.md
+++ b/thirdparty/BLAKE3/README.md
diff --git a/3rdparty/BLAKE3/b3sum/Cargo.toml b/thirdparty/BLAKE3/b3sum/Cargo.toml
index 4678bee2d..4678bee2d 100644
--- a/3rdparty/BLAKE3/b3sum/Cargo.toml
+++ b/thirdparty/BLAKE3/b3sum/Cargo.toml
diff --git a/3rdparty/BLAKE3/b3sum/README.md b/thirdparty/BLAKE3/b3sum/README.md
index e97830b7c..e97830b7c 100644
--- a/3rdparty/BLAKE3/b3sum/README.md
+++ b/thirdparty/BLAKE3/b3sum/README.md
diff --git a/3rdparty/BLAKE3/b3sum/src/main.rs b/thirdparty/BLAKE3/b3sum/src/main.rs
index b01e5de58..b01e5de58 100644
--- a/3rdparty/BLAKE3/b3sum/src/main.rs
+++ b/thirdparty/BLAKE3/b3sum/src/main.rs
diff --git a/3rdparty/BLAKE3/b3sum/src/unit_tests.rs b/thirdparty/BLAKE3/b3sum/src/unit_tests.rs
index 1fa1a17dc..1fa1a17dc 100644
--- a/3rdparty/BLAKE3/b3sum/src/unit_tests.rs
+++ b/thirdparty/BLAKE3/b3sum/src/unit_tests.rs
diff --git a/3rdparty/BLAKE3/b3sum/tests/cli_tests.rs b/thirdparty/BLAKE3/b3sum/tests/cli_tests.rs
index 51fbbba98..51fbbba98 100644
--- a/3rdparty/BLAKE3/b3sum/tests/cli_tests.rs
+++ b/thirdparty/BLAKE3/b3sum/tests/cli_tests.rs
diff --git a/3rdparty/BLAKE3/b3sum/what_does_check_do.md b/thirdparty/BLAKE3/b3sum/what_does_check_do.md
index 3a44a0010..3a44a0010 100644
--- a/3rdparty/BLAKE3/b3sum/what_does_check_do.md
+++ b/thirdparty/BLAKE3/b3sum/what_does_check_do.md
diff --git a/3rdparty/BLAKE3/benches/bench.rs b/thirdparty/BLAKE3/benches/bench.rs
index ba5a4041f..ba5a4041f 100644
--- a/3rdparty/BLAKE3/benches/bench.rs
+++ b/thirdparty/BLAKE3/benches/bench.rs
diff --git a/3rdparty/BLAKE3/build.rs b/thirdparty/BLAKE3/build.rs
index ea657d8db..ea657d8db 100644
--- a/3rdparty/BLAKE3/build.rs
+++ b/thirdparty/BLAKE3/build.rs
diff --git a/3rdparty/BLAKE3/c/.gitignore b/thirdparty/BLAKE3/c/.gitignore
index 0bf608cee..0bf608cee 100644
--- a/3rdparty/BLAKE3/c/.gitignore
+++ b/thirdparty/BLAKE3/c/.gitignore
diff --git a/3rdparty/BLAKE3/c/Makefile.testing b/thirdparty/BLAKE3/c/Makefile.testing
index 41e6b8285..41e6b8285 100644
--- a/3rdparty/BLAKE3/c/Makefile.testing
+++ b/thirdparty/BLAKE3/c/Makefile.testing
diff --git a/3rdparty/BLAKE3/c/README.md b/thirdparty/BLAKE3/c/README.md
index 5e8b4e682..5e8b4e682 100644
--- a/3rdparty/BLAKE3/c/README.md
+++ b/thirdparty/BLAKE3/c/README.md
diff --git a/3rdparty/BLAKE3/c/blake3.c b/thirdparty/BLAKE3/c/blake3.c
index 7abf5324e..7abf5324e 100644
--- a/3rdparty/BLAKE3/c/blake3.c
+++ b/thirdparty/BLAKE3/c/blake3.c
diff --git a/3rdparty/BLAKE3/c/blake3.h b/thirdparty/BLAKE3/c/blake3.h
index 57ebd5adc..57ebd5adc 100644
--- a/3rdparty/BLAKE3/c/blake3.h
+++ b/thirdparty/BLAKE3/c/blake3.h
diff --git a/3rdparty/BLAKE3/c/blake3_avx2.c b/thirdparty/BLAKE3/c/blake3_avx2.c
index c5a2ce9e2..c5a2ce9e2 100644
--- a/3rdparty/BLAKE3/c/blake3_avx2.c
+++ b/thirdparty/BLAKE3/c/blake3_avx2.c
diff --git a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S
index 812bb8568..812bb8568 100644
--- a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S
+++ b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S
diff --git a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S
index bb58d2ae6..bb58d2ae6 100644
--- a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S
+++ b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S
diff --git a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm
index 352298edd..352298edd 100644
--- a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm
+++ b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm
diff --git a/3rdparty/BLAKE3/c/blake3_avx512.c b/thirdparty/BLAKE3/c/blake3_avx512.c
index 77a5c385c..77a5c385c 100644
--- a/3rdparty/BLAKE3/c/blake3_avx512.c
+++ b/thirdparty/BLAKE3/c/blake3_avx512.c
diff --git a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S
index a06aede0f..a06aede0f 100644
--- a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S
+++ b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S
diff --git a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S
index e10b9f36c..e10b9f36c 100644
--- a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S
+++ b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S
diff --git a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm
index b19efbaae..b19efbaae 100644
--- a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm
+++ b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml
index 2052c7458..2052c7458 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/README.md b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/README.md
index c44726b90..c44726b90 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/README.md
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/README.md
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs
index 119bd2064..119bd2064 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs
index d5dc47a81..d5dc47a81 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh
index 94d50affb..94d50affb 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs
index f18fe123f..f18fe123f 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs
diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs
index b989ae9c4..b989ae9c4 100644
--- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs
+++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs
diff --git a/3rdparty/BLAKE3/c/blake3_dispatch.c b/thirdparty/BLAKE3/c/blake3_dispatch.c
index 6518478e5..6518478e5 100644
--- a/3rdparty/BLAKE3/c/blake3_dispatch.c
+++ b/thirdparty/BLAKE3/c/blake3_dispatch.c
diff --git a/3rdparty/BLAKE3/c/blake3_impl.h b/thirdparty/BLAKE3/c/blake3_impl.h
index 86ab6aa25..86ab6aa25 100644
--- a/3rdparty/BLAKE3/c/blake3_impl.h
+++ b/thirdparty/BLAKE3/c/blake3_impl.h
diff --git a/3rdparty/BLAKE3/c/blake3_neon.c b/thirdparty/BLAKE3/c/blake3_neon.c
index 46691f526..46691f526 100644
--- a/3rdparty/BLAKE3/c/blake3_neon.c
+++ b/thirdparty/BLAKE3/c/blake3_neon.c
diff --git a/3rdparty/BLAKE3/c/blake3_portable.c b/thirdparty/BLAKE3/c/blake3_portable.c
index 062dd1b47..062dd1b47 100644
--- a/3rdparty/BLAKE3/c/blake3_portable.c
+++ b/thirdparty/BLAKE3/c/blake3_portable.c
diff --git a/3rdparty/BLAKE3/c/blake3_sse2.c b/thirdparty/BLAKE3/c/blake3_sse2.c
index 159296688..159296688 100644
--- a/3rdparty/BLAKE3/c/blake3_sse2.c
+++ b/thirdparty/BLAKE3/c/blake3_sse2.c
diff --git a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S
index d144046ab..d144046ab 100644
--- a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S
+++ b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S
diff --git a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S
index 494c0c6fd..494c0c6fd 100644
--- a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S
+++ b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S
diff --git a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm
index 72deb7bbc..72deb7bbc 100644
--- a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm
+++ b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm
diff --git a/3rdparty/BLAKE3/c/blake3_sse41.c b/thirdparty/BLAKE3/c/blake3_sse41.c
index b31122533..b31122533 100644
--- a/3rdparty/BLAKE3/c/blake3_sse41.c
+++ b/thirdparty/BLAKE3/c/blake3_sse41.c
diff --git a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S
index a3ff64269..a3ff64269 100644
--- a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S
+++ b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S
diff --git a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S
index 60d0a4042..60d0a4042 100644
--- a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S
+++ b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S
diff --git a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm
index 87001e4d3..87001e4d3 100644
--- a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm
+++ b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm
diff --git a/3rdparty/BLAKE3/c/example.c b/thirdparty/BLAKE3/c/example.c
index 02fe3c32b..02fe3c32b 100644
--- a/3rdparty/BLAKE3/c/example.c
+++ b/thirdparty/BLAKE3/c/example.c
diff --git a/3rdparty/BLAKE3/c/main.c b/thirdparty/BLAKE3/c/main.c
index 9b8a436f3..9b8a436f3 100644
--- a/3rdparty/BLAKE3/c/main.c
+++ b/thirdparty/BLAKE3/c/main.c
diff --git a/3rdparty/BLAKE3/c/test.py b/thirdparty/BLAKE3/c/test.py
index b0b192950..b0b192950 100644
--- a/3rdparty/BLAKE3/c/test.py
+++ b/thirdparty/BLAKE3/c/test.py
diff --git a/3rdparty/BLAKE3/lib/Linux_x64/libblake3.a b/thirdparty/BLAKE3/lib/Linux_x64/libblake3.a
index b956e22cb..b956e22cb 100644
--- a/3rdparty/BLAKE3/lib/Linux_x64/libblake3.a
+++ b/thirdparty/BLAKE3/lib/Linux_x64/libblake3.a
Binary files differ
diff --git a/3rdparty/BLAKE3/lib/Win64/BLAKE3.lib b/thirdparty/BLAKE3/lib/Win64/BLAKE3.lib
index 1308d9928..1308d9928 100644
--- a/3rdparty/BLAKE3/lib/Win64/BLAKE3.lib
+++ b/thirdparty/BLAKE3/lib/Win64/BLAKE3.lib
Binary files differ
diff --git a/3rdparty/BLAKE3/media/B3.svg b/thirdparty/BLAKE3/media/B3.svg
index a50da0ce9..a50da0ce9 100644
--- a/3rdparty/BLAKE3/media/B3.svg
+++ b/thirdparty/BLAKE3/media/B3.svg
diff --git a/3rdparty/BLAKE3/media/BLAKE3.svg b/thirdparty/BLAKE3/media/BLAKE3.svg
index 2d50c2d3b..2d50c2d3b 100644
--- a/3rdparty/BLAKE3/media/BLAKE3.svg
+++ b/thirdparty/BLAKE3/media/BLAKE3.svg
diff --git a/3rdparty/BLAKE3/media/speed.svg b/thirdparty/BLAKE3/media/speed.svg
index 7bd65ca3c..7bd65ca3c 100644
--- a/3rdparty/BLAKE3/media/speed.svg
+++ b/thirdparty/BLAKE3/media/speed.svg
diff --git a/3rdparty/BLAKE3/reference_impl/Cargo.toml b/thirdparty/BLAKE3/reference_impl/Cargo.toml
index 8c81e5ad9..8c81e5ad9 100644
--- a/3rdparty/BLAKE3/reference_impl/Cargo.toml
+++ b/thirdparty/BLAKE3/reference_impl/Cargo.toml
diff --git a/3rdparty/BLAKE3/reference_impl/README.md b/thirdparty/BLAKE3/reference_impl/README.md
index 941fafd72..941fafd72 100644
--- a/3rdparty/BLAKE3/reference_impl/README.md
+++ b/thirdparty/BLAKE3/reference_impl/README.md
diff --git a/3rdparty/BLAKE3/reference_impl/reference_impl.rs b/thirdparty/BLAKE3/reference_impl/reference_impl.rs
index 248834319..248834319 100644
--- a/3rdparty/BLAKE3/reference_impl/reference_impl.rs
+++ b/thirdparty/BLAKE3/reference_impl/reference_impl.rs
diff --git a/3rdparty/BLAKE3/src/ffi_avx2.rs b/thirdparty/BLAKE3/src/ffi_avx2.rs
index d805e868e..d805e868e 100644
--- a/3rdparty/BLAKE3/src/ffi_avx2.rs
+++ b/thirdparty/BLAKE3/src/ffi_avx2.rs
diff --git a/3rdparty/BLAKE3/src/ffi_avx512.rs b/thirdparty/BLAKE3/src/ffi_avx512.rs
index c1b9f649b..c1b9f649b 100644
--- a/3rdparty/BLAKE3/src/ffi_avx512.rs
+++ b/thirdparty/BLAKE3/src/ffi_avx512.rs
diff --git a/3rdparty/BLAKE3/src/ffi_neon.rs b/thirdparty/BLAKE3/src/ffi_neon.rs
index 889974277..889974277 100644
--- a/3rdparty/BLAKE3/src/ffi_neon.rs
+++ b/thirdparty/BLAKE3/src/ffi_neon.rs
diff --git a/3rdparty/BLAKE3/src/ffi_sse2.rs b/thirdparty/BLAKE3/src/ffi_sse2.rs
index c49a229ad..c49a229ad 100644
--- a/3rdparty/BLAKE3/src/ffi_sse2.rs
+++ b/thirdparty/BLAKE3/src/ffi_sse2.rs
diff --git a/3rdparty/BLAKE3/src/ffi_sse41.rs b/thirdparty/BLAKE3/src/ffi_sse41.rs
index 0b64c90a0..0b64c90a0 100644
--- a/3rdparty/BLAKE3/src/ffi_sse41.rs
+++ b/thirdparty/BLAKE3/src/ffi_sse41.rs
diff --git a/3rdparty/BLAKE3/src/guts.rs b/thirdparty/BLAKE3/src/guts.rs
index 88dcc86cd..88dcc86cd 100644
--- a/3rdparty/BLAKE3/src/guts.rs
+++ b/thirdparty/BLAKE3/src/guts.rs
diff --git a/3rdparty/BLAKE3/src/join.rs b/thirdparty/BLAKE3/src/join.rs
index 60932db1c..60932db1c 100644
--- a/3rdparty/BLAKE3/src/join.rs
+++ b/thirdparty/BLAKE3/src/join.rs
diff --git a/3rdparty/BLAKE3/src/lib.rs b/thirdparty/BLAKE3/src/lib.rs
index bf66b6dae..bf66b6dae 100644
--- a/3rdparty/BLAKE3/src/lib.rs
+++ b/thirdparty/BLAKE3/src/lib.rs
diff --git a/3rdparty/BLAKE3/src/platform.rs b/thirdparty/BLAKE3/src/platform.rs
index 4bd67de7a..4bd67de7a 100644
--- a/3rdparty/BLAKE3/src/platform.rs
+++ b/thirdparty/BLAKE3/src/platform.rs
diff --git a/3rdparty/BLAKE3/src/portable.rs b/thirdparty/BLAKE3/src/portable.rs
index 0a569cec7..0a569cec7 100644
--- a/3rdparty/BLAKE3/src/portable.rs
+++ b/thirdparty/BLAKE3/src/portable.rs
diff --git a/3rdparty/BLAKE3/src/rust_avx2.rs b/thirdparty/BLAKE3/src/rust_avx2.rs
index 6ab773ad4..6ab773ad4 100644
--- a/3rdparty/BLAKE3/src/rust_avx2.rs
+++ b/thirdparty/BLAKE3/src/rust_avx2.rs
diff --git a/3rdparty/BLAKE3/src/rust_sse2.rs b/thirdparty/BLAKE3/src/rust_sse2.rs
index 15b52ee5d..15b52ee5d 100644
--- a/3rdparty/BLAKE3/src/rust_sse2.rs
+++ b/thirdparty/BLAKE3/src/rust_sse2.rs
diff --git a/3rdparty/BLAKE3/src/rust_sse41.rs b/thirdparty/BLAKE3/src/rust_sse41.rs
index d5cf0f4a9..d5cf0f4a9 100644
--- a/3rdparty/BLAKE3/src/rust_sse41.rs
+++ b/thirdparty/BLAKE3/src/rust_sse41.rs
diff --git a/3rdparty/BLAKE3/src/test.rs b/thirdparty/BLAKE3/src/test.rs
index eefb1a354..eefb1a354 100644
--- a/3rdparty/BLAKE3/src/test.rs
+++ b/thirdparty/BLAKE3/src/test.rs
diff --git a/3rdparty/BLAKE3/src/traits.rs b/thirdparty/BLAKE3/src/traits.rs
index 9704e0106..9704e0106 100644
--- a/3rdparty/BLAKE3/src/traits.rs
+++ b/thirdparty/BLAKE3/src/traits.rs
diff --git a/3rdparty/BLAKE3/test_vectors/Cargo.toml b/thirdparty/BLAKE3/test_vectors/Cargo.toml
index cd74a9df0..cd74a9df0 100644
--- a/3rdparty/BLAKE3/test_vectors/Cargo.toml
+++ b/thirdparty/BLAKE3/test_vectors/Cargo.toml
diff --git a/3rdparty/BLAKE3/test_vectors/cross_test.sh b/thirdparty/BLAKE3/test_vectors/cross_test.sh
index c4d280c9d..c4d280c9d 100644
--- a/3rdparty/BLAKE3/test_vectors/cross_test.sh
+++ b/thirdparty/BLAKE3/test_vectors/cross_test.sh
diff --git a/3rdparty/BLAKE3/test_vectors/src/lib.rs b/thirdparty/BLAKE3/test_vectors/src/lib.rs
index 04460f668..04460f668 100644
--- a/3rdparty/BLAKE3/test_vectors/src/lib.rs
+++ b/thirdparty/BLAKE3/test_vectors/src/lib.rs
diff --git a/3rdparty/BLAKE3/test_vectors/test_vectors.json b/thirdparty/BLAKE3/test_vectors/test_vectors.json
index f6da91792..f6da91792 100644
--- a/3rdparty/BLAKE3/test_vectors/test_vectors.json
+++ b/thirdparty/BLAKE3/test_vectors/test_vectors.json
diff --git a/3rdparty/BLAKE3/tools/compiler_version/Cargo.toml b/thirdparty/BLAKE3/tools/compiler_version/Cargo.toml
index 1046cf29d..1046cf29d 100644
--- a/3rdparty/BLAKE3/tools/compiler_version/Cargo.toml
+++ b/thirdparty/BLAKE3/tools/compiler_version/Cargo.toml
diff --git a/3rdparty/BLAKE3/tools/compiler_version/build.rs b/thirdparty/BLAKE3/tools/compiler_version/build.rs
index 3e14ebe67..3e14ebe67 100644
--- a/3rdparty/BLAKE3/tools/compiler_version/build.rs
+++ b/thirdparty/BLAKE3/tools/compiler_version/build.rs
diff --git a/3rdparty/BLAKE3/tools/compiler_version/src/main.rs b/thirdparty/BLAKE3/tools/compiler_version/src/main.rs
index 767cb31bd..767cb31bd 100644
--- a/3rdparty/BLAKE3/tools/compiler_version/src/main.rs
+++ b/thirdparty/BLAKE3/tools/compiler_version/src/main.rs
diff --git a/3rdparty/BLAKE3/tools/instruction_set_support/Cargo.toml b/thirdparty/BLAKE3/tools/instruction_set_support/Cargo.toml
index 9e30174a9..9e30174a9 100644
--- a/3rdparty/BLAKE3/tools/instruction_set_support/Cargo.toml
+++ b/thirdparty/BLAKE3/tools/instruction_set_support/Cargo.toml
diff --git a/3rdparty/BLAKE3/tools/instruction_set_support/src/main.rs b/thirdparty/BLAKE3/tools/instruction_set_support/src/main.rs
index 6b509b053..6b509b053 100644
--- a/3rdparty/BLAKE3/tools/instruction_set_support/src/main.rs
+++ b/thirdparty/BLAKE3/tools/instruction_set_support/src/main.rs
diff --git a/3rdparty/Oodle/include/oodle2.h b/thirdparty/Oodle/include/oodle2.h
index 31204e932..31204e932 100644
--- a/3rdparty/Oodle/include/oodle2.h
+++ b/thirdparty/Oodle/include/oodle2.h
diff --git a/3rdparty/Oodle/include/oodle2base.h b/thirdparty/Oodle/include/oodle2base.h
index 05f73f3ea..05f73f3ea 100644
--- a/3rdparty/Oodle/include/oodle2base.h
+++ b/thirdparty/Oodle/include/oodle2base.h
diff --git a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a
index dee0353e5..dee0353e5 100755
--- a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a
+++ b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a
Binary files differ
diff --git a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8
index 425ada44d..425ada44d 100755
--- a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8
+++ b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8
Binary files differ
diff --git a/3rdparty/Oodle/lib/Win64/oo2core_win64.lib b/thirdparty/Oodle/lib/Win64/oo2core_win64.lib
index ae42f727a..ae42f727a 100644
--- a/3rdparty/Oodle/lib/Win64/oo2core_win64.lib
+++ b/thirdparty/Oodle/lib/Win64/oo2core_win64.lib
Binary files differ
diff --git a/thirdparty/licenses/README.md b/thirdparty/licenses/README.md
new file mode 100644
index 000000000..a4d8e1c97
--- /dev/null
+++ b/thirdparty/licenses/README.md
@@ -0,0 +1,3 @@
+# Placeholder
+
+Third party library license information goes here at some point in the near future.
diff --git a/3rdparty/utfcpp/.circleci/config.yml b/thirdparty/utfcpp/.circleci/config.yml
index b2cbdaf99..b2cbdaf99 100644
--- a/3rdparty/utfcpp/.circleci/config.yml
+++ b/thirdparty/utfcpp/.circleci/config.yml
diff --git a/3rdparty/utfcpp/.gitignore b/thirdparty/utfcpp/.gitignore
index 488d51dd9..488d51dd9 100644
--- a/3rdparty/utfcpp/.gitignore
+++ b/thirdparty/utfcpp/.gitignore
diff --git a/3rdparty/utfcpp/.gitmodules b/thirdparty/utfcpp/.gitmodules
index d2ac8470d..d2ac8470d 100644
--- a/3rdparty/utfcpp/.gitmodules
+++ b/thirdparty/utfcpp/.gitmodules
diff --git a/3rdparty/utfcpp/CMakeLists.txt b/thirdparty/utfcpp/CMakeLists.txt
index 4e63087bc..4e63087bc 100644
--- a/3rdparty/utfcpp/CMakeLists.txt
+++ b/thirdparty/utfcpp/CMakeLists.txt
diff --git a/3rdparty/utfcpp/LICENSE b/thirdparty/utfcpp/LICENSE
index 36b7cd93c..36b7cd93c 100644
--- a/3rdparty/utfcpp/LICENSE
+++ b/thirdparty/utfcpp/LICENSE
diff --git a/3rdparty/utfcpp/README.md b/thirdparty/utfcpp/README.md
index 0c689cf12..0c689cf12 100644
--- a/3rdparty/utfcpp/README.md
+++ b/thirdparty/utfcpp/README.md
diff --git a/3rdparty/utfcpp/samples/docsample.cpp b/thirdparty/utfcpp/samples/docsample.cpp
index 653388725..653388725 100644
--- a/3rdparty/utfcpp/samples/docsample.cpp
+++ b/thirdparty/utfcpp/samples/docsample.cpp
diff --git a/3rdparty/utfcpp/source/utf8.h b/thirdparty/utfcpp/source/utf8.h
index 82b13f59f..82b13f59f 100644
--- a/3rdparty/utfcpp/source/utf8.h
+++ b/thirdparty/utfcpp/source/utf8.h
diff --git a/3rdparty/utfcpp/source/utf8/checked.h b/thirdparty/utfcpp/source/utf8/checked.h
index 71b9076f6..71b9076f6 100644
--- a/3rdparty/utfcpp/source/utf8/checked.h
+++ b/thirdparty/utfcpp/source/utf8/checked.h
diff --git a/3rdparty/utfcpp/source/utf8/core.h b/thirdparty/utfcpp/source/utf8/core.h
index de6199f2a..de6199f2a 100644
--- a/3rdparty/utfcpp/source/utf8/core.h
+++ b/thirdparty/utfcpp/source/utf8/core.h
diff --git a/3rdparty/utfcpp/source/utf8/cpp11.h b/thirdparty/utfcpp/source/utf8/cpp11.h
index d93961b04..d93961b04 100644
--- a/3rdparty/utfcpp/source/utf8/cpp11.h
+++ b/thirdparty/utfcpp/source/utf8/cpp11.h
diff --git a/3rdparty/utfcpp/source/utf8/unchecked.h b/thirdparty/utfcpp/source/utf8/unchecked.h
index 0e1b51cc7..0e1b51cc7 100644
--- a/3rdparty/utfcpp/source/utf8/unchecked.h
+++ b/thirdparty/utfcpp/source/utf8/unchecked.h
diff --git a/3rdparty/utfcpp/tests/CMakeLists.txt b/thirdparty/utfcpp/tests/CMakeLists.txt
index 06e0d7e9c..06e0d7e9c 100644
--- a/3rdparty/utfcpp/tests/CMakeLists.txt
+++ b/thirdparty/utfcpp/tests/CMakeLists.txt
diff --git a/3rdparty/utfcpp/tests/docker/Dockerfile b/thirdparty/utfcpp/tests/docker/Dockerfile
index 125a26936..125a26936 100644
--- a/3rdparty/utfcpp/tests/docker/Dockerfile
+++ b/thirdparty/utfcpp/tests/docker/Dockerfile
diff --git a/3rdparty/utfcpp/tests/negative.cpp b/thirdparty/utfcpp/tests/negative.cpp
index f1bcc993e..f1bcc993e 100644
--- a/3rdparty/utfcpp/tests/negative.cpp
+++ b/thirdparty/utfcpp/tests/negative.cpp
diff --git a/3rdparty/utfcpp/tests/test_checked_api.cpp b/thirdparty/utfcpp/tests/test_checked_api.cpp
index 6787da62e..6787da62e 100644
--- a/3rdparty/utfcpp/tests/test_checked_api.cpp
+++ b/thirdparty/utfcpp/tests/test_checked_api.cpp
diff --git a/3rdparty/utfcpp/tests/test_checked_iterator.cpp b/thirdparty/utfcpp/tests/test_checked_iterator.cpp
index 4c44834fd..4c44834fd 100644
--- a/3rdparty/utfcpp/tests/test_checked_iterator.cpp
+++ b/thirdparty/utfcpp/tests/test_checked_iterator.cpp
diff --git a/3rdparty/utfcpp/tests/test_cpp11.cpp b/thirdparty/utfcpp/tests/test_cpp11.cpp
index edcff9d31..edcff9d31 100644
--- a/3rdparty/utfcpp/tests/test_cpp11.cpp
+++ b/thirdparty/utfcpp/tests/test_cpp11.cpp
diff --git a/3rdparty/utfcpp/tests/test_data/utf8_invalid.txt b/thirdparty/utfcpp/tests/test_data/utf8_invalid.txt
index ae8315932..ae8315932 100644
--- a/3rdparty/utfcpp/tests/test_data/utf8_invalid.txt
+++ b/thirdparty/utfcpp/tests/test_data/utf8_invalid.txt
Binary files differ
diff --git a/3rdparty/utfcpp/tests/test_unchecked_api.cpp b/thirdparty/utfcpp/tests/test_unchecked_api.cpp
index e9f19ca6c..e9f19ca6c 100644
--- a/3rdparty/utfcpp/tests/test_unchecked_api.cpp
+++ b/thirdparty/utfcpp/tests/test_unchecked_api.cpp
diff --git a/3rdparty/utfcpp/tests/test_unchecked_iterator.cpp b/thirdparty/utfcpp/tests/test_unchecked_iterator.cpp
index 103e8e28a..103e8e28a 100644
--- a/3rdparty/utfcpp/tests/test_unchecked_iterator.cpp
+++ b/thirdparty/utfcpp/tests/test_unchecked_iterator.cpp
diff --git a/zen.sln b/zen.sln
index dec613b24..c1bced170 100644
--- a/zen.sln
+++ b/zen.sln
@@ -33,7 +33,7 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zen", "zen\zen.vcxproj", "{
{D75BF9AB-C61E-4FFF-AD59-1563430F05E2} = {D75BF9AB-C61E-4FFF-AD59-1563430F05E2}
EndProjectSection
EndProject
-Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "3rdparty", "3rdparty", "{3CB3B9E8-B4CB-4D2E-821A-2AFE34093BEF}"
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "thirdparty", "thirdparty", "{3CB3B9E8-B4CB-4D2E-821A-2AFE34093BEF}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zenserver-test", "zenserver-test\zenserver-test.vcxproj", "{2563249E-E695-4CC4-8FFA-335D07680C9D}"
ProjectSection(ProjectDependencies) = postProject
diff --git a/zencore/blake3.cpp b/zencore/blake3.cpp
index 663f21b6d..8f8952271 100644
--- a/zencore/blake3.cpp
+++ b/zencore/blake3.cpp
@@ -7,7 +7,7 @@
#include <zencore/testing.h>
#include <zencore/zencore.h>
-#include "../3rdparty/BLAKE3/c/blake3.h"
+#include "../thirdparty/BLAKE3/c/blake3.h"
#pragma comment(lib, "blake3.lib")
#include <string.h>
diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp
index aafb365f3..c6bf38b04 100644
--- a/zencore/compactbinary.cpp
+++ b/zencore/compactbinary.cpp
@@ -1854,11 +1854,11 @@ TEST_CASE("uson.json")
<< "ValueTwo";
CbObject Obj = Writer.Save();
- StringBuilder<128> Sb;
- const std::string_view JsonText = Obj.ToJson(Sb).ToView();
+ StringBuilder<128> Sb;
+ const char* JsonText = Obj.ToJson(Sb).Data();
std::string JsonError;
- json11::Json Json = json11::Json::parse(JsonText.data(), JsonError);
+ json11::Json Json = json11::Json::parse(JsonText, JsonError);
const std::string ValueOne = Json["KeyOne"].string_value();
const std::string ValueTwo = Json["KeyTwo"].string_value();
@@ -1879,11 +1879,11 @@ TEST_CASE("uson.json")
CbObject Obj = Writer.Save();
- StringBuilder<128> Sb;
- const std::string_view JsonText = Obj.ToJson(Sb).ToView();
+ StringBuilder<128> Sb;
+ const char* JsonText = Obj.ToJson(Sb).Data();
std::string JsonError;
- json11::Json Json = json11::Json::parse(JsonText.data(), JsonError);
+ json11::Json Json = json11::Json::parse(JsonText, JsonError);
const float FloatValue = float(Json["Float"].number_value());
const double DoubleValue = Json["Double"].number_value();
@@ -1904,11 +1904,11 @@ TEST_CASE("uson.json")
CbObject Obj = Writer.Save();
- StringBuilder<128> Sb;
- const std::string_view JsonText = Obj.ToJson(Sb).ToView();
+ StringBuilder<128> Sb;
+ const char* JsonText = Obj.ToJson(Sb).Data();
std::string JsonError;
- json11::Json Json = json11::Json::parse(JsonText.data(), JsonError);
+ json11::Json Json = json11::Json::parse(JsonText, JsonError);
const double FloatValue = Json["FloatNan"].number_value();
const double DoubleValue = Json["DoubleNan"].number_value();
diff --git a/zencore/compress.cpp b/zencore/compress.cpp
index dd6484a3c..35a5acb3a 100644
--- a/zencore/compress.cpp
+++ b/zencore/compress.cpp
@@ -8,7 +8,7 @@
#include <zencore/endian.h>
#include <zencore/testing.h>
-#include "../3rdparty/Oodle/include/oodle2.h"
+#include "../thirdparty/Oodle/include/oodle2.h"
#if ZEN_PLATFORM_WINDOWS
# pragma comment(lib, "oo2core_win64.lib")
#endif
@@ -693,6 +693,11 @@ ValidBufferOrEmpty(BufferType&& CompressedData)
CompositeBuffer
CopyCompressedRange(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize)
{
+ if (Header.TotalRawSize < RawOffset + RawSize)
+ {
+ return CompositeBuffer();
+ }
+
if (Header.Method == CompressionMethod::None)
{
UniqueBuffer NewCompressedData = UniqueBuffer::Alloc(RawSize);
@@ -862,9 +867,11 @@ CompressedBuffer
CompressedBuffer::CopyRange(uint64_t RawOffset, uint64_t RawSize) const
{
using namespace detail;
- const BufferHeader Header = BufferHeader::Read(CompressedData);
- CompressedBuffer Range;
- Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, RawSize);
+ const BufferHeader Header = BufferHeader::Read(CompressedData);
+ const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset;
+
+ CompressedBuffer Range;
+ Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, TotalRawSize);
return Range;
}
diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h
index 88a72cbba..04b3b33dd 100644
--- a/zencore/include/zencore/iobuffer.h
+++ b/zencore/include/zencore/iobuffer.h
@@ -382,6 +382,7 @@ public:
ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull);
ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer);
inline static IoBuffer MakeCloneFromMemory(const void* Ptr, size_t Sz) { return IoBuffer(IoBuffer::Clone, Ptr, Sz); }
+ inline static IoBuffer MakeCloneFromMemory(MemoryView Memory) { return IoBuffer(IoBuffer::Clone, Memory.GetData(), Memory.GetSize()); }
};
IoHash HashBuffer(IoBuffer& Buffer);
diff --git a/zencore/memory.cpp b/zencore/memory.cpp
index 14ea7ca1d..c94829276 100644
--- a/zencore/memory.cpp
+++ b/zencore/memory.cpp
@@ -186,13 +186,13 @@ TEST_CASE("MemoryView")
{
{
uint8_t Array1[16] = {};
- MemoryView View1 = MakeMemoryView(Array1);
+ MemoryView View1 = MakeMemoryView(Array1);
CHECK(View1.GetSize() == 16);
}
{
uint32_t Array2[16] = {};
- MemoryView View2 = MakeMemoryView(Array2);
+ MemoryView View2 = MakeMemoryView(Array2);
CHECK(View2.GetSize() == 64);
}
diff --git a/zencore/xmake.lua b/zencore/xmake.lua
index 5de7f476d..d26a9f922 100644
--- a/zencore/xmake.lua
+++ b/zencore/xmake.lua
@@ -2,13 +2,15 @@ target('zencore')
set_kind("static")
add_files("**.cpp")
add_includedirs("include", {public=true})
- add_includedirs("..\\3rdparty\\utfcpp\\source")
- add_linkdirs("$(projectdir)/3rdparty/BLAKE3/lib/Win64", "$(projectdir)/3rdparty/Oodle/lib/Win64")
+ add_includedirs("..\\thirdparty\\utfcpp\\source")
+ add_linkdirs("$(projectdir)/thirdparty/BLAKE3/lib/Win64", "$(projectdir)/thirdparty/Oodle/lib/Win64")
add_packages(
"vcpkg::spdlog",
"vcpkg::fmt",
"vcpkg::doctest",
+ "vcpkg::json11",
"vcpkg::lz4",
+ "vcpkg::mimalloc",
"vcpkg::cpr",
"vcpkg::curl", -- required by cpr
"vcpkg::zlib", -- required by curl
diff --git a/zencore/zencore.vcxproj b/zencore/zencore.vcxproj
index 95b9eace5..49e959b96 100644
--- a/zencore/zencore.vcxproj
+++ b/zencore/zencore.vcxproj
@@ -74,7 +74,7 @@
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
- <AdditionalIncludeDirectories>.\include;..\3rdparty\utfcpp\source</AdditionalIncludeDirectories>
+ <AdditionalIncludeDirectories>.\include;..\thirdparty\utfcpp\source</AdditionalIncludeDirectories>
<DebugInformationFormat>ProgramDatabase</DebugInformationFormat>
<LanguageStandard>stdcpplatest</LanguageStandard>
<TreatWarningAsError>true</TreatWarningAsError>
@@ -95,7 +95,7 @@
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
- <AdditionalIncludeDirectories>.\include;..\3rdparty\utfcpp\source</AdditionalIncludeDirectories>
+ <AdditionalIncludeDirectories>.\include;..\thirdparty\utfcpp\source</AdditionalIncludeDirectories>
<WholeProgramOptimization>false</WholeProgramOptimization>
<LanguageStandard>stdcpplatest</LanguageStandard>
<TreatWarningAsError>true</TreatWarningAsError>
diff --git a/zenfs_common.props b/zenfs_common.props
index 67894b9e8..3d1b5e9c8 100644
--- a/zenfs_common.props
+++ b/zenfs_common.props
@@ -3,7 +3,7 @@
<ImportGroup Label="PropertySheets" />
<PropertyGroup Label="UserMacros" />
<PropertyGroup>
- <LibraryPath>$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64);$(SolutionDir)\3rdparty\BLAKE3\lib\Win64;$(SolutionDir)\3rdparty\Oodle\lib\Win64</LibraryPath>
+ <LibraryPath>$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64);$(SolutionDir)\thirdparty\BLAKE3\lib\Win64;$(SolutionDir)\thirdparty\Oodle\lib\Win64</LibraryPath>
</PropertyGroup>
<ItemDefinitionGroup>
<ClCompile>
diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp
index 48600bb36..b3a4348f0 100644
--- a/zenserver-test/zenserver-test.cpp
+++ b/zenserver-test/zenserver-test.cpp
@@ -20,6 +20,7 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpshared.h>
#include <zenhttp/zenhttp.h>
+#include <zenutil/cache/cache.h>
#include <zenutil/zenserverprocess.h>
#if ZEN_USE_MIMALLOC
@@ -466,7 +467,9 @@ using namespace std::literals;
class full_test_formatter final : public spdlog::formatter
{
public:
- full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {}
+ full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId)
+ {
+ }
virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); }
@@ -991,7 +994,7 @@ TEST_CASE("project.basic")
}
}
- BaseUri << "/oplog/ps5";
+ BaseUri << "/oplog/foobar";
{
{
@@ -1007,7 +1010,7 @@ TEST_CASE("project.basic")
zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView();
- CHECK(ResponseObject["id"].AsString() == "ps5"sv);
+ CHECK(ResponseObject["id"].AsString() == "foobar"sv);
CHECK(ResponseObject["project"].AsString() == "test"sv);
}
}
@@ -1029,11 +1032,13 @@ TEST_CASE("project.basic")
"00010000"};
auto FileOid = zen::Oid::FromHexString(ChunkId);
+ std::filesystem::path ReliablePath = zen::GetRunningExecutablePath();
+
OpWriter.BeginArray("files");
OpWriter.BeginObject();
OpWriter << "id" << FileOid;
- OpWriter << "clientpath" << __FILE__;
- OpWriter << "serverpath" << __FILE__;
+ OpWriter << "clientpath" << ReliablePath.c_str();
+ OpWriter << "serverpath" << ReliablePath.c_str();
OpWriter.EndObject();
OpWriter.EndArray();
@@ -1108,6 +1113,45 @@ TEST_CASE("project.pipe")
}
# endif
+namespace utils {
+
+ struct ZenConfig
+ {
+ std::filesystem::path DataDir;
+ uint16_t Port;
+ std::string BaseUri;
+ std::string Args;
+
+ static ZenConfig New(uint16_t Port = 13337, std::string Args = "")
+ {
+ return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(),
+ .Port = Port,
+ .BaseUri = "http://localhost:{}/z$"_format(Port),
+ .Args = std::move(Args)};
+ }
+
+ static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
+ {
+ return New(13337, "--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort));
+ }
+
+ void Spawn(ZenServerInstance& Inst)
+ {
+ Inst.SetTestDir(DataDir);
+ Inst.SpawnServer(Port, Args);
+ Inst.WaitUntilReady();
+ }
+ };
+
+ void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg)
+ {
+ Server.SetTestDir(Cfg.DataDir);
+ Server.SpawnServer(Cfg.Port, Cfg.Args);
+ Server.WaitUntilReady();
+ }
+
+} // namespace utils
+
TEST_CASE("zcache.basic")
{
using namespace std::literals;
@@ -1414,34 +1458,7 @@ TEST_CASE("zcache.cbpackage")
TEST_CASE("zcache.policy")
{
using namespace std::literals;
-
- struct ZenConfig
- {
- std::filesystem::path DataDir;
- uint16_t Port;
- std::string BaseUri;
- std::string Args;
-
- static ZenConfig New(uint16_t Port = 13337, std::string Args = "")
- {
- return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(),
- .Port = Port,
- .BaseUri = "http://localhost:{}/z$"_format(Port),
- .Args = std::move(Args)};
- }
-
- static ZenConfig NewWithUpstream(uint16_t UpstreamPort)
- {
- return New(13337, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort));
- }
-
- void Spawn(ZenServerInstance& Inst)
- {
- Inst.SetTestDir(DataDir);
- Inst.SpawnServer(Port, Args);
- Inst.WaitUntilReady();
- }
- };
+ using namespace utils;
auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer {
auto Buf = zen::UniqueBuffer::Alloc(Size);
@@ -1705,14 +1722,15 @@ TEST_CASE("zcache.policy")
LocalCfg.Spawn(LocalInst);
- zen::IoHash Key;
- zen::IoHash PayloadId;
- zen::CbPackage OriginalPackage = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(OriginalPackage);
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
// Store package locally
{
- CHECK(OriginalPackage.GetAttachments().size() != 0);
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ CHECK(Package.GetAttachments().size() != 0);
cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
@@ -1765,14 +1783,15 @@ TEST_CASE("zcache.policy")
UpstreamCfg.Spawn(UpstreamInst);
LocalCfg.Spawn(LocalInst);
- zen::IoHash Key;
- zen::IoHash PayloadId;
- zen::CbPackage OriginalPackage = GeneratePackage(Key, PayloadId);
- auto Buf = ToBuffer(OriginalPackage);
+ zen::IoHash Key;
+ zen::IoHash PayloadId;
// Store package upstream
{
- CHECK(OriginalPackage.GetAttachments().size() != 0);
+ zen::CbPackage Package = GeneratePackage(Key, PayloadId);
+ auto Buf = ToBuffer(Package);
+
+ CHECK(Package.GetAttachments().size() != 0);
cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)},
cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()},
cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
@@ -1888,6 +1907,303 @@ TEST_CASE("zcache.policy")
}
}
+TEST_CASE("zcache.rpc")
+{
+ using namespace std::literals;
+
+ auto CreateCacheRecord = [](const zen::CacheKey& CacheKey, size_t PayloadSize) -> zen::CbPackage {
+ std::vector<uint8_t> Data;
+ Data.resize(PayloadSize);
+ for (size_t Idx = 0; Idx < PayloadSize; ++Idx)
+ {
+ Data[Idx] = Idx % 255;
+ }
+
+ zen::CbAttachment Attachment(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size())));
+
+ zen::CbObjectWriter CacheRecord;
+ CacheRecord.BeginObject("CacheKey"sv);
+ CacheRecord << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash;
+ CacheRecord.EndObject();
+ CacheRecord << "Data"sv << Attachment;
+
+ zen::CbPackage Package;
+ Package.SetObject(CacheRecord.Save());
+ Package.AddAttachment(Attachment);
+
+ return Package;
+ };
+
+ auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer {
+ zen::BinaryWriter MemStream;
+ Package.Save(MemStream);
+ return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size());
+ };
+
+ auto PutCacheRecords = [&CreateCacheRecord, &ToIoBuffer](std::string_view BaseUri,
+ std::string_view Query,
+ std::string_view Bucket,
+ size_t Num,
+ size_t PayloadSize = 1024) -> std::vector<CacheKey> {
+ std::vector<zen::CacheKey> OutKeys;
+
+ for (uint32_t Key = 1; Key <= Num; ++Key)
+ {
+ const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, zen::IoHash::HashBuffer(&Key, sizeof uint32_t));
+ CbPackage CacheRecord = CreateCacheRecord(CacheKey, PayloadSize);
+
+ OutKeys.push_back(CacheKey);
+
+ IoBuffer Payload = ToIoBuffer(CacheRecord);
+
+ cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}{}"_format(BaseUri, CacheKey.Bucket, CacheKey.Hash, Query)},
+ cpr::Body{(const char*)Payload.Data(), Payload.Size()},
+ cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}});
+
+ CHECK(Result.status_code == 201);
+ }
+
+ return OutKeys;
+ };
+
+ struct GetCacheRecordResult
+ {
+ zen::CbPackage Response;
+ std::vector<zen::CbFieldView> Records;
+ bool Success;
+ };
+
+ auto GetCacheRecords =
+ [](std::string_view BaseUri, std::span<zen::CacheKey> Keys, const zen::CacheRecordPolicy& Policy) -> GetCacheRecordResult {
+ using namespace zen;
+
+ CbObjectWriter Request;
+ Request << "Method"sv
+ << "GetCacheRecords"sv;
+ Request.BeginObject("Params"sv);
+
+ Request.BeginArray("CacheKeys"sv);
+ for (const CacheKey& Key : Keys)
+ {
+ Request.BeginObject();
+ Request << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash;
+ Request.EndObject();
+ }
+ Request.EndArray();
+
+ Request.BeginObject("Policy");
+ CacheRecordPolicy::Save(Policy, Request);
+ Request.EndObject();
+
+ Request.EndObject();
+
+ BinaryWriter Body;
+ Request.Save(Body);
+
+ cpr::Response Result = cpr::Post(cpr::Url{"{}/$rpc"_format(BaseUri)},
+ cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}},
+ cpr::Body{(const char*)Body.GetData(), Body.GetSize()});
+
+ GetCacheRecordResult OutResult;
+
+ if (Result.status_code == 200)
+ {
+ CbPackage Response;
+ if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size())))
+ {
+ OutResult.Response = std::move(Response);
+ CbObjectView ResponseObject = OutResult.Response.GetObject();
+
+ for (CbFieldView RecordView : ResponseObject["Result"])
+ {
+ ExtendableStringBuilder<256> Tmp;
+ auto JSON = RecordView.AsObjectView().ToJson(Tmp).ToView();
+ OutResult.Records.push_back(RecordView);
+ }
+
+ OutResult.Success = true;
+ }
+ }
+
+ return OutResult;
+ };
+
+ auto LoadKey = [](zen::CbFieldView KeyView) -> zen::CacheKey {
+ if (zen::CbObjectView KeyObj = KeyView.AsObjectView())
+ {
+ return CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash());
+ }
+ return CacheKey::Empty;
+ };
+
+ SUBCASE("get cache records")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber);
+ Inst.WaitUntilReady();
+
+ CacheRecordPolicy Policy;
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128);
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy);
+
+ CHECK(Result.Records.size() == Keys.size());
+
+ for (size_t Index = 0; CbFieldView RecordView : Result.Records)
+ {
+ const CacheKey& ExpectedKey = Keys[Index++];
+
+ CbObjectView RecordObj = RecordView.AsObjectView();
+ CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash());
+ const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
+ const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash);
+
+ CHECK(Key == ExpectedKey);
+ CHECK(Attachment != nullptr);
+ }
+ }
+
+ SUBCASE("get missing cache records")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber);
+ Inst.WaitUntilReady();
+
+ CacheRecordPolicy Policy;
+ std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128);
+ std::vector<zen::CacheKey> Keys;
+
+ for (const zen::CacheKey& Key : ExistingKeys)
+ {
+ Keys.push_back(Key);
+ Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero));
+ }
+
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy);
+
+ CHECK(Result.Records.size() == Keys.size());
+
+ size_t KeyIndex = 0;
+ for (size_t Index = 0; CbFieldView RecordView : Result.Records)
+ {
+ const bool Missing = Index++ % 2 != 0;
+
+ if (Missing)
+ {
+ CHECK(RecordView.IsNull());
+ }
+ else
+ {
+ const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++];
+ CbObjectView RecordObj = RecordView.AsObjectView();
+ CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView();
+ zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]);
+ const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
+ const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash);
+
+ CHECK(Key == ExpectedKey);
+ CHECK(Attachment != nullptr);
+ }
+ }
+ }
+
+ SUBCASE("policy - 'SkipAttachments' does not return any record attachments")
+ {
+ std::filesystem::path TestDir = TestEnv.CreateNewTestDir();
+ const uint16_t PortNumber = 13337;
+ const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber);
+
+ ZenServerInstance Inst(TestEnv);
+ Inst.SetTestDir(TestDir);
+ Inst.SpawnServer(PortNumber);
+ Inst.WaitUntilReady();
+
+ CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::SkipAttachments);
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 4);
+ GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy);
+
+ CHECK(Result.Records.size() == Keys.size());
+
+ std::span<const zen::CbAttachment> Attachments = Result.Response.GetAttachments();
+ CHECK(Attachments.empty());
+
+ for (size_t Index = 0; CbFieldView RecordView : Result.Records)
+ {
+ const CacheKey& ExpectedKey = Keys[Index++];
+
+ CbObjectView RecordObj = RecordView.AsObjectView();
+ CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash());
+ const IoHash AttachmentHash = RecordObj["Data"sv].AsHash();
+
+ CHECK(Key == ExpectedKey);
+ }
+ }
+
+ SUBCASE("policy - 'QueryLocal' does not query upstream")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalServer(TestEnv);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4);
+
+ CacheRecordPolicy Policy(CachePolicy::QueryLocal);
+ GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy);
+
+ CHECK(Result.Records.size() == Keys.size());
+
+ for (CbFieldView RecordView : Result.Records)
+ {
+ CHECK(RecordView.IsNull());
+ }
+ }
+
+ SUBCASE("policy - 'QueryRemote' does query upstream")
+ {
+ using namespace utils;
+
+ ZenConfig UpstreamCfg = ZenConfig::New(13338);
+ ZenServerInstance UpstreamServer(TestEnv);
+ ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338);
+ ZenServerInstance LocalServer(TestEnv);
+
+ SpawnServer(UpstreamServer, UpstreamCfg);
+ SpawnServer(LocalServer, LocalCfg);
+
+ std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4);
+
+ CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::QueryRemote);
+ GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy);
+
+ CHECK(Result.Records.size() == Keys.size());
+
+ for (size_t Index = 0; CbFieldView RecordView : Result.Response.GetObject()["Result"sv])
+ {
+ const zen::CacheKey& ExpectedKey = Keys[Index++];
+ CbObjectView RecordObj = RecordView.AsObjectView();
+ zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]);
+ CHECK(Key == ExpectedKey);
+ }
+ }
+}
+
# if ZEN_USE_EXEC
struct RemoteExecutionRequest
diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp
index 177cdbf55..53e1b1c61 100644
--- a/zenserver/cache/structuredcache.cpp
+++ b/zenserver/cache/structuredcache.cpp
@@ -14,7 +14,9 @@
#include <zencore/timer.h>
#include <zenhttp/httpserver.h>
#include <zenstore/CAS.h>
+#include <zenutil/cache/cache.h>
+//#include "cachekey.h"
#include "monitoring/httpstats.h"
#include "structuredcachestore.h"
#include "upstream/jupiter.h"
@@ -36,115 +38,24 @@ using namespace std::literals;
//////////////////////////////////////////////////////////////////////////
-namespace detail { namespace cacheopt {
- constexpr std::string_view Local = "local"sv;
- constexpr std::string_view Remote = "remote"sv;
- constexpr std::string_view Data = "data"sv;
- constexpr std::string_view Meta = "meta"sv;
- constexpr std::string_view Value = "value"sv;
- constexpr std::string_view Attachments = "attachments"sv;
-}} // namespace detail::cacheopt
-
-//////////////////////////////////////////////////////////////////////////
-
-enum class CachePolicy : uint8_t
-{
- None = 0,
- QueryLocal = 1 << 0,
- QueryRemote = 1 << 1,
- Query = QueryLocal | QueryRemote,
- StoreLocal = 1 << 2,
- StoreRemote = 1 << 3,
- Store = StoreLocal | StoreRemote,
- SkipMeta = 1 << 4,
- SkipValue = 1 << 5,
- SkipAttachments = 1 << 6,
- SkipData = SkipMeta | SkipValue | SkipAttachments,
- SkipLocalCopy = 1 << 7,
- Local = QueryLocal | StoreLocal,
- Remote = QueryRemote | StoreRemote,
- Default = Query | Store,
- Disable = None,
-};
-
-gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
-
CachePolicy
ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams)
{
- CachePolicy QueryPolicy = CachePolicy::Query;
-
- {
- std::string_view Opts = QueryParams.GetValue("query"sv);
- if (!Opts.empty())
- {
- QueryPolicy = CachePolicy::None;
- ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Local)
- {
- QueryPolicy |= CachePolicy::QueryLocal;
- }
- if (Opt == detail::cacheopt::Remote)
- {
- QueryPolicy |= CachePolicy::QueryRemote;
- }
- return true;
- });
- }
- }
-
- CachePolicy StorePolicy = CachePolicy::Store;
-
- {
- std::string_view Opts = QueryParams.GetValue("store"sv);
- if (!Opts.empty())
- {
- StorePolicy = CachePolicy::None;
- ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Local)
- {
- StorePolicy |= CachePolicy::StoreLocal;
- }
- if (Opt == detail::cacheopt::Remote)
- {
- StorePolicy |= CachePolicy::StoreRemote;
- }
- return true;
- });
- }
- }
-
- CachePolicy SkipPolicy = CachePolicy::None;
-
- {
- std::string_view Opts = QueryParams.GetValue("skip"sv);
- if (!Opts.empty())
- {
- ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) {
- if (Opt == detail::cacheopt::Meta)
- {
- SkipPolicy |= CachePolicy::SkipMeta;
- }
- if (Opt == detail::cacheopt::Value)
- {
- SkipPolicy |= CachePolicy::SkipValue;
- }
- if (Opt == detail::cacheopt::Attachments)
- {
- SkipPolicy |= CachePolicy::SkipAttachments;
- }
- if (Opt == detail::cacheopt::Data)
- {
- SkipPolicy |= CachePolicy::SkipData;
- }
- return true;
- });
- }
- }
+ const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv));
+ const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv));
+ const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv));
return QueryPolicy | StorePolicy | SkipPolicy;
}
+struct AttachmentCount
+{
+ uint32_t New = 0;
+ uint32_t Valid = 0;
+ uint32_t Invalid = 0;
+ uint32_t Total = 0;
+};
+
//////////////////////////////////////////////////////////////////////////
HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore,
@@ -207,6 +118,11 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
{
std::string_view Key = Request.RelativeUri();
+ if (Key == "$rpc")
+ {
+ return HandleRpcRequest(Request);
+ }
+
if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); }))
{
// Bucket reference
@@ -319,8 +235,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request
if (ValidCount != AttachmentCount)
{
- Success = false;
- ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments",
+ // Success = false;
+ ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments",
Ref.BucketSegment,
Ref.HashKey,
ToString(AcceptType),
@@ -543,52 +459,39 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
CbObjectView CacheRecord(Body.Data());
std::vector<IoHash> ValidAttachments;
- uint32_t AttachmentCount = 0;
+ int32_t TotalCount = 0;
- CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) {
+ CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) {
const IoHash Hash = AttachmentHash.AsHash();
if (m_CidStore.ContainsChunk(Hash))
{
ValidAttachments.emplace_back(Hash);
}
- AttachmentCount++;
+ TotalCount++;
});
- const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size());
- const bool ValidCacheRecord = ValidCount == AttachmentCount;
-
- if (ValidCacheRecord)
- {
- ZEN_DEBUG("PUT - '{}/{}' {} '{}', {} attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- NiceBytes(Body.Size()),
- ToString(ContentType),
- ValidCount);
+ ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)",
+ Ref.BucketSegment,
+ Ref.HashKey,
+ NiceBytes(Body.Size()),
+ ToString(ContentType),
+ TotalCount,
+ ValidAttachments.size());
- m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
+ Body.SetContentType(ZenContentType::kCbObject);
+ m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body});
- if (StoreUpstream)
- {
- ZEN_ASSERT(m_UpstreamCache);
- auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
- .CacheKey = {Ref.BucketSegment, Ref.HashKey},
- .PayloadIds = std::move(ValidAttachments)});
- }
+ const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size());
- Request.WriteResponse(HttpResponseCode::Created);
- }
- else
+ if (StoreUpstream && !IsPartialRecord)
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, found {}/{} attachments",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(ContentType),
- ValidCount,
- AttachmentCount);
-
- Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv);
+ ZEN_ASSERT(m_UpstreamCache);
+ auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject,
+ .CacheKey = {Ref.BucketSegment, Ref.HashKey},
+ .PayloadIds = std::move(ValidAttachments)});
}
+
+ Request.WriteResponse(HttpResponseCode::Created);
}
else if (ContentType == HttpContentType::kCbPackage)
{
@@ -600,16 +503,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv);
}
- CbObject CacheRecord = Package.GetObject();
-
- std::span<const CbAttachment> Attachments = Package.GetAttachments();
- std::vector<IoHash> ValidAttachments;
- int32_t NewAttachmentCount = 0;
+ CbObject CacheRecord = Package.GetObject();
+ AttachmentCount Count;
+ std::vector<IoHash> ValidAttachments;
- ValidAttachments.reserve(Attachments.size());
+ ValidAttachments.reserve(Package.GetAttachments().size());
- CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) {
- if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash()))
+ CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) {
+ const IoHash Hash = HashView.AsHash();
+ if (const CbAttachment* Attachment = Package.FindAttachment(Hash))
{
if (Attachment->IsCompressedBinary())
{
@@ -620,8 +522,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
if (InsertResult.New)
{
- NewAttachmentCount++;
+ Count.New++;
}
+ Count.Valid++;
}
else
{
@@ -629,40 +532,40 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request
Ref.BucketSegment,
Ref.HashKey,
ToString(HttpContentType::kCbPackage),
- AttachmentHash.AsHash());
+ Hash);
+ Count.Invalid++;
}
}
- else
+ else if (m_CidStore.ContainsChunk(Hash))
{
- ZEN_WARN("PUT - '{}/{}' '{}' FAILED, missing attachment '{}'",
- Ref.BucketSegment,
- Ref.HashKey,
- ToString(HttpContentType::kCbPackage),
- AttachmentHash.AsHash());
+ ValidAttachments.emplace_back(Hash);
+ Count.Valid++;
}
+ Count.Total++;
});
- const bool AttachmentsValid = ValidAttachments.size() == Attachments.size();
-
- if (!AttachmentsValid)
+ if (Count.Invalid > 0)
{
- return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv);
+ return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv);
}
- ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments",
+ ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)",
Ref.BucketSegment,
Ref.HashKey,
NiceBytes(Body.GetSize()),
ToString(ContentType),
- NewAttachmentCount,
- Attachments.size());
+ Count.New,
+ Count.Valid,
+ Count.Total);
IoBuffer CacheRecordValue = CacheRecord.GetBuffer().AsIoBuffer();
CacheRecordValue.SetContentType(ZenContentType::kCbObject);
m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()});
- if (StoreUpstream)
+ const bool IsPartialRecord = Count.Valid != Count.Total;
+
+ if (StoreUpstream && !IsPartialRecord)
{
ZEN_ASSERT(m_UpstreamCache);
auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage,
@@ -708,8 +611,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques
if (QueryUpstream)
{
- if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId});
- UpstreamResult.Success)
+ if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success)
{
if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value)))
{
@@ -864,6 +766,420 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef&
}
void
+HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request)
+{
+ switch (auto Verb = Request.RequestVerb())
+ {
+ using enum HttpVerb;
+
+ case kPost:
+ {
+ const HttpContentType ContentType = Request.RequestContentType();
+ const HttpContentType AcceptType = Request.AcceptContentType();
+
+ if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage)
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ Request.WriteResponseAsync(
+ [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) {
+ const std::string_view Method = RpcRequest["Method"sv].AsString();
+ if (Method == "GetCacheRecords"sv)
+ {
+ HandleRpcGetCacheRecords(AsyncRequest, RpcRequest);
+ }
+ else if (Method == "GetCachePayloads"sv)
+ {
+ HandleRpcGetCachePayloads(AsyncRequest, RpcRequest);
+ }
+ else
+ {
+ AsyncRequest.WriteResponse(HttpResponseCode::BadRequest);
+ }
+ });
+ }
+ break;
+ default:
+ Request.WriteResponse(HttpResponseCode::BadRequest);
+ break;
+ }
+}
+
+void
+HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+{
+ using namespace fmt::literals;
+
+ CbPackage RpcResponse;
+ CacheRecordPolicy Policy;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+ std::vector<CacheKey> CacheKeys;
+ std::vector<IoBuffer> CacheValues;
+ std::vector<size_t> UpstreamRequests;
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv);
+
+ CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy);
+
+ const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError);
+ const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments);
+ const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote) && m_UpstreamCache;
+
+ for (CbFieldView KeyView : Params["CacheKeys"sv])
+ {
+ CbObjectView KeyObject = KeyView.AsObjectView();
+ CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()));
+ }
+
+ if (CacheKeys.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ CacheValues.resize(CacheKeys.size());
+
+ for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys)
+ {
+ ZenCacheValue CacheValue;
+ uint32_t MissingCount = 0;
+
+ if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue))
+ {
+ CbObjectView CacheRecord(CacheValue.Value.Data());
+
+ if (!SkipAttachments)
+ {
+ CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash()))
+ {
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk))));
+ }
+ else
+ {
+ MissingCount++;
+ }
+ });
+ }
+ }
+
+ if (CacheValue.Value && (MissingCount == 0 || PartialOnError))
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)",
+ Key.Bucket,
+ Key.Hash,
+ NiceBytes(CacheValue.Value.Size()),
+ ToString(CacheValue.Value.GetContentType()));
+
+ CacheValues[KeyIndex] = std::move(CacheValue.Value);
+ m_CacheStats.HitCount++;
+ }
+ else if (QueryRemote)
+ {
+ UpstreamRequests.push_back(KeyIndex);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(partial)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
+
+ ++KeyIndex;
+ }
+
+ if (!UpstreamRequests.empty() && m_UpstreamCache)
+ {
+ const auto OnCacheRecordGetComplete =
+ [this, &CacheKeys, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) {
+ ZEN_ASSERT(Params.KeyIndex < CacheValues.size());
+
+ IoBuffer CacheValue;
+ AttachmentCount Count;
+
+ if (Params.Record)
+ {
+ Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) {
+ if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash()))
+ {
+ if (CompressedBuffer Compressed = Attachment->AsCompressedBinary())
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+ if (InsertResult.New)
+ {
+ Count.New++;
+ }
+ Count.Valid++;
+
+ if (!SkipAttachments)
+ {
+ RpcResponse.AddAttachment(CbAttachment(Compressed));
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'",
+ HashView.AsHash(),
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash);
+ Count.Invalid++;
+ }
+ }
+ else if (m_CidStore.ContainsChunk(HashView.AsHash()))
+ {
+ Count.Valid++;
+ }
+ Count.Total++;
+ });
+
+ if ((Count.Valid == Count.Total) || PartialOnError)
+ {
+ CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer();
+ }
+ }
+
+ if (CacheValue)
+ {
+ ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)",
+ Params.CacheKey.Bucket,
+ Params.CacheKey.Hash,
+ NiceBytes(CacheValue.GetSize()),
+ ToString(HttpContentType::kCbPackage),
+ Count.New,
+ Count.Valid,
+ Count.Total);
+
+ CacheValue.SetContentType(ZenContentType::kCbObject);
+
+ CacheValues[Params.KeyIndex] = CacheValue;
+ m_CacheStore.Put(Params.CacheKey.Bucket, Params.CacheKey.Hash, {.Value = CacheValue});
+
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ const bool IsPartial = Count.Valid != Count.Total;
+ ZEN_DEBUG("MISS - '{}/{}' {}", Params.CacheKey.Bucket, Params.CacheKey.Hash, IsPartial ? "(partial)"sv : ""sv);
+ m_CacheStats.MissCount++;
+ }
+ };
+
+ m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete));
+ }
+
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+ for (const IoBuffer& Value : CacheValues)
+ {
+ if (Value)
+ {
+ CbObjectView Record(Value.Data());
+ ResponseObject << Record;
+ }
+ else
+ {
+ ResponseObject.AddNull();
+ }
+ }
+ ResponseObject.EndArray();
+
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
+HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest)
+{
+ using namespace fmt::literals;
+
+ ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv);
+
+ std::vector<CacheChunkRequest> ChunkRequests;
+ std::vector<size_t> UpstreamRequests;
+ std::vector<IoBuffer> Chunks;
+ CbObjectView Params = RpcRequest["Params"sv].AsObjectView();
+
+ for (CbFieldView RequestView : Params["ChunkRequests"sv])
+ {
+ CbObjectView RequestObject = RequestView.AsObjectView();
+ CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView();
+ const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash());
+ const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash();
+ const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId();
+ const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64();
+ const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64();
+ const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32();
+
+ ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy));
+ }
+
+ if (ChunkRequests.empty())
+ {
+ return Request.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ Chunks.resize(ChunkRequests.size());
+
+ // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId)
+ // is missing, load the cache record and try to find the raw hash from the payload ID.
+ {
+ const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash {
+ if (CbObjectView ValueObject = Record["Value"sv].AsObjectView())
+ {
+ const Oid Id = ValueObject["Id"sv].AsObjectId();
+ if (Id == PayloadId)
+ {
+ return ValueObject["RawHash"sv].AsHash();
+ }
+ }
+
+ for (CbFieldView AttachmentView : Record["Attachments"sv])
+ {
+ CbObjectView AttachmentObject = AttachmentView.AsObjectView();
+ const Oid Id = AttachmentObject["Id"sv].AsObjectId();
+
+ if (Id == PayloadId)
+ {
+ return AttachmentObject["RawHash"sv].AsHash();
+ }
+ }
+
+ return IoHash::Zero;
+ };
+
+ CacheKey CurrentKey = CacheKey::Empty;
+ IoBuffer CurrentRecordBuffer;
+
+ for (CacheChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ if (ChunkRequest.ChunkId != IoHash::Zero)
+ {
+ continue;
+ }
+
+ if (ChunkRequest.Key != CurrentKey)
+ {
+ CurrentKey = ChunkRequest.Key;
+
+ ZenCacheValue CacheValue;
+ if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue))
+ {
+ CurrentRecordBuffer = CacheValue.Value;
+ }
+ }
+
+ if (CurrentRecordBuffer)
+ {
+ ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId);
+ }
+ }
+ }
+
+ for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests)
+ {
+ const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal;
+ const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote;
+
+ if (QueryLocal)
+ {
+ if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId))
+ {
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})",
+ ChunkRequest.Key.Bucket,
+ ChunkRequest.Key.Hash,
+ ChunkRequest.ChunkId,
+ NiceBytes(Chunk.Size()),
+ ToString(Chunk.GetContentType()),
+ "LOCAL");
+
+ Chunks[RequestIndex] = Chunk;
+ m_CacheStats.HitCount++;
+ }
+ else if (QueryRemote)
+ {
+ UpstreamRequests.push_back(RequestIndex);
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ m_CacheStats.MissCount++;
+ }
+ }
+ else
+ {
+ ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId);
+ }
+
+ ++RequestIndex;
+ }
+
+ if (!UpstreamRequests.empty() && m_UpstreamCache)
+ {
+ const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) {
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload)))
+ {
+ auto InsertResult = m_CidStore.AddChunk(Compressed);
+
+ ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})",
+ Params.Request.Key.Bucket,
+ Params.Request.Key.Hash,
+ Params.Request.ChunkId,
+ NiceBytes(Params.Payload.GetSize()),
+ "UPSTREAM");
+
+ ZEN_ASSERT(Params.RequestIndex < Chunks.size());
+ Chunks[Params.RequestIndex] = std::move(Params.Payload);
+
+ m_CacheStats.HitCount++;
+ m_CacheStats.UpstreamHitCount++;
+ }
+ else
+ {
+ ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId);
+ m_CacheStats.MissCount++;
+ }
+ };
+
+ m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete));
+ }
+
+ CbPackage RpcResponse;
+ CbObjectWriter ResponseObject;
+
+ ResponseObject.BeginArray("Result"sv);
+
+ for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex)
+ {
+ if (Chunks[ChunkIndex])
+ {
+ ResponseObject << ChunkRequests[ChunkIndex].ChunkId;
+ RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex])))));
+ }
+ else
+ {
+ ResponseObject << IoHash::Zero;
+ }
+ }
+ ResponseObject.EndArray();
+
+ RpcResponse.SetObject(ResponseObject.Save());
+
+ BinaryWriter MemStream;
+ RpcResponse.Save(MemStream);
+
+ Request.WriteResponse(HttpResponseCode::OK,
+ HttpContentType::kCbPackage,
+ IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize()));
+}
+
+void
HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request)
{
CbObjectWriter Cbo;
diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h
index 59749a024..a0215aa6e 100644
--- a/zenserver/cache/structuredcache.h
+++ b/zenserver/cache/structuredcache.h
@@ -21,7 +21,7 @@ class CidStore;
class ScrubContext;
class UpstreamCache;
class ZenCacheStore;
-enum class CachePolicy : uint8_t;
+enum class CachePolicy : uint32_t;
/**
* Structured cache service. Imposes constraints on keys, supports blobs and
@@ -89,6 +89,9 @@ private:
void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy);
+ void HandleRpcRequest(zen::HttpServerRequest& Request);
+ void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
+ void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest);
void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket);
virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override;
virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override;
diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp
index ac9f628d3..5cce7f325 100644
--- a/zenserver/cache/structuredcachestore.cpp
+++ b/zenserver/cache/structuredcachestore.cpp
@@ -190,7 +190,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa
return false;
}
- CacheBucket* Bucket = Bucket = &it->second;
+ CacheBucket* Bucket = &it->second;
_.ReleaseNow();
@@ -347,44 +347,52 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue
//////////////////////////////////////////////////////////////////////////
-#pragma pack(push)
-#pragma pack(1)
+inline DiskLocation::DiskLocation() = default;
-struct DiskLocation
+inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
+: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
+, LowerSize(ValueSize & 0xFFFFffff)
+, IndexDataSize(IndexSize)
{
- inline DiskLocation() = default;
+}
- inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags)
- : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags))
- , LowerSize(ValueSize & 0xFFFFffff)
- , IndexDataSize(IndexSize)
- {
- }
+inline uint64_t
+DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags)
+{
+ return Offset | Flags;
+}
- static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
- static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
- static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
- static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
- static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
- static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+inline uint64_t
+DiskLocation::Offset() const
+{
+ return OffsetAndFlags & kOffsetMask;
+}
- static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; }
+inline uint64_t
+DiskLocation::Size() const
+{
+ return LowerSize;
+}
- inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; }
- inline uint64_t Size() const { return LowerSize; }
- inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; }
- inline ZenContentType GetContentType() const
- {
- ZenContentType ContentType = ZenContentType::kBinary;
+inline uint64_t
+DiskLocation::IsFlagSet(uint64_t Flag) const
+{
+ return OffsetAndFlags & Flag;
+}
- if (IsFlagSet(DiskLocation::kStructured))
- {
- ContentType = ZenContentType::kCbObject;
- }
+inline ZenContentType
+DiskLocation::GetContentType() const
+{
+ ZenContentType ContentType = ZenContentType::kBinary;
- return ContentType;
+ if (IsFlagSet(DiskLocation::kStructured))
+ {
+ ContentType = ZenContentType::kCbObject;
}
+ return ContentType;
+}
+
private:
uint64_t OffsetAndFlags = 0;
uint32_t LowerSize = 0;
diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h
index 5a3191cc5..3040640f8 100644
--- a/zenserver/cache/structuredcachestore.h
+++ b/zenserver/cache/structuredcachestore.h
@@ -7,6 +7,7 @@
#include <zencore/iohash.h>
#include <zencore/thread.h>
#include <zencore/uid.h>
+#include <zenstore/basicfile.h>
#include <zenstore/cas.h>
#include <zenstore/gc.h>
@@ -103,6 +104,42 @@ private:
Configuration m_Configuration;
};
+#pragma pack(push)
+#pragma pack(1)
+
+struct DiskLocation
+{
+ static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull;
+ static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull;
+ static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull;
+ static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull;
+ static const uint64_t kStructured = 0x4000'0000'0000'0000ull;
+ static const uint64_t kTombStone = 0x2000'0000'0000'0000ull;
+
+ DiskLocation();
+ DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags);
+ static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags);
+ uint64_t Offset() const;
+ uint64_t Size() const;
+ uint64_t IsFlagSet(uint64_t Flag) const;
+ ZenContentType GetContentType() const;
+
+private:
+ uint64_t OffsetAndFlags = 0;
+ uint32_t LowerSize = 0;
+ uint32_t IndexDataSize = 0;
+};
+
+struct DiskIndexEntry
+{
+ IoHash Key;
+ DiskLocation Location;
+};
+
+#pragma pack(pop)
+
+static_assert(sizeof(DiskIndexEntry) == 36);
+
class ZenCacheDiskLayer
{
public:
@@ -122,7 +159,53 @@ private:
/** A cache bucket manages a single directory containing
metadata and data for that bucket
*/
- struct CacheBucket;
+ struct CacheBucket
+ {
+ CacheBucket();
+ ~CacheBucket();
+
+ void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true);
+ static bool Delete(std::filesystem::path BucketDir);
+
+ bool Get(const IoHash& HashKey, ZenCacheValue& OutValue);
+ void Put(const IoHash& HashKey, const ZenCacheValue& Value);
+ void Drop();
+ void Flush();
+ void Scrub(ScrubContext& Ctx);
+ void GarbageCollect(GcContext& GcCtx);
+
+ inline bool IsOk() const { return m_IsOk; }
+
+ private:
+ std::filesystem::path m_BucketDir;
+ Oid m_BucketId;
+ bool m_IsOk = false;
+ uint64_t m_LargeObjectThreshold = 64 * 1024;
+
+ // These files are used to manage storage of small objects for this bucket
+
+ BasicFile m_SobsFile;
+ TCasLogFile<DiskIndexEntry> m_SlogFile;
+
+ RwLock m_IndexLock;
+ tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index;
+ uint64_t m_WriteCursor = 0;
+
+ void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey);
+ void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value);
+ bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue);
+ bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue);
+
+ // These locks are here to avoid contention on file creation, therefore it's sufficient
+ // that we take the same lock for the same hash
+ //
+ // These locks are small and should really be spaced out so they don't share cache lines,
+ // but we don't currently access them at particularly high frequency so it should not be
+ // an issue in practice
+
+ RwLock m_ShardedLocks[256];
+ inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; }
+ };
std::filesystem::path m_RootDir;
RwLock m_Lock;
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 053c262c2..8ad14a1ed 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -2,6 +2,8 @@
#include "apply.h"
+#include <upstream/jupiter.h>
+#include <upstream/upstreamapply.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -331,8 +333,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
, m_SandboxPath(BaseDir / "scratch")
, m_FunctionPath(BaseDir / "func")
{
+ m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore);
+
+ CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "default"sv,
+ .BlobStoreNamespace = "default"sv,
+ .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv};
+
+ auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore);
+ m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
+ m_UpstreamApply->Initialize();
+
m_Router.AddPattern("job", "([[:digit:]]+)");
m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
+ m_Router.AddPattern("action", "([[:xdigit:]]{40})");
m_Router.RegisterRoute(
"workers/{worker}",
@@ -488,6 +502,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
HttpVerb::kGet | HttpVerb::kPost);
m_Router.RegisterRoute(
+ "jobs/{worker}/{action}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+ const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbPackage Output;
+ HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"jobs/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -541,7 +579,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
{
// We already have everything
- CbPackage Output = ExecAction(Worker, RequestObject);
+ CbObject Output = ExecActionUpstream(Worker, RequestObject);
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -600,7 +638,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
zen::NiceBytes(TotalNewBytes),
NewAttachmentCount);
- CbPackage Output = ExecAction(Worker, ActionObj);
+ CbObject Output = ExecActionUpstream(Worker, ActionObj);
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -843,4 +881,69 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
return OutputPackage;
}
+CbObject
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action)
+{
+ const IoHash WorkerId = Worker.Descriptor.GetHash();
+ const IoHash ActionId = Action.GetHash();
+
+ Action.MakeOwned();
+
+ ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
+
+ auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)});
+
+ if (!EnqueueResult.Success)
+ {
+ throw std::runtime_error("Error enqueuing upstream task");
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddHash("worker", WorkerId);
+ Writer.AddHash("action", ActionId);
+
+ return std::move(Writer.Save());
+}
+
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
+{
+ using namespace fmt::literals;
+ auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
+ if (!Status.Success)
+ {
+ // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
+ return HttpResponseCode::NotFound;
+ }
+
+ if (Status.Status.State != UpstreamApplyState::Complete)
+ {
+ return HttpResponseCode::Accepted;
+ }
+
+ GetUpstreamApplyResult& Completed = Status.Status.Result;
+ if (!Completed.Success || Completed.Error.ErrorCode != 0)
+ {
+ ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.StdOut,
+ Completed.StdErr,
+ Completed.Error.Reason);
+ // throw std::runtime_error(
+ // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str());
+ return HttpResponseCode::BadRequest;
+ }
+
+ ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.OutputPackage.GetAttachments().size(),
+ NiceBytes(Completed.TotalAttachmentBytes),
+ NiceBytes(Completed.TotalRawAttachmentBytes));
+
+ Package = std::move(Completed.OutputPackage);
+ return HttpResponseCode::OK;
+}
+
} // namespace zen
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index 86b262213..15cda4750 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -14,6 +14,7 @@ namespace zen {
class CasStore;
class CidStore;
+class UpstreamApply;
/**
* Lambda style compute function service
@@ -28,14 +29,15 @@ public:
virtual void HandleRequest(HttpServerRequest& Request) override;
private:
- spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- HttpRequestRouter m_Router;
- CasStore& m_CasStore;
- CidStore& m_CidStore;
- std::filesystem::path m_SandboxPath;
- std::filesystem::path m_FunctionPath;
- std::atomic<int> m_SandboxCount{0};
+ spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
+ std::filesystem::path m_SandboxPath;
+ std::filesystem::path m_FunctionPath;
+ std::atomic<int> m_SandboxCount{0};
+ std::unique_ptr<UpstreamApply> m_UpstreamApply;
struct WorkerDesc
{
@@ -44,6 +46,8 @@ private:
[[nodiscard]] std::filesystem::path CreateNewSandbox();
[[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action);
+ [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action);
+ [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package);
RwLock m_WorkerLock;
std::unordered_map<IoHash, WorkerDesc> m_WorkerMap;
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index f512f8015..3e85daa9e 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -243,7 +243,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
"",
"upstream-thread-count",
"Number of threads used for upstream procsssing",
- cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
+ cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"),
"");
options.add_option("cache",
@@ -253,6 +253,20 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z
cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"),
"");
+ options.add_option("cache",
+ "",
+ "upstream-connect-timeout-ms",
+ "Connect timeout in millisecond(s). Default 5000 ms.",
+ cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.ConnectTimeoutMilliseconds)->default_value("5000"),
+ "");
+
+ options.add_option("cache",
+ "",
+ "upstream-timeout-ms",
+ "Timeout in millisecond(s). Default 0 ms",
+ cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"),
+ "");
+
try
{
auto result = options.parse(argc, argv);
diff --git a/zenserver/config.h b/zenserver/config.h
index 7fa8163b3..f6858b940 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -65,9 +65,11 @@ struct ZenUpstreamCacheConfig
{
ZenUpstreamJupiterConfig JupiterConfig;
ZenUpstreamZenConfig ZenConfig;
- int UpstreamThreadCount = 4;
- UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
- bool StatsEnabled = false;
+ int32_t UpstreamThreadCount = 4;
+ int32_t ConnectTimeoutMilliseconds = 5000;
+ int32_t TimeoutMilliseconds = 0;
+ UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite;
+ bool StatsEnabled = false;
};
struct ZenServiceConfig
diff --git a/zenserver/diag/formatters.h b/zenserver/diag/formatters.h
index 42f928efe..759df58d3 100644
--- a/zenserver/diag/formatters.h
+++ b/zenserver/diag/formatters.h
@@ -2,6 +2,11 @@
#pragma once
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/iobuffer.h>
+#include <zencore/string.h>
+
ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
#include <fmt/format.h>
@@ -17,7 +22,7 @@ struct fmt::formatter<cpr::Response>
{
using namespace std::literals;
- if (Response.status_code == 200)
+ if (Response.status_code == 200 || Response.status_code == 201)
{
return fmt::format_to(Ctx.out(),
"Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s",
@@ -32,18 +37,21 @@ struct fmt::formatter<cpr::Response>
const auto It = Response.header.find("Content-Type");
const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv;
- const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv ||
- ContentType == "application/octet-stream";
-
- if (IsBinary)
+ if (ContentType == "application/x-ue-cb"sv)
{
+ zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ zen::CbObjectView Obj(Body.Data());
+ zen::ExtendableStringBuilder<256> Sb;
+ std::string_view Json = Obj.ToJson(Sb).ToView();
+
return fmt::format_to(Ctx.out(),
- "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}'",
+ "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'",
Response.url.str(),
Response.status_code,
Response.uploaded_bytes,
Response.downloaded_bytes,
Response.elapsed,
+ Json,
Response.reason);
}
else
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 556a2124d..9223ea0f4 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -46,13 +46,20 @@ namespace detail {
void InvalidateAccessToken() { AccessToken = {}; }
- void Reset()
+ void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout)
{
Session.SetBody({});
Session.SetHeader({});
+ Session.SetConnectTimeout(ConnectTimeout);
+ Session.SetTimeout(Timeout);
AccessToken = GetAccessToken();
}
+ cpr::Session& GetSession() { return Session; }
+
+ private:
+ friend class CloudCacheClient;
+
CloudCacheClient& OwnerClient;
CloudCacheAccessToken AccessToken;
cpr::Session Session;
@@ -87,12 +94,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw";
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -133,10 +140,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
<< Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -168,10 +176,11 @@ CloudCacheSession::GetBlob(const IoHash& Key)
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -180,9 +189,14 @@ CloudCacheSession::GetBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
- const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+ const IoBuffer Buffer =
+ Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
}
@@ -199,10 +213,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -211,6 +226,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -230,10 +249,11 @@ CloudCacheSession::GetObject(const IoHash& Key)
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Get();
ZEN_DEBUG("GET {}", Response);
@@ -242,6 +262,10 @@ CloudCacheSession::GetObject(const IoHash& Key)
{
return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
}
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
const bool Success = Response.status_code == 200;
const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
@@ -263,7 +287,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key;
- auto& Session = m_SessionState->Session;
+ auto& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
@@ -314,7 +338,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
<< Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(
@@ -377,12 +401,13 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
<< Key.ToHexString() << "/finalize/" << RefHash.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value},
{"X-Jupiter-IoHash", RefHash.ToHexString()},
{"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -436,7 +461,7 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob)
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}});
@@ -471,7 +496,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob)
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}});
@@ -506,7 +531,7 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object)
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
@@ -542,10 +567,11 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/"
<< Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+ Session.SetOption(cpr::Body{});
cpr::Response Response = Session.Head();
ZEN_DEBUG("HEAD {}", Response);
@@ -565,67 +591,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key)
CloudCacheResult
CloudCacheSession::BlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
-
- Session.SetOption(cpr::Url{Uri.c_str()});
-
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
-
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+ return CacheTypeExists("blobs"sv, Key);
}
CloudCacheResult
CloudCacheSession::CompressedBlobExists(const IoHash& Key)
{
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
- if (!AccessToken.IsValid())
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
-
- cpr::Session& Session = m_SessionState->Session;
+ return CacheTypeExists("compressed-blobs"sv, Key);
+}
- Session.SetOption(cpr::Url{Uri.c_str()});
+CloudCacheResult
+CloudCacheSession::ObjectExists(const IoHash& Key)
+{
+ return CacheTypeExists("objects"sv, Key);
+}
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+CloudCacheExistsResult
+CloudCacheSession::BlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("blobs"sv, Keys);
+}
- if (Response.error)
- {
- return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
- }
- else if (!VerifyAccessToken(Response.status_code))
- {
- return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
- }
+CloudCacheExistsResult
+CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("compressed-blobs"sv, Keys);
+}
- return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+CloudCacheExistsResult
+CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys)
+{
+ return CacheTypeExists("objects"sv, Keys);
}
CloudCacheResult
-CloudCacheSession::ObjectExists(const IoHash& Key)
+CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -634,14 +634,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
- cpr::Session& Session = m_SessionState->Session;
+ auto& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
- cpr::Response Response = Session.Head();
- ZEN_DEBUG("HEAD {}", Response);
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
if (Response.error)
{
@@ -656,7 +658,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key)
}
CloudCacheResult
-CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -665,13 +667,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
- auto& Session = m_SessionState->Session;
+ auto& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
cpr::Response Response = Session.Post();
ZEN_DEBUG("POST {}", Response);
@@ -692,7 +693,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
}
CloudCacheResult
-CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
+CloudCacheSession::GetObjectTree(const IoHash& Key)
{
const CloudCacheAccessToken& AccessToken = GetAccessToken();
if (!AccessToken.IsValid())
@@ -701,15 +702,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t
}
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree";
- auto& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}});
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
+ cpr::Response Response = Session.Get();
+ ZEN_DEBUG("GET {}", Response);
if (Response.error)
{
@@ -755,6 +756,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode)
return true;
}
+CloudCacheResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}});
+
+ cpr::Response Response = Session.Head();
+ ZEN_DEBUG("HEAD {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {.ErrorCode = 401, .Reason = std::string("Invalid access token")};
+ }
+
+ return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200};
+}
+
+CloudCacheExistsResult
+CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys)
+{
+ const CloudCacheAccessToken& AccessToken = GetAccessToken();
+ if (!AccessToken.IsValid())
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ ExtendableStringBuilder<256> Query;
+ for (const auto& Key : Keys)
+ {
+ Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString();
+ }
+
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query;
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}};
+ }
+ else if (!VerifyAccessToken(Response.status_code))
+ {
+ return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}};
+ }
+
+ CloudCacheExistsResult Result{
+ CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}};
+
+ if (Result.Success)
+ {
+ IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
+ const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
+ for (auto& Item : ExistsResponse["id"sv])
+ {
+ if (Item.IsHash())
+ {
+ Result.Have.insert(Item.AsHash());
+ }
+ }
+ }
+
+ return Result;
+}
+
//////////////////////////////////////////////////////////////////////////
//
// ServiceUrl: https://jupiter.devtools.epicgames.com
@@ -772,7 +859,16 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options)
, m_BlobStoreNamespace(Options.BlobStoreNamespace)
, m_OAuthClientId(Options.OAuthClientId)
, m_OAuthSecret(Options.OAuthSecret)
+, m_AccessToken(Options.AccessToken)
+, m_ConnectTimeout(Options.ConnectTimeout)
+, m_Timeout(Options.Timeout)
{
+ if (!Options.AccessToken.empty())
+ {
+ // If an access token was provided, OAuth settings are not used.
+ return;
+ }
+
if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv))
{
ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider);
@@ -822,6 +918,12 @@ CloudCacheClient::AcquireAccessToken()
{
using namespace std::chrono;
+ // If an access token was provided, return it instead of querying OAuth
+ if (!m_AccessToken.empty())
+ {
+ return {m_AccessToken, steady_clock::time_point::max()};
+ }
+
ExtendableStringBuilder<128> OAuthFormData;
OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret;
@@ -862,7 +964,7 @@ CloudCacheClient::AllocSessionState()
State = new detail::CloudCacheSessionState(*this);
}
- State->Reset();
+ State->Reset(m_ConnectTimeout, m_Timeout);
return State;
}
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index 9471ef64f..68c7361e0 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -12,6 +12,7 @@
#include <chrono>
#include <list>
#include <memory>
+#include <set>
#include <vector>
struct ZenCacheValue;
@@ -64,6 +65,11 @@ struct FinalizeRefResult : CloudCacheResult
std::vector<IoHash> Needs;
};
+struct CloudCacheExistsResult : CloudCacheResult
+{
+ std::set<IoHash> Have;
+};
+
/**
* Context for performing Jupiter operations
*
@@ -95,12 +101,18 @@ public:
FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah);
CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key);
+
CloudCacheResult BlobExists(const IoHash& Key);
CloudCacheResult CompressedBlobExists(const IoHash& Key);
CloudCacheResult ObjectExists(const IoHash& Key);
+ CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys);
+ CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys);
+ CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys);
+
CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData);
CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
+ CloudCacheResult GetObjectTree(const IoHash& Key);
std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
@@ -109,6 +121,10 @@ private:
const CloudCacheAccessToken& GetAccessToken();
bool VerifyAccessToken(long StatusCode);
+ CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key);
+
+ CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys);
+
spdlog::logger& m_Log;
RefPtr<CloudCacheClient> m_CacheClient;
detail::CloudCacheSessionState* m_SessionState;
@@ -116,13 +132,16 @@ private:
struct CloudCacheClientOptions
{
- std::string_view ServiceUrl;
- std::string_view DdcNamespace;
- std::string_view BlobStoreNamespace;
- std::string_view OAuthProvider;
- std::string_view OAuthClientId;
- std::string_view OAuthSecret;
- bool UseLegacyDdc = false;
+ std::string_view ServiceUrl;
+ std::string_view DdcNamespace;
+ std::string_view BlobStoreNamespace;
+ std::string_view OAuthProvider;
+ std::string_view OAuthClientId;
+ std::string_view OAuthSecret;
+ std::string_view AccessToken;
+ std::chrono::milliseconds ConnectTimeout{5000};
+ std::chrono::milliseconds Timeout{};
+ bool UseLegacyDdc = false;
};
/**
@@ -143,16 +162,19 @@ public:
spdlog::logger& Logger() { return m_Log; }
private:
- spdlog::logger& m_Log;
- std::string m_ServiceUrl;
- std::string m_OAuthDomain;
- std::string m_OAuthUriPath;
- std::string m_OAuthFullUri;
- std::string m_DdcNamespace;
- std::string m_BlobStoreNamespace;
- std::string m_OAuthClientId;
- std::string m_OAuthSecret;
- bool m_IsValid = false;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::string m_OAuthDomain;
+ std::string m_OAuthUriPath;
+ std::string m_OAuthFullUri;
+ std::string m_DdcNamespace;
+ std::string m_BlobStoreNamespace;
+ std::string m_OAuthClientId;
+ std::string m_OAuthSecret;
+ std::string m_AccessToken;
+ std::chrono::milliseconds m_ConnectTimeout{};
+ std::chrono::milliseconds m_Timeout{};
+ bool m_IsValid = false;
RwLock m_SessionStateLock;
std::list<detail::CloudCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
new file mode 100644
index 000000000..651c2dcc8
--- /dev/null
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -0,0 +1,1559 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "upstreamapply.h"
+#include "jupiter.h"
+#include "zen.h"
+
+#include <zencore/blockingqueue.h>
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinarypackage.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/compress.h>
+#include <zencore/fmtutils.h>
+#include <zencore/session.h>
+#include <zencore/stats.h>
+#include <zencore/stream.h>
+#include <zencore/timer.h>
+
+#include <zenstore/cas.h>
+#include <zenstore/cidstore.h>
+
+#include "cache/structuredcachestore.h"
+#include "diag/logging.h"
+
+#include <fmt/format.h>
+
+#include <algorithm>
+#include <atomic>
+#include <map>
+#include <set>
+#include <stack>
+#include <thread>
+#include <unordered_map>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail {
+
+ class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
+ {
+ public:
+ HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ : m_Log(logging::Get("upstream-apply"))
+ , m_CasStore(CasStore)
+ , m_CidStore(CidStore)
+ {
+ using namespace fmt::literals;
+ m_DisplayName = "Horde - '{}'"_format(Options.ServiceUrl);
+ m_Client = new CloudCacheClient(Options);
+ m_ChannelId = "zen-{}"_format(zen::GetSessionIdString());
+ }
+
+ virtual ~HordeUpstreamApplyEndpoint() = default;
+
+ virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
+
+ virtual bool IsHealthy() const override { return m_HealthOk.load(); }
+
+ virtual UpstreamEndpointHealth CheckHealth() override
+ {
+ try
+ {
+ CloudCacheSession Session(m_Client);
+ CloudCacheResult Result = Session.Authenticate();
+
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Reason = Err.what(), .Ok = false};
+ }
+ }
+
+ virtual std::string_view DisplayName() const override { return m_DisplayName; }
+
+ virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override
+ {
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ try
+ {
+ UpstreamData UpstreamData;
+ if (!ProcessApplyKey(ApplyRecord, UpstreamData))
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}};
+ }
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_PendingTasks.contains(UpstreamData.TaskId))
+ {
+ // Pending task is already queued, return success
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+ m_PendingTasks[UpstreamData.TaskId] = ApplyRecord;
+ }
+
+ CloudCacheSession Session(m_Client);
+
+ {
+ CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ UpstreamData.Blobs.clear();
+ }
+
+ {
+ CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ UpstreamData.Objects.clear();
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId);
+ Writer.BeginArray("t"sv);
+ Writer.AddObjectAttachment(UpstreamData.TaskId);
+ Writer.EndArray();
+ IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer();
+
+ CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ m_PendingTasks.erase(UpstreamData.TaskId);
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+ catch (std::exception& Err)
+ {
+ m_HealthOk = false;
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
+ }
+ }
+
+ [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs)
+ {
+ if (Blobs.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing blobs
+ std::set<IoHash> Keys;
+ for (const auto& It : Blobs)
+ {
+ Keys.insert(It.first);
+ }
+
+ CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (ExistsResult.ErrorCode != 0)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode,
+ .Reason = std::move(ExistsResult.Reason)};
+ }
+
+ // TODO: Batch upload missing blobs
+
+ for (const auto& It : Blobs)
+ {
+ if (ExistsResult.Have.contains(It.first))
+ {
+ continue;
+ }
+
+ CloudCacheResult Result = Session.PutBlob(It.first, It.second);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode,
+ .Reason = std::move(Result.Reason)};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
+ [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
+ {
+ if (Objects.size() == 0)
+ {
+ return {.Success = true};
+ }
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Batch check for missing objects
+ std::set<IoHash> Keys;
+ for (const auto& It : Objects)
+ {
+ Keys.insert(It.first);
+ }
+
+ CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys);
+ ElapsedSeconds += ExistsResult.ElapsedSeconds;
+ if (ExistsResult.ErrorCode != 0)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = ExistsResult.ErrorCode,
+ .Reason = std::move(ExistsResult.Reason)};
+ }
+
+ // TODO: Batch upload missing objects
+
+ for (const auto& It : Objects)
+ {
+ if (ExistsResult.Have.contains(It.first))
+ {
+ continue;
+ }
+
+ CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer());
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .ErrorCode = Result.ErrorCode,
+ .Reason = std::move(Result.Reason)};
+ }
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
+ }
+
+ enum class ComputeTaskState : int32_t
+ {
+ Queued = 0,
+ Executing = 1,
+ Complete = 2,
+ };
+
+ enum class ComputeTaskOutcome : int32_t
+ {
+ Success = 0,
+ Failed = 1,
+ Cancelled = 2,
+ NoResult = 3,
+ Exipred = 4,
+ BlobNotFound = 5,
+ Exception = 6,
+ };
+
+ std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
+ {
+ switch (Outcome)
+ {
+ case ComputeTaskOutcome::Success:
+ return "Success"sv;
+ case ComputeTaskOutcome::Failed:
+ return "Failed"sv;
+ case ComputeTaskOutcome::Cancelled:
+ return "Cancelled"sv;
+ case ComputeTaskOutcome::NoResult:
+ return "NoResult"sv;
+ case ComputeTaskOutcome::Exipred:
+ return "Exipred"sv;
+ case ComputeTaskOutcome::BlobNotFound:
+ return "BlobNotFound"sv;
+ case ComputeTaskOutcome::Exception:
+ return "Exception"sv;
+ };
+ return "Unknown"sv;
+ }
+
+ virtual GetUpstreamApplyUpdatesResult GetUpdates() override
+ {
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ UpstreamApplyCompleted CompletedTasks;
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ if (m_PendingTasks.empty())
+ {
+ // Nothing to do.
+ return {.Success = true};
+ }
+ }
+
+ try
+ {
+ CloudCacheSession Session(m_Client);
+
+ CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId);
+ Bytes += UpdatesResult.Bytes;
+ ElapsedSeconds += UpdatesResult.ElapsedSeconds;
+ if (UpdatesResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!UpdatesResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response);
+
+ // zen::StringBuilder<4096> ObjStr;
+ // zen::CompactBinaryToJson(TaskStatus, ObjStr);
+
+ for (auto& It : TaskStatus["u"sv])
+ {
+ CbObjectView Status = It.AsObjectView();
+ const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
+
+ const std::string_view AgentId = TaskStatus["a"sv].AsString();
+ const std::string_view LeaseId = TaskStatus["l"sv].AsString();
+
+ // Only care about completed tasks
+ if (State != ComputeTaskState::Complete)
+ {
+ continue;
+ }
+
+ const IoHash TaskId = Status["h"sv].AsObjectAttachment();
+
+ IoHash WorkerId;
+ IoHash ActionId;
+
+ {
+ std::scoped_lock Lock(m_TaskMutex);
+ auto TaskIt = m_PendingTasks.find(TaskId);
+ if (TaskIt == m_PendingTasks.end())
+ {
+ continue;
+ }
+ WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
+ ActionId = TaskIt->second.Action.GetHash();
+ m_PendingTasks.erase(TaskIt);
+ }
+
+ GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session);
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+
+ CompletedTasks[WorkerId][ActionId] = std::move(Result);
+ }
+
+ return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
+ }
+ catch (std::exception& Err)
+ {
+ m_HealthOk = false;
+ return {
+ .Error{.ErrorCode = -1, .Reason = Err.what()},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .Completed = std::move(CompletedTasks),
+ };
+ }
+ }
+
+ virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; }
+
+ private:
+ spdlog::logger& Log() { return m_Log; }
+
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
+ spdlog::logger& m_Log;
+ std::string m_DisplayName;
+ RefPtr<CloudCacheClient> m_Client;
+ UpstreamApplyEndpointStats m_Stats;
+ std::atomic_bool m_HealthOk{false};
+ std::string m_ChannelId;
+
+ std::mutex m_TaskMutex;
+ std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks;
+
+ struct UpstreamData
+ {
+ std::map<IoHash, IoBuffer> Blobs;
+ std::map<IoHash, CbObject> Objects;
+ IoHash TaskId;
+ IoHash RequirementsId;
+ };
+
+ struct UpstreamDirectory
+ {
+ std::filesystem::path Path;
+ std::map<std::string, UpstreamDirectory> Directories;
+ std::set<std::string> Files;
+ };
+
+ [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session)
+ {
+ try
+ {
+ const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32();
+
+ if (Outcome != ComputeTaskOutcome::Success)
+ {
+ using namespace fmt::literals;
+ const std::string_view Detail = TaskStatus["d"sv].AsString();
+ if (!Detail.empty())
+ {
+ return {.Error{.ErrorCode = -1,
+ .Reason = "Task {}: {}"_format(ComputeTaskOutcomeToString(Outcome), std::string(Detail))}};
+ }
+ return {.Error{.ErrorCode = -1, .Reason = "Task {}"_format(ComputeTaskOutcomeToString(Outcome))}};
+ }
+
+ const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment();
+ const DateTime Time = TaskStatus["t"sv].AsDateTime();
+ const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment();
+ const std::string_view AgentId = TaskStatus["a"sv].AsString();
+ const std::string_view LeaseId = TaskStatus["l"sv].AsString();
+
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+
+ // Get Result object and all Object Attachments + Binary Attachment IDs
+ CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash);
+ Bytes += ObjectTreeResult.Bytes;
+ ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+
+ if (ObjectTreeResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!ObjectTreeResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ std::map<IoHash, IoBuffer> TreeObjectData;
+ std::map<IoHash, IoBuffer> TreeBinaryData;
+
+ MemoryView ResponseView = ObjectTreeResult.Response;
+ while (ResponseView.GetSize() > 0)
+ {
+ CbFieldView Field = CbFieldView(ResponseView.GetData());
+ ResponseView += Field.GetSize();
+ if (Field.IsObjectAttachment())
+ {
+ const IoHash Hash = Field.AsObjectAttachment();
+ Field = CbFieldView(ResponseView.GetData());
+ ResponseView += Field.GetSize();
+ if (!Field.IsObject()) // No data
+ {
+ TreeObjectData[Hash] = {};
+ continue;
+ }
+ MemoryView FieldView = Field.AsObjectView().GetView();
+
+ TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize());
+ }
+ else if (Field.IsBinaryAttachment())
+ {
+ const IoHash Hash = Field.AsBinaryAttachment();
+ TreeBinaryData[Hash] = {};
+ }
+ else // Unknown type
+ {
+ }
+ }
+
+ for (auto& It : TreeObjectData)
+ {
+ if (It.second.GetSize() == 0)
+ {
+ CloudCacheResult ObjectResult = Session.GetObject(It.first);
+ Bytes += ObjectTreeResult.Bytes;
+ ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+ if (ObjectTreeResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!ObjectResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ It.second = std::move(ObjectResult.Response);
+ }
+ }
+
+ for (auto& It : TreeBinaryData)
+ {
+ if (It.second.GetSize() == 0)
+ {
+ CloudCacheResult BlobResult = Session.GetBlob(It.first);
+ Bytes += ObjectTreeResult.Bytes;
+ ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+ if (BlobResult.ErrorCode != 0)
+ {
+ return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ if (!BlobResult.Success)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ It.second = std::move(BlobResult.Response);
+ }
+ }
+
+ CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]);
+ int32_t ExitCode = ResultObject["e"sv].AsInt32();
+ IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
+ IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
+ IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment();
+
+ std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize());
+ std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize());
+
+ if (ExitCode != 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr)};
+ }
+
+ CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]);
+
+ // Get build.output
+ IoHash BuildOutputId;
+ IoBuffer BuildOutput;
+ for (auto& It : OutputObject["f"sv])
+ {
+ const CbObjectView FileObject = It.AsObjectView();
+ if (FileObject["n"sv].AsString() == "Build.output"sv)
+ {
+ BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
+ BuildOutput = TreeBinaryData[BuildOutputId];
+ break;
+ }
+ }
+
+ if (BuildOutput.GetSize() == 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ // Get Output directory node
+ IoBuffer OutputDirectoryTree;
+ for (auto& It : OutputObject["d"sv])
+ {
+ const CbObjectView DirectoryObject = It.AsObjectView();
+ if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
+ {
+ OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
+ break;
+ }
+ }
+
+ if (OutputDirectoryTree.GetSize() == 0)
+ {
+ return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ // load build.output as CbObject
+
+ // Move Outputs from Horde to CbPackage
+
+ std::unordered_map<IoHash, IoHash> CidToCompressedId;
+ CbPackage OutputPackage;
+ CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);
+ int64_t TotalAttachmentBytes = 0;
+ int64_t TotalRawAttachmentBytes = 0;
+
+ for (auto& It : OutputDirectoryTreeObject["f"sv])
+ {
+ CbObjectView FileObject = It.AsObjectView();
+ // Name is the uncompressed hash
+ IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
+ // Hash is the compressed data hash, and how it is stored in Horde
+ IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
+
+ if (!TreeBinaryData.contains(CompressedId))
+ {
+ Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString());
+ return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ CidToCompressedId[DecompressedId] = CompressedId;
+ }
+
+ // Iterate attachments, verify all chunks exist, and add to CbPackage
+ bool AnyErrors = false;
+ CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
+ BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
+ const IoHash DecompressedId = Field.AsHash();
+ if (!CidToCompressedId.contains(DecompressedId))
+ {
+ Log().warn("Attachment not found {}", DecompressedId.ToHexString());
+ AnyErrors = true;
+ return;
+ }
+ const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
+
+ if (!TreeBinaryData.contains(CompressedId))
+ {
+ Log().warn("Missing output {} compressed {} uncompressed",
+ CompressedId.ToHexString(),
+ DecompressedId.ToHexString());
+ AnyErrors = true;
+ return;
+ }
+
+ CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId]));
+
+ if (!AttachmentBuffer)
+ {
+ Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
+ CompressedId.ToHexString(),
+ DecompressedId.ToHexString());
+ AnyErrors = true;
+ return;
+ }
+
+ TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
+ TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize();
+
+ CbAttachment Attachment(AttachmentBuffer);
+ OutputPackage.AddAttachment(Attachment);
+ });
+
+ if (AnyErrors)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+
+ OutputPackage.SetObject(BuildOutputObject);
+
+ return {.OutputPackage = std::move(OutputPackage),
+ .TotalAttachmentBytes = TotalAttachmentBytes,
+ .TotalRawAttachmentBytes = TotalRawAttachmentBytes,
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds,
+ .StdOut = std::move(StdOut),
+ .StdErr = std::move(StdErr),
+ .Success = true};
+ }
+ catch (std::exception& Err)
+ {
+ return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
+ }
+ }
+
+ [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
+ {
+ std::string ExecutablePath;
+ std::map<std::string, std::string> Environment;
+ std::set<std::filesystem::path> InputFiles;
+ std::map<std::filesystem::path, IoHash> InputFileHashes;
+
+ ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString();
+ if (ExecutablePath.empty())
+ {
+ Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor",
+ ApplyRecord.WorkerDescriptor.GetHash());
+ return false;
+ }
+
+ for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
+ {
+ CbObjectView FileEntry = It.AsObjectView();
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ {
+ return false;
+ }
+ }
+
+ for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
+ {
+ CbObjectView FileEntry = It.AsObjectView();
+ if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs))
+ {
+ return false;
+ }
+ }
+
+ for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
+ {
+ std::string_view Env = It.AsString();
+ auto Index = Env.find('=');
+ if (Index < 0)
+ {
+ Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
+ return false;
+ }
+
+ Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
+ }
+
+ {
+ static const std::filesystem::path BuildActionPath = "Build.action"sv;
+ static const std::filesystem::path InputPath = "Inputs"sv;
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+
+ InputFiles.insert(BuildActionPath);
+ InputFileHashes[BuildActionPath] = ActionId;
+ Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
+ ApplyRecord.Action.GetBuffer().GetSize());
+
+ bool AnyErrors = false;
+ ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Cid = Field.AsHash();
+ const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
+ IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid);
+
+ if (!DataBuffer)
+ {
+ Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
+ AnyErrors = true;
+ return;
+ }
+
+ if (InputFiles.contains(FilePath))
+ {
+ return;
+ }
+
+ const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize());
+
+ InputFiles.insert(FilePath);
+ InputFileHashes[FilePath] = CompressedId;
+ Data.Blobs[CompressedId] = std::move(DataBuffer);
+ });
+
+ if (AnyErrors)
+ {
+ return false;
+ }
+ }
+
+ const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
+
+ CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects);
+ const IoHash SandboxHash = Sandbox.GetHash();
+ Data.Objects[SandboxHash] = std::move(Sandbox);
+
+ {
+ using namespace fmt::literals;
+ std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
+ if (HostPlatform.empty())
+ {
+ Log().warn("process apply upstream FAILED, 'host' platform not provided");
+ return false;
+ }
+
+ int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32();
+ int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64();
+ bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();
+
+ // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
+ std::string Condition;
+ if (HostPlatform == "Win64" || HostPlatform == "Windows")
+ {
+ Condition = "OSFamily == 'Windows' && Pool == 'Win-RemoteExec'";
+ }
+ else if (HostPlatform == "Mac")
+ {
+ Condition = "OSFamily == 'MacOS'";
+ }
+ else
+ {
+ Condition = "OSFamily == '{}'"_format(HostPlatform);
+ }
+
+ std::map<std::string_view, int64_t> Resources;
+ if (LogicalCores > 0)
+ {
+ Resources["LogicalCores"sv] = LogicalCores;
+ }
+ if (Memory > 0)
+ {
+ Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL);
+ }
+
+ CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
+ const IoHash RequirementsId = Requirements.GetHash();
+ Data.Objects[RequirementsId] = std::move(Requirements);
+ Data.RequirementsId = RequirementsId;
+ }
+
+ CbObject Task = BuildTask(ExecutablePath,
+ {"-Build=build.action"},
+ Environment,
+ {},
+ SandboxHash,
+ Data.RequirementsId,
+ {"Build.output", "Outputs"});
+
+ const IoHash TaskId = Task.GetHash();
+ Data.Objects[TaskId] = std::move(Task);
+ Data.TaskId = TaskId;
+
+ return true;
+ }
+
+ [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
+ std::set<std::filesystem::path>& InputFiles,
+ std::map<std::filesystem::path, IoHash>& InputFileHashes,
+ std::map<IoHash, IoBuffer>& Blobs)
+ {
+ const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
+ const IoHash ChunkId = FileEntry["hash"sv].AsHash();
+ const uint64_t Size = FileEntry["size"sv].AsUInt64();
+ IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId);
+
+ if (!DataBuffer)
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
+ return false;
+ }
+
+ if (DataBuffer.Size() != Size)
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}",
+ ChunkId,
+ DataBuffer.Size(),
+ Size);
+ return false;
+ }
+
+ if (InputFiles.contains(FilePath))
+ {
+ Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
+ return false;
+ }
+
+ InputFiles.insert(FilePath);
+ InputFileHashes[FilePath] = ChunkId;
+ Blobs[ChunkId] = std::move(DataBuffer);
+ return true;
+ }
+
+ [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles)
+ {
+ static const std::filesystem::path RootPath;
+ std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories;
+ UpstreamDirectory RootDirectory = {.Path = RootPath};
+
+ AllDirectories[RootPath] = &RootDirectory;
+
+ // Build tree from flat list
+ for (const auto& Path : InputFiles)
+ {
+ if (Path.has_parent_path())
+ {
+ if (!AllDirectories.contains(Path.parent_path()))
+ {
+ std::stack<std::string> PathSplit;
+ {
+ std::filesystem::path ParentPath = Path.parent_path();
+ PathSplit.push(ParentPath.filename().string());
+ while (ParentPath.has_parent_path())
+ {
+ ParentPath = ParentPath.parent_path();
+ PathSplit.push(ParentPath.filename().string());
+ }
+ }
+ UpstreamDirectory* ParentPtr = &RootDirectory;
+ while (!PathSplit.empty())
+ {
+ if (!ParentPtr->Directories.contains(PathSplit.top()))
+ {
+ std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()};
+ ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath};
+ AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()];
+ }
+ ParentPtr = &ParentPtr->Directories[PathSplit.top()];
+ PathSplit.pop();
+ }
+ }
+
+ AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string());
+ }
+ else
+ {
+ RootDirectory.Files.insert(Path.filename().string());
+ }
+ }
+
+ return RootDirectory;
+ }
+
+ [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
+ const std::map<std::filesystem::path, IoHash>& InputFileHashes,
+ const std::map<IoHash, IoBuffer>& Blobs,
+ std::map<IoHash, CbObject>& Objects)
+ {
+ CbObjectWriter DirectoryTreeWriter;
+
+ if (!RootDirectory.Files.empty())
+ {
+ DirectoryTreeWriter.BeginArray("f"sv);
+ for (const auto& File : RootDirectory.Files)
+ {
+ const std::filesystem::path FilePath = {RootDirectory.Path / File};
+ const IoHash& FileHash = InputFileHashes.at(FilePath);
+ const uint64_t FileSize = Blobs.at(FileHash).Size();
+ DirectoryTreeWriter.BeginObject();
+ DirectoryTreeWriter.AddString("n"sv, File);
+ DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
+ DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size
+ // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded
+ DirectoryTreeWriter.EndObject();
+ }
+ DirectoryTreeWriter.EndArray();
+ }
+
+ if (!RootDirectory.Directories.empty())
+ {
+ DirectoryTreeWriter.BeginArray("d"sv);
+ for (const auto& Item : RootDirectory.Directories)
+ {
+ CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects);
+ const IoHash DirectoryHash = Directory.GetHash();
+ Objects[DirectoryHash] = std::move(Directory);
+
+ DirectoryTreeWriter.BeginObject();
+ DirectoryTreeWriter.AddString("n"sv, Item.first);
+ DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash);
+ DirectoryTreeWriter.EndObject();
+ }
+ DirectoryTreeWriter.EndArray();
+ }
+
+ return std::move(DirectoryTreeWriter.Save());
+ }
+
+ [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
+ const std::map<std::string_view, int64_t>& Resources,
+ const bool Exclusive)
+ {
+ CbObjectWriter Writer;
+ Writer.AddString("c", Condition);
+ if (!Resources.empty())
+ {
+ Writer.BeginArray("r");
+ for (const auto& Resource : Resources)
+ {
+ Writer.BeginArray();
+ Writer.AddString(Resource.first);
+ Writer.AddInteger(Resource.second);
+ Writer.EndArray();
+ }
+ Writer.EndArray();
+ }
+ Writer.AddBool("e", Exclusive);
+ return std::move(Writer.Save());
+ }
+
+ [[nodiscard]] CbObject BuildTask(const std::string_view Executable,
+ const std::vector<std::string>& Arguments,
+ const std::map<std::string, std::string>& Environment,
+ const std::string_view WorkingDirectory,
+ const IoHash& SandboxHash,
+ const IoHash& RequirementsId,
+ const std::set<std::string>& Outputs)
+ {
+ CbObjectWriter TaskWriter;
+ TaskWriter.AddString("e"sv, Executable);
+
+ if (!Arguments.empty())
+ {
+ TaskWriter.BeginArray("a"sv);
+ for (const auto& Argument : Arguments)
+ {
+ TaskWriter.AddString(Argument);
+ }
+ TaskWriter.EndArray();
+ }
+
+ if (!Environment.empty())
+ {
+ TaskWriter.BeginArray("v"sv);
+ for (const auto& Env : Environment)
+ {
+ TaskWriter.BeginArray();
+ TaskWriter.AddString(Env.first);
+ TaskWriter.AddString(Env.second);
+ TaskWriter.EndArray();
+ }
+ TaskWriter.EndArray();
+ }
+
+ if (!WorkingDirectory.empty())
+ {
+ TaskWriter.AddString("s"sv, WorkingDirectory);
+ }
+
+ TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
+ TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
+
+ // Outputs
+ if (!Outputs.empty())
+ {
+ TaskWriter.BeginArray("o"sv);
+ for (const auto& Output : Outputs)
+ {
+ TaskWriter.AddString(Output);
+ }
+ TaskWriter.EndArray();
+ }
+
+ return std::move(TaskWriter.Save());
+ }
+ };
+} // namespace detail
+
+//////////////////////////////////////////////////////////////////////////
+
+struct UpstreamApplyStats
+{
+ static constexpr uint64_t MaxSampleCount = 1000ull;
+
+ UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
+
+ void Add(spdlog::logger& Logger,
+ UpstreamApplyEndpoint& Endpoint,
+ const PostUpstreamApplyResult& Result,
+ const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ {
+ UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
+ {
+ Stats.PostCount++;
+ Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsUp.fetch_add(Result.ElapsedSeconds);
+ }
+
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
+ {
+ Dump(Logger, Endpoints);
+ }
+ }
+
+ void Add(spdlog::logger& Logger,
+ UpstreamApplyEndpoint& Endpoint,
+ const GetUpstreamApplyUpdatesResult& Result,
+ const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ {
+ UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
+
+ if (Result.Error)
+ {
+ Stats.ErrorCount++;
+ }
+ else if (Result.Success)
+ {
+ Stats.UpdateCount++;
+ Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0);
+ Stats.SecondsDown.fetch_add(Result.ElapsedSeconds);
+ if (!Result.Completed.empty())
+ {
+ uint64_t Completed = 0;
+ for (auto& It : Result.Completed)
+ {
+ Completed += It.second.size();
+ }
+ Stats.CompleteCount.fetch_add(Completed);
+ }
+ }
+
+ if (m_Enabled && m_SampleCount++ % MaxSampleCount)
+ {
+ Dump(Logger, Endpoints);
+ }
+ }
+
+ void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints)
+ {
+ for (auto& Ep : Endpoints)
+ {
+ // These stats will not be totally correct as the numbers are not captured atomically
+
+ UpstreamApplyEndpointStats& Stats = Ep->Stats();
+ const uint64_t PostCount = Stats.PostCount;
+ const uint64_t CompleteCount = Stats.CompleteCount;
+ const uint64_t UpdateCount = Stats.UpdateCount;
+ const double DownBytes = Stats.DownBytes;
+ const double SecondsDown = Stats.SecondsDown;
+ const double UpBytes = Stats.UpBytes;
+ const double SecondsUp = Stats.SecondsUp;
+
+ const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0;
+ const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0;
+ const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
+
+ Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
+ Ep->DisplayName(),
+ CompleteRate,
+ DownBytes,
+ DownSpeed,
+ UpBytes,
+ UpSpeed);
+ }
+ }
+
+ bool m_Enabled;
+ std::atomic_uint64_t m_SampleCount = {};
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+class DefaultUpstreamApply final : public UpstreamApply
+{
+public:
+ DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ : m_Log(logging::Get("upstream-apply"))
+ , m_Options(Options)
+ , m_CasStore(CasStore)
+ , m_CidStore(CidStore)
+ , m_Stats(Options.StatsEnabled)
+ {
+ }
+
+ virtual ~DefaultUpstreamApply() { Shutdown(); }
+
+ virtual bool Initialize() override
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ const UpstreamEndpointHealth Health = Endpoint->Initialize();
+ if (Health.Ok)
+ {
+ Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ }
+ else
+ {
+ Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+
+ m_RunState.IsRunning = !m_Endpoints.empty();
+
+ if (m_RunState.IsRunning)
+ {
+ for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
+ {
+ m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this);
+ }
+
+ m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this);
+
+ m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this);
+ }
+
+ return m_RunState.IsRunning;
+ }
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override
+ {
+ m_Endpoints.emplace_back(std::move(Endpoint));
+ }
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override
+ {
+ if (m_RunState.IsRunning)
+ {
+ const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+ const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300);
+
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ // Already in progress
+ return {.ApplyId = ActionId, .Success = true};
+ }
+
+ std::chrono::steady_clock::time_point ExpireTime =
+ TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds)
+ : std::chrono::steady_clock::time_point::max();
+
+ m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
+ }
+
+ if (!m_UpstreamThreads.empty())
+ {
+ m_UpstreamQueue.Enqueue(std::move(ApplyRecord));
+ }
+ else
+ {
+ ProcessApplyRecord(std::move(ApplyRecord));
+ }
+
+ return {.ApplyId = ActionId, .Success = true};
+ }
+
+ return {};
+ }
+
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override
+ {
+ if (m_RunState.IsRunning)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ return {.Status = *Status, .Success = true};
+ }
+ }
+
+ return {};
+ }
+
+ virtual void GetStatus(CbObjectWriter& Status) override
+ {
+ Status << "worker_threads" << m_Options.ThreadCount;
+ Status << "queue_count" << m_UpstreamQueue.Size();
+
+ Status.BeginArray("endpoints");
+ for (const auto& Ep : m_Endpoints)
+ {
+ Status.BeginObject();
+ Status << "name" << Ep->DisplayName();
+ Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
+
+ UpstreamApplyEndpointStats& Stats = Ep->Stats();
+ const uint64_t PostCount = Stats.PostCount;
+ const uint64_t CompleteCount = Stats.CompleteCount;
+ const uint64_t UpdateCount = Stats.UpdateCount;
+ const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
+
+ Status << "post_count" << PostCount;
+ Status << "complete_count" << PostCount;
+ Status << "update_count" << Stats.UpdateCount;
+
+ Status << "complete_ratio" << CompleteRate;
+ Status << "downloaded_mb" << Stats.DownBytes;
+ Status << "uploaded_mb" << Stats.UpBytes;
+ Status << "error_count" << Stats.ErrorCount;
+
+ Status.EndObject();
+ }
+ Status.EndArray();
+ }
+
+private:
+ // The caller is responsible for locking if required
+ UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId)
+ {
+ if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end())
+ {
+ if (auto It2 = It->second.find(ActionId); It2 != It->second.end())
+ {
+ return &It2->second;
+ }
+ }
+ return nullptr;
+ }
+
+ void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord)
+ {
+ const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
+ const IoHash ActionId = ApplyRecord.Action.GetHash();
+ try
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ if (Result.Success)
+ {
+ Status->State = UpstreamApplyState::Executing;
+ }
+ else
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error = std::move(Result.Error),
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds};
+ }
+ }
+ }
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ return;
+ }
+ }
+
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}};
+ }
+ Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
+ }
+ }
+ catch (std::exception& e)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}};
+ }
+ Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what());
+ }
+ }
+
+ void ProcessUpstreamQueue()
+ {
+ for (;;)
+ {
+ UpstreamApplyRecord ApplyRecord;
+ if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord))
+ {
+ ProcessApplyRecord(std::move(ApplyRecord));
+ }
+
+ if (!m_RunState.IsRunning)
+ {
+ break;
+ }
+ }
+ }
+
+ void ProcessApplyUpdates()
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy())
+ {
+ GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates();
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (!Result.Success)
+ {
+ Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason);
+ }
+
+ if (!Result.Completed.empty())
+ {
+ for (auto& It : Result.Completed)
+ {
+ for (auto& It2 : It.second)
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ if (auto Status = FindStatus(It.first, It2.first); Status != nullptr)
+ {
+ Status->State = UpstreamApplyState::Complete;
+ Status->Result = std::move(It2.second);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ void ProcessUpstreamUpdates()
+ {
+ const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval);
+ for (;;)
+ {
+ std::this_thread::sleep_for(UpdateSleep);
+
+ if (!m_RunState.IsRunning)
+ {
+ break;
+ }
+
+ ProcessApplyUpdates();
+
+ // Remove any expired tasks, regardless of state
+ {
+ std::scoped_lock Lock(m_ApplyTasksMutex);
+ for (auto& WorkerIt : m_ApplyTasks)
+ {
+ const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) {
+ return Item.second.ExpireTime < std::chrono::steady_clock::now();
+ });
+ if (Count > 0)
+ {
+ Log().debug("Removed '{}' expired tasks", Count);
+ }
+ }
+ const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); });
+ if (Count > 0)
+ {
+ Log().debug("Removed '{}' empty task lists", Count);
+ }
+ }
+ }
+ }
+
+ void MonitorEndpoints()
+ {
+ for (;;)
+ {
+ {
+ std::unique_lock Lock(m_RunState.Mutex);
+ if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
+ {
+ break;
+ }
+ }
+
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (!Endpoint->IsHealthy())
+ {
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+ Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
+ }
+ else
+ {
+ Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ }
+ }
+ }
+ }
+ }
+
+ void Shutdown()
+ {
+ if (m_RunState.Stop())
+ {
+ m_UpstreamQueue.CompleteAdding();
+ for (std::thread& Thread : m_UpstreamThreads)
+ {
+ Thread.join();
+ }
+
+ m_EndpointMonitorThread.join();
+ m_UpstreamUpdatesThread.join();
+ m_UpstreamThreads.clear();
+ m_Endpoints.clear();
+ }
+ }
+
+ spdlog::logger& Log() { return m_Log; }
+
+ using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>;
+
+ struct RunState
+ {
+ std::mutex Mutex;
+ std::condition_variable ExitSignal;
+ std::atomic_bool IsRunning{false};
+
+ bool Stop()
+ {
+ bool Stopped = false;
+ {
+ std::scoped_lock Lock(Mutex);
+ Stopped = IsRunning.exchange(false);
+ }
+ if (Stopped)
+ {
+ ExitSignal.notify_all();
+ }
+ return Stopped;
+ }
+ };
+
+ spdlog::logger& m_Log;
+ UpstreamApplyOptions m_Options;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
+ UpstreamApplyQueue m_UpstreamQueue;
+ UpstreamApplyStats m_Stats;
+ UpstreamApplyTasks m_ApplyTasks;
+ std::mutex m_ApplyTasksMutex;
+ std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
+ std::vector<std::thread> m_UpstreamThreads;
+ std::thread m_UpstreamUpdatesThread;
+ std::thread m_EndpointMonitorThread;
+ RunState m_RunState;
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+std::unique_ptr<UpstreamApply>
+MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+{
+ return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore);
+}
+
+std::unique_ptr<UpstreamApplyEndpoint>
+MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore)
+{
+ return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, CasStore, CidStore);
+}
+
+} // namespace zen
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
new file mode 100644
index 000000000..98f193c02
--- /dev/null
+++ b/zenserver/upstream/upstreamapply.h
@@ -0,0 +1,172 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/compactbinarypackage.h>
+#include <zencore/iobuffer.h>
+#include <zencore/iohash.h>
+#include <zencore/zencore.h>
+
+#include <atomic>
+#include <chrono>
+#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace zen {
+
+class CbObjectWriter;
+class CasStore;
+class CidStore;
+class ZenCacheStore;
+struct CloudCacheClientOptions;
+
+enum class UpstreamApplyState : int32_t
+{
+ Queued = 0,
+ Executing = 1,
+ Complete = 2,
+};
+
+struct UpstreamApplyRecord
+{
+ CbObject WorkerDescriptor;
+ CbObject Action;
+};
+
+struct UpstreamApplyOptions
+{
+ std::chrono::seconds HealthCheckInterval{5};
+ std::chrono::seconds UpdatesInterval{5};
+ uint32_t ThreadCount = 4;
+ bool StatsEnabled = false;
+};
+
+struct UpstreamApplyError
+{
+ int32_t ErrorCode{};
+ std::string Reason{};
+
+ explicit operator bool() const { return ErrorCode != 0; }
+};
+
+struct PostUpstreamApplyResult
+{
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ bool Success = false;
+};
+
+struct GetUpstreamApplyResult
+{
+ CbPackage OutputPackage{};
+ int64_t TotalAttachmentBytes{};
+ int64_t TotalRawAttachmentBytes{};
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ std::string StdOut{};
+ std::string StdErr{};
+ bool Success = false;
+};
+
+using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>;
+
+struct GetUpstreamApplyUpdatesResult
+{
+ UpstreamApplyError Error{};
+ int64_t Bytes{};
+ double ElapsedSeconds{};
+ UpstreamApplyCompleted Completed{};
+ bool Success = false;
+};
+
+struct UpstreamApplyStatus
+{
+ UpstreamApplyState State{};
+ GetUpstreamApplyResult Result{};
+ std::chrono::steady_clock::time_point ExpireTime{};
+};
+
+using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>;
+
+struct UpstreamEndpointHealth
+{
+ std::string Reason;
+ bool Ok = false;
+};
+
+struct UpstreamApplyEndpointStats
+{
+ std::atomic_uint64_t PostCount{};
+ std::atomic_uint64_t CompleteCount{};
+ std::atomic_uint64_t UpdateCount{};
+ std::atomic_uint64_t ErrorCount{};
+ std::atomic<double> UpBytes{};
+ std::atomic<double> DownBytes{};
+ std::atomic<double> SecondsUp{};
+ std::atomic<double> SecondsDown{};
+};
+
+/**
+ * The upstream apply endpont is responsible for handling remote execution.
+ */
+class UpstreamApplyEndpoint
+{
+public:
+ virtual ~UpstreamApplyEndpoint() = default;
+
+ virtual UpstreamEndpointHealth Initialize() = 0;
+
+ virtual bool IsHealthy() const = 0;
+
+ virtual UpstreamEndpointHealth CheckHealth() = 0;
+
+ virtual std::string_view DisplayName() const = 0;
+
+ virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0;
+
+ virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0;
+
+ virtual UpstreamApplyEndpointStats& Stats() = 0;
+};
+
+/**
+ * Manages one or more upstream cache endpoints.
+ */
+class UpstreamApply
+{
+public:
+ virtual ~UpstreamApply() = default;
+
+ virtual bool Initialize() = 0;
+
+ virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0;
+
+ struct EnqueueResult
+ {
+ IoHash ApplyId{};
+ bool Success = false;
+ };
+
+ struct StatusResult
+ {
+ UpstreamApplyStatus Status{};
+ bool Success = false;
+ };
+
+ virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
+
+ virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
+
+ virtual void GetStatus(CbObjectWriter& CbO) = 0;
+};
+
+std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
+
+std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options,
+ CasStore& CasStore,
+ CidStore& CidStore);
+
+} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp
index 168449d05..e2dc09872 100644
--- a/zenserver/upstream/upstreamcache.cpp
+++ b/zenserver/upstream/upstreamcache.cpp
@@ -40,13 +40,15 @@ namespace detail {
: m_Log(zen::logging::Get("upstream"))
, m_UseLegacyDdc(Options.UseLegacyDdc)
{
- using namespace fmt::literals;
- m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl);
- m_Client = new CloudCacheClient(Options);
+ m_Info.Name = "Horde"sv;
+ m_Info.Url = Options.ServiceUrl;
+ m_Client = new CloudCacheClient(Options);
}
virtual ~JupiterUpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; }
+
virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
virtual bool IsHealthy() const override { return m_HealthOk.load(); }
@@ -58,7 +60,7 @@ namespace detail {
CloudCacheSession Session(m_Client);
const CloudCacheResult Result = Session.Authenticate();
- m_HealthOk = Result.ErrorCode == 0;
+ m_HealthOk = Result.Success && Result.ErrorCode == 0;
return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
}
@@ -68,9 +70,7 @@ namespace detail {
}
}
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
{
@@ -144,12 +144,69 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override
+ {
+ ZEN_UNUSED(Policy);
+
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
+
+ for (size_t Index : KeyIndex)
+ {
+ const CacheKey& CacheKey = CacheKeys[Index];
+ CbPackage Package;
+ CbObject Record;
+
+ if (!Result.Error)
+ {
+ CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject);
+ AppendResult(RefResult, Result);
+
+ if (RefResult.ErrorCode == 0)
+ {
+ const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All);
+ if (ValidationResult == CbValidateError::None)
+ {
+ Record = LoadCompactBinaryObject(RefResult.Response);
+ Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) {
+ CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash());
+ AppendResult(BlobResult, Result);
+
+ if (BlobResult.ErrorCode == 0)
+ {
+ if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response)))
+ {
+ Package.AddAttachment(CbAttachment(Chunk));
+ }
+ }
+ else
+ {
+ m_HealthOk = false;
+ }
+ });
+ }
+ }
+ else
+ {
+ m_HealthOk = false;
+ }
+ }
+
+ OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package});
+ }
+
+ return Result;
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override
{
try
{
CloudCacheSession Session(m_Client);
- const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId);
+ const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId);
if (Result.ErrorCode == 0)
{
@@ -171,12 +228,41 @@ namespace detail {
}
}
+ virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ CloudCacheSession Session(m_Client);
+ GetUpstreamCacheResult Result;
+
+ for (size_t Index : RequestIndex)
+ {
+ const CacheChunkRequest& Request = CacheChunkRequests[Index];
+ IoBuffer Payload;
+
+ if (!Result.Error)
+ {
+ const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId);
+ Payload = BlobResult.Response;
+
+ AppendResult(BlobResult, Result);
+ m_HealthOk = BlobResult.ErrorCode == 0;
+ }
+
+ OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload});
+ }
+
+ return Result;
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
{
+ using namespace fmt::literals;
+
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const int32_t MaxAttempts = 3;
try
{
@@ -200,125 +286,132 @@ namespace detail {
}
}
- return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success};
+ m_HealthOk = Result.ErrorCode == 0;
+
+ return {.Reason = std::move(Result.Reason),
+ .Bytes = Result.Bytes,
+ .ElapsedSeconds = Result.ElapsedSeconds,
+ .Success = Result.Success};
}
else
{
- bool Success = false;
int64_t TotalBytes = 0ull;
double TotalElapsedSeconds = 0.0;
- for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++)
- {
- Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool {
+ for (const IoHash& PayloadId : PayloadIds)
{
- if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- Result.Success)
+ const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId);
+
+ if (It == std::end(CacheRecord.PayloadIds))
{
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
- break;
+ OutReason = "payload '{}' MISSING from local cache"_format(PayloadId);
+ return false;
}
- }
- if (!Success)
- {
- return {.Reason = "Failed to upload payload",
- .Bytes = TotalBytes,
- .ElapsedSeconds = TotalElapsedSeconds,
- .Success = false};
+ const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It);
+
+ CloudCacheResult BlobResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++)
+ {
+ BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
+ }
+
+ m_HealthOk = BlobResult.ErrorCode == 0;
+
+ if (!BlobResult.Success)
+ {
+ OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason);
+ return false;
+ }
+
+ TotalBytes += BlobResult.Bytes;
+ TotalElapsedSeconds += BlobResult.ElapsedSeconds;
}
+
+ return true;
+ };
+
+ PutRefResult RefResult;
+ for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++)
+ {
+ RefResult =
+ Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject);
}
- Success = false;
- for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++)
+ m_HealthOk = RefResult.ErrorCode == 0;
+
+ if (!RefResult.Success)
{
- if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- RecordValue,
- ZenContentType::kCbObject);
- Result.Success)
- {
- TotalBytes += Result.Bytes;
- TotalElapsedSeconds += Result.ElapsedSeconds;
- Success = true;
+ return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ RefResult.Reason),
+ .Success = false};
+ }
- if (!Result.Needs.empty())
- {
- for (const IoHash& NeededHash : Result.Needs)
- {
- Success = false;
+ TotalBytes += RefResult.Bytes;
+ TotalElapsedSeconds += RefResult.ElapsedSeconds;
- if (auto It =
- std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash);
- It != std::end(CacheRecord.PayloadIds))
- {
- const size_t Idx = It - std::begin(CacheRecord.PayloadIds);
-
- if (CloudCacheResult BlobResult =
- Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]);
- BlobResult.Success)
- {
- TotalBytes += BlobResult.Bytes;
- TotalElapsedSeconds += BlobResult.ElapsedSeconds;
- Success = true;
- }
- else
- {
- ZEN_WARN("upload missing payload '{}/{}/{}' FAILED",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
- else
- {
- ZEN_WARN("needed payload '{}/{}/{}' MISSING",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- NeededHash);
- }
- }
+ std::string Reason;
+ if (!PutBlobs(RefResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ const IoHash RefHash = IoHash::HashBuffer(RecordValue);
+ FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ m_HealthOk = FinalizeResult.ErrorCode == 0;
- if (FinalizeRefResult FinalizeResult =
- Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
- FinalizeResult.Success)
- {
- TotalBytes += FinalizeResult.Bytes;
- TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
- Success = true;
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
- for (const IoHash& MissingHash : FinalizeResult.Needs)
- {
- ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'",
- CacheRecord.CacheKey.Bucket,
- CacheRecord.CacheKey.Hash,
- MissingHash);
- }
- }
- else
- {
- ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash);
- Success = false;
- }
- }
+ if (!FinalizeResult.Needs.empty())
+ {
+ if (!PutBlobs(FinalizeResult.Needs, Reason))
+ {
+ return {.Reason = std::move(Reason), .Success = false};
+ }
- if (Success)
+ FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash);
+ m_HealthOk = FinalizeResult.ErrorCode == 0;
+
+ if (!FinalizeResult.Success)
+ {
+ return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ FinalizeResult.Reason),
+ .Success = false};
+ }
+
+ if (!FinalizeResult.Needs.empty())
+ {
+ ExtendableStringBuilder<256> Sb;
+ for (const IoHash& MissingHash : FinalizeResult.Needs)
{
- break;
+ Sb << MissingHash.ToHexString() << ",";
}
+
+ return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Sb.ToString()),
+ .Success = false};
}
}
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success};
+ TotalBytes += FinalizeResult.Bytes;
+ TotalElapsedSeconds += FinalizeResult.ElapsedSeconds;
+
+ return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true};
}
}
catch (std::exception& Err)
{
+ m_HealthOk = false;
return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -326,9 +419,22 @@ namespace detail {
virtual UpstreamEndpointStats& Stats() override { return m_Stats; }
private:
+ static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out)
+ {
+ Out.Success &= Result.Success;
+ Out.Bytes += Result.Bytes;
+ Out.ElapsedSeconds += Result.ElapsedSeconds;
+
+ if (Result.ErrorCode)
+ {
+ Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)};
+ }
+ };
+
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
+ UpstreamEndpointInfo m_Info;
bool m_UseLegacyDdc;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
@@ -349,9 +455,13 @@ namespace detail {
};
public:
- ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN")
+ ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
+ : m_Log(zen::logging::Get("upstream"))
+ , m_Info({.Name = std::string("Zen")})
+ , m_ConnectTimeout(Options.ConnectTimeout)
+ , m_Timeout(Options.Timeout)
{
- for (const auto& Url : Urls)
+ for (const auto& Url : Options.Urls)
{
m_Endpoints.push_back({.Url = Url});
}
@@ -359,6 +469,8 @@ namespace detail {
~ZenUpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; }
+
virtual UpstreamEndpointHealth Initialize() override
{
using namespace fmt::literals;
@@ -366,9 +478,8 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
if (Ep.Ok)
{
- m_ServiceUrl = Ep.Url;
- m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+ m_Info.Url = Ep.Url;
+ m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -391,9 +502,9 @@ namespace detail {
const ZenEndpoint& Ep = GetEndpoint();
if (Ep.Ok)
{
- m_ServiceUrl = Ep.Url;
- m_DisplayName = "ZEN - {}"_format(m_ServiceUrl);
- m_Client = new ZenStructuredCacheClient(m_ServiceUrl);
+ m_Info.Url = Ep.Url;
+ m_Client =
+ new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout});
m_HealthOk = true;
return {.Ok = true};
@@ -420,9 +531,7 @@ namespace detail {
}
}
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
try
{
@@ -449,13 +558,80 @@ namespace detail {
}
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override
+ {
+ std::vector<size_t> IndexMap;
+ IndexMap.reserve(KeyIndex.size());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCacheRecords";
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ BatchRequest.BeginArray("CacheKeys"sv);
+ for (size_t Index : KeyIndex)
+ {
+ const CacheKey& Key = CacheKeys[Index];
+ IndexMap.push_back(Index);
+
+ BatchRequest.BeginObject();
+ BatchRequest << "Bucket"sv << Key.Bucket;
+ BatchRequest << "Hash"sv << Key.Hash;
+ BatchRequest.EndObject();
+ }
+ BatchRequest.EndArray();
+
+ BatchRequest.BeginObject("Policy"sv);
+ CacheRecordPolicy::Save(Policy, BatchRequest);
+ BatchRequest.EndObject();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
+
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ Result = Session.InvokeRpc(BatchRequest.Save());
+ }
+
+ if (Result.Success)
+ {
+ if (BatchResponse.TryLoad(Result.Response))
+ {
+ for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ OnComplete(
+ {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse});
+ }
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ }
+
+ for (size_t Index : KeyIndex)
+ {
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
{
try
{
ZenStructuredCacheSession Session(*m_Client);
- const ZenCacheResult Result =
- Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId);
+ const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId);
if (Result.ErrorCode == 0)
{
@@ -477,12 +653,96 @@ namespace detail {
}
}
+ virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> IndexMap;
+ IndexMap.reserve(RequestIndex.size());
+
+ CbObjectWriter BatchRequest;
+ BatchRequest << "Method"sv
+ << "GetCachePayloads";
+
+ BatchRequest.BeginObject("Params"sv);
+ {
+ BatchRequest.BeginArray("ChunkRequests"sv);
+ {
+ for (size_t Index : RequestIndex)
+ {
+ const CacheChunkRequest& Request = CacheChunkRequests[Index];
+ IndexMap.push_back(Index);
+
+ BatchRequest.BeginObject();
+ {
+ BatchRequest.BeginObject("Key"sv);
+ BatchRequest << "Bucket"sv << Request.Key.Bucket;
+ BatchRequest << "Hash"sv << Request.Key.Hash;
+ BatchRequest.EndObject();
+
+ BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId);
+ BatchRequest << "ChunkId"sv << Request.ChunkId;
+ BatchRequest << "RawOffset"sv << Request.RawOffset;
+ BatchRequest << "RawSize"sv << Request.RawSize;
+ BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy);
+ }
+ BatchRequest.EndObject();
+ }
+ }
+ BatchRequest.EndArray();
+ }
+ BatchRequest.EndObject();
+
+ CbPackage BatchResponse;
+ ZenCacheResult Result;
+
+ {
+ ZenStructuredCacheSession Session(*m_Client);
+ Result = Session.InvokeRpc(BatchRequest.Save());
+ }
+
+ if (Result.Success)
+ {
+ if (BatchResponse.TryLoad(Result.Response))
+ {
+ for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv])
+ {
+ const size_t Index = IndexMap[LocalIndex++];
+ IoBuffer Payload;
+
+ if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash()))
+ {
+ if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary())
+ {
+ Payload = Compressed.GetCompressed().Flatten().AsIoBuffer();
+ }
+ }
+
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)});
+ }
+
+ return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true};
+ }
+ }
+ else if (Result.ErrorCode)
+ {
+ m_HealthOk = false;
+ }
+
+ for (size_t Index : RequestIndex)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ }
+
+ return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}};
+ }
+
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
std::span<IoBuffer const> Payloads) override
{
ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size());
- const uint32_t MaxAttempts = 3;
+ const int32_t MaxAttempts = 3;
try
{
@@ -565,12 +825,15 @@ namespace detail {
TotalElapsedSeconds += Result.ElapsedSeconds;
}
- return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success};
+ return {.Reason = std::move(Result.Reason),
+ .Bytes = TotalBytes,
+ .ElapsedSeconds = TotalElapsedSeconds,
+ .Success = Result.Success};
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
m_HealthOk = false;
- return {.Reason = std::string(e.what()), .Success = false};
+ return {.Reason = std::string(Err.what()), .Success = false};
}
}
@@ -581,7 +844,7 @@ namespace detail {
{
for (ZenEndpoint& Ep : m_Endpoints)
{
- ZenStructuredCacheClient Client(Ep.Url);
+ ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)});
ZenStructuredCacheSession Session(Client);
const int32_t SampleCount = 2;
@@ -602,7 +865,7 @@ namespace detail {
for (const auto& Ep : m_Endpoints)
{
- ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
+ ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason);
}
return m_Endpoints.front();
@@ -611,9 +874,10 @@ namespace detail {
spdlog::logger& Log() { return m_Log; }
spdlog::logger& m_Log;
- std::string m_ServiceUrl;
+ UpstreamEndpointInfo m_Info;
std::vector<ZenEndpoint> m_Endpoints;
- std::string m_DisplayName;
+ std::chrono::milliseconds m_ConnectTimeout;
+ std::chrono::milliseconds m_Timeout;
RefPtr<ZenStructuredCacheClient> m_Client;
UpstreamEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
@@ -700,7 +964,7 @@ struct UpstreamStats
const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0;
Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'",
- Ep->DisplayName(),
+ Ep->GetEndpointInfo().Name,
HitRate,
DownBytes,
DownSpeed,
@@ -734,13 +998,15 @@ public:
for (auto& Endpoint : m_Endpoints)
{
const UpstreamEndpointHealth Health = Endpoint->Initialize();
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
+
if (Health.Ok)
{
- ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName());
+ ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url);
}
else
{
- ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
}
}
@@ -761,7 +1027,7 @@ public:
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); }
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override
{
if (m_Options.ReadUpstream)
{
@@ -776,6 +1042,14 @@ public:
{
return Result;
}
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
}
}
}
@@ -783,7 +1057,99 @@ public:
return {};
}
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override
+ virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end());
+
+ if (m_Options.ReadUpstream)
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy() && !MissingKeys.empty())
+ {
+ std::vector<size_t> Missing;
+
+ auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) {
+ if (Params.Record)
+ {
+ OnComplete(std::forward<CacheRecordGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.KeyIndex);
+ }
+ });
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
+
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ MissingKeys = std::move(Missing);
+ }
+ }
+ }
+
+ for (size_t Index : MissingKeys)
+ {
+ OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()});
+ }
+ }
+
+ virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) override final
+ {
+ std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end());
+
+ if (m_Options.ReadUpstream)
+ {
+ for (auto& Endpoint : m_Endpoints)
+ {
+ if (Endpoint->IsHealthy() && !MissingPayloads.empty())
+ {
+ std::vector<size_t> Missing;
+
+ auto Result =
+ Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) {
+ if (Params.Payload)
+ {
+ OnComplete(std::forward<CachePayloadGetCompleteParams>(Params));
+ }
+ else
+ {
+ Missing.push_back(Params.RequestIndex);
+ }
+ });
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
+
+ m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+ MissingPayloads = std::move(Missing);
+ }
+ }
+ }
+
+ for (size_t Index : MissingPayloads)
+ {
+ OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()});
+ }
+ }
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override
{
if (m_Options.ReadUpstream)
{
@@ -791,13 +1157,21 @@ public:
{
if (Endpoint->IsHealthy())
{
- const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey);
+ const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId);
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
if (Result.Success)
{
return Result;
}
+
+ if (Result.Error)
+ {
+ ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'",
+ Endpoint->GetEndpointInfo().Url,
+ Result.Error.Reason,
+ Result.Error.ErrorCode);
+ }
}
}
}
@@ -834,8 +1208,10 @@ public:
Status.BeginArray("endpoints");
for (const auto& Ep : m_Endpoints)
{
+ const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo();
Status.BeginObject();
- Status << "name" << Ep->DisplayName();
+ Status << "name" << Info.Name;
+ Status << "url" << Info.Url;
Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
UpstreamEndpointStats& Stats = Ep->Stats();
@@ -890,6 +1266,15 @@ private:
{
const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads));
m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints);
+
+ if (!Result.Success)
+ {
+ ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Endpoint->GetEndpointInfo().Url,
+ Result.Reason);
+ }
}
}
}
@@ -905,9 +1290,12 @@ private:
{
ProcessCacheRecord(std::move(CacheRecord));
}
- catch (std::exception& e)
+ catch (std::exception& Err)
{
- ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what());
+ ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'",
+ CacheRecord.CacheKey.Bucket,
+ CacheRecord.CacheKey.Hash,
+ Err.what());
}
}
@@ -930,20 +1318,28 @@ private:
}
}
- for (auto& Endpoint : m_Endpoints)
+ try
{
- if (!Endpoint->IsHealthy())
+ for (auto& Endpoint : m_Endpoints)
{
- if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
- {
- ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
- }
- else
+ if (!Endpoint->IsHealthy())
{
- ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
+ const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo();
+ if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
+ {
+ ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason);
+ }
+ else
+ {
+ ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason);
+ }
}
}
}
+ catch (std::exception& Err)
+ {
+ ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what());
+ }
}
}
@@ -1015,9 +1411,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options)
}
std::unique_ptr<UpstreamEndpoint>
-MakeZenUpstreamEndpoint(std::span<std::string const> Urls)
+MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options)
{
- return std::make_unique<detail::ZenUpstreamEndpoint>(Urls);
+ return std::make_unique<detail::ZenUpstreamEndpoint>(Options);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h
index edc995da6..12287198d 100644
--- a/zenserver/upstream/upstreamcache.h
+++ b/zenserver/upstream/upstreamcache.h
@@ -5,34 +5,27 @@
#include <zencore/iobuffer.h>
#include <zencore/iohash.h>
#include <zencore/zencore.h>
+#include <zenutil/cache/cache.h>
#include <atomic>
#include <chrono>
+#include <functional>
#include <memory>
namespace zen {
+class CbObjectView;
+class CbPackage;
class CbObjectWriter;
class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
-
-struct UpstreamCacheKey
-{
- std::string Bucket;
- IoHash Hash;
-};
-
-struct UpstreamPayloadKey
-{
- UpstreamCacheKey CacheKey;
- IoHash PayloadId;
-};
+struct ZenStructuredCacheClientOptions;
struct UpstreamCacheRecord
{
ZenContentType Type = ZenContentType::kBinary;
- UpstreamCacheKey CacheKey;
+ CacheKey CacheKey;
std::vector<IoHash> PayloadIds;
};
@@ -88,6 +81,31 @@ struct UpstreamEndpointStats
std::atomic<double> SecondsDown{};
};
+struct CacheRecordGetCompleteParams
+{
+ const CacheKey& CacheKey;
+ size_t KeyIndex = ~size_t(0);
+ const CbObjectView& Record;
+ const CbPackage& Package;
+};
+
+using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>;
+
+struct CachePayloadGetCompleteParams
+{
+ const CacheChunkRequest& Request;
+ size_t RequestIndex{~size_t(0)};
+ IoBuffer Payload;
+};
+
+using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>;
+
+struct UpstreamEndpointInfo
+{
+ std::string Name;
+ std::string Url;
+};
+
/**
* The upstream endpont is responsible for handling upload/downloading of cache records.
*/
@@ -96,17 +114,26 @@ class UpstreamEndpoint
public:
virtual ~UpstreamEndpoint() = default;
+ virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0;
+
virtual UpstreamEndpointHealth Initialize() = 0;
virtual bool IsHealthy() const = 0;
virtual UpstreamEndpointHealth CheckHealth() = 0;
- virtual std::string_view DisplayName() const = 0;
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
+
+ virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& Policy,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+ virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) = 0;
virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord,
IoBuffer RecordValue,
@@ -127,9 +154,18 @@ public:
virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0;
- virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0;
+ virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0;
+
+ virtual void GetCacheRecords(std::span<CacheKey> CacheKeys,
+ std::span<size_t> KeyIndex,
+ const CacheRecordPolicy& RecordPolicy,
+ OnCacheRecordGetComplete&& OnComplete) = 0;
+
+ virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0;
- virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0;
+ virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests,
+ std::span<size_t> RequestIndex,
+ OnCachePayloadGetComplete&& OnComplete) = 0;
struct EnqueueResult
{
@@ -145,6 +181,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt
std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options);
-std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls);
+std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options);
} // namespace zen
diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp
index 14333f45a..cd7f48334 100644
--- a/zenserver/upstream/zen.cpp
+++ b/zenserver/upstream/zen.cpp
@@ -311,8 +311,17 @@ namespace detail {
ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {}
~ZenCacheSessionState() {}
- void Reset() {}
+ void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout)
+ {
+ Session.SetBody({});
+ Session.SetHeader({});
+ Session.SetConnectTimeout(ConnectTimeout);
+ Session.SetTimeout(Timeout);
+ }
+
+ cpr::Session& GetSession() { return Session; }
+ private:
ZenStructuredCacheClient& OwnerClient;
cpr::Session Session;
};
@@ -321,9 +330,11 @@ namespace detail {
//////////////////////////////////////////////////////////////////////////
-ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl)
+ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options)
: m_Log(logging::Get(std::string_view("zenclient")))
-, m_ServiceUrl(ServiceUrl)
+, m_ServiceUrl(Options.Url)
+, m_ConnectTimeout(Options.ConnectTimeout)
+, m_Timeout(Options.Timeout)
{
}
@@ -347,7 +358,7 @@ ZenStructuredCacheClient::AllocSessionState()
State = new detail::ZenCacheSessionState(*this);
}
- State->Reset();
+ State->Reset(m_ConnectTimeout, m_Timeout);
return State;
}
@@ -381,7 +392,7 @@ ZenStructuredCacheSession::CheckHealth()
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/health/check";
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
cpr::Response Response = Session.Get();
@@ -399,7 +410,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Accept",
@@ -427,7 +438,7 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});
@@ -452,7 +463,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type",
@@ -480,7 +491,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
ExtendableStringBuilder<256> Uri;
Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString();
- cpr::Session& Session = m_SessionState->Session;
+ cpr::Session& Session = m_SessionState->GetSession();
Session.SetOption(cpr::Url{Uri.c_str()});
Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
@@ -499,4 +510,33 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa
.Success = (Response.status_code == 200 || Response.status_code == 201)};
}
+ZenCacheResult
+ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
+{
+ ExtendableStringBuilder<256> Uri;
+ Uri << m_Client.ServiceUrl() << "/z$/$rpc";
+
+ BinaryWriter Body;
+ Request.CopyTo(Body);
+
+ cpr::Session& Session = m_SessionState->GetSession();
+
+ Session.SetOption(cpr::Url{Uri.c_str()});
+ Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
+ Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});
+
+ cpr::Response Response = Session.Post();
+ ZEN_DEBUG("POST {}", Response);
+
+ if (Response.error)
+ {
+ return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
+ }
+
+ const bool Success = Response.status_code == 200;
+ const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();
+
+ return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success};
+}
+
} // namespace zen
diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h
index 7f55294ce..df975df1f 100644
--- a/zenserver/upstream/zen.h
+++ b/zenserver/upstream/zen.h
@@ -26,6 +26,8 @@ class logger;
namespace zen {
class CbObjectWriter;
+class CbObjectView;
+class CbPackage;
class ZenStructuredCacheClient;
/** Zen mesh tracker
@@ -97,6 +99,14 @@ struct ZenCacheResult
bool Success = false;
};
+struct ZenStructuredCacheClientOptions
+{
+ std::string_view Url;
+ std::span<std::string const> Urls;
+ std::chrono::milliseconds ConnectTimeout{};
+ std::chrono::milliseconds Timeout{};
+};
+
/** Zen Structured Cache session
*
* This provides a context in which cache queries can be performed
@@ -114,6 +124,7 @@ public:
ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId);
ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type);
ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload);
+ ZenCacheResult InvokeRpc(const CbObjectView& Request);
private:
inline spdlog::logger& Log() { return m_Log; }
@@ -131,7 +142,7 @@ private:
class ZenStructuredCacheClient : public RefCounted
{
public:
- ZenStructuredCacheClient(std::string_view ServiceUrl);
+ ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options);
~ZenStructuredCacheClient();
std::string_view ServiceUrl() const { return m_ServiceUrl; }
@@ -139,8 +150,10 @@ public:
inline spdlog::logger& Log() { return m_Log; }
private:
- spdlog::logger& m_Log;
- std::string m_ServiceUrl;
+ spdlog::logger& m_Log;
+ std::string m_ServiceUrl;
+ std::chrono::milliseconds m_ConnectTimeout;
+ std::chrono::milliseconds m_Timeout;
RwLock m_SessionStateLock;
std::list<detail::ZenCacheSessionState*> m_SessionStateCache;
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index f26ab723c..bb57b4d0a 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -691,7 +691,10 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
if (!ZenUrls.empty())
{
- std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(ZenUrls);
+ std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint =
+ zen::MakeZenUpstreamEndpoint({.Urls = ZenUrls,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)});
UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint));
}
}
@@ -701,23 +704,29 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig)
zen::CloudCacheClientOptions Options;
if (UpstreamConfig.JupiterConfig.UseProductionSettings)
{
- Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
- .DdcNamespace = "ue.ddc"sv,
- .BlobStoreNamespace = "ue.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
+ Options =
+ zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv,
+ .DdcNamespace = "ue.ddc"sv,
+ .BlobStoreNamespace = "ue.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds),
+ .UseLegacyDdc = false};
}
else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings)
{
- Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
- .DdcNamespace = "ue4.ddc"sv,
- .BlobStoreNamespace = "test.ddc"sv,
- .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
- .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
- .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
- .UseLegacyDdc = false};
+ Options =
+ zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "ue4.ddc"sv,
+ .BlobStoreNamespace = "test.ddc"sv,
+ .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv,
+ .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv,
+ .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds),
+ .UseLegacyDdc = false};
}
Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl);
diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj
index 13589ee3b..38b51e2f0 100644
--- a/zenserver/zenserver.vcxproj
+++ b/zenserver/zenserver.vcxproj
@@ -126,6 +126,7 @@
<ClInclude Include="diag\diagsvcs.h" />
<ClInclude Include="experimental\usnjournal.h" />
<ClInclude Include="targetver.h" />
+ <ClInclude Include="upstream\upstreamapply.h" />
<ClInclude Include="upstream\upstreamcache.h" />
<ClInclude Include="upstream\zen.h" />
<ClInclude Include="windows\service.h" />
@@ -149,6 +150,7 @@
<ClCompile Include="testing\launch.cpp" />
<ClCompile Include="casstore.cpp" />
<ClCompile Include="experimental\usnjournal.cpp" />
+ <ClCompile Include="upstream\upstreamapply.cpp" />
<ClCompile Include="upstream\upstreamcache.cpp" />
<ClCompile Include="upstream\zen.cpp" />
<ClCompile Include="windows\service.cpp" />
diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters
index 87591ef56..97a43c901 100644
--- a/zenserver/zenserver.vcxproj.filters
+++ b/zenserver/zenserver.vcxproj.filters
@@ -41,6 +41,9 @@
<ClInclude Include="experimental\vfs.h" />
<ClInclude Include="monitoring\httpstats.h" />
<ClInclude Include="monitoring\httpstatus.h" />
+ <ClInclude Include="upstream\upstreamapply.h">
+ <Filter>upstream</Filter>
+ </ClInclude>
<ClInclude Include="cache\cachetracking.h" />
</ItemGroup>
<ItemGroup>
@@ -77,6 +80,9 @@
<ClCompile Include="experimental\vfs.cpp" />
<ClCompile Include="monitoring\httpstats.cpp" />
<ClCompile Include="monitoring\httpstatus.cpp" />
+ <ClCompile Include="upstream\upstreamapply.cpp">
+ <Filter>upstream</Filter>
+ </ClCompile>
<ClCompile Include="cache\cachetracking.cpp" />
</ItemGroup>
<ItemGroup>
diff --git a/zenutil/cache/cachekey.cpp b/zenutil/cache/cachekey.cpp
new file mode 100644
index 000000000..545b47f11
--- /dev/null
+++ b/zenutil/cache/cachekey.cpp
@@ -0,0 +1,9 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/cache/cachekey.h>
+
+namespace zen {
+
+const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()};
+
+} // namespace zen
diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp
new file mode 100644
index 000000000..f718bf841
--- /dev/null
+++ b/zenutil/cache/cachepolicy.cpp
@@ -0,0 +1,167 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/cache/cachepolicy.h>
+
+#include <zencore/compactbinary.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/string.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+namespace detail { namespace cacheopt {
+ constexpr std::string_view Local = "local"sv;
+ constexpr std::string_view Remote = "remote"sv;
+ constexpr std::string_view Data = "data"sv;
+ constexpr std::string_view Meta = "meta"sv;
+ constexpr std::string_view Value = "value"sv;
+ constexpr std::string_view Attachments = "attachments"sv;
+}} // namespace detail::cacheopt
+
+CachePolicy
+ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default)
+{
+ if (QueryPolicy.empty())
+ {
+ return Default;
+ }
+
+ CachePolicy Result = CachePolicy::None;
+
+ ForEachStrTok(QueryPolicy, ',', [&Result](const std::string_view& Token) {
+ if (Token == detail::cacheopt::Local)
+ {
+ Result |= CachePolicy::QueryLocal;
+ }
+ if (Token == detail::cacheopt::Remote)
+ {
+ Result |= CachePolicy::QueryRemote;
+ }
+ return true;
+ });
+
+ return Result;
+}
+
+CachePolicy
+ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default)
+{
+ if (StorePolicy.empty())
+ {
+ return Default;
+ }
+
+ CachePolicy Result = CachePolicy::None;
+
+ ForEachStrTok(StorePolicy, ',', [&Result](const std::string_view& Token) {
+ if (Token == detail::cacheopt::Local)
+ {
+ Result |= CachePolicy::StoreLocal;
+ }
+ if (Token == detail::cacheopt::Remote)
+ {
+ Result |= CachePolicy::StoreRemote;
+ }
+ return true;
+ });
+
+ return Result;
+}
+
+CachePolicy
+ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default)
+{
+ if (SkipPolicy.empty())
+ {
+ return Default;
+ }
+
+ CachePolicy Result = CachePolicy::None;
+
+ ForEachStrTok(SkipPolicy, ',', [&Result](const std::string_view& Token) {
+ if (Token == detail::cacheopt::Meta)
+ {
+ Result |= CachePolicy::SkipMeta;
+ }
+ if (Token == detail::cacheopt::Value)
+ {
+ Result |= CachePolicy::SkipValue;
+ }
+ if (Token == detail::cacheopt::Attachments)
+ {
+ Result |= CachePolicy::SkipAttachments;
+ }
+ if (Token == detail::cacheopt::Data)
+ {
+ Result |= CachePolicy::SkipData;
+ }
+ return true;
+ });
+
+ return Result;
+}
+
+CacheRecordPolicy::CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy PayloadPolicy)
+: m_RecordPolicy(RecordPolicy)
+, m_DefaultPayloadPolicy(PayloadPolicy)
+{
+}
+
+CachePolicy
+CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const
+{
+ if (const auto It = m_PayloadPolicies.find(PayloadId); It != m_PayloadPolicies.end())
+ {
+ return It->second;
+ }
+
+ return m_DefaultPayloadPolicy;
+}
+
+bool
+CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy)
+{
+ using namespace std::literals;
+
+ const uint32_t RecordPolicy = RecordPolicyObject["RecordPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default));
+ const uint32_t DefaultPayloadPolicy =
+ RecordPolicyObject["DefaultPayloadPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default));
+
+ OutRecordPolicy.m_RecordPolicy = static_cast<CachePolicy>(RecordPolicy);
+ OutRecordPolicy.m_DefaultPayloadPolicy = static_cast<CachePolicy>(DefaultPayloadPolicy);
+
+ for (CbFieldView PayloadPolicyView : RecordPolicyObject["PayloadPolicies"sv])
+ {
+ CbObjectView PayloadPolicyObject = PayloadPolicyView.AsObjectView();
+ const Oid PayloadId = PayloadPolicyObject["Id"sv].AsObjectId();
+ const uint32_t PayloadPolicy = PayloadPolicyObject["Policy"sv].AsUInt32();
+
+ if (PayloadId != Oid::Zero && PayloadPolicy != 0)
+ {
+ OutRecordPolicy.m_PayloadPolicies.emplace(PayloadId, static_cast<CachePolicy>(PayloadPolicy));
+ }
+ }
+
+ return true;
+}
+
+void
+CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer)
+{
+ Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy());
+ Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy());
+
+ if (!Policy.m_PayloadPolicies.empty())
+ {
+ Writer.BeginArray("PayloadPolicies"sv);
+ for (const auto& Kv : Policy.m_PayloadPolicies)
+ {
+ Writer.AddObjectId("Id"sv, Kv.first);
+ Writer << "Policy"sv << static_cast<uint32_t>(Kv.second);
+ }
+ Writer.EndArray();
+ }
+}
+
+} // namespace zen
diff --git a/zenutil/include/zenutil/cache/cache.h b/zenutil/include/zenutil/cache/cache.h
new file mode 100644
index 000000000..1a1dd9386
--- /dev/null
+++ b/zenutil/include/zenutil/cache/cache.h
@@ -0,0 +1,6 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenutil/cache/cachekey.h>
+#include <zenutil/cache/cachepolicy.h>
diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h
new file mode 100644
index 000000000..fb36c7759
--- /dev/null
+++ b/zenutil/include/zenutil/cache/cachekey.h
@@ -0,0 +1,83 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/string.h>
+#include <zencore/uid.h>
+
+#include <zenutil/cache/cachepolicy.h>
+
+namespace zen {
+
+struct CacheKey
+{
+ std::string Bucket;
+ IoHash Hash;
+
+ static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; }
+
+ static const CacheKey Empty;
+};
+
+inline bool
+operator==(const CacheKey& A, const CacheKey& B)
+{
+ return A.Bucket == B.Bucket && A.Hash == B.Hash;
+}
+
+inline bool
+operator!=(const CacheKey& A, const CacheKey& B)
+{
+ return A.Bucket != B.Bucket || A.Hash != B.Hash;
+}
+
+inline bool
+operator<(const CacheKey& A, const CacheKey& B)
+{
+ const std::string& BucketA = A.Bucket;
+ const std::string& BucketB = B.Bucket;
+ return BucketA == BucketB ? A.Hash < B.Hash : BucketA < BucketB;
+}
+
+struct CacheChunkRequest
+{
+ CacheKey Key;
+ IoHash ChunkId;
+ Oid PayloadId;
+ uint64_t RawOffset = 0ull;
+ uint64_t RawSize = ~uint64_t(0);
+ CachePolicy Policy = CachePolicy::Default;
+};
+
+inline bool
+operator<(const CacheChunkRequest& A, const CacheChunkRequest& B)
+{
+ if (A.Key < B.Key)
+ {
+ return true;
+ }
+ if (B.Key < A.Key)
+ {
+ return false;
+ }
+ if (A.ChunkId < B.ChunkId)
+ {
+ return true;
+ }
+ if (B.ChunkId < A.ChunkId)
+ {
+ return false;
+ }
+ if (A.PayloadId < B.PayloadId)
+ {
+ return true;
+ }
+ if (B.PayloadId < A.PayloadId)
+ {
+ return false;
+ }
+ return A.RawOffset < B.RawOffset;
+}
+
+} // namespace zen
diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h
new file mode 100644
index 000000000..5675ccf4d
--- /dev/null
+++ b/zenutil/include/zenutil/cache/cachepolicy.h
@@ -0,0 +1,112 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/string.h>
+#include <zencore/uid.h>
+
+#include <gsl/gsl-lite.hpp>
+#include <unordered_map>
+
+namespace zen {
+
+class CbObjectView;
+class CbWriter;
+
+enum class CachePolicy : uint32_t
+{
+ /** A value without any flags set. */
+ None = 0,
+
+ /** Allow a cache request to query local caches. */
+ QueryLocal = 1 << 0,
+ /** Allow a cache request to query remote caches. */
+ QueryRemote = 1 << 1,
+ /** Allow a cache request to query any caches. */
+ Query = QueryLocal | QueryRemote,
+
+ /** Allow cache records and values to be stored in local caches. */
+ StoreLocal = 1 << 2,
+ /** Allow cache records and values to be stored in remote caches. */
+ StoreRemote = 1 << 3,
+ /** Allow cache records and values to be stored in any caches. */
+ Store = StoreLocal | StoreRemote,
+
+ /** Skip fetching the metadata for record requests. */
+ SkipMeta = 1 << 4,
+ /** Skip fetching the value for record, chunk, or value requests. */
+ SkipValue = 1 << 5,
+ /** Skip fetching the attachments for record requests. */
+ SkipAttachments = 1 << 6,
+ /**
+ * Skip fetching the data for any requests.
+ *
+ * Put requests with skip flags may assume that record existence implies payload existence.
+ */
+ SkipData = SkipMeta | SkipValue | SkipAttachments,
+
+ /**
+ * Keep records in the cache for at least the duration of the session.
+ *
+ * This is a hint that the record may be accessed again in this session. This is mainly meant
+ * to be used when subsequent accesses will not tolerate a cache miss.
+ */
+ KeepAlive = 1 << 7,
+
+ /**
+ * Partial output will be provided with the error status when a required payload is missing.
+ *
+ * This is meant for cases when the missing payloads can be individually recovered or rebuilt
+ * without rebuilding the whole record. The cache automatically adds this flag when there are
+ * other cache stores that it may be able to recover missing payloads from.
+ *
+ * Requests for records would return records where the missing payloads have a hash and size,
+ * but no data. Requests for chunks or values would return the hash and size, but no data.
+ */
+ PartialOnError = 1 << 8,
+
+ /** Allow cache requests to query and store records and values in local caches. */
+ Local = QueryLocal | StoreLocal,
+ /** Allow cache requests to query and store records and values in remote caches. */
+ Remote = QueryRemote | StoreRemote,
+
+ /** Allow cache requests to query and store records and values in any caches. */
+ Default = Query | Store,
+
+ /** Do not allow cache requests to query or store records and values in any caches. */
+ Disable = None,
+};
+
+gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy);
+
+CachePolicy ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default = CachePolicy::Query);
+
+CachePolicy ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default = CachePolicy::Store);
+
+CachePolicy ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default = CachePolicy::None);
+
+class CacheRecordPolicy
+{
+public:
+ CacheRecordPolicy() = default;
+ CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy DefaultPayloadPolicy = CachePolicy::Default);
+
+ CachePolicy GetRecordPolicy() const { return m_RecordPolicy; }
+ CachePolicy GetPayloadPolicy(const Oid& PayloadId) const;
+ CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; }
+
+ bool HasRecordPolicy(const CachePolicy Policy) const { return (m_RecordPolicy & Policy) == Policy; }
+ bool HasPayloadPolicy(const Oid& PayloadId, const CachePolicy Policy) const { return (GetPayloadPolicy(PayloadId) & Policy) == Policy; }
+
+ static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy);
+ static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer);
+
+private:
+ using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>;
+
+ CachePolicy m_RecordPolicy = CachePolicy::Default;
+ CachePolicy m_DefaultPayloadPolicy = CachePolicy::Default;
+ PayloadPolicyMap m_PayloadPolicies;
+};
+
+} // namespace zen
diff --git a/zenutil/zenutil.vcxproj b/zenutil/zenutil.vcxproj
index 20f803e2a..e0c034c6f 100644
--- a/zenutil/zenutil.vcxproj
+++ b/zenutil/zenutil.vcxproj
@@ -95,9 +95,14 @@
</Link>
</ItemDefinitionGroup>
<ItemGroup>
+ <ClCompile Include="cache\cachekey.cpp" />
+ <ClCompile Include="cache\cachepolicy.cpp" />
<ClCompile Include="zenserverprocess.cpp" />
</ItemGroup>
<ItemGroup>
+ <ClInclude Include="include\zenutil\cache\cache.h" />
+ <ClInclude Include="include\zenutil\cache\cachekey.h" />
+ <ClInclude Include="include\zenutil\cache\cachepolicy.h" />
<ClInclude Include="include\zenutil\zenserverprocess.h" />
</ItemGroup>
<ItemGroup>
diff --git a/zenutil/zenutil.vcxproj.filters b/zenutil/zenutil.vcxproj.filters
index 9952e7159..368a827c2 100644
--- a/zenutil/zenutil.vcxproj.filters
+++ b/zenutil/zenutil.vcxproj.filters
@@ -2,11 +2,31 @@
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup>
<ClCompile Include="zenserverprocess.cpp" />
+ <ClCompile Include="cache\cachekey.cpp">
+ <Filter>cache</Filter>
+ </ClCompile>
+ <ClCompile Include="cache\cachepolicy.cpp">
+ <Filter>cache</Filter>
+ </ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="include\zenutil\zenserverprocess.h" />
+ <ClInclude Include="include\zenutil\cache\cache.h">
+ <Filter>cache</Filter>
+ </ClInclude>
+ <ClInclude Include="include\zenutil\cache\cachekey.h">
+ <Filter>cache</Filter>
+ </ClInclude>
+ <ClInclude Include="include\zenutil\cache\cachepolicy.h">
+ <Filter>cache</Filter>
+ </ClInclude>
</ItemGroup>
<ItemGroup>
<None Include="xmake.lua" />
</ItemGroup>
+ <ItemGroup>
+ <Filter Include="cache">
+ <UniqueIdentifier>{a441c536-6a01-4ac4-85a0-2667c95027d0}</UniqueIdentifier>
+ </Filter>
+ </ItemGroup>
</Project> \ No newline at end of file