diff options
| author | Stefan Boberg <[email protected]> | 2021-11-18 14:33:44 +0100 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2021-11-18 14:33:44 +0100 |
| commit | e53df312f3c4dcef19add9cd26afc324557b1f5a (patch) | |
| tree | a3d7b59f29e484d48edffb2a26bbb0dd2d95533d | |
| parent | gc: implemented timestamped snapshot persistence (diff) | |
| parent | Change error code for failed upsteam apply (diff) | |
| download | zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.tar.xz zen-e53df312f3c4dcef19add9cd26afc324557b1f5a.zip | |
merge from main
| -rw-r--r-- | thirdparty/BLAKE3/.github/workflows/build_b3sum.py (renamed from 3rdparty/BLAKE3/.github/workflows/build_b3sum.py) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/.github/workflows/ci.yml (renamed from 3rdparty/BLAKE3/.github/workflows/ci.yml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/.github/workflows/tag.yml (renamed from 3rdparty/BLAKE3/.github/workflows/tag.yml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/.github/workflows/upload_github_release_asset.py (renamed from 3rdparty/BLAKE3/.github/workflows/upload_github_release_asset.py) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/.gitignore (renamed from 3rdparty/BLAKE3/.gitignore) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/CONTRIBUTING.md (renamed from 3rdparty/BLAKE3/CONTRIBUTING.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/Cargo.toml (renamed from 3rdparty/BLAKE3/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/LICENSE (renamed from 3rdparty/BLAKE3/LICENSE) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/README.md (renamed from 3rdparty/BLAKE3/README.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/b3sum/Cargo.toml (renamed from 3rdparty/BLAKE3/b3sum/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/b3sum/README.md (renamed from 3rdparty/BLAKE3/b3sum/README.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/b3sum/src/main.rs (renamed from 3rdparty/BLAKE3/b3sum/src/main.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/b3sum/src/unit_tests.rs (renamed from 3rdparty/BLAKE3/b3sum/src/unit_tests.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/b3sum/tests/cli_tests.rs (renamed from 3rdparty/BLAKE3/b3sum/tests/cli_tests.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/b3sum/what_does_check_do.md (renamed from 3rdparty/BLAKE3/b3sum/what_does_check_do.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/benches/bench.rs (renamed from 3rdparty/BLAKE3/benches/bench.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/build.rs (renamed from 3rdparty/BLAKE3/build.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/.gitignore (renamed from 3rdparty/BLAKE3/c/.gitignore) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/Makefile.testing (renamed from 3rdparty/BLAKE3/c/Makefile.testing) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/README.md (renamed from 3rdparty/BLAKE3/c/README.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3.c (renamed from 3rdparty/BLAKE3/c/blake3.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3.h (renamed from 3rdparty/BLAKE3/c/blake3.h) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx2.c (renamed from 3rdparty/BLAKE3/c/blake3_avx2.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx512.c (renamed from 3rdparty/BLAKE3/c/blake3_avx512.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/README.md (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/README.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs (renamed from 3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_dispatch.c (renamed from 3rdparty/BLAKE3/c/blake3_dispatch.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_impl.h (renamed from 3rdparty/BLAKE3/c/blake3_impl.h) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_neon.c (renamed from 3rdparty/BLAKE3/c/blake3_neon.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_portable.c (renamed from 3rdparty/BLAKE3/c/blake3_portable.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse2.c (renamed from 3rdparty/BLAKE3/c/blake3_sse2.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse41.c (renamed from 3rdparty/BLAKE3/c/blake3_sse41.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S (renamed from 3rdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S (renamed from 3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm (renamed from 3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/example.c (renamed from 3rdparty/BLAKE3/c/example.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/main.c (renamed from 3rdparty/BLAKE3/c/main.c) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/c/test.py (renamed from 3rdparty/BLAKE3/c/test.py) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/lib/Linux_x64/libblake3.a (renamed from 3rdparty/BLAKE3/lib/Linux_x64/libblake3.a) | bin | 94190 -> 94190 bytes | |||
| -rw-r--r-- | thirdparty/BLAKE3/lib/Win64/BLAKE3.lib (renamed from 3rdparty/BLAKE3/lib/Win64/BLAKE3.lib) | bin | 172346 -> 172346 bytes | |||
| -rw-r--r-- | thirdparty/BLAKE3/media/B3.svg (renamed from 3rdparty/BLAKE3/media/B3.svg) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/media/BLAKE3.svg (renamed from 3rdparty/BLAKE3/media/BLAKE3.svg) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/media/speed.svg (renamed from 3rdparty/BLAKE3/media/speed.svg) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/reference_impl/Cargo.toml (renamed from 3rdparty/BLAKE3/reference_impl/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/reference_impl/README.md (renamed from 3rdparty/BLAKE3/reference_impl/README.md) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/reference_impl/reference_impl.rs (renamed from 3rdparty/BLAKE3/reference_impl/reference_impl.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/ffi_avx2.rs (renamed from 3rdparty/BLAKE3/src/ffi_avx2.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/ffi_avx512.rs (renamed from 3rdparty/BLAKE3/src/ffi_avx512.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/ffi_neon.rs (renamed from 3rdparty/BLAKE3/src/ffi_neon.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/ffi_sse2.rs (renamed from 3rdparty/BLAKE3/src/ffi_sse2.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/ffi_sse41.rs (renamed from 3rdparty/BLAKE3/src/ffi_sse41.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/guts.rs (renamed from 3rdparty/BLAKE3/src/guts.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/join.rs (renamed from 3rdparty/BLAKE3/src/join.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/lib.rs (renamed from 3rdparty/BLAKE3/src/lib.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/platform.rs (renamed from 3rdparty/BLAKE3/src/platform.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/portable.rs (renamed from 3rdparty/BLAKE3/src/portable.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/rust_avx2.rs (renamed from 3rdparty/BLAKE3/src/rust_avx2.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/rust_sse2.rs (renamed from 3rdparty/BLAKE3/src/rust_sse2.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/rust_sse41.rs (renamed from 3rdparty/BLAKE3/src/rust_sse41.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/test.rs (renamed from 3rdparty/BLAKE3/src/test.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/src/traits.rs (renamed from 3rdparty/BLAKE3/src/traits.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/test_vectors/Cargo.toml (renamed from 3rdparty/BLAKE3/test_vectors/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/test_vectors/cross_test.sh (renamed from 3rdparty/BLAKE3/test_vectors/cross_test.sh) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/test_vectors/src/lib.rs (renamed from 3rdparty/BLAKE3/test_vectors/src/lib.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/test_vectors/test_vectors.json (renamed from 3rdparty/BLAKE3/test_vectors/test_vectors.json) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/tools/compiler_version/Cargo.toml (renamed from 3rdparty/BLAKE3/tools/compiler_version/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/tools/compiler_version/build.rs (renamed from 3rdparty/BLAKE3/tools/compiler_version/build.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/tools/compiler_version/src/main.rs (renamed from 3rdparty/BLAKE3/tools/compiler_version/src/main.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/tools/instruction_set_support/Cargo.toml (renamed from 3rdparty/BLAKE3/tools/instruction_set_support/Cargo.toml) | 0 | ||||
| -rw-r--r-- | thirdparty/BLAKE3/tools/instruction_set_support/src/main.rs (renamed from 3rdparty/BLAKE3/tools/instruction_set_support/src/main.rs) | 0 | ||||
| -rw-r--r-- | thirdparty/Oodle/include/oodle2.h (renamed from 3rdparty/Oodle/include/oodle2.h) | 0 | ||||
| -rw-r--r-- | thirdparty/Oodle/include/oodle2base.h (renamed from 3rdparty/Oodle/include/oodle2base.h) | 0 | ||||
| -rwxr-xr-x | thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a (renamed from 3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a) | bin | 2593542 -> 2593542 bytes | |||
| -rwxr-xr-x | thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 (renamed from 3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8) | bin | 1905079 -> 1905079 bytes | |||
| -rw-r--r-- | thirdparty/Oodle/lib/Win64/oo2core_win64.lib (renamed from 3rdparty/Oodle/lib/Win64/oo2core_win64.lib) | bin | 8776756 -> 8776756 bytes | |||
| -rw-r--r-- | thirdparty/licenses/README.md | 3 | ||||
| -rw-r--r-- | thirdparty/utfcpp/.circleci/config.yml (renamed from 3rdparty/utfcpp/.circleci/config.yml) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/.gitignore (renamed from 3rdparty/utfcpp/.gitignore) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/.gitmodules (renamed from 3rdparty/utfcpp/.gitmodules) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/CMakeLists.txt (renamed from 3rdparty/utfcpp/CMakeLists.txt) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/LICENSE (renamed from 3rdparty/utfcpp/LICENSE) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/README.md (renamed from 3rdparty/utfcpp/README.md) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/samples/docsample.cpp (renamed from 3rdparty/utfcpp/samples/docsample.cpp) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/source/utf8.h (renamed from 3rdparty/utfcpp/source/utf8.h) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/source/utf8/checked.h (renamed from 3rdparty/utfcpp/source/utf8/checked.h) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/source/utf8/core.h (renamed from 3rdparty/utfcpp/source/utf8/core.h) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/source/utf8/cpp11.h (renamed from 3rdparty/utfcpp/source/utf8/cpp11.h) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/source/utf8/unchecked.h (renamed from 3rdparty/utfcpp/source/utf8/unchecked.h) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/CMakeLists.txt (renamed from 3rdparty/utfcpp/tests/CMakeLists.txt) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/docker/Dockerfile (renamed from 3rdparty/utfcpp/tests/docker/Dockerfile) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/negative.cpp (renamed from 3rdparty/utfcpp/tests/negative.cpp) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/test_checked_api.cpp (renamed from 3rdparty/utfcpp/tests/test_checked_api.cpp) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/test_checked_iterator.cpp (renamed from 3rdparty/utfcpp/tests/test_checked_iterator.cpp) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/test_cpp11.cpp (renamed from 3rdparty/utfcpp/tests/test_cpp11.cpp) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/test_data/utf8_invalid.txt (renamed from 3rdparty/utfcpp/tests/test_data/utf8_invalid.txt) | bin | 20010 -> 20010 bytes | |||
| -rw-r--r-- | thirdparty/utfcpp/tests/test_unchecked_api.cpp (renamed from 3rdparty/utfcpp/tests/test_unchecked_api.cpp) | 0 | ||||
| -rw-r--r-- | thirdparty/utfcpp/tests/test_unchecked_iterator.cpp (renamed from 3rdparty/utfcpp/tests/test_unchecked_iterator.cpp) | 0 | ||||
| -rw-r--r-- | zen.sln | 2 | ||||
| -rw-r--r-- | zencore/blake3.cpp | 2 | ||||
| -rw-r--r-- | zencore/compactbinary.cpp | 18 | ||||
| -rw-r--r-- | zencore/compress.cpp | 15 | ||||
| -rw-r--r-- | zencore/include/zencore/iobuffer.h | 1 | ||||
| -rw-r--r-- | zencore/memory.cpp | 4 | ||||
| -rw-r--r-- | zencore/xmake.lua | 6 | ||||
| -rw-r--r-- | zencore/zencore.vcxproj | 4 | ||||
| -rw-r--r-- | zenfs_common.props | 2 | ||||
| -rw-r--r-- | zenserver-test/zenserver-test.cpp | 402 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.cpp | 642 | ||||
| -rw-r--r-- | zenserver/cache/structuredcache.h | 5 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.cpp | 66 | ||||
| -rw-r--r-- | zenserver/cache/structuredcachestore.h | 85 | ||||
| -rw-r--r-- | zenserver/compute/apply.cpp | 107 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 20 | ||||
| -rw-r--r-- | zenserver/config.cpp | 16 | ||||
| -rw-r--r-- | zenserver/config.h | 8 | ||||
| -rw-r--r-- | zenserver/diag/formatters.h | 20 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 264 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 56 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 1559 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 172 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.cpp | 678 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamcache.h | 74 | ||||
| -rw-r--r-- | zenserver/upstream/zen.cpp | 58 | ||||
| -rw-r--r-- | zenserver/upstream/zen.h | 19 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 39 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj | 2 | ||||
| -rw-r--r-- | zenserver/zenserver.vcxproj.filters | 6 | ||||
| -rw-r--r-- | zenutil/cache/cachekey.cpp | 9 | ||||
| -rw-r--r-- | zenutil/cache/cachepolicy.cpp | 167 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cache.h | 6 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachekey.h | 83 | ||||
| -rw-r--r-- | zenutil/include/zenutil/cache/cachepolicy.h | 112 | ||||
| -rw-r--r-- | zenutil/zenutil.vcxproj | 5 | ||||
| -rw-r--r-- | zenutil/zenutil.vcxproj.filters | 20 |
148 files changed, 4193 insertions, 564 deletions
diff --git a/3rdparty/BLAKE3/.github/workflows/build_b3sum.py b/thirdparty/BLAKE3/.github/workflows/build_b3sum.py index e487daf97..e487daf97 100644 --- a/3rdparty/BLAKE3/.github/workflows/build_b3sum.py +++ b/thirdparty/BLAKE3/.github/workflows/build_b3sum.py diff --git a/3rdparty/BLAKE3/.github/workflows/ci.yml b/thirdparty/BLAKE3/.github/workflows/ci.yml index 464a411d5..464a411d5 100644 --- a/3rdparty/BLAKE3/.github/workflows/ci.yml +++ b/thirdparty/BLAKE3/.github/workflows/ci.yml diff --git a/3rdparty/BLAKE3/.github/workflows/tag.yml b/thirdparty/BLAKE3/.github/workflows/tag.yml index 577d4f312..577d4f312 100644 --- a/3rdparty/BLAKE3/.github/workflows/tag.yml +++ b/thirdparty/BLAKE3/.github/workflows/tag.yml diff --git a/3rdparty/BLAKE3/.github/workflows/upload_github_release_asset.py b/thirdparty/BLAKE3/.github/workflows/upload_github_release_asset.py index c1cbf518b..c1cbf518b 100644 --- a/3rdparty/BLAKE3/.github/workflows/upload_github_release_asset.py +++ b/thirdparty/BLAKE3/.github/workflows/upload_github_release_asset.py diff --git a/3rdparty/BLAKE3/.gitignore b/thirdparty/BLAKE3/.gitignore index fa8d85ac5..fa8d85ac5 100644 --- a/3rdparty/BLAKE3/.gitignore +++ b/thirdparty/BLAKE3/.gitignore diff --git a/3rdparty/BLAKE3/CONTRIBUTING.md b/thirdparty/BLAKE3/CONTRIBUTING.md index 3a605f255..3a605f255 100644 --- a/3rdparty/BLAKE3/CONTRIBUTING.md +++ b/thirdparty/BLAKE3/CONTRIBUTING.md diff --git a/3rdparty/BLAKE3/Cargo.toml b/thirdparty/BLAKE3/Cargo.toml index 3df0fd279..3df0fd279 100644 --- a/3rdparty/BLAKE3/Cargo.toml +++ b/thirdparty/BLAKE3/Cargo.toml diff --git a/3rdparty/BLAKE3/LICENSE b/thirdparty/BLAKE3/LICENSE index f5892efc3..f5892efc3 100644 --- a/3rdparty/BLAKE3/LICENSE +++ b/thirdparty/BLAKE3/LICENSE diff --git a/3rdparty/BLAKE3/README.md b/thirdparty/BLAKE3/README.md index 360183668..360183668 100644 --- a/3rdparty/BLAKE3/README.md +++ b/thirdparty/BLAKE3/README.md diff --git a/3rdparty/BLAKE3/b3sum/Cargo.toml b/thirdparty/BLAKE3/b3sum/Cargo.toml index 4678bee2d..4678bee2d 100644 --- a/3rdparty/BLAKE3/b3sum/Cargo.toml +++ b/thirdparty/BLAKE3/b3sum/Cargo.toml diff --git a/3rdparty/BLAKE3/b3sum/README.md b/thirdparty/BLAKE3/b3sum/README.md index e97830b7c..e97830b7c 100644 --- a/3rdparty/BLAKE3/b3sum/README.md +++ b/thirdparty/BLAKE3/b3sum/README.md diff --git a/3rdparty/BLAKE3/b3sum/src/main.rs b/thirdparty/BLAKE3/b3sum/src/main.rs index b01e5de58..b01e5de58 100644 --- a/3rdparty/BLAKE3/b3sum/src/main.rs +++ b/thirdparty/BLAKE3/b3sum/src/main.rs diff --git a/3rdparty/BLAKE3/b3sum/src/unit_tests.rs b/thirdparty/BLAKE3/b3sum/src/unit_tests.rs index 1fa1a17dc..1fa1a17dc 100644 --- a/3rdparty/BLAKE3/b3sum/src/unit_tests.rs +++ b/thirdparty/BLAKE3/b3sum/src/unit_tests.rs diff --git a/3rdparty/BLAKE3/b3sum/tests/cli_tests.rs b/thirdparty/BLAKE3/b3sum/tests/cli_tests.rs index 51fbbba98..51fbbba98 100644 --- a/3rdparty/BLAKE3/b3sum/tests/cli_tests.rs +++ b/thirdparty/BLAKE3/b3sum/tests/cli_tests.rs diff --git a/3rdparty/BLAKE3/b3sum/what_does_check_do.md b/thirdparty/BLAKE3/b3sum/what_does_check_do.md index 3a44a0010..3a44a0010 100644 --- a/3rdparty/BLAKE3/b3sum/what_does_check_do.md +++ b/thirdparty/BLAKE3/b3sum/what_does_check_do.md diff --git a/3rdparty/BLAKE3/benches/bench.rs b/thirdparty/BLAKE3/benches/bench.rs index ba5a4041f..ba5a4041f 100644 --- a/3rdparty/BLAKE3/benches/bench.rs +++ b/thirdparty/BLAKE3/benches/bench.rs diff --git a/3rdparty/BLAKE3/build.rs b/thirdparty/BLAKE3/build.rs index ea657d8db..ea657d8db 100644 --- a/3rdparty/BLAKE3/build.rs +++ b/thirdparty/BLAKE3/build.rs diff --git a/3rdparty/BLAKE3/c/.gitignore b/thirdparty/BLAKE3/c/.gitignore index 0bf608cee..0bf608cee 100644 --- a/3rdparty/BLAKE3/c/.gitignore +++ b/thirdparty/BLAKE3/c/.gitignore diff --git a/3rdparty/BLAKE3/c/Makefile.testing b/thirdparty/BLAKE3/c/Makefile.testing index 41e6b8285..41e6b8285 100644 --- a/3rdparty/BLAKE3/c/Makefile.testing +++ b/thirdparty/BLAKE3/c/Makefile.testing diff --git a/3rdparty/BLAKE3/c/README.md b/thirdparty/BLAKE3/c/README.md index 5e8b4e682..5e8b4e682 100644 --- a/3rdparty/BLAKE3/c/README.md +++ b/thirdparty/BLAKE3/c/README.md diff --git a/3rdparty/BLAKE3/c/blake3.c b/thirdparty/BLAKE3/c/blake3.c index 7abf5324e..7abf5324e 100644 --- a/3rdparty/BLAKE3/c/blake3.c +++ b/thirdparty/BLAKE3/c/blake3.c diff --git a/3rdparty/BLAKE3/c/blake3.h b/thirdparty/BLAKE3/c/blake3.h index 57ebd5adc..57ebd5adc 100644 --- a/3rdparty/BLAKE3/c/blake3.h +++ b/thirdparty/BLAKE3/c/blake3.h diff --git a/3rdparty/BLAKE3/c/blake3_avx2.c b/thirdparty/BLAKE3/c/blake3_avx2.c index c5a2ce9e2..c5a2ce9e2 100644 --- a/3rdparty/BLAKE3/c/blake3_avx2.c +++ b/thirdparty/BLAKE3/c/blake3_avx2.c diff --git a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S index 812bb8568..812bb8568 100644 --- a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S +++ b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_unix.S diff --git a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S index bb58d2ae6..bb58d2ae6 100644 --- a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S +++ b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_gnu.S diff --git a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm index 352298edd..352298edd 100644 --- a/3rdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm +++ b/thirdparty/BLAKE3/c/blake3_avx2_x86-64_windows_msvc.asm diff --git a/3rdparty/BLAKE3/c/blake3_avx512.c b/thirdparty/BLAKE3/c/blake3_avx512.c index 77a5c385c..77a5c385c 100644 --- a/3rdparty/BLAKE3/c/blake3_avx512.c +++ b/thirdparty/BLAKE3/c/blake3_avx512.c diff --git a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S index a06aede0f..a06aede0f 100644 --- a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S +++ b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_unix.S diff --git a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S index e10b9f36c..e10b9f36c 100644 --- a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S +++ b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_gnu.S diff --git a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm index b19efbaae..b19efbaae 100644 --- a/3rdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm +++ b/thirdparty/BLAKE3/c/blake3_avx512_x86-64_windows_msvc.asm diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml index 2052c7458..2052c7458 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/Cargo.toml diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/README.md b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/README.md index c44726b90..c44726b90 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/README.md +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/README.md diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs index 119bd2064..119bd2064 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/benches/bench.rs diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs index d5dc47a81..d5dc47a81 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/build.rs diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh index 94d50affb..94d50affb 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/cross_test.sh diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs index f18fe123f..f18fe123f 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/lib.rs diff --git a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs index b989ae9c4..b989ae9c4 100644 --- a/3rdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs +++ b/thirdparty/BLAKE3/c/blake3_c_rust_bindings/src/test.rs diff --git a/3rdparty/BLAKE3/c/blake3_dispatch.c b/thirdparty/BLAKE3/c/blake3_dispatch.c index 6518478e5..6518478e5 100644 --- a/3rdparty/BLAKE3/c/blake3_dispatch.c +++ b/thirdparty/BLAKE3/c/blake3_dispatch.c diff --git a/3rdparty/BLAKE3/c/blake3_impl.h b/thirdparty/BLAKE3/c/blake3_impl.h index 86ab6aa25..86ab6aa25 100644 --- a/3rdparty/BLAKE3/c/blake3_impl.h +++ b/thirdparty/BLAKE3/c/blake3_impl.h diff --git a/3rdparty/BLAKE3/c/blake3_neon.c b/thirdparty/BLAKE3/c/blake3_neon.c index 46691f526..46691f526 100644 --- a/3rdparty/BLAKE3/c/blake3_neon.c +++ b/thirdparty/BLAKE3/c/blake3_neon.c diff --git a/3rdparty/BLAKE3/c/blake3_portable.c b/thirdparty/BLAKE3/c/blake3_portable.c index 062dd1b47..062dd1b47 100644 --- a/3rdparty/BLAKE3/c/blake3_portable.c +++ b/thirdparty/BLAKE3/c/blake3_portable.c diff --git a/3rdparty/BLAKE3/c/blake3_sse2.c b/thirdparty/BLAKE3/c/blake3_sse2.c index 159296688..159296688 100644 --- a/3rdparty/BLAKE3/c/blake3_sse2.c +++ b/thirdparty/BLAKE3/c/blake3_sse2.c diff --git a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S index d144046ab..d144046ab 100644 --- a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S +++ b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_unix.S diff --git a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S index 494c0c6fd..494c0c6fd 100644 --- a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S +++ b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_gnu.S diff --git a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm index 72deb7bbc..72deb7bbc 100644 --- a/3rdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm +++ b/thirdparty/BLAKE3/c/blake3_sse2_x86-64_windows_msvc.asm diff --git a/3rdparty/BLAKE3/c/blake3_sse41.c b/thirdparty/BLAKE3/c/blake3_sse41.c index b31122533..b31122533 100644 --- a/3rdparty/BLAKE3/c/blake3_sse41.c +++ b/thirdparty/BLAKE3/c/blake3_sse41.c diff --git a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S index a3ff64269..a3ff64269 100644 --- a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S +++ b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_unix.S diff --git a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S index 60d0a4042..60d0a4042 100644 --- a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S +++ b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_gnu.S diff --git a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm index 87001e4d3..87001e4d3 100644 --- a/3rdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm +++ b/thirdparty/BLAKE3/c/blake3_sse41_x86-64_windows_msvc.asm diff --git a/3rdparty/BLAKE3/c/example.c b/thirdparty/BLAKE3/c/example.c index 02fe3c32b..02fe3c32b 100644 --- a/3rdparty/BLAKE3/c/example.c +++ b/thirdparty/BLAKE3/c/example.c diff --git a/3rdparty/BLAKE3/c/main.c b/thirdparty/BLAKE3/c/main.c index 9b8a436f3..9b8a436f3 100644 --- a/3rdparty/BLAKE3/c/main.c +++ b/thirdparty/BLAKE3/c/main.c diff --git a/3rdparty/BLAKE3/c/test.py b/thirdparty/BLAKE3/c/test.py index b0b192950..b0b192950 100644 --- a/3rdparty/BLAKE3/c/test.py +++ b/thirdparty/BLAKE3/c/test.py diff --git a/3rdparty/BLAKE3/lib/Linux_x64/libblake3.a b/thirdparty/BLAKE3/lib/Linux_x64/libblake3.a Binary files differindex b956e22cb..b956e22cb 100644 --- a/3rdparty/BLAKE3/lib/Linux_x64/libblake3.a +++ b/thirdparty/BLAKE3/lib/Linux_x64/libblake3.a diff --git a/3rdparty/BLAKE3/lib/Win64/BLAKE3.lib b/thirdparty/BLAKE3/lib/Win64/BLAKE3.lib Binary files differindex 1308d9928..1308d9928 100644 --- a/3rdparty/BLAKE3/lib/Win64/BLAKE3.lib +++ b/thirdparty/BLAKE3/lib/Win64/BLAKE3.lib diff --git a/3rdparty/BLAKE3/media/B3.svg b/thirdparty/BLAKE3/media/B3.svg index a50da0ce9..a50da0ce9 100644 --- a/3rdparty/BLAKE3/media/B3.svg +++ b/thirdparty/BLAKE3/media/B3.svg diff --git a/3rdparty/BLAKE3/media/BLAKE3.svg b/thirdparty/BLAKE3/media/BLAKE3.svg index 2d50c2d3b..2d50c2d3b 100644 --- a/3rdparty/BLAKE3/media/BLAKE3.svg +++ b/thirdparty/BLAKE3/media/BLAKE3.svg diff --git a/3rdparty/BLAKE3/media/speed.svg b/thirdparty/BLAKE3/media/speed.svg index 7bd65ca3c..7bd65ca3c 100644 --- a/3rdparty/BLAKE3/media/speed.svg +++ b/thirdparty/BLAKE3/media/speed.svg diff --git a/3rdparty/BLAKE3/reference_impl/Cargo.toml b/thirdparty/BLAKE3/reference_impl/Cargo.toml index 8c81e5ad9..8c81e5ad9 100644 --- a/3rdparty/BLAKE3/reference_impl/Cargo.toml +++ b/thirdparty/BLAKE3/reference_impl/Cargo.toml diff --git a/3rdparty/BLAKE3/reference_impl/README.md b/thirdparty/BLAKE3/reference_impl/README.md index 941fafd72..941fafd72 100644 --- a/3rdparty/BLAKE3/reference_impl/README.md +++ b/thirdparty/BLAKE3/reference_impl/README.md diff --git a/3rdparty/BLAKE3/reference_impl/reference_impl.rs b/thirdparty/BLAKE3/reference_impl/reference_impl.rs index 248834319..248834319 100644 --- a/3rdparty/BLAKE3/reference_impl/reference_impl.rs +++ b/thirdparty/BLAKE3/reference_impl/reference_impl.rs diff --git a/3rdparty/BLAKE3/src/ffi_avx2.rs b/thirdparty/BLAKE3/src/ffi_avx2.rs index d805e868e..d805e868e 100644 --- a/3rdparty/BLAKE3/src/ffi_avx2.rs +++ b/thirdparty/BLAKE3/src/ffi_avx2.rs diff --git a/3rdparty/BLAKE3/src/ffi_avx512.rs b/thirdparty/BLAKE3/src/ffi_avx512.rs index c1b9f649b..c1b9f649b 100644 --- a/3rdparty/BLAKE3/src/ffi_avx512.rs +++ b/thirdparty/BLAKE3/src/ffi_avx512.rs diff --git a/3rdparty/BLAKE3/src/ffi_neon.rs b/thirdparty/BLAKE3/src/ffi_neon.rs index 889974277..889974277 100644 --- a/3rdparty/BLAKE3/src/ffi_neon.rs +++ b/thirdparty/BLAKE3/src/ffi_neon.rs diff --git a/3rdparty/BLAKE3/src/ffi_sse2.rs b/thirdparty/BLAKE3/src/ffi_sse2.rs index c49a229ad..c49a229ad 100644 --- a/3rdparty/BLAKE3/src/ffi_sse2.rs +++ b/thirdparty/BLAKE3/src/ffi_sse2.rs diff --git a/3rdparty/BLAKE3/src/ffi_sse41.rs b/thirdparty/BLAKE3/src/ffi_sse41.rs index 0b64c90a0..0b64c90a0 100644 --- a/3rdparty/BLAKE3/src/ffi_sse41.rs +++ b/thirdparty/BLAKE3/src/ffi_sse41.rs diff --git a/3rdparty/BLAKE3/src/guts.rs b/thirdparty/BLAKE3/src/guts.rs index 88dcc86cd..88dcc86cd 100644 --- a/3rdparty/BLAKE3/src/guts.rs +++ b/thirdparty/BLAKE3/src/guts.rs diff --git a/3rdparty/BLAKE3/src/join.rs b/thirdparty/BLAKE3/src/join.rs index 60932db1c..60932db1c 100644 --- a/3rdparty/BLAKE3/src/join.rs +++ b/thirdparty/BLAKE3/src/join.rs diff --git a/3rdparty/BLAKE3/src/lib.rs b/thirdparty/BLAKE3/src/lib.rs index bf66b6dae..bf66b6dae 100644 --- a/3rdparty/BLAKE3/src/lib.rs +++ b/thirdparty/BLAKE3/src/lib.rs diff --git a/3rdparty/BLAKE3/src/platform.rs b/thirdparty/BLAKE3/src/platform.rs index 4bd67de7a..4bd67de7a 100644 --- a/3rdparty/BLAKE3/src/platform.rs +++ b/thirdparty/BLAKE3/src/platform.rs diff --git a/3rdparty/BLAKE3/src/portable.rs b/thirdparty/BLAKE3/src/portable.rs index 0a569cec7..0a569cec7 100644 --- a/3rdparty/BLAKE3/src/portable.rs +++ b/thirdparty/BLAKE3/src/portable.rs diff --git a/3rdparty/BLAKE3/src/rust_avx2.rs b/thirdparty/BLAKE3/src/rust_avx2.rs index 6ab773ad4..6ab773ad4 100644 --- a/3rdparty/BLAKE3/src/rust_avx2.rs +++ b/thirdparty/BLAKE3/src/rust_avx2.rs diff --git a/3rdparty/BLAKE3/src/rust_sse2.rs b/thirdparty/BLAKE3/src/rust_sse2.rs index 15b52ee5d..15b52ee5d 100644 --- a/3rdparty/BLAKE3/src/rust_sse2.rs +++ b/thirdparty/BLAKE3/src/rust_sse2.rs diff --git a/3rdparty/BLAKE3/src/rust_sse41.rs b/thirdparty/BLAKE3/src/rust_sse41.rs index d5cf0f4a9..d5cf0f4a9 100644 --- a/3rdparty/BLAKE3/src/rust_sse41.rs +++ b/thirdparty/BLAKE3/src/rust_sse41.rs diff --git a/3rdparty/BLAKE3/src/test.rs b/thirdparty/BLAKE3/src/test.rs index eefb1a354..eefb1a354 100644 --- a/3rdparty/BLAKE3/src/test.rs +++ b/thirdparty/BLAKE3/src/test.rs diff --git a/3rdparty/BLAKE3/src/traits.rs b/thirdparty/BLAKE3/src/traits.rs index 9704e0106..9704e0106 100644 --- a/3rdparty/BLAKE3/src/traits.rs +++ b/thirdparty/BLAKE3/src/traits.rs diff --git a/3rdparty/BLAKE3/test_vectors/Cargo.toml b/thirdparty/BLAKE3/test_vectors/Cargo.toml index cd74a9df0..cd74a9df0 100644 --- a/3rdparty/BLAKE3/test_vectors/Cargo.toml +++ b/thirdparty/BLAKE3/test_vectors/Cargo.toml diff --git a/3rdparty/BLAKE3/test_vectors/cross_test.sh b/thirdparty/BLAKE3/test_vectors/cross_test.sh index c4d280c9d..c4d280c9d 100644 --- a/3rdparty/BLAKE3/test_vectors/cross_test.sh +++ b/thirdparty/BLAKE3/test_vectors/cross_test.sh diff --git a/3rdparty/BLAKE3/test_vectors/src/lib.rs b/thirdparty/BLAKE3/test_vectors/src/lib.rs index 04460f668..04460f668 100644 --- a/3rdparty/BLAKE3/test_vectors/src/lib.rs +++ b/thirdparty/BLAKE3/test_vectors/src/lib.rs diff --git a/3rdparty/BLAKE3/test_vectors/test_vectors.json b/thirdparty/BLAKE3/test_vectors/test_vectors.json index f6da91792..f6da91792 100644 --- a/3rdparty/BLAKE3/test_vectors/test_vectors.json +++ b/thirdparty/BLAKE3/test_vectors/test_vectors.json diff --git a/3rdparty/BLAKE3/tools/compiler_version/Cargo.toml b/thirdparty/BLAKE3/tools/compiler_version/Cargo.toml index 1046cf29d..1046cf29d 100644 --- a/3rdparty/BLAKE3/tools/compiler_version/Cargo.toml +++ b/thirdparty/BLAKE3/tools/compiler_version/Cargo.toml diff --git a/3rdparty/BLAKE3/tools/compiler_version/build.rs b/thirdparty/BLAKE3/tools/compiler_version/build.rs index 3e14ebe67..3e14ebe67 100644 --- a/3rdparty/BLAKE3/tools/compiler_version/build.rs +++ b/thirdparty/BLAKE3/tools/compiler_version/build.rs diff --git a/3rdparty/BLAKE3/tools/compiler_version/src/main.rs b/thirdparty/BLAKE3/tools/compiler_version/src/main.rs index 767cb31bd..767cb31bd 100644 --- a/3rdparty/BLAKE3/tools/compiler_version/src/main.rs +++ b/thirdparty/BLAKE3/tools/compiler_version/src/main.rs diff --git a/3rdparty/BLAKE3/tools/instruction_set_support/Cargo.toml b/thirdparty/BLAKE3/tools/instruction_set_support/Cargo.toml index 9e30174a9..9e30174a9 100644 --- a/3rdparty/BLAKE3/tools/instruction_set_support/Cargo.toml +++ b/thirdparty/BLAKE3/tools/instruction_set_support/Cargo.toml diff --git a/3rdparty/BLAKE3/tools/instruction_set_support/src/main.rs b/thirdparty/BLAKE3/tools/instruction_set_support/src/main.rs index 6b509b053..6b509b053 100644 --- a/3rdparty/BLAKE3/tools/instruction_set_support/src/main.rs +++ b/thirdparty/BLAKE3/tools/instruction_set_support/src/main.rs diff --git a/3rdparty/Oodle/include/oodle2.h b/thirdparty/Oodle/include/oodle2.h index 31204e932..31204e932 100644 --- a/3rdparty/Oodle/include/oodle2.h +++ b/thirdparty/Oodle/include/oodle2.h diff --git a/3rdparty/Oodle/include/oodle2base.h b/thirdparty/Oodle/include/oodle2base.h index 05f73f3ea..05f73f3ea 100644 --- a/3rdparty/Oodle/include/oodle2base.h +++ b/thirdparty/Oodle/include/oodle2base.h diff --git a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a Binary files differindex dee0353e5..dee0353e5 100755 --- a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a +++ b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.a diff --git a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 Binary files differindex 425ada44d..425ada44d 100755 --- a/3rdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 +++ b/thirdparty/Oodle/lib/Linux_x64/liboo2corelinux64.so.8 diff --git a/3rdparty/Oodle/lib/Win64/oo2core_win64.lib b/thirdparty/Oodle/lib/Win64/oo2core_win64.lib Binary files differindex ae42f727a..ae42f727a 100644 --- a/3rdparty/Oodle/lib/Win64/oo2core_win64.lib +++ b/thirdparty/Oodle/lib/Win64/oo2core_win64.lib diff --git a/thirdparty/licenses/README.md b/thirdparty/licenses/README.md new file mode 100644 index 000000000..a4d8e1c97 --- /dev/null +++ b/thirdparty/licenses/README.md @@ -0,0 +1,3 @@ +# Placeholder + +Third party library license information goes here at some point in the near future. diff --git a/3rdparty/utfcpp/.circleci/config.yml b/thirdparty/utfcpp/.circleci/config.yml index b2cbdaf99..b2cbdaf99 100644 --- a/3rdparty/utfcpp/.circleci/config.yml +++ b/thirdparty/utfcpp/.circleci/config.yml diff --git a/3rdparty/utfcpp/.gitignore b/thirdparty/utfcpp/.gitignore index 488d51dd9..488d51dd9 100644 --- a/3rdparty/utfcpp/.gitignore +++ b/thirdparty/utfcpp/.gitignore diff --git a/3rdparty/utfcpp/.gitmodules b/thirdparty/utfcpp/.gitmodules index d2ac8470d..d2ac8470d 100644 --- a/3rdparty/utfcpp/.gitmodules +++ b/thirdparty/utfcpp/.gitmodules diff --git a/3rdparty/utfcpp/CMakeLists.txt b/thirdparty/utfcpp/CMakeLists.txt index 4e63087bc..4e63087bc 100644 --- a/3rdparty/utfcpp/CMakeLists.txt +++ b/thirdparty/utfcpp/CMakeLists.txt diff --git a/3rdparty/utfcpp/LICENSE b/thirdparty/utfcpp/LICENSE index 36b7cd93c..36b7cd93c 100644 --- a/3rdparty/utfcpp/LICENSE +++ b/thirdparty/utfcpp/LICENSE diff --git a/3rdparty/utfcpp/README.md b/thirdparty/utfcpp/README.md index 0c689cf12..0c689cf12 100644 --- a/3rdparty/utfcpp/README.md +++ b/thirdparty/utfcpp/README.md diff --git a/3rdparty/utfcpp/samples/docsample.cpp b/thirdparty/utfcpp/samples/docsample.cpp index 653388725..653388725 100644 --- a/3rdparty/utfcpp/samples/docsample.cpp +++ b/thirdparty/utfcpp/samples/docsample.cpp diff --git a/3rdparty/utfcpp/source/utf8.h b/thirdparty/utfcpp/source/utf8.h index 82b13f59f..82b13f59f 100644 --- a/3rdparty/utfcpp/source/utf8.h +++ b/thirdparty/utfcpp/source/utf8.h diff --git a/3rdparty/utfcpp/source/utf8/checked.h b/thirdparty/utfcpp/source/utf8/checked.h index 71b9076f6..71b9076f6 100644 --- a/3rdparty/utfcpp/source/utf8/checked.h +++ b/thirdparty/utfcpp/source/utf8/checked.h diff --git a/3rdparty/utfcpp/source/utf8/core.h b/thirdparty/utfcpp/source/utf8/core.h index de6199f2a..de6199f2a 100644 --- a/3rdparty/utfcpp/source/utf8/core.h +++ b/thirdparty/utfcpp/source/utf8/core.h diff --git a/3rdparty/utfcpp/source/utf8/cpp11.h b/thirdparty/utfcpp/source/utf8/cpp11.h index d93961b04..d93961b04 100644 --- a/3rdparty/utfcpp/source/utf8/cpp11.h +++ b/thirdparty/utfcpp/source/utf8/cpp11.h diff --git a/3rdparty/utfcpp/source/utf8/unchecked.h b/thirdparty/utfcpp/source/utf8/unchecked.h index 0e1b51cc7..0e1b51cc7 100644 --- a/3rdparty/utfcpp/source/utf8/unchecked.h +++ b/thirdparty/utfcpp/source/utf8/unchecked.h diff --git a/3rdparty/utfcpp/tests/CMakeLists.txt b/thirdparty/utfcpp/tests/CMakeLists.txt index 06e0d7e9c..06e0d7e9c 100644 --- a/3rdparty/utfcpp/tests/CMakeLists.txt +++ b/thirdparty/utfcpp/tests/CMakeLists.txt diff --git a/3rdparty/utfcpp/tests/docker/Dockerfile b/thirdparty/utfcpp/tests/docker/Dockerfile index 125a26936..125a26936 100644 --- a/3rdparty/utfcpp/tests/docker/Dockerfile +++ b/thirdparty/utfcpp/tests/docker/Dockerfile diff --git a/3rdparty/utfcpp/tests/negative.cpp b/thirdparty/utfcpp/tests/negative.cpp index f1bcc993e..f1bcc993e 100644 --- a/3rdparty/utfcpp/tests/negative.cpp +++ b/thirdparty/utfcpp/tests/negative.cpp diff --git a/3rdparty/utfcpp/tests/test_checked_api.cpp b/thirdparty/utfcpp/tests/test_checked_api.cpp index 6787da62e..6787da62e 100644 --- a/3rdparty/utfcpp/tests/test_checked_api.cpp +++ b/thirdparty/utfcpp/tests/test_checked_api.cpp diff --git a/3rdparty/utfcpp/tests/test_checked_iterator.cpp b/thirdparty/utfcpp/tests/test_checked_iterator.cpp index 4c44834fd..4c44834fd 100644 --- a/3rdparty/utfcpp/tests/test_checked_iterator.cpp +++ b/thirdparty/utfcpp/tests/test_checked_iterator.cpp diff --git a/3rdparty/utfcpp/tests/test_cpp11.cpp b/thirdparty/utfcpp/tests/test_cpp11.cpp index edcff9d31..edcff9d31 100644 --- a/3rdparty/utfcpp/tests/test_cpp11.cpp +++ b/thirdparty/utfcpp/tests/test_cpp11.cpp diff --git a/3rdparty/utfcpp/tests/test_data/utf8_invalid.txt b/thirdparty/utfcpp/tests/test_data/utf8_invalid.txt Binary files differindex ae8315932..ae8315932 100644 --- a/3rdparty/utfcpp/tests/test_data/utf8_invalid.txt +++ b/thirdparty/utfcpp/tests/test_data/utf8_invalid.txt diff --git a/3rdparty/utfcpp/tests/test_unchecked_api.cpp b/thirdparty/utfcpp/tests/test_unchecked_api.cpp index e9f19ca6c..e9f19ca6c 100644 --- a/3rdparty/utfcpp/tests/test_unchecked_api.cpp +++ b/thirdparty/utfcpp/tests/test_unchecked_api.cpp diff --git a/3rdparty/utfcpp/tests/test_unchecked_iterator.cpp b/thirdparty/utfcpp/tests/test_unchecked_iterator.cpp index 103e8e28a..103e8e28a 100644 --- a/3rdparty/utfcpp/tests/test_unchecked_iterator.cpp +++ b/thirdparty/utfcpp/tests/test_unchecked_iterator.cpp @@ -33,7 +33,7 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zen", "zen\zen.vcxproj", "{ {D75BF9AB-C61E-4FFF-AD59-1563430F05E2} = {D75BF9AB-C61E-4FFF-AD59-1563430F05E2} EndProjectSection EndProject -Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "3rdparty", "3rdparty", "{3CB3B9E8-B4CB-4D2E-821A-2AFE34093BEF}" +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "thirdparty", "thirdparty", "{3CB3B9E8-B4CB-4D2E-821A-2AFE34093BEF}" EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zenserver-test", "zenserver-test\zenserver-test.vcxproj", "{2563249E-E695-4CC4-8FFA-335D07680C9D}" ProjectSection(ProjectDependencies) = postProject diff --git a/zencore/blake3.cpp b/zencore/blake3.cpp index 663f21b6d..8f8952271 100644 --- a/zencore/blake3.cpp +++ b/zencore/blake3.cpp @@ -7,7 +7,7 @@ #include <zencore/testing.h> #include <zencore/zencore.h> -#include "../3rdparty/BLAKE3/c/blake3.h" +#include "../thirdparty/BLAKE3/c/blake3.h" #pragma comment(lib, "blake3.lib") #include <string.h> diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index aafb365f3..c6bf38b04 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -1854,11 +1854,11 @@ TEST_CASE("uson.json") << "ValueTwo"; CbObject Obj = Writer.Save(); - StringBuilder<128> Sb; - const std::string_view JsonText = Obj.ToJson(Sb).ToView(); + StringBuilder<128> Sb; + const char* JsonText = Obj.ToJson(Sb).Data(); std::string JsonError; - json11::Json Json = json11::Json::parse(JsonText.data(), JsonError); + json11::Json Json = json11::Json::parse(JsonText, JsonError); const std::string ValueOne = Json["KeyOne"].string_value(); const std::string ValueTwo = Json["KeyTwo"].string_value(); @@ -1879,11 +1879,11 @@ TEST_CASE("uson.json") CbObject Obj = Writer.Save(); - StringBuilder<128> Sb; - const std::string_view JsonText = Obj.ToJson(Sb).ToView(); + StringBuilder<128> Sb; + const char* JsonText = Obj.ToJson(Sb).Data(); std::string JsonError; - json11::Json Json = json11::Json::parse(JsonText.data(), JsonError); + json11::Json Json = json11::Json::parse(JsonText, JsonError); const float FloatValue = float(Json["Float"].number_value()); const double DoubleValue = Json["Double"].number_value(); @@ -1904,11 +1904,11 @@ TEST_CASE("uson.json") CbObject Obj = Writer.Save(); - StringBuilder<128> Sb; - const std::string_view JsonText = Obj.ToJson(Sb).ToView(); + StringBuilder<128> Sb; + const char* JsonText = Obj.ToJson(Sb).Data(); std::string JsonError; - json11::Json Json = json11::Json::parse(JsonText.data(), JsonError); + json11::Json Json = json11::Json::parse(JsonText, JsonError); const double FloatValue = Json["FloatNan"].number_value(); const double DoubleValue = Json["DoubleNan"].number_value(); diff --git a/zencore/compress.cpp b/zencore/compress.cpp index dd6484a3c..35a5acb3a 100644 --- a/zencore/compress.cpp +++ b/zencore/compress.cpp @@ -8,7 +8,7 @@ #include <zencore/endian.h> #include <zencore/testing.h> -#include "../3rdparty/Oodle/include/oodle2.h" +#include "../thirdparty/Oodle/include/oodle2.h" #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "oo2core_win64.lib") #endif @@ -693,6 +693,11 @@ ValidBufferOrEmpty(BufferType&& CompressedData) CompositeBuffer CopyCompressedRange(const BufferHeader& Header, const CompositeBuffer& CompressedData, uint64_t RawOffset, uint64_t RawSize) { + if (Header.TotalRawSize < RawOffset + RawSize) + { + return CompositeBuffer(); + } + if (Header.Method == CompressionMethod::None) { UniqueBuffer NewCompressedData = UniqueBuffer::Alloc(RawSize); @@ -862,9 +867,11 @@ CompressedBuffer CompressedBuffer::CopyRange(uint64_t RawOffset, uint64_t RawSize) const { using namespace detail; - const BufferHeader Header = BufferHeader::Read(CompressedData); - CompressedBuffer Range; - Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, RawSize); + const BufferHeader Header = BufferHeader::Read(CompressedData); + const uint64_t TotalRawSize = RawSize < ~uint64_t(0) ? RawSize : Header.TotalRawSize - RawOffset; + + CompressedBuffer Range; + Range.CompressedData = CopyCompressedRange(Header, CompressedData, RawOffset, TotalRawSize); return Range; } diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 88a72cbba..04b3b33dd 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -382,6 +382,7 @@ public: ZENCORE_API static IoBuffer MakeFromFileHandle(void* FileHandle, uint64_t Offset = 0, uint64_t Size = ~0ull); ZENCORE_API static IoBuffer ReadFromFileMaybe(IoBuffer& InBuffer); inline static IoBuffer MakeCloneFromMemory(const void* Ptr, size_t Sz) { return IoBuffer(IoBuffer::Clone, Ptr, Sz); } + inline static IoBuffer MakeCloneFromMemory(MemoryView Memory) { return IoBuffer(IoBuffer::Clone, Memory.GetData(), Memory.GetSize()); } }; IoHash HashBuffer(IoBuffer& Buffer); diff --git a/zencore/memory.cpp b/zencore/memory.cpp index 14ea7ca1d..c94829276 100644 --- a/zencore/memory.cpp +++ b/zencore/memory.cpp @@ -186,13 +186,13 @@ TEST_CASE("MemoryView") { { uint8_t Array1[16] = {}; - MemoryView View1 = MakeMemoryView(Array1); + MemoryView View1 = MakeMemoryView(Array1); CHECK(View1.GetSize() == 16); } { uint32_t Array2[16] = {}; - MemoryView View2 = MakeMemoryView(Array2); + MemoryView View2 = MakeMemoryView(Array2); CHECK(View2.GetSize() == 64); } diff --git a/zencore/xmake.lua b/zencore/xmake.lua index 5de7f476d..d26a9f922 100644 --- a/zencore/xmake.lua +++ b/zencore/xmake.lua @@ -2,13 +2,15 @@ target('zencore') set_kind("static") add_files("**.cpp") add_includedirs("include", {public=true}) - add_includedirs("..\\3rdparty\\utfcpp\\source") - add_linkdirs("$(projectdir)/3rdparty/BLAKE3/lib/Win64", "$(projectdir)/3rdparty/Oodle/lib/Win64") + add_includedirs("..\\thirdparty\\utfcpp\\source") + add_linkdirs("$(projectdir)/thirdparty/BLAKE3/lib/Win64", "$(projectdir)/thirdparty/Oodle/lib/Win64") add_packages( "vcpkg::spdlog", "vcpkg::fmt", "vcpkg::doctest", + "vcpkg::json11", "vcpkg::lz4", + "vcpkg::mimalloc", "vcpkg::cpr", "vcpkg::curl", -- required by cpr "vcpkg::zlib", -- required by curl diff --git a/zencore/zencore.vcxproj b/zencore/zencore.vcxproj index 95b9eace5..49e959b96 100644 --- a/zencore/zencore.vcxproj +++ b/zencore/zencore.vcxproj @@ -74,7 +74,7 @@ <SDLCheck>true</SDLCheck> <PreprocessorDefinitions>_DEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> <ConformanceMode>true</ConformanceMode> - <AdditionalIncludeDirectories>.\include;..\3rdparty\utfcpp\source</AdditionalIncludeDirectories> + <AdditionalIncludeDirectories>.\include;..\thirdparty\utfcpp\source</AdditionalIncludeDirectories> <DebugInformationFormat>ProgramDatabase</DebugInformationFormat> <LanguageStandard>stdcpplatest</LanguageStandard> <TreatWarningAsError>true</TreatWarningAsError> @@ -95,7 +95,7 @@ <SDLCheck>true</SDLCheck> <PreprocessorDefinitions>NDEBUG;_LIB;%(PreprocessorDefinitions)</PreprocessorDefinitions> <ConformanceMode>true</ConformanceMode> - <AdditionalIncludeDirectories>.\include;..\3rdparty\utfcpp\source</AdditionalIncludeDirectories> + <AdditionalIncludeDirectories>.\include;..\thirdparty\utfcpp\source</AdditionalIncludeDirectories> <WholeProgramOptimization>false</WholeProgramOptimization> <LanguageStandard>stdcpplatest</LanguageStandard> <TreatWarningAsError>true</TreatWarningAsError> diff --git a/zenfs_common.props b/zenfs_common.props index 67894b9e8..3d1b5e9c8 100644 --- a/zenfs_common.props +++ b/zenfs_common.props @@ -3,7 +3,7 @@ <ImportGroup Label="PropertySheets" /> <PropertyGroup Label="UserMacros" /> <PropertyGroup> - <LibraryPath>$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64);$(SolutionDir)\3rdparty\BLAKE3\lib\Win64;$(SolutionDir)\3rdparty\Oodle\lib\Win64</LibraryPath> + <LibraryPath>$(VC_LibraryPath_x64);$(WindowsSDK_LibraryPath_x64);$(SolutionDir)\thirdparty\BLAKE3\lib\Win64;$(SolutionDir)\thirdparty\Oodle\lib\Win64</LibraryPath> </PropertyGroup> <ItemDefinitionGroup> <ClCompile> diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 48600bb36..b3a4348f0 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -20,6 +20,7 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpshared.h> #include <zenhttp/zenhttp.h> +#include <zenutil/cache/cache.h> #include <zenutil/zenserverprocess.h> #if ZEN_USE_MIMALLOC @@ -466,7 +467,9 @@ using namespace std::literals; class full_test_formatter final : public spdlog::formatter { public: - full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) {} + full_test_formatter(std::string_view LogId, std::chrono::time_point<std::chrono::system_clock> Epoch) : m_Epoch(Epoch), m_LogId(LogId) + { + } virtual std::unique_ptr<formatter> clone() const override { return std::make_unique<full_test_formatter>(m_LogId, m_Epoch); } @@ -991,7 +994,7 @@ TEST_CASE("project.basic") } } - BaseUri << "/oplog/ps5"; + BaseUri << "/oplog/foobar"; { { @@ -1007,7 +1010,7 @@ TEST_CASE("project.basic") zen::CbObjectView ResponseObject = zen::CbFieldView(Response.text.data()).AsObjectView(); - CHECK(ResponseObject["id"].AsString() == "ps5"sv); + CHECK(ResponseObject["id"].AsString() == "foobar"sv); CHECK(ResponseObject["project"].AsString() == "test"sv); } } @@ -1029,11 +1032,13 @@ TEST_CASE("project.basic") "00010000"}; auto FileOid = zen::Oid::FromHexString(ChunkId); + std::filesystem::path ReliablePath = zen::GetRunningExecutablePath(); + OpWriter.BeginArray("files"); OpWriter.BeginObject(); OpWriter << "id" << FileOid; - OpWriter << "clientpath" << __FILE__; - OpWriter << "serverpath" << __FILE__; + OpWriter << "clientpath" << ReliablePath.c_str(); + OpWriter << "serverpath" << ReliablePath.c_str(); OpWriter.EndObject(); OpWriter.EndArray(); @@ -1108,6 +1113,45 @@ TEST_CASE("project.pipe") } # endif +namespace utils { + + struct ZenConfig + { + std::filesystem::path DataDir; + uint16_t Port; + std::string BaseUri; + std::string Args; + + static ZenConfig New(uint16_t Port = 13337, std::string Args = "") + { + return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), + .Port = Port, + .BaseUri = "http://localhost:{}/z$"_format(Port), + .Args = std::move(Args)}; + } + + static ZenConfig NewWithUpstream(uint16_t UpstreamPort) + { + return New(13337, "--debug --upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort)); + } + + void Spawn(ZenServerInstance& Inst) + { + Inst.SetTestDir(DataDir); + Inst.SpawnServer(Port, Args); + Inst.WaitUntilReady(); + } + }; + + void SpawnServer(ZenServerInstance& Server, ZenConfig& Cfg) + { + Server.SetTestDir(Cfg.DataDir); + Server.SpawnServer(Cfg.Port, Cfg.Args); + Server.WaitUntilReady(); + } + +} // namespace utils + TEST_CASE("zcache.basic") { using namespace std::literals; @@ -1414,34 +1458,7 @@ TEST_CASE("zcache.cbpackage") TEST_CASE("zcache.policy") { using namespace std::literals; - - struct ZenConfig - { - std::filesystem::path DataDir; - uint16_t Port; - std::string BaseUri; - std::string Args; - - static ZenConfig New(uint16_t Port = 13337, std::string Args = "") - { - return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), - .Port = Port, - .BaseUri = "http://localhost:{}/z$"_format(Port), - .Args = std::move(Args)}; - } - - static ZenConfig NewWithUpstream(uint16_t UpstreamPort) - { - return New(13337, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort)); - } - - void Spawn(ZenServerInstance& Inst) - { - Inst.SetTestDir(DataDir); - Inst.SpawnServer(Port, Args); - Inst.WaitUntilReady(); - } - }; + using namespace utils; auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { auto Buf = zen::UniqueBuffer::Alloc(Size); @@ -1705,14 +1722,15 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage OriginalPackage = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(OriginalPackage); + zen::IoHash Key; + zen::IoHash PayloadId; // Store package locally { - CHECK(OriginalPackage.GetAttachments().size() != 0); + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + CHECK(Package.GetAttachments().size() != 0); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); @@ -1765,14 +1783,15 @@ TEST_CASE("zcache.policy") UpstreamCfg.Spawn(UpstreamInst); LocalCfg.Spawn(LocalInst); - zen::IoHash Key; - zen::IoHash PayloadId; - zen::CbPackage OriginalPackage = GeneratePackage(Key, PayloadId); - auto Buf = ToBuffer(OriginalPackage); + zen::IoHash Key; + zen::IoHash PayloadId; // Store package upstream { - CHECK(OriginalPackage.GetAttachments().size() != 0); + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + CHECK(Package.GetAttachments().size() != 0); cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); @@ -1888,6 +1907,303 @@ TEST_CASE("zcache.policy") } } +TEST_CASE("zcache.rpc") +{ + using namespace std::literals; + + auto CreateCacheRecord = [](const zen::CacheKey& CacheKey, size_t PayloadSize) -> zen::CbPackage { + std::vector<uint8_t> Data; + Data.resize(PayloadSize); + for (size_t Idx = 0; Idx < PayloadSize; ++Idx) + { + Data[Idx] = Idx % 255; + } + + zen::CbAttachment Attachment(zen::CompressedBuffer::Compress(SharedBuffer::MakeView(Data.data(), Data.size()))); + + zen::CbObjectWriter CacheRecord; + CacheRecord.BeginObject("CacheKey"sv); + CacheRecord << "Bucket"sv << CacheKey.Bucket << "Hash"sv << CacheKey.Hash; + CacheRecord.EndObject(); + CacheRecord << "Data"sv << Attachment; + + zen::CbPackage Package; + Package.SetObject(CacheRecord.Save()); + Package.AddAttachment(Attachment); + + return Package; + }; + + auto ToIoBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::BinaryWriter MemStream; + Package.Save(MemStream); + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + auto PutCacheRecords = [&CreateCacheRecord, &ToIoBuffer](std::string_view BaseUri, + std::string_view Query, + std::string_view Bucket, + size_t Num, + size_t PayloadSize = 1024) -> std::vector<CacheKey> { + std::vector<zen::CacheKey> OutKeys; + + for (uint32_t Key = 1; Key <= Num; ++Key) + { + const zen::CacheKey CacheKey = zen::CacheKey::Create(Bucket, zen::IoHash::HashBuffer(&Key, sizeof uint32_t)); + CbPackage CacheRecord = CreateCacheRecord(CacheKey, PayloadSize); + + OutKeys.push_back(CacheKey); + + IoBuffer Payload = ToIoBuffer(CacheRecord); + + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}{}"_format(BaseUri, CacheKey.Bucket, CacheKey.Hash, Query)}, + cpr::Body{(const char*)Payload.Data(), Payload.Size()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + + CHECK(Result.status_code == 201); + } + + return OutKeys; + }; + + struct GetCacheRecordResult + { + zen::CbPackage Response; + std::vector<zen::CbFieldView> Records; + bool Success; + }; + + auto GetCacheRecords = + [](std::string_view BaseUri, std::span<zen::CacheKey> Keys, const zen::CacheRecordPolicy& Policy) -> GetCacheRecordResult { + using namespace zen; + + CbObjectWriter Request; + Request << "Method"sv + << "GetCacheRecords"sv; + Request.BeginObject("Params"sv); + + Request.BeginArray("CacheKeys"sv); + for (const CacheKey& Key : Keys) + { + Request.BeginObject(); + Request << "Bucket"sv << Key.Bucket << "Hash"sv << Key.Hash; + Request.EndObject(); + } + Request.EndArray(); + + Request.BeginObject("Policy"); + CacheRecordPolicy::Save(Policy, Request); + Request.EndObject(); + + Request.EndObject(); + + BinaryWriter Body; + Request.Save(Body); + + cpr::Response Result = cpr::Post(cpr::Url{"{}/$rpc"_format(BaseUri)}, + cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}, + cpr::Body{(const char*)Body.GetData(), Body.GetSize()}); + + GetCacheRecordResult OutResult; + + if (Result.status_code == 200) + { + CbPackage Response; + if (Response.TryLoad(zen::IoBuffer(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()))) + { + OutResult.Response = std::move(Response); + CbObjectView ResponseObject = OutResult.Response.GetObject(); + + for (CbFieldView RecordView : ResponseObject["Result"]) + { + ExtendableStringBuilder<256> Tmp; + auto JSON = RecordView.AsObjectView().ToJson(Tmp).ToView(); + OutResult.Records.push_back(RecordView); + } + + OutResult.Success = true; + } + } + + return OutResult; + }; + + auto LoadKey = [](zen::CbFieldView KeyView) -> zen::CacheKey { + if (zen::CbObjectView KeyObj = KeyView.AsObjectView()) + { + return CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); + } + return CacheKey::Empty; + }; + + SUBCASE("get cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + CacheRecordPolicy Policy; + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); + + CHECK(Result.Records.size() == Keys.size()); + + for (size_t Index = 0; CbFieldView RecordView : Result.Records) + { + const CacheKey& ExpectedKey = Keys[Index++]; + + CbObjectView RecordObj = RecordView.AsObjectView(); + CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); + const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); + const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash); + + CHECK(Key == ExpectedKey); + CHECK(Attachment != nullptr); + } + } + + SUBCASE("get missing cache records") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + CacheRecordPolicy Policy; + std::vector<zen::CacheKey> ExistingKeys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 128); + std::vector<zen::CacheKey> Keys; + + for (const zen::CacheKey& Key : ExistingKeys) + { + Keys.push_back(Key); + Keys.push_back(CacheKey::Create("missing"sv, IoHash::Zero)); + } + + GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); + + CHECK(Result.Records.size() == Keys.size()); + + size_t KeyIndex = 0; + for (size_t Index = 0; CbFieldView RecordView : Result.Records) + { + const bool Missing = Index++ % 2 != 0; + + if (Missing) + { + CHECK(RecordView.IsNull()); + } + else + { + const CacheKey& ExpectedKey = ExistingKeys[KeyIndex++]; + CbObjectView RecordObj = RecordView.AsObjectView(); + CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView(); + zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]); + const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); + const CbAttachment* Attachment = Result.Response.FindAttachment(AttachmentHash); + + CHECK(Key == ExpectedKey); + CHECK(Attachment != nullptr); + } + } + } + + SUBCASE("policy - 'SkipAttachments' does not return any record attachments") + { + std::filesystem::path TestDir = TestEnv.CreateNewTestDir(); + const uint16_t PortNumber = 13337; + const auto BaseUri = "http://localhost:{}/z$"_format(PortNumber); + + ZenServerInstance Inst(TestEnv); + Inst.SetTestDir(TestDir); + Inst.SpawnServer(PortNumber); + Inst.WaitUntilReady(); + + CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::SkipAttachments); + std::vector<zen::CacheKey> Keys = PutCacheRecords(BaseUri, ""sv, "mastodon"sv, 4); + GetCacheRecordResult Result = GetCacheRecords(BaseUri, Keys, Policy); + + CHECK(Result.Records.size() == Keys.size()); + + std::span<const zen::CbAttachment> Attachments = Result.Response.GetAttachments(); + CHECK(Attachments.empty()); + + for (size_t Index = 0; CbFieldView RecordView : Result.Records) + { + const CacheKey& ExpectedKey = Keys[Index++]; + + CbObjectView RecordObj = RecordView.AsObjectView(); + CbObjectView KeyObj = RecordObj["CacheKey"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObj["Bucket"sv].AsString(), KeyObj["Hash"].AsHash()); + const IoHash AttachmentHash = RecordObj["Data"sv].AsHash(); + + CHECK(Key == ExpectedKey); + } + } + + SUBCASE("policy - 'QueryLocal' does not query upstream") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + + CacheRecordPolicy Policy(CachePolicy::QueryLocal); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); + + CHECK(Result.Records.size() == Keys.size()); + + for (CbFieldView RecordView : Result.Records) + { + CHECK(RecordView.IsNull()); + } + } + + SUBCASE("policy - 'QueryRemote' does query upstream") + { + using namespace utils; + + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamServer(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalServer(TestEnv); + + SpawnServer(UpstreamServer, UpstreamCfg); + SpawnServer(LocalServer, LocalCfg); + + std::vector<zen::CacheKey> Keys = PutCacheRecords(UpstreamCfg.BaseUri, ""sv, "mastodon"sv, 4); + + CacheRecordPolicy Policy(CachePolicy::QueryLocal | CachePolicy::QueryRemote); + GetCacheRecordResult Result = GetCacheRecords(LocalCfg.BaseUri, Keys, Policy); + + CHECK(Result.Records.size() == Keys.size()); + + for (size_t Index = 0; CbFieldView RecordView : Result.Response.GetObject()["Result"sv]) + { + const zen::CacheKey& ExpectedKey = Keys[Index++]; + CbObjectView RecordObj = RecordView.AsObjectView(); + zen::CacheKey Key = LoadKey(RecordObj["CacheKey"sv]); + CHECK(Key == ExpectedKey); + } + } +} + # if ZEN_USE_EXEC struct RemoteExecutionRequest diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index 177cdbf55..53e1b1c61 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -14,7 +14,9 @@ #include <zencore/timer.h> #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include <zenutil/cache/cache.h> +//#include "cachekey.h" #include "monitoring/httpstats.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" @@ -36,115 +38,24 @@ using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -namespace detail { namespace cacheopt { - constexpr std::string_view Local = "local"sv; - constexpr std::string_view Remote = "remote"sv; - constexpr std::string_view Data = "data"sv; - constexpr std::string_view Meta = "meta"sv; - constexpr std::string_view Value = "value"sv; - constexpr std::string_view Attachments = "attachments"sv; -}} // namespace detail::cacheopt - -////////////////////////////////////////////////////////////////////////// - -enum class CachePolicy : uint8_t -{ - None = 0, - QueryLocal = 1 << 0, - QueryRemote = 1 << 1, - Query = QueryLocal | QueryRemote, - StoreLocal = 1 << 2, - StoreRemote = 1 << 3, - Store = StoreLocal | StoreRemote, - SkipMeta = 1 << 4, - SkipValue = 1 << 5, - SkipAttachments = 1 << 6, - SkipData = SkipMeta | SkipValue | SkipAttachments, - SkipLocalCopy = 1 << 7, - Local = QueryLocal | StoreLocal, - Remote = QueryRemote | StoreRemote, - Default = Query | Store, - Disable = None, -}; - -gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); - CachePolicy ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) { - CachePolicy QueryPolicy = CachePolicy::Query; - - { - std::string_view Opts = QueryParams.GetValue("query"sv); - if (!Opts.empty()) - { - QueryPolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - QueryPolicy |= CachePolicy::QueryLocal; - } - if (Opt == detail::cacheopt::Remote) - { - QueryPolicy |= CachePolicy::QueryRemote; - } - return true; - }); - } - } - - CachePolicy StorePolicy = CachePolicy::Store; - - { - std::string_view Opts = QueryParams.GetValue("store"sv); - if (!Opts.empty()) - { - StorePolicy = CachePolicy::None; - ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Local) - { - StorePolicy |= CachePolicy::StoreLocal; - } - if (Opt == detail::cacheopt::Remote) - { - StorePolicy |= CachePolicy::StoreRemote; - } - return true; - }); - } - } - - CachePolicy SkipPolicy = CachePolicy::None; - - { - std::string_view Opts = QueryParams.GetValue("skip"sv); - if (!Opts.empty()) - { - ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { - if (Opt == detail::cacheopt::Meta) - { - SkipPolicy |= CachePolicy::SkipMeta; - } - if (Opt == detail::cacheopt::Value) - { - SkipPolicy |= CachePolicy::SkipValue; - } - if (Opt == detail::cacheopt::Attachments) - { - SkipPolicy |= CachePolicy::SkipAttachments; - } - if (Opt == detail::cacheopt::Data) - { - SkipPolicy |= CachePolicy::SkipData; - } - return true; - }); - } - } + const CachePolicy QueryPolicy = zen::ParseQueryCachePolicy(QueryParams.GetValue("query"sv)); + const CachePolicy StorePolicy = zen::ParseStoreCachePolicy(QueryParams.GetValue("store"sv)); + const CachePolicy SkipPolicy = zen::ParseSkipCachePolicy(QueryParams.GetValue("skip"sv)); return QueryPolicy | StorePolicy | SkipPolicy; } +struct AttachmentCount +{ + uint32_t New = 0; + uint32_t Valid = 0; + uint32_t Invalid = 0; + uint32_t Total = 0; +}; + ////////////////////////////////////////////////////////////////////////// HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, @@ -207,6 +118,11 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); + if (Key == "$rpc") + { + return HandleRpcRequest(Request); + } + if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -319,8 +235,8 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (ValidCount != AttachmentCount) { - Success = false; - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + // Success = false; + ZEN_WARN("GET - '{}/{}' '{}' is partial, found '{}' of '{}' attachments", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType), @@ -543,52 +459,39 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request CbObjectView CacheRecord(Body.Data()); std::vector<IoHash> ValidAttachments; - uint32_t AttachmentCount = 0; + int32_t TotalCount = 0; - CacheRecord.IterateAttachments([this, &AttachmentCount, &ValidAttachments](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &TotalCount, &ValidAttachments](CbFieldView AttachmentHash) { const IoHash Hash = AttachmentHash.AsHash(); if (m_CidStore.ContainsChunk(Hash)) { ValidAttachments.emplace_back(Hash); } - AttachmentCount++; + TotalCount++; }); - const uint32_t ValidCount = static_cast<uint32_t>(ValidAttachments.size()); - const bool ValidCacheRecord = ValidCount == AttachmentCount; - - if (ValidCacheRecord) - { - ZEN_DEBUG("PUT - '{}/{}' {} '{}', {} attachments", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Body.Size()), - ToString(ContentType), - ValidCount); + ZEN_DEBUG("PUT - '{}/{}' {} '{}' attachments '{}/{}' (valid/total)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(Body.Size()), + ToString(ContentType), + TotalCount, + ValidAttachments.size()); - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); + Body.SetContentType(ZenContentType::kCbObject); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); - if (StoreUpstream) - { - ZEN_ASSERT(m_UpstreamCache); - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(ValidAttachments)}); - } + const bool IsPartialRecord = TotalCount != static_cast<int32_t>(ValidAttachments.size()); - Request.WriteResponse(HttpResponseCode::Created); - } - else + if (StoreUpstream && !IsPartialRecord) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, found {}/{} attachments", - Ref.BucketSegment, - Ref.HashKey, - ToString(ContentType), - ValidCount, - AttachmentCount); - - Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing attachments"sv); + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(ValidAttachments)}); } + + Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbPackage) { @@ -600,16 +503,15 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"sv); } - CbObject CacheRecord = Package.GetObject(); - - std::span<const CbAttachment> Attachments = Package.GetAttachments(); - std::vector<IoHash> ValidAttachments; - int32_t NewAttachmentCount = 0; + CbObject CacheRecord = Package.GetObject(); + AttachmentCount Count; + std::vector<IoHash> ValidAttachments; - ValidAttachments.reserve(Attachments.size()); + ValidAttachments.reserve(Package.GetAttachments().size()); - CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &NewAttachmentCount](CbFieldView AttachmentHash) { - if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) + CacheRecord.IterateAttachments([this, &Ref, &Package, &ValidAttachments, &Count](CbFieldView HashView) { + const IoHash Hash = HashView.AsHash(); + if (const CbAttachment* Attachment = Package.FindAttachment(Hash)) { if (Attachment->IsCompressedBinary()) { @@ -620,8 +522,9 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request if (InsertResult.New) { - NewAttachmentCount++; + Count.New++; } + Count.Valid++; } else { @@ -629,40 +532,40 @@ HttpStructuredCacheService::HandlePutCacheRecord(zen::HttpServerRequest& Request Ref.BucketSegment, Ref.HashKey, ToString(HttpContentType::kCbPackage), - AttachmentHash.AsHash()); + Hash); + Count.Invalid++; } } - else + else if (m_CidStore.ContainsChunk(Hash)) { - ZEN_WARN("PUT - '{}/{}' '{}' FAILED, missing attachment '{}'", - Ref.BucketSegment, - Ref.HashKey, - ToString(HttpContentType::kCbPackage), - AttachmentHash.AsHash()); + ValidAttachments.emplace_back(Hash); + Count.Valid++; } + Count.Total++; }); - const bool AttachmentsValid = ValidAttachments.size() == Attachments.size(); - - if (!AttachmentsValid) + if (Count.Invalid > 0) { - return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"sv); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachment(s)"sv); } - ZEN_DEBUG("PUT - '{}/{}' {} '{}', {}/{} new attachments", + ZEN_DEBUG("PUT - '{}/{}' {} '{}', attachments '{}/{}/{}' (new/valid/total)", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.GetSize()), ToString(ContentType), - NewAttachmentCount, - Attachments.size()); + Count.New, + Count.Valid, + Count.Total); IoBuffer CacheRecordValue = CacheRecord.GetBuffer().AsIoBuffer(); CacheRecordValue.SetContentType(ZenContentType::kCbObject); m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (StoreUpstream) + const bool IsPartialRecord = Count.Valid != Count.Total; + + if (StoreUpstream && !IsPartialRecord) { ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, @@ -708,8 +611,7 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (QueryUpstream) { - if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); - UpstreamResult.Success) + if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId); UpstreamResult.Success) { if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { @@ -864,6 +766,420 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } void +HttpStructuredCacheService::HandleRpcRequest(zen::HttpServerRequest& Request) +{ + switch (auto Verb = Request.RequestVerb()) + { + using enum HttpVerb; + + case kPost: + { + const HttpContentType ContentType = Request.RequestContentType(); + const HttpContentType AcceptType = Request.AcceptContentType(); + + if (ContentType != HttpContentType::kCbObject || AcceptType != HttpContentType::kCbPackage) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Request.WriteResponseAsync( + [this, RpcRequest = zen::LoadCompactBinaryObject(Request.ReadPayload())](HttpServerRequest& AsyncRequest) { + const std::string_view Method = RpcRequest["Method"sv].AsString(); + if (Method == "GetCacheRecords"sv) + { + HandleRpcGetCacheRecords(AsyncRequest, RpcRequest); + } + else if (Method == "GetCachePayloads"sv) + { + HandleRpcGetCachePayloads(AsyncRequest, RpcRequest); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::BadRequest); + } + }); + } + break; + default: + Request.WriteResponse(HttpResponseCode::BadRequest); + break; + } +} + +void +HttpStructuredCacheService::HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +{ + using namespace fmt::literals; + + CbPackage RpcResponse; + CacheRecordPolicy Policy; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + std::vector<CacheKey> CacheKeys; + std::vector<IoBuffer> CacheValues; + std::vector<size_t> UpstreamRequests; + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCacheRecords"sv); + + CacheRecordPolicy::Load(Params["Policy"sv].AsObjectView(), Policy); + + const bool PartialOnError = Policy.HasRecordPolicy(CachePolicy::PartialOnError); + const bool SkipAttachments = Policy.HasRecordPolicy(CachePolicy::SkipAttachments); + const bool QueryRemote = Policy.HasRecordPolicy(CachePolicy::QueryRemote) && m_UpstreamCache; + + for (CbFieldView KeyView : Params["CacheKeys"sv]) + { + CbObjectView KeyObject = KeyView.AsObjectView(); + CacheKeys.push_back(CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash())); + } + + if (CacheKeys.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + CacheValues.resize(CacheKeys.size()); + + for (size_t KeyIndex = 0; const CacheKey& Key : CacheKeys) + { + ZenCacheValue CacheValue; + uint32_t MissingCount = 0; + + if (m_CacheStore.Get(Key.Bucket, Key.Hash, CacheValue)) + { + CbObjectView CacheRecord(CacheValue.Value.Data()); + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &MissingCount, &RpcResponse](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + } + else + { + MissingCount++; + } + }); + } + } + + if (CacheValue.Value && (MissingCount == 0 || PartialOnError)) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Key.Bucket, + Key.Hash, + NiceBytes(CacheValue.Value.Size()), + ToString(CacheValue.Value.GetContentType())); + + CacheValues[KeyIndex] = std::move(CacheValue.Value); + m_CacheStats.HitCount++; + } + else if (QueryRemote) + { + UpstreamRequests.push_back(KeyIndex); + } + else + { + ZEN_DEBUG("MISS - '{}/{}' {}", Key.Bucket, Key.Hash, MissingCount ? "(partial)"sv : ""sv); + m_CacheStats.MissCount++; + } + + ++KeyIndex; + } + + if (!UpstreamRequests.empty() && m_UpstreamCache) + { + const auto OnCacheRecordGetComplete = + [this, &CacheKeys, &CacheValues, &RpcResponse, PartialOnError, SkipAttachments](CacheRecordGetCompleteParams&& Params) { + ZEN_ASSERT(Params.KeyIndex < CacheValues.size()); + + IoBuffer CacheValue; + AttachmentCount Count; + + if (Params.Record) + { + Params.Record.IterateAttachments([this, &RpcResponse, SkipAttachments, &Params, &Count](CbFieldView HashView) { + if (const CbAttachment* Attachment = Params.Package.FindAttachment(HashView.AsHash())) + { + if (CompressedBuffer Compressed = Attachment->AsCompressedBinary()) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + if (InsertResult.New) + { + Count.New++; + } + Count.Valid++; + + if (!SkipAttachments) + { + RpcResponse.AddAttachment(CbAttachment(Compressed)); + } + } + else + { + ZEN_DEBUG("Uncompressed payload '{}' from upstream cache record '{}/{}'", + HashView.AsHash(), + Params.CacheKey.Bucket, + Params.CacheKey.Hash); + Count.Invalid++; + } + } + else if (m_CidStore.ContainsChunk(HashView.AsHash())) + { + Count.Valid++; + } + Count.Total++; + }); + + if ((Count.Valid == Count.Total) || PartialOnError) + { + CacheValue = CbObject::Clone(Params.Record).GetBuffer().AsIoBuffer(); + } + } + + if (CacheValue) + { + ZEN_DEBUG("HIT - '{}/{}' {} '{}' attachments '{}/{}/{}' (new/valid/total) (UPSTREAM)", + Params.CacheKey.Bucket, + Params.CacheKey.Hash, + NiceBytes(CacheValue.GetSize()), + ToString(HttpContentType::kCbPackage), + Count.New, + Count.Valid, + Count.Total); + + CacheValue.SetContentType(ZenContentType::kCbObject); + + CacheValues[Params.KeyIndex] = CacheValue; + m_CacheStore.Put(Params.CacheKey.Bucket, Params.CacheKey.Hash, {.Value = CacheValue}); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + const bool IsPartial = Count.Valid != Count.Total; + ZEN_DEBUG("MISS - '{}/{}' {}", Params.CacheKey.Bucket, Params.CacheKey.Hash, IsPartial ? "(partial)"sv : ""sv); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache->GetCacheRecords(CacheKeys, UpstreamRequests, Policy, std::move(OnCacheRecordGetComplete)); + } + + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + for (const IoBuffer& Value : CacheValues) + { + if (Value) + { + CbObjectView Record(Value.Data()); + ResponseObject << Record; + } + else + { + ResponseObject.AddNull(); + } + } + ResponseObject.EndArray(); + + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void +HttpStructuredCacheService::HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView RpcRequest) +{ + using namespace fmt::literals; + + ZEN_ASSERT(RpcRequest["Method"sv].AsString() == "GetCachePayloads"sv); + + std::vector<CacheChunkRequest> ChunkRequests; + std::vector<size_t> UpstreamRequests; + std::vector<IoBuffer> Chunks; + CbObjectView Params = RpcRequest["Params"sv].AsObjectView(); + + for (CbFieldView RequestView : Params["ChunkRequests"sv]) + { + CbObjectView RequestObject = RequestView.AsObjectView(); + CbObjectView KeyObject = RequestObject["Key"sv].AsObjectView(); + const CacheKey Key = CacheKey::Create(KeyObject["Bucket"sv].AsString(), KeyObject["Hash"sv].AsHash()); + const IoHash ChunkId = RequestObject["ChunkId"sv].AsHash(); + const Oid PayloadId = RequestObject["PayloadId"sv].AsObjectId(); + const uint64_t RawOffset = RequestObject["RawOffset"sv].AsUInt64(); + const uint64_t RawSize = RequestObject["RawSize"sv].AsUInt64(); + const uint32_t ChunkPolicy = RequestObject["Policy"sv].AsUInt32(); + + ChunkRequests.emplace_back(Key, ChunkId, PayloadId, RawOffset, RawSize, static_cast<CachePolicy>(ChunkPolicy)); + } + + if (ChunkRequests.empty()) + { + return Request.WriteResponse(HttpResponseCode::BadRequest); + } + + Chunks.resize(ChunkRequests.size()); + + // Unreal uses a 12 byte ID to address cache record payloads. When the uncompressed hash (ChunkId) + // is missing, load the cache record and try to find the raw hash from the payload ID. + { + const auto GetChunkIdFromPayloadId = [](CbObjectView Record, const Oid& PayloadId) -> IoHash { + if (CbObjectView ValueObject = Record["Value"sv].AsObjectView()) + { + const Oid Id = ValueObject["Id"sv].AsObjectId(); + if (Id == PayloadId) + { + return ValueObject["RawHash"sv].AsHash(); + } + } + + for (CbFieldView AttachmentView : Record["Attachments"sv]) + { + CbObjectView AttachmentObject = AttachmentView.AsObjectView(); + const Oid Id = AttachmentObject["Id"sv].AsObjectId(); + + if (Id == PayloadId) + { + return AttachmentObject["RawHash"sv].AsHash(); + } + } + + return IoHash::Zero; + }; + + CacheKey CurrentKey = CacheKey::Empty; + IoBuffer CurrentRecordBuffer; + + for (CacheChunkRequest& ChunkRequest : ChunkRequests) + { + if (ChunkRequest.ChunkId != IoHash::Zero) + { + continue; + } + + if (ChunkRequest.Key != CurrentKey) + { + CurrentKey = ChunkRequest.Key; + + ZenCacheValue CacheValue; + if (m_CacheStore.Get(CurrentKey.Bucket, CurrentKey.Hash, CacheValue)) + { + CurrentRecordBuffer = CacheValue.Value; + } + } + + if (CurrentRecordBuffer) + { + ChunkRequest.ChunkId = GetChunkIdFromPayloadId(CbObjectView(CurrentRecordBuffer.GetData()), ChunkRequest.PayloadId); + } + } + } + + for (size_t RequestIndex = 0; const CacheChunkRequest& ChunkRequest : ChunkRequests) + { + const bool QueryLocal = (ChunkRequest.Policy & CachePolicy::QueryLocal) == CachePolicy::QueryLocal; + const bool QueryRemote = (ChunkRequest.Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote; + + if (QueryLocal) + { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(ChunkRequest.ChunkId)) + { + ZEN_DEBUG("HIT - '{}/{}/{}' {} '{}' ({})", + ChunkRequest.Key.Bucket, + ChunkRequest.Key.Hash, + ChunkRequest.ChunkId, + NiceBytes(Chunk.Size()), + ToString(Chunk.GetContentType()), + "LOCAL"); + + Chunks[RequestIndex] = Chunk; + m_CacheStats.HitCount++; + } + else if (QueryRemote) + { + UpstreamRequests.push_back(RequestIndex); + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + m_CacheStats.MissCount++; + } + } + else + { + ZEN_DEBUG("SKIP - '{}/{}/{}'", ChunkRequest.Key.Bucket, ChunkRequest.Key.Hash, ChunkRequest.ChunkId); + } + + ++RequestIndex; + } + + if (!UpstreamRequests.empty() && m_UpstreamCache) + { + const auto OnCachePayloadGetComplete = [this, &ChunkRequests, &Chunks](CachePayloadGetCompleteParams&& Params) { + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Params.Payload))) + { + auto InsertResult = m_CidStore.AddChunk(Compressed); + + ZEN_DEBUG("HIT - '{}/{}/{}' {} ({})", + Params.Request.Key.Bucket, + Params.Request.Key.Hash, + Params.Request.ChunkId, + NiceBytes(Params.Payload.GetSize()), + "UPSTREAM"); + + ZEN_ASSERT(Params.RequestIndex < Chunks.size()); + Chunks[Params.RequestIndex] = std::move(Params.Payload); + + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; + } + else + { + ZEN_DEBUG("MISS - '{}/{}/{}'", Params.Request.Key.Bucket, Params.Request.Key.Hash, Params.Request.ChunkId); + m_CacheStats.MissCount++; + } + }; + + m_UpstreamCache->GetCachePayloads(ChunkRequests, UpstreamRequests, std::move(OnCachePayloadGetComplete)); + } + + CbPackage RpcResponse; + CbObjectWriter ResponseObject; + + ResponseObject.BeginArray("Result"sv); + + for (size_t ChunkIndex = 0; ChunkIndex < Chunks.size(); ++ChunkIndex) + { + if (Chunks[ChunkIndex]) + { + ResponseObject << ChunkRequests[ChunkIndex].ChunkId; + RpcResponse.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(std::move(Chunks[ChunkIndex]))))); + } + else + { + ResponseObject << IoHash::Zero; + } + } + ResponseObject.EndArray(); + + RpcResponse.SetObject(ResponseObject.Save()); + + BinaryWriter MemStream; + RpcResponse.Save(MemStream); + + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kCbPackage, + IoBuffer(IoBuffer::Wrap, MemStream.GetData(), MemStream.GetSize())); +} + +void HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 59749a024..a0215aa6e 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -21,7 +21,7 @@ class CidStore; class ScrubContext; class UpstreamCache; class ZenCacheStore; -enum class CachePolicy : uint8_t; +enum class CachePolicy : uint32_t; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -89,6 +89,9 @@ private: void HandleCachePayloadRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); + void HandleRpcRequest(zen::HttpServerRequest& Request); + void HandleRpcGetCacheRecords(zen::HttpServerRequest& Request, CbObjectView BatchRequest); + void HandleRpcGetCachePayloads(zen::HttpServerRequest& Request, CbObjectView BatchRequest); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index ac9f628d3..5cce7f325 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -190,7 +190,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa return false; } - CacheBucket* Bucket = Bucket = &it->second; + CacheBucket* Bucket = &it->second; _.ReleaseNow(); @@ -347,44 +347,52 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue ////////////////////////////////////////////////////////////////////////// -#pragma pack(push) -#pragma pack(1) +inline DiskLocation::DiskLocation() = default; -struct DiskLocation +inline DiskLocation::DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) +: OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) +, LowerSize(ValueSize & 0xFFFFffff) +, IndexDataSize(IndexSize) { - inline DiskLocation() = default; +} - inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) - : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) - , LowerSize(ValueSize & 0xFFFFffff) - , IndexDataSize(IndexSize) - { - } +inline uint64_t +DiskLocation::CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) +{ + return Offset | Flags; +} - static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; - static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; - static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; - static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; - static const uint64_t kStructured = 0x4000'0000'0000'0000ull; - static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; +inline uint64_t +DiskLocation::Offset() const +{ + return OffsetAndFlags & kOffsetMask; +} - static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } +inline uint64_t +DiskLocation::Size() const +{ + return LowerSize; +} - inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } - inline uint64_t Size() const { return LowerSize; } - inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } - inline ZenContentType GetContentType() const - { - ZenContentType ContentType = ZenContentType::kBinary; +inline uint64_t +DiskLocation::IsFlagSet(uint64_t Flag) const +{ + return OffsetAndFlags & Flag; +} - if (IsFlagSet(DiskLocation::kStructured)) - { - ContentType = ZenContentType::kCbObject; - } +inline ZenContentType +DiskLocation::GetContentType() const +{ + ZenContentType ContentType = ZenContentType::kBinary; - return ContentType; + if (IsFlagSet(DiskLocation::kStructured)) + { + ContentType = ZenContentType::kCbObject; } + return ContentType; +} + private: uint64_t OffsetAndFlags = 0; uint32_t LowerSize = 0; diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 5a3191cc5..3040640f8 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -7,6 +7,7 @@ #include <zencore/iohash.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/basicfile.h> #include <zenstore/cas.h> #include <zenstore/gc.h> @@ -103,6 +104,42 @@ private: Configuration m_Configuration; }; +#pragma pack(push) +#pragma pack(1) + +struct DiskLocation +{ + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; + static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; + static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; + static const uint64_t kStructured = 0x4000'0000'0000'0000ull; + static const uint64_t kTombStone = 0x2000'0000'0000'0000ull; + + DiskLocation(); + DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags); + static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags); + uint64_t Offset() const; + uint64_t Size() const; + uint64_t IsFlagSet(uint64_t Flag) const; + ZenContentType GetContentType() const; + +private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; +}; + +struct DiskIndexEntry +{ + IoHash Key; + DiskLocation Location; +}; + +#pragma pack(pop) + +static_assert(sizeof(DiskIndexEntry) == 36); + class ZenCacheDiskLayer { public: @@ -122,7 +159,53 @@ private: /** A cache bucket manages a single directory containing metadata and data for that bucket */ - struct CacheBucket; + struct CacheBucket + { + CacheBucket(); + ~CacheBucket(); + + void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); + static bool Delete(std::filesystem::path BucketDir); + + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); + void Drop(); + void Flush(); + void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + inline bool IsOk() const { return m_IsOk; } + + private: + std::filesystem::path m_BucketDir; + Oid m_BucketId; + bool m_IsOk = false; + uint64_t m_LargeObjectThreshold = 64 * 1024; + + // These files are used to manage storage of small objects for this bucket + + BasicFile m_SobsFile; + TCasLogFile<DiskIndexEntry> m_SlogFile; + + RwLock m_IndexLock; + tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; + + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); + bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + + // These locks are here to avoid contention on file creation, therefore it's sufficient + // that we take the same lock for the same hash + // + // These locks are small and should really be spaced out so they don't share cache lines, + // but we don't currently access them at particularly high frequency so it should not be + // an issue in practice + + RwLock m_ShardedLocks[256]; + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } + }; std::filesystem::path m_RootDir; RwLock m_Lock; diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 053c262c2..8ad14a1ed 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -2,6 +2,8 @@ #include "apply.h" +#include <upstream/jupiter.h> +#include <upstream/upstreamapply.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -331,8 +333,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { + m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore); + + CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv, + .DdcNamespace = "default"sv, + .BlobStoreNamespace = "default"sv, + .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv}; + + auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); + m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "workers/{worker}", @@ -488,6 +502,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( + "jobs/{worker}/{action}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -541,7 +579,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, { // We already have everything - CbPackage Output = ExecAction(Worker, RequestObject); + CbObject Output = ExecActionUpstream(Worker, RequestObject); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -600,7 +638,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); - CbPackage Output = ExecAction(Worker, ActionObj); + CbObject Output = ExecActionUpstream(Worker, ActionObj); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -843,4 +881,69 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) return OutputPackage; } +CbObject +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + const IoHash ActionId = Action.GetHash(); + + Action.MakeOwned(); + + ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + + if (!EnqueueResult.Success) + { + throw std::runtime_error("Error enqueuing upstream task"); + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + Writer.AddHash("action", ActionId); + + return std::move(Writer.Save()); +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) +{ + using namespace fmt::literals; + auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); + if (!Status.Success) + { + // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + if (!Completed.Success || Completed.Error.ErrorCode != 0) + { + ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason); + // throw std::runtime_error( + // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str()); + return HttpResponseCode::BadRequest; + } + + ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.OutputPackage.GetAttachments().size(), + NiceBytes(Completed.TotalAttachmentBytes), + NiceBytes(Completed.TotalRawAttachmentBytes)); + + Package = std::move(Completed.OutputPackage); + return HttpResponseCode::OK; +} + } // namespace zen diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 86b262213..15cda4750 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -14,6 +14,7 @@ namespace zen { class CasStore; class CidStore; +class UpstreamApply; /** * Lambda style compute function service @@ -28,14 +29,15 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; private: - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - HttpRequestRouter m_Router; - CasStore& m_CasStore; - CidStore& m_CidStore; - std::filesystem::path m_SandboxPath; - std::filesystem::path m_FunctionPath; - std::atomic<int> m_SandboxCount{0}; + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + CasStore& m_CasStore; + CidStore& m_CidStore; + std::filesystem::path m_SandboxPath; + std::filesystem::path m_FunctionPath; + std::atomic<int> m_SandboxCount{0}; + std::unique_ptr<UpstreamApply> m_UpstreamApply; struct WorkerDesc { @@ -44,6 +46,8 @@ private: [[nodiscard]] std::filesystem::path CreateNewSandbox(); [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); RwLock m_WorkerLock; std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; diff --git a/zenserver/config.cpp b/zenserver/config.cpp index f512f8015..3e85daa9e 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -243,7 +243,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z "", "upstream-thread-count", "Number of threads used for upstream procsssing", - cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), + cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), ""); options.add_option("cache", @@ -253,6 +253,20 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"), ""); + options.add_option("cache", + "", + "upstream-connect-timeout-ms", + "Connect timeout in millisecond(s). Default 5000 ms.", + cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.ConnectTimeoutMilliseconds)->default_value("5000"), + ""); + + options.add_option("cache", + "", + "upstream-timeout-ms", + "Timeout in millisecond(s). Default 0 ms", + cxxopts::value<int32_t>(ServiceConfig.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"), + ""); + try { auto result = options.parse(argc, argv); diff --git a/zenserver/config.h b/zenserver/config.h index 7fa8163b3..f6858b940 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -65,9 +65,11 @@ struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; ZenUpstreamZenConfig ZenConfig; - int UpstreamThreadCount = 4; - UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; - bool StatsEnabled = false; + int32_t UpstreamThreadCount = 4; + int32_t ConnectTimeoutMilliseconds = 5000; + int32_t TimeoutMilliseconds = 0; + UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; + bool StatsEnabled = false; }; struct ZenServiceConfig diff --git a/zenserver/diag/formatters.h b/zenserver/diag/formatters.h index 42f928efe..759df58d3 100644 --- a/zenserver/diag/formatters.h +++ b/zenserver/diag/formatters.h @@ -2,6 +2,11 @@ #pragma once +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/iobuffer.h> +#include <zencore/string.h> + ZEN_THIRD_PARTY_INCLUDES_START #include <cpr/cpr.h> #include <fmt/format.h> @@ -17,7 +22,7 @@ struct fmt::formatter<cpr::Response> { using namespace std::literals; - if (Response.status_code == 200) + if (Response.status_code == 200 || Response.status_code == 201) { return fmt::format_to(Ctx.out(), "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s", @@ -32,18 +37,21 @@ struct fmt::formatter<cpr::Response> const auto It = Response.header.find("Content-Type"); const std::string_view ContentType = It != Response.header.end() ? It->second : "<None>"sv; - const bool IsBinary = ContentType == "application/x-ue-cb"sv || ContentType == "application/x-ue-comp"sv || - ContentType == "application/octet-stream"; - - if (IsBinary) + if (ContentType == "application/x-ue-cb"sv) { + zen::IoBuffer Body(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::CbObjectView Obj(Body.Data()); + zen::ExtendableStringBuilder<256> Sb; + std::string_view Json = Obj.ToJson(Sb).ToView(); + return fmt::format_to(Ctx.out(), - "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Reason: '{}'", + "Url: {}, Status: {}, Bytes: {}/{} (Up/Down), Elapsed: {}s, Response: '{}', Reason: '{}'", Response.url.str(), Response.status_code, Response.uploaded_bytes, Response.downloaded_bytes, Response.elapsed, + Json, Response.reason); } else diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 556a2124d..9223ea0f4 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -46,13 +46,20 @@ namespace detail { void InvalidateAccessToken() { AccessToken = {}; } - void Reset() + void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) { Session.SetBody({}); Session.SetHeader({}); + Session.SetConnectTimeout(ConnectTimeout); + Session.SetTimeout(Timeout); AccessToken = GetAccessToken(); } + cpr::Session& GetSession() { return Session; } + + private: + friend class CloudCacheClient; + CloudCacheClient& OwnerClient; CloudCacheAccessToken AccessToken; cpr::Session Session; @@ -87,12 +94,12 @@ CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Ke } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key << ".raw"; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -133,10 +140,11 @@ CloudCacheSession::GetRef(std::string_view BucketId, const IoHash& Key, ZenConte Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", ContentType}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -168,10 +176,11 @@ CloudCacheSession::GetBlob(const IoHash& Key) ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -180,9 +189,14 @@ CloudCacheSession::GetBlob(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; - const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + const IoBuffer Buffer = + Success && Response.text.size() > 0 ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); return {.Response = Buffer, .Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; } @@ -199,10 +213,11 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-comp"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -211,6 +226,10 @@ CloudCacheSession::GetCompressedBlob(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -230,10 +249,11 @@ CloudCacheSession::GetObject(const IoHash& Key) ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Get(); ZEN_DEBUG("GET {}", Response); @@ -242,6 +262,10 @@ CloudCacheSession::GetObject(const IoHash& Key) { return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } const bool Success = Response.status_code == 200; const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); @@ -263,7 +287,7 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/c/ddc/" << m_CacheClient->DdcNamespace() << "/" << BucketId << "/" << Key; - auto& Session = m_SessionState->Session; + auto& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, @@ -314,7 +338,7 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption( @@ -377,12 +401,13 @@ CloudCacheSession::FinalizeRef(std::string_view BucketId, const IoHash& Key, con Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/" << RefHash.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"X-Jupiter-IoHash", RefHash.ToHexString()}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -436,7 +461,7 @@ CloudCacheSession::PutBlob(const IoHash& Key, IoBuffer Blob) ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/octet-stream"}}); @@ -471,7 +496,7 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-comp"}}); @@ -506,7 +531,7 @@ CloudCacheSession::PutObject(const IoHash& Key, IoBuffer Object) ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); @@ -542,10 +567,11 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << m_CacheClient->BlobStoreNamespace() << "/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + Session.SetOption(cpr::Body{}); cpr::Response Response = Session.Head(); ZEN_DEBUG("HEAD {}", Response); @@ -565,67 +591,41 @@ CloudCacheSession::RefExists(std::string_view BucketId, const IoHash& Key) CloudCacheResult CloudCacheSession::BlobExists(const IoHash& Key) { - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - if (!AccessToken.IsValid()) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - - cpr::Session& Session = m_SessionState->Session; - - Session.SetOption(cpr::Url{Uri.c_str()}); - - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); - - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; + return CacheTypeExists("blobs"sv, Key); } CloudCacheResult CloudCacheSession::CompressedBlobExists(const IoHash& Key) { - const CloudCacheAccessToken& AccessToken = GetAccessToken(); - if (!AccessToken.IsValid()) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } - - ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); - - cpr::Session& Session = m_SessionState->Session; + return CacheTypeExists("compressed-blobs"sv, Key); +} - Session.SetOption(cpr::Url{Uri.c_str()}); +CloudCacheResult +CloudCacheSession::ObjectExists(const IoHash& Key) +{ + return CacheTypeExists("objects"sv, Key); +} - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); +CloudCacheExistsResult +CloudCacheSession::BlobExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("blobs"sv, Keys); +} - if (Response.error) - { - return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; - } - else if (!VerifyAccessToken(Response.status_code)) - { - return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; - } +CloudCacheExistsResult +CloudCacheSession::CompressedBlobExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("compressed-blobs"sv, Keys); +} - return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +CloudCacheExistsResult +CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) +{ + return CacheTypeExists("objects"sv, Keys); } CloudCacheResult -CloudCacheSession::ObjectExists(const IoHash& Key) +CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -634,14 +634,16 @@ CloudCacheSession::ObjectExists(const IoHash& Key) } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; - cpr::Session& Session = m_SessionState->Session; + auto& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); - cpr::Response Response = Session.Head(); - ZEN_DEBUG("HEAD {}", Response); + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); if (Response.error) { @@ -656,7 +658,7 @@ CloudCacheSession::ObjectExists(const IoHash& Key) } CloudCacheResult -CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -665,13 +667,12 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; - auto& Session = m_SessionState->Session; + auto& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); - Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); cpr::Response Response = Session.Post(); ZEN_DEBUG("POST {}", Response); @@ -692,7 +693,7 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa } CloudCacheResult -CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) +CloudCacheSession::GetObjectTree(const IoHash& Key) { const CloudCacheAccessToken& AccessToken = GetAccessToken(); if (!AccessToken.IsValid()) @@ -701,15 +702,15 @@ CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t } ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString() << "/tree"; - auto& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); - Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/octet-stream"}}); - cpr::Response Response = Session.Post(); - ZEN_DEBUG("POST {}", Response); + cpr::Response Response = Session.Get(); + ZEN_DEBUG("GET {}", Response); if (Response.error) { @@ -755,6 +756,92 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) return true; } +CloudCacheResult +CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}}); + + cpr::Response Response = Session.Head(); + ZEN_DEBUG("HEAD {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {.ErrorCode = 401, .Reason = std::string("Invalid access token")}; + } + + return {.ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +CloudCacheExistsResult +CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) +{ + const CloudCacheAccessToken& AccessToken = GetAccessToken(); + if (!AccessToken.IsValid()) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + ExtendableStringBuilder<256> Query; + for (const auto& Key : Keys) + { + Query << (Query.Size() != 0 ? "&id=" : "id=") << Key.ToHexString(); + } + + ExtendableStringBuilder<256> Uri; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/exists?" << Query; + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {CloudCacheResult{.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = Response.error.message}}; + } + else if (!VerifyAccessToken(Response.status_code)) + { + return {CloudCacheResult{.ErrorCode = 401, .Reason = std::string("Invalid access token")}}; + } + + CloudCacheExistsResult Result{ + CloudCacheResult{.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}}; + + if (Result.Success) + { + IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); + for (auto& Item : ExistsResponse["id"sv]) + { + if (Item.IsHash()) + { + Result.Have.insert(Item.AsHash()); + } + } + } + + return Result; +} + ////////////////////////////////////////////////////////////////////////// // // ServiceUrl: https://jupiter.devtools.epicgames.com @@ -772,7 +859,16 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options) , m_BlobStoreNamespace(Options.BlobStoreNamespace) , m_OAuthClientId(Options.OAuthClientId) , m_OAuthSecret(Options.OAuthSecret) +, m_AccessToken(Options.AccessToken) +, m_ConnectTimeout(Options.ConnectTimeout) +, m_Timeout(Options.Timeout) { + if (!Options.AccessToken.empty()) + { + // If an access token was provided, OAuth settings are not used. + return; + } + if (!Options.OAuthProvider.starts_with("http://"sv) && !Options.OAuthProvider.starts_with("https://"sv)) { ZEN_WARN("bad provider specification: '{}' - must be fully qualified", Options.OAuthProvider); @@ -822,6 +918,12 @@ CloudCacheClient::AcquireAccessToken() { using namespace std::chrono; + // If an access token was provided, return it instead of querying OAuth + if (!m_AccessToken.empty()) + { + return {m_AccessToken, steady_clock::time_point::max()}; + } + ExtendableStringBuilder<128> OAuthFormData; OAuthFormData << "client_id=" << m_OAuthClientId << "&scope=cache_access&grant_type=client_credentials&client_secret=" << m_OAuthSecret; @@ -862,7 +964,7 @@ CloudCacheClient::AllocSessionState() State = new detail::CloudCacheSessionState(*this); } - State->Reset(); + State->Reset(m_ConnectTimeout, m_Timeout); return State; } diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 9471ef64f..68c7361e0 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -12,6 +12,7 @@ #include <chrono> #include <list> #include <memory> +#include <set> #include <vector> struct ZenCacheValue; @@ -64,6 +65,11 @@ struct FinalizeRefResult : CloudCacheResult std::vector<IoHash> Needs; }; +struct CloudCacheExistsResult : CloudCacheResult +{ + std::set<IoHash> Have; +}; + /** * Context for performing Jupiter operations * @@ -95,12 +101,18 @@ public: FinalizeRefResult FinalizeRef(std::string_view BucketId, const IoHash& Key, const IoHash& RefHah); CloudCacheResult RefExists(std::string_view BucketId, const IoHash& Key); + CloudCacheResult BlobExists(const IoHash& Key); CloudCacheResult CompressedBlobExists(const IoHash& Key); CloudCacheResult ObjectExists(const IoHash& Key); + CloudCacheExistsResult BlobExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); + CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); + CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); + CloudCacheResult GetObjectTree(const IoHash& Key); std::vector<IoHash> Filter(std::string_view BucketId, const std::vector<IoHash>& ChunkHashes); @@ -109,6 +121,10 @@ private: const CloudCacheAccessToken& GetAccessToken(); bool VerifyAccessToken(long StatusCode); + CloudCacheResult CacheTypeExists(std::string_view TypeId, const IoHash& Key); + + CloudCacheExistsResult CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys); + spdlog::logger& m_Log; RefPtr<CloudCacheClient> m_CacheClient; detail::CloudCacheSessionState* m_SessionState; @@ -116,13 +132,16 @@ private: struct CloudCacheClientOptions { - std::string_view ServiceUrl; - std::string_view DdcNamespace; - std::string_view BlobStoreNamespace; - std::string_view OAuthProvider; - std::string_view OAuthClientId; - std::string_view OAuthSecret; - bool UseLegacyDdc = false; + std::string_view ServiceUrl; + std::string_view DdcNamespace; + std::string_view BlobStoreNamespace; + std::string_view OAuthProvider; + std::string_view OAuthClientId; + std::string_view OAuthSecret; + std::string_view AccessToken; + std::chrono::milliseconds ConnectTimeout{5000}; + std::chrono::milliseconds Timeout{}; + bool UseLegacyDdc = false; }; /** @@ -143,16 +162,19 @@ public: spdlog::logger& Logger() { return m_Log; } private: - spdlog::logger& m_Log; - std::string m_ServiceUrl; - std::string m_OAuthDomain; - std::string m_OAuthUriPath; - std::string m_OAuthFullUri; - std::string m_DdcNamespace; - std::string m_BlobStoreNamespace; - std::string m_OAuthClientId; - std::string m_OAuthSecret; - bool m_IsValid = false; + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::string m_OAuthDomain; + std::string m_OAuthUriPath; + std::string m_OAuthFullUri; + std::string m_DdcNamespace; + std::string m_BlobStoreNamespace; + std::string m_OAuthClientId; + std::string m_OAuthSecret; + std::string m_AccessToken; + std::chrono::milliseconds m_ConnectTimeout{}; + std::chrono::milliseconds m_Timeout{}; + bool m_IsValid = false; RwLock m_SessionStateLock; std::list<detail::CloudCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp new file mode 100644 index 000000000..651c2dcc8 --- /dev/null +++ b/zenserver/upstream/upstreamapply.cpp @@ -0,0 +1,1559 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "upstreamapply.h" +#include "jupiter.h" +#include "zen.h" + +#include <zencore/blockingqueue.h> +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinarypackage.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/compress.h> +#include <zencore/fmtutils.h> +#include <zencore/session.h> +#include <zencore/stats.h> +#include <zencore/stream.h> +#include <zencore/timer.h> + +#include <zenstore/cas.h> +#include <zenstore/cidstore.h> + +#include "cache/structuredcachestore.h" +#include "diag/logging.h" + +#include <fmt/format.h> + +#include <algorithm> +#include <atomic> +#include <map> +#include <set> +#include <stack> +#include <thread> +#include <unordered_map> + +namespace zen { + +using namespace std::literals; + +namespace detail { + + class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint + { + public: + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore) + : m_Log(logging::Get("upstream-apply")) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + { + using namespace fmt::literals; + m_DisplayName = "Horde - '{}'"_format(Options.ServiceUrl); + m_Client = new CloudCacheClient(Options); + m_ChannelId = "zen-{}"_format(zen::GetSessionIdString()); + } + + virtual ~HordeUpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + + virtual bool IsHealthy() const override { return m_HealthOk.load(); } + + virtual UpstreamEndpointHealth CheckHealth() override + { + try + { + CloudCacheSession Session(m_Client); + CloudCacheResult Result = Session.Authenticate(); + + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; + } + catch (std::exception& Err) + { + return {.Reason = Err.what(), .Ok = false}; + } + } + + virtual std::string_view DisplayName() const override { return m_DisplayName; } + + virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + + try + { + UpstreamData UpstreamData; + if (!ProcessApplyKey(ApplyRecord, UpstreamData)) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}}; + } + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.contains(UpstreamData.TaskId)) + { + // Pending task is already queued, return success + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; + } + + CloudCacheSession Session(m_Client); + + { + CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + UpstreamData.Blobs.clear(); + } + + { + CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + UpstreamData.Objects.clear(); + } + + CbObjectWriter Writer; + Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); + Writer.BeginArray("t"sv); + Writer.AddObjectAttachment(UpstreamData.TaskId); + Writer.EndArray(); + IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); + + CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + { + std::scoped_lock Lock(m_TaskMutex); + m_PendingTasks.erase(UpstreamData.TaskId); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + } + + [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session, const std::map<IoHash, IoBuffer>& Blobs) + { + if (Blobs.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing blobs + std::set<IoHash> Keys; + for (const auto& It : Blobs) + { + Keys.insert(It.first); + } + + CloudCacheExistsResult ExistsResult = Session.BlobExists(Keys); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (ExistsResult.ErrorCode != 0) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode, + .Reason = std::move(ExistsResult.Reason)}; + } + + // TODO: Batch upload missing blobs + + for (const auto& It : Blobs) + { + if (ExistsResult.Have.contains(It.first)) + { + continue; + } + + CloudCacheResult Result = Session.PutBlob(It.first, It.second); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode, + .Reason = std::move(Result.Reason)}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects) + { + if (Objects.size() == 0) + { + return {.Success = true}; + } + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Batch check for missing objects + std::set<IoHash> Keys; + for (const auto& It : Objects) + { + Keys.insert(It.first); + } + + CloudCacheExistsResult ExistsResult = Session.ObjectExists(Keys); + ElapsedSeconds += ExistsResult.ElapsedSeconds; + if (ExistsResult.ErrorCode != 0) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = ExistsResult.ErrorCode, + .Reason = std::move(ExistsResult.Reason)}; + } + + // TODO: Batch upload missing objects + + for (const auto& It : Objects) + { + if (ExistsResult.Have.contains(It.first)) + { + continue; + } + + CloudCacheResult Result = Session.PutObject(It.first, It.second.GetBuffer().AsIoBuffer()); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .ErrorCode = Result.ErrorCode, + .Reason = std::move(Result.Reason)}; + } + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true}; + } + + enum class ComputeTaskState : int32_t + { + Queued = 0, + Executing = 1, + Complete = 2, + }; + + enum class ComputeTaskOutcome : int32_t + { + Success = 0, + Failed = 1, + Cancelled = 2, + NoResult = 3, + Exipred = 4, + BlobNotFound = 5, + Exception = 6, + }; + + std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) + { + switch (Outcome) + { + case ComputeTaskOutcome::Success: + return "Success"sv; + case ComputeTaskOutcome::Failed: + return "Failed"sv; + case ComputeTaskOutcome::Cancelled: + return "Cancelled"sv; + case ComputeTaskOutcome::NoResult: + return "NoResult"sv; + case ComputeTaskOutcome::Exipred: + return "Exipred"sv; + case ComputeTaskOutcome::BlobNotFound: + return "BlobNotFound"sv; + case ComputeTaskOutcome::Exception: + return "Exception"sv; + }; + return "Unknown"sv; + } + + virtual GetUpstreamApplyUpdatesResult GetUpdates() override + { + int64_t Bytes{}; + double ElapsedSeconds{}; + UpstreamApplyCompleted CompletedTasks; + + { + std::scoped_lock Lock(m_TaskMutex); + if (m_PendingTasks.empty()) + { + // Nothing to do. + return {.Success = true}; + } + } + + try + { + CloudCacheSession Session(m_Client); + + CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); + Bytes += UpdatesResult.Bytes; + ElapsedSeconds += UpdatesResult.ElapsedSeconds; + if (UpdatesResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!UpdatesResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; + } + + CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); + + // zen::StringBuilder<4096> ObjStr; + // zen::CompactBinaryToJson(TaskStatus, ObjStr); + + for (auto& It : TaskStatus["u"sv]) + { + CbObjectView Status = It.AsObjectView(); + const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32(); + + const std::string_view AgentId = TaskStatus["a"sv].AsString(); + const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + + // Only care about completed tasks + if (State != ComputeTaskState::Complete) + { + continue; + } + + const IoHash TaskId = Status["h"sv].AsObjectAttachment(); + + IoHash WorkerId; + IoHash ActionId; + + { + std::scoped_lock Lock(m_TaskMutex); + auto TaskIt = m_PendingTasks.find(TaskId); + if (TaskIt == m_PendingTasks.end()) + { + continue; + } + WorkerId = TaskIt->second.WorkerDescriptor.GetHash(); + ActionId = TaskIt->second.Action.GetHash(); + m_PendingTasks.erase(TaskIt); + } + + GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session); + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + + CompletedTasks[WorkerId][ActionId] = std::move(Result); + } + + return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true}; + } + catch (std::exception& Err) + { + m_HealthOk = false; + return { + .Error{.ErrorCode = -1, .Reason = Err.what()}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .Completed = std::move(CompletedTasks), + }; + } + } + + virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; } + + private: + spdlog::logger& Log() { return m_Log; } + + CasStore& m_CasStore; + CidStore& m_CidStore; + spdlog::logger& m_Log; + std::string m_DisplayName; + RefPtr<CloudCacheClient> m_Client; + UpstreamApplyEndpointStats m_Stats; + std::atomic_bool m_HealthOk{false}; + std::string m_ChannelId; + + std::mutex m_TaskMutex; + std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks; + + struct UpstreamData + { + std::map<IoHash, IoBuffer> Blobs; + std::map<IoHash, CbObject> Objects; + IoHash TaskId; + IoHash RequirementsId; + }; + + struct UpstreamDirectory + { + std::filesystem::path Path; + std::map<std::string, UpstreamDirectory> Directories; + std::set<std::string> Files; + }; + + [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const CbObjectView& TaskStatus, CloudCacheSession& Session) + { + try + { + const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)TaskStatus["o"sv].AsInt32(); + + if (Outcome != ComputeTaskOutcome::Success) + { + using namespace fmt::literals; + const std::string_view Detail = TaskStatus["d"sv].AsString(); + if (!Detail.empty()) + { + return {.Error{.ErrorCode = -1, + .Reason = "Task {}: {}"_format(ComputeTaskOutcomeToString(Outcome), std::string(Detail))}}; + } + return {.Error{.ErrorCode = -1, .Reason = "Task {}"_format(ComputeTaskOutcomeToString(Outcome))}}; + } + + const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); + const DateTime Time = TaskStatus["t"sv].AsDateTime(); + const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment(); + const std::string_view AgentId = TaskStatus["a"sv].AsString(); + const std::string_view LeaseId = TaskStatus["l"sv].AsString(); + + int64_t Bytes{}; + double ElapsedSeconds{}; + + // Get Result object and all Object Attachments + Binary Attachment IDs + CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + + if (ObjectTreeResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = ObjectTreeResult.ErrorCode, .Reason = std::move(ObjectTreeResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!ObjectTreeResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + std::map<IoHash, IoBuffer> TreeObjectData; + std::map<IoHash, IoBuffer> TreeBinaryData; + + MemoryView ResponseView = ObjectTreeResult.Response; + while (ResponseView.GetSize() > 0) + { + CbFieldView Field = CbFieldView(ResponseView.GetData()); + ResponseView += Field.GetSize(); + if (Field.IsObjectAttachment()) + { + const IoHash Hash = Field.AsObjectAttachment(); + Field = CbFieldView(ResponseView.GetData()); + ResponseView += Field.GetSize(); + if (!Field.IsObject()) // No data + { + TreeObjectData[Hash] = {}; + continue; + } + MemoryView FieldView = Field.AsObjectView().GetView(); + + TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize()); + } + else if (Field.IsBinaryAttachment()) + { + const IoHash Hash = Field.AsBinaryAttachment(); + TreeBinaryData[Hash] = {}; + } + else // Unknown type + { + } + } + + for (auto& It : TreeObjectData) + { + if (It.second.GetSize() == 0) + { + CloudCacheResult ObjectResult = Session.GetObject(It.first); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + if (ObjectTreeResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!ObjectResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + It.second = std::move(ObjectResult.Response); + } + } + + for (auto& It : TreeBinaryData) + { + if (It.second.GetSize() == 0) + { + CloudCacheResult BlobResult = Session.GetBlob(It.first); + Bytes += ObjectTreeResult.Bytes; + ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + if (BlobResult.ErrorCode != 0) + { + return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + if (!BlobResult.Success) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + It.second = std::move(BlobResult.Response); + } + } + + CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]); + int32_t ExitCode = ResultObject["e"sv].AsInt32(); + IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); + IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); + IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); + + std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize()); + std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize()); + + if (ExitCode != 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Task completed with errors"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr)}; + } + + CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]); + + // Get build.output + IoHash BuildOutputId; + IoBuffer BuildOutput; + for (auto& It : OutputObject["f"sv]) + { + const CbObjectView FileObject = It.AsObjectView(); + if (FileObject["n"sv].AsString() == "Build.output"sv) + { + BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); + BuildOutput = TreeBinaryData[BuildOutputId]; + break; + } + } + + if (BuildOutput.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Build.output file not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + // Get Output directory node + IoBuffer OutputDirectoryTree; + for (auto& It : OutputObject["d"sv]) + { + const CbObjectView DirectoryObject = It.AsObjectView(); + if (DirectoryObject["n"sv].AsString() == "Outputs"sv) + { + OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + break; + } + } + + if (OutputDirectoryTree.GetSize() == 0) + { + return {.Error{.ErrorCode = ExitCode, .Reason = "Outputs directory not found in task results"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + // load build.output as CbObject + + // Move Outputs from Horde to CbPackage + + std::unordered_map<IoHash, IoHash> CidToCompressedId; + CbPackage OutputPackage; + CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree); + int64_t TotalAttachmentBytes = 0; + int64_t TotalRawAttachmentBytes = 0; + + for (auto& It : OutputDirectoryTreeObject["f"sv]) + { + CbObjectView FileObject = It.AsObjectView(); + // Name is the uncompressed hash + IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString()); + // Hash is the compressed data hash, and how it is stored in Horde + IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); + + if (!TreeBinaryData.contains(CompressedId)) + { + Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); + return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + CidToCompressedId[DecompressedId] = CompressedId; + } + + // Iterate attachments, verify all chunks exist, and add to CbPackage + bool AnyErrors = false; + CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput); + BuildOutputObject.IterateAttachments([&](CbFieldView Field) { + const IoHash DecompressedId = Field.AsHash(); + if (!CidToCompressedId.contains(DecompressedId)) + { + Log().warn("Attachment not found {}", DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); + + if (!TreeBinaryData.contains(CompressedId)) + { + Log().warn("Missing output {} compressed {} uncompressed", + CompressedId.ToHexString(), + DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId])); + + if (!AttachmentBuffer) + { + Log().warn("Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed", + CompressedId.ToHexString(), + DecompressedId.ToHexString()); + AnyErrors = true; + return; + } + + TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize(); + TotalRawAttachmentBytes += AttachmentBuffer.GetRawSize(); + + CbAttachment Attachment(AttachmentBuffer); + OutputPackage.AddAttachment(Attachment); + }); + + if (AnyErrors) + { + return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + + OutputPackage.SetObject(BuildOutputObject); + + return {.OutputPackage = std::move(OutputPackage), + .TotalAttachmentBytes = TotalAttachmentBytes, + .TotalRawAttachmentBytes = TotalRawAttachmentBytes, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds, + .StdOut = std::move(StdOut), + .StdErr = std::move(StdErr), + .Success = true}; + } + catch (std::exception& Err) + { + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; + } + } + + [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data) + { + std::string ExecutablePath; + std::map<std::string, std::string> Environment; + std::set<std::filesystem::path> InputFiles; + std::map<std::filesystem::path, IoHash> InputFileHashes; + + ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString(); + if (ExecutablePath.empty()) + { + Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor", + ApplyRecord.WorkerDescriptor.GetHash()); + return false; + } + + for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["files"sv]) + { + CbObjectView FileEntry = It.AsObjectView(); + if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.Blobs)) + { + return false; + } + } + + for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv]) + { + std::string_view Env = It.AsString(); + auto Index = Env.find('='); + if (Index < 0) + { + Log().warn("process apply upstream FAILED, environment '{}' malformed", Env); + return false; + } + + Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1); + } + + { + static const std::filesystem::path BuildActionPath = "Build.action"sv; + static const std::filesystem::path InputPath = "Inputs"sv; + const IoHash ActionId = ApplyRecord.Action.GetHash(); + + InputFiles.insert(BuildActionPath); + InputFileHashes[BuildActionPath] = ActionId; + Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(), + ApplyRecord.Action.GetBuffer().GetSize()); + + bool AnyErrors = false; + ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) { + const IoHash Cid = Field.AsHash(); + const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()}; + IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Cid); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid); + AnyErrors = true; + return; + } + + if (InputFiles.contains(FilePath)) + { + return; + } + + const IoHash CompressedId = IoHash::HashBuffer(DataBuffer.GetData(), DataBuffer.GetSize()); + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = CompressedId; + Data.Blobs[CompressedId] = std::move(DataBuffer); + }); + + if (AnyErrors) + { + return false; + } + } + + const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles); + + CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Blobs, Data.Objects); + const IoHash SandboxHash = Sandbox.GetHash(); + Data.Objects[SandboxHash] = std::move(Sandbox); + + { + using namespace fmt::literals; + std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); + if (HostPlatform.empty()) + { + Log().warn("process apply upstream FAILED, 'host' platform not provided"); + return false; + } + + int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32(); + int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); + bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); + + // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) + std::string Condition; + if (HostPlatform == "Win64" || HostPlatform == "Windows") + { + Condition = "OSFamily == 'Windows' && Pool == 'Win-RemoteExec'"; + } + else if (HostPlatform == "Mac") + { + Condition = "OSFamily == 'MacOS'"; + } + else + { + Condition = "OSFamily == '{}'"_format(HostPlatform); + } + + std::map<std::string_view, int64_t> Resources; + if (LogicalCores > 0) + { + Resources["LogicalCores"sv] = LogicalCores; + } + if (Memory > 0) + { + Resources["RAM"sv] = std::max(Memory / 1024 / 1024 / 1024, 1LL); + } + + CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; + } + + CbObject Task = BuildTask(ExecutablePath, + {"-Build=build.action"}, + Environment, + {}, + SandboxHash, + Data.RequirementsId, + {"Build.output", "Outputs"}); + + const IoHash TaskId = Task.GetHash(); + Data.Objects[TaskId] = std::move(Task); + Data.TaskId = TaskId; + + return true; + } + + [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry, + std::set<std::filesystem::path>& InputFiles, + std::map<std::filesystem::path, IoHash>& InputFileHashes, + std::map<IoHash, IoBuffer>& Blobs) + { + const std::filesystem::path FilePath = FileEntry["name"sv].AsString(); + const IoHash ChunkId = FileEntry["hash"sv].AsHash(); + const uint64_t Size = FileEntry["size"sv].AsUInt64(); + IoBuffer DataBuffer = m_CasStore.FindChunk(ChunkId); + + if (!DataBuffer) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId); + return false; + } + + if (DataBuffer.Size() != Size) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {}, action spec expected {}", + ChunkId, + DataBuffer.Size(), + Size); + return false; + } + + if (InputFiles.contains(FilePath)) + { + Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath); + return false; + } + + InputFiles.insert(FilePath); + InputFileHashes[FilePath] = ChunkId; + Blobs[ChunkId] = std::move(DataBuffer); + return true; + } + + [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles) + { + static const std::filesystem::path RootPath; + std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories; + UpstreamDirectory RootDirectory = {.Path = RootPath}; + + AllDirectories[RootPath] = &RootDirectory; + + // Build tree from flat list + for (const auto& Path : InputFiles) + { + if (Path.has_parent_path()) + { + if (!AllDirectories.contains(Path.parent_path())) + { + std::stack<std::string> PathSplit; + { + std::filesystem::path ParentPath = Path.parent_path(); + PathSplit.push(ParentPath.filename().string()); + while (ParentPath.has_parent_path()) + { + ParentPath = ParentPath.parent_path(); + PathSplit.push(ParentPath.filename().string()); + } + } + UpstreamDirectory* ParentPtr = &RootDirectory; + while (!PathSplit.empty()) + { + if (!ParentPtr->Directories.contains(PathSplit.top())) + { + std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()}; + ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath}; + AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()]; + } + ParentPtr = &ParentPtr->Directories[PathSplit.top()]; + PathSplit.pop(); + } + } + + AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string()); + } + else + { + RootDirectory.Files.insert(Path.filename().string()); + } + } + + return RootDirectory; + } + + [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory, + const std::map<std::filesystem::path, IoHash>& InputFileHashes, + const std::map<IoHash, IoBuffer>& Blobs, + std::map<IoHash, CbObject>& Objects) + { + CbObjectWriter DirectoryTreeWriter; + + if (!RootDirectory.Files.empty()) + { + DirectoryTreeWriter.BeginArray("f"sv); + for (const auto& File : RootDirectory.Files) + { + const std::filesystem::path FilePath = {RootDirectory.Path / File}; + const IoHash& FileHash = InputFileHashes.at(FilePath); + const uint64_t FileSize = Blobs.at(FileHash).Size(); + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, File); + DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash); + DirectoryTreeWriter.AddInteger("s"sv, FileSize); // Size + // DirectoryTreeWriter.AddInteger("a"sv, 0); // Attributes Currently unneeded + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + if (!RootDirectory.Directories.empty()) + { + DirectoryTreeWriter.BeginArray("d"sv); + for (const auto& Item : RootDirectory.Directories) + { + CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Blobs, Objects); + const IoHash DirectoryHash = Directory.GetHash(); + Objects[DirectoryHash] = std::move(Directory); + + DirectoryTreeWriter.BeginObject(); + DirectoryTreeWriter.AddString("n"sv, Item.first); + DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash); + DirectoryTreeWriter.EndObject(); + } + DirectoryTreeWriter.EndArray(); + } + + return std::move(DirectoryTreeWriter.Save()); + } + + [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition, + const std::map<std::string_view, int64_t>& Resources, + const bool Exclusive) + { + CbObjectWriter Writer; + Writer.AddString("c", Condition); + if (!Resources.empty()) + { + Writer.BeginArray("r"); + for (const auto& Resource : Resources) + { + Writer.BeginArray(); + Writer.AddString(Resource.first); + Writer.AddInteger(Resource.second); + Writer.EndArray(); + } + Writer.EndArray(); + } + Writer.AddBool("e", Exclusive); + return std::move(Writer.Save()); + } + + [[nodiscard]] CbObject BuildTask(const std::string_view Executable, + const std::vector<std::string>& Arguments, + const std::map<std::string, std::string>& Environment, + const std::string_view WorkingDirectory, + const IoHash& SandboxHash, + const IoHash& RequirementsId, + const std::set<std::string>& Outputs) + { + CbObjectWriter TaskWriter; + TaskWriter.AddString("e"sv, Executable); + + if (!Arguments.empty()) + { + TaskWriter.BeginArray("a"sv); + for (const auto& Argument : Arguments) + { + TaskWriter.AddString(Argument); + } + TaskWriter.EndArray(); + } + + if (!Environment.empty()) + { + TaskWriter.BeginArray("v"sv); + for (const auto& Env : Environment) + { + TaskWriter.BeginArray(); + TaskWriter.AddString(Env.first); + TaskWriter.AddString(Env.second); + TaskWriter.EndArray(); + } + TaskWriter.EndArray(); + } + + if (!WorkingDirectory.empty()) + { + TaskWriter.AddString("s"sv, WorkingDirectory); + } + + TaskWriter.AddObjectAttachment("s"sv, SandboxHash); + TaskWriter.AddObjectAttachment("r"sv, RequirementsId); + + // Outputs + if (!Outputs.empty()) + { + TaskWriter.BeginArray("o"sv); + for (const auto& Output : Outputs) + { + TaskWriter.AddString(Output); + } + TaskWriter.EndArray(); + } + + return std::move(TaskWriter.Save()); + } + }; +} // namespace detail + +////////////////////////////////////////////////////////////////////////// + +struct UpstreamApplyStats +{ + static constexpr uint64_t MaxSampleCount = 1000ull; + + UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {} + + void Add(spdlog::logger& Logger, + UpstreamApplyEndpoint& Endpoint, + const PostUpstreamApplyResult& Result, + const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + { + UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) + { + Stats.PostCount++; + Stats.UpBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsUp.fetch_add(Result.ElapsedSeconds); + } + + if (m_Enabled && m_SampleCount++ % MaxSampleCount) + { + Dump(Logger, Endpoints); + } + } + + void Add(spdlog::logger& Logger, + UpstreamApplyEndpoint& Endpoint, + const GetUpstreamApplyUpdatesResult& Result, + const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + { + UpstreamApplyEndpointStats& Stats = Endpoint.Stats(); + + if (Result.Error) + { + Stats.ErrorCount++; + } + else if (Result.Success) + { + Stats.UpdateCount++; + Stats.DownBytes.fetch_add(double(Result.Bytes) / 1024.0 / 1024.0); + Stats.SecondsDown.fetch_add(Result.ElapsedSeconds); + if (!Result.Completed.empty()) + { + uint64_t Completed = 0; + for (auto& It : Result.Completed) + { + Completed += It.second.size(); + } + Stats.CompleteCount.fetch_add(Completed); + } + } + + if (m_Enabled && m_SampleCount++ % MaxSampleCount) + { + Dump(Logger, Endpoints); + } + } + + void Dump(spdlog::logger& Logger, const std::vector<std::unique_ptr<UpstreamApplyEndpoint>>& Endpoints) + { + for (auto& Ep : Endpoints) + { + // These stats will not be totally correct as the numbers are not captured atomically + + UpstreamApplyEndpointStats& Stats = Ep->Stats(); + const uint64_t PostCount = Stats.PostCount; + const uint64_t CompleteCount = Stats.CompleteCount; + const uint64_t UpdateCount = Stats.UpdateCount; + const double DownBytes = Stats.DownBytes; + const double SecondsDown = Stats.SecondsDown; + const double UpBytes = Stats.UpBytes; + const double SecondsUp = Stats.SecondsUp; + + const double UpSpeed = UpBytes > 0 ? UpBytes / SecondsUp : 0.0; + const double DownSpeed = DownBytes > 0 ? DownBytes / SecondsDown : 0.0; + const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; + + Logger.debug("STATS - '{}', Complete rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", + Ep->DisplayName(), + CompleteRate, + DownBytes, + DownSpeed, + UpBytes, + UpSpeed); + } + } + + bool m_Enabled; + std::atomic_uint64_t m_SampleCount = {}; +}; + +////////////////////////////////////////////////////////////////////////// + +class DefaultUpstreamApply final : public UpstreamApply +{ +public: + DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + : m_Log(logging::Get("upstream-apply")) + , m_Options(Options) + , m_CasStore(CasStore) + , m_CidStore(CidStore) + , m_Stats(Options.StatsEnabled) + { + } + + virtual ~DefaultUpstreamApply() { Shutdown(); } + + virtual bool Initialize() override + { + for (auto& Endpoint : m_Endpoints) + { + const UpstreamEndpointHealth Health = Endpoint->Initialize(); + if (Health.Ok) + { + Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName()); + } + else + { + Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + + m_RunState.IsRunning = !m_Endpoints.empty(); + + if (m_RunState.IsRunning) + { + for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) + { + m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); + } + + m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this); + + m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this); + } + + return m_RunState.IsRunning; + } + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override + { + m_Endpoints.emplace_back(std::move(Endpoint)); + } + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override + { + if (m_RunState.IsRunning) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300); + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + // Already in progress + return {.ApplyId = ActionId, .Success = true}; + } + + std::chrono::steady_clock::time_point ExpireTime = + TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds) + : std::chrono::steady_clock::time_point::max(); + + m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)}; + } + + if (!m_UpstreamThreads.empty()) + { + m_UpstreamQueue.Enqueue(std::move(ApplyRecord)); + } + else + { + ProcessApplyRecord(std::move(ApplyRecord)); + } + + return {.ApplyId = ActionId, .Success = true}; + } + + return {}; + } + + virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override + { + if (m_RunState.IsRunning) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + return {.Status = *Status, .Success = true}; + } + } + + return {}; + } + + virtual void GetStatus(CbObjectWriter& Status) override + { + Status << "worker_threads" << m_Options.ThreadCount; + Status << "queue_count" << m_UpstreamQueue.Size(); + + Status.BeginArray("endpoints"); + for (const auto& Ep : m_Endpoints) + { + Status.BeginObject(); + Status << "name" << Ep->DisplayName(); + Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); + + UpstreamApplyEndpointStats& Stats = Ep->Stats(); + const uint64_t PostCount = Stats.PostCount; + const uint64_t CompleteCount = Stats.CompleteCount; + const uint64_t UpdateCount = Stats.UpdateCount; + const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0; + + Status << "post_count" << PostCount; + Status << "complete_count" << PostCount; + Status << "update_count" << Stats.UpdateCount; + + Status << "complete_ratio" << CompleteRate; + Status << "downloaded_mb" << Stats.DownBytes; + Status << "uploaded_mb" << Stats.UpBytes; + Status << "error_count" << Stats.ErrorCount; + + Status.EndObject(); + } + Status.EndArray(); + } + +private: + // The caller is responsible for locking if required + UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId) + { + if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end()) + { + if (auto It2 = It->second.find(ActionId); It2 != It->second.end()) + { + return &It2->second; + } + } + return nullptr; + } + + void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord) + { + const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash(); + const IoHash ActionId = ApplyRecord.Action.GetHash(); + try + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord)); + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + if (Result.Success) + { + Status->State = UpstreamApplyState::Executing; + } + else + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error = std::move(Result.Error), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds}; + } + } + } + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + return; + } + } + + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}}; + } + Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId); + } + } + catch (std::exception& e) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}}; + } + Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what()); + } + } + + void ProcessUpstreamQueue() + { + for (;;) + { + UpstreamApplyRecord ApplyRecord; + if (m_UpstreamQueue.WaitAndDequeue(ApplyRecord)) + { + ProcessApplyRecord(std::move(ApplyRecord)); + } + + if (!m_RunState.IsRunning) + { + break; + } + } + } + + void ProcessApplyUpdates() + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy()) + { + GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(); + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason); + } + + if (!Result.Completed.empty()) + { + for (auto& It : Result.Completed) + { + for (auto& It2 : It.second) + { + std::scoped_lock Lock(m_ApplyTasksMutex); + if (auto Status = FindStatus(It.first, It2.first); Status != nullptr) + { + Status->State = UpstreamApplyState::Complete; + Status->Result = std::move(It2.second); + } + } + } + } + } + } + } + + void ProcessUpstreamUpdates() + { + const auto& UpdateSleep = std::chrono::seconds(m_Options.UpdatesInterval); + for (;;) + { + std::this_thread::sleep_for(UpdateSleep); + + if (!m_RunState.IsRunning) + { + break; + } + + ProcessApplyUpdates(); + + // Remove any expired tasks, regardless of state + { + std::scoped_lock Lock(m_ApplyTasksMutex); + for (auto& WorkerIt : m_ApplyTasks) + { + const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) { + return Item.second.ExpireTime < std::chrono::steady_clock::now(); + }); + if (Count > 0) + { + Log().debug("Removed '{}' expired tasks", Count); + } + } + const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); }); + if (Count > 0) + { + Log().debug("Removed '{}' empty task lists", Count); + } + } + } + } + + void MonitorEndpoints() + { + for (;;) + { + { + std::unique_lock Lock(m_RunState.Mutex); + if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); })) + { + break; + } + } + + for (auto& Endpoint : m_Endpoints) + { + if (!Endpoint->IsHealthy()) + { + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); + } + else + { + Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + } + } + } + } + } + + void Shutdown() + { + if (m_RunState.Stop()) + { + m_UpstreamQueue.CompleteAdding(); + for (std::thread& Thread : m_UpstreamThreads) + { + Thread.join(); + } + + m_EndpointMonitorThread.join(); + m_UpstreamUpdatesThread.join(); + m_UpstreamThreads.clear(); + m_Endpoints.clear(); + } + } + + spdlog::logger& Log() { return m_Log; } + + using UpstreamApplyQueue = BlockingQueue<UpstreamApplyRecord>; + + struct RunState + { + std::mutex Mutex; + std::condition_variable ExitSignal; + std::atomic_bool IsRunning{false}; + + bool Stop() + { + bool Stopped = false; + { + std::scoped_lock Lock(Mutex); + Stopped = IsRunning.exchange(false); + } + if (Stopped) + { + ExitSignal.notify_all(); + } + return Stopped; + } + }; + + spdlog::logger& m_Log; + UpstreamApplyOptions m_Options; + CasStore& m_CasStore; + CidStore& m_CidStore; + UpstreamApplyQueue m_UpstreamQueue; + UpstreamApplyStats m_Stats; + UpstreamApplyTasks m_ApplyTasks; + std::mutex m_ApplyTasksMutex; + std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::thread m_UpstreamUpdatesThread; + std::thread m_EndpointMonitorThread; + RunState m_RunState; +}; + +////////////////////////////////////////////////////////////////////////// + +std::unique_ptr<UpstreamApply> +MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +{ + return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore); +} + +std::unique_ptr<UpstreamApplyEndpoint> +MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, CasStore& CasStore, CidStore& CidStore) +{ + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, CasStore, CidStore); +} + +} // namespace zen diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h new file mode 100644 index 000000000..98f193c02 --- /dev/null +++ b/zenserver/upstream/upstreamapply.h @@ -0,0 +1,172 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/compactbinarypackage.h> +#include <zencore/iobuffer.h> +#include <zencore/iohash.h> +#include <zencore/zencore.h> + +#include <atomic> +#include <chrono> +#include <memory> +#include <unordered_map> +#include <unordered_set> + +namespace zen { + +class CbObjectWriter; +class CasStore; +class CidStore; +class ZenCacheStore; +struct CloudCacheClientOptions; + +enum class UpstreamApplyState : int32_t +{ + Queued = 0, + Executing = 1, + Complete = 2, +}; + +struct UpstreamApplyRecord +{ + CbObject WorkerDescriptor; + CbObject Action; +}; + +struct UpstreamApplyOptions +{ + std::chrono::seconds HealthCheckInterval{5}; + std::chrono::seconds UpdatesInterval{5}; + uint32_t ThreadCount = 4; + bool StatsEnabled = false; +}; + +struct UpstreamApplyError +{ + int32_t ErrorCode{}; + std::string Reason{}; + + explicit operator bool() const { return ErrorCode != 0; } +}; + +struct PostUpstreamApplyResult +{ + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; +}; + +struct GetUpstreamApplyResult +{ + CbPackage OutputPackage{}; + int64_t TotalAttachmentBytes{}; + int64_t TotalRawAttachmentBytes{}; + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + std::string StdOut{}; + std::string StdErr{}; + bool Success = false; +}; + +using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>; + +struct GetUpstreamApplyUpdatesResult +{ + UpstreamApplyError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + UpstreamApplyCompleted Completed{}; + bool Success = false; +}; + +struct UpstreamApplyStatus +{ + UpstreamApplyState State{}; + GetUpstreamApplyResult Result{}; + std::chrono::steady_clock::time_point ExpireTime{}; +}; + +using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>; + +struct UpstreamEndpointHealth +{ + std::string Reason; + bool Ok = false; +}; + +struct UpstreamApplyEndpointStats +{ + std::atomic_uint64_t PostCount{}; + std::atomic_uint64_t CompleteCount{}; + std::atomic_uint64_t UpdateCount{}; + std::atomic_uint64_t ErrorCount{}; + std::atomic<double> UpBytes{}; + std::atomic<double> DownBytes{}; + std::atomic<double> SecondsUp{}; + std::atomic<double> SecondsDown{}; +}; + +/** + * The upstream apply endpont is responsible for handling remote execution. + */ +class UpstreamApplyEndpoint +{ +public: + virtual ~UpstreamApplyEndpoint() = default; + + virtual UpstreamEndpointHealth Initialize() = 0; + + virtual bool IsHealthy() const = 0; + + virtual UpstreamEndpointHealth CheckHealth() = 0; + + virtual std::string_view DisplayName() const = 0; + + virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0; + + virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0; + + virtual UpstreamApplyEndpointStats& Stats() = 0; +}; + +/** + * Manages one or more upstream cache endpoints. + */ +class UpstreamApply +{ +public: + virtual ~UpstreamApply() = default; + + virtual bool Initialize() = 0; + + virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0; + + struct EnqueueResult + { + IoHash ApplyId{}; + bool Success = false; + }; + + struct StatusResult + { + UpstreamApplyStatus Status{}; + bool Success = false; + }; + + virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; + + virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; + + virtual void GetStatus(CbObjectWriter& CbO) = 0; +}; + +std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); + +std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, + CasStore& CasStore, + CidStore& CidStore); + +} // namespace zen diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 168449d05..e2dc09872 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -40,13 +40,15 @@ namespace detail { : m_Log(zen::logging::Get("upstream")) , m_UseLegacyDdc(Options.UseLegacyDdc) { - using namespace fmt::literals; - m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); - m_Client = new CloudCacheClient(Options); + m_Info.Name = "Horde"sv; + m_Info.Url = Options.ServiceUrl; + m_Client = new CloudCacheClient(Options); } virtual ~JupiterUpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; } + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } virtual bool IsHealthy() const override { return m_HealthOk.load(); } @@ -58,7 +60,7 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.Authenticate(); - m_HealthOk = Result.ErrorCode == 0; + m_HealthOk = Result.Success && Result.ErrorCode == 0; return {.Reason = std::move(Result.Reason), .Ok = Result.Success}; } @@ -68,9 +70,7 @@ namespace detail { } } - virtual std::string_view DisplayName() const override { return m_DisplayName; } - - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -144,12 +144,69 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + ZEN_UNUSED(Policy); + + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (size_t Index : KeyIndex) + { + const CacheKey& CacheKey = CacheKeys[Index]; + CbPackage Package; + CbObject Record; + + if (!Result.Error) + { + CloudCacheResult RefResult = Session.GetRef(CacheKey.Bucket, CacheKey.Hash, ZenContentType::kCbObject); + AppendResult(RefResult, Result); + + if (RefResult.ErrorCode == 0) + { + const CbValidateError ValidationResult = ValidateCompactBinary(RefResult.Response, CbValidateMode::All); + if (ValidationResult == CbValidateError::None) + { + Record = LoadCompactBinaryObject(RefResult.Response); + Record.IterateAttachments([this, &Session, &Result, &Package](CbFieldView AttachmentHash) { + CloudCacheResult BlobResult = Session.GetCompressedBlob(AttachmentHash.AsHash()); + AppendResult(BlobResult, Result); + + if (BlobResult.ErrorCode == 0) + { + if (CompressedBuffer Chunk = CompressedBuffer::FromCompressed(SharedBuffer(BlobResult.Response))) + { + Package.AddAttachment(CbAttachment(Chunk)); + } + } + else + { + m_HealthOk = false; + } + }); + } + } + else + { + m_HealthOk = false; + } + } + + OnComplete({.CacheKey = CacheKey, .KeyIndex = Index, .Record = Record, .Package = Package}); + } + + return Result; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey&, const IoHash& PayloadId) override { try { CloudCacheSession Session(m_Client); - const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); + const CloudCacheResult Result = Session.GetCompressedBlob(PayloadId); if (Result.ErrorCode == 0) { @@ -171,12 +228,41 @@ namespace detail { } } + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + CloudCacheSession Session(m_Client); + GetUpstreamCacheResult Result; + + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IoBuffer Payload; + + if (!Result.Error) + { + const CloudCacheResult BlobResult = Session.GetCompressedBlob(Request.ChunkId); + Payload = BlobResult.Response; + + AppendResult(BlobResult, Result); + m_HealthOk = BlobResult.ErrorCode == 0; + } + + OnComplete({.Request = Request, .RequestIndex = Index, .Payload = Payload}); + } + + return Result; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override { + using namespace fmt::literals; + ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -200,125 +286,132 @@ namespace detail { } } - return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = Result.Success}; + m_HealthOk = Result.ErrorCode == 0; + + return {.Reason = std::move(Result.Reason), + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; } else { - bool Success = false; int64_t TotalBytes = 0ull; double TotalElapsedSeconds = 0.0; - for (size_t Idx = 0, Count = Payloads.size(); Idx < Count; Idx++) - { - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + const auto PutBlobs = [&](std::span<IoHash> PayloadIds, std::string& OutReason) -> bool { + for (const IoHash& PayloadId : PayloadIds) { - if (CloudCacheResult Result = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - Result.Success) + const auto It = std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), PayloadId); + + if (It == std::end(CacheRecord.PayloadIds)) { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; - break; + OutReason = "payload '{}' MISSING from local cache"_format(PayloadId); + return false; } - } - if (!Success) - { - return {.Reason = "Failed to upload payload", - .Bytes = TotalBytes, - .ElapsedSeconds = TotalElapsedSeconds, - .Success = false}; + const size_t Idx = std::distance(std::begin(CacheRecord.PayloadIds), It); + + CloudCacheResult BlobResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !BlobResult.Success; Attempt++) + { + BlobResult = Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); + } + + m_HealthOk = BlobResult.ErrorCode == 0; + + if (!BlobResult.Success) + { + OutReason = "upload payload '{}' FAILED, reason '{}'"_format(PayloadId, BlobResult.Reason); + return false; + } + + TotalBytes += BlobResult.Bytes; + TotalElapsedSeconds += BlobResult.ElapsedSeconds; } + + return true; + }; + + PutRefResult RefResult; + for (int32_t Attempt = 0; Attempt < MaxAttempts && !RefResult.Success; Attempt++) + { + RefResult = + Session.PutRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RecordValue, ZenContentType::kCbObject); } - Success = false; - for (int32_t Attempt = 0; Attempt < MaxAttempts; Attempt++) + m_HealthOk = RefResult.ErrorCode == 0; + + if (!RefResult.Success) { - if (PutRefResult Result = Session.PutRef(CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - RecordValue, - ZenContentType::kCbObject); - Result.Success) - { - TotalBytes += Result.Bytes; - TotalElapsedSeconds += Result.ElapsedSeconds; - Success = true; + return {.Reason = "upload cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + RefResult.Reason), + .Success = false}; + } - if (!Result.Needs.empty()) - { - for (const IoHash& NeededHash : Result.Needs) - { - Success = false; + TotalBytes += RefResult.Bytes; + TotalElapsedSeconds += RefResult.ElapsedSeconds; - if (auto It = - std::find(std::begin(CacheRecord.PayloadIds), std::end(CacheRecord.PayloadIds), NeededHash); - It != std::end(CacheRecord.PayloadIds)) - { - const size_t Idx = It - std::begin(CacheRecord.PayloadIds); - - if (CloudCacheResult BlobResult = - Session.PutCompressedBlob(CacheRecord.PayloadIds[Idx], Payloads[Idx]); - BlobResult.Success) - { - TotalBytes += BlobResult.Bytes; - TotalElapsedSeconds += BlobResult.ElapsedSeconds; - Success = true; - } - else - { - ZEN_WARN("upload missing payload '{}/{}/{}' FAILED", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } - else - { - ZEN_WARN("needed payload '{}/{}/{}' MISSING", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - NeededHash); - } - } + std::string Reason; + if (!PutBlobs(RefResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - const IoHash RefHash = IoHash::HashBuffer(RecordValue); + const IoHash RefHash = IoHash::HashBuffer(RecordValue); + FinalizeRefResult FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + m_HealthOk = FinalizeResult.ErrorCode == 0; - if (FinalizeRefResult FinalizeResult = - Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); - FinalizeResult.Success) - { - TotalBytes += FinalizeResult.Bytes; - TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; - Success = true; + if (!FinalizeResult.Success) + { + return {.Reason = "finalize cache record '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } - for (const IoHash& MissingHash : FinalizeResult.Needs) - { - ZEN_WARN("finalize '{}/{}' FAILED, missing '{}'", - CacheRecord.CacheKey.Bucket, - CacheRecord.CacheKey.Hash, - MissingHash); - } - } - else - { - ZEN_WARN("finalize '{}/{}' FAILED", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash); - Success = false; - } - } + if (!FinalizeResult.Needs.empty()) + { + if (!PutBlobs(FinalizeResult.Needs, Reason)) + { + return {.Reason = std::move(Reason), .Success = false}; + } - if (Success) + FinalizeResult = Session.FinalizeRef(CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, RefHash); + m_HealthOk = FinalizeResult.ErrorCode == 0; + + if (!FinalizeResult.Success) + { + return {.Reason = "finalize '{}/{}' FAILED, reason '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + FinalizeResult.Reason), + .Success = false}; + } + + if (!FinalizeResult.Needs.empty()) + { + ExtendableStringBuilder<256> Sb; + for (const IoHash& MissingHash : FinalizeResult.Needs) { - break; + Sb << MissingHash.ToHexString() << ","; } + + return {.Reason = "finalize '{}/{}' FAILED, still needs payload(s) '{}'"_format(CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Sb.ToString()), + .Success = false}; } } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Success}; + TotalBytes += FinalizeResult.Bytes; + TotalElapsedSeconds += FinalizeResult.ElapsedSeconds; + + return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = true}; } } catch (std::exception& Err) { + m_HealthOk = false; return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -326,9 +419,22 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + static void AppendResult(const CloudCacheResult& Result, GetUpstreamCacheResult& Out) + { + Out.Success &= Result.Success; + Out.Bytes += Result.Bytes; + Out.ElapsedSeconds += Result.ElapsedSeconds; + + if (Result.ErrorCode) + { + Out.Error = {.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}; + } + }; + spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; + UpstreamEndpointInfo m_Info; bool m_UseLegacyDdc; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; @@ -349,9 +455,13 @@ namespace detail { }; public: - ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN") + ZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) + : m_Log(zen::logging::Get("upstream")) + , m_Info({.Name = std::string("Zen")}) + , m_ConnectTimeout(Options.ConnectTimeout) + , m_Timeout(Options.Timeout) { - for (const auto& Url : Urls) + for (const auto& Url : Options.Urls) { m_Endpoints.push_back({.Url = Url}); } @@ -359,6 +469,8 @@ namespace detail { ~ZenUpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const { return m_Info; } + virtual UpstreamEndpointHealth Initialize() override { using namespace fmt::literals; @@ -366,9 +478,8 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { - m_ServiceUrl = Ep.Url; - m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Info.Url = Ep.Url; + m_Client = new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -391,9 +502,9 @@ namespace detail { const ZenEndpoint& Ep = GetEndpoint(); if (Ep.Ok) { - m_ServiceUrl = Ep.Url; - m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); - m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + m_Info.Url = Ep.Url; + m_Client = + new ZenStructuredCacheClient({.Url = m_Info.Url, .ConnectTimeout = m_ConnectTimeout, .Timeout = m_Timeout}); m_HealthOk = true; return {.Ok = true}; @@ -420,9 +531,7 @@ namespace detail { } } - virtual std::string_view DisplayName() const override { return m_DisplayName; } - - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { try { @@ -449,13 +558,80 @@ namespace detail { } } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override + { + std::vector<size_t> IndexMap; + IndexMap.reserve(KeyIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCacheRecords"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("CacheKeys"sv); + for (size_t Index : KeyIndex) + { + const CacheKey& Key = CacheKeys[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + BatchRequest << "Bucket"sv << Key.Bucket; + BatchRequest << "Hash"sv << Key.Hash; + BatchRequest.EndObject(); + } + BatchRequest.EndArray(); + + BatchRequest.BeginObject("Policy"sv); + CacheRecordPolicy::Save(Policy, BatchRequest); + BatchRequest.EndObject(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(*m_Client); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + for (size_t LocalIndex = 0; CbFieldView Record : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + OnComplete( + {.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = Record.AsObjectView(), .Package = BatchResponse}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + + for (size_t Index : KeyIndex) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { try { ZenStructuredCacheSession Session(*m_Client); - const ZenCacheResult Result = - Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); + const ZenCacheResult Result = Session.GetCachePayload(CacheKey.Bucket, CacheKey.Hash, PayloadId); if (Result.ErrorCode == 0) { @@ -477,12 +653,96 @@ namespace detail { } } + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> IndexMap; + IndexMap.reserve(RequestIndex.size()); + + CbObjectWriter BatchRequest; + BatchRequest << "Method"sv + << "GetCachePayloads"; + + BatchRequest.BeginObject("Params"sv); + { + BatchRequest.BeginArray("ChunkRequests"sv); + { + for (size_t Index : RequestIndex) + { + const CacheChunkRequest& Request = CacheChunkRequests[Index]; + IndexMap.push_back(Index); + + BatchRequest.BeginObject(); + { + BatchRequest.BeginObject("Key"sv); + BatchRequest << "Bucket"sv << Request.Key.Bucket; + BatchRequest << "Hash"sv << Request.Key.Hash; + BatchRequest.EndObject(); + + BatchRequest.AddObjectId("PayloadId"sv, Request.PayloadId); + BatchRequest << "ChunkId"sv << Request.ChunkId; + BatchRequest << "RawOffset"sv << Request.RawOffset; + BatchRequest << "RawSize"sv << Request.RawSize; + BatchRequest << "Policy"sv << static_cast<uint32_t>(Request.Policy); + } + BatchRequest.EndObject(); + } + } + BatchRequest.EndArray(); + } + BatchRequest.EndObject(); + + CbPackage BatchResponse; + ZenCacheResult Result; + + { + ZenStructuredCacheSession Session(*m_Client); + Result = Session.InvokeRpc(BatchRequest.Save()); + } + + if (Result.Success) + { + if (BatchResponse.TryLoad(Result.Response)) + { + for (size_t LocalIndex = 0; CbFieldView AttachmentHash : BatchResponse.GetObject()["Result"sv]) + { + const size_t Index = IndexMap[LocalIndex++]; + IoBuffer Payload; + + if (const CbAttachment* Attachment = BatchResponse.FindAttachment(AttachmentHash.AsHash())) + { + if (const CompressedBuffer& Compressed = Attachment->AsCompressedBinary()) + { + Payload = Compressed.GetCompressed().Flatten().AsIoBuffer(); + } + } + + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = std::move(Payload)}); + } + + return {.Bytes = Result.Bytes, .ElapsedSeconds = Result.ElapsedSeconds, .Success = true}; + } + } + else if (Result.ErrorCode) + { + m_HealthOk = false; + } + + for (size_t Index : RequestIndex) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } + virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, std::span<IoBuffer const> Payloads) override { ZEN_ASSERT(CacheRecord.PayloadIds.size() == Payloads.size()); - const uint32_t MaxAttempts = 3; + const int32_t MaxAttempts = 3; try { @@ -565,12 +825,15 @@ namespace detail { TotalElapsedSeconds += Result.ElapsedSeconds; } - return {.Bytes = TotalBytes, .ElapsedSeconds = TotalElapsedSeconds, .Success = Result.Success}; + return {.Reason = std::move(Result.Reason), + .Bytes = TotalBytes, + .ElapsedSeconds = TotalElapsedSeconds, + .Success = Result.Success}; } - catch (std::exception& e) + catch (std::exception& Err) { m_HealthOk = false; - return {.Reason = std::string(e.what()), .Success = false}; + return {.Reason = std::string(Err.what()), .Success = false}; } } @@ -581,7 +844,7 @@ namespace detail { { for (ZenEndpoint& Ep : m_Endpoints) { - ZenStructuredCacheClient Client(Ep.Url); + ZenStructuredCacheClient Client({.Url = Ep.Url, .ConnectTimeout = std::chrono::milliseconds(1000)}); ZenStructuredCacheSession Session(Client); const int32_t SampleCount = 2; @@ -602,7 +865,7 @@ namespace detail { for (const auto& Ep : m_Endpoints) { - ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); + ZEN_INFO("ping 'Zen' endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); } return m_Endpoints.front(); @@ -611,9 +874,10 @@ namespace detail { spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - std::string m_ServiceUrl; + UpstreamEndpointInfo m_Info; std::vector<ZenEndpoint> m_Endpoints; - std::string m_DisplayName; + std::chrono::milliseconds m_ConnectTimeout; + std::chrono::milliseconds m_Timeout; RefPtr<ZenStructuredCacheClient> m_Client; UpstreamEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; @@ -700,7 +964,7 @@ struct UpstreamStats const double HitRate = TotalCount > 0 ? (double(HitCount) / double(TotalCount)) : 0.0; Logger.debug("STATS - '{}', Hit rate: {:.2f}%, DOWN: '{:.2f} MiB {:.2f} MiB/s', UP: '{:.2f} MiB {:.2f} MiB/s'", - Ep->DisplayName(), + Ep->GetEndpointInfo().Name, HitRate, DownBytes, DownSpeed, @@ -734,13 +998,15 @@ public: for (auto& Endpoint : m_Endpoints) { const UpstreamEndpointHealth Health = Endpoint->Initialize(); + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); + if (Health.Ok) { - ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); + ZEN_INFO("'{}' endpoint '{}' OK", Info.Name, Info.Url); } else { - ZEN_WARN("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + ZEN_WARN("'{}' endpoint '{}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); } } @@ -761,7 +1027,7 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) override { m_Endpoints.emplace_back(std::move(Endpoint)); } - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) override { if (m_Options.ReadUpstream) { @@ -776,6 +1042,14 @@ public: { return Result; } + + if (Result.Error) + { + ZEN_ERROR("get cache record FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } } } } @@ -783,7 +1057,99 @@ public: return {}; } - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override + virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingKeys(KeyIndex.begin(), KeyIndex.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy() && !MissingKeys.empty()) + { + std::vector<size_t> Missing; + + auto Result = Endpoint->GetCacheRecords(CacheKeys, MissingKeys, Policy, [&](CacheRecordGetCompleteParams&& Params) { + if (Params.Record) + { + OnComplete(std::forward<CacheRecordGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.KeyIndex); + } + }); + + if (Result.Error) + { + ZEN_ERROR("get cache record(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } + + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + MissingKeys = std::move(Missing); + } + } + } + + for (size_t Index : MissingKeys) + { + OnComplete({.CacheKey = CacheKeys[Index], .KeyIndex = Index, .Record = CbObjectView(), .Package = CbPackage()}); + } + } + + virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) override final + { + std::vector<size_t> MissingPayloads(RequestIndex.begin(), RequestIndex.end()); + + if (m_Options.ReadUpstream) + { + for (auto& Endpoint : m_Endpoints) + { + if (Endpoint->IsHealthy() && !MissingPayloads.empty()) + { + std::vector<size_t> Missing; + + auto Result = + Endpoint->GetCachePayloads(CacheChunkRequests, MissingPayloads, [&](CachePayloadGetCompleteParams&& Params) { + if (Params.Payload) + { + OnComplete(std::forward<CachePayloadGetCompleteParams>(Params)); + } + else + { + Missing.push_back(Params.RequestIndex); + } + }); + + if (Result.Error) + { + ZEN_ERROR("get cache payloads(s) (rpc) FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } + + m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + MissingPayloads = std::move(Missing); + } + } + } + + for (size_t Index : MissingPayloads) + { + OnComplete({.Request = CacheChunkRequests[Index], .RequestIndex = Index, .Payload = IoBuffer()}); + } + } + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) override { if (m_Options.ReadUpstream) { @@ -791,13 +1157,21 @@ public: { if (Endpoint->IsHealthy()) { - const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); + const GetUpstreamCacheResult Result = Endpoint->GetCachePayload(CacheKey, PayloadId); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); if (Result.Success) { return Result; } + + if (Result.Error) + { + ZEN_ERROR("get cache payload FAILED, endpoint '{}', reason '{}', error code '{}'", + Endpoint->GetEndpointInfo().Url, + Result.Error.Reason, + Result.Error.ErrorCode); + } } } } @@ -834,8 +1208,10 @@ public: Status.BeginArray("endpoints"); for (const auto& Ep : m_Endpoints) { + const UpstreamEndpointInfo& Info = Ep->GetEndpointInfo(); Status.BeginObject(); - Status << "name" << Ep->DisplayName(); + Status << "name" << Info.Name; + Status << "url" << Info.Url; Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv); UpstreamEndpointStats& Stats = Ep->Stats(); @@ -890,6 +1266,15 @@ private: { const PutUpstreamCacheResult Result = Endpoint->PutCacheRecord(CacheRecord, CacheValue.Value, std::span(Payloads)); m_Stats.Add(m_Log, *Endpoint, Result, m_Endpoints); + + if (!Result.Success) + { + ZEN_WARN("upload cache record '{}/{}' FAILED, endpoint '{}', reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Endpoint->GetEndpointInfo().Url, + Result.Reason); + } } } } @@ -905,9 +1290,12 @@ private: { ProcessCacheRecord(std::move(CacheRecord)); } - catch (std::exception& e) + catch (std::exception& Err) { - ZEN_WARN("process upstream ({}/{}) FAILED '{}'", CacheRecord.CacheKey.Bucket, CacheRecord.CacheKey.Hash, e.what()); + ZEN_ERROR("upload cache record '{}/{}' FAILED, reason '{}'", + CacheRecord.CacheKey.Bucket, + CacheRecord.CacheKey.Hash, + Err.what()); } } @@ -930,20 +1318,28 @@ private: } } - for (auto& Endpoint : m_Endpoints) + try { - if (!Endpoint->IsHealthy()) + for (auto& Endpoint : m_Endpoints) { - if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) - { - ZEN_INFO("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason); - } - else + if (!Endpoint->IsHealthy()) { - ZEN_WARN("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason); + const UpstreamEndpointInfo& Info = Endpoint->GetEndpointInfo(); + if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok) + { + ZEN_INFO("health check endpoint '{} - {}' OK", Info.Name, Info.Url, Health.Reason); + } + else + { + ZEN_WARN("health check endpoint '{} - {}' FAILED, reason '{}'", Info.Name, Info.Url, Health.Reason); + } } } } + catch (std::exception& Err) + { + ZEN_ERROR("check endpoint(s) health FAILED, reason '{}'", Err.what()); + } } } @@ -1015,9 +1411,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr<UpstreamEndpoint> -MakeZenUpstreamEndpoint(std::span<std::string const> Urls) +MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options) { - return std::make_unique<detail::ZenUpstreamEndpoint>(Urls); + return std::make_unique<detail::ZenUpstreamEndpoint>(Options); } } // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index edc995da6..12287198d 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -5,34 +5,27 @@ #include <zencore/iobuffer.h> #include <zencore/iohash.h> #include <zencore/zencore.h> +#include <zenutil/cache/cache.h> #include <atomic> #include <chrono> +#include <functional> #include <memory> namespace zen { +class CbObjectView; +class CbPackage; class CbObjectWriter; class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; - -struct UpstreamCacheKey -{ - std::string Bucket; - IoHash Hash; -}; - -struct UpstreamPayloadKey -{ - UpstreamCacheKey CacheKey; - IoHash PayloadId; -}; +struct ZenStructuredCacheClientOptions; struct UpstreamCacheRecord { ZenContentType Type = ZenContentType::kBinary; - UpstreamCacheKey CacheKey; + CacheKey CacheKey; std::vector<IoHash> PayloadIds; }; @@ -88,6 +81,31 @@ struct UpstreamEndpointStats std::atomic<double> SecondsDown{}; }; +struct CacheRecordGetCompleteParams +{ + const CacheKey& CacheKey; + size_t KeyIndex = ~size_t(0); + const CbObjectView& Record; + const CbPackage& Package; +}; + +using OnCacheRecordGetComplete = std::function<void(CacheRecordGetCompleteParams&&)>; + +struct CachePayloadGetCompleteParams +{ + const CacheChunkRequest& Request; + size_t RequestIndex{~size_t(0)}; + IoBuffer Payload; +}; + +using OnCachePayloadGetComplete = std::function<void(CachePayloadGetCompleteParams&&)>; + +struct UpstreamEndpointInfo +{ + std::string Name; + std::string Url; +}; + /** * The upstream endpont is responsible for handling upload/downloading of cache records. */ @@ -96,17 +114,26 @@ class UpstreamEndpoint public: virtual ~UpstreamEndpoint() = default; + virtual const UpstreamEndpointInfo& GetEndpointInfo() const = 0; + virtual UpstreamEndpointHealth Initialize() = 0; virtual bool IsHealthy() const = 0; virtual UpstreamEndpointHealth CheckHealth() = 0; - virtual std::string_view DisplayName() const = 0; + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; + + virtual GetUpstreamCacheResult GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& Policy, + OnCacheRecordGetComplete&& OnComplete) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual GetUpstreamCacheResult GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) = 0; virtual PutUpstreamCacheResult PutCacheRecord(const UpstreamCacheRecord& CacheRecord, IoBuffer RecordValue, @@ -127,9 +154,18 @@ public: virtual void RegisterEndpoint(std::unique_ptr<UpstreamEndpoint> Endpoint) = 0; - virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) = 0; + virtual GetUpstreamCacheResult GetCacheRecord(CacheKey CacheKey, ZenContentType Type) = 0; + + virtual void GetCacheRecords(std::span<CacheKey> CacheKeys, + std::span<size_t> KeyIndex, + const CacheRecordPolicy& RecordPolicy, + OnCacheRecordGetComplete&& OnComplete) = 0; + + virtual GetUpstreamCacheResult GetCachePayload(const CacheKey& CacheKey, const IoHash& PayloadId) = 0; - virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) = 0; + virtual void GetCachePayloads(std::span<CacheChunkRequest> CacheChunkRequests, + std::span<size_t> RequestIndex, + OnCachePayloadGetComplete&& OnComplete) = 0; struct EnqueueResult { @@ -145,6 +181,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); -std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls); +std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(const ZenStructuredCacheClientOptions& Options); } // namespace zen diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 14333f45a..cd7f48334 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -311,8 +311,17 @@ namespace detail { ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {} ~ZenCacheSessionState() {} - void Reset() {} + void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout) + { + Session.SetBody({}); + Session.SetHeader({}); + Session.SetConnectTimeout(ConnectTimeout); + Session.SetTimeout(Timeout); + } + + cpr::Session& GetSession() { return Session; } + private: ZenStructuredCacheClient& OwnerClient; cpr::Session Session; }; @@ -321,9 +330,11 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) +ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options) : m_Log(logging::Get(std::string_view("zenclient"))) -, m_ServiceUrl(ServiceUrl) +, m_ServiceUrl(Options.Url) +, m_ConnectTimeout(Options.ConnectTimeout) +, m_Timeout(Options.Timeout) { } @@ -347,7 +358,7 @@ ZenStructuredCacheClient::AllocSessionState() State = new detail::ZenCacheSessionState(*this); } - State->Reset(); + State->Reset(m_ConnectTimeout, m_Timeout); return State; } @@ -381,7 +392,7 @@ ZenStructuredCacheSession::CheckHealth() ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/health/check"; - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); cpr::Response Response = Session.Get(); @@ -399,7 +410,7 @@ ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHas ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", @@ -427,7 +438,7 @@ ZenStructuredCacheSession::GetCachePayload(std::string_view BucketId, const IoHa ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); @@ -452,7 +463,7 @@ ZenStructuredCacheSession::PutCacheRecord(std::string_view BucketId, const IoHas ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", @@ -480,7 +491,7 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa ExtendableStringBuilder<256> Uri; Uri << m_Client.ServiceUrl() << "/z$/" << BucketId << "/" << Key.ToHexString() << "/" << PayloadId.ToHexString(); - cpr::Session& Session = m_SessionState->Session; + cpr::Session& Session = m_SessionState->GetSession(); Session.SetOption(cpr::Url{Uri.c_str()}); Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}}); @@ -499,4 +510,33 @@ ZenStructuredCacheSession::PutCachePayload(std::string_view BucketId, const IoHa .Success = (Response.status_code == 200 || Response.status_code == 201)}; } +ZenCacheResult +ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request) +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/z$/$rpc"; + + BinaryWriter Body; + Request.CopyTo(Body); + + cpr::Session& Session = m_SessionState->GetSession(); + + Session.SetOption(cpr::Url{Uri.c_str()}); + Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}}); + Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()}); + + cpr::Response Response = Session.Post(); + ZEN_DEBUG("POST {}", Response); + + if (Response.error) + { + return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)}; + } + + const bool Success = Response.status_code == 200; + const IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer(); + + return {.Response = std::move(Buffer), .Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Success}; +} + } // namespace zen diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 7f55294ce..df975df1f 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -26,6 +26,8 @@ class logger; namespace zen { class CbObjectWriter; +class CbObjectView; +class CbPackage; class ZenStructuredCacheClient; /** Zen mesh tracker @@ -97,6 +99,14 @@ struct ZenCacheResult bool Success = false; }; +struct ZenStructuredCacheClientOptions +{ + std::string_view Url; + std::span<std::string const> Urls; + std::chrono::milliseconds ConnectTimeout{}; + std::chrono::milliseconds Timeout{}; +}; + /** Zen Structured Cache session * * This provides a context in which cache queries can be performed @@ -114,6 +124,7 @@ public: ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); ZenCacheResult PutCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId, IoBuffer Payload); + ZenCacheResult InvokeRpc(const CbObjectView& Request); private: inline spdlog::logger& Log() { return m_Log; } @@ -131,7 +142,7 @@ private: class ZenStructuredCacheClient : public RefCounted { public: - ZenStructuredCacheClient(std::string_view ServiceUrl); + ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options); ~ZenStructuredCacheClient(); std::string_view ServiceUrl() const { return m_ServiceUrl; } @@ -139,8 +150,10 @@ public: inline spdlog::logger& Log() { return m_Log; } private: - spdlog::logger& m_Log; - std::string m_ServiceUrl; + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::chrono::milliseconds m_ConnectTimeout; + std::chrono::milliseconds m_Timeout; RwLock m_SessionStateLock; std::list<detail::ZenCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index f26ab723c..bb57b4d0a 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -691,7 +691,10 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) if (!ZenUrls.empty()) { - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(ZenUrls); + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = + zen::MakeZenUpstreamEndpoint({.Urls = ZenUrls, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}); UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); } } @@ -701,23 +704,29 @@ ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) zen::CloudCacheClientOptions Options; if (UpstreamConfig.JupiterConfig.UseProductionSettings) { - Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, - .DdcNamespace = "ue.ddc"sv, - .BlobStoreNamespace = "ue.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; + Options = + zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue.ddc"sv, + .BlobStoreNamespace = "ue.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), + .UseLegacyDdc = false}; } else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) { - Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, - .DdcNamespace = "ue4.ddc"sv, - .BlobStoreNamespace = "test.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; + Options = + zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, + .DdcNamespace = "ue4.ddc"sv, + .BlobStoreNamespace = "test.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), + .UseLegacyDdc = false}; } Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index 13589ee3b..38b51e2f0 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -126,6 +126,7 @@ <ClInclude Include="diag\diagsvcs.h" /> <ClInclude Include="experimental\usnjournal.h" /> <ClInclude Include="targetver.h" /> + <ClInclude Include="upstream\upstreamapply.h" /> <ClInclude Include="upstream\upstreamcache.h" /> <ClInclude Include="upstream\zen.h" /> <ClInclude Include="windows\service.h" /> @@ -149,6 +150,7 @@ <ClCompile Include="testing\launch.cpp" /> <ClCompile Include="casstore.cpp" /> <ClCompile Include="experimental\usnjournal.cpp" /> + <ClCompile Include="upstream\upstreamapply.cpp" /> <ClCompile Include="upstream\upstreamcache.cpp" /> <ClCompile Include="upstream\zen.cpp" /> <ClCompile Include="windows\service.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 87591ef56..97a43c901 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -41,6 +41,9 @@ <ClInclude Include="experimental\vfs.h" /> <ClInclude Include="monitoring\httpstats.h" /> <ClInclude Include="monitoring\httpstatus.h" /> + <ClInclude Include="upstream\upstreamapply.h"> + <Filter>upstream</Filter> + </ClInclude> <ClInclude Include="cache\cachetracking.h" /> </ItemGroup> <ItemGroup> @@ -77,6 +80,9 @@ <ClCompile Include="experimental\vfs.cpp" /> <ClCompile Include="monitoring\httpstats.cpp" /> <ClCompile Include="monitoring\httpstatus.cpp" /> + <ClCompile Include="upstream\upstreamapply.cpp"> + <Filter>upstream</Filter> + </ClCompile> <ClCompile Include="cache\cachetracking.cpp" /> </ItemGroup> <ItemGroup> diff --git a/zenutil/cache/cachekey.cpp b/zenutil/cache/cachekey.cpp new file mode 100644 index 000000000..545b47f11 --- /dev/null +++ b/zenutil/cache/cachekey.cpp @@ -0,0 +1,9 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/cache/cachekey.h> + +namespace zen { + +const CacheKey CacheKey::Empty = CacheKey{.Bucket = std::string(), .Hash = IoHash()}; + +} // namespace zen diff --git a/zenutil/cache/cachepolicy.cpp b/zenutil/cache/cachepolicy.cpp new file mode 100644 index 000000000..f718bf841 --- /dev/null +++ b/zenutil/cache/cachepolicy.cpp @@ -0,0 +1,167 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/cache/cachepolicy.h> + +#include <zencore/compactbinary.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/string.h> + +namespace zen { + +using namespace std::literals; + +namespace detail { namespace cacheopt { + constexpr std::string_view Local = "local"sv; + constexpr std::string_view Remote = "remote"sv; + constexpr std::string_view Data = "data"sv; + constexpr std::string_view Meta = "meta"sv; + constexpr std::string_view Value = "value"sv; + constexpr std::string_view Attachments = "attachments"sv; +}} // namespace detail::cacheopt + +CachePolicy +ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default) +{ + if (QueryPolicy.empty()) + { + return Default; + } + + CachePolicy Result = CachePolicy::None; + + ForEachStrTok(QueryPolicy, ',', [&Result](const std::string_view& Token) { + if (Token == detail::cacheopt::Local) + { + Result |= CachePolicy::QueryLocal; + } + if (Token == detail::cacheopt::Remote) + { + Result |= CachePolicy::QueryRemote; + } + return true; + }); + + return Result; +} + +CachePolicy +ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default) +{ + if (StorePolicy.empty()) + { + return Default; + } + + CachePolicy Result = CachePolicy::None; + + ForEachStrTok(StorePolicy, ',', [&Result](const std::string_view& Token) { + if (Token == detail::cacheopt::Local) + { + Result |= CachePolicy::StoreLocal; + } + if (Token == detail::cacheopt::Remote) + { + Result |= CachePolicy::StoreRemote; + } + return true; + }); + + return Result; +} + +CachePolicy +ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default) +{ + if (SkipPolicy.empty()) + { + return Default; + } + + CachePolicy Result = CachePolicy::None; + + ForEachStrTok(SkipPolicy, ',', [&Result](const std::string_view& Token) { + if (Token == detail::cacheopt::Meta) + { + Result |= CachePolicy::SkipMeta; + } + if (Token == detail::cacheopt::Value) + { + Result |= CachePolicy::SkipValue; + } + if (Token == detail::cacheopt::Attachments) + { + Result |= CachePolicy::SkipAttachments; + } + if (Token == detail::cacheopt::Data) + { + Result |= CachePolicy::SkipData; + } + return true; + }); + + return Result; +} + +CacheRecordPolicy::CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy PayloadPolicy) +: m_RecordPolicy(RecordPolicy) +, m_DefaultPayloadPolicy(PayloadPolicy) +{ +} + +CachePolicy +CacheRecordPolicy::GetPayloadPolicy(const Oid& PayloadId) const +{ + if (const auto It = m_PayloadPolicies.find(PayloadId); It != m_PayloadPolicies.end()) + { + return It->second; + } + + return m_DefaultPayloadPolicy; +} + +bool +CacheRecordPolicy::Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy) +{ + using namespace std::literals; + + const uint32_t RecordPolicy = RecordPolicyObject["RecordPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default)); + const uint32_t DefaultPayloadPolicy = + RecordPolicyObject["DefaultPayloadPolicy"sv].AsUInt32(static_cast<uint32_t>(CachePolicy::Default)); + + OutRecordPolicy.m_RecordPolicy = static_cast<CachePolicy>(RecordPolicy); + OutRecordPolicy.m_DefaultPayloadPolicy = static_cast<CachePolicy>(DefaultPayloadPolicy); + + for (CbFieldView PayloadPolicyView : RecordPolicyObject["PayloadPolicies"sv]) + { + CbObjectView PayloadPolicyObject = PayloadPolicyView.AsObjectView(); + const Oid PayloadId = PayloadPolicyObject["Id"sv].AsObjectId(); + const uint32_t PayloadPolicy = PayloadPolicyObject["Policy"sv].AsUInt32(); + + if (PayloadId != Oid::Zero && PayloadPolicy != 0) + { + OutRecordPolicy.m_PayloadPolicies.emplace(PayloadId, static_cast<CachePolicy>(PayloadPolicy)); + } + } + + return true; +} + +void +CacheRecordPolicy::Save(const CacheRecordPolicy& Policy, CbWriter& Writer) +{ + Writer << "RecordPolicy"sv << static_cast<uint32_t>(Policy.GetRecordPolicy()); + Writer << "DefaultPayloadPolicy"sv << static_cast<uint32_t>(Policy.GetDefaultPayloadPolicy()); + + if (!Policy.m_PayloadPolicies.empty()) + { + Writer.BeginArray("PayloadPolicies"sv); + for (const auto& Kv : Policy.m_PayloadPolicies) + { + Writer.AddObjectId("Id"sv, Kv.first); + Writer << "Policy"sv << static_cast<uint32_t>(Kv.second); + } + Writer.EndArray(); + } +} + +} // namespace zen diff --git a/zenutil/include/zenutil/cache/cache.h b/zenutil/include/zenutil/cache/cache.h new file mode 100644 index 000000000..1a1dd9386 --- /dev/null +++ b/zenutil/include/zenutil/cache/cache.h @@ -0,0 +1,6 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenutil/cache/cachekey.h> +#include <zenutil/cache/cachepolicy.h> diff --git a/zenutil/include/zenutil/cache/cachekey.h b/zenutil/include/zenutil/cache/cachekey.h new file mode 100644 index 000000000..fb36c7759 --- /dev/null +++ b/zenutil/include/zenutil/cache/cachekey.h @@ -0,0 +1,83 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/string.h> +#include <zencore/uid.h> + +#include <zenutil/cache/cachepolicy.h> + +namespace zen { + +struct CacheKey +{ + std::string Bucket; + IoHash Hash; + + static CacheKey Create(std::string_view Bucket, const IoHash& Hash) { return {.Bucket = ToLower(Bucket), .Hash = Hash}; } + + static const CacheKey Empty; +}; + +inline bool +operator==(const CacheKey& A, const CacheKey& B) +{ + return A.Bucket == B.Bucket && A.Hash == B.Hash; +} + +inline bool +operator!=(const CacheKey& A, const CacheKey& B) +{ + return A.Bucket != B.Bucket || A.Hash != B.Hash; +} + +inline bool +operator<(const CacheKey& A, const CacheKey& B) +{ + const std::string& BucketA = A.Bucket; + const std::string& BucketB = B.Bucket; + return BucketA == BucketB ? A.Hash < B.Hash : BucketA < BucketB; +} + +struct CacheChunkRequest +{ + CacheKey Key; + IoHash ChunkId; + Oid PayloadId; + uint64_t RawOffset = 0ull; + uint64_t RawSize = ~uint64_t(0); + CachePolicy Policy = CachePolicy::Default; +}; + +inline bool +operator<(const CacheChunkRequest& A, const CacheChunkRequest& B) +{ + if (A.Key < B.Key) + { + return true; + } + if (B.Key < A.Key) + { + return false; + } + if (A.ChunkId < B.ChunkId) + { + return true; + } + if (B.ChunkId < A.ChunkId) + { + return false; + } + if (A.PayloadId < B.PayloadId) + { + return true; + } + if (B.PayloadId < A.PayloadId) + { + return false; + } + return A.RawOffset < B.RawOffset; +} + +} // namespace zen diff --git a/zenutil/include/zenutil/cache/cachepolicy.h b/zenutil/include/zenutil/cache/cachepolicy.h new file mode 100644 index 000000000..5675ccf4d --- /dev/null +++ b/zenutil/include/zenutil/cache/cachepolicy.h @@ -0,0 +1,112 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/string.h> +#include <zencore/uid.h> + +#include <gsl/gsl-lite.hpp> +#include <unordered_map> + +namespace zen { + +class CbObjectView; +class CbWriter; + +enum class CachePolicy : uint32_t +{ + /** A value without any flags set. */ + None = 0, + + /** Allow a cache request to query local caches. */ + QueryLocal = 1 << 0, + /** Allow a cache request to query remote caches. */ + QueryRemote = 1 << 1, + /** Allow a cache request to query any caches. */ + Query = QueryLocal | QueryRemote, + + /** Allow cache records and values to be stored in local caches. */ + StoreLocal = 1 << 2, + /** Allow cache records and values to be stored in remote caches. */ + StoreRemote = 1 << 3, + /** Allow cache records and values to be stored in any caches. */ + Store = StoreLocal | StoreRemote, + + /** Skip fetching the metadata for record requests. */ + SkipMeta = 1 << 4, + /** Skip fetching the value for record, chunk, or value requests. */ + SkipValue = 1 << 5, + /** Skip fetching the attachments for record requests. */ + SkipAttachments = 1 << 6, + /** + * Skip fetching the data for any requests. + * + * Put requests with skip flags may assume that record existence implies payload existence. + */ + SkipData = SkipMeta | SkipValue | SkipAttachments, + + /** + * Keep records in the cache for at least the duration of the session. + * + * This is a hint that the record may be accessed again in this session. This is mainly meant + * to be used when subsequent accesses will not tolerate a cache miss. + */ + KeepAlive = 1 << 7, + + /** + * Partial output will be provided with the error status when a required payload is missing. + * + * This is meant for cases when the missing payloads can be individually recovered or rebuilt + * without rebuilding the whole record. The cache automatically adds this flag when there are + * other cache stores that it may be able to recover missing payloads from. + * + * Requests for records would return records where the missing payloads have a hash and size, + * but no data. Requests for chunks or values would return the hash and size, but no data. + */ + PartialOnError = 1 << 8, + + /** Allow cache requests to query and store records and values in local caches. */ + Local = QueryLocal | StoreLocal, + /** Allow cache requests to query and store records and values in remote caches. */ + Remote = QueryRemote | StoreRemote, + + /** Allow cache requests to query and store records and values in any caches. */ + Default = Query | Store, + + /** Do not allow cache requests to query or store records and values in any caches. */ + Disable = None, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); + +CachePolicy ParseQueryCachePolicy(std::string_view QueryPolicy, CachePolicy Default = CachePolicy::Query); + +CachePolicy ParseStoreCachePolicy(std::string_view StorePolicy, CachePolicy Default = CachePolicy::Store); + +CachePolicy ParseSkipCachePolicy(std::string_view SkipPolicy, CachePolicy Default = CachePolicy::None); + +class CacheRecordPolicy +{ +public: + CacheRecordPolicy() = default; + CacheRecordPolicy(const CachePolicy RecordPolicy, const CachePolicy DefaultPayloadPolicy = CachePolicy::Default); + + CachePolicy GetRecordPolicy() const { return m_RecordPolicy; } + CachePolicy GetPayloadPolicy(const Oid& PayloadId) const; + CachePolicy GetDefaultPayloadPolicy() const { return m_DefaultPayloadPolicy; } + + bool HasRecordPolicy(const CachePolicy Policy) const { return (m_RecordPolicy & Policy) == Policy; } + bool HasPayloadPolicy(const Oid& PayloadId, const CachePolicy Policy) const { return (GetPayloadPolicy(PayloadId) & Policy) == Policy; } + + static bool Load(CbObjectView RecordPolicyObject, CacheRecordPolicy& OutRecordPolicy); + static void Save(const CacheRecordPolicy& Policy, CbWriter& Writer); + +private: + using PayloadPolicyMap = std::unordered_map<Oid, CachePolicy, Oid::Hasher>; + + CachePolicy m_RecordPolicy = CachePolicy::Default; + CachePolicy m_DefaultPayloadPolicy = CachePolicy::Default; + PayloadPolicyMap m_PayloadPolicies; +}; + +} // namespace zen diff --git a/zenutil/zenutil.vcxproj b/zenutil/zenutil.vcxproj index 20f803e2a..e0c034c6f 100644 --- a/zenutil/zenutil.vcxproj +++ b/zenutil/zenutil.vcxproj @@ -95,9 +95,14 @@ </Link> </ItemDefinitionGroup> <ItemGroup> + <ClCompile Include="cache\cachekey.cpp" /> + <ClCompile Include="cache\cachepolicy.cpp" /> <ClCompile Include="zenserverprocess.cpp" /> </ItemGroup> <ItemGroup> + <ClInclude Include="include\zenutil\cache\cache.h" /> + <ClInclude Include="include\zenutil\cache\cachekey.h" /> + <ClInclude Include="include\zenutil\cache\cachepolicy.h" /> <ClInclude Include="include\zenutil\zenserverprocess.h" /> </ItemGroup> <ItemGroup> diff --git a/zenutil/zenutil.vcxproj.filters b/zenutil/zenutil.vcxproj.filters index 9952e7159..368a827c2 100644 --- a/zenutil/zenutil.vcxproj.filters +++ b/zenutil/zenutil.vcxproj.filters @@ -2,11 +2,31 @@ <Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <ItemGroup> <ClCompile Include="zenserverprocess.cpp" /> + <ClCompile Include="cache\cachekey.cpp"> + <Filter>cache</Filter> + </ClCompile> + <ClCompile Include="cache\cachepolicy.cpp"> + <Filter>cache</Filter> + </ClCompile> </ItemGroup> <ItemGroup> <ClInclude Include="include\zenutil\zenserverprocess.h" /> + <ClInclude Include="include\zenutil\cache\cache.h"> + <Filter>cache</Filter> + </ClInclude> + <ClInclude Include="include\zenutil\cache\cachekey.h"> + <Filter>cache</Filter> + </ClInclude> + <ClInclude Include="include\zenutil\cache\cachepolicy.h"> + <Filter>cache</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> </ItemGroup> + <ItemGroup> + <Filter Include="cache"> + <UniqueIdentifier>{a441c536-6a01-4ac4-85a0-2667c95027d0}</UniqueIdentifier> + </Filter> + </ItemGroup> </Project>
\ No newline at end of file |