diff options
| author | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
|---|---|---|
| committer | Martin Ridgers <[email protected]> | 2021-09-21 11:06:13 +0200 |
| commit | 68c951e0f440ffd483795dced737e88152c1a581 (patch) | |
| tree | 5c0910ca2a85b45fb05dba3ce457b7d156213894 | |
| parent | Merge main into linux-mac (diff) | |
| parent | Trigger storage scrubbing pass at startup (diff) | |
| download | zen-68c951e0f440ffd483795dced737e88152c1a581.tar.xz zen-68c951e0f440ffd483795dced737e88152c1a581.zip | |
Merged main into linux-mac
131 files changed, 3859 insertions, 1134 deletions
diff --git a/.gitignore b/.gitignore index df6c1a9f8..bf237a7b3 100644 --- a/.gitignore +++ b/.gitignore @@ -204,9 +204,8 @@ ServiceFabricBackup/ *.rptproj.bak - - - +# generated build files +makefile # Python Tools for Visual Studio (PTVS) __pycache__/ diff --git a/vs-chromium-project.txt b/vs-chromium-project.txt index 2b2e15bc7..2bb89a55c 100644 --- a/vs-chromium-project.txt +++ b/vs-chromium-project.txt @@ -3,6 +3,6 @@ [SourceExplorer.ignore] .git/ -.x64/ +x64/ *.suo -**/.x64/ +**/x64/ @@ -35,11 +35,18 @@ if is_mode("debug") then add_defines("DEBUG") end +if is_mode("debug") then + add_defines("ZEN_WITH_TESTS=1") +else + add_defines("ZEN_WITH_TESTS=0") +end + if is_os("windows") then - add_defines("_CRT_SECURE_NO_WARNINGS") + add_defines("_CRT_SECURE_NO_WARNINGS", "_UNICODE", "UNICODE", "_WIN32_WINNT=0x0A00") end add_defines("USE_SENTRY=1") +add_defines("ZEN_USE_MIMALLOC=1") option("vfs") set_showmenu(true) @@ -64,6 +71,7 @@ set_symbols("debug") includes("zencore", "zencore-test") includes("zenhttp") -includes("zenstore", "zenutil") +includes("zenstore", "zenstore-test") +includes("zenutil") includes("zenserver", "zenserver-test") includes("zen") @@ -11,6 +11,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{4EA55E5B-1 README.md = README.md RESTAPI.md = RESTAPI.md vcpkg.json = vcpkg.json + xmake.lua = xmake.lua EndProjectSection EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zencore", "zencore\zencore.vcxproj", "{D75BF9AB-C61E-4FFF-AD59-1563430F05E2}" @@ -47,6 +48,8 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zentest-appstub", "zentest- EndProject Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zenhttp", "zenhttp\zenhttp.vcxproj", "{8EEB3BE5-7001-46BF-AAFD-EDB7558AC012}" EndProject +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "zenstore-test", "zenstore-test\zenstore-test.vcxproj", "{C001A3DF-B76E-4989-B576-FE2B78AB2580}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x64 = Debug|x64 @@ -111,6 +114,12 @@ Global {8EEB3BE5-7001-46BF-AAFD-EDB7558AC012}.Release|x64.ActiveCfg = Release|x64 {8EEB3BE5-7001-46BF-AAFD-EDB7558AC012}.Release|x64.Build.0 = Release|x64 {8EEB3BE5-7001-46BF-AAFD-EDB7558AC012}.Release|x86.ActiveCfg = Release|x64 + {C001A3DF-B76E-4989-B576-FE2B78AB2580}.Debug|x64.ActiveCfg = Debug|x64 + {C001A3DF-B76E-4989-B576-FE2B78AB2580}.Debug|x64.Build.0 = Debug|x64 + {C001A3DF-B76E-4989-B576-FE2B78AB2580}.Debug|x86.ActiveCfg = Debug|x64 + {C001A3DF-B76E-4989-B576-FE2B78AB2580}.Release|x64.ActiveCfg = Release|x64 + {C001A3DF-B76E-4989-B576-FE2B78AB2580}.Release|x64.Build.0 = Release|x64 + {C001A3DF-B76E-4989-B576-FE2B78AB2580}.Release|x86.ActiveCfg = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/zen/chunk/chunk.cpp b/zen/chunk/chunk.cpp index a5f010dbe..18748e921 100644 --- a/zen/chunk/chunk.cpp +++ b/zen/chunk/chunk.cpp @@ -1,7 +1,6 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "chunk.h" -#include <doctest/doctest.h> #include <gsl/gsl-lite.hpp> @@ -12,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/sha1.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <zencore/thread.h> #include <zencore/timer.h> #include <zenstore/cas.h> @@ -1056,6 +1056,7 @@ ChunkCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS TEST_CASE("chunking") { using namespace zen; @@ -1158,3 +1159,4 @@ TEST_CASE("chunking") SUBCASE("mod method") { test(/* UseThreshold */ false, /* Random */ Random, 2048, 1 * 1024 * 1024); } } +#endif
\ No newline at end of file diff --git a/zen/cmds/cache.cpp b/zen/cmds/cache.cpp index 69e500293..202bf9246 100644 --- a/zen/cmds/cache.cpp +++ b/zen/cmds/cache.cpp @@ -5,7 +5,7 @@ #include <zencore/filesystem.h> #include <zencore/logging.h> #include <zenhttp/httpcommon.h> -#include <zenserverprocess.h> +#include <zenutil/zenserverprocess.h> #include <memory> diff --git a/zen/cmds/copy.cpp b/zen/cmds/copy.cpp index 4ce09c982..947d54e07 100644 --- a/zen/cmds/copy.cpp +++ b/zen/cmds/copy.cpp @@ -7,6 +7,8 @@ #include <zencore/string.h> #include <zencore/timer.h> +namespace zen { + CopyCommand::CopyCommand() { m_Options.add_options()("h,help", "Print help"); @@ -94,3 +96,5 @@ CopyCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + +} // namespace zen diff --git a/zen/cmds/copy.h b/zen/cmds/copy.h index 22b240d11..322cf3f2f 100644 --- a/zen/cmds/copy.h +++ b/zen/cmds/copy.h @@ -4,6 +4,8 @@ #include "../zen.h" +namespace zen { + /** Copy files, possibly using block cloning */ class CopyCommand : public ZenCmdBase @@ -22,3 +24,5 @@ private: std::string m_CopyTarget; bool m_NoClone = false; }; + +} // namespace zen diff --git a/zen/cmds/dedup.cpp b/zen/cmds/dedup.cpp index 90a4fea76..e71314622 100644 --- a/zen/cmds/dedup.cpp +++ b/zen/cmds/dedup.cpp @@ -12,6 +12,8 @@ #include <ppl.h> +namespace zen { + DedupCommand::DedupCommand() { m_Options.add_options()("h,help", "Print help"); @@ -290,3 +292,5 @@ DedupCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + +} // namespace zen diff --git a/zen/cmds/dedup.h b/zen/cmds/dedup.h index 0f0aecc8e..7932d10e6 100644 --- a/zen/cmds/dedup.h +++ b/zen/cmds/dedup.h @@ -6,6 +6,8 @@ #include <ppl.h> +namespace zen { + /** Deduplicate files in a tree using block cloning */ class DedupCommand : public ZenCmdBase @@ -24,3 +26,5 @@ private: std::string m_DedupTarget; size_t m_SizeThreshold = 1024 * 1024; }; + +} // namespace zen diff --git a/zen/cmds/deploy.cpp b/zen/cmds/deploy.cpp index b8879fefb..d60392dd5 100644 --- a/zen/cmds/deploy.cpp +++ b/zen/cmds/deploy.cpp @@ -5,6 +5,8 @@ #include <zencore/logging.h> #include <zencore/string.h> +namespace zen { + DeployCommand::DeployCommand() { m_Options.add_options()("h,help", "Print help"); @@ -80,3 +82,5 @@ DeployCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + +} // namespace zen diff --git a/zen/cmds/deploy.h b/zen/cmds/deploy.h index 1109aaf17..975caf9e9 100644 --- a/zen/cmds/deploy.h +++ b/zen/cmds/deploy.h @@ -4,6 +4,8 @@ #include "../zen.h" +namespace zen { + /** Deploy files from Zen build store */ class DeployCommand : public ZenCmdBase @@ -23,3 +25,5 @@ private: bool m_NoClone = false; bool m_IsClean = false; }; + +} // namespace zen diff --git a/zen/cmds/hash.cpp b/zen/cmds/hash.cpp index b6276dbc1..0a7989ffc 100644 --- a/zen/cmds/hash.cpp +++ b/zen/cmds/hash.cpp @@ -9,6 +9,8 @@ #include <ppl.h> +namespace zen { + HashCommand::HashCommand() { m_Options.add_options()("d,dir", "Directory to scan", cxxopts::value<std::string>(m_ScanDirectory))( @@ -123,3 +125,5 @@ HashCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // TODO: implement snapshot enumeration and display return 0; } + +} // namespace zen diff --git a/zen/cmds/hash.h b/zen/cmds/hash.h index b994b497d..3df9063ea 100644 --- a/zen/cmds/hash.h +++ b/zen/cmds/hash.h @@ -7,6 +7,8 @@ #include <ppl.h> +namespace zen { + /** Generate hash list file */ class HashCommand : public ZenCmdBase @@ -23,3 +25,5 @@ private: std::string m_ScanDirectory; std::string m_OutputFile; }; + +} // namespace zen diff --git a/zen/cmds/run.cpp b/zen/cmds/run.cpp index 97680ed5a..4ffbf820c 100644 --- a/zen/cmds/run.cpp +++ b/zen/cmds/run.cpp @@ -12,7 +12,7 @@ #include <zencore/logging.h> #include <zencore/string.h> #include <zencore/timer.h> -#include <zenserverprocess.h> +#include <zenutil/zenserverprocess.h> #include <filesystem> @@ -35,6 +35,8 @@ ////////////////////////////////////////////////////////////////////////// +namespace zen { + using namespace std::literals; RunCommand::RunCommand() @@ -181,3 +183,5 @@ RunCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + +} // namespace zen diff --git a/zen/cmds/run.h b/zen/cmds/run.h index 8fa1f6ae9..3e1e3f2b2 100644 --- a/zen/cmds/run.h +++ b/zen/cmds/run.h @@ -2,10 +2,9 @@ #pragma once -#include "../internalfile.h" #include "../zen.h" -#include <ppl.h> +namespace zen { /** Execute a command (using Zen) */ @@ -23,3 +22,5 @@ private: std::string m_TargetHost; std::string m_ExeTree; }; + +} // namespace zen diff --git a/zen/cmds/scrub.cpp b/zen/cmds/scrub.cpp new file mode 100644 index 000000000..c0fe8ca61 --- /dev/null +++ b/zen/cmds/scrub.cpp @@ -0,0 +1,42 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "scrub.h" +#include <zenutil/zenserverprocess.h> + +using namespace std::literals; + +namespace zen { + +ScrubCommand::ScrubCommand() +{ +} + +ScrubCommand::~ScrubCommand() = default; + +int +ScrubCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + return 0; +} + +////////////////////////////////////////////////////////////////////////// + +GcCommand::GcCommand() +{ +} + +GcCommand::~GcCommand() +{ +} + +int +GcCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + ZEN_UNUSED(GlobalOptions, argc, argv); + + return 0; +} + +} // namespace zen diff --git a/zen/cmds/scrub.h b/zen/cmds/scrub.h new file mode 100644 index 000000000..561ae578d --- /dev/null +++ b/zen/cmds/scrub.h @@ -0,0 +1,39 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "../zen.h" + +namespace zen { + +/** Scrub storage + */ +class ScrubCommand : public ZenCmdBase +{ +public: + ScrubCommand(); + ~ScrubCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"scrub", "Scrub zen storage"}; +}; + +/** Garbage collect storage + */ +class GcCommand : public ZenCmdBase +{ +public: + GcCommand(); + ~GcCommand(); + + virtual int Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) override; + virtual cxxopts::Options* Options() override { return &m_Options; } + +private: + cxxopts::Options m_Options{"gc", "Garbage collect zen storage"}; +}; + +} // namespace zen diff --git a/zen/cmds/status.cpp b/zen/cmds/status.cpp index 6741ab9e9..10970e3c2 100644 --- a/zen/cmds/status.cpp +++ b/zen/cmds/status.cpp @@ -3,7 +3,8 @@ #include "status.h" #include <zencore/logging.h> -#include <memory> + +namespace zen { StatusCommand::StatusCommand() { @@ -18,3 +19,5 @@ StatusCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + +} // namespace zen diff --git a/zen/cmds/status.h b/zen/cmds/status.h index bb439f340..acde280c5 100644 --- a/zen/cmds/status.h +++ b/zen/cmds/status.h @@ -4,6 +4,8 @@ #include "../zen.h" +namespace zen { + class StatusCommand : public ZenCmdBase { public: @@ -16,3 +18,5 @@ public: private: cxxopts::Options m_Options{"status", "Show zen status"}; }; + +} // namespace zen diff --git a/zen/cmds/top.cpp b/zen/cmds/top.cpp index 5bb11d0a0..315d8cb38 100644 --- a/zen/cmds/top.cpp +++ b/zen/cmds/top.cpp @@ -2,13 +2,17 @@ #include "top.h" +#include <zencore/fmtutils.h> #include <zencore/logging.h> -#include <zenserverprocess.h> +#include <zencore/uid.h> +#include <zenutil/zenserverprocess.h> #include <memory> ////////////////////////////////////////////////////////////////////////// +namespace zen { + TopCommand::TopCommand() { } @@ -23,10 +27,32 @@ TopCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZenServerState State; if (!State.InitializeReadOnly()) { - ZEN_INFO("no Zen state found"); + ZEN_CONSOLE("no Zen state found"); + + return 0; } - State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_INFO("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); + int n = 0; + const int HeaderPeriod = 20; + + for (;;) + { + if ((n++ % HeaderPeriod) == 0) + { + ZEN_CONSOLE("{:>5} {:>6} {:>24}", "port", "pid", "session"); + } + + State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { + ZEN_CONSOLE("{:5} {:6} {:24}", Entry.ListenPort, Entry.Pid, Entry.GetSessionId()); + }); + + zen::Sleep(1000); + + if (!State.IsReadOnly()) + { + State.Sweep(); + } + } return 0; } @@ -47,10 +73,14 @@ PsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZenServerState State; if (!State.InitializeReadOnly()) { - ZEN_INFO("no Zen state found"); + ZEN_CONSOLE("no Zen state found"); + + return 0; } - State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_INFO("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); + State.Snapshot([&](const ZenServerState::ZenServerEntry& Entry) { ZEN_CONSOLE("Port {} : pid {}", Entry.ListenPort, Entry.Pid); }); return 0; } + +} // namespace zen diff --git a/zen/cmds/top.h b/zen/cmds/top.h index 32ba6c57b..d8bf91a1c 100644 --- a/zen/cmds/top.h +++ b/zen/cmds/top.h @@ -4,6 +4,8 @@ #include "../zen.h" +namespace zen { + class TopCommand : public ZenCmdBase { public: @@ -29,3 +31,5 @@ public: private: cxxopts::Options m_Options{"ps", "Enumerate running Zen server instances"}; }; + +} // namespace zen diff --git a/zen/cmds/up.cpp b/zen/cmds/up.cpp index a1047fd57..17cba3794 100644 --- a/zen/cmds/up.cpp +++ b/zen/cmds/up.cpp @@ -4,10 +4,12 @@ #include <zencore/filesystem.h> #include <zencore/logging.h> -#include <zenserverprocess.h> +#include <zenutil/zenserverprocess.h> #include <memory> +namespace zen { + UpCommand::UpCommand() { } @@ -98,3 +100,5 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + +} // namespace zen diff --git a/zen/cmds/up.h b/zen/cmds/up.h index a3c6eaa06..fe1ed7a0c 100644 --- a/zen/cmds/up.h +++ b/zen/cmds/up.h @@ -4,6 +4,8 @@ #include "../zen.h" +namespace zen { + class UpCommand : public ZenCmdBase { public: @@ -29,3 +31,5 @@ public: private: cxxopts::Options m_Options{"down", "Bring down zen service"}; }; + +} // namespace zen diff --git a/zen/zen.cpp b/zen/zen.cpp index d8bfa13e5..86c41d658 100644 --- a/zen/zen.cpp +++ b/zen/zen.cpp @@ -1,10 +1,6 @@ // Zen command line client utility // -#define DOCTEST_CONFIG_IMPLEMENT -#include <doctest/doctest.h> -#undef DOCTEST_CONFIG_IMPLEMENT - #include "zen.h" #include "chunk/chunk.h" @@ -21,19 +17,13 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/string.h> +#include <zencore/zencore.h> #include <zenstore/cas.h> -#if TEST_UWS -# pragma warning(push) -# pragma warning(disable : 4458) -# pragma warning(disable : 4324) -# pragma warning(disable : 4100) -# pragma warning(disable : 4706) -# include <uwebsockets/App.h> -# pragma warning(pop) - -# pragma comment(lib, "Iphlpapi.lib") -# pragma comment(lib, "userenv.lib") +#if ZEN_WITH_TESTS +# define DOCTEST_CONFIG_IMPLEMENT +# include <zencore/testing.h> +# undef DOCTEST_CONFIG_IMPLEMENT #endif #include <gsl/gsl-lite.hpp> @@ -58,6 +48,8 @@ private: ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + class RunTestsCommand : public ZenCmdBase { public: @@ -87,6 +79,8 @@ private: cxxopts::Options m_Options{"runtests", "Run tests"}; }; +#endif + ////////////////////////////////////////////////////////////////////////// // TODO: should make this Unicode-aware so we can pass anything in on the // command line. @@ -94,46 +88,32 @@ private: int main(int argc, char** argv) { + using namespace zen; + mi_version(); zen::logging::InitializeLogging(); -#if TEST_UWS - /* Overly simple hello world app, using multiple threads */ - std::vector<std::thread*> threads(4); - - std::transform(threads.begin(), threads.end(), threads.begin(), [](std::thread* /*t*/) { - return new std::thread([]() { - uWS::App() - .get("/*", - [&](uWS::HttpResponse<false>* res, uWS::HttpRequest*) { - zen::Sleep(1); - res->end("hello, world!"); - }) - .listen(1337, [&](auto* listen_socket) { ZEN_UNUSED(listen_socket); }) - .run(); - }); - }); - - std::for_each(threads.begin(), threads.end(), [](std::thread* t) { t->join(); }); -#endif ////////////////////////////////////////////////////////////////////////// auto _ = zen::MakeGuard([] { spdlog::shutdown(); }); - HashCommand HashCmd; - CopyCommand CopyCmd; - DedupCommand DedupCmd; - DeployCommand DeployCmd; - DropCommand DropCmd; - ChunkCommand ChunkCmd; + HashCommand HashCmd; + CopyCommand CopyCmd; + DedupCommand DedupCmd; + DeployCommand DeployCmd; + DropCommand DropCmd; + ChunkCommand ChunkCmd; + RunCommand RunCmd; + StatusCommand StatusCmd; + TopCommand TopCmd; + PsCommand PsCmd; + UpCommand UpCmd; + DownCommand DownCmd; + +#if ZEN_WITH_TESTS RunTestsCommand RunTestsCmd; - RunCommand RunCmd; - StatusCommand StatusCmd; - TopCommand TopCmd; - PsCommand PsCmd; - UpCommand UpCmd; - DownCommand DownCmd; +#endif const struct CommandInfo { @@ -141,19 +121,23 @@ main(int argc, char** argv) ZenCmdBase* Cmd; const char* CmdSummary; } Commands[] = { - {"chunk", &ChunkCmd, "Perform chunking"}, - {"copy", &CopyCmd, "Copy file(s)"}, - {"deploy", &DeployCmd, "Deploy data"}, - {"dedup", &DedupCmd, "Dedup files"}, - {"drop", &DropCmd, "Drop cache bucket(s)"}, - {"hash", &HashCmd, "Compute file hashes"}, + // clang-format off + {"chunk", &ChunkCmd, "Perform chunking"}, + {"copy", &CopyCmd, "Copy file(s)"}, + {"deploy", &DeployCmd, "Deploy data"}, + {"dedup", &DedupCmd, "Dedup files"}, + {"drop", &DropCmd, "Drop cache bucket(s)"}, + {"hash", &HashCmd, "Compute file hashes"}, + {"run", &RunCmd, "Remote execution"}, + {"status", &StatusCmd, "Show zen status"}, + {"ps", &PsCmd, "Enumerate running zen server instances"}, + {"top", &TopCmd, "Monitor zen server activity"}, + {"up", &UpCmd, "Bring zen server up"}, + {"down", &DownCmd, "Bring zen server down"}, + // clang-format on +#if ZEN_WITH_TESTS {"runtests", &RunTestsCmd, "Run zen tests"}, - {"run", &RunCmd, "Remote execution"}, - {"status", &StatusCmd, "Show zen status"}, - {"ps", &PsCmd, "Enumerate running zen server instances"}, - {"top", &TopCmd, "Monitor zen server activity"}, - {"up", &UpCmd, "Bring zen server up"}, - {"down", &DownCmd, "Bring zen server down"}, +#endif }; // Build set containing available commands @@ -10,7 +10,6 @@ #include <zencore/refcount.h> #include <zencore/windows.h> -#include <atlfile.h> #include <filesystem> struct ZenCliOptions diff --git a/zen/zen.vcxproj b/zen/zen.vcxproj index 4f0691fab..fb0674e87 100644 --- a/zen/zen.vcxproj +++ b/zen/zen.vcxproj @@ -100,6 +100,7 @@ <ClCompile Include="cmds\deploy.cpp" /> <ClCompile Include="cmds\hash.cpp" /> <ClCompile Include="cmds\run.cpp" /> + <ClCompile Include="cmds\scrub.cpp" /> <ClCompile Include="cmds\status.cpp" /> <ClCompile Include="cmds\top.cpp" /> <ClCompile Include="cmds\up.cpp" /> @@ -114,6 +115,7 @@ <ClInclude Include="cmds\deploy.h" /> <ClInclude Include="cmds\hash.h" /> <ClInclude Include="cmds\run.h" /> + <ClInclude Include="cmds\scrub.h" /> <ClInclude Include="cmds\status.h" /> <ClInclude Include="cmds\top.h" /> <ClInclude Include="cmds\up.h" /> @@ -134,6 +136,9 @@ <Project>{77f8315d-b21d-4db0-9a6f-2d3359f88a70}</Project> </ProjectReference> </ItemGroup> + <ItemGroup> + <None Include="xmake.lua" /> + </ItemGroup> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> diff --git a/zen/zen.vcxproj.filters b/zen/zen.vcxproj.filters index 47b321727..9002f01c2 100644 --- a/zen/zen.vcxproj.filters +++ b/zen/zen.vcxproj.filters @@ -27,6 +27,7 @@ </ClCompile> <ClCompile Include="cmds\up.cpp" /> <ClCompile Include="cmds\cache.cpp" /> + <ClCompile Include="cmds\scrub.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="chunk\chunk.h" /> @@ -55,10 +56,14 @@ </ClInclude> <ClInclude Include="cmds\up.h" /> <ClInclude Include="cmds\cache.h" /> + <ClInclude Include="cmds\scrub.h" /> </ItemGroup> <ItemGroup> <Filter Include="cmds"> <UniqueIdentifier>{2e06a54c-52be-4260-9275-a4232d01a53c}</UniqueIdentifier> </Filter> </ItemGroup> + <ItemGroup> + <None Include="xmake.lua" /> + </ItemGroup> </Project>
\ No newline at end of file diff --git a/zencore-test/zencore-test.cpp b/zencore-test/zencore-test.cpp index 559349076..cd4ce3e0a 100644 --- a/zencore-test/zencore-test.cpp +++ b/zencore-test/zencore-test.cpp @@ -4,20 +4,22 @@ #include <zencore/logging.h> #include <zencore/zencore.h> -#define DOCTEST_CONFIG_IMPLEMENT -#include <doctest/doctest.h> -#undef DOCTEST_CONFIG_IMPLEMENT - -void -forceLinkTests() -{ - zencore_forcelinktests(); -} +#if ZEN_WITH_TESTS +# define DOCTEST_CONFIG_IMPLEMENT +# include <zencore/testing.h> +# undef DOCTEST_CONFIG_IMPLEMENT +#endif int -main(int argc, char* argv[]) +main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) { +#if ZEN_WITH_TESTS + zen::zencore_forcelinktests(); + zen::logging::InitializeLogging(); return doctest::Context(argc, argv).run(); +#else + return 0; +#endif } diff --git a/zencore/blake3.cpp b/zencore/blake3.cpp index 090eb6897..663f21b6d 100644 --- a/zencore/blake3.cpp +++ b/zencore/blake3.cpp @@ -4,12 +4,12 @@ #include <zencore/compositebuffer.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <zencore/zencore.h> #include "../3rdparty/BLAKE3/c/blake3.h" #pragma comment(lib, "blake3.lib") -#include <doctest/doctest.h> #include <string.h> ////////////////////////////////////////////////////////////////////////// @@ -123,6 +123,8 @@ BLAKE3Stream::GetHash() // Testing related code follows... // +#if ZEN_WITH_TESTS + doctest::String toString(const BLAKE3& value) { @@ -169,4 +171,6 @@ TEST_CASE("BLAKE3") } } +#endif + } // namespace zen diff --git a/zencore/compactbinary.cpp b/zencore/compactbinary.cpp index b508d8fe8..f4908aa9a 100644 --- a/zencore/compactbinary.cpp +++ b/zencore/compactbinary.cpp @@ -2,12 +2,12 @@ #include "zencore/compactbinary.h" +#include <zencore/compactbinaryvalidation.h> #include <zencore/compress.h> #include <zencore/endian.h> #include <zencore/stream.h> -#include "zencore/compactbinaryvalidation.h" +#include <zencore/testing.h> -#include <doctest/doctest.h> #include <string_view> namespace zen { @@ -1146,6 +1146,7 @@ SaveCompactBinary(BinaryWriter& Ar, const CbObjectView& Object) ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS void uson_forcelink() { @@ -1297,5 +1298,6 @@ TEST_CASE("uson.null") CHECK(Field.IsNull() == false); } } +#endif } // namespace zen diff --git a/zencore/compactbinarybuilder.cpp b/zencore/compactbinarybuilder.cpp index 08f37a23d..fa5b6a69b 100644 --- a/zencore/compactbinarybuilder.cpp +++ b/zencore/compactbinarybuilder.cpp @@ -7,12 +7,11 @@ #include <zencore/endian.h> #include <zencore/stream.h> #include <zencore/string.h> +#include <zencore/testing.h> #define _USE_MATH_DEFINES #include <math.h> -#include <doctest/doctest.h> - namespace zen { template<typename T> @@ -700,6 +699,7 @@ operator<<(CbWriter& Writer, const TimeSpan Value) /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS void usonbuilder_forcelink() { @@ -1534,5 +1534,6 @@ TEST_CASE("usonbuilder.stream") CHECK(ValidateCompactBinary(Object.GetBuffer(), CbValidateMode::All) == CbValidateError::None); } } +#endif } // namespace zen diff --git a/zencore/compactbinarypackage.cpp b/zencore/compactbinarypackage.cpp index 9a7e7c098..fbdcd24e9 100644 --- a/zencore/compactbinarypackage.cpp +++ b/zencore/compactbinarypackage.cpp @@ -5,8 +5,7 @@ #include <zencore/compactbinaryvalidation.h> #include <zencore/endian.h> #include <zencore/stream.h> - -#include <doctest/doctest.h> +#include <zencore/testing.h> namespace zen { @@ -747,6 +746,8 @@ namespace legacy { /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + void usonpackage_forcelink() { @@ -1253,4 +1254,6 @@ TEST_CASE("usonpackage.serialization") } } +#endif + } // namespace zen diff --git a/zencore/compactbinaryvalidation.cpp b/zencore/compactbinaryvalidation.cpp index dafd1bcc8..3d72148f9 100644 --- a/zencore/compactbinaryvalidation.cpp +++ b/zencore/compactbinaryvalidation.cpp @@ -6,11 +6,10 @@ #include <zencore/endian.h> #include <zencore/memory.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <algorithm> -#include <doctest/doctest.h> - namespace zen { namespace CbValidationPrivate { @@ -649,6 +648,7 @@ ValidateCompactBinaryPackage(MemoryView View, CbValidateMode Mode) /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS void usonvalidation_forcelink() { @@ -658,5 +658,6 @@ TEST_CASE("usonvalidation") { SUBCASE("Basic") {} } +#endif } // namespace zen diff --git a/zencore/compositebuffer.cpp b/zencore/compositebuffer.cpp index 9349c014f..3190ca5ea 100644 --- a/zencore/compositebuffer.cpp +++ b/zencore/compositebuffer.cpp @@ -3,8 +3,7 @@ #include <zencore/compositebuffer.h> #include <zencore/sharedbuffer.h> - -#include <doctest/doctest.h> +#include <zencore/testing.h> namespace zen { @@ -91,8 +90,8 @@ CompositeBuffer CompositeBuffer::Mid(uint64_t Offset, uint64_t Size) const { const uint64_t BufferSize = GetSize(); - Offset = zen::Min(Offset, BufferSize); - Size = zen::Min(Size, BufferSize - Offset); + Offset = Min(Offset, BufferSize); + Size = Min(Size, BufferSize - Offset); CompositeBuffer Buffer; IterateRange(Offset, Size, [&Buffer](MemoryView View, const SharedBuffer& ViewOuter) { Buffer.m_Segments.push_back(SharedBuffer::MakeView(View, ViewOuter)); @@ -168,6 +167,7 @@ CompositeBuffer::IterateRange(uint64_t Offset, } } +#if ZEN_WITH_TESTS TEST_CASE("CompositeBuffer Null") { CompositeBuffer Buffer; @@ -337,5 +337,6 @@ void compositebuffer_forcelink() { } +#endif } // namespace zen diff --git a/zencore/compress.cpp b/zencore/compress.cpp index 12a7b9ef8..8ca799e39 100644 --- a/zencore/compress.cpp +++ b/zencore/compress.cpp @@ -6,13 +6,13 @@ #include <zencore/compositebuffer.h> #include <zencore/crc32.h> #include <zencore/endian.h> +#include <zencore/testing.h> #include "../3rdparty/Oodle/include/oodle2.h" #if ZEN_PLATFORM_WINDOWS # pragma comment(lib, "oo2core_win64.lib") #endif -#include <doctest/doctest.h> #include <lz4.h> #include <functional> #include <limits> @@ -823,6 +823,8 @@ CompressedBuffer::TryGetCompressParameters(OodleCompressor& OutCompressor, Oodle \/ \/ \/ */ +#if ZEN_WITH_TESTS + TEST_CASE("CompressedBuffer") { uint8_t Zeroes[1024]{}; @@ -908,5 +910,6 @@ void compress_forcelink() { } +#endif } // namespace zen diff --git a/zencore/except.cpp b/zencore/except.cpp index 84e52ab9f..0167c406f 100644 --- a/zencore/except.cpp +++ b/zencore/except.cpp @@ -7,6 +7,41 @@ namespace zen { #if ZEN_PLATFORM_WINDOWS +class WindowsException : public std::exception +{ +public: + WindowsException(std::string_view Message) + { + m_hResult = HRESULT_FROM_WIN32(GetLastError()); + m_Message = Message; + } + + WindowsException(HRESULT hRes, std::string_view Message) + { + m_hResult = hRes; + m_Message = Message; + } + + WindowsException(HRESULT hRes, const char* Message, const char* Detail) + { + m_hResult = hRes; + + ExtendableStringBuilder<128> msg; + msg.Append(Message); + msg.Append(" (detail: '"); + msg.Append(Detail); + msg.Append("')"); + + m_Message = msg.c_str(); + } + + virtual const char* what() const override { return m_Message.c_str(); } + +private: + std::string m_Message; + HRESULT m_hResult; +}; + void ThrowSystemException([[maybe_unused]] HRESULT hRes, [[maybe_unused]] std::string_view Message) { @@ -28,6 +63,12 @@ ThrowLastError(std::string_view Message) throw std::system_error(std::error_code(zen::GetLastError(), std::system_category()), std::string(Message)); } +void +ThrowSystemError(uint32_t ErrorCode, std::string_view Message) +{ + throw std::system_error(std::error_code(ErrorCode, std::system_category()), std::string(Message)); +} + std::string GetLastErrorAsString() { diff --git a/zencore/filesystem.cpp b/zencore/filesystem.cpp index afbddcdbd..8ddcbac52 100644 --- a/zencore/filesystem.cpp +++ b/zencore/filesystem.cpp @@ -458,14 +458,14 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer HRESULT hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); if (hRes == HRESULT_FROM_WIN32(ERROR_PATH_NOT_FOUND)) { - zen::CreateDirectories(Path.parent_path()); + CreateDirectories(Path.parent_path()); hRes = Outfile.Create(Path.c_str(), GENERIC_WRITE, FILE_SHARE_READ, CREATE_ALWAYS); } if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "File open failed for '{}'"_format(Path).c_str()); + ThrowSystemException(hRes, "File open failed for '{}'"_format(Path).c_str()); } #else @@ -491,13 +491,13 @@ WriteFile(std::filesystem::path Path, const IoBuffer* const* Data, size_t Buffer while (WriteSize) { - const uint64_t ChunkSize = zen::Min<uint64_t>(WriteSize, uint64_t(2) * 1024 * 1024 * 1024); + const uint64_t ChunkSize = Min<uint64_t>(WriteSize, uint64_t(2) * 1024 * 1024 * 1024); #if ZEN_PLATFORM_WINDOWS hRes = Outfile.Write(DataPtr, gsl::narrow_cast<uint32_t>(WriteSize)); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "File write failed for '{}'"_format(Path).c_str()); + ThrowSystemException(hRes, "File write failed for '{}'"_format(Path).c_str()); } #else if (write(Fd, DataPtr, WriteSize) != WriteSize) @@ -630,7 +630,7 @@ FileSystemTraversal::TraverseFileSystem(const std::filesystem::path& RootDir, Tr if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to open handle to volume root"); + ThrowSystemException(hRes, "Failed to open handle to volume root"); } while (Continue) diff --git a/zencore/include/zencore/except.h b/zencore/include/zencore/except.h index f0e04a795..1dc6209d6 100644 --- a/zencore/include/zencore/except.h +++ b/zencore/include/zencore/except.h @@ -15,63 +15,20 @@ namespace zen { #if ZEN_PLATFORM_WINDOWS -class WindowsException : public std::exception -{ -public: - WindowsException(std::string_view Message) - { - m_hResult = HRESULT_FROM_WIN32(GetLastError()); - m_Message = Message; - } - - WindowsException(HRESULT hRes, std::string_view Message) - { - m_hResult = hRes; - m_Message = Message; - } - - WindowsException(HRESULT hRes, const char* Message, const char* Detail) - { - m_hResult = hRes; - - ExtendableStringBuilder<128> msg; - msg.Append(Message); - msg.Append(" (detail: '"); - msg.Append(Detail); - msg.Append("')"); - - m_Message = msg.c_str(); - } - - virtual const char* what() const override { return m_Message.c_str(); } - -private: - std::string m_Message; - HRESULT m_hResult; -}; - -ZENCORE_API void ThrowSystemException(HRESULT hRes, std::string_view Message); +ZENCORE_API void ThrowSystemException [[noreturn]] (HRESULT hRes, std::string_view Message); #endif // ZEN_PLATFORM_WINDOWS -ZENCORE_API void ThrowLastError(std::string_view Message); +ZENCORE_API void ThrowLastError [[noreturn]] (std::string_view Message); #if __cpp_lib_source_location -ZENCORE_API void ThrowLastError(std::string_view Message, const std::source_location& Location); +ZENCORE_API void ThrowLastError [[noreturn]] (std::string_view Message, const std::source_location& Location); #endif +ZENCORE_API void ThrowSystemError [[noreturn]] (uint32_t ErrorCode, std::string_view Message); + ZENCORE_API std::string GetLastErrorAsString(); ZENCORE_API std::string GetErrorAsString(uint32_t ErrorCode); -inline void -ThrowSystemException(const char* Message) -{ -#if ZEN_PLATFORM_WINDOWS - throw WindowsException(Message); -#else - ThrowLastError(Message); -#endif -} - inline int32_t GetLastError() { diff --git a/zencore/include/zencore/logging.h b/zencore/include/zencore/logging.h index 412a39415..221f5f358 100644 --- a/zencore/include/zencore/logging.h +++ b/zencore/include/zencore/logging.h @@ -80,3 +80,10 @@ using zen::Log; using namespace std::literals; \ Log().critical(fmtstr##sv, ##__VA_ARGS__);\ } while (false) + +#define ZEN_CONSOLE(fmtstr, ...) \ + do \ + { \ + using namespace std::literals; \ + ConsoleLog().info(fmtstr##sv, __VA_ARGS__); \ + } while (false) diff --git a/zencore/include/zencore/mpscqueue.h b/zencore/include/zencore/mpscqueue.h new file mode 100644 index 000000000..bb558bb5a --- /dev/null +++ b/zencore/include/zencore/mpscqueue.h @@ -0,0 +1,109 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <atomic> +#include <new> +#include <optional> + +#ifdef __cpp_lib_hardware_interference_size +using std::hardware_constructive_interference_size; +using std::hardware_destructive_interference_size; +#else +// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ... +constexpr std::size_t hardware_constructive_interference_size = 64; +constexpr std::size_t hardware_destructive_interference_size = 64; +#endif + +namespace zen { + +/** An untyped array of data with compile-time alignment and size derived from another type. */ +template<typename ElementType> +struct TypeCompatibleStorage +{ + ElementType* Data() { return (ElementType*)this; } + const ElementType* Data() const { return (const ElementType*)this; } + + char alignas(ElementType) DataMember; +}; + +/** Fast multi-producer/single-consumer unbounded concurrent queue. + + Based on http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + */ + +template<typename T> +class MpscQueue final +{ +public: + using ElementType = T; + + MpscQueue() + { + Node* Sentinel = new Node; + Head.store(Sentinel, std::memory_order_relaxed); + Tail = Sentinel; + } + + ~MpscQueue() + { + Node* Next = Tail->Next.load(std::memory_order_relaxed); + + // sentinel's value is already destroyed + delete Tail; + + while (Next != nullptr) + { + Tail = Next; + Next = Tail->Next.load(std::memory_order_relaxed); + + std::destroy_at((ElementType*)&Tail->Value); + delete Tail; + } + } + + template<typename... ArgTypes> + void Enqueue(ArgTypes&&... Args) + { + Node* New = new Node; + new (&New->Value) ElementType(std::forward<ArgTypes>(Args)...); + + Node* Prev = Head.exchange(New, std::memory_order_acq_rel); + Prev->Next.store(New, std::memory_order_release); + } + + std::optional<ElementType> Dequeue() + { + Node* Next = Tail->Next.load(std::memory_order_acquire); + + if (Next == nullptr) + { + return {}; + } + + ElementType* ValuePtr = (ElementType*)&Next->Value; + std::optional<ElementType> Res{std::move(*ValuePtr)}; + std::destroy_at(ValuePtr); + + delete Tail; // current sentinel + + Tail = Next; // new sentinel + return Res; + } + +private: + struct Node + { + std::atomic<Node*> Next{nullptr}; + TypeCompatibleStorage<ElementType> Value; + }; + +private: + std::atomic<Node*> Head; // accessed only by producers + alignas(hardware_constructive_interference_size) + Node* Tail; // accessed only by consumer, hence should be on a different cache line than `Head` +}; + +void mpscqueue_forcelink(); + +} // namespace zen diff --git a/zencore/include/zencore/session.h b/zencore/include/zencore/session.h index 2da41b2c8..dd90197bf 100644 --- a/zencore/include/zencore/session.h +++ b/zencore/include/zencore/session.h @@ -8,6 +8,7 @@ namespace zen { struct Oid; -ZENCORE_API Oid GetSessionId(); +ZENCORE_API [[nodiscard]] Oid GetSessionId(); +ZENCORE_API [[nodiscard]] std::string_view GetSessionIdString(); } // namespace zen diff --git a/zencore/include/zencore/snapshot_manifest.h b/zencore/include/zencore/snapshot_manifest.h deleted file mode 100644 index 95e64773a..000000000 --- a/zencore/include/zencore/snapshot_manifest.h +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/iohash.h> -#include <zencore/zencore.h> - -#include <filesystem> -#include <functional> -#include <string> -#include <vector> - -namespace zen { - -struct LeafNode -{ - uint64_t FileSize = 0; - uint64_t FileModifiedTime = 0; - zen::IoHash ChunkHash = zen::IoHash::Zero; - std::wstring Name; -}; - -struct TreeNode -{ - std::vector<TreeNode> Children; - std::vector<LeafNode> Leaves; - std::wstring Name; - zen::BLAKE3 ChunkHash = zen::BLAKE3::Zero; - - ZENCORE_API void VisitModifyFiles(std::function<void(LeafNode& node)> func); - ZENCORE_API void VisitFiles(std::function<void(const LeafNode& node)> func); - ZENCORE_API void Finalize(); -}; - -struct SnapshotManifest -{ - std::string Id; - TreeNode Root; - zen::BLAKE3 ChunkHash = zen::BLAKE3::Zero; - - ZENCORE_API void finalize(); -}; - -class InStream; -class OutStream; - -ZENCORE_API void ReadManifest(SnapshotManifest& Manifest, InStream& FromStream); -ZENCORE_API void WriteManifest(const SnapshotManifest& Manifest, OutStream& ToStream); -ZENCORE_API void PrintManifest(const SnapshotManifest& Manifest, OutStream& ToStream); - -// Translate a user-provided manifest specification into a file path. -// Supports hashtag syntax to implicitly refer to user documents zenfs folder -ZENCORE_API std::filesystem::path ManifestSpecToPath(const char* ManifestSpec); - -void snapshotmanifest_forcelink(); - -} // namespace zen diff --git a/zencore/include/zencore/string.h b/zencore/include/zencore/string.h index 2b5f20f86..bb9b1c896 100644 --- a/zencore/include/zencore/string.h +++ b/zencore/include/zencore/string.h @@ -622,6 +622,45 @@ ToLower(const std::string_view& InString) ////////////////////////////////////////////////////////////////////////// +template<typename Fn> +uint32_t +ForEachStrTok(const std::string_view& Str, char Delim, Fn&& Func) +{ + auto It = Str.begin(); + auto End = Str.end(); + uint32_t Count = 0; + + while (It != End) + { + if (*It == Delim) + { + It++; + continue; + } + + std::string_view Remaining{It, End}; + size_t Idx = Remaining.find(Delim, 0); + + if (Idx == std::string_view::npos) + { + Idx = Remaining.size(); + } + + Count++; + std::string_view Token{It, It + Idx}; + if (!Func(Token)) + { + break; + } + + It = It + Idx; + } + + return Count; +} + +////////////////////////////////////////////////////////////////////////// + void string_forcelink(); // internal } // namespace zen diff --git a/zencore/include/zencore/testing.h b/zencore/include/zencore/testing.h new file mode 100644 index 000000000..80aebc26e --- /dev/null +++ b/zencore/include/zencore/testing.h @@ -0,0 +1,9 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#if ZEN_WITH_TESTS +# include <doctest/doctest.h> +#endif diff --git a/zencore/include/zencore/testutils.h b/zencore/include/zencore/testutils.h new file mode 100644 index 000000000..72d985d5c --- /dev/null +++ b/zencore/include/zencore/testutils.h @@ -0,0 +1,31 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <filesystem> + +namespace zen { + +std::filesystem::path CreateTemporaryDirectory(); + +class ScopedTemporaryDirectory +{ +public: + ScopedTemporaryDirectory(); + ~ScopedTemporaryDirectory(); + + std::filesystem::path& Path() { return m_RootPath; } + +private: + std::filesystem::path m_RootPath; +}; + +struct ScopedCurrentDirectoryChange +{ + std::filesystem::path OldPath{std::filesystem::current_path()}; + + ScopedCurrentDirectoryChange() { std::filesystem::current_path(CreateTemporaryDirectory()); } + ~ScopedCurrentDirectoryChange() { std::filesystem::current_path(OldPath); } +}; + +} // namespace zen diff --git a/zencore/include/zencore/thread.h b/zencore/include/zencore/thread.h index b18da6031..7889682cd 100644 --- a/zencore/include/zencore/thread.h +++ b/zencore/include/zencore/thread.h @@ -4,9 +4,9 @@ #include "zencore.h" -#if !ZEN_PLATFORM_WINDOWS -# include <shared_mutex> -#endif +#include <shared_mutex> + +#include <vector> namespace zen { @@ -64,11 +64,7 @@ public: }; private: -#if ZEN_PLATFORM_WINDOWS - void* m_Srw = nullptr; -#else std::shared_mutex m_Mutex; -#endif }; /** Basic abstraction of a simple event synchronization mechanism (aka 'binary semaphore') @@ -93,6 +89,7 @@ public: ZENCORE_API void Set(); ZENCORE_API void Reset(); ZENCORE_API bool Wait(int TimeoutMs = -1); + ZENCORE_API void Close(); protected: explicit Event(void* EventHandle) : m_EventHandle(EventHandle) {} @@ -150,6 +147,28 @@ private: int m_Pid = 0; }; +/** Process monitor - monitors a list of running processes via polling + + Intended to be used to monitor a set of "sponsor" processes, where + we need to determine when none of them remain alive + + */ + +class ProcessMonitor +{ +public: + ProcessMonitor(); + ~ProcessMonitor(); + + ZENCORE_API bool IsRunning(); + ZENCORE_API void AddPid(int Pid); + ZENCORE_API bool IsActive() const; + +private: + mutable RwLock m_Lock; + std::vector<void*> m_ProcessHandles; +}; + ZENCORE_API bool IsProcessRunning(int pid); ZENCORE_API int GetCurrentProcessId(); diff --git a/zencore/include/zencore/uid.h b/zencore/include/zencore/uid.h index 2730b1415..f4e9ab65a 100644 --- a/zencore/include/zencore/uid.h +++ b/zencore/include/zencore/uid.h @@ -60,6 +60,8 @@ struct Oid const Oid& Generate(); [[nodiscard]] static Oid FromHexString(const std::string_view String); StringBuilderBase& ToString(StringBuilderBase& OutString) const; + void ToString(char OutString[StringLength]); + [[nodiscard]] static Oid FromMemory(const void* Ptr); auto operator<=>(const Oid& rhs) const = default; [[nodiscard]] inline operator bool() const { return *this == Zero; } diff --git a/zencore/include/zencore/zencore.h b/zencore/include/zencore/zencore.h index 54df7e85e..f6093cb96 100644 --- a/zencore/include/zencore/zencore.h +++ b/zencore/include/zencore/zencore.h @@ -3,9 +3,13 @@ #pragma once #include <cinttypes> -#include <exception> +#include <stdexcept> #include <string> +#ifndef ZEN_WITH_TESTS +# define ZEN_WITH_TESTS 1 +#endif + ////////////////////////////////////////////////////////////////////////// // Platform // @@ -56,6 +60,10 @@ # endif #endif +#if ZEN_COMPILER_MSC +# pragma warning(disable : 4324) // warning C4324: '<type>': structure was padded due to alignment specifier +#endif + ////////////////////////////////////////////////////////////////////////// // Architecture // @@ -90,37 +98,69 @@ // Assert // +#if ZEN_PLATFORM_WINDOWS +// Tells the compiler to put the decorated function in a certain section (aka. segment) of the executable. +# define ZEN_CODE_SECTION(Name) __declspec(code_seg(Name)) +# define ZEN_FORCENOINLINE __declspec(noinline) /* Force code to NOT be inline */ +#else +# define ZEN_CODE_SECTION(Name) +# define ZEN_FORCENOINLINE +#endif + +#if ZEN_ARCH_ARM64 +// On ARM we can't do this because the executable will require jumps larger +// than the branch instruction can handle. Clang will only generate +// the trampolines in the .text segment of the binary. If the uedbg segment +// is present it will generate code that it cannot link. +# define ZEN_DEBUG_SECTION +#else +// We'll put all assert implementation code into a separate section in the linked +// executable. This code should never execute so using a separate section keeps +// it well off the hot path and hopefully out of the instruction cache. It also +// facilitates reasoning about the makeup of a compiled/linked binary. +# define ZEN_DEBUG_SECTION ZEN_CODE_SECTION(".zcold") +#endif // DO_CHECK || DO_GUARD_SLOW + namespace zen { -class AssertException : public std::exception +class AssertException : public std::logic_error { public: - AssertException(const char* Msg); - ~AssertException(); - - [[nodiscard]] virtual char const* what() const noexcept override { return m_Msg.c_str(); } - -private: - std::string m_Msg; + AssertException(const char* Msg) : std::logic_error(Msg) {} }; } // namespace zen -#define ZEN_ASSERT(x, ...) \ - do \ - { \ - if (x) \ - break; \ - throw ::zen::AssertException{#x}; \ +template<typename RetType = void, class InnerType, typename... ArgTypes> +RetType ZEN_FORCENOINLINE ZEN_DEBUG_SECTION +DispatchAssert(InnerType&& Inner, ArgTypes const&... Args) +{ + return Inner(Args...); +} + +#define ZEN_ASSERT(x, ...) \ + do \ + { \ + if (x) [[unlikely]] \ + break; \ + struct Impl \ + { \ + static void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ExecThrow [[noreturn]] () { throw ::zen::AssertException{#x}; } \ + }; \ + Impl::ExecThrow(); \ } while (false) #ifndef NDEBUG -# define ZEN_ASSERT_SLOW(x, ...) \ - do \ - { \ - if (x) \ - break; \ - throw ::zen::AssertException{#x}; \ +# define ZEN_ASSERT_SLOW(x, ...) \ + do \ + { \ + if (x) [[unlikely]] \ + break; \ + struct Impl \ + { \ + static void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION ExecThrow [[noreturn]] () { throw ::zen::AssertException{#x}; } \ + }; \ + Impl::ExecThrow(); \ } while (false) #else # define ZEN_ASSERT_SLOW(x, ...) @@ -148,14 +188,24 @@ char (&ZenArrayCountHelper(const T (&)[N]))[N + 1]; #define ZEN_UNUSED(...) ((void)__VA_ARGS__) #define ZEN_NOT_IMPLEMENTED(...) ZEN_ASSERT(false, __VA_ARGS__) -#define ZENCORE_API // Placeholder to allow DLL configs in the future +#define ZENCORE_API // Placeholder to allow DLL configs in the future (maybe) + +namespace zen { ZENCORE_API bool IsPointerToStack(const void* ptr); // Query if pointer is within the stack of the currently executing thread ZENCORE_API bool IsApplicationExitRequested(); ZENCORE_API void RequestApplicationExit(int ExitCode); +ZENCORE_API bool IsDebuggerPresent(); +ZENCORE_API bool IsInteractiveSession(); ZENCORE_API void zencore_forcelinktests(); +} // namespace zen + +#ifndef ZEN_USE_MIMALLOC +# define ZEN_USE_MIMALLOC 1 +#endif + ////////////////////////////////////////////////////////////////////////// #if ZEN_COMPILER_MSC diff --git a/zencore/intmath.cpp b/zencore/intmath.cpp index 98c345c79..5a686dc8e 100644 --- a/zencore/intmath.cpp +++ b/zencore/intmath.cpp @@ -3,7 +3,7 @@ #include <zencore/endian.h> #include <zencore/intmath.h> -#include <doctest/doctest.h> +#include <zencore/testing.h> namespace zen { @@ -12,6 +12,8 @@ namespace zen { // Testing related code follows... // +#if ZEN_WITH_TESTS + void intmath_forcelink() { @@ -58,4 +60,6 @@ TEST_CASE("intmath") CHECK(ByteSwap(uint64_t(0x214d'6172'7469'6e21ull)) == 0x216e'6974'7261'4d21ull); } +#endif + } // namespace zen diff --git a/zencore/iobuffer.cpp b/zencore/iobuffer.cpp index beb969bc7..5d3458dba 100644 --- a/zencore/iobuffer.cpp +++ b/zencore/iobuffer.cpp @@ -2,14 +2,15 @@ #include <zencore/iobuffer.h> -#include <doctest/doctest.h> -#include <memory.h> #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> +#include <zencore/testing.h> #include <zencore/thread.h> + +#include <memory.h> #include <system_error> #if ZEN_PLATFORM_WINDOWS @@ -456,6 +457,8 @@ IoBufferBuilder::MakeFromTemporaryFile(const path_char_t* FileName) ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + void iobuffer_forcelink() { @@ -468,4 +471,6 @@ TEST_CASE("IoBuffer") zen::IoBuffer buffer3(buffer2, 0, buffer2.Size()); } +#endif + } // namespace zen diff --git a/zencore/iohash.cpp b/zencore/iohash.cpp index ad8d89ff0..77076c133 100644 --- a/zencore/iohash.cpp +++ b/zencore/iohash.cpp @@ -5,8 +5,8 @@ #include <zencore/blake3.h> #include <zencore/compositebuffer.h> #include <zencore/string.h> +#include <zencore/testing.h> -#include <doctest/doctest.h> #include <gsl/gsl-lite.hpp> namespace zen { diff --git a/zencore/md5.cpp b/zencore/md5.cpp index 228c0feff..237f6cfdd 100644 --- a/zencore/md5.cpp +++ b/zencore/md5.cpp @@ -2,9 +2,9 @@ #include <zencore/md5.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <zencore/zencore.h> -#include <doctest/doctest.h> #include <string.h> // big endian architectures need #define __BYTE_ORDER __BIG_ENDIAN @@ -425,6 +425,8 @@ MD5::ToHexString(StringBuilderBase& outBuilder) const // Testing related code follows... // +#if ZEN_WITH_TESTS + void md5_forcelink() { @@ -443,4 +445,6 @@ TEST_CASE("MD5") { } +#endif + } // namespace zen diff --git a/zencore/memory.cpp b/zencore/memory.cpp index 26c8321e5..613b6ba67 100644 --- a/zencore/memory.cpp +++ b/zencore/memory.cpp @@ -2,6 +2,7 @@ #include <zencore/intmath.h> #include <zencore/memory.h> +#include <zencore/testing.h> #ifdef ZEN_PLATFORM_WINDOWS # include <malloc.h> @@ -9,8 +10,6 @@ # include <cstdlib> #endif -#include <doctest/doctest.h> - namespace zen { ////////////////////////////////////////////////////////////////////////// @@ -147,6 +146,8 @@ ChunkingLinearAllocator::Alloc(size_t Size, size_t Alignment) // Unit tests // +#if ZEN_WITH_TESTS + TEST_CASE("ChunkingLinearAllocator") { ChunkingLinearAllocator Allocator(4096); @@ -194,4 +195,6 @@ memory_forcelink() { } +#endif + } // namespace zen diff --git a/zencore/mpscqueue.cpp b/zencore/mpscqueue.cpp new file mode 100644 index 000000000..e1841ef63 --- /dev/null +++ b/zencore/mpscqueue.cpp @@ -0,0 +1,25 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/mpscqueue.h> + +#include <zencore/testing.h> +#include <string> + +namespace zen { + +#if ZEN_WITH_TESTS && 0 +TEST_CASE("mpsc") +{ + MpscQueue<std::string> Queue; + Queue.Enqueue("hello"); + std::optional<std::string> Value = Queue.Dequeue(); + CHECK_EQ(Value, "hello"); +} +#endif + +void +mpscqueue_forcelink() +{ +} + +} // namespace zen
\ No newline at end of file diff --git a/zencore/refcount.cpp b/zencore/refcount.cpp index 943635552..33b530b90 100644 --- a/zencore/refcount.cpp +++ b/zencore/refcount.cpp @@ -2,7 +2,8 @@ #include <zencore/refcount.h> -#include <doctest/doctest.h> +#include <zencore/testing.h> + #include <functional> namespace zen { @@ -12,6 +13,8 @@ namespace zen { // Testing related code follows... // +#if ZEN_WITH_TESTS + struct TestRefClass : public RefCounted { ~TestRefClass() @@ -93,4 +96,6 @@ TEST_CASE("RefPtr on Stack allocated object") CHECK(IsDestroyed == true); } +#endif + } // namespace zen diff --git a/zencore/session.cpp b/zencore/session.cpp index d57d3685b..ce4bfae1b 100644 --- a/zencore/session.cpp +++ b/zencore/session.cpp @@ -9,14 +9,27 @@ namespace zen { static Oid GlobalSessionId; +static char GlobalSessionString[Oid::StringLength]; static std::once_flag SessionInitFlag; Oid GetSessionId() { - std::call_once(SessionInitFlag, [&] { GlobalSessionId.Generate(); }); + std::call_once(SessionInitFlag, [&] { + GlobalSessionId.Generate(); + GlobalSessionId.ToString(GlobalSessionString); + }); return GlobalSessionId; } +std::string_view +GetSessionIdString() +{ + // Ensure we actually have a generated session identifier + std::ignore = GetSessionId(); + + return std::string_view(GlobalSessionString, Oid::StringLength); +} + } // namespace zen
\ No newline at end of file diff --git a/zencore/sha1.cpp b/zencore/sha1.cpp index 3cc2f5cdf..8b4e7897f 100644 --- a/zencore/sha1.cpp +++ b/zencore/sha1.cpp @@ -6,9 +6,9 @@ #include <zencore/sha1.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <zencore/zencore.h> -#include <doctest/doctest.h> #include <string.h> // big endian architectures need #define __BYTE_ORDER __BIG_ENDIAN @@ -357,6 +357,8 @@ SHA1::ToHexString(StringBuilderBase& outBuilder) const // Testing related code follows... // +#if ZEN_WITH_TESTS + void sha1_forcelink() { @@ -436,4 +438,6 @@ TEST_CASE("SHA1") } } +#endif + } // namespace zen diff --git a/zencore/sharedbuffer.cpp b/zencore/sharedbuffer.cpp index 2761d0b4d..200e06972 100644 --- a/zencore/sharedbuffer.cpp +++ b/zencore/sharedbuffer.cpp @@ -2,7 +2,8 @@ #include <zencore/sharedbuffer.h> -#include <doctest/doctest.h> +#include <zencore/testing.h> + #include <memory.h> #include <gsl/gsl-lite.hpp> @@ -129,6 +130,8 @@ SharedBuffer::Clone(MemoryView View) ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + void sharedbuffer_forcelink() { @@ -138,4 +141,6 @@ TEST_CASE("SharedBuffer") { } +#endif + } // namespace zen diff --git a/zencore/snapshot_manifest.cpp b/zencore/snapshot_manifest.cpp deleted file mode 100644 index 87625fb7f..000000000 --- a/zencore/snapshot_manifest.cpp +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <doctest/doctest.h> -#include <zencore/snapshot_manifest.h> -#include <zencore/stream.h> -#include <zencore/streamutil.h> -#include <zencore/string.h> -#include <ostream> - -#include <filesystem> - -#include <atlbase.h> - -// Used for getting My Documents for default snapshot dir -#include <ShlObj.h> -#if ZEN_PLATFORM_WINDOWS -# pragma comment(lib, "shell32.lib") -#endif - -namespace zen { - -constexpr const char* magicString = "-=- ZEN_SNAP -=-"; - -struct SerializedManifestHeader -{ - char Magic[16]; - - void init() { memcpy(Magic, magicString, sizeof Magic); } - bool verify() const { return memcmp(Magic, magicString, sizeof Magic) == 0; } -}; - -TextWriter& -operator<<(TextWriter& Writer, const LeafNode& Leaf) -{ - Writer << "modTime: " << Leaf.FileModifiedTime << ", size: " << Leaf.FileSize << ", hash: " << Leaf.ChunkHash << ", name: " << Leaf.Name - << "\n"; - - return Writer; -} - -BinaryWriter& -operator<<(BinaryWriter& Writer, const LeafNode& Leaf) -{ - Writer << Leaf.FileModifiedTime << Leaf.FileSize << Leaf.ChunkHash << Leaf.Name; - - return Writer; -} - -BinaryReader& -operator>>(BinaryReader& Reader, LeafNode& Leaf) -{ - Reader >> Leaf.FileModifiedTime >> Leaf.FileSize >> Leaf.ChunkHash >> Leaf.Name; - - return Reader; -} - -void -TreeNode::Finalize() -{ - zen::BLAKE3Stream Blake3Stream; - - for (auto& Node : Children) - { - Node.Finalize(); - Blake3Stream.Append(Node.ChunkHash.Hash, sizeof Node.ChunkHash); - Blake3Stream.Append(Node.Name.data(), Node.Name.size() + 1); - } - - for (auto& leaf : Leaves) - { - Blake3Stream.Append(leaf.ChunkHash.Hash, sizeof leaf.ChunkHash); - Blake3Stream.Append(leaf.Name.data(), leaf.Name.size() + 1); - } - - this->ChunkHash = Blake3Stream.GetHash(); -} - -void -TreeNode::VisitFiles(std::function<void(const LeafNode& node)> func) -{ - for (auto& Node : Children) - Node.VisitFiles(func); - - for (auto& Leaf : Leaves) - func(Leaf); -} - -void -TreeNode::VisitModifyFiles(std::function<void(LeafNode& node)> func) -{ - for (auto& Node : Children) - Node.VisitModifyFiles(func); - - for (auto& Leaf : Leaves) - func(Leaf); -} - -IndentTextWriter& -operator<<(IndentTextWriter& Writer, const TreeNode& Node) -{ - Writer << "hash: " << Node.ChunkHash << ", name: " << Node.Name << "\n"; - - if (!Node.Leaves.empty()) - { - Writer << "files: " - << "\n"; - - IndentTextWriter::Scope _(Writer); - - for (const LeafNode& Leaf : Node.Leaves) - Writer << Leaf; - } - - if (!Node.Children.empty()) - { - Writer << "children: " - << "\n"; - - IndentTextWriter::Scope _(Writer); - - for (const TreeNode& Child : Node.Children) - { - Writer << Child; - } - } - - return Writer; -} - -BinaryWriter& -operator<<(BinaryWriter& Writer, const TreeNode& Node) -{ - Writer << Node.ChunkHash << Node.Name; - Writer << uint32_t(Node.Children.size()); - - for (const TreeNode& child : Node.Children) - Writer << child; - - Writer << uint32_t(Node.Leaves.size()); - - for (const LeafNode& Leaf : Node.Leaves) - Writer << Leaf; - - return Writer; -} - -BinaryReader& -operator>>(BinaryReader& Reader, TreeNode& Node) -{ - Reader >> Node.ChunkHash >> Node.Name; - - uint32_t ChildCount = 0; - Reader >> ChildCount; - Node.Children.resize(ChildCount); - - for (TreeNode& Child : Node.Children) - Reader >> Child; - - uint32_t LeafCount = 0; - Reader >> LeafCount; - Node.Leaves.resize(LeafCount); - - for (LeafNode& Leaf : Node.Leaves) - Reader >> Leaf; - - return Reader; -} - -void -SnapshotManifest::finalize() -{ - Root.Finalize(); - - zen::BLAKE3Stream Blake3Stream; - - Blake3Stream.Append(Root.ChunkHash.Hash, sizeof Root.ChunkHash); - Blake3Stream.Append(Root.Name.data(), Root.Name.size() + 1); - - this->ChunkHash = Blake3Stream.GetHash(); -} - -void -WriteManifest(const SnapshotManifest& Manifest, OutStream& ToStream) -{ - BinaryWriter Out(ToStream); - SerializedManifestHeader Header; - Header.init(); - Out.Write(&Header, sizeof Header); - - Out << Manifest.ChunkHash << Manifest.Id << Manifest.Root; -} - -void -ReadManifest(SnapshotManifest& Manifest, InStream& FromStream) -{ - BinaryReader Reader(FromStream); - SerializedManifestHeader Header; - Reader.Read(&Header, sizeof Header); - - Reader >> Manifest.ChunkHash >> Manifest.Id >> Manifest.Root; -} - -void -PrintManifest(const SnapshotManifest& Manifest, OutStream& ToStream) -{ - IndentTextWriter Writer(ToStream); - - Writer << "hash: " << Manifest.ChunkHash << "\n"; - Writer << "id: " << Manifest.Id << "\n"; - Writer << "root: " - << "\n"; - IndentTextWriter::Scope _(Writer); - Writer << Manifest.Root; -} - -std::filesystem::path -ManifestSpecToPath(const char* ManifestSpec) -{ - ExtendableWideStringBuilder<128> ManifestTargetFile; - - if (ManifestSpec[0] == '#') - { - // Pick sensible default - - WCHAR MyDocumentsDir[MAX_PATH]; - HRESULT hRes = SHGetFolderPathW(NULL, - CSIDL_PERSONAL /* My Documents */, - NULL, - SHGFP_TYPE_CURRENT, - /* out */ MyDocumentsDir); - - if (SUCCEEDED(hRes)) - { - wcscat_s(MyDocumentsDir, L"\\zenfs\\Snapshots\\"); - - ManifestTargetFile.Append(MyDocumentsDir); - ManifestTargetFile.AppendAscii(ManifestSpec + 1); - } - } - else - { - ManifestTargetFile.AppendAscii(ManifestSpec); - } - - std::filesystem::path ManifestPath{ManifestTargetFile.c_str()}; - - if (ManifestPath.extension() != L".zenfs") - { - ManifestPath.append(L".zenfs"); - } - - return ManifestPath; -} - -////////////////////////////////////////////////////////////////////////// -// -// Testing related code follows... -// - -void -snapshotmanifest_forcelink() -{ -} - -TEST_CASE("Snapshot manifest") -{ - SnapshotManifest Manifest; - - Manifest.Id = "test_manifest"; - Manifest.ChunkHash = zen::BLAKE3::HashMemory("abcd", 4); - - MemoryOutStream Outstream; - WriteManifest(Manifest, Outstream); - - MemoryInStream Instream(Outstream.Data(), Outstream.Size()); - SnapshotManifest Manifest2; - ReadManifest(/* out */ Manifest2, Instream); - - CHECK(Manifest.Id == Manifest2.Id); - CHECK(Manifest.ChunkHash == Manifest2.ChunkHash); -} - -} // namespace zen diff --git a/zencore/stats.cpp b/zencore/stats.cpp index f8cdc8fbb..c5187940e 100644 --- a/zencore/stats.cpp +++ b/zencore/stats.cpp @@ -1,10 +1,13 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "zencore/stats.h" -#include <doctest/doctest.h> #include <cmath> #include "zencore/timer.h" +#if ZEN_WITH_TESTS +# include <zencore/testing.h> +#endif + // // Derived from https://github.com/dln/medida/blob/master/src/medida/stats/ewma.cc // @@ -47,6 +50,8 @@ EWMA::Rate() const ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + TEST_CASE("Stats") { SUBCASE("Simple") @@ -70,4 +75,6 @@ stats_forcelink() { } +#endif + } // namespace zen diff --git a/zencore/stream.cpp b/zencore/stream.cpp index 8687d5501..ead0b014b 100644 --- a/zencore/stream.cpp +++ b/zencore/stream.cpp @@ -1,9 +1,10 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <doctest/doctest.h> #include <stdarg.h> #include <zencore/memory.h> #include <zencore/stream.h> +#include <zencore/testing.h> + #include <algorithm> #include <stdexcept> @@ -279,6 +280,8 @@ IndentTextWriter::Write(const void* data, size_t byteCount) // Testing related code follows... // +#if ZEN_WITH_TESTS + void stream_forcelink() { @@ -336,4 +339,6 @@ TEST_CASE("BinaryWriter and BinaryWriter") CHECK(i64 == 42); } +#endif + } // namespace zen diff --git a/zencore/string.cpp b/zencore/string.cpp index 8ea10d2a3..6dcdc9542 100644 --- a/zencore/string.cpp +++ b/zencore/string.cpp @@ -1,11 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <doctest/doctest.h> +#include <zencore/memory.h> +#include <zencore/string.h> +#include <zencore/testing.h> + #include <inttypes.h> #include <math.h> #include <stdio.h> -#include <zencore/memory.h> -#include <zencore/string.h> #include <exception> #include <ostream> #include <stdexcept> @@ -452,6 +453,8 @@ template class StringBuilderImpl<wchar_t>; // Unit tests // +#if ZEN_WITH_TESTS + TEST_CASE("niceNum") { char Buffer[16]; @@ -912,4 +915,46 @@ TEST_CASE("filepath") CHECK(FilepathFindExtension(".txt") == std::string_view(".txt")); } +TEST_CASE("string") +{ + using namespace std::literals; + + SUBCASE("ForEachStrTok") + { + const auto Tokens = "here,is,my,different,tokens"sv; + int32_t ExpectedTokenCount = 5; + int32_t TokenCount = 0; + StringBuilder<512> Sb; + + TokenCount = ForEachStrTok(Tokens, ',', [&Sb](const std::string_view& Token) { + if (Sb.Size()) + { + Sb << ","; + } + Sb << Token; + return true; + }); + + CHECK(TokenCount == ExpectedTokenCount); + CHECK(Sb.ToString() == Tokens); + + ExpectedTokenCount = 1; + const auto Str = "mosdef"sv; + + Sb.Reset(); + TokenCount = ForEachStrTok(Str, ' ', [&Sb](const std::string_view& Token) { + Sb << Token; + return true; + }); + CHECK(Sb.ToString() == Str); + CHECK(TokenCount == ExpectedTokenCount); + + ExpectedTokenCount = 0; + TokenCount = ForEachStrTok(""sv, ',', [](const std::string_view&) { return true; }); + CHECK(TokenCount == ExpectedTokenCount); + } +} + +#endif + } // namespace zen diff --git a/zencore/testutils.cpp b/zencore/testutils.cpp new file mode 100644 index 000000000..116491950 --- /dev/null +++ b/zencore/testutils.cpp @@ -0,0 +1,33 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zencore/testutils.h" +#include <zencore/session.h> +#include "zencore/string.h" + +namespace zen { + +static std::atomic<int> Sequence{0}; + +std::filesystem::path +CreateTemporaryDirectory() +{ + std::error_code Ec; + + std::filesystem::path DirPath = std::filesystem::temp_directory_path() / GetSessionIdString() / IntNum(++Sequence).c_str(); + std::filesystem::remove_all(DirPath, Ec); + std::filesystem::create_directories(DirPath); + + return DirPath; +} + +ScopedTemporaryDirectory::ScopedTemporaryDirectory() : m_RootPath(CreateTemporaryDirectory()) +{ +} + +ScopedTemporaryDirectory::~ScopedTemporaryDirectory() +{ + std::error_code Ec; + std::filesystem::remove_all(m_RootPath, Ec); +} + +} // namespace zen
\ No newline at end of file diff --git a/zencore/thread.cpp b/zencore/thread.cpp index 620ea3bff..ded180337 100644 --- a/zencore/thread.cpp +++ b/zencore/thread.cpp @@ -17,41 +17,25 @@ namespace zen { void RwLock::AcquireShared() { -#if ZEN_PLATFORM_WINDOWS - AcquireSRWLockShared((PSRWLOCK)&m_Srw); -#else m_Mutex.lock_shared(); -#endif } void RwLock::ReleaseShared() { -#if ZEN_PLATFORM_WINDOWS - ReleaseSRWLockShared((PSRWLOCK)&m_Srw); -#else m_Mutex.unlock_shared(); -#endif } void RwLock::AcquireExclusive() { -#if ZEN_PLATFORM_WINDOWS - AcquireSRWLockExclusive((PSRWLOCK)&m_Srw); -#else m_Mutex.lock(); -#endif } void RwLock::ReleaseExclusive() { -#if ZEN_PLATFORM_WINDOWS - ReleaseSRWLockExclusive((PSRWLOCK)&m_Srw); -#else m_Mutex.unlock(); -#endif } ////////////////////////////////////////////////////////////////////////// @@ -78,6 +62,13 @@ Event::Reset() ResetEvent(m_EventHandle); } +void +Event::Close() +{ + CloseHandle(m_EventHandle); + m_EventHandle = nullptr; +} + bool Event::Wait(int TimeoutMs) { @@ -174,6 +165,7 @@ ProcessHandle::Initialize(void* ProcessHandle) ZEN_ASSERT(m_ProcessHandle == nullptr); // TODO: perform some debug verification here to verify it's a valid handle? m_ProcessHandle = ProcessHandle; + m_Pid = GetProcessId(m_ProcessHandle); } ProcessHandle::~ProcessHandle() @@ -255,7 +247,8 @@ ProcessHandle::Wait(int TimeoutMs) case WAIT_FAILED: // What might go wrong here, and what is meaningful to act on? - throw WindowsException("Process::Wait failed"); + using namespace std::literals; + ThrowLastError("Process::Wait failed"sv); } return false; @@ -263,19 +256,100 @@ ProcessHandle::Wait(int TimeoutMs) ////////////////////////////////////////////////////////////////////////// +ProcessMonitor::ProcessMonitor() +{ +} + +ProcessMonitor::~ProcessMonitor() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + for (HANDLE& Proc : m_ProcessHandles) + { + CloseHandle(Proc); + Proc = 0; + } +} + +bool +ProcessMonitor::IsRunning() +{ + RwLock::ExclusiveLockScope _(m_Lock); + + bool FoundOne = false; + + for (HANDLE& Proc : m_ProcessHandles) + { + DWORD ExitCode = 0; + GetExitCodeProcess(Proc, &ExitCode); + + if (ExitCode != STILL_ACTIVE) + { + CloseHandle(Proc); + Proc = 0; + } + else + { + // Still alive + FoundOne = true; + } + } + + std::erase_if(m_ProcessHandles, [](HANDLE Handle) { return Handle == 0; }); + + return FoundOne; +} + +void +ProcessMonitor::AddPid(int Pid) +{ + HANDLE ProcessHandle = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, FALSE, Pid); + + if (ProcessHandle) + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProcessHandles.push_back(ProcessHandle); + } +} + +bool +ProcessMonitor::IsActive() const +{ + RwLock::SharedLockScope _(m_Lock); + return m_ProcessHandles.empty() == false; +} + +////////////////////////////////////////////////////////////////////////// + bool IsProcessRunning(int pid) { + // This function is arguably not super useful, a pid can be re-used + // by the OS so holding on to a pid and polling it over some time + // period will not necessarily tell you what you probably want to know. + +#if ZEN_PLATFORM_WINDOWS HANDLE hProc = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, FALSE, pid); - if (hProc == NULL) + if (!hProc) { - return false; + DWORD Error = zen::GetLastError(); + + if (Error == ERROR_INVALID_PARAMETER) + { + return false; + } + + using namespace fmt::literals; + ThrowSystemError(Error, "failed to open process with pid {}"_format(pid)); } CloseHandle(hProc); return true; +#else + ZEN_NOT_IMPLEMENTED(); +#endif } int diff --git a/zencore/timer.cpp b/zencore/timer.cpp index 08b5e06d2..1e73a7532 100644 --- a/zencore/timer.cpp +++ b/zencore/timer.cpp @@ -1,8 +1,10 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include <doctest/doctest.h> #include <zencore/thread.h> #include <zencore/timer.h> + +#include <zencore/testing.h> + #if ZEN_PLATFORM_WINDOWS # include <zencore/windows.h> #elif ZEN_PLATFORM_LINUX @@ -62,6 +64,8 @@ GetHifreqTimerFrequencySafe() // Testing related code follows... // +#if ZEN_WITH_TESTS + void timer_forcelink() { @@ -79,4 +83,6 @@ TEST_CASE("Timer") CHECK_NE(s0, s1); } +#endif + } // namespace zen diff --git a/zencore/uid.cpp b/zencore/uid.cpp index acf9f9790..d4b708288 100644 --- a/zencore/uid.cpp +++ b/zencore/uid.cpp @@ -4,6 +4,7 @@ #include <zencore/endian.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <atomic> #include <bit> @@ -12,8 +13,6 @@ #include <set> #include <unordered_map> -#include <doctest/doctest.h> - namespace zen { ////////////////////////////////////////////////////////////////////////// @@ -83,10 +82,24 @@ Oid::FromHexString(const std::string_view String) } } +Oid +Oid::FromMemory(const void* Ptr) +{ + Oid Id; + memcpy(Id.OidBits, Ptr, sizeof Id); + return Id; +} + +void +Oid::ToString(char OutString[StringLength]) +{ + ToHexBytes(reinterpret_cast<const uint8_t*>(OidBits), sizeof(Oid::OidBits), OutString); +} + StringBuilderBase& Oid::ToString(StringBuilderBase& OutString) const { - char str[25]; + char str[StringLength + 1]; ToHexBytes(reinterpret_cast<const uint8_t*>(OidBits), sizeof(Oid::OidBits), str); str[2 * sizeof(Oid)] = '\0'; @@ -95,6 +108,8 @@ Oid::ToString(StringBuilderBase& OutString) const return OutString; } +#if ZEN_WITH_TESTS + TEST_CASE("Oid") { SUBCASE("Basic") @@ -129,4 +144,6 @@ uid_forcelink() { } +#endif + } // namespace zen diff --git a/zencore/xxhash.cpp b/zencore/xxhash.cpp index a20ee10bd..450131d19 100644 --- a/zencore/xxhash.cpp +++ b/zencore/xxhash.cpp @@ -3,8 +3,8 @@ #include <zencore/xxhash.h> #include <zencore/string.h> +#include <zencore/testing.h> -#include <doctest/doctest.h> #include <gsl/gsl-lite.hpp> namespace zen { diff --git a/zencore/zencore.cpp b/zencore/zencore.cpp index f9b19ba9d..3eb43c558 100644 --- a/zencore/zencore.cpp +++ b/zencore/zencore.cpp @@ -20,9 +20,9 @@ #include <zencore/intmath.h> #include <zencore/iobuffer.h> #include <zencore/memory.h> +#include <zencore/mpscqueue.h> #include <zencore/refcount.h> #include <zencore/sha1.h> -#include <zencore/snapshot_manifest.h> #include <zencore/stats.h> #include <zencore/stream.h> #include <zencore/string.h> @@ -30,6 +30,10 @@ #include <zencore/timer.h> #include <zencore/uid.h> +namespace zen { + +////////////////////////////////////////////////////////////////////////// + bool IsPointerToStack(const void* ptr) { @@ -55,12 +59,31 @@ IsPointerToStack(const void* ptr) #endif } -zen::AssertException::AssertException(const char* Msg) : m_Msg(Msg) +bool +IsDebuggerPresent() { +#if ZEN_PLATFORM_WINDOWS + return ::IsDebuggerPresent(); +#else + return false; +#endif } -zen::AssertException::~AssertException() +bool +IsInteractiveSession() { +#if ZEN_PLATFORM_WINDOWS + DWORD dwSessionId = 0; + if (ProcessIdToSessionId(GetCurrentProcessId(), &dwSessionId)) + { + return (dwSessionId != 0); + } + + return false; +#else + // TODO: figure out what makes sense here + return true; +#endif } ////////////////////////////////////////////////////////////////////////// @@ -81,6 +104,7 @@ RequestApplicationExit(int ExitCode) s_ApplicationExitRequested = true; } +#if ZEN_WITH_TESTS void zencore_forcelinktests() { @@ -91,9 +115,9 @@ zencore_forcelinktests() zen::intmath_forcelink(); zen::iobuffer_forcelink(); zen::memory_forcelink(); + zen::mpscqueue_forcelink(); zen::refcount_forcelink(); zen::sha1_forcelink(); - zen::snapshotmanifest_forcelink(); zen::stats_forcelink(); zen::stream_forcelink(); zen::string_forcelink(); @@ -104,3 +128,6 @@ zencore_forcelinktests() zen::usonbuilder_forcelink(); zen::usonpackage_forcelink(); } +#endif + +} // namespace zen diff --git a/zencore/zencore.vcxproj b/zencore/zencore.vcxproj index 150c42cd6..2322f7173 100644 --- a/zencore/zencore.vcxproj +++ b/zencore/zencore.vcxproj @@ -128,6 +128,7 @@ <ClInclude Include="include\zencore\md5.h" /> <ClInclude Include="include\zencore\memory.h" /> <ClInclude Include="include\zencore\meta.h" /> + <ClInclude Include="include\zencore\mpscqueue.h" /> <ClInclude Include="include\zencore\postwindows.h" /> <ClInclude Include="include\zencore\prewindows.h" /> <ClInclude Include="include\zencore\refcount.h" /> @@ -136,12 +137,13 @@ <ClInclude Include="include\zencore\sha1.h" /> <ClInclude Include="include\zencore\iobuffer.h" /> <ClInclude Include="include\zencore\sharedbuffer.h" /> - <ClInclude Include="include\zencore\snapshot_manifest.h" /> <ClInclude Include="include\zencore\stats.h" /> <ClInclude Include="include\zencore\stream.h" /> <ClInclude Include="include\zencore\streamutil.h" /> <ClInclude Include="include\zencore\string.h" /> <ClInclude Include="include\zencore\targetver.h" /> + <ClInclude Include="include\zencore\testing.h" /> + <ClInclude Include="include\zencore\testutils.h" /> <ClInclude Include="include\zencore\thread.h" /> <ClInclude Include="include\zencore\timer.h" /> <ClInclude Include="include\zencore\uid.h" /> @@ -166,6 +168,7 @@ <ClCompile Include="logging.cpp" /> <ClCompile Include="md5.cpp" /> <ClCompile Include="memory.cpp" /> + <ClCompile Include="mpscqueue.cpp" /> <ClCompile Include="refcount.cpp" /> <ClCompile Include="session.cpp" /> <ClCompile Include="sha1.cpp"> @@ -176,11 +179,11 @@ </ClCompile> <ClCompile Include="iobuffer.cpp" /> <ClCompile Include="sharedbuffer.cpp" /> - <ClCompile Include="snapshot_manifest.cpp" /> <ClCompile Include="stats.cpp" /> <ClCompile Include="stream.cpp" /> <ClCompile Include="streamutil.cpp" /> <ClCompile Include="string.cpp" /> + <ClCompile Include="testutils.cpp" /> <ClCompile Include="thread.cpp" /> <ClCompile Include="timer.cpp" /> <ClCompile Include="uid.cpp" /> diff --git a/zencore/zencore.vcxproj.filters b/zencore/zencore.vcxproj.filters index ea0f8a912..d2e7a3159 100644 --- a/zencore/zencore.vcxproj.filters +++ b/zencore/zencore.vcxproj.filters @@ -4,7 +4,6 @@ <ClInclude Include="include\zencore\intmath.h" /> <ClInclude Include="include\zencore\scopeguard.h" /> <ClInclude Include="include\zencore\sha1.h" /> - <ClInclude Include="include\zencore\snapshot_manifest.h" /> <ClInclude Include="include\zencore\targetver.h" /> <ClInclude Include="include\zencore\zencore.h" /> <ClInclude Include="include\zencore\compactbinary.h" /> @@ -42,9 +41,11 @@ <ClInclude Include="include\zencore\postwindows.h" /> <ClInclude Include="include\zencore\logging.h" /> <ClInclude Include="include\zencore\session.h" /> + <ClInclude Include="include\zencore\testutils.h" /> + <ClInclude Include="include\zencore\testing.h" /> + <ClInclude Include="include\zencore\mpscqueue.h" /> </ItemGroup> <ItemGroup> - <ClCompile Include="snapshot_manifest.cpp" /> <ClCompile Include="sha1.cpp" /> <ClCompile Include="zencore.cpp" /> <ClCompile Include="compactbinary.cpp" /> @@ -74,6 +75,8 @@ <ClCompile Include="logging.cpp" /> <ClCompile Include="intmath.cpp" /> <ClCompile Include="session.cpp" /> + <ClCompile Include="testutils.cpp" /> + <ClCompile Include="mpscqueue.cpp" /> </ItemGroup> <ItemGroup> <Filter Include="CAS"> diff --git a/zenhttp/httpclient.cpp b/zenhttp/httpclient.cpp index fb1df30b2..20550b0c9 100644 --- a/zenhttp/httpclient.cpp +++ b/zenhttp/httpclient.cpp @@ -10,10 +10,9 @@ #include <zencore/session.h> #include <zencore/sharedbuffer.h> #include <zencore/stream.h> +#include <zencore/testing.h> #include <zenhttp/httpshared.h> -#include <doctest/doctest.h> - static std::atomic<uint32_t> HttpClientRequestIdCounter{0}; namespace zen { @@ -159,6 +158,8 @@ HttpClient::Delete(std::string_view Url) ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + TEST_CASE("httpclient") { using namespace std::literals; @@ -171,4 +172,6 @@ httpclient_forcelink() { } +#endif + } // namespace zen diff --git a/zenhttp/httpnull.cpp b/zenhttp/httpnull.cpp index 57cba13d3..e49051ac5 100644 --- a/zenhttp/httpnull.cpp +++ b/zenhttp/httpnull.cpp @@ -28,8 +28,10 @@ HttpNullServer::Initialize(int BasePort) } void -HttpNullServer::Run(bool TestMode) +HttpNullServer::Run(bool IsInteractiveSession) { + const bool TestMode = !IsInteractiveSession; + if (TestMode == false) { zen::logging::ConsoleLog().info("Zen Server running (null HTTP). Press ESC or Q to quit"); diff --git a/zenhttp/httpnull.h b/zenhttp/httpnull.h index b15b1b123..867bbe4d2 100644 --- a/zenhttp/httpnull.h +++ b/zenhttp/httpnull.h @@ -19,7 +19,7 @@ public: virtual void RegisterService(HttpService& Service) override; virtual void Initialize(int BasePort) override; - virtual void Run(bool TestMode) override; + virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; private: diff --git a/zenhttp/httpserver.cpp b/zenhttp/httpserver.cpp index 62ee66a08..599c99a18 100644 --- a/zenhttp/httpserver.cpp +++ b/zenhttp/httpserver.cpp @@ -13,6 +13,7 @@ #include <zencore/refcount.h> #include <zencore/stream.h> #include <zencore/string.h> +#include <zencore/testing.h> #include <zencore/thread.h> #include <zenhttp/httpshared.h> @@ -23,8 +24,6 @@ #include <span> #include <string_view> -#include <doctest/doctest.h> - namespace zen { using namespace std::literals; @@ -525,6 +524,8 @@ CreateHttpServer() ////////////////////////////////////////////////////////////////////////// +#if ZEN_WITH_TESTS + TEST_CASE("http.common") { using namespace std::literals; @@ -566,4 +567,6 @@ http_forcelink() { } +#endif + } // namespace zen diff --git a/zenhttp/httpshared.cpp b/zenhttp/httpshared.cpp index 2dbf95959..b0c5493db 100644 --- a/zenhttp/httpshared.cpp +++ b/zenhttp/httpshared.cpp @@ -123,23 +123,22 @@ ParsePackageMessage(IoBuffer Payload, std::function<IoBuffer(const IoHash&, uint MemoryInStream InStream(Payload); BinaryReader Reader(InStream); - CbPackage Package; - CbPackageHeader Hdr; Reader.Read(&Hdr, sizeof Hdr); if (Hdr.HeaderMagic != kCbPkgMagic) { - // report error - return {}; + throw std::runtime_error("invalid CbPackage header magic"); } - uint32_t ChunkCount = Hdr.AttachmentCount + 1; + const uint32_t ChunkCount = Hdr.AttachmentCount + 1; std::unique_ptr<CbAttachmentEntry[]> AttachmentEntries{new CbAttachmentEntry[ChunkCount]}; Reader.Read(AttachmentEntries.get(), sizeof(CbAttachmentEntry) * ChunkCount); + CbPackage Package; + for (uint32_t i = 0; i < ChunkCount; ++i) { const CbAttachmentEntry& Entry = AttachmentEntries[i]; diff --git a/zenhttp/httpsys.cpp b/zenhttp/httpsys.cpp index d70c88271..087d4c807 100644 --- a/zenhttp/httpsys.cpp +++ b/zenhttp/httpsys.cpp @@ -415,7 +415,7 @@ HttpMessageResponseRequest::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfB { ZEN_UNUSED(NumberOfBytesTransferred); - if (IoResult) + if (IoResult != NO_ERROR) { ZEN_WARN("response aborted due to error: '{}'", GetErrorAsString(IoResult)); @@ -707,29 +707,30 @@ HttpSysServer::StartServer() } void -HttpSysServer::Run(bool TestMode) +HttpSysServer::Run(bool IsInteractive) { - if (TestMode == false) + if (IsInteractive) { zen::logging::ConsoleLog().info("Zen Server running. Press ESC or Q to quit"); } do { - int WaitTimeout = -1; + // int WaitTimeout = -1; + int WaitTimeout = 100; - if (!TestMode) + if (IsInteractive) { WaitTimeout = 1000; - } - - if (!TestMode && _kbhit() != 0) - { - char c = (char)_getch(); - if (c == 27 || c == 'Q' || c == 'q') + if (_kbhit() != 0) { - RequestApplicationExit(0); + char c = (char)_getch(); + + if (c == 27 || c == 'Q' || c == 'q') + { + RequestApplicationExit(0); + } } } @@ -897,9 +898,7 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran if (HttpSysRequestHandler* CurrentHandler = m_CompletionHandler) { - const bool IsInitialRequest = (CurrentHandler == &m_InitialHttpHandler) && m_InitialHttpHandler.IsInitialRequest(); - - if (IsInitialRequest) + if ((CurrentHandler == &m_InitialHttpHandler) && m_InitialHttpHandler.IsInitialRequest()) { // Ensure we have a sufficient number of pending requests outstanding m_HttpServer.OnHandlingRequest(); @@ -932,7 +931,7 @@ HttpSysTransaction::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesTran } else { - if (IsInitialRequest == false) + if (CurrentHandler != &m_InitialHttpHandler) { delete CurrentHandler; } @@ -1253,11 +1252,12 @@ InitialRequestHandler::HandleCompletion(ULONG IoResult, ULONG_PTR NumberOfBytesT switch (IoResult) { + default: case ERROR_OPERATION_ABORTED: return nullptr; - case ERROR_MORE_DATA: - // Insufficient buffer space + case ERROR_MORE_DATA: // Insufficient buffer space + case NO_ERROR: break; } diff --git a/zenhttp/httpuws.cpp b/zenhttp/httpuws.cpp index 992809b17..e062e7747 100644 --- a/zenhttp/httpuws.cpp +++ b/zenhttp/httpuws.cpp @@ -37,8 +37,10 @@ HttpUwsServer::Initialize(int BasePort) } void -HttpUwsServer::Run(bool TestMode) +HttpUwsServer::Run(bool IsInteractive) { + const bool TestMode = !IsInteractive; + if (TestMode == false) { zen::logging::ConsoleLog().info("Zen Server running (null HTTP). Press ESC or Q to quit"); diff --git a/zenhttp/httpuws.h b/zenhttp/httpuws.h index ec55ae2fd..5e300202f 100644 --- a/zenhttp/httpuws.h +++ b/zenhttp/httpuws.h @@ -16,7 +16,7 @@ public: virtual void RegisterService(HttpService& Service) override; virtual void Initialize(int BasePort) override; - virtual void Run(bool TestMode) override; + virtual void Run(bool IsInteractiveSession) override; virtual void RequestExit() override; private: diff --git a/zenhttp/include/zenhttp/httpserver.h b/zenhttp/include/zenhttp/httpserver.h index f656c69a8..6a7dc8a70 100644 --- a/zenhttp/include/zenhttp/httpserver.h +++ b/zenhttp/include/zenhttp/httpserver.h @@ -39,7 +39,7 @@ public: { std::vector<std::pair<std::string_view, std::string_view>> KvPairs; - std::string_view GetValue(std::string_view ParamName) + std::string_view GetValue(std::string_view ParamName) const { for (const auto& Kv : KvPairs) { @@ -167,7 +167,7 @@ class HttpServer : public RefCounted public: virtual void RegisterService(HttpService& Service) = 0; virtual void Initialize(int BasePort) = 0; - virtual void Run(bool TestMode) = 0; + virtual void Run(bool IsInteractiveSession) = 0; virtual void RequestExit() = 0; }; diff --git a/zenserver-test/zenserver-test.cpp b/zenserver-test/zenserver-test.cpp index 1a41a5541..794a5fe94 100644 --- a/zenserver-test/zenserver-test.cpp +++ b/zenserver-test/zenserver-test.cpp @@ -12,6 +12,7 @@ #include <zencore/iohash.h> #include <zencore/logging.h> #include <zencore/memory.h> +#include <zencore/refcount.h> #include <zencore/stream.h> #include <zencore/string.h> #include <zencore/thread.h> @@ -19,9 +20,11 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpshared.h> #include <zenhttp/zenhttp.h> -#include <zenserverprocess.h> +#include <zenutil/zenserverprocess.h> -#include <mimalloc.h> +#if ZEN_USE_MIMALLOC +# include <mimalloc.h> +#endif #include <http_parser.h> @@ -37,6 +40,7 @@ #include <map> #include <random> #include <span> +#include <unordered_map> #include <atlbase.h> #include <process.h> @@ -49,9 +53,11 @@ ////////////////////////////////////////////////////////////////////////// -#define DOCTEST_CONFIG_IMPLEMENT -#include <doctest/doctest.h> -#undef DOCTEST_CONFIG_IMPLEMENT +#if ZEN_WITH_TESTS +# define DOCTEST_CONFIG_IMPLEMENT +# include <zencore/testing.h> +# undef DOCTEST_CONFIG_IMPLEMENT +#endif using namespace fmt::literals; @@ -80,7 +86,7 @@ class HttpClientConnection static HttpClientConnection* This(http_parser* Parser) { return (HttpClientConnection*)Parser->data; }; public: - HttpClientConnection(asio::io_context& IoContext, HttpConnectionPool& Pool, asio::ip::tcp::socket&& InSocket) + HttpClientConnection(asio::io_context& IoContext, zen::Ref<HttpConnectionPool> Pool, asio::ip::tcp::socket&& InSocket) : m_IoContext(IoContext) , m_Pool(Pool) , m_Resolver(IoContext) @@ -89,8 +95,8 @@ public: } ~HttpClientConnection() {} - HttpConnectionPool& ConnectionPool() { return m_Pool; } - void SetKeepAlive(bool NewState) { m_KeepAlive = NewState; } + zen::Ref<HttpConnectionPool> ConnectionPool() { return m_Pool; } + void SetKeepAlive(bool NewState) { m_KeepAlive = NewState; } void Get(const std::string_view Server, int Port, const std::string_view Path) { @@ -227,16 +233,16 @@ private: } private: - asio::io_context& m_IoContext; - HttpConnectionPool& m_Pool; - asio::ip::tcp::resolver m_Resolver; - asio::ip::tcp::socket m_Socket; - std::string m_Uri; - std::string m_RequestBody; // Initial request data - http_parser m_HttpParser{}; - http_parser_settings m_HttpParserSettings{}; - uint8_t m_ResponseIoBuffer[4096]; - asio::mutable_buffer m_ResponseBuffer{m_ResponseIoBuffer, sizeof m_ResponseIoBuffer}; + asio::io_context& m_IoContext; + zen::Ref<HttpConnectionPool> m_Pool; + asio::ip::tcp::resolver m_Resolver; + asio::ip::tcp::socket m_Socket; + std::string m_Uri; + std::string m_RequestBody; // Initial request data + http_parser m_HttpParser{}; + http_parser_settings m_HttpParserSettings{}; + uint8_t m_ResponseIoBuffer[4096]; + asio::mutable_buffer m_ResponseBuffer{m_ResponseIoBuffer, sizeof m_ResponseIoBuffer}; enum class RequestState { @@ -259,7 +265,7 @@ private: ////////////////////////////////////////////////////////////////////////// -class HttpConnectionPool +class HttpConnectionPool : public zen::RefCounted { public: HttpConnectionPool(asio::io_context& Context, std::string_view HostName, uint16_t Port); @@ -322,7 +328,7 @@ HttpConnectionPool::GetConnection() return nullptr; } - return std::make_unique<HttpClientConnection>(m_Context, *this, std::move(Socket)); + return std::make_unique<HttpClientConnection>(m_Context, this, std::move(Socket)); } std::unique_ptr<HttpClientConnection> Connection{m_AvailableConnections.back()}; @@ -347,15 +353,15 @@ public: std::unique_ptr<HttpClientConnection> GetConnection(std::string_view HostName, uint16_t Port) { - return ConnectionPool(HostName, Port).GetConnection(); + return ConnectionPool(HostName, Port)->GetConnection(); } void ReturnConnection(std::unique_ptr<HttpClientConnection> Connection) { - Connection->ConnectionPool().ReturnConnection(std::move(Connection)); + Connection->ConnectionPool()->ReturnConnection(std::move(Connection)); } - HttpConnectionPool& ConnectionPool(std::string_view HostName, uint16_t Port) + zen::Ref<HttpConnectionPool> ConnectionPool(std::string_view HostName, uint16_t Port) { zen::RwLock::ExclusiveLockScope _(m_Lock); ConnectionId ConnId{std::string(HostName), Port}; @@ -364,7 +370,7 @@ public: { // Not found - create new entry - auto In = m_ConnectionPools.insert({ConnId, std::move(HttpConnectionPool(m_Context, HostName, Port))}); + auto In = m_ConnectionPools.emplace(ConnId, new HttpConnectionPool(m_Context, HostName, Port)); return In.first->second; } @@ -393,8 +399,8 @@ private: uint16_t Port; }; - zen::RwLock m_Lock; - std::map<ConnectionId, HttpConnectionPool> m_ConnectionPools; + zen::RwLock m_Lock; + std::map<ConnectionId, zen::Ref<HttpConnectionPool>> m_ConnectionPools; }; ////////////////////////////////////////////////////////////////////////// @@ -655,22 +661,24 @@ main() [](auto req) { return req->create_response().set_body("Hello, World!").done(); })); return 0; } -#else +#elif ZEN_WITH_TESTS -ZenServerEnvironment TestEnv; +zen::ZenServerEnvironment TestEnv; int main(int argc, char** argv) { +# if ZEN_USE_MIMALLOC mi_version(); +# endif - zencore_forcelinktests(); + zen::zencore_forcelinktests(); zen::zenhttp_forcelinktests(); zen::logging::InitializeLogging(); spdlog::set_level(spdlog::level::debug); - spdlog::set_formatter(std::make_unique<logging::full_formatter>("test", std::chrono::system_clock::now())); + spdlog::set_formatter(std::make_unique<::logging::full_formatter>("test", std::chrono::system_clock::now())); std::filesystem::path ProgramBaseDir = std::filesystem::path(argv[0]).parent_path(); std::filesystem::path TestBaseDir = ProgramBaseDir.parent_path().parent_path() / ".test"; @@ -681,6 +689,8 @@ main(int argc, char** argv) return doctest::Context(argc, argv).run(); } +namespace zen::tests { + # if 1 TEST_CASE("asio.http") { @@ -1400,6 +1410,395 @@ TEST_CASE("zcache.cbpackage") } } +TEST_CASE("zcache.policy") +{ + using namespace std::literals; + + struct ZenConfig + { + std::filesystem::path DataDir; + uint16_t Port; + std::string BaseUri; + std::string Args; + + static ZenConfig New(uint16_t Port = 13337, std::string Args = "") + { + return ZenConfig{.DataDir = TestEnv.CreateNewTestDir(), + .Port = Port, + .BaseUri = "http://localhost:{}/z$"_format(Port), + .Args = std::move(Args)}; + } + + static ZenConfig NewWithUpstream(uint16_t UpstreamPort) + { + return New(13337, "--upstream-thread-count=0 --upstream-zen-url=http://localhost:{}"_format(UpstreamPort)); + } + + void Spawn(ZenServerInstance& Inst) + { + Inst.SetTestDir(DataDir); + Inst.SpawnServer(Port, Args); + Inst.WaitUntilReady(); + } + }; + + auto GenerateData = [](uint64_t Size, zen::IoHash& OutHash) -> zen::UniqueBuffer { + auto Buf = zen::UniqueBuffer::Alloc(Size); + uint8_t* Data = reinterpret_cast<uint8_t*>(Buf.GetData()); + for (uint64_t Idx = 0; Idx < Size; Idx++) + { + Data[Idx] = Idx % 256; + } + OutHash = zen::IoHash::HashBuffer(Data, Size); + return Buf; + }; + + auto GeneratePackage = [](zen::IoHash& OutAttachmentKey) -> zen::CbPackage { + auto Data = zen::SharedBuffer::Clone(zen::MakeMemoryView<uint8_t>({1, 2, 3, 4, 5, 6, 7, 8, 9})); + auto CompressedData = zen::CompressedBuffer::Compress(Data); + OutAttachmentKey = zen::IoHash::FromBLAKE3(CompressedData.GetRawHash()); + zen::CbWriter Obj; + Obj.BeginObject("obj"sv); + Obj.AddBinaryAttachment("data", OutAttachmentKey); + Obj.EndObject(); + zen::CbPackage Package; + Package.SetObject(Obj.Save().AsObject()); + Package.AddAttachment(zen::CbAttachment(CompressedData)); + + return Package; + }; + + auto ToBuffer = [](zen::CbPackage Package) -> zen::IoBuffer { + zen::MemoryOutStream MemStream; + zen::BinaryWriter Writer(MemStream); + Package.Save(Writer); + + return zen::IoBuffer(zen::IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + }; + + SUBCASE("query - 'local' does not query upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local' does not store upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value locally + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (binary)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + auto BinaryValue = GenerateData(1024, Key); + + // Store binary cache value locally and upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)BinaryValue.GetData(), BinaryValue.GetSize()}, + cpr::Header{{"Content-Type", "application/octet-stream"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/octet-stream"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("query - 'local' does not query upstream (cppackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store package upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?query=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local' does not store upstream (cbpackge)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store packge locally + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 404); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("store - 'local/remote' stores local and upstream (cbpackage)") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "legacy"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store package locally and upstream + { + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}?store=local,remote"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + } + } + + SUBCASE("skip - 'attachments' does not return attachments") + { + ZenConfig LocalCfg = ZenConfig::New(); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "texture"sv; + + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store package locally + { + CHECK(Package.GetAttachments().size() != 0); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=attachments"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + + CbObject CacheRecord = Package.GetObject(); + std::vector<IoHash> AttachmentKeys; + + CacheRecord.IterateAttachments( + [&AttachmentKeys](CbFieldView AttachmentKey) { AttachmentKeys.push_back(AttachmentKey.AsHash()); }); + + CHECK(AttachmentKeys.size() != 0); + CHECK(Package.GetAttachments().size() == 0); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + + CHECK(Package.GetAttachments().size() != 0); + } + } + + SUBCASE("skip - 'attachments' does not return attachments when retrieved from upstream") + { + ZenConfig UpstreamCfg = ZenConfig::New(13338); + ZenServerInstance UpstreamInst(TestEnv); + ZenConfig LocalCfg = ZenConfig::NewWithUpstream(13338); + ZenServerInstance LocalInst(TestEnv); + const auto Bucket = "texture"sv; + + UpstreamCfg.Spawn(UpstreamInst); + LocalCfg.Spawn(LocalInst); + + zen::IoHash Key; + zen::CbPackage Package = GeneratePackage(Key); + auto Buf = ToBuffer(Package); + + // Store package upstream + { + CHECK(Package.GetAttachments().size() != 0); + cpr::Response Result = cpr::Put(cpr::Url{"{}/{}/{}"_format(UpstreamCfg.BaseUri, Bucket, Key)}, + cpr::Body{(const char*)Buf.GetData(), Buf.GetSize()}, + cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 201); + } + + { + cpr::Response Result = cpr::Get(cpr::Url{"{}/{}/{}?skip=attachments"_format(LocalCfg.BaseUri, Bucket, Key)}, + cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + + CbObject CacheRecord = Package.GetObject(); + std::vector<IoHash> AttachmentKeys; + + CacheRecord.IterateAttachments( + [&AttachmentKeys](CbFieldView AttachmentKey) { AttachmentKeys.push_back(AttachmentKey.AsHash()); }); + + CHECK(AttachmentKeys.size() != 0); + CHECK(Package.GetAttachments().size() == 0); + } + + { + cpr::Response Result = + cpr::Get(cpr::Url{"{}/{}/{}"_format(LocalCfg.BaseUri, Bucket, Key)}, cpr::Header{{"Accept", "application/x-ue-cbpkg"}}); + CHECK(Result.status_code == 200); + + zen::IoBuffer Body(zen::IoBuffer::Wrap, Result.text.data(), Result.text.size()); + zen::CbPackage Package; + const bool Ok = Package.TryLoad(Body); + + CHECK(Package.GetAttachments().size() != 0); + } + } +} + struct RemoteExecutionRequest { RemoteExecutionRequest(std::string_view Host, int Port, std::filesystem::path& TreePath) @@ -1637,9 +2036,9 @@ public: ZenServerInstance& GetInstance(int Index) { return *m_Instances[Index]; } private: - std::string m_HelperId; - int m_ServerCount = 0; - std::vector<std::unique_ptr<ZenServerInstance> > m_Instances; + std::string m_HelperId; + int m_ServerCount = 0; + std::vector<std::unique_ptr<ZenServerInstance>> m_Instances; }; TEST_CASE("http.basics") @@ -1709,4 +2108,72 @@ TEST_CASE("http.package") CHECK_EQ(ResponsePackage, TestPackage); } +# if 0 +TEST_CASE("lifetime.owner") +{ + // This test is designed to verify that the hand-over of sponsor processes is handled + // correctly for the case when a second or third process is launched on the same port + // + // Due to the nature of it, it cannot be + + const uint16_t PortNumber = 23456; + + ZenServerInstance Zen1(TestEnv); + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + Zen1.SetTestDir(TestDir1); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + Zen1.Detach(); + + ZenServerInstance Zen2(TestEnv); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + Zen2.SetTestDir(TestDir2); + Zen2.SpawnServer(PortNumber); + Zen2.WaitUntilReady(); + Zen2.Detach(); +} + +TEST_CASE("lifetime.owner.2") +{ + // This test is designed to verify that the hand-over of sponsor processes is handled + // correctly for the case when a second or third process is launched on the same port + // + // Due to the nature of it, it cannot be + + const uint16_t PortNumber = 13456; + + std::filesystem::path TestDir1 = TestEnv.CreateNewTestDir(); + std::filesystem::path TestDir2 = TestEnv.CreateNewTestDir(); + + ZenServerInstance Zen1(TestEnv); + Zen1.SetTestDir(TestDir1); + Zen1.SpawnServer(PortNumber); + Zen1.WaitUntilReady(); + + ZenServerInstance Zen2(TestEnv); + Zen2.SetTestDir(TestDir2); + Zen2.SetOwnerPid(Zen1.GetPid()); + Zen2.SpawnServer(PortNumber + 1); + Zen2.Detach(); + + ZenServerInstance Zen3(TestEnv); + Zen3.SetTestDir(TestDir2); + Zen3.SetOwnerPid(Zen1.GetPid()); + Zen3.SpawnServer(PortNumber + 1); + Zen3.Detach(); + + ZenServerInstance Zen4(TestEnv); + Zen4.SetTestDir(TestDir2); + Zen4.SetOwnerPid(Zen1.GetPid()); + Zen4.SpawnServer(PortNumber + 1); + Zen4.Detach(); +} +# endif + +} // namespace zen::tests +#else +int +main() +{ +} #endif diff --git a/zenserver/admin/admin.h b/zenserver/admin/admin.h index f90ad4537..3554b1005 100644 --- a/zenserver/admin/admin.h +++ b/zenserver/admin/admin.h @@ -4,6 +4,8 @@ #include <zenhttp/httpserver.h> +namespace zen { + class HttpAdminService : public zen::HttpService { public: @@ -16,3 +18,5 @@ public: private: }; + +} // namespace zen diff --git a/zenserver/cache/structuredcache.cpp b/zenserver/cache/structuredcache.cpp index cf7deaa93..7f1fe7b44 100644 --- a/zenserver/cache/structuredcache.cpp +++ b/zenserver/cache/structuredcache.cpp @@ -9,6 +9,7 @@ #include <zencore/stream.h> #include <zencore/timer.h> #include <zenhttp/httpserver.h> +#include <zenstore/CAS.h> #include "structuredcache.h" #include "structuredcachestore.h" @@ -25,17 +26,130 @@ #include <queue> #include <thread> +#include <gsl/gsl-lite.hpp> + namespace zen { using namespace std::literals; ////////////////////////////////////////////////////////////////////////// -HttpStructuredCacheService::HttpStructuredCacheService(::ZenCacheStore& InCacheStore, - zen::CasStore& InStore, - zen::CidStore& InCidStore, +namespace detail { namespace cacheopt { + constexpr std::string_view Local = "local"sv; + constexpr std::string_view Remote = "remote"sv; + constexpr std::string_view Data = "data"sv; + constexpr std::string_view Meta = "meta"sv; + constexpr std::string_view Value = "value"sv; + constexpr std::string_view Attachments = "attachments"sv; +}} // namespace detail::cacheopt + +////////////////////////////////////////////////////////////////////////// + +enum class CachePolicy : uint8_t +{ + None = 0, + QueryLocal = 1 << 0, + QueryRemote = 1 << 1, + Query = QueryLocal | QueryRemote, + StoreLocal = 1 << 2, + StoreRemote = 1 << 3, + Store = StoreLocal | StoreRemote, + SkipMeta = 1 << 4, + SkipValue = 1 << 5, + SkipAttachments = 1 << 6, + SkipData = SkipMeta | SkipValue | SkipAttachments, + SkipLocalCopy = 1 << 7, + Local = QueryLocal | StoreLocal, + Remote = QueryRemote | StoreRemote, + Default = Query | Store, + Disable = None, +}; + +gsl_DEFINE_ENUM_BITMASK_OPERATORS(CachePolicy); + +CachePolicy +ParseCachePolicy(const HttpServerRequest::QueryParams& QueryParams) +{ + CachePolicy QueryPolicy = CachePolicy::Query; + + { + std::string_view Opts = QueryParams.GetValue("query"sv); + if (!Opts.empty()) + { + QueryPolicy = CachePolicy::None; + ForEachStrTok(Opts, ',', [&QueryPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + QueryPolicy |= CachePolicy::QueryLocal; + } + if (Opt == detail::cacheopt::Remote) + { + QueryPolicy |= CachePolicy::QueryRemote; + } + return true; + }); + } + } + + CachePolicy StorePolicy = CachePolicy::Store; + + { + std::string_view Opts = QueryParams.GetValue("store"sv); + if (!Opts.empty()) + { + StorePolicy = CachePolicy::None; + ForEachStrTok(Opts, ',', [&StorePolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Local) + { + StorePolicy |= CachePolicy::StoreLocal; + } + if (Opt == detail::cacheopt::Remote) + { + StorePolicy |= CachePolicy::StoreRemote; + } + return true; + }); + } + } + + CachePolicy SkipPolicy = CachePolicy::None; + + { + std::string_view Opts = QueryParams.GetValue("skip"sv); + if (!Opts.empty()) + { + ForEachStrTok(Opts, ',', [&SkipPolicy](const std::string_view& Opt) { + if (Opt == detail::cacheopt::Meta) + { + SkipPolicy |= CachePolicy::SkipMeta; + } + if (Opt == detail::cacheopt::Value) + { + SkipPolicy |= CachePolicy::SkipValue; + } + if (Opt == detail::cacheopt::Attachments) + { + SkipPolicy |= CachePolicy::SkipAttachments; + } + if (Opt == detail::cacheopt::Data) + { + SkipPolicy |= CachePolicy::SkipData; + } + return true; + }); + } + } + + return QueryPolicy | StorePolicy | SkipPolicy; +} + +////////////////////////////////////////////////////////////////////////// + +HttpStructuredCacheService::HttpStructuredCacheService(ZenCacheStore& InCacheStore, + CasStore& InStore, + CidStore& InCidStore, std::unique_ptr<UpstreamCache> UpstreamCache) -: m_Log(zen::logging::Get("cache")) +: m_Log(logging::Get("cache")) , m_CacheStore(InCacheStore) , m_CasStore(InStore) , m_CidStore(InCidStore) @@ -59,8 +173,14 @@ HttpStructuredCacheService::Flush() { } +void +HttpStructuredCacheService::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + void -HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) +HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) { CacheRef Ref; @@ -75,28 +195,31 @@ HttpStructuredCacheService::HandleRequest(zen::HttpServerRequest& Request) return HandleCacheBucketRequest(Request, Key); } - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); // invalid URL + return Request.WriteResponse(HttpResponseCode::BadRequest); // invalid URL } + const auto QueryParams = Request.GetQueryParams(); + CachePolicy Policy = ParseCachePolicy(QueryParams); + if (Ref.PayloadId == IoHash::Zero) { - return HandleCacheRecordRequest(Request, Ref); + return HandleCacheRecordRequest(Request, Ref, Policy); } else { - return HandleCachePayloadRequest(Request, Ref); + return HandleCachePayloadRequest(Request, Ref, Policy); } return; } void -HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket) +HttpStructuredCacheService::HandleCacheBucketRequest(HttpServerRequest& Request, std::string_view Bucket) { ZEN_UNUSED(Request, Bucket); switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: @@ -110,22 +233,22 @@ HttpStructuredCacheService::HandleCacheBucketRequest(zen::HttpServerRequest& Req if (m_CacheStore.DropBucket(Bucket)) { - return Request.WriteResponse(zen::HttpResponseCode::OK); + return Request.WriteResponse(HttpResponseCode::OK); } else { - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } break; } } void -HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCacheRecordRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: @@ -136,7 +259,9 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req bool Success = m_CacheStore.Get(Ref.BucketSegment, Ref.HashKey, /* out */ Value); bool InUpstreamCache = false; - if (!Success && m_UpstreamCache) + const bool QueryUpstream = !Success && m_UpstreamCache && (CachePolicy::QueryRemote == (Policy & CachePolicy::QueryRemote)); + + if (QueryUpstream) { const ZenContentType CacheRecordType = Ref.BucketSegment == "legacy"sv ? ZenContentType::kBinary : AcceptType == ZenContentType::kCbPackage ? ZenContentType::kCbPackage @@ -153,14 +278,13 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { if (CacheRecordType == ZenContentType::kCbObject) { - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(UpstreamResult.Value, zen::CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(UpstreamResult.Value, CbValidateMode::All); if (ValidationResult == CbValidateError::None) { - zen::CbObjectView CacheRecord(UpstreamResult.Value.Data()); + CbObjectView CacheRecord(UpstreamResult.Value.Data()); - zen::CbObjectWriter IndexData; + CbObjectWriter IndexData; IndexData.BeginArray("references"); CacheRecord.IterateAttachments([&](CbFieldView Attachment) { IndexData.AddHash(Attachment.AsHash()); }); IndexData.EndArray(); @@ -214,6 +338,18 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (FoundCount == AttachmentCount) { m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = CacheRecord.GetBuffer().AsIoBuffer()}); + + if (zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments)) + { + CbPackage PackageWithoutAttachments; + PackageWithoutAttachments.SetObject(CacheRecord); + + MemoryOutStream MemStream; + BinaryWriter Writer(MemStream); + PackageWithoutAttachments.Save(Writer); + + Value.Value = IoBuffer(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); + } } else { @@ -236,7 +372,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { ZEN_DEBUG("MISS - '{}/{}'", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } if (Verb == kHead) @@ -248,41 +384,45 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req { CbObjectView CacheRecord(Value.Value.Data()); - const zen::CbValidateError ValidationResult = zen::ValidateCompactBinary(Value.Value, zen::CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(Value.Value, CbValidateMode::All); if (ValidationResult != CbValidateError::None) { ZEN_WARN("GET - cache record '{}/{}' FAILED, invalid compact binary object", Ref.BucketSegment, Ref.HashKey); - return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Invalid cache record"sv); } - uint32_t AttachmentCount = 0; - uint32_t FoundCount = 0; - uint64_t AttachmentBytes = 0ull; + const bool SkipAttachments = zen::CachePolicy::SkipAttachments == (Policy & zen::CachePolicy::SkipAttachments); + uint32_t AttachmentCount = 0; + uint32_t FoundCount = 0; + uint64_t AttachmentBytes = 0ull; CbPackage Package; - CacheRecord.IterateAttachments( - [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { - if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) - { - Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); - AttachmentBytes += Chunk.Size(); - FoundCount++; - } - AttachmentCount++; - }); - - if (FoundCount != AttachmentCount) + if (!SkipAttachments) { - ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", - Ref.BucketSegment, - Ref.HashKey, - FoundCount, - AttachmentCount); + CacheRecord.IterateAttachments( + [this, &Ref, &Package, &AttachmentCount, &FoundCount, &AttachmentBytes](CbFieldView AttachmentHash) { + if (IoBuffer Chunk = m_CidStore.FindChunkByCid(AttachmentHash.AsHash())) + { + Package.AddAttachment(CbAttachment(CompressedBuffer::FromCompressed(SharedBuffer(Chunk)))); + AttachmentBytes += Chunk.Size(); + FoundCount++; + } + AttachmentCount++; + }); + + if (FoundCount != AttachmentCount) + { + ZEN_WARN("GET - cache record '{}/{}' FAILED, found '{}' of '{}' attachments", + Ref.BucketSegment, + Ref.HashKey, + FoundCount, + AttachmentCount); - return Request.WriteResponse(zen::HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Missing attachments"sv); + } } Package.SetObject(LoadCompactBinaryObject(Value.Value)); @@ -300,7 +440,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req IoBuffer Response(IoBuffer::Clone, MemStream.Data(), MemStream.Size()); - return Request.WriteResponse(zen::HttpResponseCode::OK, HttpContentType::kCbPackage, Response); + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kCbPackage, Response); } else { @@ -310,41 +450,43 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req NiceBytes(Value.Value.Size()), InUpstreamCache ? "UPSTREAM" : "LOCAL"); - return Request.WriteResponse(zen::HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); + return Request.WriteResponse(HttpResponseCode::OK, Value.Value.GetContentType(), Value.Value); } } break; case kPut: { - zen::IoBuffer Body = Request.ReadPayload(); + IoBuffer Body = Request.ReadPayload(); if (!Body || Body.Size() == 0) { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); + return Request.WriteResponse(HttpResponseCode::BadRequest); } const HttpContentType ContentType = Request.RequestContentType(); + const bool StoreUpstream = m_UpstreamCache && (CachePolicy::StoreRemote == (Policy & CachePolicy::StoreRemote)); + if (ContentType == HttpContentType::kBinary || ContentType == HttpContentType::kUnknownContentType) { // TODO: create a cache record and put value in CAS? m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, {.Value = Body}); ZEN_DEBUG("PUT - binary '{}/{}' {}", Ref.BucketSegment, Ref.HashKey, NiceBytes(Body.Size())); - if (m_UpstreamCache) + if (StoreUpstream) { auto Result = m_UpstreamCache->EnqueueUpstream( {.Type = ZenContentType::kBinary, .CacheKey = {Ref.BucketSegment, Ref.HashKey}}); } - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else if (ContentType == HttpContentType::kCbObject) { // Validate payload before accessing it - const zen::CbValidateError ValidationResult = - zen::ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), zen::CbValidateMode::All); + const CbValidateError ValidationResult = + ValidateCompactBinary(MemoryView(Body.Data(), Body.Size()), CbValidateMode::All); if (ValidationResult != CbValidateError::None) { @@ -360,7 +502,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } // Extract referenced payload hashes - zen::CbObjectView Cbo(Body.Data()); + CbObjectView Cbo(Body.Data()); std::vector<IoHash> References; std::vector<IoHash> MissingRefs; @@ -371,7 +513,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (!References.empty()) { - zen::CbObjectWriter Idx; + CbObjectWriter Idx; Idx.BeginArray("references"); for (const IoHash& Hash : References) @@ -393,26 +535,23 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} attachments missing", Ref.BucketSegment, Ref.HashKey, - zen::NiceBytes(CacheValue.Value.Size()), + NiceBytes(CacheValue.Value.Size()), MissingRefs.size(), References.size()); - if (MissingRefs.empty()) + if (MissingRefs.empty() && StoreUpstream) { - // Only enqueue valid cache records, i.e. all referenced payloads exists - if (m_UpstreamCache) - { - auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, - .CacheKey = {Ref.BucketSegment, Ref.HashKey}, - .PayloadIds = std::move(References)}); - } + ZEN_ASSERT(m_UpstreamCache); + auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbObject, + .CacheKey = {Ref.BucketSegment, Ref.HashKey}, + .PayloadIds = std::move(References)}); - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { // TODO: Binary attachments? - zen::CbObjectWriter Response; + CbObjectWriter Response; Response.BeginArray("needs"); for (const IoHash& MissingRef : MissingRefs) { @@ -422,7 +561,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Response.EndArray(); // Return Created | BadRequest? - return Request.WriteResponse(zen::HttpResponseCode::Created, Response.Save()); + return Request.WriteResponse(HttpResponseCode::Created, Response.Save()); } } else if (ContentType == HttpContentType::kCbPackage) @@ -437,26 +576,22 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req CbObject CacheRecord = Package.GetObject(); - int32_t AttachmentCount = 0; - int32_t NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - bool AttachmentsOk = true; - + struct AttachmentInsertResult + { + int32_t Count = 0; + int32_t NewCount = 0; + uint64_t Bytes = 0; + uint64_t NewBytes = 0; + bool Ok = false; + }; + + AttachmentInsertResult AttachmentResult{.Ok = true}; std::span<const CbAttachment> Attachments = Package.GetAttachments(); + std::vector<IoHash> PayloadIds; - std::vector<IoHash> PayloadIds; PayloadIds.reserve(Attachments.size()); - CacheRecord.IterateAttachments([this, - &Ref, - &Package, - &AttachmentsOk, - &AttachmentCount, - &TotalAttachmentBytes, - &TotalNewBytes, - &NewAttachmentCount, - &PayloadIds](CbFieldView AttachmentHash) { + CacheRecord.IterateAttachments([this, &Ref, &Package, &AttachmentResult, &PayloadIds](CbFieldView AttachmentHash) { if (const CbAttachment* Attachment = Package.FindAttachment(AttachmentHash.AsHash())) { if (Attachment->IsCompressedBinary()) @@ -469,12 +604,12 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req if (InsertResult.New) { - TotalNewBytes += ChunkSize; - ++NewAttachmentCount; + AttachmentResult.NewBytes += ChunkSize; + AttachmentResult.NewCount++; } - TotalAttachmentBytes += ChunkSize; - AttachmentCount++; + AttachmentResult.Bytes += ChunkSize; + AttachmentResult.Count++; } else { @@ -482,7 +617,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); - AttachmentsOk = false; + AttachmentResult.Ok = false; } } else @@ -491,23 +626,24 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req Ref.BucketSegment, Ref.HashKey, AttachmentHash.AsHash()); - AttachmentsOk = false; + AttachmentResult.Ok = false; } }); - if (!AttachmentsOk) + if (!AttachmentResult.Ok) { return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Invalid attachments"); } IoBuffer CacheRecordChunk = CacheRecord.GetBuffer().AsIoBuffer(); - const uint64_t TotalPackageBytes = TotalAttachmentBytes + CacheRecordChunk.Size(); + const uint64_t TotalPackageBytes = AttachmentResult.Bytes + CacheRecordChunk.Size(); ZenCacheValue CacheValue{.Value = CacheRecordChunk}; m_CacheStore.Put(Ref.BucketSegment, Ref.HashKey, CacheValue); - if (m_UpstreamCache) + if (StoreUpstream) { + ZEN_ASSERT(m_UpstreamCache); auto Result = m_UpstreamCache->EnqueueUpstream({.Type = ZenContentType::kCbPackage, .CacheKey = {Ref.BucketSegment, Ref.HashKey}, .PayloadIds = std::move(PayloadIds)}); @@ -516,17 +652,17 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req ZEN_DEBUG("PUT - cache record '{}/{}' {}, {}/{} ({}/{}) new attachments", Ref.BucketSegment, Ref.HashKey, - zen::NiceBytes(TotalPackageBytes), - NewAttachmentCount, - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - zen::NiceBytes(TotalAttachmentBytes)); + NiceBytes(TotalPackageBytes), + AttachmentResult.NewCount, + AttachmentResult.Count, + NiceBytes(AttachmentResult.NewBytes), + NiceBytes(AttachmentResult.Bytes)); - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest); + return Request.WriteResponse(HttpResponseCode::BadRequest); } } break; @@ -540,7 +676,7 @@ HttpStructuredCacheService::HandleCacheRecordRequest(zen::HttpServerRequest& Req } void -HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref) +HttpStructuredCacheService::HandleCachePayloadRequest(HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy) { // Note: the URL references the uncompressed payload hash - so this maintains the mapping // from uncompressed CAS identity (aka CID/Content ID) to the stored payload hash @@ -548,27 +684,29 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re // this is a PITA but a consequence of the fact that the client side code is not able to // address data by compressed hash + ZEN_UNUSED(Policy); + switch (auto Verb = Request.RequestVerb()) { - using enum zen::HttpVerb; + using enum HttpVerb; case kHead: case kGet: { - zen::IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); - bool InUpstreamCache = false; + IoBuffer Payload = m_CidStore.FindChunkByCid(Ref.PayloadId); + bool InUpstreamCache = false; if (!Payload && m_UpstreamCache) { if (auto UpstreamResult = m_UpstreamCache->GetCachePayload({{Ref.BucketSegment, Ref.HashKey}, Ref.PayloadId}); UpstreamResult.Success) { - if (zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(UpstreamResult.Value))) { - Payload = UpstreamResult.Value; - zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Payload); - zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); - InUpstreamCache = true; + Payload = UpstreamResult.Value; + IoHash ChunkHash = IoHash::HashBuffer(Payload); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Payload, ChunkHash); + InUpstreamCache = true; m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); } @@ -582,7 +720,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (!Payload) { ZEN_DEBUG("MISS - '{}/{}/{}'", Ref.BucketSegment, Ref.HashKey, Ref.PayloadId); - return Request.WriteResponse(zen::HttpResponseCode::NotFound); + return Request.WriteResponse(HttpResponseCode::NotFound); } ZEN_DEBUG("HIT - '{}/{}/{}' {} (type: {}) ({})", @@ -598,29 +736,27 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re Request.SetSuppressResponseBody(); } - return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Payload); + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Payload); } break; case kPut: { - if (zen::IoBuffer Body = Request.ReadPayload()) + if (IoBuffer Body = Request.ReadPayload()) { if (Body.Size() == 0) { - return Request.WriteResponse(zen::HttpResponseCode::BadRequest, - HttpContentType::kText, - "Empty payload not permitted"); + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Empty payload not permitted"); } - zen::IoHash ChunkHash = zen::IoHash::HashBuffer(Body); + IoHash ChunkHash = IoHash::HashBuffer(Body); - zen::CompressedBuffer Compressed = zen::CompressedBuffer::FromCompressed(SharedBuffer(Body)); + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Body)); if (!Compressed) { // All attachment payloads need to be in compressed buffer format - return Request.WriteResponse(zen::HttpResponseCode::BadRequest, + return Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Attachments must be compressed"); } @@ -632,7 +768,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re return Request.WriteResponse(HttpResponseCode::BadRequest); } - zen::CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); + CasStore::InsertResult Result = m_CasStore.InsertChunk(Body, ChunkHash); m_CidStore.AddCompressedCid(Ref.PayloadId, ChunkHash); @@ -646,11 +782,11 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re if (Result.New) { - return Request.WriteResponse(zen::HttpResponseCode::Created); + return Request.WriteResponse(HttpResponseCode::Created); } else { - return Request.WriteResponse(zen::HttpResponseCode::OK); + return Request.WriteResponse(HttpResponseCode::OK); } } } @@ -666,7 +802,7 @@ HttpStructuredCacheService::HandleCachePayloadRequest(zen::HttpServerRequest& Re } bool -HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef) +HttpStructuredCacheService::ValidateKeyUri(HttpServerRequest& Request, CacheRef& OutRef) { std::string_view Key = Request.RelativeUri(); std::string_view::size_type BucketSplitOffset = Key.find_first_of('/'); @@ -702,14 +838,14 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach PayloadSegment = Key.substr(PayloadSplitOffset + 1); } - if (HashSegment.size() != zen::IoHash::StringLength) + if (HashSegment.size() != IoHash::StringLength) { return false; } - if (!PayloadSegment.empty() && PayloadSegment.size() == zen::IoHash::StringLength) + if (!PayloadSegment.empty() && PayloadSegment.size() == IoHash::StringLength) { - const bool IsOk = zen::ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); + const bool IsOk = ParseHexBytes(PayloadSegment.data(), PayloadSegment.size(), OutRef.PayloadId.Hash); if (!IsOk) { @@ -718,10 +854,10 @@ HttpStructuredCacheService::ValidateKeyUri(zen::HttpServerRequest& Request, Cach } else { - OutRef.PayloadId = zen::IoHash::Zero; + OutRef.PayloadId = IoHash::Zero; } - const bool IsOk = zen::ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); + const bool IsOk = ParseHexBytes(HashSegment.data(), HashSegment.size(), OutRef.HashKey.Hash); if (!IsOk) { diff --git a/zenserver/cache/structuredcache.h b/zenserver/cache/structuredcache.h index 8289fd700..bd163dd1d 100644 --- a/zenserver/cache/structuredcache.h +++ b/zenserver/cache/structuredcache.h @@ -10,13 +10,13 @@ namespace spdlog { class logger; } -class ZenCacheStore; - namespace zen { class CasStore; class CidStore; class UpstreamCache; +class ZenCacheStore; +enum class CachePolicy : uint8_t; /** * Structured cache service. Imposes constraints on keys, supports blobs and @@ -60,6 +60,7 @@ public: virtual void HandleRequest(zen::HttpServerRequest& Request) override; void Flush(); + void Scrub(ScrubContext& Ctx); private: struct CacheRef @@ -70,13 +71,13 @@ private: }; [[nodiscard]] bool ValidateKeyUri(zen::HttpServerRequest& Request, CacheRef& OutRef); - void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref); - void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref); + void HandleCacheRecordRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); + void HandleCachePayloadRequest(zen::HttpServerRequest& Request, CacheRef& Ref, CachePolicy Policy); void HandleCacheBucketRequest(zen::HttpServerRequest& Request, std::string_view Bucket); spdlog::logger& Log() { return m_Log; } spdlog::logger& m_Log; - ZenCacheStore& m_CacheStore; + zen::ZenCacheStore& m_CacheStore; zen::CasStore& m_CasStore; zen::CidStore& m_CidStore; std::unique_ptr<UpstreamCache> m_UpstreamCache; diff --git a/zenserver/cache/structuredcachestore.cpp b/zenserver/cache/structuredcachestore.cpp index 018955e65..502ca6605 100644 --- a/zenserver/cache/structuredcachestore.cpp +++ b/zenserver/cache/structuredcachestore.cpp @@ -24,15 +24,16 @@ #include <atlfile.h> -using namespace zen; -using namespace fmt::literals; - ////////////////////////////////////////////////////////////////////////// -ZenCacheStore::ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} +namespace zen { + +using namespace fmt::literals; + +ZenCacheStore::ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir) : m_DiskLayer{Cas, RootDir} { ZEN_INFO("initializing structured cache at '{}'", RootDir); - zen::CreateDirectories(RootDir); + CreateDirectories(RootDir); } ZenCacheStore::~ZenCacheStore() @@ -40,7 +41,7 @@ ZenCacheStore::~ZenCacheStore() } bool -ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheStore::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { bool Ok = m_MemLayer.Get(InBucket, HashKey, OutValue); @@ -68,7 +69,7 @@ ZenCacheStore::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCac } void -ZenCacheStore::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheStore::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { // Store value and index @@ -104,6 +105,12 @@ ZenCacheStore::Flush() m_DiskLayer.Flush(); } +void +ZenCacheStore::Scrub(ScrubContext& Ctx) +{ + m_DiskLayer.Scrub(Ctx); + m_MemLayer.Scrub(Ctx); +} ////////////////////////////////////////////////////////////////////////// ZenCacheMemoryLayer::ZenCacheMemoryLayer() @@ -115,7 +122,7 @@ ZenCacheMemoryLayer::~ZenCacheMemoryLayer() } bool -ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheMemoryLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; @@ -139,7 +146,7 @@ ZenCacheMemoryLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, } void -ZenCacheMemoryLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheMemoryLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; @@ -178,8 +185,14 @@ ZenCacheMemoryLayer::DropBucket(std::string_view Bucket) return !!m_Buckets.erase(std::string(Bucket)); } +void +ZenCacheMemoryLayer::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + bool -ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheMemoryLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { RwLock::SharedLockScope _(m_bucketLock); @@ -196,7 +209,7 @@ ZenCacheMemoryLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& } void -ZenCacheMemoryLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheMemoryLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { RwLock::ExclusiveLockScope _(m_bucketLock); @@ -227,7 +240,7 @@ struct DiskLocation struct DiskIndexEntry { - zen::IoHash Key; + IoHash Key; DiskLocation Location; }; @@ -243,8 +256,8 @@ struct ZenCacheDiskLayer::CacheBucket void OpenOrCreate(std::filesystem::path BucketDir); static bool Delete(std::filesystem::path BucketDir); - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); void Drop(); void Flush(); @@ -260,12 +273,12 @@ private: BasicFile m_SobsFile; TCasLogFile<DiskIndexEntry> m_SlogFile; - void BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey); - void PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value); + void BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey); + void PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value); - RwLock m_IndexLock; - tsl::robin_map<zen::IoHash, DiskLocation, zen::IoHash::Hasher> m_Index; - uint64_t m_WriteCursor = 0; + RwLock m_IndexLock; + tsl::robin_map<IoHash, DiskLocation, IoHash::Hasher> m_Index; + uint64_t m_WriteCursor = 0; }; ZenCacheDiskLayer::CacheBucket::CacheBucket(CasStore& Cas) : m_CasStore(Cas) @@ -281,7 +294,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) { if (std::filesystem::exists(BucketDir)) { - zen::DeleteDirectories(BucketDir); + DeleteDirectories(BucketDir); return true; } @@ -292,7 +305,7 @@ ZenCacheDiskLayer::CacheBucket::Delete(std::filesystem::path BucketDir) void ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) { - zen::CreateDirectories(BucketDir); + CreateDirectories(BucketDir); m_BucketDir = BucketDir; @@ -357,7 +370,7 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) uint64_t MaxFileOffset = 0; - if (zen::RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) + if (RwLock::ExclusiveLockScope _(m_IndexLock); m_Index.empty()) { m_SlogFile.Replay([&](const DiskIndexEntry& Record) { m_Index[Record.Key] = Record.Location; @@ -372,25 +385,29 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir) } void -ZenCacheDiskLayer::CacheBucket::BuildPath(zen::WideStringBuilderBase& Path, const zen::IoHash& HashKey) +ZenCacheDiskLayer::CacheBucket::BuildPath(WideStringBuilderBase& Path, const IoHash& HashKey) { - char hex[sizeof(HashKey.Hash) * 2]; - ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, hex); + char HexString[sizeof(HashKey.Hash) * 2]; + ToHexBytes(HashKey.Hash, sizeof HashKey.Hash, HexString); Path.Append(m_BucketDir.c_str()); + Path.Append(L"/blob/"); + Path.AppendAsciiRange(HexString, HexString + 3); + Path.Append(L"/"); + Path.AppendAsciiRange(HexString + 3, HexString + 5); Path.Append(L"/"); - Path.AppendAsciiRange(hex, hex + sizeof(hex)); + Path.AppendAsciiRange(HexString + 5, HexString + sizeof(HexString)); } bool -ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::CacheBucket::Get(const IoHash& HashKey, ZenCacheValue& OutValue) { if (!m_Ok) { return false; } - zen::RwLock::SharedLockScope _(m_IndexLock); + RwLock::SharedLockScope _(m_IndexLock); if (auto it = m_Index.find(HashKey); it != m_Index.end()) { @@ -417,7 +434,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - if (zen::IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) + if (IoBuffer Data = IoBufferBuilder::MakeFromFile(DataFilePath.c_str())) { OutValue.Value = Data; OutValue.Value.SetContentType(ContentType); @@ -431,7 +448,7 @@ ZenCacheDiskLayer::CacheBucket::Get(const zen::IoHash& HashKey, ZenCacheValue& O } void -ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::CacheBucket::Put(const IoHash& HashKey, const ZenCacheValue& Value) { if (!m_Ok) { @@ -453,12 +470,12 @@ ZenCacheDiskLayer::CacheBucket::Put(const zen::IoHash& HashKey, const ZenCacheVa EntryFlags |= DiskLocation::kStructured; } - zen::RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(m_WriteCursor, EntryFlags), .Size = gsl::narrow<uint32_t>(Value.Value.Size())}; - m_WriteCursor = zen::RoundUp(m_WriteCursor + Loc.Size, 16); + m_WriteCursor = RoundUp(m_WriteCursor + Loc.Size, 16); if (auto it = m_Index.find(HashKey); it == m_Index.end()) { @@ -483,7 +500,7 @@ ZenCacheDiskLayer::CacheBucket::Drop() m_SobsFile.Close(); m_SlogFile.Close(); - zen::DeleteDirectories(m_BucketDir); + DeleteDirectories(m_BucketDir); } void @@ -494,12 +511,22 @@ ZenCacheDiskLayer::CacheBucket::Flush() } void -ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::Scrub(ScrubContext& Ctx) { - zen::WideStringBuilder<128> DataFilePath; + ZEN_UNUSED(Ctx); +} + +void +ZenCacheDiskLayer::CacheBucket::PutLargeObject(const IoHash& HashKey, const ZenCacheValue& Value) +{ + WideStringBuilder<128> DataFilePath; BuildPath(DataFilePath, HashKey); - // TODO: replace this with a more efficient implementation with proper atomic rename + // TODO: replace this process with a more efficient implementation with proper atomic rename + // and also avoid creating directories if we can + + std::filesystem::path ParentPath = std::filesystem::path(DataFilePath.c_str()).parent_path(); + CreateDirectories(ParentPath); CAtlTemporaryFile DataFile; @@ -507,21 +534,23 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); + ThrowSystemException(hRes, "Failed to open temporary file for put at '{}'"_format(m_BucketDir)); } hRes = DataFile.Write(Value.Value.Data(), gsl::narrow<DWORD>(Value.Value.Size())); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); + ThrowSystemException(hRes, "Failed to write payload ({} bytes) to file"_format(NiceBytes(Value.Value.Size()))); } + // Move file into place (note: not fully atomic!) + hRes = DataFile.Close(DataFilePath.c_str()); if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(zen::WideToUtf8(DataFilePath))); + ThrowSystemException(hRes, "Failed to finalize file '{}'"_format(WideToUtf8(DataFilePath))); } // Update index @@ -533,7 +562,7 @@ ZenCacheDiskLayer::CacheBucket::PutLargeObject(const zen::IoHash& HashKey, const EntryFlags |= DiskLocation::kStructured; } - zen::RwLock::ExclusiveLockScope _(m_IndexLock); + RwLock::ExclusiveLockScope _(m_IndexLock); DiskLocation Loc{.OffsetAndFlags = DiskLocation::CombineOffsetAndFlags(0, EntryFlags), .Size = 0}; @@ -560,12 +589,12 @@ ZenCacheDiskLayer::ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& ZenCacheDiskLayer::~ZenCacheDiskLayer() = default; bool -ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue) +ZenCacheDiskLayer::Get(std::string_view InBucket, const IoHash& HashKey, ZenCacheValue& OutValue) { CacheBucket* Bucket = nullptr; { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -579,7 +608,7 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze { // Bucket needs to be opened/created - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { @@ -603,12 +632,12 @@ ZenCacheDiskLayer::Get(std::string_view InBucket, const zen::IoHash& HashKey, Ze } void -ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, const ZenCacheValue& Value) +ZenCacheDiskLayer::Put(std::string_view InBucket, const IoHash& HashKey, const ZenCacheValue& Value) { CacheBucket* Bucket = nullptr; { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -622,7 +651,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co { // New bucket needs to be created - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); if (auto it = m_Buckets.find(std::string(InBucket)); it != m_Buckets.end()) { @@ -651,7 +680,7 @@ ZenCacheDiskLayer::Put(std::string_view InBucket, const zen::IoHash& HashKey, co bool ZenCacheDiskLayer::DropBucket(std::string_view InBucket) { - zen::RwLock::ExclusiveLockScope _(m_Lock); + RwLock::ExclusiveLockScope _(m_Lock); auto it = m_Buckets.find(std::string(InBucket)); @@ -679,7 +708,7 @@ ZenCacheDiskLayer::Flush() Buckets.reserve(m_Buckets.size()); { - zen::RwLock::SharedLockScope _(m_Lock); + RwLock::SharedLockScope _(m_Lock); for (auto& Kv : m_Buckets) { @@ -705,7 +734,7 @@ ZenCacheTracker::~ZenCacheTracker() } void -ZenCacheTracker::TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey) +ZenCacheTracker::TrackAccess(std::string_view Bucket, const IoHash& HashKey) { ZEN_UNUSED(Bucket); ZEN_UNUSED(HashKey); @@ -715,3 +744,5 @@ void ZenCacheTracker::Flush() { } + +} // namespace zen diff --git a/zenserver/cache/structuredcachestore.h b/zenserver/cache/structuredcachestore.h index 48c3cfde9..fdf4a8cfe 100644 --- a/zenserver/cache/structuredcachestore.h +++ b/zenserver/cache/structuredcachestore.h @@ -23,8 +23,6 @@ namespace zen { class WideStringBuilderBase; class CasStore; -} // namespace zen - /****************************************************************************** /$$$$$$$$ /$$$$$$ /$$ @@ -44,8 +42,8 @@ class CasStore; struct ZenCacheValue { - zen::IoBuffer Value; - zen::CbObject IndexData; + IoBuffer Value; + CbObject IndexData; }; class ZenCacheMemoryLayer @@ -54,34 +52,36 @@ public: ZenCacheMemoryLayer(); ~ZenCacheMemoryLayer(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); + void Scrub(ScrubContext& Ctx); private: struct CacheBucket { - zen::RwLock m_bucketLock; - tsl::robin_map<zen::IoHash, zen::IoBuffer> m_cacheMap; + RwLock m_bucketLock; + tsl::robin_map<IoHash, IoBuffer> m_cacheMap; - bool Get(const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(const IoHash& HashKey, const ZenCacheValue& Value); }; - zen::RwLock m_Lock; + RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; }; class ZenCacheDiskLayer { public: - ZenCacheDiskLayer(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheDiskLayer(CasStore& Cas, const std::filesystem::path& RootDir); ~ZenCacheDiskLayer(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Flush(); + void Scrub(ScrubContext& Ctx); private: /** A cache bucket manages a single directory containing @@ -89,22 +89,23 @@ private: */ struct CacheBucket; - zen::CasStore& m_CasStore; + CasStore& m_CasStore; std::filesystem::path m_RootDir; - zen::RwLock m_Lock; + RwLock m_Lock; std::unordered_map<std::string, CacheBucket> m_Buckets; // TODO: make this case insensitive }; class ZenCacheStore { public: - ZenCacheStore(zen::CasStore& Cas, const std::filesystem::path& RootDir); + ZenCacheStore(CasStore& Cas, const std::filesystem::path& RootDir); ~ZenCacheStore(); - bool Get(std::string_view Bucket, const zen::IoHash& HashKey, ZenCacheValue& OutValue); - void Put(std::string_view Bucket, const zen::IoHash& HashKey, const ZenCacheValue& Value); + bool Get(std::string_view Bucket, const IoHash& HashKey, ZenCacheValue& OutValue); + void Put(std::string_view Bucket, const IoHash& HashKey, const ZenCacheValue& Value); bool DropBucket(std::string_view Bucket); void Flush(); + void Scrub(ScrubContext& Ctx); private: std::filesystem::path m_RootDir; @@ -121,8 +122,10 @@ public: ZenCacheTracker(ZenCacheStore& CacheStore); ~ZenCacheTracker(); - void TrackAccess(std::string_view Bucket, const zen::IoHash& HashKey); + void TrackAccess(std::string_view Bucket, const IoHash& HashKey); void Flush(); private: }; + +} // namespace zen diff --git a/zenserver/config.cpp b/zenserver/config.cpp index 578a3a202..164d2a792 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -55,6 +55,27 @@ PickDefaultStateDirectory() #endif +UpstreamCachePolicy +ParseUpstreamCachePolicy(std::string_view Options) +{ + if (Options == "readonly") + { + return UpstreamCachePolicy::Read; + } + else if (Options == "writeonly") + { + return UpstreamCachePolicy::Write; + } + else if (Options == "disabled") + { + return UpstreamCachePolicy::Disabled; + } + else + { + return UpstreamCachePolicy::ReadWrite; + } +} + void ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) { @@ -77,6 +98,21 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<std::string>(GlobalOptions.ChildId), "<identifier>"); +#if ZEN_PLATFORM_WINDOWS + options.add_option("lifetime", + "", + "install", + "Install zenserver as a Windows service", + cxxopts::value<bool>(GlobalOptions.InstallService), + ""); + options.add_option("lifetime", + "", + "uninstall", + "Uninstall zenserver as a Windows service", + cxxopts::value<bool>(GlobalOptions.UninstallService), + ""); +#endif + options.add_option("network", "p", "port", @@ -98,6 +134,14 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z cxxopts::value<bool>(ServiceConfig.ShouldCrash)->default_value("false"), ""); + std::string UpstreamCachePolicyOptions; + options.add_option("cache", + "", + "upstream-cache-policy", + "", + cxxopts::value<std::string>(UpstreamCachePolicyOptions)->default_value(""), + "Upstream cache policy (readwrite|readonly|writeonly|disabled)"); + options.add_option("cache", "", "upstream-jupiter-url", @@ -163,13 +207,6 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z options.add_option("cache", "", - "upstream-enabled", - "Whether upstream caching is disabled", - cxxopts::value<bool>(ServiceConfig.UpstreamCacheConfig.Enabled)->default_value("true"), - ""); - - options.add_option("cache", - "", "upstream-thread-count", "Number of threads used for upstream procsssing", cxxopts::value<int>(ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount)->default_value("4"), @@ -185,6 +222,8 @@ ParseGlobalCliOptions(int argc, char* argv[], ZenServerOptions& GlobalOptions, Z exit(0); } + + ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(UpstreamCachePolicyOptions); } catch (cxxopts::OptionParseException& e) { @@ -261,7 +300,8 @@ ParseServiceConfig(const std::filesystem::path& DataRoot, ZenServiceConfig& Serv if (auto UpstreamConfig = StructuredCacheConfig->get<sol::optional<sol::table>>("upstream")) { - ServiceConfig.UpstreamCacheConfig.Enabled = UpstreamConfig->get_or("enable", ServiceConfig.UpstreamCacheConfig.Enabled); + std::string Policy = UpstreamConfig->get_or("policy", std::string()); + ServiceConfig.UpstreamCacheConfig.CachePolicy = ParseUpstreamCachePolicy(Policy); ServiceConfig.UpstreamCacheConfig.UpstreamThreadCount = UpstreamConfig->get_or("upstreamthreadcount", 4); if (auto JupiterConfig = UpstreamConfig->get<sol::optional<sol::table>>("jupiter")) diff --git a/zenserver/config.h b/zenserver/config.h index 80ec86905..6ade1b401 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -9,12 +9,14 @@ struct ZenServerOptions { bool IsDebug = false; bool IsTest = false; - bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements - int BasePort = 1337; // Service listen port (used for both UDP and TCP) - int OwnerPid = 0; // Parent process id (zero for standalone) - std::string ChildId; // Id assigned by parent process (used for lifetime management) - std::string LogId; // Id for tagging log output - std::filesystem::path DataDir; // Root directory for state (used for testing) + bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements + int BasePort = 1337; // Service listen port (used for both UDP and TCP) + int OwnerPid = 0; // Parent process id (zero for standalone) + std::string ChildId; // Id assigned by parent process (used for lifetime management) + bool InstallService = false; // Flag used to initiate service install (temporary) + bool UninstallService = false; // Flag used to initiate service uninstall (temporary) + std::string LogId; // Id for tagging log output + std::filesystem::path DataDir; // Root directory for state (used for testing) }; struct ZenUpstreamJupiterConfig @@ -34,12 +36,20 @@ struct ZenUpstreamZenConfig std::string Url; }; +enum class UpstreamCachePolicy : uint8_t +{ + Disabled = 0, + Read = 1 << 0, + Write = 1 << 1, + ReadWrite = Read | Write +}; + struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; ZenUpstreamZenConfig ZenConfig; int UpstreamThreadCount = 4; - bool Enabled = false; + UpstreamCachePolicy CachePolicy = UpstreamCachePolicy::ReadWrite; }; struct ZenServiceConfig diff --git a/zenserver/diag/diagsvcs.h b/zenserver/diag/diagsvcs.h index 51ee98f67..61703e393 100644 --- a/zenserver/diag/diagsvcs.h +++ b/zenserver/diag/diagsvcs.h @@ -7,7 +7,9 @@ ////////////////////////////////////////////////////////////////////////// -class HttpTestService : public zen::HttpService +namespace zen { + +class HttpTestService : public HttpService { uint32_t LogPoint = 0; @@ -17,7 +19,7 @@ public: virtual const char* BaseUri() const override { return "/test/"; } - virtual void HandleRequest(zen::HttpServerRequest& Request) override + virtual void HandleRequest(HttpServerRequest& Request) override { using namespace std::literals; @@ -25,21 +27,21 @@ public: if (Uri == "hello"sv) { - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kText, u8"hello world!"sv); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"hello world!"sv); // OutputLogMessageInternal(&LogPoint, 0, 0); } else if (Uri == "1K"sv) { - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, m_1k); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, m_1k); } else if (Uri == "1M"sv) { - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, m_1m); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, m_1m); } else if (Uri == "1M_1k"sv) { - std::vector<zen::IoBuffer> Buffers; + std::vector<IoBuffer> Buffers; Buffers.reserve(1024); for (int i = 0; i < 1024; ++i) @@ -47,11 +49,11 @@ public: Buffers.push_back(m_1k); } - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers); } else if (Uri == "1G"sv) { - std::vector<zen::IoBuffer> Buffers; + std::vector<IoBuffer> Buffers; Buffers.reserve(1024); for (int i = 0; i < 1024; ++i) @@ -59,11 +61,11 @@ public: Buffers.push_back(m_1m); } - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers); } else if (Uri == "1G_1k"sv) { - std::vector<zen::IoBuffer> Buffers; + std::vector<IoBuffer> Buffers; Buffers.reserve(1024 * 1024); for (int i = 0; i < 1024 * 1024; ++i) @@ -71,16 +73,16 @@ public: Buffers.push_back(m_1k); } - Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kBinary, Buffers); + Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kBinary, Buffers); } } private: - zen::IoBuffer m_1m{1024 * 1024}; - zen::IoBuffer m_1k{m_1m, 0u, 1024}; + IoBuffer m_1m{1024 * 1024}; + IoBuffer m_1k{m_1m, 0u, 1024}; }; -class HttpHealthService : public zen::HttpService +class HttpHealthService : public HttpService { public: HttpHealthService() = default; @@ -88,16 +90,18 @@ public: virtual const char* BaseUri() const override { return "/health/"; } - virtual void HandleRequest(zen::HttpServerRequest& Request) override + virtual void HandleRequest(HttpServerRequest& Request) override { using namespace std::literals; switch (Request.RequestVerb()) { - case zen::HttpVerb::kGet: - return Request.WriteResponse(zen::HttpResponseCode::OK, zen::HttpContentType::kText, u8"OK!"sv); + case HttpVerb::kGet: + return Request.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, u8"OK!"sv); } } private: }; + +} // namespace zen diff --git a/zenserver/diag/logging.cpp b/zenserver/diag/logging.cpp index 48eda7512..41b140f90 100644 --- a/zenserver/diag/logging.cpp +++ b/zenserver/diag/logging.cpp @@ -9,6 +9,9 @@ #include <spdlog/pattern_formatter.h> #include <spdlog/sinks/ansicolor_sink.h> #include <spdlog/sinks/basic_file_sink.h> +#include <spdlog/sinks/daily_file_sink.h> +#include <spdlog/sinks/msvc_sink.h> +#include <spdlog/sinks/rotating_file_sink.h> #include <spdlog/sinks/stdout_color_sinks.h> #include <spdlog/spdlog.h> #include <zencore/string.h> @@ -204,6 +207,12 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) IsAsync = false; } + if (GlobalOptions.IsTest) + { + LogLevel = spdlog::level::trace; + IsAsync = false; + } + if (IsAsync) { const int QueueSize = 8192; @@ -217,7 +226,19 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) // Sinks auto ConsoleSink = std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>(); - auto FileSink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), /* truncate */ true); + +#if 0 + auto FileSink = std::make_shared<spdlog::sinks::daily_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), + 0, + 0, + /* truncate */ false, + uint16_t(/* max files */ 14)); +#else + auto FileSink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(zen::WideToUtf8(LogPath.c_str()), + /* max size */ 128 * 1024 * 1024, + /* max files */ 16, + /* rotate on open */ true); +#endif // Default @@ -228,20 +249,30 @@ InitializeLogging(const ZenServerOptions& GlobalOptions) Sinks.push_back(ConsoleSink); Sinks.push_back(FileSink); +#if ZEN_PLATFORM_WINDOWS + if (zen::IsDebuggerPresent()) + { + auto DebugSink = std::make_shared<spdlog::sinks::msvc_sink_mt>(); + DebugSink->set_level(spdlog::level::debug); + Sinks.push_back(DebugSink); + } +#endif + // Jupiter - only log HTTP traffic to file auto JupiterLogger = std::make_shared<spdlog::logger>("jupiter", FileSink); spdlog::register_logger(JupiterLogger); - JupiterLogger->set_level(LogLevel); + + // Zen - only log HTTP traffic to file auto ZenClientLogger = std::make_shared<spdlog::logger>("zenclient", FileSink); spdlog::register_logger(ZenClientLogger); - ZenClientLogger->set_level(LogLevel); // Configure all registered loggers according to settings spdlog::set_level(LogLevel); spdlog::flush_on(spdlog::level::err); + spdlog::flush_every(std::chrono::seconds{2}); spdlog::set_formatter(std::make_unique<logging::full_formatter>(GlobalOptions.LogId, std::chrono::system_clock::now())); } diff --git a/zenserver/experimental/usnjournal.cpp b/zenserver/experimental/usnjournal.cpp index ab83b8a1c..1e765fbe5 100644 --- a/zenserver/experimental/usnjournal.cpp +++ b/zenserver/experimental/usnjournal.cpp @@ -34,14 +34,14 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (!Success) { - zen::ThrowSystemException("GetVolumePathName failed"); + zen::ThrowLastError("GetVolumePathName failed"); } Success = GetVolumeNameForVolumeMountPoint(VolumePathName, VolumeName, ZEN_ARRAY_COUNT(VolumeName)); if (!Success) { - zen::ThrowSystemException("GetVolumeNameForVolumeMountPoint failed"); + zen::ThrowLastError("GetVolumeNameForVolumeMountPoint failed"); } // Chop off trailing slash since we want to open a volume handle, not a handle to the volume root directory @@ -64,7 +64,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (m_VolumeHandle == INVALID_HANDLE_VALUE) { - ThrowSystemException("Volume handle open failed"); + ThrowLastError("Volume handle open failed"); } // Figure out which file system is in use for volume @@ -86,7 +86,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (!Success) { - ThrowSystemException("Failed to get volume information"); + ThrowLastError("Failed to get volume information"); } ZEN_DEBUG("File system type is {}", WideToUtf8(FileSystemName)); @@ -173,7 +173,7 @@ UsnJournalReader::Initialize(std::filesystem::path VolumePath) if (!Success) { - ThrowSystemException("GetFileInformationByHandleEx failed"); + ThrowLastError("GetFileInformationByHandleEx failed"); } const Frn VolumeRootFrn = FileInformation.FileId; diff --git a/zenserver/projectstore.cpp b/zenserver/projectstore.cpp index 404484edf..1f4239b23 100644 --- a/zenserver/projectstore.cpp +++ b/zenserver/projectstore.cpp @@ -780,6 +780,12 @@ ProjectStore::Project::Flush() // TODO } +void +ProjectStore::Project::Scrub(ScrubContext& Ctx) +{ + ZEN_UNUSED(Ctx); +} + ////////////////////////////////////////////////////////////////////////// ProjectStore::ProjectStore(CasStore& Store, std::filesystem::path BasePath) @@ -815,6 +821,17 @@ ProjectStore::Flush() } } +void +ProjectStore::Scrub(ScrubContext& Ctx) +{ + RwLock::SharedLockScope _(m_ProjectsLock); + + for (auto& Kv : m_Projects) + { + Kv.second.Scrub(Ctx); + } +} + ProjectStore::Project* ProjectStore::OpenProject(std::string_view ProjectId) { diff --git a/zenserver/projectstore.h b/zenserver/projectstore.h index 3d2247305..e545d78b9 100644 --- a/zenserver/projectstore.h +++ b/zenserver/projectstore.h @@ -101,6 +101,7 @@ public: spdlog::logger& Log() { return m_OuterProject->Log(); } void Flush(); + void Scrub(ScrubContext& Ctx); std::size_t OplogCount() const { return m_LatestOpMap.size(); } @@ -154,6 +155,7 @@ public: void Write(); [[nodiscard]] static bool Exists(std::filesystem::path BasePath); void Flush(); + void Scrub(ScrubContext& Ctx); spdlog::logger& Log(); private: @@ -177,6 +179,7 @@ public: void DeleteProject(std::string_view ProjectId); bool Exists(std::string_view ProjectId); void Flush(); + void Scrub(ScrubContext& Ctx); spdlog::logger& Log() { return m_Log; } const std::filesystem::path& BasePath() const { return m_ProjectBasePath; } @@ -193,13 +196,13 @@ private: ////////////////////////////////////////////////////////////////////////// // -// {ns} a root namespace, should be associated with the project which owns it +// {project} a project identifier // {target} a variation of the project, typically a build target // {lsn} oplog entry sequence number // -// /prj/{ns} -// /prj/{ns}/oplog/{target} -// /prj/{ns}/oplog/{target}/{lsn} +// /prj/{project} +// /prj/{project}/oplog/{target} +// /prj/{project}/oplog/{target}/{lsn} // // oplog entry // diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 4a5467648..2e74602db 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -66,6 +66,14 @@ CloudCacheSession::~CloudCacheSession() } CloudCacheResult +CloudCacheSession::Authenticate() +{ + std::string Auth; + const bool Success = m_CacheClient->AcquireAccessToken(Auth); + return {.Success = Success}; +} + +CloudCacheResult CloudCacheSession::GetDerivedData(std::string_view BucketId, std::string_view Key) { std::string Auth; @@ -163,7 +171,9 @@ CloudCacheSession::PutDerivedData(std::string_view BucketId, std::string_view Ke cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -194,7 +204,9 @@ CloudCacheSession::PutRef(std::string_view BucketId, const IoHash& Key, IoBuffer cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } CloudCacheResult @@ -215,7 +227,9 @@ CloudCacheSession::PutCompressedBlob(const IoHash& Key, IoBuffer Blob) cpr::Response Response = Session.Put(); ZEN_DEBUG("PUT {}", Response); - return {.Bytes = Response.uploaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = (Response.status_code == 200 || Response.status_code == 201)}; + return {.Bytes = Response.uploaded_bytes, + .ElapsedSeconds = Response.elapsed, + .Success = (Response.status_code == 200 || Response.status_code == 201)}; } std::vector<IoHash> diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index 5535ba000..21217387c 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -61,6 +61,7 @@ public: CloudCacheSession(CloudCacheClient* OuterClient); ~CloudCacheSession(); + CloudCacheResult Authenticate(); CloudCacheResult GetDerivedData(std::string_view BucketId, std::string_view Key); CloudCacheResult GetDerivedData(std::string_view BucketId, const IoHash& Key); CloudCacheResult GetRef(std::string_view BucketId, const IoHash& Key, ZenContentType RefType); diff --git a/zenserver/upstream/upstreamcache.cpp b/zenserver/upstream/upstreamcache.cpp index 38d30a795..d6b6d44be 100644 --- a/zenserver/upstream/upstreamcache.cpp +++ b/zenserver/upstream/upstreamcache.cpp @@ -94,7 +94,7 @@ namespace detail { std::atomic_bool m_CompleteAdding{false}; }; - class JupiterUpstreamEndpoint final : public zen::UpstreamEndpoint + class JupiterUpstreamEndpoint final : public UpstreamEndpoint { public: JupiterUpstreamEndpoint(const CloudCacheClientOptions& Options) : m_UseLegacyDdc(Options.UseLegacyDdc) @@ -108,8 +108,9 @@ namespace detail { virtual bool Initialize() override { - // TODO: Test and authenticate Jupiter client connection - return !m_Client->ServiceUrl().empty(); + CloudCacheSession Session(m_Client); + const CloudCacheResult Result = Session.Authenticate(); + return Result.Success; } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -118,8 +119,8 @@ namespace detail { { try { - zen::CloudCacheSession Session(m_Client); - CloudCacheResult Result; + CloudCacheSession Session(m_Client); + CloudCacheResult Result; if (m_UseLegacyDdc && Type == ZenContentType::kBinary) { @@ -134,7 +135,7 @@ namespace detail { { CbPackage Package; - const CbValidateError ValidationResult = zen::ValidateCompactBinary(Result.Response, CbValidateMode::All); + const CbValidateError ValidationResult = ValidateCompactBinary(Result.Response, CbValidateMode::All); if (Result.Success = ValidationResult == CbValidateError::None; Result.Success) { CbObject CacheRecord = LoadCompactBinaryObject(Result.Response); @@ -183,7 +184,7 @@ namespace detail { { try { - zen::CloudCacheSession Session(m_Client); + CloudCacheSession Session(m_Client); const CloudCacheResult Result = Session.GetCompressedBlob(PayloadKey.PayloadId); return {.Value = Result.Response, @@ -278,7 +279,7 @@ namespace detail { RefPtr<CloudCacheClient> m_Client; }; - class ZenUpstreamEndpoint final : public zen::UpstreamEndpoint + class ZenUpstreamEndpoint final : public UpstreamEndpoint { public: ZenUpstreamEndpoint(std::string_view ServiceUrl) @@ -292,8 +293,20 @@ namespace detail { virtual bool Initialize() override { - // TODO: Test and authenticate Zen client connection - return !m_Client->ServiceUrl().empty(); + try + { + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + for (int32_t Attempt = 0, MaxAttempts = 3; Attempt < MaxAttempts && !Result.Success; ++Attempt) + { + Result = Session.SayHello(); + } + return Result.Success; + } + catch (std::exception&) + { + return false; + } } virtual std::string_view DisplayName() const override { return m_DisplayName; } @@ -344,14 +357,14 @@ namespace detail { try { - zen::ZenStructuredCacheSession Session(*m_Client); - ZenCacheResult Result; - int64_t TotalBytes = 0ull; - double TotalElapsedSeconds = 0.0; + ZenStructuredCacheSession Session(*m_Client); + ZenCacheResult Result; + int64_t TotalBytes = 0ull; + double TotalElapsedSeconds = 0.0; if (CacheRecord.Type == ZenContentType::kCbPackage) { - zen::CbPackage Package; + CbPackage Package; Package.SetObject(CbObject(SharedBuffer(RecordValue))); for (const IoBuffer& Payload : Payloads) @@ -427,8 +440,8 @@ namespace detail { } private: - std::string m_DisplayName; - RefPtr<zen::ZenStructuredCacheClient> m_Client; + std::string m_DisplayName; + RefPtr<ZenStructuredCacheClient> m_Client; }; } // namespace detail @@ -455,7 +468,7 @@ class UpstreamStats final }; public: - UpstreamStats() : m_Log(zen::logging::Get("upstream")) {} + UpstreamStats() : m_Log(logging::Get("upstream")) {} void Add(const UpstreamEndpoint& Endpoint, const GetUpstreamCacheResult& Result) { @@ -523,8 +536,8 @@ private: class DefaultUpstreamCache final : public UpstreamCache { public: - DefaultUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) - : m_Log(zen::logging::Get("upstream")) + DefaultUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) + : m_Log(logging::Get("upstream")) , m_Options(Options) , m_CacheStore(CacheStore) , m_CidStore(CidStore) @@ -559,12 +572,15 @@ public: virtual GetUpstreamCacheResult GetCacheRecord(UpstreamCacheKey CacheKey, ZenContentType Type) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCacheRecord(CacheKey, Type); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -573,12 +589,15 @@ public: virtual GetUpstreamCacheResult GetCachePayload(UpstreamPayloadKey PayloadKey) override { - for (auto& Endpoint : m_Endpoints) + if (m_Options.ReadUpstream) { - if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + for (auto& Endpoint : m_Endpoints) { - m_Stats.Add(*Endpoint, Result); - return Result; + if (GetUpstreamCacheResult Result = Endpoint->GetCachePayload(PayloadKey); Result.Success) + { + m_Stats.Add(*Endpoint, Result); + return Result; + } } } @@ -587,7 +606,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) override { - if (m_IsRunning.load()) + if (m_IsRunning.load() && m_Options.WriteUpstream) { if (!m_UpstreamThreads.empty()) { @@ -697,21 +716,21 @@ private: spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - UpstreamCacheOptions m_Options; - ::ZenCacheStore& m_CacheStore; - CidStore& m_CidStore; - UpstreamQueue m_UpstreamQueue; - UpstreamStats m_Stats; - std::vector<std::unique_ptr<zen::UpstreamEndpoint>> m_Endpoints; - std::vector<std::thread> m_UpstreamThreads; - std::atomic_bool m_IsRunning{false}; + spdlog::logger& m_Log; + UpstreamCacheOptions m_Options; + ZenCacheStore& m_CacheStore; + CidStore& m_CidStore; + UpstreamQueue m_UpstreamQueue; + UpstreamStats m_Stats; + std::vector<std::unique_ptr<UpstreamEndpoint>> m_Endpoints; + std::vector<std::thread> m_UpstreamThreads; + std::atomic_bool m_IsRunning{false}; }; ////////////////////////////////////////////////////////////////////////// std::unique_ptr<UpstreamCache> -MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore) +MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore) { return std::make_unique<DefaultUpstreamCache>(Options, CacheStore, CidStore); } diff --git a/zenserver/upstream/upstreamcache.h b/zenserver/upstream/upstreamcache.h index 327778452..142fe260f 100644 --- a/zenserver/upstream/upstreamcache.h +++ b/zenserver/upstream/upstreamcache.h @@ -8,11 +8,10 @@ #include <memory> -class ZenCacheStore; - namespace zen { class CidStore; +class ZenCacheStore; struct CloudCacheClientOptions; struct UpstreamCacheKey @@ -36,7 +35,9 @@ struct UpstreamCacheRecord struct UpstreamCacheOptions { - uint32_t ThreadCount = 4; + uint32_t ThreadCount = 4; + bool ReadUpstream = true; + bool WriteUpstream = true; }; struct GetUpstreamCacheResult @@ -101,7 +102,7 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamCacheRecord CacheRecord) = 0; }; -std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ::ZenCacheStore& CacheStore, CidStore& CidStore); +std::unique_ptr<UpstreamCache> MakeUpstreamCache(const UpstreamCacheOptions& Options, ZenCacheStore& CacheStore, CidStore& CidStore); std::unique_ptr<UpstreamEndpoint> MakeJupiterUpstreamEndpoint(const CloudCacheClientOptions& Options); diff --git a/zenserver/upstream/zen.cpp b/zenserver/upstream/zen.cpp index 55ddd310f..7f689d7f3 100644 --- a/zenserver/upstream/zen.cpp +++ b/zenserver/upstream/zen.cpp @@ -73,7 +73,7 @@ namespace detail { // Note that currently this just implements an UDP echo service for testing purposes -Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(zen::GetSessionId()) +Mesh::Mesh(asio::io_context& IoContext) : m_Log(logging::Get("mesh")), m_IoContext(IoContext), m_SessionId(GetSessionId()) { } @@ -370,7 +370,7 @@ ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State) using namespace std::literals; ZenStructuredCacheSession::ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient) -: m_Log(zen::logging::Get("zenclient"sv)) +: m_Log(logging::Get("zenclient"sv)) , m_Client(OuterClient) { m_SessionState = m_Client.AllocSessionState(); @@ -382,6 +382,19 @@ ZenStructuredCacheSession::~ZenStructuredCacheSession() } ZenCacheResult +ZenStructuredCacheSession::SayHello() +{ + ExtendableStringBuilder<256> Uri; + Uri << m_Client.ServiceUrl() << "/test/hello"; + + cpr::Session& Session = m_SessionState->Session; + Session.SetOption(cpr::Url{Uri.c_str()}); + cpr::Response Response = Session.Get(); + + return {.Bytes = Response.downloaded_bytes, .ElapsedSeconds = Response.elapsed, .Success = Response.status_code == 200}; +} + +ZenCacheResult ZenStructuredCacheSession::GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type) { ExtendableStringBuilder<256> Uri; diff --git a/zenserver/upstream/zen.h b/zenserver/upstream/zen.h index ff4a551bf..36cfd1217 100644 --- a/zenserver/upstream/zen.h +++ b/zenserver/upstream/zen.h @@ -109,6 +109,7 @@ public: ZenStructuredCacheSession(ZenStructuredCacheClient& OuterClient); ~ZenStructuredCacheSession(); + ZenCacheResult SayHello(); ZenCacheResult GetCacheRecord(std::string_view BucketId, const IoHash& Key, ZenContentType Type); ZenCacheResult GetCachePayload(std::string_view BucketId, const IoHash& Key, const IoHash& PayloadId); ZenCacheResult PutCacheRecord(std::string_view BucketId, const IoHash& Key, IoBuffer Value, ZenContentType Type); diff --git a/zenserver/vfs.cpp b/zenserver/vfs.cpp index 86e265b20..fcc9a71f8 100644 --- a/zenserver/vfs.cpp +++ b/zenserver/vfs.cpp @@ -5,7 +5,6 @@ #if ZEN_WITH_VFS # include <zencore/except.h> # include <zencore/filesystem.h> -# include <zencore/snapshot_manifest.h> # include <zencore/stream.h> # include <zencore/windows.h> # include <zencore/logging.h> @@ -532,7 +531,7 @@ retry: } else if (hRes == HRESULT_FROM_WIN32(ERROR_FILE_NOT_FOUND)) { - throw zen::WindowsException(hRes, "Failed to initialize root placeholder"); + ThrowSystemException(hRes, "Failed to initialize root placeholder"); } // Ignore error, problems will be reported below anyway diff --git a/zenserver/windows/service.cpp b/zenserver/windows/service.cpp new file mode 100644 index 000000000..017b5f9a7 --- /dev/null +++ b/zenserver/windows/service.cpp @@ -0,0 +1,631 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "service.h" + +#include <zencore/zencore.h> + +#include <stdio.h> +#include <tchar.h> +#include <zencore/windows.h> + +#define SVCNAME L"Zen Store" + +SERVICE_STATUS gSvcStatus; +SERVICE_STATUS_HANDLE gSvcStatusHandle; +HANDLE ghSvcStopEvent = NULL; + +void SvcInstall(void); + +void ReportSvcStatus(DWORD, DWORD, DWORD); +void SvcReportEvent(LPTSTR); + +WindowsService::WindowsService() +{ +} + +WindowsService::~WindowsService() +{ +} + +// +// Purpose: +// Installs a service in the SCM database +// +// Parameters: +// None +// +// Return value: +// None +// +VOID +WindowsService::Install() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + TCHAR szPath[MAX_PATH]; + + if (!GetModuleFileName(NULL, szPath, MAX_PATH)) + { + printf("Cannot install service (%d)\n", GetLastError()); + return; + } + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Create the service + + schService = CreateService(schSCManager, // SCM database + SVCNAME, // name of service + SVCNAME, // service name to display + SERVICE_ALL_ACCESS, // desired access + SERVICE_WIN32_OWN_PROCESS, // service type + SERVICE_DEMAND_START, // start type + SERVICE_ERROR_NORMAL, // error control type + szPath, // path to service's binary + NULL, // no load ordering group + NULL, // no tag identifier + NULL, // no dependencies + NULL, // LocalSystem account + NULL); // no password + + if (schService == NULL) + { + printf("CreateService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + else + printf("Service installed successfully\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +void +WindowsService::Delete() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + DELETE); // need delete access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Delete the service. + + if (!DeleteService(schService)) + { + printf("DeleteService failed (%d)\n", GetLastError()); + } + else + printf("Service deleted successfully\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +WindowsService* gSvc; + +void WINAPI +CallMain(DWORD, LPSTR*) +{ + gSvc->SvcMain(); +} + +int +WindowsService::ServiceMain() +{ + if (zen::IsInteractiveSession()) + { + // Not actually running as a service + return Run(); + } + else + { + gSvc = this; + + SERVICE_TABLE_ENTRY DispatchTable[] = {{(LPWSTR)SVCNAME, (LPSERVICE_MAIN_FUNCTION)&CallMain}, {NULL, NULL}}; + + // This call returns when the service has stopped. + // The process should simply terminate when the call returns. + + if (!StartServiceCtrlDispatcher(DispatchTable)) + { + SvcReportEvent((LPTSTR)L"StartServiceCtrlDispatcher"); + } + } + + return 0; +} + +int +WindowsService::SvcMain() +{ + // Register the handler function for the service + + gSvcStatusHandle = RegisterServiceCtrlHandler(SVCNAME, SvcCtrlHandler); + + if (!gSvcStatusHandle) + { + SvcReportEvent((LPTSTR)TEXT("RegisterServiceCtrlHandler")); + + return 1; + } + + // These SERVICE_STATUS members remain as set here + + gSvcStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; + gSvcStatus.dwServiceSpecificExitCode = 0; + + // Report initial status to the SCM + + ReportSvcStatus(SERVICE_START_PENDING, NO_ERROR, 3000); + + // Create an event. The control handler function, SvcCtrlHandler, + // signals this event when it receives the stop control code. + + ghSvcStopEvent = CreateEvent(NULL, // default security attributes + TRUE, // manual reset event + FALSE, // not signaled + NULL); // no name + + if (ghSvcStopEvent == NULL) + { + ReportSvcStatus(SERVICE_STOPPED, GetLastError(), 0); + + return 1; + } + + // Report running status when initialization is complete. + + ReportSvcStatus(SERVICE_RUNNING, NO_ERROR, 0); + + int ReturnCode = Run(); + + ReportSvcStatus(SERVICE_STOPPED, NO_ERROR, 0); + + return ReturnCode; +} + +// +// Purpose: +// Retrieves and displays the current service configuration. +// +// Parameters: +// None +// +// Return value: +// None +// +void +DoQuerySvc() +{ + SC_HANDLE schSCManager{}; + SC_HANDLE schService{}; + LPQUERY_SERVICE_CONFIG lpsc{}; + LPSERVICE_DESCRIPTION lpsd{}; + DWORD dwBytesNeeded{}, cbBufSize{}, dwError{}; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_QUERY_CONFIG); // need query config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Get the configuration information. + + if (!QueryServiceConfig(schService, NULL, 0, &dwBytesNeeded)) + { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER == dwError) + { + cbBufSize = dwBytesNeeded; + lpsc = (LPQUERY_SERVICE_CONFIG)LocalAlloc(LMEM_FIXED, cbBufSize); + } + else + { + printf("QueryServiceConfig failed (%d)", dwError); + goto cleanup; + } + } + + if (!QueryServiceConfig(schService, lpsc, cbBufSize, &dwBytesNeeded)) + { + printf("QueryServiceConfig failed (%d)", GetLastError()); + goto cleanup; + } + + if (!QueryServiceConfig2(schService, SERVICE_CONFIG_DESCRIPTION, NULL, 0, &dwBytesNeeded)) + { + dwError = GetLastError(); + if (ERROR_INSUFFICIENT_BUFFER == dwError) + { + cbBufSize = dwBytesNeeded; + lpsd = (LPSERVICE_DESCRIPTION)LocalAlloc(LMEM_FIXED, cbBufSize); + } + else + { + printf("QueryServiceConfig2 failed (%d)", dwError); + goto cleanup; + } + } + + if (!QueryServiceConfig2(schService, SERVICE_CONFIG_DESCRIPTION, (LPBYTE)lpsd, cbBufSize, &dwBytesNeeded)) + { + printf("QueryServiceConfig2 failed (%d)", GetLastError()); + goto cleanup; + } + + // Print the configuration information. + + _tprintf(TEXT("%s configuration: \n"), SVCNAME); + _tprintf(TEXT(" Type: 0x%x\n"), lpsc->dwServiceType); + _tprintf(TEXT(" Start Type: 0x%x\n"), lpsc->dwStartType); + _tprintf(TEXT(" Error Control: 0x%x\n"), lpsc->dwErrorControl); + _tprintf(TEXT(" Binary path: %s\n"), lpsc->lpBinaryPathName); + _tprintf(TEXT(" Account: %s\n"), lpsc->lpServiceStartName); + + if (lpsd->lpDescription != NULL && lstrcmp(lpsd->lpDescription, TEXT("")) != 0) + _tprintf(TEXT(" Description: %s\n"), lpsd->lpDescription); + if (lpsc->lpLoadOrderGroup != NULL && lstrcmp(lpsc->lpLoadOrderGroup, TEXT("")) != 0) + _tprintf(TEXT(" Load order group: %s\n"), lpsc->lpLoadOrderGroup); + if (lpsc->dwTagId != 0) + _tprintf(TEXT(" Tag ID: %d\n"), lpsc->dwTagId); + if (lpsc->lpDependencies != NULL && lstrcmp(lpsc->lpDependencies, TEXT("")) != 0) + _tprintf(TEXT(" Dependencies: %s\n"), lpsc->lpDependencies); + + LocalFree(lpsc); + LocalFree(lpsd); + +cleanup: + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +// +// Purpose: +// Disables the service. +// +// Parameters: +// None +// +// Return value: +// None +// +void +DoDisableSvc() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_CHANGE_CONFIG); // need change config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Change the service start type. + + if (!ChangeServiceConfig(schService, // handle of service + SERVICE_NO_CHANGE, // service type: no change + SERVICE_DISABLED, // service start type + SERVICE_NO_CHANGE, // error control: no change + NULL, // binary path: no change + NULL, // load order group: no change + NULL, // tag ID: no change + NULL, // dependencies: no change + NULL, // account name: no change + NULL, // password: no change + NULL)) // display name: no change + { + printf("ChangeServiceConfig failed (%d)\n", GetLastError()); + } + else + printf("Service disabled successfully.\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +// +// Purpose: +// Enables the service. +// +// Parameters: +// None +// +// Return value: +// None +// +VOID __stdcall DoEnableSvc() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_CHANGE_CONFIG); // need change config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Change the service start type. + + if (!ChangeServiceConfig(schService, // handle of service + SERVICE_NO_CHANGE, // service type: no change + SERVICE_DEMAND_START, // service start type + SERVICE_NO_CHANGE, // error control: no change + NULL, // binary path: no change + NULL, // load order group: no change + NULL, // tag ID: no change + NULL, // dependencies: no change + NULL, // account name: no change + NULL, // password: no change + NULL)) // display name: no change + { + printf("ChangeServiceConfig failed (%d)\n", GetLastError()); + } + else + printf("Service enabled successfully.\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} +// +// Purpose: +// Updates the service description to "This is a test description". +// +// Parameters: +// None +// +// Return value: +// None +// +void +DoUpdateSvcDesc() +{ + SC_HANDLE schSCManager; + SC_HANDLE schService; + SERVICE_DESCRIPTION sd; + TCHAR szDesc[] = TEXT("This is a test description"); + + // Get a handle to the SCM database. + + schSCManager = OpenSCManager(NULL, // local computer + NULL, // ServicesActive database + SC_MANAGER_ALL_ACCESS); // full access rights + + if (NULL == schSCManager) + { + printf("OpenSCManager failed (%d)\n", GetLastError()); + return; + } + + // Get a handle to the service. + + schService = OpenService(schSCManager, // SCM database + SVCNAME, // name of service + SERVICE_CHANGE_CONFIG); // need change config access + + if (schService == NULL) + { + printf("OpenService failed (%d)\n", GetLastError()); + CloseServiceHandle(schSCManager); + return; + } + + // Change the service description. + + sd.lpDescription = szDesc; + + if (!ChangeServiceConfig2(schService, // handle to service + SERVICE_CONFIG_DESCRIPTION, // change: description + &sd)) // new description + { + printf("ChangeServiceConfig2 failed\n"); + } + else + printf("Service description updated successfully.\n"); + + CloseServiceHandle(schService); + CloseServiceHandle(schSCManager); +} + +// +// Purpose: +// Sets the current service status and reports it to the SCM. +// +// Parameters: +// dwCurrentState - The current state (see SERVICE_STATUS) +// dwWin32ExitCode - The system error code +// dwWaitHint - Estimated time for pending operation, +// in milliseconds +// +// Return value: +// None +// +VOID +ReportSvcStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint) +{ + static DWORD dwCheckPoint = 1; + + // Fill in the SERVICE_STATUS structure. + + gSvcStatus.dwCurrentState = dwCurrentState; + gSvcStatus.dwWin32ExitCode = dwWin32ExitCode; + gSvcStatus.dwWaitHint = dwWaitHint; + + if (dwCurrentState == SERVICE_START_PENDING) + gSvcStatus.dwControlsAccepted = 0; + else + gSvcStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP; + + if ((dwCurrentState == SERVICE_RUNNING) || (dwCurrentState == SERVICE_STOPPED)) + gSvcStatus.dwCheckPoint = 0; + else + gSvcStatus.dwCheckPoint = dwCheckPoint++; + + // Report the status of the service to the SCM. + SetServiceStatus(gSvcStatusHandle, &gSvcStatus); +} + +void +WindowsService::SvcCtrlHandler(DWORD dwCtrl) +{ + // Handle the requested control code. + // + // Called by SCM whenever a control code is sent to the service + // using the ControlService function. + + switch (dwCtrl) + { + case SERVICE_CONTROL_STOP: + ReportSvcStatus(SERVICE_STOP_PENDING, NO_ERROR, 0); + + // Signal the service to stop. + + SetEvent(ghSvcStopEvent); + zen::RequestApplicationExit(0); + + ReportSvcStatus(gSvcStatus.dwCurrentState, NO_ERROR, 0); + return; + + case SERVICE_CONTROL_INTERROGATE: + break; + + default: + break; + } +} + +// +// Purpose: +// Logs messages to the event log +// +// Parameters: +// szFunction - name of function that failed +// +// Return value: +// None +// +// Remarks: +// The service must have an entry in the Application event log. +// +VOID +SvcReportEvent(LPTSTR szFunction) +{ + ZEN_UNUSED(szFunction); + + // HANDLE hEventSource; + // LPCTSTR lpszStrings[2]; + // TCHAR Buffer[80]; + + // hEventSource = RegisterEventSource(NULL, SVCNAME); + + // if (NULL != hEventSource) + //{ + // StringCchPrintf(Buffer, 80, TEXT("%s failed with %d"), szFunction, GetLastError()); + + // lpszStrings[0] = SVCNAME; + // lpszStrings[1] = Buffer; + + // ReportEvent(hEventSource, // event log handle + // EVENTLOG_ERROR_TYPE, // event type + // 0, // event category + // SVC_ERROR, // event identifier + // NULL, // no security identifier + // 2, // size of lpszStrings array + // 0, // no binary data + // lpszStrings, // array of strings + // NULL); // no binary data + + // DeregisterEventSource(hEventSource); + //} +} diff --git a/zenserver/windows/service.h b/zenserver/windows/service.h new file mode 100644 index 000000000..7c9610983 --- /dev/null +++ b/zenserver/windows/service.h @@ -0,0 +1,20 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +class WindowsService +{ +public: + WindowsService(); + ~WindowsService(); + + virtual int Run() = 0; + + int ServiceMain(); + + static void Install(); + static void Delete(); + + int SvcMain(); + static void __stdcall SvcCtrlHandler(unsigned long); +}; diff --git a/zenserver/xmake.lua b/zenserver/xmake.lua index bb70846fa..7a6981fcd 100644 --- a/zenserver/xmake.lua +++ b/zenserver/xmake.lua @@ -14,6 +14,8 @@ target("zenserver") add_ldflags("/MANIFEST:EMBED") add_ldflags("/MANIFESTUAC:level='requireAdministrator'") add_ldflags("/LTCG") + else + del_files("windows/**") end add_options("vfs") diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index 53dc41a24..cf24dc224 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -11,28 +11,35 @@ #include <zencore/timer.h> #include <zencore/windows.h> #include <zenhttp/httpserver.h> -#include <zenserverprocess.h> #include <zenstore/cas.h> #include <zenstore/cidstore.h> +#include <zenutil/zenserverprocess.h> #include <fmt/format.h> -#include <mimalloc-new-delete.h> -#include <mimalloc.h> + +#if ZEN_USE_MIMALLOC +# include <mimalloc-new-delete.h> +# include <mimalloc.h> +#endif + #include <asio.hpp> #include <exception> #include <list> #include <lua.hpp> #include <optional> #include <regex> +#include <set> #include <unordered_map> ////////////////////////////////////////////////////////////////////////// // We don't have any doctest code in this file but this is needed to bring // in some shared code into the executable -#define DOCTEST_CONFIG_IMPLEMENT -#include <doctest/doctest.h> -#undef DOCTEST_CONFIG_IMPLEMENT +#if ZEN_WITH_TESTS +# define DOCTEST_CONFIG_IMPLEMENT +# include <zencore/testing.h> +# undef DOCTEST_CONFIG_IMPLEMENT +#endif ////////////////////////////////////////////////////////////////////////// @@ -40,6 +47,10 @@ #include "config.h" #include "diag/logging.h" +#if ZEN_PLATFORM_WINDOWS +# include "windows/service.h" +#endif + ////////////////////////////////////////////////////////////////////////// // Sentry // @@ -82,11 +93,16 @@ #define ZEN_APP_NAME "Zen store" +namespace zen { + class ZenServer { + ZenServerState::ZenServerEntry* m_ServerEntry = nullptr; + public: - void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid) + void Initialize(ZenServiceConfig& ServiceConfig, int BasePort, int ParentPid, ZenServerState::ZenServerEntry* ServerEntry) { + m_ServerEntry = ServerEntry; using namespace fmt::literals; ZEN_INFO(ZEN_APP_NAME " initializing"); @@ -94,23 +110,29 @@ public: if (ParentPid) { - m_Process.Initialize(ParentPid); + zen::ProcessHandle OwnerProcess; + OwnerProcess.Initialize(ParentPid); - if (!m_Process.IsValid()) + if (!OwnerProcess.IsValid()) { ZEN_WARN("Unable to initialize process handle for specified parent pid #{}", ParentPid); + + // If the pid is not reachable should we just shut down immediately? the intended owner process + // could have been killed or somehow crashed already } else { ZEN_INFO("Using parent pid #{} to control process lifetime", ParentPid); } + + m_ProcessMonitor.AddPid(ParentPid); } // Initialize/check mutex based on base port std::string MutexName = "zen_{}"_format(BasePort); - if (zen::NamedMutex::Exists(MutexName) || (m_ServerMutex.Create(MutexName) == false)) + if (zen::NamedMutex::Exists(MutexName) || ((m_ServerMutex.Create(MutexName) == false))) { throw std::runtime_error("Failed to create mutex '{}' - is another instance already running?"_format(MutexName).c_str()); } @@ -151,11 +173,15 @@ public: m_CacheStore = std::make_unique<ZenCacheStore>(*m_CasStore, m_DataRoot / "cache"); std::unique_ptr<zen::UpstreamCache> UpstreamCache; - if (ServiceConfig.UpstreamCacheConfig.Enabled) + if (ServiceConfig.UpstreamCacheConfig.CachePolicy != UpstreamCachePolicy::Disabled) { const ZenUpstreamCacheConfig& UpstreamConfig = ServiceConfig.UpstreamCacheConfig; zen::UpstreamCacheOptions UpstreamOptions; + UpstreamOptions.ReadUpstream = + (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Read)) != 0; + UpstreamOptions.WriteUpstream = + (uint8_t(ServiceConfig.UpstreamCacheConfig.CachePolicy) & uint8_t(UpstreamCachePolicy::Write)) != 0; if (UpstreamConfig.UpstreamThreadCount < 32) { @@ -201,7 +227,11 @@ public: if (UpstreamCache->Initialize()) { - ZEN_INFO("upstream cache active"); + ZEN_INFO("upstream cache active ({})", + UpstreamOptions.ReadUpstream && UpstreamOptions.WriteUpstream ? "READ|WRITE" + : UpstreamOptions.ReadUpstream ? "READONLY" + : UpstreamOptions.WriteUpstream ? "WRITEONLY" + : "DISABLED"); } else { @@ -267,7 +297,9 @@ public: void Run() { - if (m_Process.IsValid()) + Scrub(); + + if (m_ProcessMonitor.IsActive()) { EnqueueTimer(); } @@ -282,7 +314,7 @@ public: ZEN_INFO(" \\/ \\/ \\/ \\/ \\/ "); } - ZEN_INFO(ZEN_APP_NAME " now running"); + ZEN_INFO(ZEN_APP_NAME " now running (pid: {})", zen::GetCurrentProcessId()); #if USE_SENTRY sentry_clear_modulecache(); @@ -293,7 +325,9 @@ public: __debugbreak(); } - m_Http->Run(m_TestMode); + const bool IsInteractiveMode = zen::IsInteractiveSession() && !m_TestMode; + + m_Http->Run(IsInteractiveMode); ZEN_INFO(ZEN_APP_NAME " exiting"); @@ -332,18 +366,51 @@ public: void CheckOwnerPid() { - if (m_Process.IsRunning()) + // Pick up any new "owner" processes + + std::set<uint32_t> AddedPids; + + for (auto& PidEntry : m_ServerEntry->SponsorPids) + { + if (uint32_t ThisPid = PidEntry.load(std::memory_order::memory_order_relaxed)) + { + if (PidEntry.compare_exchange_strong(ThisPid, 0)) + { + if (AddedPids.insert(ThisPid).second) + { + m_ProcessMonitor.AddPid(ThisPid); + + ZEN_INFO("added process with pid #{} as a sponsor process", ThisPid); + } + } + } + } + + if (m_ProcessMonitor.IsRunning()) { EnqueueTimer(); } else { - ZEN_INFO(ZEN_APP_NAME " exiting since parent process id {} is gone", m_Process.Pid()); + ZEN_INFO(ZEN_APP_NAME " exiting since sponsor processes are all gone"); RequestExit(0); } } + void Scrub() + { + ZEN_INFO("Storage validation STARTING"); + + ScrubContext Ctx; + m_CasStore->Scrub(Ctx); + m_CidStore->Scrub(Ctx); + m_ProjectStore->Scrub(Ctx); + m_StructuredCacheService->Scrub(Ctx); + + ZEN_INFO("Storage validation DONE"); + } + void Flush() { if (m_CasStore) @@ -366,16 +433,16 @@ private: std::jthread m_IoRunner; asio::io_context m_IoContext; asio::steady_timer m_PidCheckTimer{m_IoContext}; - zen::ProcessHandle m_Process; + zen::ProcessMonitor m_ProcessMonitor; zen::NamedMutex m_ServerMutex; zen::Ref<zen::HttpServer> m_Http; std::unique_ptr<zen::CasStore> m_CasStore{zen::CreateCasStore()}; std::unique_ptr<zen::CidStore> m_CidStore; - std::unique_ptr<ZenCacheStore> m_CacheStore; + std::unique_ptr<zen::ZenCacheStore> m_CacheStore; zen::CasGc m_Gc{*m_CasStore}; zen::CasScrubber m_Scrubber{*m_CasStore}; - HttpTestService m_TestService; + zen::HttpTestService m_TestService; zen::HttpTestingService m_TestingService; zen::HttpCasService m_CasService{*m_CasStore}; zen::RefPtr<zen::ProjectStore> m_ProjectStore; @@ -383,23 +450,39 @@ private: std::unique_ptr<zen::HttpLaunchService> m_HttpLaunchService; std::unique_ptr<zen::HttpProjectService> m_HttpProjectService; std::unique_ptr<zen::HttpStructuredCacheService> m_StructuredCacheService; - HttpAdminService m_AdminService; - HttpHealthService m_HealthService; + zen::HttpAdminService m_AdminService; + zen::HttpHealthService m_HealthService; zen::Mesh m_ZenMesh{m_IoContext}; std::unique_ptr<zen::HttpFunctionService> m_HttpFunctionService; bool m_DebugOptionForcedCrash = false; }; -int -main(int argc, char* argv[]) +} // namespace zen + +class ZenWindowsService : public WindowsService { - mi_version(); +public: + ZenWindowsService(ZenServerOptions& GlobalOptions, ZenServiceConfig& ServiceConfig) + : m_GlobalOptions(GlobalOptions) + , m_ServiceConfig(ServiceConfig) + { + } - ZenServerOptions GlobalOptions; - ZenServiceConfig ServiceConfig; - ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig); - InitializeLogging(GlobalOptions); + ZenWindowsService(const ZenWindowsService&) = delete; + ZenWindowsService& operator=(const ZenWindowsService&) = delete; + + virtual int Run() override; + +private: + ZenServerOptions& m_GlobalOptions; + ZenServiceConfig& m_ServiceConfig; +}; + +int +ZenWindowsService::Run() +{ + using namespace zen; #if USE_SENTRY // Initialize sentry.io client @@ -408,30 +491,47 @@ main(int argc, char* argv[]) sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284"); sentry_init(SentryOptions); - auto _ = zen::MakeGuard([&] { sentry_close(); }); + auto _ = zen::MakeGuard([] { sentry_close(); }); #endif - // Prototype config system, let's see how this pans out - - ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); - - ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + auto& GlobalOptions = m_GlobalOptions; + auto& ServiceConfig = m_ServiceConfig; try { + // Prototype config system, we'll see how this pans out + // + // TODO: we need to report any parse errors here + + ParseServiceConfig(GlobalOptions.DataDir, /* out */ ServiceConfig); + + ZEN_INFO("zen cache server starting on port {}", GlobalOptions.BasePort); + ZenServerState ServerState; ServerState.Initialize(); ServerState.Sweep(); - if (ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort)) + ZenServerState::ZenServerEntry* Entry = ServerState.Lookup(GlobalOptions.BasePort); + + if (Entry) { // Instance already running for this port? Should double check pid ZEN_WARN("Looks like there is already a process listening to this port (pid: {})", Entry->Pid); + + if (GlobalOptions.OwnerPid) + { + Entry->AddSponsorProcess(GlobalOptions.OwnerPid); + + std::exit(0); + } } - else + + Entry = ServerState.Register(GlobalOptions.BasePort); + + if (GlobalOptions.OwnerPid) { - ServerState.Register(GlobalOptions.BasePort); + Entry->AddSponsorProcess(GlobalOptions.OwnerPid); } std::unique_ptr<std::thread> ShutdownThread; @@ -445,15 +545,17 @@ main(int argc, char* argv[]) Server.SetDataRoot(GlobalOptions.DataDir); Server.SetTestMode(GlobalOptions.IsTest); Server.SetDedicatedMode(GlobalOptions.IsDedicated); - Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid); + Server.Initialize(ServiceConfig, GlobalOptions.BasePort, GlobalOptions.OwnerPid, Entry); // Monitor shutdown signals ShutdownThread.reset(new std::thread{[&] { ZEN_INFO("shutdown monitor thread waiting for shutdown signal '{}'", ShutdownEventName); - ShutdownEvent->Wait(); - ZEN_INFO("shutdown signal received"); - Server.RequestExit(0); + if (ShutdownEvent->Wait()) + { + ZEN_INFO("shutdown signal received"); + Server.RequestExit(0); + } }}); // If we have a parent process, establish the mechanisms we need @@ -480,3 +582,37 @@ main(int argc, char* argv[]) return 0; } + +int +main(int argc, char* argv[]) +{ + using namespace zen; + +#if ZEN_USE_MIMALLOC + mi_version(); +#endif + + ZenServerOptions GlobalOptions; + ZenServiceConfig ServiceConfig; + ParseGlobalCliOptions(argc, argv, GlobalOptions, ServiceConfig); + InitializeLogging(GlobalOptions); + +#if ZEN_PLATFORM_WINDOWS + if (GlobalOptions.InstallService) + { + WindowsService::Install(); + + std::exit(0); + } + + if (GlobalOptions.UninstallService) + { + WindowsService::Delete(); + + std::exit(0); + } +#endif + + ZenWindowsService App(GlobalOptions, ServiceConfig); + return App.ServiceMain(); +} diff --git a/zenserver/zenserver.vcxproj b/zenserver/zenserver.vcxproj index aa9d538a5..db657d192 100644 --- a/zenserver/zenserver.vcxproj +++ b/zenserver/zenserver.vcxproj @@ -123,6 +123,7 @@ <ClInclude Include="upstream\upstreamcache.h" /> <ClInclude Include="upstream\zen.h" /> <ClInclude Include="vfs.h" /> + <ClInclude Include="windows\service.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="cache\structuredcache.cpp" /> @@ -142,6 +143,7 @@ <ClCompile Include="upstream\upstreamcache.cpp" /> <ClCompile Include="upstream\zen.cpp" /> <ClCompile Include="vfs.cpp" /> + <ClCompile Include="windows\service.cpp" /> <ClCompile Include="zenserver.cpp" /> </ItemGroup> <ItemGroup> diff --git a/zenserver/zenserver.vcxproj.filters b/zenserver/zenserver.vcxproj.filters index a86a6d96d..250c55812 100644 --- a/zenserver/zenserver.vcxproj.filters +++ b/zenserver/zenserver.vcxproj.filters @@ -39,6 +39,7 @@ <Filter>upstream</Filter> </ClInclude> <ClInclude Include="testing\httptest.h" /> + <ClInclude Include="windows\service.h" /> </ItemGroup> <ItemGroup> <ClCompile Include="zenserver.cpp" /> @@ -73,6 +74,7 @@ <Filter>upstream</Filter> </ClCompile> <ClCompile Include="testing\httptest.cpp" /> + <ClCompile Include="windows\service.cpp" /> </ItemGroup> <ItemGroup> <Filter Include="cache"> diff --git a/zenstore-test/xmake.lua b/zenstore-test/xmake.lua new file mode 100644 index 000000000..c8995dab2 --- /dev/null +++ b/zenstore-test/xmake.lua @@ -0,0 +1,5 @@ +target("zenstore-test") + set_kind("binary") + add_files("*.cpp") + add_deps("zenstore", "zencore") + add_packages("vcpkg::doctest") diff --git a/zenstore-test/zenstore-test.cpp b/zenstore-test/zenstore-test.cpp new file mode 100644 index 000000000..e6bd92ab9 --- /dev/null +++ b/zenstore-test/zenstore-test.cpp @@ -0,0 +1,24 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zencore/logging.h> +#include <zencore/zencore.h> +#include <zenstore/zenstore.h> + +#if ZEN_WITH_TESTS +# define DOCTEST_CONFIG_IMPLEMENT +# include <zencore/testing.h> +# undef DOCTEST_CONFIG_IMPLEMENT +#endif + +int +main([[maybe_unused]] int argc, [[maybe_unused]] char* argv[]) +{ +#if ZEN_WITH_TESTS + zen::zenstore_forcelinktests(); + + zen::logging::InitializeLogging(); + + return doctest::Context(argc, argv).run(); +#else +#endif +} diff --git a/zenstore-test/zenstore-test.vcxproj b/zenstore-test/zenstore-test.vcxproj new file mode 100644 index 000000000..201594a25 --- /dev/null +++ b/zenstore-test/zenstore-test.vcxproj @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project DefaultTargets="Build" ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup Label="ProjectConfigurations"> + <ProjectConfiguration Include="Debug|x64"> + <Configuration>Debug</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + <ProjectConfiguration Include="Release|x64"> + <Configuration>Release</Configuration> + <Platform>x64</Platform> + </ProjectConfiguration> + </ItemGroup> + <PropertyGroup Label="Globals"> + <VCProjectVersion>15.0</VCProjectVersion> + <ProjectGuid>{C001A3DF-B76E-4989-B576-FE2B78AB2580}</ProjectGuid> + <Keyword>Win32Proj</Keyword> + <RootNamespace>zenstoretest</RootNamespace> + <WindowsTargetPlatformVersion>10.0</WindowsTargetPlatformVersion> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>true</UseDebugLibraries> + <PlatformToolset>v142</PlatformToolset> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'" Label="Configuration"> + <ConfigurationType>Application</ConfigurationType> + <UseDebugLibraries>false</UseDebugLibraries> + <PlatformToolset>v142</PlatformToolset> + <WholeProgramOptimization>false</WholeProgramOptimization> + <CharacterSet>Unicode</CharacterSet> + </PropertyGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" /> + <ImportGroup Label="ExtensionSettings"> + </ImportGroup> + <ImportGroup Label="Shared"> + </ImportGroup> + <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + <Import Project="..\zenfs_common.props" /> + </ImportGroup> + <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" /> + <Import Project="..\zenfs_common.props" /> + </ImportGroup> + <PropertyGroup Label="UserMacros" /> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <LinkIncremental>true</LinkIncremental> + </PropertyGroup> + <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <LinkIncremental>false</LinkIncremental> + </PropertyGroup> + <PropertyGroup Label="Vcpkg" Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <VcpkgEnableManifest>true</VcpkgEnableManifest> + <VcpkgUseStatic>true</VcpkgUseStatic> + <VcpkgAdditionalInstallOptions>--overlay-ports=$(SolutionDir)vcpkg_overlay-ports</VcpkgAdditionalInstallOptions> + </PropertyGroup> + <PropertyGroup Label="Vcpkg" Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <VcpkgEnableManifest>true</VcpkgEnableManifest> + <VcpkgUseStatic>true</VcpkgUseStatic> + <VcpkgAdditionalInstallOptions>--overlay-ports=$(SolutionDir)vcpkg_overlay-ports</VcpkgAdditionalInstallOptions> + </PropertyGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <Optimization>Disabled</Optimization> + <SDLCheck>true</SDLCheck> + <PreprocessorDefinitions>_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <ConformanceMode>true</ConformanceMode> + <LanguageStandard>stdcpplatest</LanguageStandard> + <RuntimeLibrary>MultiThreadedDebug</RuntimeLibrary> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <GenerateDebugInformation>true</GenerateDebugInformation> + <AdditionalDependencies>kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies> + <AdditionalLibraryDirectories>%(AdditionalLibraryDirectories)</AdditionalLibraryDirectories> + </Link> + <ProjectReference /> + </ItemDefinitionGroup> + <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|x64'"> + <ClCompile> + <PrecompiledHeader>NotUsing</PrecompiledHeader> + <Optimization>MaxSpeed</Optimization> + <FunctionLevelLinking>true</FunctionLevelLinking> + <IntrinsicFunctions>true</IntrinsicFunctions> + <SDLCheck>true</SDLCheck> + <PreprocessorDefinitions>NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions> + <ConformanceMode>true</ConformanceMode> + <WholeProgramOptimization>false</WholeProgramOptimization> + <LanguageStandard>stdcpplatest</LanguageStandard> + <RuntimeLibrary>MultiThreaded</RuntimeLibrary> + </ClCompile> + <Link> + <SubSystem>Console</SubSystem> + <EnableCOMDATFolding>true</EnableCOMDATFolding> + <OptimizeReferences>true</OptimizeReferences> + <GenerateDebugInformation>true</GenerateDebugInformation> + <ShowProgress>NotSet</ShowProgress> + </Link> + <ProjectReference /> + </ItemDefinitionGroup> + <ItemGroup> + <ClInclude Include="targetver.h" /> + </ItemGroup> + <ItemGroup> + <ClCompile Include="zenstore-test.cpp" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="..\zencore\zencore.vcxproj"> + <Project>{d75bf9ab-c61e-4fff-ad59-1563430f05e2}</Project> + </ProjectReference> + <ProjectReference Include="..\zenstore\zenstore.vcxproj"> + <Project>{26cbbaeb-14c1-4efc-877d-80f48215651c}</Project> + </ProjectReference> + </ItemGroup> + <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> + <ImportGroup Label="ExtensionTargets"> + </ImportGroup> +</Project>
\ No newline at end of file diff --git a/zenstore-test/zenstore-test.vcxproj.filters b/zenstore-test/zenstore-test.vcxproj.filters new file mode 100644 index 000000000..000599c79 --- /dev/null +++ b/zenstore-test/zenstore-test.vcxproj.filters @@ -0,0 +1,9 @@ +<?xml version="1.0" encoding="utf-8"?> +<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <ItemGroup> + <ClInclude Include="targetver.h" /> + </ItemGroup> + <ItemGroup> + <ClCompile Include="zenstore-test.cpp" /> + </ItemGroup> +</Project>
\ No newline at end of file diff --git a/zenstore/CAS.cpp b/zenstore/CAS.cpp index e77c0ed64..916e7f709 100644 --- a/zenstore/CAS.cpp +++ b/zenstore/CAS.cpp @@ -5,12 +5,13 @@ #include "compactcas.h" #include "filecas.h" -#include <doctest/doctest.h> #include <zencore/except.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/memory.h> #include <zencore/string.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <zencore/thread.h> #include <zencore/uid.h> @@ -20,15 +21,23 @@ #include <functional> #include <unordered_map> -struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive- -#include <atlfile.h> - ////////////////////////////////////////////////////////////////////////// namespace zen { +void +ScrubContext::ReportBadChunks(std::span<IoHash> BadChunks) +{ + ZEN_UNUSED(BadChunks); +} + /** - * Slightly less naive CAS store + * CAS store implementation + * + * Uses a basic strategy of splitting payloads by size, to improve ability to reclaim space + * quickly for unused large chunks and to maintain locality for small chunks which are + * frequently accessed together. + * */ class CasImpl : public CasStore { @@ -41,16 +50,15 @@ public: virtual IoBuffer FindChunk(const IoHash& ChunkHash) override; virtual void FilterChunks(CasChunkSet& InOutChunks) override; virtual void Flush() override; + virtual void Scrub(ScrubContext& Ctx) override; private: - void PickDefaultDirectory(); - CasContainerStrategy m_TinyStrategy; CasContainerStrategy m_SmallStrategy; FileCasStrategy m_LargeStrategy; }; -CasImpl::CasImpl() : m_TinyStrategy(m_Config, m_Stats), m_SmallStrategy(m_Config, m_Stats), m_LargeStrategy(m_Config, m_Stats) +CasImpl::CasImpl() : m_TinyStrategy(m_Config), m_SmallStrategy(m_Config), m_LargeStrategy(m_Config) { } @@ -63,13 +71,16 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) { m_Config = InConfig; - ZEN_INFO("initializing CAS pool at {}", m_Config.RootDirectory); + ZEN_INFO("initializing CAS pool at '{}'", m_Config.RootDirectory); // Ensure root directory exists - create if it doesn't exist already std::filesystem::create_directories(m_Config.RootDirectory); // Open or create manifest + // + // The manifest is not currently fully implemented. The goal is to + // use it for recovery and configuration bool IsNewStore = false; @@ -77,23 +88,22 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) std::filesystem::path ManifestPath = m_Config.RootDirectory; ManifestPath /= ".ucas_root"; - CAtlFile marker; - HRESULT hRes = marker.Create(ManifestPath.c_str(), GENERIC_READ, 0, OPEN_EXISTING); + std::error_code Ec; + BasicFile Marker; + Marker.Open(ManifestPath.c_str(), /* IsCreate */ false, Ec); - if (FAILED(hRes)) + if (Ec) { IsNewStore = true; ExtendableStringBuilder<128> manifest; - manifest.Append("#CAS_ROOT\n"); // TODO: should write something meaningful here + manifest.Append("#CAS_ROOT\n"); manifest.Append("ID="); zen::Oid id = zen::Oid::NewOid(); id.ToString(manifest); - hRes = marker.Create(ManifestPath.c_str(), GENERIC_WRITE, 0, CREATE_ALWAYS); - - if (SUCCEEDED(hRes)) - marker.Write(manifest.c_str(), (DWORD)manifest.Size()); + Marker.Open(ManifestPath.c_str(), /* IsCreate */ true); + Marker.Write(manifest.c_str(), (DWORD)manifest.Size(), 0); } } @@ -101,6 +111,9 @@ CasImpl::Initialize(const CasStoreConfiguration& InConfig) m_TinyStrategy.Initialize("tobs", 16, IsNewStore); m_SmallStrategy.Initialize("sobs", 4096, IsNewStore); + + ScrubContext Ctx; + Scrub(Ctx); } CasStore::InsertResult @@ -160,6 +173,14 @@ CasImpl::Flush() m_LargeStrategy.Flush(); } +void +CasImpl::Scrub(ScrubContext& Ctx) +{ + m_SmallStrategy.Scrub(Ctx); + m_TinyStrategy.Scrub(Ctx); + m_LargeStrategy.Scrub(Ctx); +} + ////////////////////////////////////////////////////////////////////////// CasStore* @@ -173,18 +194,51 @@ CreateCasStore() // Testing related code follows... // -void -CAS_forcelink() -{ -} +#if ZEN_WITH_TESTS TEST_CASE("CasStore") { - zen::CasStoreConfiguration config; - config.RootDirectory = "c:\\temp\\test"; + ScopedTemporaryDirectory TempDir; + + CasStoreConfiguration config; + config.RootDirectory = TempDir.Path(); + + std::unique_ptr<CasStore> Store{CreateCasStore()}; + Store->Initialize(config); + + ScrubContext Ctx; + Store->Scrub(Ctx); + + IoBuffer Value1{16}; + memcpy(Value1.MutableData(), "1234567890123456", 16); + IoHash Hash1 = IoHash::HashBuffer(Value1.Data(), Value1.Size()); + CasStore::InsertResult Result1 = Store->InsertChunk(Value1, Hash1); + CHECK(Result1.New); + + IoBuffer Value2{16}; + memcpy(Value2.MutableData(), "ABCDEFGHIJKLMNOP", 16); + IoHash Hash2 = IoHash::HashBuffer(Value2.Data(), Value2.Size()); + CasStore::InsertResult Result2 = Store->InsertChunk(Value2, Hash2); + CHECK(Result2.New); + + CasChunkSet ChunkSet; + ChunkSet.AddChunk(Hash1); + ChunkSet.AddChunk(Hash2); - std::unique_ptr<zen::CasStore> store{CreateCasStore()}; - store->Initialize(config); + Store->FilterChunks(ChunkSet); + CHECK(ChunkSet.GetChunkSet().size() == 0); + + IoBuffer Lookup1 = Store->FindChunk(Hash1); + CHECK(Lookup1); + IoBuffer Lookup2 = Store->FindChunk(Hash2); + CHECK(Lookup2); } +void +CAS_forcelink() +{ +} + +#endif + } // namespace zen diff --git a/zenstore/basicfile.cpp b/zenstore/basicfile.cpp index 35ccdd042..fe54184cf 100644 --- a/zenstore/basicfile.cpp +++ b/zenstore/basicfile.cpp @@ -5,6 +5,8 @@ #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/testing.h> +#include <zencore/testutils.h> #include <fmt/format.h> #include <gsl/gsl-lite.hpp> @@ -13,16 +15,54 @@ namespace zen { using namespace fmt::literals; +BasicFile::~BasicFile() +{ + Close(); +} + +void +BasicFile::Open(std::filesystem::path FileName, bool IsCreate) +{ + std::error_code Ec; + Open(FileName, IsCreate, Ec); + + if (Ec) + { + throw std::system_error(Ec, "failed to open file '{}'"_format(FileName)); + } +} + void -BasicFile::Open(std::filesystem::path FileName, bool isCreate) +BasicFile::Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec) { - const DWORD dwCreationDisposition = isCreate ? CREATE_ALWAYS : OPEN_EXISTING; + const DWORD dwCreationDisposition = IsCreate ? CREATE_ALWAYS : OPEN_EXISTING; + const DWORD dwDesiredAccess = GENERIC_READ | GENERIC_WRITE; + const DWORD dwShareMode = FILE_SHARE_READ; + const DWORD dwFlagsAndAttributes = FILE_ATTRIBUTE_NORMAL; + HANDLE hTemplateFile = nullptr; + + HANDLE FileHandle = CreateFile(FileName.c_str(), + dwDesiredAccess, + dwShareMode, + /* lpSecurityAttributes */ nullptr, + dwCreationDisposition, + dwFlagsAndAttributes, + hTemplateFile); + + if (FileHandle == INVALID_HANDLE_VALUE) + { + Ec = zen::MakeErrorCodeFromLastError(); + } - HRESULT hRes = m_File.Create(FileName.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ, dwCreationDisposition); + m_FileHandle = FileHandle; +} - if (FAILED(hRes)) +void +BasicFile::Close() +{ + if (m_FileHandle) { - ThrowSystemException(hRes, "Failed to open bucket sobs file '{}'"_format(FileName)); + ::CloseHandle(m_FileHandle); } } @@ -34,11 +74,14 @@ BasicFile::Read(void* Data, uint64_t Size, uint64_t Offset) Ovl.Offset = DWORD(Offset & 0xffff'ffffu); Ovl.OffsetHigh = DWORD(Offset >> 32); - HRESULT hRes = m_File.Read(Data, gsl::narrow<DWORD>(Size), &Ovl); + DWORD dwNumberOfBytesToRead = gsl::narrow<DWORD>(Size); + DWORD dwNumberOfBytesRead = 0; + + BOOL Success = ::ReadFile(m_FileHandle, Data, dwNumberOfBytesToRead, &dwNumberOfBytesRead, &Ovl); - if (FAILED(hRes)) + if (!Success) { - ThrowSystemException(hRes, "Failed to read from file '{}'"_format(zen::PathFromHandle(m_File))); + ThrowLastError("Failed to read from file '{}'"_format(zen::PathFromHandle(m_FileHandle))); } } @@ -53,6 +96,35 @@ BasicFile::ReadAll() } void +BasicFile::StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) +{ + StreamByteRange(0, FileSize(), std::move(ChunkFun)); +} + +void +BasicFile::StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun) +{ + const uint64_t ChunkSize = 128 * 1024; + IoBuffer ReadBuffer{ChunkSize}; + void* BufferPtr = ReadBuffer.MutableData(); + + uint64_t RemainBytes = Size; + uint64_t CurrentOffset = FileOffset; + + while (RemainBytes) + { + const uint64_t ThisChunkBytes = zen::Min(ChunkSize, RemainBytes); + + Read(BufferPtr, ThisChunkBytes, CurrentOffset); + + ChunkFun(BufferPtr, ThisChunkBytes); + + CurrentOffset += ThisChunkBytes; + RemainBytes -= ThisChunkBytes; + } +} + +void BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) { OVERLAPPED Ovl{}; @@ -60,33 +132,50 @@ BasicFile::Write(const void* Data, uint64_t Size, uint64_t Offset) Ovl.Offset = DWORD(Offset & 0xffff'ffffu); Ovl.OffsetHigh = DWORD(Offset >> 32); - HRESULT hRes = m_File.Write(Data, gsl::narrow<DWORD>(Size), &Ovl); + DWORD dwNumberOfBytesToWrite = gsl::narrow<DWORD>(Size); + DWORD dwNumberOfBytesWritten = 0; + + BOOL Success = ::WriteFile(m_FileHandle, Data, dwNumberOfBytesToWrite, &dwNumberOfBytesWritten, &Ovl); - if (FAILED(hRes)) + if (!Success) { - ThrowSystemException(hRes, "Failed to write to file '{}'"_format(zen::PathFromHandle(m_File))); + ThrowLastError("Failed to write to file '{}'"_format(zen::PathFromHandle(m_FileHandle))); } } void BasicFile::Flush() { - m_File.Flush(); + FlushFileBuffers(m_FileHandle); } uint64_t BasicFile::FileSize() { - ULONGLONG Sz; - m_File.GetSize(Sz); + ULARGE_INTEGER liFileSize; + liFileSize.LowPart = ::GetFileSize(m_FileHandle, &liFileSize.HighPart); + + return uint64_t(liFileSize.QuadPart); +} + +#if ZEN_WITH_TESTS + +TEST_CASE("BasicFile") +{ + ScopedCurrentDirectoryChange _; - return uint64_t(Sz); + BasicFile File1; + CHECK_THROWS(File1.Open("zonk", false)); + CHECK_NOTHROW(File1.Open("zonk", true)); + CHECK_NOTHROW(File1.Write("abcd", 4, 0)); + CHECK(File1.FileSize() == 4); } void -BasicFile::Close() +basicfile_forcelink() { - m_File.Close(); } +#endif + } // namespace zen diff --git a/zenstore/caslog.cpp b/zenstore/caslog.cpp index 70bcf4669..dc6021544 100644 --- a/zenstore/caslog.cpp +++ b/zenstore/caslog.cpp @@ -62,7 +62,7 @@ CasLogFile::Open(std::filesystem::path FileName, size_t RecordSize, bool IsCreat if (IsCreate) { // Initialize log by writing header - FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = zen::Oid::NewOid(), .ValidatedTail = 0}; + FileHeader Header = {.RecordSize = gsl::narrow<uint32_t>(RecordSize), .LogId = Oid::NewOid(), .ValidatedTail = 0}; memcpy(Header.Magic, FileHeader::MagicSequence, sizeof Header.Magic); Header.Finalize(); @@ -128,7 +128,7 @@ CasLogFile::Replay(std::function<void(const void*)>&& Handler) if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to read log file"); + ThrowSystemException(hRes, "Failed to read log file"); } for (int i = 0; i < LogEntryCount; ++i) @@ -144,7 +144,7 @@ CasLogFile::Append(const void* DataPointer, uint64_t DataSize) if (FAILED(hRes)) { - zen::ThrowSystemException(hRes, "Failed to write to log file '{}'"_format(zen::PathFromHandle(m_File))); + ThrowSystemException(hRes, "Failed to write to log file '{}'"_format(PathFromHandle(m_File))); } } diff --git a/zenstore/cidstore.cpp b/zenstore/cidstore.cpp index 100054a0e..5e266f9d3 100644 --- a/zenstore/cidstore.cpp +++ b/zenstore/cidstore.cpp @@ -73,24 +73,96 @@ struct CidStore::CidState bool ContainsChunk(const IoHash& DecompressedId) { RwLock::SharedLockScope _(m_Lock); + // Note that we do not check CAS here. This is optimistic but usually + // what we want. return m_CidMap.find(DecompressedId) != m_CidMap.end(); } void InitializeIndex(const std::filesystem::path& RootDir) { - zen::CreateDirectories(RootDir); + CreateDirectories(RootDir); std::filesystem::path SlogPath{RootDir / "cid.slog"}; bool IsNew = !std::filesystem::exists(SlogPath); m_LogFile.Open(SlogPath, IsNew); - m_LogFile.Replay([&](const IndexEntry& Ie) { m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed); }); + m_LogFile.Replay([&](const IndexEntry& Ie) { + if (Ie.Compressed != IoHash::Zero) + { + // Update + m_CidMap.insert_or_assign(Ie.Uncompressed, Ie.Compressed); + } + else + { + // Tombstone + m_CidMap.erase(Ie.Uncompressed); + } + }); ZEN_DEBUG("CID index initialized: {} entries found", m_CidMap.size()); } void Flush() { m_LogFile.Flush(); } + + void Scrub(ScrubContext& Ctx) + { + CasChunkSet ChunkSet; + + { + RwLock::SharedLockScope _(m_Lock); + + for (auto& Kv : m_CidMap) + { + ChunkSet.AddChunk(Kv.second); + } + } + + m_CasStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + // All good - we have all the chunks + return; + } + + ZEN_ERROR("Scrubbing found that {} cid mappings mapped to non-existent CAS chunks", ChunkSet.GetChunkSet().size()); + + // Erase all mappings to chunks which are not present in the underlying CAS store + // we do this by removing mappings from the in-memory lookup structure and also + // by emitting tombstone records to the commit log + + const auto& MissingChunks = ChunkSet.GetChunkSet(); + std::vector<IoHash> BadChunks; + { + RwLock::SharedLockScope _(m_Lock); + + for (auto It = begin(m_CidMap), ItEnd = end(m_CidMap); It != ItEnd;) + { + if (auto MissingIt = MissingChunks.find(It->second); MissingIt != MissingChunks.end()) + { + const IoHash& BadHash = It->first; + + // Log a tombstone record + m_LogFile.Append({.Uncompressed = BadHash, .Compressed = IoHash::Zero}); + + BadChunks.push_back(BadHash); + + It = m_CidMap.erase(It); + } + else + { + ++It; + } + } + } + + m_LogFile.Flush(); + + // TODO: Should compute a snapshot index here + + Ctx.ReportBadChunks(BadChunks); + } }; ////////////////////////////////////////////////////////////////////////// @@ -134,4 +206,10 @@ CidStore::Flush() m_Impl->Flush(); } +void +CidStore::Scrub(ScrubContext& Ctx) +{ + m_Impl->Scrub(Ctx); +} + } // namespace zen diff --git a/zenstore/compactcas.cpp b/zenstore/compactcas.cpp index 4407d8b08..fe38f0fde 100644 --- a/zenstore/compactcas.cpp +++ b/zenstore/compactcas.cpp @@ -10,18 +10,22 @@ #include <zencore/thread.h> #include <zencore/uid.h> -#include <gsl/gsl-lite.hpp> - -#include <functional> - -struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive- -#include <atlfile.h> #include <filesystem> +#include <functional> +#include <gsl/gsl-lite.hpp> ////////////////////////////////////////////////////////////////////////// namespace zen { +CasContainerStrategy::CasContainerStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +{ +} + +CasContainerStrategy::~CasContainerStrategy() +{ +} + void CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore) { @@ -43,8 +47,10 @@ CasContainerStrategy::Initialize(const std::string_view ContainerBaseName, uint6 uint64_t MaxFileOffset = 0; { - // This is not technically necessary but may help future static analysis - zen::RwLock::ExclusiveLockScope _(m_LocationMapLock); + // This is not technically necessary (nobody should be accessing us from + // another thread at this stage) but may help static analysis + + RwLock::ExclusiveLockScope _(m_LocationMapLock); m_CasLog.Replay([&](const CasDiskIndexEntry& Record) { m_LocationMap[Record.Key] = Record.Location; @@ -103,12 +109,11 @@ IoBuffer CasContainerStrategy::FindChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt != m_LocationMap.end()) + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { const CasDiskLocation& Location = KeyIt->second; - return zen::IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); + return IoBufferBuilder::MakeFromFileHandle(m_SmallObjectFile.Handle(), Location.Offset, Location.Size); } // Not found @@ -120,9 +125,8 @@ bool CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) { RwLock::SharedLockScope _(m_LocationMapLock); - auto KeyIt = m_LocationMap.find(ChunkHash); - if (KeyIt != m_LocationMap.end()) + if (auto KeyIt = m_LocationMap.find(ChunkHash); KeyIt != m_LocationMap.end()) { return true; } @@ -133,6 +137,13 @@ CasContainerStrategy::HaveChunk(const IoHash& ChunkHash) void CasContainerStrategy::FilterChunks(CasChunkSet& InOutChunks) { + // This implementation is good enough for relatively small + // chunk sets (in terms of chunk identifiers), but would + // benefit from a better implementation which removes + // items incrementally for large sets, especially when + // we're likely to already have a large proportion of the + // chunks in the set + std::unordered_set<IoHash> HaveSet; for (const IoHash& Hash : InOutChunks.GetChunkSet()) @@ -157,4 +168,113 @@ CasContainerStrategy::Flush() m_SmallObjectFile.Flush(); } +void +CasContainerStrategy::Scrub(ScrubContext& Ctx) +{ + const uint64_t WindowSize = 4 * 1024 * 1024; + uint64_t WindowStart = 0; + uint64_t WindowEnd = WindowSize; + const uint64_t FileSize = m_SmallObjectFile.FileSize(); + + std::vector<CasDiskIndexEntry> BigChunks; + std::vector<CasDiskIndexEntry> BadChunks; + + // We do a read sweep through the payloads file and validate + // any entries that are contained within each segment, with + // the assumption that most entries will be checked in this + // pass. An alternative strategy would be to use memory mapping. + + { + IoBuffer ReadBuffer{WindowSize}; + void* BufferBase = ReadBuffer.MutableData(); + + RwLock::SharedLockScope _(m_LocationMapLock); + + do + { + const uint64_t ChunkSize = Min(WindowSize, FileSize - WindowStart); + m_SmallObjectFile.Read(BufferBase, ChunkSize, WindowStart); + + for (auto& Entry : m_LocationMap) + { + const uint64_t EntryOffset = Entry.second.Offset; + + if ((EntryOffset >= WindowStart) && (EntryOffset < WindowEnd)) + { + const uint64_t EntryEnd = EntryOffset + Entry.second.Size; + + if (EntryEnd >= WindowEnd) + { + BigChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + + continue; + } + + const IoHash ComputedHash = IoHash::HashBuffer(BufferBase, Entry.second.Size); + + if (Entry.first != ComputedHash) + { + // Hash mismatch + + BadChunks.push_back({.Key = Entry.first, .Location = Entry.second}); + } + } + } + + WindowStart += WindowSize; + WindowEnd += WindowSize; + } while (WindowStart < FileSize); + } + + // Deal with large chunks + + for (const CasDiskIndexEntry& Entry : BigChunks) + { + IoHashStream Hasher; + m_SmallObjectFile.StreamByteRange(Entry.Location.Offset, Entry.Location.Size, [&](const void* Data, uint64_t Size) { + Hasher.Append(Data, Size); + }); + IoHash ComputedHash = Hasher.GetHash(); + + if (Entry.Key != ComputedHash) + { + BadChunks.push_back(Entry); + } + } + + // Deal with bad chunks by removing them from our lookup map + + std::vector<IoHash> BadChunkHashes; + + for (const CasDiskIndexEntry& Entry : BadChunks) + { + BadChunkHashes.push_back(Entry.Key); + m_LocationMap.erase(Entry.Key); + } + + // Let whomever it concerns know about the bad chunks. This could + // be used to invalidate higher level data structures more efficiently + // than a full validation pass might be able to do + + Ctx.ReportBadChunks(BadChunkHashes); +} + +void +CasContainerStrategy::MakeSnapshot() +{ + RwLock::SharedLockScope _(m_LocationMapLock); + + std::vector<CasDiskIndexEntry> Entries{m_LocationMap.size()}; + + uint64_t EntryIndex = 0; + for (auto& Entry : m_LocationMap) + { + CasDiskIndexEntry& IndexEntry = Entries[EntryIndex++]; + IndexEntry.Key = Entry.first; + IndexEntry.Location = Entry.second; + } + + m_SmallObjectIndex.Write(Entries.data(), Entries.size() * sizeof(CasDiskIndexEntry), 0); +} + } // namespace zen diff --git a/zenstore/compactcas.h b/zenstore/compactcas.h index 05bbf81f6..101e6b1b7 100644 --- a/zenstore/compactcas.h +++ b/zenstore/compactcas.h @@ -14,9 +14,6 @@ #include <zenstore/cas.h> #include <zenstore/caslog.h> -#include <atlfile.h> -#include <functional> - namespace zen { ////////////////////////////////////////////////////////////////////////// @@ -27,7 +24,10 @@ namespace zen { struct CasDiskLocation { uint64_t Offset; - uint32_t Size; // TODO: Make this more like the IoStore index so we can store larger chunks (should be five bytes) + // If we wanted to be able to store larger chunks using this storage mechanism then + // we could make this more like the IoStore index so we can store larger chunks. + // I.e use five bytes for size and seven for offset + uint32_t Size; }; struct CasDiskIndexEntry @@ -50,7 +50,9 @@ static_assert(sizeof(CasDiskIndexEntry) == 32); struct CasContainerStrategy { - CasContainerStrategy(const CasStoreConfiguration& Config, CasStore::Stats& Stats) : m_Config(Config), m_Stats(Stats) {} + CasContainerStrategy(const CasStoreConfiguration& Config); + ~CasContainerStrategy(); + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& chunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); @@ -58,10 +60,10 @@ struct CasContainerStrategy void FilterChunks(CasChunkSet& InOutChunks); void Initialize(const std::string_view ContainerBaseName, uint64_t Alignment, bool IsNewStore); void Flush(); + void Scrub(ScrubContext& Ctx); private: const CasStoreConfiguration& m_Config; - CasStore::Stats& m_Stats; uint64_t m_PayloadAlignment = 1 << 4; bool m_IsInitialized = false; BasicFile m_SmallObjectFile; @@ -73,6 +75,8 @@ private: RwLock m_InsertLock; // used to serialize inserts std::atomic<uint64_t> m_CurrentInsertOffset = 0; std::atomic<uint64_t> m_CurrentIndexOffset = 0; + + void MakeSnapshot(); }; } // namespace zen diff --git a/zenstore/filecas.cpp b/zenstore/filecas.cpp index 170f13875..968c9f3a0 100644 --- a/zenstore/filecas.cpp +++ b/zenstore/filecas.cpp @@ -10,6 +10,7 @@ #include <zencore/string.h> #include <zencore/thread.h> #include <zencore/uid.h> +#include <zenstore/basicfile.h> #include <gsl/gsl-lite.hpp> @@ -17,6 +18,7 @@ #include <functional> #include <unordered_map> +// clang-format off #include <zencore/prewindows.h> struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax error: 'identifier' was unexpected here" when using /permissive- @@ -24,13 +26,19 @@ struct IUnknown; // Workaround for "combaseapi.h(229): error C2187: syntax erro #include <zencore/postwindows.h> // clang-format on -// -////////////////////////////////////////////////////////////////////////// namespace zen { using namespace fmt::literals; +FileCasStrategy::FileCasStrategy(const CasStoreConfiguration& Config) : m_Config(Config) +{ +} + +FileCasStrategy::~FileCasStrategy() +{ +} + WideStringBuilderBase& FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len) { @@ -56,7 +64,7 @@ FileCasStrategy::MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHas OutShard2len = ShardedPath.Size(); ShardedPath.Append('\\'); - ShardedPath.AppendAsciiRange(str + 6, str + 64); + ShardedPath.AppendAsciiRange(str + 5, str + 40); return ShardedPath; } @@ -151,7 +159,7 @@ FileCasStrategy::InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash) if (FAILED(hRes)) { - zen::CreateDirectories(FilePath.c_str()); + CreateDirectories(FilePath.c_str()); hRes = InternalCreateDirectoryHandle(); } @@ -259,12 +267,9 @@ FileCasStrategy::InsertChunk(const void* const ChunkData, const size_t ChunkSize } // We cannot rely on RAII to close the file handle since it would be closed - // *after* the lock is released due to the initialization order. + // *after* the lock is released due to the initialization order PayloadFile.Close(); - AtomicIncrement(m_Stats.PutCount); - AtomicAdd(m_Stats.PutBytes, ChunkSize); - return {.New = true}; } @@ -279,15 +284,7 @@ FileCasStrategy::FindChunk(const IoHash& ChunkHash) RwLock::SharedLockScope _(LockForHash(ChunkHash)); - auto Chunk = IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); - - if (Chunk) - { - AtomicIncrement(m_Stats.GetCount); - AtomicAdd(m_Stats.GetBytes, Chunk.Size()); - } - - return Chunk; + return IoBufferBuilder::MakeFromFile(ShardedPath.c_str()); } bool @@ -338,6 +335,62 @@ FileCasStrategy::FilterChunks(CasChunkSet& InOutChunks) } void +FileCasStrategy::IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback) +{ + struct Visitor : public FileSystemTraversal::TreeVisitor + { + Visitor(const std::filesystem::path& RootDir) : RootDirectory(RootDir) {} + virtual void VisitFile(const std::filesystem::path& Parent, const std::wstring_view& File, uint64_t FileSize) override + { + ZEN_UNUSED(FileSize); + + std::filesystem::path RelPath = std::filesystem::relative(Parent, RootDirectory); + + std::wstring PathString = RelPath.native(); + + if ((PathString.size() == (3 + 2 + 1)) && (File.size() == (40 - 3 - 2))) + { + if (PathString.at(3) == std::filesystem::path::preferred_separator) + { + PathString.erase(3, 1); + } + PathString.append(File); + + StringBuilder<64> Utf8; + WideToUtf8(PathString, Utf8); + + // TODO: should validate that we're actually dealing with a valid hex string here + + IoHash NameHash = IoHash::FromHexString({Utf8.Data(), Utf8.Size()}); + + BasicFile PayloadFile; + std::error_code Ec; + PayloadFile.Open(Parent / File, false, Ec); + + if (!Ec) + { + Callback(NameHash, PayloadFile); + } + } + } + + virtual bool VisitDirectory([[maybe_unused]] const std::filesystem::path& Parent, + [[maybe_unused]] const std::wstring_view& DirectoryName) + { + return true; + } + + const std::filesystem::path& RootDirectory; + std::function<void(const IoHash& Hash, BasicFile& PayloadFile)> Callback; + } CasVisitor{m_Config.RootDirectory}; + + CasVisitor.Callback = std::move(Callback); + + FileSystemTraversal Traversal; + Traversal.TraverseFileSystem(m_Config.RootDirectory, CasVisitor); +} + +void FileCasStrategy::Flush() { // Since we don't keep files open after writing there's nothing specific @@ -353,6 +406,25 @@ FileCasStrategy::Flush() } void +FileCasStrategy::Scrub(ScrubContext& Ctx) +{ + std::vector<IoHash> BadHashes; + + IterateChunks([&](const IoHash& Hash, BasicFile& Payload) { + IoHashStream Hasher; + Payload.StreamFile([&](const void* Data, size_t Size) { Hasher.Append(Data, Size); }); + IoHash ComputedHash = Hasher.GetHash(); + + if (ComputedHash != Hash) + { + BadHashes.push_back(Hash); + } + }); + + Ctx.ReportBadChunks(BadHashes); +} + +void FileCasStrategy::GarbageCollect(GcContext& GcCtx) { ZEN_UNUSED(GcCtx); diff --git a/zenstore/filecas.h b/zenstore/filecas.h index 448d1a05f..18102968a 100644 --- a/zenstore/filecas.h +++ b/zenstore/filecas.h @@ -10,11 +10,20 @@ #include <zencore/thread.h> #include <zenstore/cas.h> +#include <functional> + namespace zen { +class BasicFile; + +/** CAS storage strategy using a file-per-chunk storage strategy + */ + struct FileCasStrategy { - FileCasStrategy(const CasStoreConfiguration& Config, CasStore::Stats& Stats) : m_Config(Config), m_Stats(Stats) {} + FileCasStrategy(const CasStoreConfiguration& Config); + ~FileCasStrategy(); + CasStore::InsertResult InsertChunk(const void* ChunkData, size_t ChunkSize, const IoHash& ChunkHash); CasStore::InsertResult InsertChunk(IoBuffer Chunk, const IoHash& ChunkHash); IoBuffer FindChunk(const IoHash& ChunkHash); @@ -22,15 +31,16 @@ struct FileCasStrategy void FilterChunks(CasChunkSet& InOutChunks); void Flush(); void GarbageCollect(GcContext& GcCtx); + void Scrub(ScrubContext& Ctx); private: const CasStoreConfiguration& m_Config; - CasStore::Stats& m_Stats; RwLock m_Lock; RwLock m_ShardLocks[256]; // TODO: these should be spaced out so they don't share cache lines inline RwLock& LockForHash(const IoHash& Hash) { return m_ShardLocks[Hash.Hash[19]]; } static WideStringBuilderBase& MakeShardedPath(WideStringBuilderBase& ShardedPath, const IoHash& ChunkHash, size_t& OutShard2len); + void IterateChunks(std::function<void(const IoHash& Hash, BasicFile& PayloadFile)>&& Callback); }; } // namespace zen diff --git a/zenstore/include/zenstore/CAS.h b/zenstore/include/zenstore/CAS.h index b4de533dd..bb310b179 100644 --- a/zenstore/include/zenstore/CAS.h +++ b/zenstore/include/zenstore/CAS.h @@ -2,7 +2,7 @@ #pragma once -#include <zencore/zencore.h> +#include "zenstore.h" #include <zencore/blake3.h> #include <zencore/iobuffer.h> @@ -37,6 +37,14 @@ public: private: }; +class ScrubContext +{ +public: + virtual void ReportBadChunks(std::span<IoHash> BadChunks); + +private: +}; + class CasChunkSet { public: @@ -54,17 +62,7 @@ class CasStore public: virtual ~CasStore() = default; - struct Stats - { - uint64_t PutBytes = 0; - uint64_t PutCount = 0; - - uint64_t GetBytes = 0; - uint64_t GetCount = 0; - }; - const CasStoreConfiguration& Config() { return m_Config; } - const Stats& GetStats() const { return m_Stats; } struct InsertResult { @@ -76,10 +74,10 @@ public: virtual IoBuffer FindChunk(const IoHash& ChunkHash) = 0; virtual void FilterChunks(CasChunkSet& InOutChunks) = 0; virtual void Flush() = 0; + virtual void Scrub(ScrubContext& Ctx) = 0; protected: CasStoreConfiguration m_Config; - Stats m_Stats; }; ZENCORE_API CasStore* CreateCasStore(); diff --git a/zenstore/include/zenstore/basicfile.h b/zenstore/include/zenstore/basicfile.h index c6f61d466..d4d65b366 100644 --- a/zenstore/include/zenstore/basicfile.h +++ b/zenstore/include/zenstore/basicfile.h @@ -2,34 +2,46 @@ #pragma once -#include <zencore/iobuffer.h> -#include <zencore/zencore.h> +#include "zenstore.h" +#include <zencore/iobuffer.h> #include <zencore/windows.h> -#include <atlfile.h> #include <filesystem> +#include <functional> namespace zen { /** * Probably the most basic file abstraction in the universe + * + * One thing of note is that there is no notion of a "current file position" + * in this API -- all reads and writes are done from explicit offsets in + * the file. This avoids concurrency issues which can occur otherwise. + * */ class BasicFile { public: + BasicFile() = default; + ~BasicFile(); void Open(std::filesystem::path FileName, bool IsCreate); - void Read(void* Data, uint64_t Size, uint64_t Offset); - void Write(const void* Data, uint64_t Size, uint64_t Offset); + void Open(std::filesystem::path FileName, bool IsCreate, std::error_code& Ec); + void Close(); + void Read(void* Data, uint64_t Size, uint64_t FileOffset); + void StreamFile(std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); + void StreamByteRange(uint64_t FileOffset, uint64_t Size, std::function<void(const void* Data, uint64_t Size)>&& ChunkFun); + void Write(const void* Data, uint64_t Size, uint64_t FileOffset); void Flush(); uint64_t FileSize(); - void* Handle() { return m_File; } - void Close(); + void* Handle() { return m_FileHandle; } IoBuffer ReadAll(); private: - CAtlFile m_File; + void* m_FileHandle = nullptr; // This is either null or valid }; +ZENCORE_API void basicfile_forcelink(); + } // namespace zen diff --git a/zenstore/include/zenstore/caslog.h b/zenstore/include/zenstore/caslog.h index aea855e4c..1fbda0265 100644 --- a/zenstore/include/zenstore/caslog.h +++ b/zenstore/include/zenstore/caslog.h @@ -2,7 +2,7 @@ #pragma once -#include <zencore/zencore.h> +#include "zenstore.h" #include <zencore/iobuffer.h> #include <zencore/string.h> @@ -33,7 +33,7 @@ private: { uint8_t Magic[16]; uint32_t RecordSize = 0; - zen::Oid LogId; + Oid LogId; uint32_t ValidatedTail = 0; uint32_t Pad[6]; uint32_t Checksum = 0; diff --git a/zenstore/include/zenstore/cidstore.h b/zenstore/include/zenstore/cidstore.h index 76a33c915..49f2bf99a 100644 --- a/zenstore/include/zenstore/cidstore.h +++ b/zenstore/include/zenstore/cidstore.h @@ -2,6 +2,8 @@ #pragma once +#include "zenstore.h" + #include <tsl/robin_map.h> #include <zencore/iohash.h> #include <zenstore/CAS.h> @@ -43,6 +45,7 @@ public: IoBuffer FindChunkByCid(const IoHash& DecompressedId); bool ContainsChunk(const IoHash& DecompressedId); void Flush(); + void Scrub(ScrubContext& Ctx); // TODO: add batch filter support diff --git a/zenstore/include/zenstore/scrub.h b/zenstore/include/zenstore/scrub.h index 5a34d4860..4948afcd5 100644 --- a/zenstore/include/zenstore/scrub.h +++ b/zenstore/include/zenstore/scrub.h @@ -2,6 +2,8 @@ #pragma once +#include "zenstore.h" + #include <zencore/iohash.h> #include <span> diff --git a/zenstore/include/zenstore/zenstore.h b/zenstore/include/zenstore/zenstore.h new file mode 100644 index 000000000..46d62029d --- /dev/null +++ b/zenstore/include/zenstore/zenstore.h @@ -0,0 +1,13 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/zencore.h> + +#define ZENSTORE_API + +namespace zen { + +ZENSTORE_API void zenstore_forcelinktests(); + +} diff --git a/zenstore/zenstore.cpp b/zenstore/zenstore.cpp new file mode 100644 index 000000000..cd16e5634 --- /dev/null +++ b/zenstore/zenstore.cpp @@ -0,0 +1,17 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenstore/zenstore.h" + +#include <zenstore/CAS.h> +#include <zenstore/basicfile.h> + +namespace zen { + +void +zenstore_forcelinktests() +{ + basicfile_forcelink(); + CAS_forcelink(); +} + +} // namespace zen diff --git a/zenstore/zenstore.vcxproj b/zenstore/zenstore.vcxproj index 8d665f2c3..eb2ecd02b 100644 --- a/zenstore/zenstore.vcxproj +++ b/zenstore/zenstore.vcxproj @@ -19,6 +19,7 @@ <ClCompile Include="filecas.cpp" /> <ClCompile Include="gc.cpp" /> <ClCompile Include="scrub.cpp" /> + <ClCompile Include="zenstore.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="compactcas.h" /> @@ -29,12 +30,16 @@ <ClInclude Include="include\zenstore\scrub.h" /> <ClInclude Include="include\zenstore\CAS.h" /> <ClInclude Include="include\zenstore\caslog.h" /> + <ClInclude Include="include\zenstore\zenstore.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\zencore\zencore.vcxproj"> <Project>{d75bf9ab-c61e-4fff-ad59-1563430f05e2}</Project> </ProjectReference> </ItemGroup> + <ItemGroup> + <None Include="xmake.lua" /> + </ItemGroup> <PropertyGroup Label="Globals"> <VCProjectVersion>16.0</VCProjectVersion> <Keyword>Win32Proj</Keyword> diff --git a/zenstore/zenstore.vcxproj.filters b/zenstore/zenstore.vcxproj.filters index 3dfb89dbf..8a52c69f6 100644 --- a/zenstore/zenstore.vcxproj.filters +++ b/zenstore/zenstore.vcxproj.filters @@ -9,6 +9,7 @@ <ClCompile Include="scrub.cpp" /> <ClCompile Include="basicfile.cpp" /> <ClCompile Include="cidstore.cpp" /> + <ClCompile Include="zenstore.cpp" /> </ItemGroup> <ItemGroup> <ClInclude Include="compactcas.h" /> @@ -19,5 +20,9 @@ <ClInclude Include="include\zenstore\scrub.h" /> <ClInclude Include="include\zenstore\basicfile.h" /> <ClInclude Include="include\zenstore\cidstore.h" /> + <ClInclude Include="include\zenstore\zenstore.h" /> + </ItemGroup> + <ItemGroup> + <None Include="xmake.lua" /> </ItemGroup> </Project>
\ No newline at end of file diff --git a/zenutil/include/zenserverprocess.h b/zenutil/include/zenutil/zenserverprocess.h index 7b41c8aba..09728aa1a 100644 --- a/zenutil/include/zenserverprocess.h +++ b/zenutil/include/zenutil/zenserverprocess.h @@ -5,11 +5,15 @@ #include <zencore/enumflags.h> #include <zencore/logging.h> #include <zencore/thread.h> +#include <zencore/uid.h> #include <gsl/gsl-lite.hpp> #include <atomic> #include <filesystem> +#include <optional> + +namespace zen { class ZenServerEnvironment { @@ -44,6 +48,9 @@ struct ZenServerInstance [[nodiscard]] bool WaitUntilReady(int Timeout); void EnableTermination() { m_Terminate = true; } void EnableMesh() { m_MeshEnabled = true; } + void Detach(); + inline int GetPid() { return m_Process.Pid(); } + inline void SetOwnerPid(int Pid) { m_OwnerPid = Pid; } void SetTestDir(std::filesystem::path TestDir) { @@ -59,13 +66,14 @@ struct ZenServerInstance private: ZenServerEnvironment& m_Env; - zen::ProcessHandle m_Process; - zen::Event m_ReadyEvent; - zen::Event m_ShutdownEvent; + ProcessHandle m_Process; + Event m_ReadyEvent; + Event m_ShutdownEvent; bool m_Terminate = false; std::filesystem::path m_TestDir; bool m_MeshEnabled = false; int m_BasePort = 0; + std::optional<int> m_OwnerPid; void CreateShutdownEvent(int BasePort); }; @@ -90,7 +98,9 @@ public: std::atomic<uint16_t> ListenPort; std::atomic<uint16_t> Flags; uint8_t SessionId[12]; + std::atomic<uint32_t> SponsorPids[32]; uint8_t Padding[12]; + uint8_t Padding2[96]; enum class FlagsEnum : uint16_t { @@ -99,11 +109,13 @@ public: FRIEND_ENUM_CLASS_FLAGS(FlagsEnum); + Oid GetSessionId() const { return Oid::FromMemory(SessionId); } void Reset(); void SignalShutdownRequest(); + bool AddSponsorProcess(uint32_t Pid); }; - static_assert(sizeof(ZenServerEntry) == 32); + static_assert(sizeof(ZenServerEntry) == 256); void Initialize(); [[nodiscard]] bool InitializeReadOnly(); @@ -111,10 +123,14 @@ public: ZenServerEntry* Register(int ListenPort); void Sweep(); void Snapshot(std::function<void(const ZenServerEntry&)>&& Callback); + inline bool IsReadOnly() const { return m_IsReadOnly; } private: - void* m_hMapFile = nullptr; - ZenServerEntry* m_Data; - int m_MaxEntryCount = 4096 / sizeof(ZenServerEntry); + void* m_hMapFile = nullptr; + ZenServerEntry* m_Data = nullptr; + int m_MaxEntryCount = 131072 / sizeof(ZenServerEntry); ZenServerEntry* m_OurEntry = nullptr; + bool m_IsReadOnly = true; }; + +} // namespace zen diff --git a/zenutil/zenserverprocess.cpp b/zenutil/zenserverprocess.cpp index 7f4be2368..55b592ab1 100644 --- a/zenutil/zenserverprocess.cpp +++ b/zenutil/zenserverprocess.cpp @@ -1,11 +1,12 @@ // Copyright Epic Games, Inc. All Rights Reserved. -#include "zenserverprocess.h" +#include "zenutil/zenserverprocess.h" #include <zencore/except.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/session.h> #include <zencore/string.h> #include <atlbase.h> @@ -16,44 +17,42 @@ ////////////////////////////////////////////////////////////////////////// +namespace zen { + namespace zenutil { -class SecurityAttributes -{ -public: - inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; } + class SecurityAttributes + { + public: + inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; } -protected: - SECURITY_ATTRIBUTES m_Attributes{}; - SECURITY_DESCRIPTOR m_Sd{}; -}; + protected: + SECURITY_ATTRIBUTES m_Attributes{}; + SECURITY_DESCRIPTOR m_Sd{}; + }; -// Security attributes which allows any user access + // Security attributes which allows any user access -class AnyUserSecurityAttributes : public SecurityAttributes -{ -public: - AnyUserSecurityAttributes() + class AnyUserSecurityAttributes : public SecurityAttributes { - m_Attributes.nLength = sizeof m_Attributes; - m_Attributes.bInheritHandle = false; // Disable inheritance - - const BOOL success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); - - if (success) + public: + AnyUserSecurityAttributes() { - const BOOL bSetOk = SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE); + m_Attributes.nLength = sizeof m_Attributes; + m_Attributes.bInheritHandle = false; // Disable inheritance - if (bSetOk) + const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); + + if (Success) { + if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE)) + { + ThrowLastError("SetSecurityDescriptorDacl failed", std::source_location::current()); + } + m_Attributes.lpSecurityDescriptor = &m_Sd; } - else - { - zen::ThrowLastError("SetSecurityDescriptorDacl failed", std::source_location::current()); - } } - } -}; + }; } // namespace zenutil @@ -109,7 +108,7 @@ ZenServerState::Initialize() if (hMap == NULL) { - zen::ThrowLastError("Could not open or create file mapping object for Zen server state"); + ThrowLastError("Could not open or create file mapping object for Zen server state"); } m_hMapFile = hMap; @@ -123,10 +122,11 @@ ZenServerState::Initialize() if (pBuf == NULL) { - zen::ThrowLastError("Could not map view of Zen server state"); + ThrowLastError("Could not map view of Zen server state"); } - m_Data = reinterpret_cast<ZenServerEntry*>(pBuf); + m_Data = reinterpret_cast<ZenServerEntry*>(pBuf); + m_IsReadOnly = false; } bool @@ -149,7 +149,7 @@ ZenServerState::InitializeReadOnly() if (pBuf == NULL) { - zen::ThrowLastError("Could not map view of Zen server state"); + ThrowLastError("Could not map view of Zen server state"); } m_Data = reinterpret_cast<ZenServerEntry*>(pBuf); @@ -181,7 +181,7 @@ ZenServerState::Register(int ListenPort) // Allocate an entry - int Pid = zen::GetCurrentProcessId(); + int Pid = GetCurrentProcessId(); for (int i = 0; i < m_MaxEntryCount; ++i) { @@ -199,6 +199,9 @@ ZenServerState::Register(int ListenPort) Entry.Pid = Pid; Entry.Flags = 0; + const Oid SesId = GetSessionId(); + memcpy(Entry.SessionId, &SesId, sizeof SesId); + return &Entry; } } @@ -215,13 +218,15 @@ ZenServerState::Sweep() return; } + ZEN_ASSERT(m_IsReadOnly == false); + for (int i = 0; i < m_MaxEntryCount; ++i) { ZenServerEntry& Entry = m_Data[i]; if (Entry.ListenPort) { - if (zen::IsProcessRunning(Entry.Pid) == false) + if (IsProcessRunning(Entry.Pid) == false) { ZEN_DEBUG("Sweep - pid {} not running, reclaiming entry (port {})", Entry.Pid, Entry.ListenPort); @@ -264,6 +269,30 @@ ZenServerState::ZenServerEntry::SignalShutdownRequest() Flags |= uint16_t(FlagsEnum::kShutdownPlease); } +bool +ZenServerState::ZenServerEntry::AddSponsorProcess(uint32_t PidToAdd) +{ + for (std::atomic<uint32_t>& PidEntry : SponsorPids) + { + if (PidEntry.load(std::memory_order::memory_order_relaxed) == 0) + { + uint32_t Expected = 0; + if (PidEntry.compare_exchange_strong(Expected, uint16_t(PidToAdd))) + { + // Success! + return true; + } + } + else if (PidEntry.load(std::memory_order::memory_order_relaxed) == PidToAdd) + { + // Success, the because pid is already in the list + return true; + } + } + + return false; +} + ////////////////////////////////////////////////////////////////////////// std::atomic<int> TestCounter{0}; @@ -294,7 +323,7 @@ ZenServerEnvironment::InitializeForTest(std::filesystem::path ProgramBaseDir, st ZEN_INFO("Program base dir is '{}'", ProgramBaseDir); ZEN_INFO("Cleaning test base dir '{}'", TestBaseDir); - zen::DeleteDirectories(TestBaseDir.c_str()); + DeleteDirectories(TestBaseDir.c_str()); m_IsTestInstance = true; m_IsInitialized = true; @@ -305,14 +334,14 @@ ZenServerEnvironment::CreateNewTestDir() { using namespace std::literals; - zen::ExtendableWideStringBuilder<256> TestDir; + ExtendableWideStringBuilder<256> TestDir; TestDir << "test"sv << int64_t(++TestCounter); std::filesystem::path TestPath = m_TestBaseDir / TestDir.c_str(); ZEN_INFO("Creating new test dir @ '{}'", TestPath); - zen::CreateDirectories(TestPath.c_str()); + CreateDirectories(TestPath.c_str()); return TestPath; } @@ -377,16 +406,16 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr const int MyPid = _getpid(); const int ChildId = ++ChildIdCounter; - zen::ExtendableStringBuilder<32> ChildEventName; + ExtendableStringBuilder<32> ChildEventName; ChildEventName << "Zen_Child_" << ChildId; - zen::NamedEvent ChildEvent{ChildEventName}; + NamedEvent ChildEvent{ChildEventName}; CreateShutdownEvent(BasePort); - zen::ExtendableStringBuilder<32> LogId; + ExtendableStringBuilder<32> LogId; LogId << "Zen" << ChildId; - zen::ExtendableWideStringBuilder<512> CommandLine; + ExtendableWideStringBuilder<512> CommandLine; CommandLine << "\""; CommandLine.Append(Executable.c_str()); CommandLine << "\""; @@ -395,7 +424,17 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr if (IsTest) { - CommandLine << " --test --owner-pid " << MyPid << " --log-id " << LogId; + if (!m_OwnerPid.has_value()) + { + m_OwnerPid = MyPid; + } + + CommandLine << " --test --log-id " << LogId; + } + + if (m_OwnerPid.has_value()) + { + CommandLine << " --owner-pid " << m_OwnerPid.value(); } CommandLine << " --child-id " << ChildEventName; @@ -515,10 +554,10 @@ ZenServerInstance::SpawnServer(int BasePort, std::string_view AdditionalServerAr void ZenServerInstance::CreateShutdownEvent(int BasePort) { - zen::ExtendableStringBuilder<32> ChildShutdownEventName; + ExtendableStringBuilder<32> ChildShutdownEventName; ChildShutdownEventName << "Zen_" << BasePort; ChildShutdownEventName << "_Shutdown"; - zen::NamedEvent ChildShutdownEvent{ChildShutdownEventName}; + NamedEvent ChildShutdownEvent{ChildShutdownEventName}; m_ShutdownEvent = std::move(ChildShutdownEvent); } @@ -554,9 +593,25 @@ ZenServerInstance::AttachToRunningServer(int BasePort) } void +ZenServerInstance::Detach() +{ + if (m_Process.IsValid()) + { + m_Process.Reset(); + m_ShutdownEvent.Close(); + } +} + +void ZenServerInstance::WaitUntilReady() { - m_ReadyEvent.Wait(); + while (m_ReadyEvent.Wait(100) == false) + { + if (!m_Process.IsRunning() || !m_Process.IsValid()) + { + return; + } + } } bool @@ -574,3 +629,5 @@ ZenServerInstance::GetBaseUri() const return "http://localhost:{}"_format(m_BasePort); } + +} // namespace zen diff --git a/zenutil/zenutil.vcxproj b/zenutil/zenutil.vcxproj index fcb27dff3..3bf6111f7 100644 --- a/zenutil/zenutil.vcxproj +++ b/zenutil/zenutil.vcxproj @@ -100,13 +100,16 @@ <ClCompile Include="zenserverprocess.cpp" /> </ItemGroup> <ItemGroup> - <ClInclude Include="include\zenserverprocess.h" /> + <ClInclude Include="include\zenutil\zenserverprocess.h" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\zencore\zencore.vcxproj"> <Project>{d75bf9ab-c61e-4fff-ad59-1563430f05e2}</Project> </ProjectReference> </ItemGroup> + <ItemGroup> + <None Include="xmake.lua" /> + </ItemGroup> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <ImportGroup Label="ExtensionTargets"> </ImportGroup> diff --git a/zenutil/zenutil.vcxproj.filters b/zenutil/zenutil.vcxproj.filters index ca1414842..9952e7159 100644 --- a/zenutil/zenutil.vcxproj.filters +++ b/zenutil/zenutil.vcxproj.filters @@ -4,6 +4,9 @@ <ClCompile Include="zenserverprocess.cpp" /> </ItemGroup> <ItemGroup> - <ClInclude Include="include\zenserverprocess.h" /> + <ClInclude Include="include\zenutil\zenserverprocess.h" /> + </ItemGroup> + <ItemGroup> + <None Include="xmake.lua" /> </ItemGroup> </Project>
\ No newline at end of file |