diff options
| author | Martin Ridgers <[email protected]> | 2021-10-07 08:29:50 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-10-07 08:29:50 +0200 |
| commit | 03232621d183f22e12e798a753e4a606763e63d6 (patch) | |
| tree | 5701d202392dd4ab947139e4046a44ab9bc6cdf7 | |
| parent | Merged main (diff) | |
| parent | Only enable the MSVC debug output sink for sessions when the --debug mode is ... (diff) | |
| download | zen-03232621d183f22e12e798a753e4a606763e63d6.tar.xz zen-03232621d183f22e12e798a753e4a606763e63d6.zip | |
Merged main
58 files changed, 2421 insertions, 677 deletions
diff --git a/scripts/deploybuild.py b/scripts/deploybuild.py index 971f34ff9..c81235a8c 100644 --- a/scripts/deploybuild.py +++ b/scripts/deploybuild.py @@ -17,6 +17,10 @@ def jazz_print(tag, detail = ""): def jazz_fail(tag, detail = ""): print(f"{Fore.RED}{Style.BRIGHT}||> {tag}{Style.RESET_ALL} {detail}") +def copy_file(src, dst): + print(f"{Fore.WHITE}{Style.BRIGHT}||> COPY {Style.RESET_ALL} {src} -> {Fore.GREEN}{Style.BRIGHT}{dst}") + shutil.copy(src, dst) + colorama.init() origcwd = os.getcwd() @@ -25,9 +29,13 @@ origcwd = os.getcwd() parser = argparse.ArgumentParser(description='Deploy a zen build to an UE tree') parser.add_argument("root", help="Path to an UE5 root directory") +parser.add_argument("--sentry", action="store_true", help="Whether to upload symobls to Sentry") +parser.add_argument("--xmake", action="store_true", help="Build with XMake") args = parser.parse_args() engineroot = args.root +upload_symbols = args.sentry +use_xmake = args.xmake if not os.path.isfile(os.path.join(engineroot, "RunUAT.bat")): print(f"{Fore.RED}Not a valid UE5 engine root directory: '{engineroot}'") @@ -45,20 +53,29 @@ jazz_print("Zen root:", zenroot) # Build fresh binaries -vs_path = vswhere.get_latest_path() # can also specify prerelease=True -jazz_print("BUILDING CODE", f"using VS root: {vs_path}") -devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") +if use_xmake: + build_cmd = ["xmake", "-b", "zenserver"] + build_output_dir = r'build\windows\x64\release' +else: + vs_path = vswhere.get_latest_path() # can also specify prerelease=True + jazz_print("BUILDING CODE", f"using VS root: {vs_path}") + devenv_path = os.path.join(vs_path, "Common7\\IDE\\devenv.com") + build_cmd = [devenv_path, "/build", "Release", "zen.sln"] + build_output_dir = r'x64\Release' try: - subprocess.run([devenv_path, "/build", "Release", "zen.sln"], check=True) + subprocess.run(build_cmd, check=True) except: jazz_fail("Build failed!") exit(1) -# Upload symbols etc to Sentry +build_output_binary_path = os.path.join(zenroot, build_output_dir, "zenserver.exe") +build_output_binary_pdb_path = os.path.join(zenroot, build_output_dir, "zenserver.pdb") -jazz_print("Uploading symbols", "to Sentry") -subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", "x64\\Release\\zenserver.exe", "x64\\Release\\zenserver.pdb"]) +# Upload symbols etc to Sentry +if upload_symbols: + jazz_print("Uploading symbols", "to Sentry") + subprocess.run(["scripts\sentry-cli.exe", "upload-dif", "--org", "to", "--project", "zen-server", build_output_binary_path, build_output_binary_pdb_path]) # Change into root directory to pick up Perforce environment @@ -105,9 +122,9 @@ jazz_print("Placing zenserver", f"executables into tree at '{target_bin_dir}'") crashpadtarget = os.path.join(target_bin_dir, "crashpad_handler.exe") try: - shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.exe"), os.path.join(target_bin_dir, "zenserver.exe")) - shutil.copy(os.path.join(zenroot, "x64\Release\zenserver.pdb"), os.path.join(target_bin_dir, "zenserver.pdb")) - shutil.copy(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget) + copy_file(build_output_binary_path, os.path.join(target_bin_dir, "zenserver.exe")) + copy_file(build_output_binary_pdb_path, os.path.join(target_bin_dir, "zenserver.pdb")) + copy_file(os.path.join(zenroot, r'vcpkg_installed\x64-windows-static\tools\sentry-native\crashpad_handler.exe'), crashpadtarget) P4.add(crashpadtarget).run() except Exception as e: print(f"Caught exception while copying: {e.args}") diff --git a/zen/cmds/print.cpp b/zen/cmds/print.cpp new file mode 100644 index 000000000..aac6afd44 --- /dev/null +++ b/zen/cmds/print.cpp @@ -0,0 +1,107 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "print.h" + +#include <zencore/compactbinarypackage.h> +#include <zencore/filesystem.h> +#include <zencore/logging.h> +#include <zencore/string.h> + +using namespace std::literals; + +namespace zen { + +PrintCommand::PrintCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "s", "source", "Object payload file", cxxopts::value(m_Filename), "<file name>"); +} + +PrintCommand::~PrintCommand() = default; + +int +PrintCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + m_Options.parse_positional({"source"}); + + auto result = m_Options.parse(argc, argv); + + if (result.count("help")) + { + std::cout << m_Options.help({"", "Group"}) << std::endl; + + return 0; + } + + // Validate arguments + + if (m_Filename.empty()) + throw std::runtime_error("No file specified"); + + zen::FileContents Fc = zen::ReadFile(m_Filename); + IoBuffer Data = Fc.Flatten(); + zen::CbObject Object{SharedBuffer(Data)}; + + zen::StringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(Object, ObjStr); + zen::ConsoleLog().info("{}", ObjStr); + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +PrintPackageCommand::PrintPackageCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "s", "source", "Package payload file", cxxopts::value(m_Filename), "<file name>"); +} + +PrintPackageCommand::~PrintPackageCommand() +{ +} + +int +PrintPackageCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + m_Options.parse_positional({"source"}); + + auto result = m_Options.parse(argc, argv); + + if (result.count("help")) + { + std::cout << m_Options.help({"", "Group"}) << std::endl; + + return 0; + } + + // Validate arguments + + if (m_Filename.empty()) + throw std::runtime_error("No file specified"); + + zen::FileContents Fc = zen::ReadFile(m_Filename); + IoBuffer Data = Fc.Flatten(); + zen::CbPackage Package; + + bool Ok = Package.TryLoad(Data) || zen::legacy::TryLoadCbPackage(Package, Data, &UniqueBuffer::Alloc); + + if (Ok) + { + zen::StringBuilder<1024> ObjStr; + zen::CompactBinaryToJson(Package.GetObject(), ObjStr); + zen::ConsoleLog().info("{}", ObjStr); + } + else + { + zen::ConsoleLog().error("error: malformed package?"); + } + + return 0; +} + +} // namespace zen diff --git a/zen/cmds/print.h b/zen/cmds/print.h new file mode 100644 index 000000000..eed0aa14e --- /dev/null +++ b/zen/cmds/print.h @@ -0,0 +1,41 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +/** Print Compact Binary + */ +class PrintCommand : public ZenCmdBase +{ +public: + PrintCommand(); + ~PrintCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"print", "Print compact binary object"}; + std::string m_Filename; +}; + +/** Print Compact Binary Package + */ +class PrintPackageCommand : public ZenCmdBase +{ +public: + PrintPackageCommand(); + ~PrintPackageCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"printpkg", "Print compact binary package"}; + std::string m_Filename; +}; + +} // namespace zen diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp index 94eb7ef6d..19b5c8980 100644 --- a/zen/cmds/run.cpp +++ b/zen/cmds/run.cpp @@ -10,6 +10,7 @@ #include <zencore/fmtutils.h> #include <zencore/iohash.h> #include <zencore/logging.h> +#include <zencore/stream.h> #include <zencore/string.h> #include <zencore/timer.h> #include <zenutil/zenserverprocess.h> diff --git a/zen/zen.cpp b/zen/zen.cpp index 86c41d658..3c33ff5e0 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -9,6 +9,7 @@ #include "cmds/dedup.h" #include "cmds/deploy.h" #include "cmds/hash.h" +#include "cmds/print.h" #include "cmds/run.h" #include "cmds/status.h" #include "cmds/top.h" @@ -98,18 +99,20 @@ main(int argc, char** argv) auto _ = zen::MakeGuard([] { spdlog::shutdown(); }); - HashCommand HashCmd; - CopyCommand CopyCmd; - DedupCommand DedupCmd; - DeployCommand DeployCmd; - DropCommand DropCmd; - ChunkCommand ChunkCmd; - RunCommand RunCmd; - StatusCommand StatusCmd; - TopCommand TopCmd; - PsCommand PsCmd; - UpCommand UpCmd; - DownCommand DownCmd; + HashCommand HashCmd; + CopyCommand CopyCmd; + DedupCommand DedupCmd; + DeployCommand DeployCmd; + DropCommand DropCmd; + ChunkCommand ChunkCmd; + RunCommand RunCmd; + StatusCommand StatusCmd; + TopCommand TopCmd; + PrintCommand PrintCmd; + PrintPackageCommand PrintPkgCmd; + PsCommand PsCmd; + UpCommand UpCmd; + DownCommand DownCmd; #if ZEN_WITH_TESTS RunTestsCommand RunTestsCmd; @@ -128,6 +131,8 @@ main(int argc, char** argv) {"dedup", &DedupCmd, "Dedup files"}, {"drop", &DropCmd, "Drop cache bucket(s)"}, {"hash", &HashCmd, "Compute file hashes"}, + {"print", &PrintCmd, "Print compact binary object"}, + {"printpackage", &PrintPkgCmd, "Print compact binary package"}, {"run", &RunCmd, "Remote execution"}, {"status", &StatusCmd, "Show zen status"}, {"ps", &PsCmd, "Enumerate running zen server instances"}, diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj index fb0674e87..f31c0bc17 100644 --- a/zen/zen.vcxproj +++ b/zen/zen.vcxproj @@ -99,6 +99,7 @@ <ClCompile Include="cmds\dedup.cpp" /> <ClCompile Include="cmds\deploy.cpp" /> <ClCompile Include="cmds\hash.cpp" /> + <ClCompile Include="cmds\print.cpp" /> <ClCompile Include="cmds\run.cpp" /> <ClCompile Include="cmds\scrub.cpp" /> <ClCompile Include="cmds\status.cpp" /> @@ -114,6 +115,7 @@ <ClInclude Include="cmds\dedup.h" /> <ClInclude Include="cmds\deploy.h" /> <ClInclude Include="cmds\hash.h" /> + <ClInclude Include="cmds\print.h" /> <ClInclude Include="cmds\run.h" /> <ClInclude Include="cmds\scrub.h" /> <ClInclude Include="cmds\status.h" /> diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters index 9002f01c2..d983b413c 100644 --- a/zen/zen.vcxproj.filters +++ b/zen/zen.vcxproj.filters @@ -28,6 +28,7 @@ <ClCompile Include="cmds\up.cpp" /> <ClCompile Include="cmds\cache.cpp" /> <ClCompile Include="cmds\scrub.cpp" /> + <ClCompile Include="cmds\print.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="chunk\chunk.h" /> @@ -57,6 +58,7 @@ <ClInclude Include="cmds\up.h" /> <ClInclude Include="cmds\cache.h" /> <ClInclude Include="cmds\scrub.h" /> + <ClInclude Include="cmds\print.h" /> </ItemGroup> <ItemGroup> <Filter Include="cmds"> diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index f3fbf312c..2ce6987d2 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -1474,10 +1474,30 @@ public: Builder << Accessor.AsIntegerNegative(); break; case CbFieldType::Float32: - Builder.Append("{:.9g}"_format(Accessor.AsFloat32())); + { + const float Value = Accessor.AsFloat32(); + if (std::isfinite(Value)) + { + Builder.Append("{:.9g}"_format(Value)); + } + else + { + Builder << "null"sv; + } + } break; case CbFieldType::Float64: - Builder.Append("{:.17g}"_format(Accessor.AsFloat64())); + { + const double Value = Accessor.AsFloat64(); + if (std::isfinite(Value)) + { + Builder.Append("{:.17g}"_format(Value)); + } + else + { + Builder << "null"sv; + } + } break; case CbFieldType::BoolFalse: Builder << "false"sv; @@ -1834,7 +1854,7 @@ TEST_CASE("uson.json") SUBCASE("number") { - const double ExpectedFloatValue = 21.21f; + const float ExpectedFloatValue = 21.21f; const double ExpectedDoubleValue = 42.42; CbObjectWriter Writer; @@ -1856,6 +1876,31 @@ TEST_CASE("uson.json") CHECK(FloatValue == doctest::Approx(ExpectedFloatValue)); CHECK(DoubleValue == doctest::Approx(ExpectedDoubleValue)); } + + SUBCASE("number.nan") + { + const float FloatNan = std::numeric_limits<float>::quiet_NaN(); + const double DoubleNan = std::numeric_limits<double>::quiet_NaN(); + + CbObjectWriter Writer; + Writer << "FloatNan" << FloatNan; + Writer << "DoubleNan" << DoubleNan; + + CbObject Obj = Writer.Save(); + + StringBuilder<128> Sb; + const std::string_view JsonText = Obj.ToJson(Sb).ToView(); + + std::string JsonError; + json11::Json Json = json11::Json::parse(JsonText.data(), JsonError); + + const double FloatValue = Json["FloatNan"].number_value(); + const double DoubleValue = Json["DoubleNan"].number_value(); + + CHECK(JsonError.empty()); + CHECK(FloatValue == 0); + CHECK(DoubleValue == 0); + } } #endif diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index 2d2603434..9936f30ec 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -63,6 +63,13 @@ DeleteReparsePoint(const wchar_t* Path, DWORD dwReparseTag) bool CreateDirectories(const wchar_t* Dir) { + // This may be suboptimal, in that it appears to try and create directories + // from the root on up instead of from some directory which is known to + // be present + // + // We should implement a smarter version at some point since this can be + // pretty expensive in aggregate + return std::filesystem::create_directories(Dir); } @@ -522,6 +529,23 @@ WriteFile(std::filesystem::path Path, IoBuffer Data) WriteFile(Path, &DataPtr, 1); } +IoBuffer +FileContents::Flatten() +{ + if (Data.size() == 1) + { + return Data[0]; + } + else if (Data.empty()) + { + return {}; + } + else + { + ZEN_NOT_IMPLEMENTED(); + } +} + FileContents ReadFile(std::filesystem::path Path) { diff --git a/zencore/include/zencore/blockingqueue.h b/zencore/include/zencore/blockingqueue.h new file mode 100644 index 000000000..277095689 --- /dev/null +++ b/zencore/include/zencore/blockingqueue.h @@ -0,0 +1,75 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <atomic> +#include <deque> +#include <mutex> + +namespace zen { + +template<typename T> +class BlockingQueue +{ +public: + BlockingQueue() = default; + + ~BlockingQueue() { CompleteAdding(); } + + void Enqueue(T&& Item) + { + { + std::lock_guard Lock(m_Lock); + m_Queue.emplace_back(std::move(Item)); + m_Size++; + } + + m_NewItemSignal.notify_one(); + } + + bool WaitAndDequeue(T& Item) + { + if (m_CompleteAdding.load()) + { + return false; + } + + std::unique_lock Lock(m_Lock); + m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); + + if (!m_Queue.empty()) + { + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + m_Size--; + + return true; + } + + return false; + } + + void CompleteAdding() + { + if (!m_CompleteAdding.load()) + { + m_CompleteAdding.store(true); + m_NewItemSignal.notify_all(); + } + } + + std::size_t Size() const + { + std::unique_lock Lock(m_Lock); + return m_Queue.size(); + } + +private: + mutable std::mutex m_Lock; + std::condition_variable m_NewItemSignal; + std::deque<T> m_Queue; + std::atomic_bool m_CompleteAdding{false}; + std::atomic_uint32_t m_Size; +}; + +} // namespace zen diff --git a/zencore/include/zencore/filesystem.h b/zencore/include/zencore/filesystem.h index 6678528f6..c7ac7140d 100644 --- a/zencore/include/zencore/filesystem.h +++ b/zencore/include/zencore/filesystem.h @@ -2,9 +2,10 @@ #pragma once -#include "stream.h" #include "zencore.h" +#include <zencore/iobuffer.h> + #include <filesystem> #include <functional> @@ -36,6 +37,8 @@ struct FileContents { std::vector<IoBuffer> Data; std::error_code ErrorCode; + + IoBuffer Flatten(); }; ZENCORE_API FileContents ReadFile(std::filesystem::path Path); diff --git a/zencore/include/zencore/iobuffer.h b/zencore/include/zencore/iobuffer.h index 36ecbd9a7..110cd7d9d 100644 --- a/zencore/include/zencore/iobuffer.h +++ b/zencore/include/zencore/iobuffer.h @@ -25,6 +25,7 @@ enum class ZenContentType : uint8_t kCbPackageOffer = 6, kCompressedBinary = 7, kUnknownContentType = 8, + kHTML = 9, kCOUNT }; @@ -54,6 +55,8 @@ ToString(ZenContentType ContentType) return "compressed-binary"sv; case ZenContentType::kYAML: return "yaml"sv; + case ZenContentType::kHTML: + return "html"sv; } } @@ -218,6 +221,7 @@ protected: kIsMaterialized = 1 << 3, // Data pointers are valid kLowLevelAlloc = 1 << 4, // Using direct memory allocation kIsWholeFile = 1 << 5, // References an entire file + kIoBufferAlloc = 1 << 6, // Using IoBuffer allocator kIsOwnedByThis = 1 << 7, // Note that we have some extended flags defined below @@ -229,8 +233,8 @@ protected: kContentTypeBit3 = 1 << (24 + 3), // bits are reserved }; - void* AllocateBuffer(size_t InSize, size_t Alignment); - void FreeBuffer(); + void AllocateBuffer(size_t InSize, size_t Alignment); + void FreeBuffer(); }; /** diff --git a/zencore/include/zencore/refcount.h b/zencore/include/zencore/refcount.h index 320718f5b..1873ce48e 100644 --- a/zencore/include/zencore/refcount.h +++ b/zencore/include/zencore/refcount.h @@ -4,6 +4,8 @@ #include "atomic.h" #include "zencore.h" +#include <concepts> + namespace zen { /** @@ -114,6 +116,11 @@ public: inline Ref(T* Ptr) : m_Ref(Ptr) { m_Ref && m_Ref->AddRef(); } inline ~Ref() { m_Ref && m_Ref->Release(); } + template<typename DerivedType> + requires std::derived_from<DerivedType, T> inline Ref(const Ref<DerivedType>& Rhs) : Ref(Rhs.m_Ref) + { + } + [[nodiscard]] inline bool IsNull() const { return m_Ref == nullptr; } inline explicit operator bool() const { return m_Ref != nullptr; } inline T* operator->() const { return m_Ref; } @@ -152,6 +159,9 @@ public: private: T* m_Ref = nullptr; + + template<class T> + friend class Ref; }; void refcount_forcelink(); diff --git a/zencore/include/zencore/stats.h b/zencore/include/zencore/stats.h index dfa8dac34..884bb53f6 100644 --- a/zencore/include/zencore/stats.h +++ b/zencore/include/zencore/stats.h @@ -209,6 +209,8 @@ public: Scope(OperationTiming& Outer); ~Scope(); + void Cancel(); + private: OperationTiming& m_Outer; uint64_t m_StartTick; @@ -219,10 +221,70 @@ private: Histogram m_Histogram; }; +/** Metrics for network requests + + Aggregates tracking of duration, payload sizes into a single + class + + */ +class RequestStats +{ +public: + RequestStats(int32_t SampleCount = 514); + ~RequestStats(); + + void Update(int64_t Duration, int64_t Bytes); + uint64_t Count() const; + + // Timing + + int64_t MaxDuration() const { return m_BytesHistogram.Max(); } + int64_t MinDuration() const { return m_BytesHistogram.Min(); } + double MeanDuration() const { return m_BytesHistogram.Mean(); } + SampleSnapshot DurationSnapshot() const { return m_RequestTimeHistogram.Snapshot(); } + double Rate1() { return m_RequestMeter.Rate1(); } + double Rate5() { return m_RequestMeter.Rate5(); } + double Rate15() { return m_RequestMeter.Rate15(); } + double MeanRate() const { return m_RequestMeter.MeanRate(); } + + // Bytes + + int64_t MaxBytes() const { return m_BytesHistogram.Max(); } + int64_t MinBytes() const { return m_BytesHistogram.Min(); } + double MeanBytes() const { return m_BytesHistogram.Mean(); } + SampleSnapshot BytesSnapshot() const { return m_BytesHistogram.Snapshot(); } + double ByteRate1() { return m_BytesMeter.Rate1(); } + double ByteRate5() { return m_BytesMeter.Rate5(); } + double ByteRate15() { return m_BytesMeter.Rate15(); } + double ByteMeanRate() const { return m_BytesMeter.MeanRate(); } + + struct Scope + { + Scope(OperationTiming& Outer); + ~Scope(); + + void Cancel(); + + private: + OperationTiming& m_Outer; + uint64_t m_StartTick; + }; + + void EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo); + +private: + Meter m_RequestMeter; + Meter m_BytesMeter; + Histogram m_RequestTimeHistogram; + Histogram m_BytesHistogram; +}; + void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo); -void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); void EmitSnapshot(std::string_view Tag, Meter& Stat, CbObjectWriter& Cbo); +void EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor); + } // namespace zen::metrics namespace zen { diff --git a/zencore/include/zencore/timer.h b/zencore/include/zencore/timer.h index 693b6daaa..e4ddc3505 100644 --- a/zencore/include/zencore/timer.h +++ b/zencore/include/zencore/timer.h @@ -38,6 +38,21 @@ private: uint64_t m_StartValue; }; +// Low frequency timers + +namespace detail { + extern ZENCORE_API uint64_t g_LofreqTimerValue; +} // namespace detail + +inline uint64_t +GetLofreqTimerValue() +{ + return detail::g_LofreqTimerValue; +} + +ZENCORE_API void UpdateLofreqTimerValue(); +ZENCORE_API uint64_t GetLofreqTimerFrequency(); + void timer_forcelink(); // internal } // namespace zen diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index 244425761..04685defc 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -31,26 +31,29 @@ namespace zen { ////////////////////////////////////////////////////////////////////////// -void* +void IoBufferCore::AllocateBuffer(size_t InSize, size_t Alignment) { #if ZEN_PLATFORM_WINDOWS if (((InSize & 0xffFF) == 0) && (Alignment == 0x10000)) { m_Flags |= kLowLevelAlloc; - return VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE); + m_DataPtr = VirtualAlloc(nullptr, InSize, MEM_COMMIT, PAGE_READWRITE); + + return; } #endif // ZEN_PLATFORM_WINDOWS -#if ZEN_USE_MIMALLOC && 0 +#if ZEN_USE_MIMALLOC void* Ptr = mi_aligned_alloc(Alignment, RoundUp(InSize, Alignment)); + m_Flags |= kIoBufferAlloc; #else void* Ptr = Memory::Alloc(InSize, Alignment); #endif ZEN_ASSERT(Ptr); - return Ptr; + m_DataPtr = Ptr; } void @@ -70,11 +73,14 @@ IoBufferCore::FreeBuffer() } #endif // ZEN_PLATFORM_WINDOWS -#if ZEN_USE_MIMALLOC && 0 - return mi_free(const_cast<void*>(m_DataPtr)); -#else - return Memory::Free(const_cast<void*>(m_DataPtr)); +#if ZEN_USE_MIMALLOC + if (m_Flags & kIoBufferAlloc) + { + return mi_free(const_cast<void*>(m_DataPtr)); + } #endif + + return Memory::Free(const_cast<void*>(m_DataPtr)); } ////////////////////////////////////////////////////////////////////////// @@ -85,7 +91,7 @@ IoBufferCore::IoBufferCore(size_t InSize) { ZEN_ASSERT(InSize); - m_DataPtr = AllocateBuffer(InSize, sizeof(void*)); + AllocateBuffer(InSize, sizeof(void*)); m_DataBytes = InSize; SetIsOwnedByThis(true); @@ -95,7 +101,7 @@ IoBufferCore::IoBufferCore(size_t InSize, size_t Alignment) { ZEN_ASSERT(InSize); - m_DataPtr = AllocateBuffer(InSize, Alignment); + AllocateBuffer(InSize, Alignment); m_DataBytes = InSize; SetIsOwnedByThis(true); @@ -138,10 +144,9 @@ IoBufferCore::MakeOwned(bool Immutable) { if (!IsOwned()) { - void* OwnedDataPtr = AllocateBuffer(m_DataBytes, sizeof(void*)); - memcpy(OwnedDataPtr, m_DataPtr, m_DataBytes); - - m_DataPtr = OwnedDataPtr; + const void* OldDataPtr = m_DataPtr; + AllocateBuffer(m_DataBytes, sizeof(void*)); + memcpy(const_cast<void*>(m_DataPtr), OldDataPtr, m_DataBytes); SetIsOwnedByThis(true); } @@ -183,29 +188,29 @@ IoBufferExtendedCore::~IoBufferExtendedCore() { if (m_MappedPointer) { -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS UnmapViewOfFile(m_MappedPointer); -#else +# else uint64_t MapSize = ~uint64_t(uintptr_t(m_MmapHandle)); munmap(m_MappedPointer, MapSize); -#endif +# endif } -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS if (m_Flags & kOwnsMmap) { CloseHandle(m_MmapHandle); } -#endif +# endif if (m_Flags & kOwnsFile) { -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS BOOL Success = CloseHandle(m_FileHandle); -#else +# else int Fd = int(uintptr_t(m_FileHandle)); bool Success = (close(Fd) == 0); -#endif +# endif if (!Success) { @@ -239,7 +244,7 @@ IoBufferExtendedCore::Materialize() const const uint64_t MappedOffsetDisplacement = m_FileOffset - MapOffset; const uint64_t MapSize = m_DataBytes + MappedOffsetDisplacement; -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS m_MmapHandle = CreateFileMapping(m_FileHandle, /* lpFileMappingAttributes */ nullptr, /* flProtect */ PAGE_READONLY, @@ -260,7 +265,7 @@ IoBufferExtendedCore::Materialize() const /* FileOffsetHigh */ uint32_t(MapOffset >> 32), /* FileOffsetLow */ uint32_t(MapOffset & 0xffFFffFFu), /* dwNumberOfBytesToMap */ MapSize); -#else +# else m_MmapHandle = (void*)uintptr_t(~MapSize); // ~ so it's never null (assuming MapSize >= 0) m_Flags |= kOwnsMmap; @@ -271,7 +276,7 @@ IoBufferExtendedCore::Materialize() const /* flags */ MAP_SHARED | MAP_NORESERVE, /* fd */ int(uintptr_t(m_FileHandle)), /* offset */ MapOffset); -#endif // ZEN_PLATFORM_WINDOWS +# endif // ZEN_PLATFORM_WINDOWS if (MappedBase == nullptr) { @@ -376,7 +381,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint { uint64_t FileSize; -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS CAtlFile DataFile; HRESULT hRes = DataFile.Create(FileName, GENERIC_READ, FILE_SHARE_READ, OPEN_EXISTING); @@ -387,7 +392,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint } DataFile.GetSize((ULONGLONG&)FileSize); -#else +# else int Fd = open(FileName, O_RDONLY); if (Fd < 0) { @@ -398,7 +403,7 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint struct stat Stat; fstat(Fd, &Stat); FileSize = Stat.st_size; -#endif // ZEN_PLATFORM_WINDOWS +# endif // ZEN_PLATFORM_WINDOWS // TODO: should validate that offset is in range @@ -417,15 +422,15 @@ IoBufferBuilder::MakeFromFile(const path_char_t* FileName, uint64_t Offset, uint if (Size) { -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS void* Fd = DataFile.Detach(); -#endif +# endif return IoBuffer(IoBuffer::File, (void*)uintptr_t(Fd), Offset, Size); } -#if !ZEN_PLATFORM_WINDOWS +# if !ZEN_PLATFORM_WINDOWS close(Fd); -#endif +# endif // For an empty file, we may as well just return an empty memory IoBuffer return IoBuffer(IoBuffer::Wrap, "", 0); @@ -437,7 +442,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName) uint64_t FileSize; void* Handle; -#if ZEN_PLATFORM_WINDOWS +# if ZEN_PLATFORM_WINDOWS CAtlFile DataFile; // We need to open with DELETE since this is used for the case @@ -454,8 +459,8 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName) DataFile.GetSize((ULONGLONG&)FileSize); Handle = DataFile.Detach(); -#else - int Fd = open(FileName, O_RDONLY); +# else + int Fd = open(FileName, O_RDONLY); if (Fd < 0) { return {}; @@ -467,7 +472,7 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName) FileSize = Stat.st_size; Handle = (void*)uintptr_t(Fd); -#endif // ZEN_PLATFORM_WINDOWS +# endif // ZEN_PLATFORM_WINDOWS IoBuffer Iob(IoBuffer::File, Handle, 0, FileSize); Iob.m_Core->SetIsWholeFile(true); @@ -484,7 +489,7 @@ HashBuffer(IoBuffer& Buffer) ////////////////////////////////////////////////////////////////////////// -#if ZEN_WITH_TESTS +# if ZEN_WITH_TESTS void iobuffer_forcelink() @@ -498,6 +503,6 @@ TEST_CASE("IoBuffer") zen::IoBuffer buffer3(buffer2, 0, buffer2.Size()); } -#endif +# endif } // namespace zen diff --git a/zencore/stats.cpp b/zencore/stats.cpp index 34dc2828f..0c0647999 100644 --- a/zencore/stats.cpp +++ b/zencore/stats.cpp @@ -295,7 +295,14 @@ Histogram::Min() const double Histogram::Mean() const { - return double(m_Sum.load(std::memory_order_relaxed)) / m_Count; + if (m_Count) + { + return double(m_Sum.load(std::memory_order_relaxed)) / m_Count; + } + else + { + return 0.0; + } } uint64_t @@ -398,12 +405,73 @@ OperationTiming::Scope::Scope(OperationTiming& Outer) : m_Outer(Outer), m_StartT OperationTiming::Scope::~Scope() { - m_Outer.Update(GetHifreqTimerValue() - m_StartTick); + if (m_StartTick != 0) + { + m_Outer.Update(GetHifreqTimerValue() - m_StartTick); + } +} + +void +OperationTiming::Scope::Cancel() +{ + m_StartTick = 0; +} + +////////////////////////////////////////////////////////////////////////// + +RequestStats::RequestStats(int32_t SampleCount) : m_RequestTimeHistogram{SampleCount}, m_BytesHistogram{SampleCount} +{ +} + +RequestStats::~RequestStats() +{ +} + +void +RequestStats::Update(int64_t Duration, int64_t Bytes) +{ + m_RequestMeter.Mark(1); + m_RequestTimeHistogram.Update(Duration); + + m_BytesMeter.Mark(Bytes); + m_BytesHistogram.Update(Bytes); +} + +uint64_t +RequestStats::Count() const +{ + return m_RequestMeter.Count(); } ////////////////////////////////////////////////////////////////////////// void +EmitSnapshot(Meter& Stat, CbObjectWriter& Cbo) +{ + Cbo << "count" << Stat.Count(); + Cbo << "rate_mean" << Stat.MeanRate(); + Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); +} + +void +RequestStats::EmitSnapshot(std::string_view Tag, CbObjectWriter& Cbo) +{ + Cbo.BeginObject(Tag); + + Cbo.BeginObject("requests"); + metrics::EmitSnapshot(m_RequestMeter, Cbo); + metrics::EmitSnapshot(m_RequestTimeHistogram, Cbo, GetHifreqTimerToSeconds()); + Cbo.EndObject(); + + Cbo.BeginObject("bytes"); + metrics::EmitSnapshot(m_BytesMeter, Cbo); + metrics::EmitSnapshot(m_BytesHistogram, Cbo, 1.0); + Cbo.EndObject(); + + Cbo.EndObject(); +} + +void EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) { Cbo.BeginObject(Tag); @@ -411,7 +479,6 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) SampleSnapshot Snap = Stat.Snapshot(); Cbo << "count" << Stat.Count(); - Cbo << "rate_mean" << Stat.MeanRate(); Cbo << "rate_1" << Stat.Rate1() << "rate_5" << Stat.Rate5() << "rate_15" << Stat.Rate15(); @@ -426,18 +493,22 @@ EmitSnapshot(std::string_view Tag, OperationTiming& Stat, CbObjectWriter& Cbo) } void -EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo) +EmitSnapshot(std::string_view Tag, const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor) { Cbo.BeginObject(Tag); + EmitSnapshot(Stat, Cbo, ConversionFactor); + Cbo.EndObject(); +} +void +EmitSnapshot(const Histogram& Stat, CbObjectWriter& Cbo, double ConversionFactor) +{ SampleSnapshot Snap = Stat.Snapshot(); - Cbo << "count" << Stat.Count() << "avg" << Stat.Mean(); - Cbo << "min" << Stat.Min() << "max" << Stat.Max(); - Cbo << "p75" << Snap.Get75Percentile() << "p95" << Snap.Get95Percentile() << "p99" << Snap.Get99Percentile() << "p999" - << Snap.Get999Percentile(); - - Cbo.EndObject(); + Cbo << "count" << Stat.Count() * ConversionFactor << "avg" << Stat.Mean() * ConversionFactor; + Cbo << "min" << Stat.Min() * ConversionFactor << "max" << Stat.Max() * ConversionFactor; + Cbo << "p75" << Snap.Get75Percentile() * ConversionFactor << "p95" << Snap.Get95Percentile() * ConversionFactor << "p99" + << Snap.Get99Percentile() * ConversionFactor << "p999" << Snap.Get999Percentile() * ConversionFactor; } void diff --git a/zencore/timer.cpp b/zencore/timer.cpp index 5d30d9b29..9180519bd 100644 --- a/zencore/timer.cpp +++ b/zencore/timer.cpp @@ -69,6 +69,22 @@ GetHifreqTimerFrequencySafe() } ////////////////////////////////////////////////////////////////////////// + +uint64_t detail::g_LofreqTimerValue = GetHifreqTimerValue(); + +void +UpdateLofreqTimerValue() +{ + detail::g_LofreqTimerValue = GetHifreqTimerValue(); +} + +uint64_t +GetLofreqTimerFrequency() +{ + return GetHifreqTimerFrequencySafe(); +} + +////////////////////////////////////////////////////////////////////////// // // Testing related code follows... // diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 8e5d61877..69974ca06 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -58,19 +58,25 @@ MapContentTypeToString(HttpContentType ContentType) case HttpContentType::kYAML: return "text/yaml"sv; + + case HttpContentType::kHTML: + return "text/html"sv; } } ////////////////////////////////////////////////////////////////////////// static constinit uint32_t HashBinary = HashStringDjb2("application/octet-stream"sv); -static constinit uint32_t HashJson = HashStringDjb2("application/json"sv); -static constinit uint32_t HashYaml = HashStringDjb2("text/yaml"sv); +static constinit uint32_t HashApplicationJson = HashStringDjb2("application/json"sv); +static constinit uint32_t HashApplicationYaml = HashStringDjb2("text/yaml"sv); static constinit uint32_t HashText = HashStringDjb2("text/plain"sv); static constinit uint32_t HashCompactBinary = HashStringDjb2("application/x-ue-cb"sv); static constinit uint32_t HashCompactBinaryPackage = HashStringDjb2("application/x-ue-cbpkg"sv); static constinit uint32_t HashCompactBinaryPackageOffer = HashStringDjb2("application/x-ue-offer"sv); static constinit uint32_t HashCompressedBinary = HashStringDjb2("application/x-ue-comp"sv); +static constinit uint32_t HashJson = HashStringDjb2("json"sv); +static constinit uint32_t HashYaml = HashStringDjb2("yaml"sv); +static constinit uint32_t HashHtml = HashStringDjb2("text/html"sv); std::once_flag InitContentTypeLookup; @@ -78,14 +84,21 @@ struct HashedTypeEntry { uint32_t Hash; HttpContentType Type; -} TypeHashTable[] = {{HashBinary, HttpContentType::kBinary}, - {HashCompactBinary, HttpContentType::kCbObject}, - {HashCompactBinaryPackage, HttpContentType::kCbPackage}, - {HashCompactBinaryPackageOffer, HttpContentType::kCbPackageOffer}, - {HashJson, HttpContentType::kJSON}, - {HashYaml, HttpContentType::kYAML}, - {HashText, HttpContentType::kText}, - {HashCompressedBinary, HttpContentType::kCompressedBinary}}; +} TypeHashTable[] = { + // clang-format off + {HashBinary, HttpContentType::kBinary}, + {HashCompactBinary, HttpContentType::kCbObject}, + {HashCompactBinaryPackage, HttpContentType::kCbPackage}, + {HashCompactBinaryPackageOffer, HttpContentType::kCbPackageOffer}, + {HashJson, HttpContentType::kJSON}, + {HashApplicationJson, HttpContentType::kJSON}, + {HashYaml, HttpContentType::kYAML}, + {HashApplicationYaml, HttpContentType::kYAML}, + {HashText, HttpContentType::kText}, + {HashCompressedBinary, HttpContentType::kCompressedBinary}, + {HashHtml, HttpContentType::kHTML}, + // clang-format on +}; HttpContentType ParseContentTypeImpl(const std::string_view& ContentTypeString) @@ -117,6 +130,16 @@ ParseContentTypeInit(const std::string_view& ContentTypeString) std::sort(std::begin(TypeHashTable), std::end(TypeHashTable), [](const HashedTypeEntry& Lhs, const HashedTypeEntry& Rhs) { return Lhs.Hash < Rhs.Hash; }); + + // validate that there are no hash collisions + + uint32_t LastHash = 0; + + for (const auto& Item : TypeHashTable) + { + ZEN_ASSERT(LastHash != Item.Hash); + LastHash = Item.Hash; + } }); ParseContentType = ParseContentTypeImpl; @@ -548,7 +571,7 @@ CreateHttpServer() #if 0 return new HttpUwsServer; #elif ZEN_WITH_HTTPSYS - return new HttpSysServer{std::thread::hardware_concurrency()}; + return new HttpSysServer(std::thread::hardware_concurrency(), /* background worker threads */ 16); #else return new HttpNullServer; #endif diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index 9b2e7f832..f88563097 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -129,16 +129,18 @@ GetAcceptType(const HTTP_REQUEST* HttpRequest) class HttpSysRequestHandler { public: - explicit HttpSysRequestHandler(HttpSysTransaction& InRequest) : m_Request(InRequest) {} + explicit HttpSysRequestHandler(HttpSysTransaction& Transaction) : m_Transaction(Transaction) {} virtual ~HttpSysRequestHandler() = default; virtual void IssueRequest(std::error_code& ErrorCode) = 0; virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) = 0; + HttpSysTransaction& Transaction() { return m_Transaction; } - HttpSysTransaction& Transaction() { return m_Request; } + HttpSysRequestHandler(const HttpSysRequestHandler&) = delete; + HttpSysRequestHandler& operator=(const HttpSysRequestHandler&) = delete; private: - HttpSysTransaction& m_Request; // Related HTTP transaction object + HttpSysTransaction& m_Transaction; }; /** @@ -184,12 +186,16 @@ public: virtual void WriteResponse(HttpResponseCode ResponseCode) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::span<IoBuffer> Blobs) override; virtual void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::u8string_view ResponseString) override; + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) override; using HttpServerRequest::WriteResponse; - HttpSysTransaction& m_HttpTx; - HttpMessageResponseRequest* m_Response = nullptr; // TODO: make this more general - IoBuffer m_PayloadBuffer; + HttpSysServerRequest(const HttpSysServerRequest&) = delete; + HttpSysServerRequest& operator=(const HttpSysServerRequest&) = delete; + + HttpSysTransaction& m_HttpTx; + HttpSysRequestHandler* m_NextCompletionHandler = nullptr; + IoBuffer m_PayloadBuffer; }; /** HTTP transaction @@ -218,7 +224,9 @@ public: ULONG_PTR NumberOfBytesTransferred, PTP_IO Io); - void IssueInitialRequest(std::error_code& ErrorCode); + void IssueInitialRequest(std::error_code& ErrorCode); + bool IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler); + PTP_IO Iocp(); HANDLE RequestQueueHandle(); inline OVERLAPPED* Overlapped() { return &m_HttpOverlapped; } @@ -227,6 +235,8 @@ public: HttpSysServerRequest& InvokeRequestHandler(HttpService& Service, IoBuffer Payload); + HttpSysServerRequest& ServerRequest() { return m_HandlerRequest.value(); } + private: OVERLAPPED m_HttpOverlapped{}; HttpSysServer& m_HttpServer; @@ -239,8 +249,6 @@ private: Ref<IHttpPackageHandler> m_PackageHandler; }; -////////////////////////////////////////////////////////////////////////// - /** * @brief HTTP request response I/O request handler * @@ -588,6 +596,108 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) } } +/** HTTP completion handler for async work + + This is used to allow work to be taken off the request handler threads + and to support posting responses asynchronously. + */ + +class HttpAsyncWorkRequest : public HttpSysRequestHandler +{ +public: + HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response); + ~HttpAsyncWorkRequest(); + + virtual void IssueRequest(std::error_code& ErrorCode) override final; + virtual HttpSysRequestHandler* HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) override; + +private: + struct AsyncWorkItem : public IWork + { + virtual void Execute() override; + + AsyncWorkItem(HttpSysTransaction& InTx, std::function<void(HttpServerRequest&)>&& InHandler) + : Tx(InTx) + , Handler(std::move(InHandler)) + { + } + + HttpSysTransaction& Tx; + std::function<void(HttpServerRequest&)> Handler; + }; + + Ref<AsyncWorkItem> m_WorkItem; +}; + +HttpAsyncWorkRequest::HttpAsyncWorkRequest(HttpSysTransaction& Tx, std::function<void(HttpServerRequest&)>&& Response) +: HttpSysRequestHandler(Tx) +{ + m_WorkItem = new AsyncWorkItem(Tx, std::move(Response)); +} + +HttpAsyncWorkRequest::~HttpAsyncWorkRequest() +{ +} + +void +HttpAsyncWorkRequest::IssueRequest(std::error_code& ErrorCode) +{ + ErrorCode.clear(); + + Transaction().Server().WorkPool().ScheduleWork(m_WorkItem); +} + +HttpSysRequestHandler* +HttpAsyncWorkRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) +{ + // This ought to not be called since there should be no outstanding I/O request + // when this completion handler is active + + ZEN_UNUSED(IoResult, NumberOfBytesTransferred); + + ZEN_WARN("Unexpected I/O completion during async work! IoResult: {}, NumberOfBytesTransferred: {}", IoResult, NumberOfBytesTransferred); + + return this; +} + +void +HttpAsyncWorkRequest::AsyncWorkItem::Execute() +{ + using namespace fmt::literals; + + try + { + HttpSysServerRequest& ThisRequest = Tx.ServerRequest(); + + ThisRequest.m_NextCompletionHandler = nullptr; + + Handler(ThisRequest); + + // TODO: should Handler be destroyed at this point to ensure there + // are no outstanding references into state which could be + // deleted asynchronously as a result of issuing the response? + + if (HttpSysRequestHandler* NextHandler = ThisRequest.m_NextCompletionHandler) + { + return (void)Tx.IssueNextRequest(NextHandler); + } + else if (!ThisRequest.IsHandled()) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 404, "Not found"sv)); + } + else + { + // "Handled" but no request handler? Shouldn't ever happen + return (void)Tx.IssueNextRequest( + new HttpMessageResponseRequest(Tx, 500, "Response generated but no request handler scheduled"sv)); + } + } + catch (std::exception& Ex) + { + return (void)Tx.IssueNextRequest(new HttpMessageResponseRequest(Tx, 500, "Exception thrown in async work: '{}'"_format(Ex.what()))); + } +} + /** _________ / _____/ ______________ __ ___________ @@ -597,10 +707,11 @@ HttpMessageResponseRequest::IssueRequest(std::error_code& ErrorCode) \/ \/ \/ */ -HttpSysServer::HttpSysServer(unsigned int ThreadCount) +HttpSysServer::HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount) : m_Log(logging::Get("http")) , m_RequestLog(logging::Get("http_requests")) , m_ThreadPool(ThreadCount) +, m_AsyncWorkPool(AsyncWorkThreadCount) { ULONG Result = HttpInitialize(HTTPAPI_VERSION_2, HTTP_INITIALIZE_SERVER, nullptr); @@ -611,6 +722,8 @@ HttpSysServer::HttpSysServer(unsigned int ThreadCount) m_IsHttpInitialized = true; m_IsOk = true; + + ZEN_INFO("http.sys server started, using {} I/O threads and {} async worker threads", ThreadCount, AsyncWorkThreadCount); } HttpSysServer::~HttpSysServer() @@ -915,6 +1028,47 @@ HttpSysTransaction::IoCompletionCallback(PTP_CALLBACK_INSTANCE Instance, } } +bool +HttpSysTransaction::IssueNextRequest(HttpSysRequestHandler* NewCompletionHandler) +{ + HttpSysRequestHandler* CurrentHandler = m_CompletionHandler; + m_CompletionHandler = NewCompletionHandler; + + auto _ = MakeGuard([this, CurrentHandler] { + if ((CurrentHandler != &m_InitialHttpHandler) && (CurrentHandler != m_CompletionHandler)) + { + delete CurrentHandler; + } + }); + + if (NewCompletionHandler == nullptr) + { + return false; + } + + try + { + std::error_code ErrorCode; + m_CompletionHandler->IssueRequest(ErrorCode); + + if (!ErrorCode) + { + return true; + } + + ZEN_ERROR("IssueRequest() failed: '{}'", ErrorCode.message()); + } + catch (std::exception& Ex) + { + ZEN_ERROR("exception caught in IssueNextRequest(): '{}'", Ex.what()); + } + + // something went wrong, no request is pending + m_CompletionHandler = nullptr; + + return false; +} + HttpSysTransaction::Status HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTransferred) { @@ -934,38 +1088,9 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran m_HttpServer.OnHandlingRequest(); } - m_CompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); + auto NewCompletionHandler = CurrentHandler->HandleCompletion(IoResult, NumberOfBytesTransferred); - if (m_CompletionHandler) - { - try - { - std::error_code ErrorCode; - m_CompletionHandler->IssueRequest(ErrorCode); - - if (ErrorCode) - { - ZEN_ERROR("IssueRequest() failed {}", ErrorCode.message()); - } - else - { - IsRequestPending = true; - } - } - catch (std::exception& Ex) - { - ZEN_ERROR("exception caught from IssueRequest(): {}", Ex.what()); - - // something went wrong, no request is pending - } - } - else - { - if (CurrentHandler != &m_InitialHttpHandler) - { - delete CurrentHandler; - } - } + IsRequestPending = IssueNextRequest(NewCompletionHandler); } // Ensure new requests are enqueued as necessary @@ -1086,23 +1211,48 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& const int PrefixLength = Service.UriPrefixLength(); const int AbsPathLength = HttpRequestPtr->CookedUrl.AbsPathLength / sizeof(char16_t); + HttpContentType AcceptContentType = HttpContentType::kUnknownContentType; + if (AbsPathLength >= PrefixLength) { // We convert the URI immediately because most of the code involved prefers to deal - // with utf8. This has some performance impact which I'd prefer to avoid but for now - // we just have to live with it + // with utf8. This is overhead which I'd prefer to avoid but for now we just have + // to live with it WideToUtf8({(char16_t*)HttpRequestPtr->CookedUrl.pAbsPath + PrefixLength, gsl::narrow<size_t>(AbsPathLength - PrefixLength)}, m_UriUtf8); + + std::string_view UriSuffix8{m_UriUtf8}; + + const size_t LastComponentIndex = UriSuffix8.find_last_of('/'); + + if (LastComponentIndex != std::string_view::npos) + { + UriSuffix8.remove_prefix(LastComponentIndex); + } + + const size_t LastDotIndex = UriSuffix8.find_last_of('.'); + + if (LastDotIndex != std::string_view::npos) + { + UriSuffix8.remove_prefix(LastDotIndex + 1); + + AcceptContentType = ParseContentType(UriSuffix8); + + if (AcceptContentType != HttpContentType::kUnknownContentType) + { + m_UriUtf8.RemoveSuffix(uint32_t(m_UriUtf8.Size() - LastComponentIndex - LastDotIndex - 1)); + } + } } else { m_UriUtf8.Reset(); } - if (auto QueryStringLength = HttpRequestPtr->CookedUrl.QueryStringLength) + if (uint16_t QueryStringLength = HttpRequestPtr->CookedUrl.QueryStringLength) { - --QueryStringLength; + --QueryStringLength; // We skip the leading question mark WideToUtf8({(char16_t*)(HttpRequestPtr->CookedUrl.pQueryString) + 1, QueryStringLength / sizeof(char16_t)}, m_QueryStringUtf8); } @@ -1114,7 +1264,23 @@ HttpSysServerRequest::HttpSysServerRequest(HttpSysTransaction& Tx, HttpService& m_Verb = TranslateHttpVerb(HttpRequestPtr->Verb); m_ContentLength = GetContentLength(HttpRequestPtr); m_ContentType = GetContentType(HttpRequestPtr); - m_AcceptType = GetAcceptType(HttpRequestPtr); + + // It an explicit content type extension was specified then we'll use that over any + // Accept: header value that may be present + + if (AcceptContentType != HttpContentType::kUnknownContentType) + { + m_AcceptType = AcceptContentType; + } + else + { + m_AcceptType = GetAcceptType(HttpRequestPtr); + } + + if (m_Verb == HttpVerb::kHead) + { + SetSuppressResponseBody(); + } } Oid @@ -1172,13 +1338,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode) { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1187,13 +1355,15 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, Blobs); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } @@ -1202,17 +1372,32 @@ HttpSysServerRequest::WriteResponse(HttpResponseCode ResponseCode, HttpContentTy { ZEN_ASSERT(IsHandled() == false); - m_Response = + auto Response = new HttpMessageResponseRequest(m_HttpTx, (uint16_t)ResponseCode, ContentType, ResponseString.data(), ResponseString.size()); if (SuppressBody()) { - m_Response->SuppressResponseBody(); + Response->SuppressResponseBody(); } + m_NextCompletionHandler = Response; + SetIsHandled(); } +void +HttpSysServerRequest::WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) +{ + if (m_HttpTx.Server().IsAsyncResponseEnabled()) + { + ContinuationHandler(m_HttpTx.ServerRequest()); + } + else + { + m_NextCompletionHandler = new HttpAsyncWorkRequest(m_HttpTx, std::move(ContinuationHandler)); + } +} + ////////////////////////////////////////////////////////////////////////// InitialRequestHandler::InitialRequestHandler(HttpSysTransaction& InRequest) : HttpSysRequestHandler(InRequest) @@ -1411,14 +1596,14 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT HttpSysServerRequest& ThisRequest = Transaction().InvokeRequestHandler(*Service, m_PayloadBuffer); - if (!ThisRequest.IsHandled()) + if (HttpSysRequestHandler* Response = ThisRequest.m_NextCompletionHandler) { - return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); + return Response; } - if (HttpMessageResponseRequest* Response = ThisRequest.m_Response) + if (!ThisRequest.IsHandled()) { - return Response; + return new HttpMessageResponseRequest(Transaction(), 404, "Not found"sv); } } @@ -1462,4 +1647,4 @@ HttpSysServer::RegisterService(HttpService& Service) } } // namespace zen -#endif
\ No newline at end of file +#endif diff --git a/zenhttp/httpsys.h b/zenhttp/httpsys.h index a8395b283..46ba122cc 100644 --- a/zenhttp/httpsys.h +++ b/zenhttp/httpsys.h @@ -16,6 +16,7 @@ # define _WINSOCKAPI_ # include <zencore/windows.h> # include "iothreadpool.h" +# include "workthreadpool.h" # include <http.h> @@ -35,7 +36,7 @@ class HttpSysServer : public HttpServer friend class HttpSysTransaction; public: - explicit HttpSysServer(unsigned int ThreadCount); + explicit HttpSysServer(unsigned int ThreadCount, unsigned int AsyncWorkThreadCount); ~HttpSysServer(); // HttpServer interface implementation @@ -45,6 +46,11 @@ public: virtual void RequestExit() override; virtual void RegisterService(HttpService& Service) override; + WorkerThreadPool& WorkPool() { return m_AsyncWorkPool; } + + inline bool IsOk() const { return m_IsOk; } + inline bool IsAsyncResponseEnabled() const { return m_IsAsyncResponseEnabled; } + private: void Initialize(const wchar_t* UrlPath); void Cleanup(); @@ -53,8 +59,6 @@ private: void OnHandlingRequest(); void IssueNewRequestMaybe(); - inline bool IsOk() const { return m_IsOk; } - void RegisterService(const char* Endpoint, HttpService& Service); void UnregisterService(const char* Endpoint, HttpService& Service); @@ -63,10 +67,13 @@ private: spdlog::logger& m_RequestLog; spdlog::logger& Log() { return m_Log; } - bool m_IsOk = false; - bool m_IsHttpInitialized = false; - bool m_IsRequestLoggingEnabled = false; - WinIoThreadPool m_ThreadPool; + bool m_IsOk = false; + bool m_IsHttpInitialized = false; + bool m_IsRequestLoggingEnabled = false; + bool m_IsAsyncResponseEnabled = true; + + WinIoThreadPool m_ThreadPool; + WorkerThreadPool m_AsyncWorkPool; std::wstring m_BaseUri; // http://*:nnnn/ HTTP_SERVER_SESSION_ID m_HttpSessionId = 0; diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index 6a7dc8a70..3e6608f11 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -97,6 +97,8 @@ public: void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, std::string_view ResponseString); void WriteResponse(HttpResponseCode ResponseCode, HttpContentType ContentType, IoBuffer Blob); + virtual void WriteResponseAsync(std::function<void(HttpServerRequest&)>&& ContinuationHandler) = 0; + protected: enum { diff --git a/zenhttp/iothreadpool.cpp b/zenhttp/iothreadpool.cpp index 4f1a6642b..6087e69ec 100644 --- a/zenhttp/iothreadpool.cpp +++ b/zenhttp/iothreadpool.cpp @@ -4,6 +4,8 @@ #include <zencore/except.h> +#if ZEN_PLATFORM_WINDOWS + namespace zen { WinIoThreadPool::WinIoThreadPool(int InThreadCount) @@ -32,6 +34,8 @@ WinIoThreadPool::~WinIoThreadPool() void WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, void* Context, std::error_code& ErrorCode) { + ZEN_ASSERT(!m_ThreadPoolIo); + m_ThreadPoolIo = CreateThreadpoolIo(IoHandle, Callback, Context, &m_CallbackEnvironment); if (!m_ThreadPoolIo) @@ -41,3 +45,5 @@ WinIoThreadPool::CreateIocp(HANDLE IoHandle, PTP_WIN32_IO_CALLBACK Callback, voi } } // namespace zen + +#endif diff --git a/zenhttp/iothreadpool.h b/zenhttp/iothreadpool.h index 4418b940b..8333964c3 100644 --- a/zenhttp/iothreadpool.h +++ b/zenhttp/iothreadpool.h @@ -2,9 +2,12 @@ #pragma once -#include <zencore/windows.h> +#include <zencore/zencore.h> -#include <system_error> +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> + +# include <system_error> namespace zen { @@ -31,3 +34,4 @@ private: }; } // namespace zen +#endif diff --git a/zenhttp/workthreadpool.cpp b/zenhttp/workthreadpool.cpp new file mode 100644 index 000000000..41eaaae94 --- /dev/null +++ b/zenhttp/workthreadpool.cpp @@ -0,0 +1,77 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "workthreadpool.h" + +#include <zencore/logging.h> + +namespace zen { + +namespace detail { + struct LambdaWork : IWork + { + LambdaWork(auto Work) : WorkFunction(Work) {} + virtual void Execute() override { WorkFunction(); } + + std::function<void()> WorkFunction; + }; +} // namespace detail + +WorkerThreadPool::WorkerThreadPool(int InThreadCount) +{ + for (int i = 0; i < InThreadCount; ++i) + { + m_WorkerThreads.emplace_back(&WorkerThreadPool::WorkerThreadFunction, this); + } +} + +WorkerThreadPool::~WorkerThreadPool() +{ + m_WorkQueue.CompleteAdding(); + + for (std::thread& Thread : m_WorkerThreads) + { + Thread.join(); + } + + m_WorkerThreads.clear(); +} + +void +WorkerThreadPool::ScheduleWork(Ref<IWork> Work) +{ + m_WorkQueue.Enqueue(std::move(Work)); +} + +void +WorkerThreadPool::ScheduleWork(std::function<void()>&& Work) +{ + m_WorkQueue.Enqueue(new detail::LambdaWork(Work)); +} + +void +WorkerThreadPool::WorkerThreadFunction() +{ + do + { + Ref<IWork> Work; + if (m_WorkQueue.WaitAndDequeue(Work)) + { + try + { + Work->Execute(); + } + catch (std::exception& e) + { + Work->m_Exception = std::current_exception(); + + ZEN_WARN("Caught exception in worker thread: {}", e.what()); + } + } + else + { + return; + } + } while (true); +} + +} // namespace zen
\ No newline at end of file diff --git a/zenhttp/workthreadpool.h b/zenhttp/workthreadpool.h new file mode 100644 index 000000000..6581cc08f --- /dev/null +++ b/zenhttp/workthreadpool.h @@ -0,0 +1,47 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#include <zencore/blockingqueue.h> +#include <zencore/refcount.h> +#include <zencore/windows.h> + +#include <exception> +#include <functional> +#include <system_error> +#include <thread> +#include <vector> + +namespace zen { + +struct IWork : public RefCounted +{ + virtual void Execute() = 0; + + inline std::exception_ptr GetException() { return m_Exception; } + +private: + std::exception_ptr m_Exception; + + friend class WorkerThreadPool; +}; + +class WorkerThreadPool +{ +public: + WorkerThreadPool(int InThreadCount); + ~WorkerThreadPool(); + + void ScheduleWork(Ref<IWork> Work); + void ScheduleWork(std::function<void()>&& Work); + + void WorkerThreadFunction(); + +private: + std::vector<std::thread> m_WorkerThreads; + BlockingQueue<Ref<IWork>> m_WorkQueue; +}; + +} // namespace zen diff --git a/zenhttp/zenhttp.vcxproj b/zenhttp/zenhttp.vcxproj index 899cf4bd1..1fc64bfc2 100644 --- a/zenhttp/zenhttp.vcxproj +++ b/zenhttp/zenhttp.vcxproj @@ -100,6 +100,7 @@ <ClCompile Include="httpsys.cpp" /> <ClCompile Include="httpuws.cpp" /> <ClCompile Include="iothreadpool.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> <ClCompile Include="zenhttp.cpp" /> </ItemGroup> <ItemGroup> @@ -112,6 +113,7 @@ <ClInclude Include="include\zenhttp\httpshared.h" /> <ClInclude Include="include\zenhttp\zenhttp.h" /> <ClInclude Include="iothreadpool.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\zencore\zencore.vcxproj"> diff --git a/zenhttp/zenhttp.vcxproj.filters b/zenhttp/zenhttp.vcxproj.filters index 2e968055c..e57e7a712 100644 --- a/zenhttp/zenhttp.vcxproj.filters +++ b/zenhttp/zenhttp.vcxproj.filters @@ -9,6 +9,7 @@ <ClCompile Include="httpuws.cpp" /> <ClCompile Include="httpshared.cpp" /> <ClCompile Include="zenhttp.cpp" /> + <ClCompile Include="workthreadpool.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="include\zenhttp\httpclient.h" /> @@ -20,6 +21,7 @@ <ClInclude Include="httpuws.h" /> <ClInclude Include="include\zenhttp\httpcommon.h" /> <ClInclude Include="include\zenhttp\httpshared.h" /> + <ClInclude Include="workthreadpool.h" /> </ItemGroup> <ItemGroup> <None Include="xmake.lua" /> diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 0e5e73ffc..fe21aa834 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -1448,16 +1448,21 @@ TEST_CASE("zcache.policy") return Buf; }; - auto GeneratePackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto GeneratePackage = [](zen::IoHash& OutRecordKey, zen::IoHash& OutAttachmentKey) -> zen::CbPackage { auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); auto CompressedData = zen::CompressedBuffer::Compress(Data); OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); - zen::CbWriter Obj; - Obj.BeginObject("obj"sv); - Obj.AddBinaryAttachment("data", OutAttachmentKey); - Obj.EndObject(); + + zen::CbWriter Writer; + Writer.BeginObject("obj"sv); + Writer.AddBinaryAttachment("data", OutAttachmentKey); + Writer.EndObject(); + CbObject CacheRecord = Writer.Save().AsObject(); + + OutRecordKey = IoHash::HashBuffer(CacheRecord.GetBuffer().GetView()); + zen::CbPackage Package; - Package.SetObject(Obj.Save().AsObject()); + Package.SetObject(CacheRecord); Package.AddAttachment(zen::CbAttachment(CompressedData)); return Package; @@ -1587,7 +1592,8 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); zen::IoHash Key; - zen::CbPackage Package = GeneratePackage(Key); + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package upstream @@ -1623,7 +1629,8 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); zen::IoHash Key; - zen::CbPackage Package = GeneratePackage(Key); + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store packge locally @@ -1659,7 +1666,8 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); zen::IoHash Key; - zen::CbPackage Package = GeneratePackage(Key); + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package locally and upstream @@ -1692,7 +1700,8 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); zen::IoHash Key; - zen::CbPackage Package = GeneratePackage(Key); + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package locally @@ -1748,7 +1757,8 @@ TEST_CASE("zcache.policy") LocalCfg.Spawn(LocalInst); zen::IoHash Key; - zen::CbPackage Package = GeneratePackage(Key); + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); auto Buf = ToBuffer(Package); // Store package upstream @@ -1791,6 +1801,80 @@ TEST_CASE("zcache.policy") CHECK(Package.GetAttachments().size() != 0); } } + + SUBCASE("skip - 'data' returns empty cache record/payload") + { + ZenConfig Cfg = ZenConfig::New(); + ZenServerInstance Instance(TestEnv); + const auto Bucket = "test"sv; + + Cfg.Spawn(Instance); + + zen::IoHash Key; + zen::IoHash PayloadId; + zen::CbPackage Package = GeneratePackage(Key, PayloadId); + auto Buf = ToBuffer(Package); + + // Store package + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(Cfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + // Get package + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + CHECK(Result.text.size() == 0); + } + + // Get record + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbobject"}}); + CHECK(Result.status_code == 200); + CHECK(Result.text.size() == 0); + } + + // Get payload + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key, PayloadId)}, + cpr::Header{{"Accept", "application/x-ue-comp"}}); + CHECK(Result.status_code == 200); + CHECK(Result.text.size() == 0); + } + } + + SUBCASE("skip - 'data' returns empty binary value") + { + ZenConfig Cfg = ZenConfig::New(); + ZenServerInstance Instance(TestEnv); + const auto Bucket = "test"sv; + + Cfg.Spawn(Instance); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(Cfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + // Get package + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=data"_format(Cfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + CHECK(Result.text.size() == 0); + } + } } struct RemoteExecutionRequest diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index dc96aecae..4a2a3748a 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -12,6 +12,7 @@ #include <zenhttp/httpserver.h> #include <zenstore/CAS.h> +#include "monitoring/httpstats.h" #include "structuredcache.h" #include "structuredcachestore.h" #include "upstream/jupiter.h" @@ -149,13 +150,19 @@ ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, CasStore& InStore, CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, std::unique_ptr<UpstreamCache> UpstreamCache) : m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) +, m_StatsService(StatsService) +, m_StatusService(StatusService) , m_CasStore(InStore) , m_CidStore(InCidStore) , m_UpstreamCache(std::move(UpstreamCache)) { + StatsService.RegisterHandler("z$", *this); + StatusService.RegisterHandler("z$", *this); } HttpStructuredCacheService::~HttpStructuredCacheService() @@ -200,11 +207,6 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { std::string_view Key = Request.RelativeUri(); - if (Key.empty()) - { - return HandleStatusRequest(Request); - } - if (std::all_of(begin(Key), end(Key), [](const char c) { return std::isalnum(c); })) { // Bucket reference @@ -270,10 +272,6 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, case kHead: case kGet: { - if (Verb == kHead) - { - Request.SetSuppressResponseBody(); - } HandleGetCacheRecord(Request, Ref, Policy); } break; @@ -288,28 +286,104 @@ HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, void HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy) { - const ZenContentType AcceptType = Request.AcceptContentType(); + const ZenContentType AcceptType = Request.AcceptContentType(); + const bool SkipData = (Policy & CachePolicy::SkipData) == CachePolicy::SkipData; + const bool SkipAttachments = (Policy & CachePolicy::SkipAttachments) == CachePolicy::SkipAttachments; + const bool QueryUpstream = m_UpstreamCache && ((Policy & CachePolicy::QueryRemote) == CachePolicy::QueryRemote); - ZenCacheValue Value; - bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); - bool InUpstreamCache = false; + bool Success = false; + ZenCacheValue LocalCacheValue; - const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + if (m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, LocalCacheValue)) + { + Success = true; - if (QueryUpstream) + if (!SkipData && AcceptType == ZenContentType::kCbPackage) + { + CbPackage Package; + CbObjectView CacheRecord(LocalCacheValue.Value.Data()); + uint32_t AttachmentCount = 0; + uint32_t ValidCount = 0; + + if (!SkipAttachments) + { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + ValidCount++; + } + AttachmentCount++; + }); + + if (ValidCount != AttachmentCount) + { + Success = false; + ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + ToString(AcceptType), + ValidCount, + AttachmentCount); + } + } + + Package.SetObject(LoadCompactBinaryObject(LocalCacheValue.Value)); + + BinaryWriter MemStream; + Package.Save(MemStream); + + LocalCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + LocalCacheValue.Value.SetContentType(HttpContentType::kCbPackage); + } + } + + if (Success) { - const ZenContentType CacheRecordType = AcceptType; + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (LOCAL)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(LocalCacheValue.Value.Size()), + ToString(LocalCacheValue.Value.GetContentType())); + + m_CacheStats.HitCount++; + + if (SkipData) + { + return Request.WriteResponse(HttpResponseCode::OK); + } + else + { + return Request.WriteResponse(HttpResponseCode::OK, LocalCacheValue.Value.GetContentType(), LocalCacheValue.Value); + } + } + else if (!QueryUpstream) + { + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; + return Request.WriteResponse(HttpResponseCode::NotFound); + } + + // Issue upstream query asynchronously in order to keep requests flowing without + // hogging I/O servicing threads with blocking work + + Request.WriteResponseAsync([this, AcceptType, SkipData, SkipAttachments, Ref](HttpServerRequest& AsyncRequest) { + bool Success = false; + ZenCacheValue UpstreamCacheValue; - if (auto UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, CacheRecordType); + metrics::OperationTiming::Scope $(m_UpstreamGetRequestTiming); + + if (GetUpstreamCacheResult UpstreamResult = m_UpstreamCache->GetCacheRecord({Ref.BucketSegment, Ref.HashKey}, AcceptType); UpstreamResult.Success) { - Value.Value = UpstreamResult.Value; - Success = true; - InUpstreamCache = true; + Success = true; + UpstreamCacheValue.Value = UpstreamResult.Value; + + UpstreamCacheValue.Value.SetContentType(AcceptType); - if (CacheRecordType == ZenContentType::kBinary || CacheRecordType == ZenContentType::kCbObject) + if (AcceptType == ZenContentType::kBinary || AcceptType == ZenContentType::kCbObject) { - if (CacheRecordType == ZenContentType::kCbObject) + if (AcceptType == ZenContentType::kCbObject) { const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); @@ -322,7 +396,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); - Value.IndexData = IndexData.Save(); + UpstreamCacheValue.IndexData = IndexData.Save(); } else { @@ -336,19 +410,17 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request if (Success) { - m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, Value); + m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, UpstreamCacheValue); } } - else + else if (AcceptType == ZenContentType::kCbPackage) { - ZEN_ASSERT(CacheRecordType == ZenContentType::kCbPackage); - CbPackage Package; - if (Package.TryLoad(UpstreamResult.Value)) + if (Package.TryLoad(UpstreamCacheValue.Value)) { + CbObject CacheRecord = Package.GetObject(); uint32_t AttachmentCount = 0; uint32_t ValidCount = 0; - CbObject CacheRecord = Package.GetObject(); CacheRecord.IterateAttachments([this, &Package, &Ref, &AttachmentCount, &ValidCount](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) @@ -373,7 +445,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); - if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) + if (SkipAttachments) { CbPackage PackageWithoutAttachments; PackageWithoutAttachments.SetObject(CacheRecord); @@ -381,7 +453,7 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request BinaryWriter MemStream; PackageWithoutAttachments.Save(MemStream); - Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + UpstreamCacheValue.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); } } else @@ -400,86 +472,34 @@ HttpStructuredCacheService::HandleGetCacheRecord(zen::HttpServerRequest& Request } } } - } - - if (!Success) - { - ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); - return Request.WriteResponse(HttpResponseCode::NotFound); - } - - if (AcceptType == ZenContentType::kCbPackage && !InUpstreamCache) - { - CbObjectView CacheRecord(Value.Value.Data()); - - const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); - if (ValidationResult != CbValidateError::None) + if (Success) { - ZEN_WARN("GET - '{}/{}' '{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); - } - - const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); - uint32_t AttachmentCount = 0; - uint32_t ValidCount = 0; - uint64_t AttachmentBytes = 0ull; - - CbPackage Package; + ZEN_DEBUG("HIT - '{}/{}' {} '{}' (UPSTREAM)", + Ref.BucketSegment, + Ref.HashKey, + NiceBytes(UpstreamCacheValue.Value.Size()), + ToString(UpstreamCacheValue.Value.GetContentType())); - if (!SkipAttachments) - { - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &ValidCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - ValidCount++; - } - AttachmentCount++; - }); + m_CacheStats.HitCount++; + m_CacheStats.UpstreamHitCount++; - if (ValidCount != AttachmentCount) + if (SkipData) { - ZEN_WARN("GET - '{}/{}' '{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - ToString(AcceptType), - ValidCount, - AttachmentCount); - - return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + AsyncRequest.WriteResponse(HttpResponseCode::OK); + } + else + { + AsyncRequest.WriteResponse(HttpResponseCode::OK, UpstreamCacheValue.Value.GetContentType(), UpstreamCacheValue.Value); } } - - Package.SetObject(LoadCompactBinaryObject(Value.Value)); - - ZEN_DEBUG("HIT - '{}/{}' {} '{}', {} attachments (LOCAL)", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(AttachmentBytes + Value.Value.Size()), - ToString(HttpContentType::kCbPackage), - AttachmentCount); - - BinaryWriter MemStream; - Package.Save(MemStream); - - IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); - } - else - { - ZEN_DEBUG("HIT - '{}/{}' {} '{}' ({})", - Ref.BucketSegment, - Ref.HashKey, - NiceBytes(Value.Value.Size()), - ToString(Value.Value.GetContentType()), - InUpstreamCache ? "UPSTREAM" : "LOCAL"); - - Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); - } + else + { + ZEN_DEBUG("MISS - '{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, ToString(AcceptType)); + m_CacheStats.MissCount++; + AsyncRequest.WriteResponse(HttpResponseCode::NotFound); + } + }); } void @@ -668,10 +688,6 @@ HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request case kGet: { HandleGetCachePayload(Request, Ref, Policy); - if (Verb == kHead) - { - Request.SetSuppressResponseBody(); - } } break; case kPut: @@ -712,7 +728,8 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques if (!Payload) { - ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); + ZEN_DEBUG("MISS - '{}/{}/{}' '{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId, ToString(Request.AcceptContentType())); + m_CacheStats.MissCount++; return Request.WriteResponse(HttpResponseCode::NotFound); } @@ -724,7 +741,20 @@ HttpStructuredCacheService::HandleGetCachePayload(zen::HttpServerRequest& Reques ToString(Payload.GetContentType()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); - Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); + m_CacheStats.HitCount++; + if (InUpstreamCache) + { + m_CacheStats.UpstreamHitCount++; + } + + if ((Policy & CachePolicy::SkipData) == CachePolicy::SkipData) + { + Request.WriteResponse(HttpResponseCode::OK); + } + else + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); + } } void @@ -839,12 +869,25 @@ HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& } void -HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) +HttpStructuredCacheService::HandleStatsRequest(zen::HttpServerRequest& Request) { CbObjectWriter Cbo; - Cbo << "ok" << true; EmitSnapshot("requests", m_HttpRequests, Cbo); + EmitSnapshot("upstream_gets", m_UpstreamGetRequestTiming, Cbo); + + const uint64_t HitCount = m_CacheStats.HitCount; + const uint64_t UpstreamHitCount = m_CacheStats.UpstreamHitCount; + const uint64_t MissCount = m_CacheStats.MissCount; + const uint64_t TotalCount = HitCount + MissCount; + + Cbo.BeginObject("cache"); + Cbo << "hits" << HitCount << "misses" << MissCount; + Cbo << "hit_ratio" << (TotalCount > 0 ? (double(HitCount) / double(TotalCount) * 100.0) : 0.0); + Cbo << "upstream_hits" << m_CacheStats.UpstreamHitCount; + Cbo << "upstream_ratio" << (HitCount > 0 ? (double(UpstreamHitCount) / double(HitCount)) * 100.0 : 0.0); + Cbo.EndObject(); + if (m_UpstreamCache) { Cbo.BeginObject("upstream"); @@ -855,4 +898,12 @@ HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } +void +HttpStructuredCacheService::HandleStatusRequest(zen::HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + } // namespace zen diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 47fc173e9..ad7253f79 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -5,6 +5,9 @@ #include <zencore/stats.h> #include <zenhttp/httpserver.h> +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" + #include <memory> namespace spdlog { @@ -27,12 +30,12 @@ enum class CachePolicy : uint8_t; * * {BucketId}/{KeyHash} * - * Where BucketId is an alphanumeric string, and KeyHash is a 40-character hexadecimal - * sequence. The hash value may be derived in any number of ways, it's up to the - * application to pick an approach. + * Where BucketId is a lower-case alphanumeric string, and KeyHash is a 40-character + * hexadecimal sequence. The hash value may be derived in any number of ways, it's + * up to the application to pick an approach. * * Values may be structured or unstructured. Structured values are encoded using Unreal - * Engine's compact binary encoding + * Engine's compact binary encoding (see CbObject) * * Additionally, attachments may be addressed as: * @@ -47,18 +50,19 @@ enum class CachePolicy : uint8_t; * */ -class HttpStructuredCacheService : public zen::HttpService +class HttpStructuredCacheService : public HttpService, public IHttpStatsProvider, public IHttpStatusProvider { public: HttpStructuredCacheService(ZenCacheStore& InCacheStore, - zen::CasStore& InCasStore, - zen::CidStore& InCidStore, + CasStore& InCasStore, + CidStore& InCidStore, + HttpStatsService& StatsService, + HttpStatusService& StatusService, std::unique_ptr<UpstreamCache> UpstreamCache); ~HttpStructuredCacheService(); virtual const char* BaseUri() const override; - - virtual void HandleRequest(zen::HttpServerRequest& Request) override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; void Flush(); void Scrub(ScrubContext& Ctx); @@ -71,6 +75,13 @@ private: IoHash PayloadId; }; + struct CacheStats + { + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t UpstreamHitCount{}; + std::atomic_uint64_t MissCount{}; + }; + [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); void HandleCacheRecordRequest(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleGetCacheRecord(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); @@ -79,16 +90,21 @@ private: void HandleGetCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandlePutCachePayload(zen::HttpServerRequest& Request, const CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); - void HandleStatusRequest(zen::HttpServerRequest& Request); + virtual void HandleStatsRequest(zen::HttpServerRequest& Request) override; + virtual void HandleStatusRequest(zen::HttpServerRequest& Request) override; spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - zen::ZenCacheStore& m_CacheStore; - zen::CasStore& m_CasStore; - zen::CidStore& m_CidStore; + ZenCacheStore& m_CacheStore; + HttpStatsService& m_StatsService; + HttpStatusService& m_StatusService; + CasStore& m_CasStore; + CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; uint64_t m_LastScrubTime = 0; metrics::OperationTiming m_HttpRequests; + metrics::OperationTiming m_UpstreamGetRequestTiming; + CacheStats m_CacheStats; }; } // namespace zen diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 5e93ebaa9..580446473 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -32,6 +32,8 @@ ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir { ZEN_INFO("initializing structured cache at '{}'", RootDir); CreateDirectories(RootDir); + + m_DiskLayer.DiscoverBuckets(); } ZenCacheStore::~ZenCacheStore() @@ -116,6 +118,13 @@ ZenCacheStore::Scrub(ScrubContext& Ctx) m_DiskLayer.Scrub(Ctx); m_MemLayer.Scrub(Ctx); } + +void +ZenCacheStore::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() @@ -142,6 +151,10 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCa _.ReleaseNow(); + // There's a race here. Since the lock is released early to allow + // inserts, the bucket delete path could end up deleting the + // underlying data structure + return Bucket->Get(HashKey, OutValue); } @@ -195,13 +208,21 @@ ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) } void +ZenCacheMemoryLayer::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + +void ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) { + RwLock::SharedLockScope _(m_bucketLock); + std::vector<IoHash> BadHashes; for (auto& Kv : m_cacheMap) { - if (Kv.first != IoHash::HashBuffer(Kv.second)) + if (Kv.first != IoHash::HashBuffer(Kv.second.Payload)) { BadHashes.push_back(Kv.first); } @@ -209,10 +230,16 @@ ZenCacheMemoryLayer::CacheBucket::Scrub(ScrubContext& Ctx) if (!BadHashes.empty()) { - Ctx.ReportBadChunks(BadHashes); + Ctx.ReportBadCasChunks(BadHashes); } } +void +ZenCacheMemoryLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) +{ + ZEN_UNUSED(GcCtx); +} + bool ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { @@ -224,18 +251,26 @@ ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutV } else { - OutValue.Value = bucketIt->second; + BucketValue& Value = bucketIt.value(); + OutValue.Value = Value.Payload; + Value.LastAccess = GetCurrentTimeStamp(); return true; } } +uint64_t +ZenCacheMemoryLayer::CacheBucket::GetCurrentTimeStamp() +{ + return GetLofreqTimerValue(); +} + void ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); - m_cacheMap[HashKey] = Value.Value; + m_cacheMap.insert_or_assign(HashKey, BucketValue{.LastAccess = GetCurrentTimeStamp(), .Payload = Value.Value}); } ////////////////////////////////////////////////////////////////////////// @@ -245,11 +280,17 @@ ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue struct DiskLocation { - uint64_t OffsetAndFlags; - uint32_t Size; - uint32_t IndexDataSize; + inline DiskLocation() = default; + + inline DiskLocation(uint64_t Offset, uint64_t ValueSize, uint32_t IndexSize, uint64_t Flags) + : OffsetAndFlags(CombineOffsetAndFlags(Offset, Flags)) + , LowerSize(ValueSize & 0xFFFFffff) + , IndexDataSize(IndexSize) + { + } - static const uint64_t kOffsetMask = 0x00FF'ffFF'ffFF'ffFFull; + static const uint64_t kOffsetMask = 0x0000'ffFF'ffFF'ffFFull; + static const uint64_t kSizeMask = 0x00FF'0000'0000'0000ull; static const uint64_t kFlagsMask = 0xff00'0000'0000'0000ull; static const uint64_t kStandaloneFile = 0x8000'0000'0000'0000ull; static const uint64_t kStructured = 0x4000'0000'0000'0000ull; @@ -257,6 +298,7 @@ struct DiskLocation static uint64_t CombineOffsetAndFlags(uint64_t Offset, uint64_t Flags) { return Offset | Flags; } inline uint64_t Offset() const { return OffsetAndFlags & kOffsetMask; } + inline uint64_t Size() const { return LowerSize; } inline uint64_t IsFlagSet(uint64_t Flag) const { return OffsetAndFlags & Flag; } inline ZenContentType GetContentType() const { @@ -269,6 +311,11 @@ struct DiskLocation return ContentType; } + +private: + uint64_t OffsetAndFlags = 0; + uint32_t LowerSize = 0; + uint32_t IndexDataSize = 0; }; struct DiskIndexEntry @@ -286,7 +333,7 @@ struct ZenCacheDiskLayer::CacheBucket CacheBucket(CasStore& Cas); ~CacheBucket(); - void OpenOrCreate(std::filesystem::path BucketDir); + void OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate = true); static bool Delete(std::filesystem::path BucketDir); bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); @@ -294,14 +341,15 @@ struct ZenCacheDiskLayer::CacheBucket void Drop(); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); - inline bool IsOk() const { return m_Ok; } + inline bool IsOk() const { return m_IsOk; } private: CasStore& m_CasStore; std::filesystem::path m_BucketDir; Oid m_BucketId; - bool m_Ok = false; + bool m_IsOk = false; uint64_t m_LargeObjectThreshold = 64 * 1024; // These files are used to manage storage of small objects for this bucket @@ -314,9 +362,19 @@ private: uint64_t m_WriteCursor = 0; void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); - void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); - bool GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc); + void PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value); + bool GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue); bool GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue); + + // These locks are here to avoid contention on file creation, therefore it's sufficient + // that we take the same lock for the same hash + // + // These locks are small and should really be spaced out so they don't share cache lines, + // but we don't currently access them at particularly high frequency so it should not be + // an issue in practice + + RwLock m_ShardedLocks[256]; + inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardedLocks[Hash.Hash[19]]; } }; ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) @@ -341,7 +399,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) } void -ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) +ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bool AllowCreate) { CreateDirectories(BucketDir); @@ -368,17 +426,23 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { ManifestFile.Read(&m_BucketId, sizeof(Oid), 0); - m_Ok = true; + m_IsOk = true; } - if (!m_Ok) + if (!m_IsOk) { ManifestFile.Close(); } } - if (!m_Ok) + if (!m_IsOk) { + if (AllowCreate == false) + { + // Invalid bucket + return; + } + // No manifest file found, this is a new bucket ManifestFile.Open(ManifestPath, /* IsCreate */ true, Ec); @@ -410,13 +474,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; - MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size); + MaxFileOffset = std::max<uint64_t>(MaxFileOffset, Record.Location.Offset() + Record.Location.Size()); }); m_WriteCursor = (MaxFileOffset + 15) & ~15; } - m_Ok = true; + m_IsOk = true; } void @@ -437,23 +501,25 @@ ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoH bool ZenCacheDiskLayer::CacheBucket::GetInlineCacheValue(const DiskLocation& Loc, ZenCacheValue& OutValue) { - if (!Loc.IsFlagSet(DiskLocation::kStandaloneFile)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size); - OutValue.Value.SetContentType(Loc.GetContentType()); - - return true; + return false; } - return false; + OutValue.Value = IoBufferBuilder::MakeFromFileHandle(m_SobsFile.Handle(), Loc.Offset(), Loc.Size()); + OutValue.Value.SetContentType(Loc.GetContentType()); + + return true; } bool -ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, ZenCacheValue& OutValue, const DiskLocation& Loc) +ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const DiskLocation& Loc, const IoHash& HashKey, ZenCacheValue& OutValue) { WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); + RwLock::SharedLockScope ValueLock(LockForHash(HashKey)); + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) { OutValue.Value = Data; @@ -468,7 +534,7 @@ ZenCacheDiskLayer::CacheBucket::GetStandaloneCacheValue(const IoHash& HashKey, Z bool ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { - if (!m_Ok) + if (!m_IsOk) { return false; } @@ -486,7 +552,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal _.ReleaseNow(); - return GetStandaloneCacheValue(HashKey, OutValue, Loc); + return GetStandaloneCacheValue(Loc, HashKey, OutValue); } return false; @@ -495,14 +561,14 @@ ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutVal void ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { - if (!m_Ok) + if (!m_IsOk) { return; } if (Value.Value.Size() >= m_LargeObjectThreshold) { - return PutLargeObject(HashKey, Value); + return PutStandaloneCacheValue(HashKey, Value); } else { @@ -517,10 +583,9 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), - .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; + DiskLocation Loc(m_WriteCursor, Value.Value.Size(), 0, EntryFlags); - m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); + m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size(), 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -530,11 +595,13 @@ ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& else { // TODO: should check if write is idempotent and bail out if it is? + // this would requiring comparing contents on disk unless we add a + // content hash to the index entry it.value() = Loc; } m_SlogFile.Append({.Key = HashKey, .Location = Loc}); - m_SobsFile.Write(Value.Value.Data(), Loc.Size, Loc.Offset()); + m_SobsFile.Write(Value.Value.Data(), Loc.Size(), Loc.Offset()); } } @@ -558,61 +625,69 @@ ZenCacheDiskLayer::CacheBucket::Flush() void ZenCacheDiskLayer::CacheBucket::Scrub(ScrubContext& Ctx) { - std::vector<DiskIndexEntry> StandaloneFiles; + std::atomic<uint64_t> ScrubbedChunks{0}, ScrubbedBytes{0}; - std::vector<IoHash> BadChunks; - std::vector<IoBuffer> BadStandaloneChunks; + std::vector<IoHash> BadChunks; { RwLock::SharedLockScope _(m_IndexLock); for (auto& Kv : m_Index) { - const IoHash& Hash = Kv.first; - const DiskLocation& Loc = Kv.second; + const IoHash& HashKey = Kv.first; + const DiskLocation& Loc = Kv.second; ZenCacheValue Value; - if (!GetInlineCacheValue(Loc, Value)) + if (GetInlineCacheValue(Loc, Value)) { - ZEN_ASSERT(Loc.IsFlagSet(DiskLocation::kStandaloneFile)); - StandaloneFiles.push_back({.Key = Hash, .Location = Loc}); + // Validate contents } else { - if (GetStandaloneCacheValue(Hash, Value, Loc)) + if (Loc.IsFlagSet(DiskLocation::kStandaloneFile)) { - // Hash contents - - const IoHash ComputedHash = HashBuffer(Value.Value); - - if (ComputedHash != Hash) + if (GetStandaloneCacheValue(Loc, HashKey, Value)) { - BadChunks.push_back(Hash); + // Note: we cannot currently validate contents since we don't + // have a content hash! + } + else + { + // Value not found + BadChunks.push_back(HashKey); } - } - else - { - // Non-existent } } } } - if (Ctx.RunRecovery()) + Ctx.ReportScrubbed(ScrubbedChunks, ScrubbedBytes); + + if (BadChunks.empty()) { - // Clean out bad chunks + return; } - if (!BadChunks.empty()) + Ctx.ReportBadCasChunks(BadChunks); + + if (Ctx.RunRecovery()) { - Ctx.ReportBadChunks(BadChunks); + // Clean out bad data } } void -ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::CacheBucket::GarbageCollect(GcContext& GcCtx) { + ZEN_UNUSED(GcCtx); +} + +void +ZenCacheDiskLayer::CacheBucket::PutStandaloneCacheValue(const IoHash& HashKey, const ZenCacheValue& Value) +{ + RwLock::ExclusiveLockScope ValueLock(LockForHash(HashKey)); + WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); @@ -661,7 +736,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenC RwLock::ExclusiveLockScope _(m_IndexLock); - DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; + DiskLocation Loc(/* Offset */ 0, Value.Value.Size(), 0, EntryFlags); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -719,7 +794,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCach std::filesystem::path BucketPath = m_RootDir; BucketPath /= std::string(InBucket); - Bucket->OpenOrCreate(BucketPath.c_str()); + Bucket->OpenOrCreate(BucketPath); } } @@ -762,7 +837,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z std::filesystem::path bucketPath = m_RootDir; bucketPath /= std::string(InBucket); - Bucket->OpenOrCreate(bucketPath.c_str()); + Bucket->OpenOrCreate(bucketPath); } } @@ -774,6 +849,63 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const Z } } +void +ZenCacheDiskLayer::DiscoverBuckets() +{ + FileSystemTraversal Traversal; + struct Visitor : public FileSystemTraversal::TreeVisitor + { + virtual void VisitFile([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const path_view& File, + [[maybe_unused]] uint64_t FileSize) override + { + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, const path_view& DirectoryName) override + { + Dirs.push_back(std::wstring(DirectoryName)); + return false; + } + + std::vector<std::wstring> Dirs; + } Visit; + + Traversal.TraverseFileSystem(m_RootDir, Visit); + + // Initialize buckets + + RwLock::ExclusiveLockScope _(m_Lock); + + for (const std::wstring& BucketName : Visit.Dirs) + { + // New bucket needs to be created + + std::string BucketName8 = WideToUtf8(BucketName); + + if (auto It = m_Buckets.find(BucketName8); It != m_Buckets.end()) + { + } + else + { + auto InsertResult = m_Buckets.try_emplace(BucketName8, m_CasStore); + + std::filesystem::path BucketPath = m_RootDir; + BucketPath /= BucketName8; + + CacheBucket& Bucket = InsertResult.first->second; + + Bucket.OpenOrCreate(BucketPath, /* AllowCreate */ false); + + if (!Bucket.IsOk()) + { + ZEN_WARN("Found directory '{}' in our base directory '{}' but it is not a valid bucket", BucketName8, m_RootDir); + + m_Buckets.erase(InsertResult.first); + } + } + } +} + bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { @@ -830,27 +962,10 @@ ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) } } -////////////////////////////////////////////////////////////////////////// - -ZenCacheTracker::ZenCacheTracker(ZenCacheStore& CacheStore) -{ - ZEN_UNUSED(CacheStore); -} - -ZenCacheTracker::~ZenCacheTracker() -{ -} - -void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) -{ - ZEN_UNUSED(Bucket); - ZEN_UNUSED(HashKey); -} - void -ZenCacheTracker::Flush() +ZenCacheDiskLayer::GarbageCollect(GcContext& GcCtx) { + ZEN_UNUSED(GcCtx); } } // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index f96757409..4753af627 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -46,6 +46,11 @@ struct ZenCacheValue CbObject IndexData; }; +/** In-memory cache storage + + Intended for small values which are frequently accessed + + */ class ZenCacheMemoryLayer { public: @@ -56,20 +61,41 @@ public: void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + struct Configuration + { + uint64_t TargetFootprintBytes = 16 * 1024 * 1024; + uint64_t ScavengeThreshold = 4 * 1024 * 1024; + }; + + const Configuration& GetConfiguration() const { return m_Configuration; } + void SetConfiguration(const Configuration& NewConfig) { m_Configuration = NewConfig; } private: struct CacheBucket { - RwLock m_bucketLock; - tsl::robin_map<IoHash, IoBuffer> m_cacheMap; + struct BucketValue + { + uint64_t LastAccess = 0; + IoBuffer Payload; + }; + + RwLock m_bucketLock; + tsl::robin_map<IoHash, BucketValue> m_cacheMap; bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + private: + uint64_t GetCurrentTimeStamp(); }; RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; + Configuration m_Configuration; }; class ZenCacheDiskLayer @@ -83,6 +109,9 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); + + void DiscoverBuckets(); private: /** A cache bucket manages a single directory containing @@ -107,6 +136,7 @@ public: bool DropBucket(std::string_view Bucket); void Flush(); void Scrub(ScrubContext& Ctx); + void GarbageCollect(GcContext& GcCtx); private: std::filesystem::path m_RootDir; @@ -116,18 +146,4 @@ private: uint64_t m_LastScrubTime = 0; }; -/** Tracks cache entry access, stats and orchestrates cleanup activities - */ -class ZenCacheTracker -{ -public: - ZenCacheTracker(ZenCacheStore& CacheStore); - ~ZenCacheTracker(); - - void TrackAccess(std::string_view Bucket, const IoHash& HashKey); - void Flush(); - -private: -}; - } // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 254032226..df3259542 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -90,6 +90,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_options()("t, test", "Enable test mode", cxxopts::value<bool>(GlobalOptions.IsTest)->default_value("false")); options.add_options()("log-id", "Specify id for adding context to log output", cxxopts::value<std::string>(GlobalOptions.LogId)); options.add_options()("data-dir", "Specify persistence root", cxxopts::value<std::filesystem::path>(GlobalOptions.DataDir)); + options.add_options()("content-dir", "Frontend content directory", cxxopts::value<std::filesystem::path>(GlobalOptions.ContentDir)); + options.add_options()("abslog", "Path to log file", cxxopts::value<std::filesystem::path>(GlobalOptions.AbsLogFile)); options .add_option("lifetime", "", "owner-pid", "Specify owning process id", cxxopts::value<int>(GlobalOptions.OwnerPid), "<identifier>"); @@ -212,8 +214,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", "upstream-zen-url", - "URL to a remote Zen server instance", - cxxopts::value<std::string>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Url)->default_value(""), + "URL to remote Zen server. Use a comma separated list to choose the one with the best latency.", + cxxopts::value<std::vector<std::string>>(ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls)->default_value(""), ""); options.add_option("cache", @@ -227,7 +229,7 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z "", "upstream-stats", "Collect performance metrics for upstream endpoints", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("true"), + cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.StatsEnabled)->default_value("false"), ""); try @@ -366,9 +368,17 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv if (auto ZenConfig = UpstreamConfig->get<sol::optional<sol::table>>("zen")) { - UpdateStringValueFromConfig(ZenConfig.value(), - std::string_view("url"), - ServiceConfig.UpstreamCacheConfig.ZenConfig.Url); + if (auto Url = ZenConfig.value().get<sol::optional<std::string>>("url")) + { + ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls.push_back(Url.value()); + } + else if (auto Urls = ZenConfig.value().get<sol::optional<sol::table>>("url")) + { + for (const auto& Kv : Urls.value()) + { + ServiceConfig.UpstreamCacheConfig.ZenConfig.Urls.push_back(Kv.second.as<std::string>()); + } + } } } } diff --git a/zenserver/config.h b/zenserver/config.h index 75c19d690..405e22739 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -17,6 +17,8 @@ struct ZenServerOptions bool UninstallService = false; // Flag used to initiate service uninstall (temporary) std::string LogId; // Id for tagging log output std::filesystem::path DataDir; // Root directory for state (used for testing) + std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) + std::filesystem::path AbsLogFile; }; struct ZenUpstreamJupiterConfig @@ -34,7 +36,7 @@ struct ZenUpstreamJupiterConfig struct ZenUpstreamZenConfig { - std::string Url; + std::vector<std::string> Urls; }; enum class UpstreamCachePolicy : uint8_t diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index bc7b883b5..7a7773cba 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -196,7 +196,8 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) EnableVTMode(); - std::filesystem::path LogPath = GlobalOptions.DataDir / "logs/zenserver.log"; + std::filesystem::path LogPath = + !GlobalOptions.AbsLogFile.empty() ? GlobalOptions.AbsLogFile : GlobalOptions.DataDir / "logs/zenserver.log"; bool IsAsync = true; spdlog::level::level_enum LogLevel = spdlog::level::info; @@ -250,7 +251,7 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) Sinks.push_back(FileSink); #if ZEN_PLATFORM_WINDOWS - if (zen::IsDebuggerPresent()) + if (zen::IsDebuggerPresent() && GlobalOptions.IsDebug) { auto DebugSink = std::make_shared<spdlog::sinks::msvc_sink_mt>(); DebugSink->set_level(spdlog::level::debug); diff --git a/zenserver/experimental/frontend.cpp b/zenserver/experimental/frontend.cpp new file mode 100644 index 000000000..98d570cfe --- /dev/null +++ b/zenserver/experimental/frontend.cpp @@ -0,0 +1,119 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "frontend.h" + +#include <zencore/filesystem.h> +#include <zencore/string.h> + +namespace zen { + +namespace html { + + constexpr std::string_view Index = R"( +<!DOCTYPE html> +<html> +<head> +<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" integrity="sha384-F3w7mX95PdgyTmZZMECAngseQB83DfGTowi0iMjiWaeVhAn4FJkqJByhZMI3AhiU" crossorigin="anonymous"> +<script src="https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js" integrity="sha384-skAcpIdS7UcVUC05LJ9Dxay8AXcDYfBJqt1CJ85S/CFujBsIzCIv+l9liuYLaMQ/" crossorigin="anonymous"></script> +<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/[email protected]/font/bootstrap-icons.css"> +<style type="text/css"> +body { + background-color: #fafafa; +} +</style> +<script type="text/javascript"> + const getCacheStats = () => { + const opts = { headers: { "Accept": "application/json" } }; + fetch("/stats/z$", opts) + .then(response => { + if (!response.ok) { + throw Error(response.statusText); + } + return response.json(); + }) + .then(json => { + document.getElementById("status").innerHTML = "connected" + document.getElementById("stats").innerHTML = JSON.stringify(json, null, 4); + }) + .catch(error => { + document.getElementById("status").innerHTML = "disconnected" + document.getElementById("stats").innerHTML = "" + console.log(error); + }) + .finally(() => { + window.setTimeout(getCacheStats, 1000); + }); + }; + getCacheStats(); +</script> +</head> +<body> + <div class="container"> + <div class="row"> + <div class="text-center mt-5"> + <pre> +__________ _________ __ +\____ / ____ ____ / _____/_/ |_ ____ _______ ____ + / / _/ __ \ / \ \_____ \ \ __\ / _ \ \_ __ \_/ __ \ + / /_ \ ___/ | | \ / \ | | ( <_> ) | | \/\ ___/ +/_______ \ \___ >|___| //_______ / |__| \____/ |__| \___ > + \/ \/ \/ \/ \/ + </pre> + <pre id="status"/> + </div> + </div> + <div class="row"> + <pre class="mb-0">Z$:</pre> + <pre id="stats"></pre> + <div> + </div> +</body> +</html> +)"; + +} // namespace html + +HttpFrontendService::HttpFrontendService(std::filesystem::path Directory) : m_Directory(Directory) +{ +} + +HttpFrontendService::~HttpFrontendService() +{ +} + +const char* +HttpFrontendService::BaseUri() const +{ + return "/dashboard"; // in order to use the root path we need to remove HttpAddUrlToUrlGroup in HttpSys.cpp +} + +void +HttpFrontendService::HandleRequest(zen::HttpServerRequest& Request) +{ + using namespace std::literals; + + if (m_Directory.empty()) + { + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, html::Index); + } + else + { + std::string_view Uri = Request.RelativeUri(); + std::filesystem::path RelPath{Uri.empty() ? "index.html" : Uri}; + std::filesystem::path AbsPath = m_Directory / RelPath; + + FileContents File = ReadFile(AbsPath); + + if (!File.ErrorCode) + { + // TODO: Map file extension to MIME type + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kHTML, File.Data[0]); + } + else + { + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Ooops!"sv); + } + } +} + +} // namespace zen diff --git a/zenserver/experimental/frontend.h b/zenserver/experimental/frontend.h new file mode 100644 index 000000000..2ae20e940 --- /dev/null +++ b/zenserver/experimental/frontend.h @@ -0,0 +1,24 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenhttp/httpserver.h> + +#include <filesystem> + +namespace zen { + +class HttpFrontendService final : public zen::HttpService +{ +public: + HttpFrontendService(std::filesystem::path Directory); + virtual ~HttpFrontendService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(zen::HttpServerRequest& Request) override; + +private: + std::filesystem::path m_Directory; +}; + +} // namespace zen diff --git a/zenserver/monitoring/httpstats.cpp b/zenserver/monitoring/httpstats.cpp new file mode 100644 index 000000000..de04294d0 --- /dev/null +++ b/zenserver/monitoring/httpstats.cpp @@ -0,0 +1,50 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpstats.h" + +namespace zen { + +HttpStatsService::HttpStatsService() : m_Log(logging::Get("stats")) +{ +} + +HttpStatsService::~HttpStatsService() +{ +} + +const char* +HttpStatsService::BaseUri() const +{ + return "/stats/"; +} + +void +HttpStatsService::RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Providers.insert_or_assign(std::string(Id), &Provider); +} + +void +HttpStatsService::HandleRequest(HttpServerRequest& Request) +{ + using namespace std::literals; + + std::string_view Key = Request.RelativeUri(); + + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers)) + { + return It->second->HandleStatsRequest(Request); + } + + [[fallthrough]]; + default: + return; + } +} + +} // namespace zen diff --git a/zenserver/monitoring/httpstats.h b/zenserver/monitoring/httpstats.h new file mode 100644 index 000000000..1c3c79dd0 --- /dev/null +++ b/zenserver/monitoring/httpstats.h @@ -0,0 +1,37 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> +#include <zenhttp/httpserver.h> + +#include <map> + +namespace zen { + +struct IHttpStatsProvider +{ + virtual void HandleStatsRequest(HttpServerRequest& Request) = 0; +}; + +class HttpStatsService : public HttpService +{ +public: + HttpStatsService(); + ~HttpStatsService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + void RegisterHandler(std::string_view Id, IHttpStatsProvider& Provider); + +private: + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + + inline spdlog::logger& Log() { return m_Log; } + + RwLock m_Lock; + std::map<std::string, IHttpStatsProvider*> m_Providers; +}; + +} // namespace zen
\ No newline at end of file diff --git a/zenserver/monitoring/httpstatus.cpp b/zenserver/monitoring/httpstatus.cpp new file mode 100644 index 000000000..e12662b1c --- /dev/null +++ b/zenserver/monitoring/httpstatus.cpp @@ -0,0 +1,50 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "httpstatus.h" + +namespace zen { + +HttpStatusService::HttpStatusService() : m_Log(logging::Get("status")) +{ +} + +HttpStatusService::~HttpStatusService() +{ +} + +const char* +HttpStatusService::BaseUri() const +{ + return "/status/"; +} + +void +HttpStatusService::RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider) +{ + RwLock::ExclusiveLockScope _(m_Lock); + m_Providers.insert_or_assign(std::string(Id), &Provider); +} + +void +HttpStatusService::HandleRequest(HttpServerRequest& Request) +{ + using namespace std::literals; + + std::string_view Key = Request.RelativeUri(); + + switch (Request.RequestVerb()) + { + case HttpVerb::kHead: + case HttpVerb::kGet: + if (auto It = m_Providers.find(std::string{Key}); It != end(m_Providers)) + { + return It->second->HandleStatusRequest(Request); + } + + [[fallthrough]]; + default: + return; + } +} + +} // namespace zen diff --git a/zenserver/monitoring/httpstatus.h b/zenserver/monitoring/httpstatus.h new file mode 100644 index 000000000..8f069f760 --- /dev/null +++ b/zenserver/monitoring/httpstatus.h @@ -0,0 +1,37 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/logging.h> +#include <zenhttp/httpserver.h> + +#include <map> + +namespace zen { + +struct IHttpStatusProvider +{ + virtual void HandleStatusRequest(HttpServerRequest& Request) = 0; +}; + +class HttpStatusService : public HttpService +{ +public: + HttpStatusService(); + ~HttpStatusService(); + + virtual const char* BaseUri() const override; + virtual void HandleRequest(HttpServerRequest& Request) override; + void RegisterHandler(std::string_view Id, IHttpStatusProvider& Provider); + +private: + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + + RwLock m_Lock; + std::map<std::string, IHttpStatusProvider*> m_Providers; + + inline spdlog::logger& Log() { return m_Log; } +}; + +} // namespace zen
\ No newline at end of file diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 7870f9559..5c4983472 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -1200,11 +1200,6 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) return HttpReq.WriteResponse(HttpResponseCode::NotFound); } - if (Verb == HttpVerb::kHead) - { - HttpReq.SetSuppressResponseBody(); - } - if (IsOffset) { if (Offset > Value.Size()) @@ -1425,7 +1420,12 @@ HttpProjectService::HttpProjectService(CasStore& Store, ProjectStore* Projects) if (!legacy::TryLoadCbPackage(Package, Payload, &UniqueBuffer::Alloc, &Resolver)) { - ZEN_ERROR("Received malformed package!"); + std::filesystem::path BadPackagePath = + Oplog.TempPath() / "bad_packages" / "session{}_request{}"_format(HttpReq.SessionId(), HttpReq.RequestId()); + + ZEN_ERROR("Received malformed package! Saving payload to '{}'", BadPackagePath); + + zen::WriteFile(BadPackagePath, Payload); return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid package"); } diff --git a/zenserver/testing/httptest.cpp b/zenserver/testing/httptest.cpp index 01866a63b..924546762 100644 --- a/zenserver/testing/httptest.cpp +++ b/zenserver/testing/httptest.cpp @@ -4,9 +4,12 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> +#include <zencore/timer.h> namespace zen { +using namespace fmt::literals; + HttpTestingService::HttpTestingService() { m_Router.RegisterRoute( @@ -15,6 +18,44 @@ HttpTestingService::HttpTestingService() HttpVerb::kGet); m_Router.RegisterRoute( + "hello_slow", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) { + Stopwatch Timer; + Sleep(1000); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_veryslow", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest& Request) { + Stopwatch Timer; + Sleep(60000); + Request.WriteResponse(HttpResponseCode::OK, + HttpContentType::kText, + "hello, took me {}"_format(NiceTimeSpanMs(Timer.GetElapsedTimeMs()))); + }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_throw", + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) { throw std::runtime_error("intentional error"); }); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "hello_noresponse", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponseAsync([this](HttpServerRequest&) {}); }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "metrics", [this](HttpRouterRequest& Req) { metrics::OperationTiming::Scope _(m_TimingStats); diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 9573a1631..1de417008 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -47,9 +47,9 @@ struct CloudCacheAccessToken struct CloudCacheResult { IoBuffer Response; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - int32_t ErrorCode = {}; + int64_t Bytes{}; + double ElapsedSeconds{}; + int32_t ErrorCode{}; std::string Reason; bool Success = false; }; diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 03054b542..5b2629f72 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -4,6 +4,7 @@ #include "jupiter.h" #include "zen.h" +#include <zencore/blockingqueue.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -23,7 +24,6 @@ #include <algorithm> #include <atomic> -#include <deque> #include <thread> #include <unordered_map> @@ -33,70 +33,6 @@ using namespace std::literals; namespace detail { - template<typename T> - class BlockingQueue - { - public: - BlockingQueue() = default; - - ~BlockingQueue() { CompleteAdding(); } - - void Enqueue(T&& Item) - { - { - std::lock_guard Lock(m_Lock); - m_Queue.emplace_back(std::move(Item)); - m_Size++; - } - - m_NewItemSignal.notify_one(); - } - - bool WaitAndDequeue(T& Item) - { - if (m_CompleteAdding.load()) - { - return false; - } - - std::unique_lock Lock(m_Lock); - m_NewItemSignal.wait(Lock, [this]() { return !m_Queue.empty() || m_CompleteAdding.load(); }); - - if (!m_Queue.empty()) - { - Item = std::move(m_Queue.front()); - m_Queue.pop_front(); - m_Size--; - - return true; - } - - return false; - } - - void CompleteAdding() - { - if (!m_CompleteAdding.load()) - { - m_CompleteAdding.store(true); - m_NewItemSignal.notify_all(); - } - } - - std::size_t Size() const - { - std::unique_lock Lock(m_Lock); - return m_Queue.size(); - } - - private: - mutable std::mutex m_Lock; - std::condition_variable m_NewItemSignal; - std::deque<T> m_Queue; - std::atomic_bool m_CompleteAdding{false}; - std::atomic_uint32_t m_Size; - }; - class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: @@ -105,12 +41,14 @@ namespace detail { , m_UseLegacyDdc(Options.UseLegacyDdc) { using namespace fmt::literals; - m_DisplayName = "Jupier - '{}'"_format(Options.ServiceUrl); + m_DisplayName = "Jupiter - '{}'"_format(Options.ServiceUrl); m_Client = new CloudCacheClient(Options); } virtual ~JupiterUpstreamEndpoint() = default; + virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); } + virtual bool IsHealthy() const override { return m_HealthOk.load(); } virtual UpstreamEndpointHealth CheckHealth() override @@ -186,16 +124,23 @@ namespace detail { } } - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -206,16 +151,23 @@ namespace detail { CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -386,22 +338,70 @@ namespace detail { class ZenUpstreamEndpoint final : public UpstreamEndpoint { + struct ZenEndpoint + { + std::string Url; + std::string Reason; + double Latency{}; + bool Ok = false; + + bool operator<(const ZenEndpoint& RHS) const { return Ok && RHS.Ok ? Latency < RHS.Latency : Ok; } + }; + public: - ZenUpstreamEndpoint(std::string_view ServiceUrl) + ZenUpstreamEndpoint(std::span<std::string const> Urls) : m_Log(zen::logging::Get("upstream")), m_DisplayName("ZEN") { - using namespace fmt::literals; - m_DisplayName = "Zen - {}"_format(ServiceUrl); - m_Client = new ZenStructuredCacheClient(ServiceUrl); + for (const auto& Url : Urls) + { + m_Endpoints.push_back({.Url = Url}); + } } ~ZenUpstreamEndpoint() = default; + virtual UpstreamEndpointHealth Initialize() override + { + using namespace fmt::literals; + + const ZenEndpoint& Ep = GetEndpoint(); + if (Ep.Ok) + { + m_ServiceUrl = Ep.Url; + m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); + m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + + m_HealthOk = true; + return {.Ok = true}; + } + + m_HealthOk = false; + return {.Reason = Ep.Reason}; + } + virtual bool IsHealthy() const override { return m_HealthOk; } virtual UpstreamEndpointHealth CheckHealth() override { + using namespace fmt::literals; + try { + if (m_Client.IsNull()) + { + const ZenEndpoint& Ep = GetEndpoint(); + if (Ep.Ok) + { + m_ServiceUrl = Ep.Url; + m_DisplayName = "ZEN - {}"_format(m_ServiceUrl); + m_Client = new ZenStructuredCacheClient(m_ServiceUrl); + + m_HealthOk = true; + return {.Ok = true}; + } + + return {.Reason = Ep.Reason}; + } + ZenStructuredCacheSession Session(*m_Client); ZenCacheResult Result; @@ -429,16 +429,23 @@ namespace detail { ZenStructuredCacheSession Session(*m_Client); const ZenCacheResult Result = Session.GetCacheRecord(CacheKey.Bucket, CacheKey.Hash, Type); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -450,16 +457,23 @@ namespace detail { const ZenCacheResult Result = Session.GetCachePayload(PayloadKey.CacheKey.Bucket, PayloadKey.CacheKey.Hash, PayloadKey.PayloadId); - m_HealthOk = Result.ErrorCode == 0; - - return {.Value = Result.Response, - .Bytes = Result.Bytes, - .ElapsedSeconds = Result.ElapsedSeconds, - .Success = Result.Success}; + if (Result.ErrorCode == 0) + { + return {.Value = Result.Response, + .Bytes = Result.Bytes, + .ElapsedSeconds = Result.ElapsedSeconds, + .Success = Result.Success}; + } + else + { + m_HealthOk = false; + return {.Error{.ErrorCode = Result.ErrorCode, .Reason = std::move(Result.Reason)}}; + } } catch (std::exception& Err) { - return {.Error{.StatusCode = UpstreamStatusCode::Error, .Reason = Err.what()}}; + m_HealthOk = false; + return {.Error{.ErrorCode = -1, .Reason = Err.what()}}; } } @@ -563,6 +577,42 @@ namespace detail { virtual UpstreamEndpointStats& Stats() override { return m_Stats; } private: + const ZenEndpoint& GetEndpoint() + { + for (ZenEndpoint& Ep : m_Endpoints) + { + ZenStructuredCacheClient Client(Ep.Url); + ZenStructuredCacheSession Session(Client); + const int32_t SampleCount = 2; + + Ep.Ok = false; + Ep.Latency = {}; + + for (int32_t Sample = 0; Sample < SampleCount; ++Sample) + { + ZenCacheResult Result = Session.CheckHealth(); + Ep.Ok = Result.Success; + Ep.Reason = std::move(Result.Reason); + Ep.Latency += Result.ElapsedSeconds; + } + Ep.Latency /= double(SampleCount); + } + + std::sort(std::begin(m_Endpoints), std::end(m_Endpoints)); + + for (const auto& Ep : m_Endpoints) + { + ZEN_INFO("ping ZEN endpoint '{}' latency '{:.3}s' {}", Ep.Url, Ep.Latency, Ep.Ok ? "OK" : Ep.Reason); + } + + return m_Endpoints.front(); + } + + spdlog::logger& Log() { return m_Log; } + + spdlog::logger& m_Log; + std::string m_ServiceUrl; + std::vector<ZenEndpoint> m_Endpoints; std::string m_DisplayName; RefPtr<ZenStructuredCacheClient> m_Client; UpstreamEndpointStats m_Stats; @@ -575,7 +625,7 @@ namespace detail { struct UpstreamStats { - static constexpr uint64_t MaxSampleCount = 100ull; + static constexpr uint64_t MaxSampleCount = 1000ull; UpstreamStats(bool Enabled) : m_Enabled(Enabled) {} @@ -584,11 +634,6 @@ struct UpstreamStats const GetUpstreamCacheResult& Result, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Error) @@ -606,7 +651,7 @@ struct UpstreamStats Stats.MissCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -617,11 +662,6 @@ struct UpstreamStats const PutUpstreamCacheResult& Result, const std::vector<std::unique_ptr<UpstreamEndpoint>>& Endpoints) { - if (!m_Enabled) - { - return; - } - UpstreamEndpointStats& Stats = Endpoint.Stats(); if (Result.Success) { @@ -634,7 +674,7 @@ struct UpstreamStats Stats.ErrorCount++; } - if (m_SampleCount++ % MaxSampleCount) + if (m_Enabled && m_SampleCount++ % MaxSampleCount) { Dump(Logger, Endpoints); } @@ -693,7 +733,7 @@ public: { for (auto& Endpoint : m_Endpoints) { - const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); + const UpstreamEndpointHealth Health = Endpoint->Initialize(); if (Health.Ok) { ZEN_INFO("initialize endpoint '{}' OK", Endpoint->DisplayName()); @@ -925,7 +965,7 @@ private: spdlog::logger& Log() { return m_Log; } - using UpstreamQueue = detail::BlockingQueue<UpstreamCacheRecord>; + using UpstreamQueue = BlockingQueue<UpstreamCacheRecord>; struct RunState { @@ -975,9 +1015,9 @@ MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) } std::unique_ptr<UpstreamEndpoint> -MakeZenUpstreamEndpoint(std::string_view Url) +MakeZenUpstreamEndpoint(std::span<std::string const> Urls) { - return std::make_unique<detail::ZenUpstreamEndpoint>(Url); + return std::make_unique<detail::ZenUpstreamEndpoint>(Urls); } } // namespace zen diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index a6b1e9784..edc995da6 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -45,35 +45,29 @@ struct UpstreamCacheOptions bool StatsEnabled = false; }; -enum class UpstreamStatusCode : uint8_t -{ - Ok, - Error -}; - struct UpstreamError { - UpstreamStatusCode StatusCode = UpstreamStatusCode::Ok; - std::string Reason; + int32_t ErrorCode{}; + std::string Reason{}; - explicit operator bool() const { return StatusCode != UpstreamStatusCode::Ok; } + explicit operator bool() const { return ErrorCode != 0; } }; struct GetUpstreamCacheResult { IoBuffer Value; - UpstreamError Error; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + UpstreamError Error{}; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; }; struct PutUpstreamCacheResult { std::string Reason; - int64_t Bytes = {}; - double ElapsedSeconds = {}; - bool Success = false; + int64_t Bytes{}; + double ElapsedSeconds{}; + bool Success = false; }; struct UpstreamEndpointHealth @@ -84,14 +78,14 @@ struct UpstreamEndpointHealth struct UpstreamEndpointStats { - std::atomic_uint64_t HitCount = {}; - std::atomic_uint64_t MissCount = {}; - std::atomic_uint64_t UpCount = {}; - std::atomic_uint64_t ErrorCount = {}; - std::atomic<double> UpBytes = {}; - std::atomic<double> DownBytes = {}; - std::atomic<double> SecondsUp = {}; - std::atomic<double> SecondsDown = {}; + std::atomic_uint64_t HitCount{}; + std::atomic_uint64_t MissCount{}; + std::atomic_uint64_t UpCount{}; + std::atomic_uint64_t ErrorCount{}; + std::atomic<double> UpBytes{}; + std::atomic<double> DownBytes{}; + std::atomic<double> SecondsUp{}; + std::atomic<double> SecondsDown{}; }; /** @@ -102,6 +96,8 @@ class UpstreamEndpoint public: virtual ~UpstreamEndpoint() = default; + virtual UpstreamEndpointHealth Initialize() = 0; + virtual bool IsHealthy() const = 0; virtual UpstreamEndpointHealth CheckHealth() = 0; @@ -149,6 +145,6 @@ std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Opt std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); -std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::string_view Url); +std::unique_ptr<UpstreamEndpoint> MakeZenUpstreamEndpoint(std::span<std::string const> Urls); } // namespace zen diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index c988a6b0b..6141fd397 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -328,7 +328,9 @@ namespace detail { ////////////////////////////////////////////////////////////////////////// -ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) : m_ServiceUrl(ServiceUrl) +ZenStructuredCacheClient::ZenStructuredCacheClient(std::string_view ServiceUrl) +: m_Log(logging::Get(std::string_view("zenclient"))) +, m_ServiceUrl(ServiceUrl) { } @@ -369,7 +371,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) -: m_Log(logging::Get("zenclient"sv)) +: m_Log(OuterClient.Log()) , m_Client(OuterClient) { m_SessionState = m_Client.AllocSessionState(); diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index 158be668a..12e46bd8d 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -138,8 +138,11 @@ public: std::string_view ServiceUrl() const { return m_ServiceUrl; } + inline spdlog::logger& Log() { return m_Log; } + private: - std::string m_ServiceUrl; + spdlog::logger& m_Log; + std::string m_ServiceUrl; RwLock m_SessionStateLock; std::list<detail::ZenCacheSessionState*> m_SessionStateCache; diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index 7a6981fcd..fb1ba651d 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -32,3 +32,18 @@ target("zenserver") add_packages( "vcpkg::cxxopts", "vcpkg::mimalloc") + + on_load(function(target) + local commit, err = os.iorun("git log -1 --format=\"%h-%cI\"") + if commit ~= nil then + commit = commit:gsub("%s+", "") + commit = commit:gsub("\n", "") + if is_mode("release") then + commit = "rel-" .. commit + else + commit = "dbg-" .. commit + end + target:add("defines","BUILD_VERSION=\"" .. commit .. "\"") + print("build version " .. commit) + end + end) diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index b45df9fef..18c59636d 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -1,5 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. +#include <zencore/compactbinarybuilder.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/iobuffer.h> @@ -31,6 +32,10 @@ #include <set> #include <unordered_map> +#if !defined(BUILD_VERSION) +# define BUILD_VERSION ("dev-build") +#endif + ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable @@ -81,7 +86,10 @@ #include "cache/structuredcachestore.h" #include "compute/apply.h" #include "diag/diagsvcs.h" +#include "experimental/frontend.h" #include "experimental/usnjournal.h" +#include "monitoring/httpstats.h" +#include "monitoring/httpstatus.h" #include "projectstore.h" #include "testing/httptest.h" #include "testing/launch.h" @@ -95,17 +103,16 @@ namespace zen { -class ZenServer -{ - ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; +using namespace std::literals; +class ZenServer : public IHttpStatusProvider +{ public: void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { - m_ServerEntry = ServerEntry; using namespace fmt::literals; - ZEN_INFO(ZEN_APP_NAME " initializing"); + m_ServerEntry = ServerEntry; m_DebugOptionForcedCrash = ServiceConfig.ShouldCrash; if (ParentPid) @@ -139,6 +146,15 @@ public: // Ok so now we're configured, let's kick things off + m_Http = zen::CreateHttpServer(); + m_Http->Initialize(BasePort); + m_Http->RegisterService(m_HealthService); + m_Http->RegisterService(m_StatsService); + m_Http->RegisterService(m_StatusService); + m_StatusService.RegisterHandler("status", *this); + + // Initialize storage and services + ZEN_INFO("initializing storage"); zen::CasStoreConfiguration Config; @@ -166,95 +182,7 @@ public: if (ServiceConfig.StructuredCacheEnabled) { - using namespace std::literals; - auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; - - ZEN_INFO("instantiating structured cache service"); - m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); - - std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) - { - const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; - - zen::UpstreamCacheOptions UpstreamOptions; - UpstreamOptions.ReadUpstream = - (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; - UpstreamOptions.WriteUpstream = - (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; - - if (UpstreamConfig.UpstreamThreadCount < 32) - { - UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); - } - - UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; - - UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); - - if (!UpstreamConfig.ZenConfig.Url.empty()) - { - std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Url); - UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); - } - - { - zen::CloudCacheClientOptions Options; - if (UpstreamConfig.JupiterConfig.UseProductionSettings) - { - Options = zen::CloudCacheClientOptions{ - .ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, - .DdcNamespace = "ue.ddc"sv, - .BlobStoreNamespace = "ue.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; - } - else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) - { - Options = zen::CloudCacheClientOptions{ - .ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, - .DdcNamespace = "ue4.ddc"sv, - .BlobStoreNamespace = "test.ddc"sv, - .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, - .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, - .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, - .UseLegacyDdc = false}; - } - - Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); - Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); - Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); - Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); - Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); - Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); - Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; - - if (!Options.ServiceUrl.empty()) - { - std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); - UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); - } - } - - if (UpstreamCache->Initialize()) - { - ZEN_INFO("upstream cache active ({})", - UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" - : UpstreamOptions.ReadUpstream ? "READONLY" - : UpstreamOptions.WriteUpstream ? "WRITEONLY" - : "DISABLED"); - } - else - { - UpstreamCache.reset(); - ZEN_INFO("NOT using upstream cache"); - } - } - - m_StructuredCacheService.reset( - new zen::HttpStructuredCacheService(*m_CacheStore, *m_CasStore, *m_CidStore, std::move(UpstreamCache))); + InitializeStructuredCache(ServiceConfig); } else { @@ -272,13 +200,8 @@ public: } #endif - m_Http = zen::CreateHttpServer(); - m_Http->Initialize(BasePort); - m_Http->RegisterService(m_HealthService); - m_Http->RegisterService(m_TestService); // NOTE: this is intentionally not limited to test mode as it's useful for diagnostics m_Http->RegisterService(m_TestingService); - m_Http->RegisterService(m_AdminService); if (m_HttpProjectService) @@ -302,8 +225,17 @@ public: { m_Http->RegisterService(*m_HttpFunctionService); } + + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot); + + if (m_FrontendService) + { + m_Http->RegisterService(*m_FrontendService); + } } + void InitializeStructuredCache(ZenServiceConfig& ServiceConfig); + #if ZEN_ENABLE_MESH void StartMesh(int BasePort) { @@ -344,8 +276,12 @@ public: const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; + m_CurrentState = kRunning; + m_Http->Run(IsInteractiveMode); + m_CurrentState = kShuttingDown; + ZEN_INFO(ZEN_APP_NAME " exiting"); m_IoContext.stop(); @@ -364,6 +300,7 @@ public: void SetDedicatedMode(bool State) { m_IsDedicatedMode = State; } void SetTestMode(bool State) { m_TestMode = State; } void SetDataRoot(std::filesystem::path Root) { m_DataRoot = Root; } + void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; } void EnsureIoRunner() { @@ -417,6 +354,7 @@ public: void Scrub() { + Stopwatch Timer; ZEN_INFO("Storage validation STARTING"); ScrubContext Ctx; @@ -425,7 +363,13 @@ public: m_ProjectStore->Scrub(Ctx); m_StructuredCacheService->Scrub(Ctx); - ZEN_INFO("Storage validation DONE"); + const uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); + + ZEN_INFO("Storage validation DONE in {}, ({} in {} chunks - {})", + NiceTimeSpanMs(ElapsedTimeMs), + NiceBytes(Ctx.ScrubbedBytes()), + Ctx.ScrubbedChunks(), + NiceByteRate(Ctx.ScrubbedBytes(), ElapsedTimeMs)); } void Flush() @@ -443,17 +387,51 @@ public: m_ProjectStore->Flush(); } + virtual void HandleStatusRequest(HttpServerRequest& Request) override + { + CbObjectWriter Cbo; + Cbo << "ok" << true; + Cbo << "state" << ToString(m_CurrentState); + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + private: - bool m_IsDedicatedMode = false; - bool m_TestMode = false; - std::filesystem::path m_DataRoot; - std::jthread m_IoRunner; - asio::io_context m_IoContext; - asio::steady_timer m_PidCheckTimer{m_IoContext}; - zen::ProcessMonitor m_ProcessMonitor; - zen::NamedMutex m_ServerMutex; + ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; + bool m_IsDedicatedMode = false; + bool m_TestMode = false; + std::filesystem::path m_DataRoot; + std::filesystem::path m_ContentRoot; + std::jthread m_IoRunner; + asio::io_context m_IoContext; + asio::steady_timer m_PidCheckTimer{m_IoContext}; + zen::ProcessMonitor m_ProcessMonitor; + zen::NamedMutex m_ServerMutex; + + enum ServerState + { + kInitializing, + kRunning, + kShuttingDown + } m_CurrentState = kInitializing; + + std::string_view ToString(ServerState Value) + { + switch (Value) + { + case kInitializing: + return "initializing"sv; + case kRunning: + return "running"sv; + case kShuttingDown: + return "shutdown"sv; + default: + return "unknown"sv; + } + } zen::Ref<zen::HttpServer> m_Http; + zen::HttpStatusService m_StatusService; + zen::HttpStatsService m_StatsService; std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; std::unique_ptr<zen::CidStore> m_CidStore; std::unique_ptr<zen::ZenCacheStore> m_CacheStore; @@ -471,10 +449,105 @@ private: zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; + std::unique_ptr<zen::HttpFrontendService> m_FrontendService; bool m_DebugOptionForcedCrash = false; }; +void +ZenServer::InitializeStructuredCache(ZenServiceConfig& ServiceConfig) +{ + using namespace std::literals; + auto ValueOrDefault = [](std::string_view Value, std::string_view Default) { return Value.empty() ? Default : Value; }; + + ZEN_INFO("instantiating structured cache service"); + m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); + + std::unique_ptr<zen::UpstreamCache> UpstreamCache; + if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) + { + const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; + + zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; + + if (UpstreamConfig.UpstreamThreadCount < 32) + { + UpstreamOptions.ThreadCount = static_cast<uint32_t>(UpstreamConfig.UpstreamThreadCount); + } + + UpstreamOptions.StatsEnabled = UpstreamConfig.StatsEnabled; + + UpstreamCache = zen::MakeUpstreamCache(UpstreamOptions, *m_CacheStore, *m_CidStore); + + if (!UpstreamConfig.ZenConfig.Urls.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> ZenEndpoint = zen::MakeZenUpstreamEndpoint(UpstreamConfig.ZenConfig.Urls); + UpstreamCache->RegisterEndpoint(std::move(ZenEndpoint)); + } + + { + zen::CloudCacheClientOptions Options; + if (UpstreamConfig.JupiterConfig.UseProductionSettings) + { + Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools.epicgames.com"sv, + .DdcNamespace = "ue.ddc"sv, + .BlobStoreNamespace = "ue.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + else if (UpstreamConfig.JupiterConfig.UseDevelopmentSettings) + { + Options = zen::CloudCacheClientOptions{.ServiceUrl = "https://jupiter.devtools-dev.epicgames.com"sv, + .DdcNamespace = "ue4.ddc"sv, + .BlobStoreNamespace = "test.ddc"sv, + .OAuthProvider = "https://epicgames.okta.com/oauth2/auso645ojjWVdRI3d0x7/v1/token"sv, + .OAuthClientId = "0oao91lrhqPiAlaGD0x7"sv, + .OAuthSecret = "-GBWjjenhCgOwhxL5yBKNJECVIoDPH0MK4RDuN7d"sv, + .UseLegacyDdc = false}; + } + + Options.ServiceUrl = ValueOrDefault(UpstreamConfig.JupiterConfig.Url, Options.ServiceUrl); + Options.DdcNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.DdcNamespace, Options.DdcNamespace); + Options.BlobStoreNamespace = ValueOrDefault(UpstreamConfig.JupiterConfig.Namespace, Options.BlobStoreNamespace); + Options.OAuthProvider = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthProvider, Options.OAuthProvider); + Options.OAuthClientId = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientId, Options.OAuthClientId); + Options.OAuthSecret = ValueOrDefault(UpstreamConfig.JupiterConfig.OAuthClientSecret, Options.OAuthSecret); + Options.UseLegacyDdc |= UpstreamConfig.JupiterConfig.UseLegacyDdc; + + if (!Options.ServiceUrl.empty()) + { + std::unique_ptr<zen::UpstreamEndpoint> JupiterEndpoint = zen::MakeJupiterUpstreamEndpoint(Options); + UpstreamCache->RegisterEndpoint(std::move(JupiterEndpoint)); + } + } + + if (UpstreamCache->Initialize()) + { + ZEN_INFO("upstream cache active ({})", + UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" + : UpstreamOptions.ReadUpstream ? "READONLY" + : UpstreamOptions.WriteUpstream ? "WRITEONLY" + : "DISABLED"); + } + else + { + UpstreamCache.reset(); + ZEN_INFO("NOT using upstream cache"); + } + } + + m_StructuredCacheService.reset(new zen::HttpStructuredCacheService(*m_CacheStore, + *m_CasStore, + *m_CidStore, + m_StatsService, + m_StatusService, + std::move(UpstreamCache))); +} + } // namespace zen class ZenWindowsService : public WindowsService @@ -522,7 +595,7 @@ ZenWindowsService::Run() ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); - ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + ZEN_INFO(ZEN_APP_NAME " - starting on port {}, build '{}'", GlobalOptions.BasePort, BUILD_VERSION); ZenServerState ServerState; ServerState.Initialize(); @@ -560,6 +633,7 @@ ZenWindowsService::Run() ZenServer Server; Server.SetDataRoot(GlobalOptions.DataDir); + Server.SetContentRoot(GlobalOptions.ContentDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index bcb7ea028..7fad477a1 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -108,7 +108,12 @@ <ClInclude Include="cache\structuredcachestore.h" /> <ClInclude Include="compute\apply.h" /> <ClInclude Include="config.h" /> + <ClInclude Include="diag\formatters.h" /> <ClInclude Include="diag\logging.h" /> + <ClInclude Include="experimental\frontend.h" /> + <ClInclude Include="experimental\vfs.h" /> + <ClInclude Include="monitoring\httpstats.h" /> + <ClInclude Include="monitoring\httpstatus.h" /> <ClInclude Include="resource.h" /> <ClInclude Include="sos\sos.h" /> <ClInclude Include="testing\httptest.h" /> @@ -132,6 +137,10 @@ <ClCompile Include="compute\apply.cpp" /> <ClCompile Include="config.cpp" /> <ClCompile Include="diag\logging.cpp" /> + <ClCompile Include="experimental\frontend.cpp" /> + <ClCompile Include="experimental\vfs.cpp" /> + <ClCompile Include="monitoring\httpstats.cpp" /> + <ClCompile Include="monitoring\httpstatus.cpp" /> <ClCompile Include="projectstore.cpp" /> <ClCompile Include="cache\cacheagent.cpp" /> <ClCompile Include="sos\sos.cpp" /> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index 6b99ca8d7..04e639a33 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -38,6 +38,13 @@ <ClInclude Include="testing\httptest.h" /> <ClInclude Include="windows\service.h" /> <ClInclude Include="resource.h" /> + <ClInclude Include="experimental\frontend.h"> + <Filter>experimental</Filter> + </ClInclude> + <ClInclude Include="diag\formatters.h" /> + <ClInclude Include="experimental\vfs.h" /> + <ClInclude Include="monitoring\httpstats.h" /> + <ClInclude Include="monitoring\httpstatus.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -70,6 +77,13 @@ </ClCompile> <ClCompile Include="testing\httptest.cpp" /> <ClCompile Include="windows\service.cpp" /> + <ClCompile Include="admin\admin.cpp" /> + <ClCompile Include="experimental\frontend.cpp"> + <Filter>experimental</Filter> + </ClCompile> + <ClCompile Include="experimental\vfs.cpp" /> + <ClCompile Include="monitoring\httpstats.cpp" /> + <ClCompile Include="monitoring\httpstatus.cpp" /> </ItemGroup> <ItemGroup> <Filter Include="cache"> diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index 1db2b50bf..a4bbfa340 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -32,6 +32,15 @@ CasChunkSet::AddChunkToSet(const IoHash& HashToAdd) } void +CasChunkSet::AddChunksToSet(std::span<const IoHash> HashesToAdd) +{ + for (const IoHash& Hash : HashesToAdd) + { + m_ChunkSet.insert(Hash); + } +} + +void CasChunkSet::RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate) { for (auto It = begin(m_ChunkSet), ItEnd = end(m_ChunkSet); It != ItEnd;) @@ -58,10 +67,45 @@ CasChunkSet::IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callba ////////////////////////////////////////////////////////////////////////// +struct GcContext::GcState +{ + CasChunkSet m_CasChunks; + CasChunkSet m_CidChunks; +}; + +GcContext::GcContext() : m_State(std::make_unique<GcState>()) +{ +} + +GcContext::~GcContext() +{ +} + +void +GcContext::ContributeCids(std::span<const IoHash> Cids) +{ + m_State->m_CidChunks.AddChunksToSet(Cids); +} + +void +GcContext::ContributeCas(std::span<const IoHash> Cas) +{ + m_State->m_CasChunks.AddChunksToSet(Cas); +} + +////////////////////////////////////////////////////////////////////////// + +void +ScrubContext::ReportBadCasChunks(std::span<IoHash> BadCasChunks) +{ + m_BadCas.AddChunksToSet(BadCasChunks); +} + void -ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) +ScrubContext::ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes) { - ZEN_UNUSED(BadChunks); + m_ChunkCount.fetch_add(ChunkCount); + m_ByteCount.fetch_add(ChunkBytes); } /** diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index df5c32d25..7a5d7bcf4 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -204,7 +204,7 @@ struct CidStore::Impl // TODO: Should compute a snapshot index here - Ctx.ReportBadChunks(BadChunks); + Ctx.ReportBadCasChunks(BadChunks); } uint64_t m_LastScrubTime = 0; diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 5fc3ac356..612f87c7c 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -254,7 +254,7 @@ CasContainerStrategy::Scrub(ScrubContext& Ctx) // be used to invalidate higher level data structures more efficiently // than a full validation pass might be able to do - Ctx.ReportBadChunks(BadChunkHashes); + Ctx.ReportBadCasChunks(BadChunkHashes); } void diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 0b18848d5..ee641b80a 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -394,7 +394,8 @@ FileCasStrategy::Flush() void FileCasStrategy::Scrub(ScrubContext& Ctx) { - std::vector<IoHash> BadHashes; + std::vector<IoHash> BadHashes; + std::atomic<uint64_t> ChunkCount{0}, ChunkBytes{0}; IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { IoHashStream Hasher; @@ -405,8 +406,13 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) { BadHashes.push_back(Hash); } + + ++ChunkCount; + ChunkBytes.fetch_add(Payload.FileSize()); }); + Ctx.ReportScrubbed(ChunkCount, ChunkBytes); + if (!BadHashes.empty()) { ZEN_ERROR("file CAS scrubbing: {} bad chunks found", BadHashes.size()); @@ -428,7 +434,9 @@ FileCasStrategy::Scrub(ScrubContext& Ctx) } } - Ctx.ReportBadChunks(BadHashes); + Ctx.ReportBadCasChunks(BadHashes); + + ZEN_INFO("file CAS scrubbed: {} chunks ({})", ChunkCount.load(), NiceBytes(ChunkBytes)); } void diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index 93454ca6f..d0698df7f 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -31,13 +31,41 @@ struct CasStoreConfiguration uint64_t HugeValueThreshold = 1024 * 1024; }; +/** Manage a set of IoHash values + */ + +class CasChunkSet +{ +public: + void AddChunkToSet(const IoHash& HashToAdd); + void AddChunksToSet(std::span<const IoHash> HashesToAdd); + void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); + void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); + inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } + inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } + inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } + +private: + // Q: should we protect this with a lock, or is that a higher level concern? + std::unordered_set<IoHash> m_ChunkSet; +}; + /** Garbage Collection context object */ class GcContext { public: + GcContext(); + ~GcContext(); + + void ContributeCids(std::span<const IoHash> Cid); + void ContributeCas(std::span<const IoHash> Hash); + private: + struct GcState; + + std::unique_ptr<GcState> m_State; }; /** Context object for data scrubbing @@ -49,28 +77,26 @@ private: class ScrubContext { public: - virtual void ReportBadChunks(std::span<IoHash> BadChunks); + virtual void ReportBadCasChunks(std::span<IoHash> BadCasChunks); inline uint64_t ScrubTimestamp() const { return m_ScrubTime; } inline bool RunRecovery() const { return m_Recover; } + void ReportScrubbed(uint64_t ChunkCount, uint64_t ChunkBytes); + + inline uint64_t ScrubbedChunks() const { return m_ChunkCount; } + inline uint64_t ScrubbedBytes() const { return m_ByteCount; } private: - uint64_t m_ScrubTime = GetHifreqTimerValue(); - bool m_Recover = true; + uint64_t m_ScrubTime = GetHifreqTimerValue(); + bool m_Recover = true; + std::atomic<uint64_t> m_ChunkCount{0}; + std::atomic<uint64_t> m_ByteCount{0}; + CasChunkSet m_BadCas; + CasChunkSet m_BadCid; }; -class CasChunkSet -{ -public: - void AddChunkToSet(const IoHash& HashToAdd); - void RemoveChunksIf(std::function<bool(const IoHash& CandidateHash)>&& Predicate); - void IterateChunks(std::function<void(const IoHash& ChunkHash)>&& Callback); - inline [[nodiscard]] bool ContainsChunk(const IoHash& Hash) const { return m_ChunkSet.find(Hash) != m_ChunkSet.end(); } - inline [[nodiscard]] bool IsEmpty() const { return m_ChunkSet.empty(); } - inline [[nodiscard]] size_t GetSize() const { return m_ChunkSet.size(); } +/** Content Addressable Storage interface -private: - std::unordered_set<IoHash> m_ChunkSet; -}; + */ class CasStore { |