diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-21 23:13:34 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-21 23:13:34 +0100 |
| commit | e3388acaca0ce6f1a2d4cb17e535497f2689118a (patch) | |
| tree | 817948a42b57ebd07f31d8317065c2667eddb699 /src | |
| parent | Interprocess pipe support (for stdout/stderr capture) (#866) (diff) | |
| download | zen-e3388acaca0ce6f1a2d4cb17e535497f2689118a.tar.xz zen-e3388acaca0ce6f1a2d4cb17e535497f2689118a.zip | |
zen hub command (#877)
- Feature: Added `zen hub` command for managing a hub server and its provisioned module instances:
  - `zen hub up` - Start a hub server (equivalent to `zen up` in hub mode)
  - `zen hub down` - Shut down a hub server
  - `zen hub provision <moduleid>` - Provision a storage server instance for a module
  - `zen hub deprovision <moduleid>` - Deprovision a storage server instance
  - `zen hub hibernate <moduleid>` - Hibernate a provisioned instance (shut down, data preserved)
  - `zen hub wake <moduleid>` - Wake a hibernated instance
  - `zen hub status [moduleid]` - Show state of all instances or a specific module
- Feature: Added new hub HTTP endpoints for instance lifecycle management:
  - `POST /hub/modules/{moduleid}/hibernate` - Hibernate the instance for the given module
  - `POST /hub/modules/{moduleid}/wake` - Wake a hibernated instance for the given module
- Improvement: `zen up` refactored to use shared `StartupZenServer`/`ShutdownZenServer` helpers (also used by `zen hub up`/`zen hub down`)
- Bugfix: Fixed shutdown event not being cleared after the server process exits in `ZenServerInstance::Shutdown()`, which could cause stale state on reuse
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/hub_cmd.cpp | 440 | ||||
| -rw-r--r-- | src/zen/cmds/hub_cmd.h | 118 | ||||
| -rw-r--r-- | src/zen/cmds/up_cmd.cpp | 175 | ||||
| -rw-r--r-- | src/zen/cmds/up_cmd.h | 2 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 3 | ||||
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 445 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp | 72 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 372 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 21 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.cpp | 23 | ||||
| -rw-r--r-- | src/zenserver/hub/storageserverinstance.h | 10 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/zenserverprocess.h | 26 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 135 |
13 files changed, 1394 insertions, 448 deletions
diff --git a/src/zen/cmds/hub_cmd.cpp b/src/zen/cmds/hub_cmd.cpp new file mode 100644 index 000000000..5bdd3a922 --- /dev/null +++ b/src/zen/cmds/hub_cmd.cpp @@ -0,0 +1,440 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "hub_cmd.h" + +#include <zencore/compactbinary.h> +#include <zencore/compactbinaryutil.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/iobuffer.h> +#include <zencore/logging.h> +#include <zencore/process.h> +#include <zenhttp/httpclient.h> +#include <zenutil/zenserverprocess.h> + +#include <vector> + +namespace zen { + +////////////////////////////////////////////////////////////////////////// +// HubUpSubCmd + +HubUpSubCmd::HubUpSubCmd() : ZenSubCmdBase("up", "Bring hub server up") +{ + SubOptions().add_option("", "p", "port", "Host port", cxxopts::value(m_Port)->default_value("0"), "<hostport>"); + SubOptions().add_option("", "b", "base-dir", "Parent folder of server executable", cxxopts::value(m_ProgramBaseDir), "<directory>"); + SubOptions().add_option("", "c", "show-console", "Open a console window for the zenserver process", cxxopts::value(m_ShowConsole), ""); + SubOptions().add_option("", + "l", + "show-log", + "Show the output log of the zenserver process after successful start", + cxxopts::value(m_ShowLog), + ""); +} + +void +HubUpSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + if (m_ShowConsole && m_ShowLog) + { + throw OptionParseException("'--show-console' conflicts with '--show-log'", SubOptions().help()); + } + + std::optional<int> StartResult = StartupZenServer(ConsoleLog(), + {.ProgramBaseDir = m_ProgramBaseDir, + .Port = m_Port, + .OpenConsole = m_ShowConsole, + .ShowLog = m_ShowLog, + .ExtraArgs = GlobalOptions.PassthroughCommandLine, + .Mode = ZenServerInstance::ServerMode::kHubServer}); + if (!StartResult.has_value()) + { + ZEN_CONSOLE("Zen server already running"); + return; + } + if (*StartResult != 0) + { + throw ErrorWithReturnCode("Zen server failed to start", 
*StartResult); + } + + ZEN_CONSOLE("Zen server up"); +} + +////////////////////////////////////////////////////////////////////////// +// HubDownSubCmd + +HubDownSubCmd::HubDownSubCmd() : ZenSubCmdBase("down", "Bring hub server down") +{ + SubOptions().add_option("", "p", "port", "Host port", cxxopts::value(m_Port)->default_value("0"), "<hostport>"); + SubOptions().add_option("", "a", "all", "Shut down all running zen server instances", cxxopts::value(m_All), ""); + SubOptions().add_option("", "f", "force", "Force terminate if graceful shutdown fails", cxxopts::value(m_ForceTerminate), "<force>"); + SubOptions().add_option("", "b", "base-dir", "Parent folder of server executable", cxxopts::value(m_ProgramBaseDir), "<directory>"); + SubOptions() + .add_option("", "", "data-dir", "Path to data directory to inspect for running server", cxxopts::value(m_DataDir), "<file>"); +} + +void +HubDownSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + if (m_ProgramBaseDir.empty()) + { + m_ProgramBaseDir = GetRunningExecutablePath().parent_path(); + } + + ZenServerState Instance; + Instance.Initialize(); + + if (m_All) + { + struct EntryInfo + { + uint16_t Port = 0; + uint32_t Pid = 0; + }; + std::vector<EntryInfo> Entries; + Instance.Snapshot([&Entries](const ZenServerState::ZenServerEntry& Entry) { + uint16_t Port = Entry.DesiredListenPort.load(); + uint32_t Pid = Entry.Pid.load(); + if (Port != 0 && Pid != 0) + { + Entries.push_back({Port, Pid}); + } + }); + + if (Entries.empty()) + { + ZEN_CONSOLE("No zen server instances to bring down"); + return; + } + + int FailCount = 0; + for (const EntryInfo& Info : Entries) + { + Instance.Sweep(); + ZenServerState::ZenServerEntry* Entry = Instance.Lookup(Info.Port); + if (Entry && Entry->Pid.load() == Info.Pid) + { + if (!ShutdownZenServer(ConsoleLog(), Instance, Entry, m_ProgramBaseDir)) + { + ZEN_CONSOLE_WARN("Failed to shutdown server on port {} (pid {})", Info.Port, Info.Pid); + ++FailCount; + } + 
} + } + + if (FailCount > 0 && !m_ForceTerminate) + { + throw std::runtime_error(fmt::format("Failed to shutdown {} instance(s), use --force to hard terminate", FailCount)); + } + return; + } + + ZenServerState::ZenServerEntry* Entry = Instance.Lookup(m_Port); + + if (!m_DataDir.empty()) + { + if (!IsFile(m_DataDir / ".lock")) + { + throw std::runtime_error(fmt::format("Lock file does not exist in directory '{}'", m_DataDir)); + } + CbValidateError ValidateResult = CbValidateError::None; + if (CbObject LockFileObject = + ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(m_DataDir / ".lock"), ValidateResult); + ValidateResult == CbValidateError::None && LockFileObject) + { + LockFileInfo Info = ReadLockFilePayload(LockFileObject); + std::string Reason; + if (!ValidateLockFileInfo(Info, Reason)) + { + throw std::runtime_error(fmt::format("Lock file in directory '{}' is not valid. Reason: '{}'", m_DataDir, Reason)); + } + Entry = Instance.LookupByEffectivePort(Info.EffectiveListenPort); + } + else + { + throw std::runtime_error( + fmt::format("Lock file in directory '{}' is malformed. 
Reason: '{}'", m_DataDir, ToString(ValidateResult))); + } + } + + if (Entry) + { + if (ShutdownZenServer(ConsoleLog(), Instance, Entry, m_ProgramBaseDir)) + { + return; + } + } + + if (m_ForceTerminate) + { + std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; + ProcessHandle RunningProcess; + if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess, /*IncludeSelf*/ false); !Ec) + { + ZEN_CONSOLE_WARN("Attempting hard terminate of zen process with pid ({})", RunningProcess.Pid()); + if (RunningProcess.Terminate(0)) + { + ZEN_CONSOLE("Terminate complete"); + return; + } + throw std::runtime_error("Failed to terminate server, still running"); + } + else + { + ZEN_CONSOLE_WARN("Failed to find process '{}', reason: {}", ServerExePath.string(), Ec.message()); + } + } + else if (Entry) + { + throw std::runtime_error( + fmt::format("Failed to shutdown server on port {}, use --force to hard terminate process", Entry->DesiredListenPort.load())); + } + + ZEN_CONSOLE("No zen server to bring down"); +} + +////////////////////////////////////////////////////////////////////////// +// HubProvisionSubCmd + +HubProvisionSubCmd::HubProvisionSubCmd() : ZenSubCmdBase("provision", "Provision a hub module instance") +{ + SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + SubOptions().add_option("", "", "moduleid", "Module ID to provision", cxxopts::value(m_ModuleId)->default_value(""), "<moduleid>"); + SubOptions().parse_positional({"moduleid"}); +} + +void +HubProvisionSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) + { + throw OptionParseException("Unable to resolve hub host specification", SubOptions().help()); + } + if (m_ModuleId.empty()) + { + throw OptionParseException("moduleid is required", SubOptions().help()); + } + + 
HttpClient Http = ZenCmdBase::CreateHttpClient(m_HostName); + if (HttpClient::Response Resp = + Http.Post(fmt::format("/hub/modules/{}/provision", m_ModuleId), HttpClient::KeyValueMap{}, HttpClient::KeyValueMap{})) + { + CbObject Obj = Resp.AsObject(); + std::string_view Id = Obj["moduleId"].AsString(); + std::string_view Uri = Obj["baseUri"].AsString(); + uint16_t Port = Obj["port"].AsUInt16(); + ZEN_CONSOLE("module '{}' provisioned: {} (port {})", Id, Uri, Port); + } + else + { + Resp.ThrowError("Provision failed"); + } +} + +////////////////////////////////////////////////////////////////////////// +// HubDeprovisionSubCmd + +HubDeprovisionSubCmd::HubDeprovisionSubCmd() : ZenSubCmdBase("deprovision", "Deprovision a hub module instance") +{ + SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + SubOptions().add_option("", "", "moduleid", "Module ID to deprovision", cxxopts::value(m_ModuleId)->default_value(""), "<moduleid>"); + SubOptions().parse_positional({"moduleid"}); +} + +void +HubDeprovisionSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) + { + throw OptionParseException("Unable to resolve hub host specification", SubOptions().help()); + } + if (m_ModuleId.empty()) + { + throw OptionParseException("moduleid is required", SubOptions().help()); + } + + HttpClient Http = ZenCmdBase::CreateHttpClient(m_HostName); + if (HttpClient::Response Resp = + Http.Post(fmt::format("/hub/modules/{}/deprovision", m_ModuleId), HttpClient::KeyValueMap{}, HttpClient::KeyValueMap{})) + { + CbObject Obj = Resp.AsObject(); + std::string_view Id = Obj["moduleId"].AsString(); + ZEN_CONSOLE("module '{}' deprovisioned", Id); + } + else + { + Resp.ThrowError("Deprovision failed"); + } +} + +////////////////////////////////////////////////////////////////////////// +// HubHibernateSubCmd 
+ +HubHibernateSubCmd::HubHibernateSubCmd() : ZenSubCmdBase("hibernate", "Hibernate a hub module instance") +{ + SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + SubOptions().add_option("", "", "moduleid", "Module ID to hibernate", cxxopts::value(m_ModuleId)->default_value(""), "<moduleid>"); + SubOptions().parse_positional({"moduleid"}); +} + +void +HubHibernateSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) + { + throw OptionParseException("Unable to resolve hub host specification", SubOptions().help()); + } + if (m_ModuleId.empty()) + { + throw OptionParseException("moduleid is required", SubOptions().help()); + } + + HttpClient Http = ZenCmdBase::CreateHttpClient(m_HostName); + if (HttpClient::Response Resp = + Http.Post(fmt::format("/hub/modules/{}/hibernate", m_ModuleId), HttpClient::KeyValueMap{}, HttpClient::KeyValueMap{})) + { + CbObject Obj = Resp.AsObject(); + std::string_view Id = Obj["moduleId"].AsString(); + ZEN_CONSOLE("module '{}' hibernated", Id); + } + else + { + Resp.ThrowError("Hibernate failed"); + } +} + +////////////////////////////////////////////////////////////////////////// +// HubWakeSubCmd + +HubWakeSubCmd::HubWakeSubCmd() : ZenSubCmdBase("wake", "Wake a hibernated hub module instance") +{ + SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + SubOptions().add_option("", "", "moduleid", "Module ID to wake", cxxopts::value(m_ModuleId)->default_value(""), "<moduleid>"); + SubOptions().parse_positional({"moduleid"}); +} + +void +HubWakeSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) + { + throw OptionParseException("Unable to resolve hub 
host specification", SubOptions().help()); + } + if (m_ModuleId.empty()) + { + throw OptionParseException("moduleid is required", SubOptions().help()); + } + + HttpClient Http = ZenCmdBase::CreateHttpClient(m_HostName); + if (HttpClient::Response Resp = + Http.Post(fmt::format("/hub/modules/{}/wake", m_ModuleId), HttpClient::KeyValueMap{}, HttpClient::KeyValueMap{})) + { + CbObject Obj = Resp.AsObject(); + std::string_view Id = Obj["moduleId"].AsString(); + ZEN_CONSOLE("module '{}' woken", Id); + } + else + { + Resp.ThrowError("Wake failed"); + } +} + +////////////////////////////////////////////////////////////////////////// +// HubStatusSubCmd + +HubStatusSubCmd::HubStatusSubCmd() : ZenSubCmdBase("status", "Show status of hub module instances") +{ + SubOptions().add_option("", "u", "hosturl", ZenCmdBase::kHostUrlHelp, cxxopts::value(m_HostName)->default_value(""), "<hosturl>"); + SubOptions() + .add_option("", "", "moduleid", "Module ID (omit to list all)", cxxopts::value(m_ModuleId)->default_value(""), "<moduleid>"); + SubOptions().parse_positional({"moduleid"}); +} + +void +HubStatusSubCmd::Run(const ZenCliOptions& GlobalOptions) +{ + ZEN_UNUSED(GlobalOptions); + + m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + if (m_HostName.empty()) + { + throw OptionParseException("Unable to resolve hub host specification", SubOptions().help()); + } + + HttpClient Http = ZenCmdBase::CreateHttpClient(m_HostName); + + if (!m_ModuleId.empty()) + { + if (HttpClient::Response Resp = Http.Get(fmt::format("/hub/modules/{}", m_ModuleId))) + { + CbObject Obj = Resp.AsObject(); + std::string_view Id = Obj["moduleId"].AsString(); + std::string_view State = Obj["state"].AsString(); + ZEN_CONSOLE("module '{}': {}", Id, State); + } + else + { + Resp.ThrowError("Status query failed"); + } + } + else + { + if (HttpClient::Response Resp = Http.Get("/hub/status")) + { + CbObject Obj = Resp.AsObject(); + CbArrayView Modules = Obj["modules"].AsArrayView(); + if (Modules.Num() == 
0) + { + ZEN_CONSOLE("No modules"); + } + else + { + for (CbFieldView Module : Modules) + { + CbObjectView ModObj = Module.AsObjectView(); + ZEN_CONSOLE("module '{}': {}", ModObj["moduleId"].AsString(), ModObj["state"].AsString()); + } + } + } + else + { + Resp.ThrowError("Status query failed"); + } + } +} + +////////////////////////////////////////////////////////////////////////// +// HubCommand + +HubCommand::HubCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("__hidden__", "", "subcommand", "", cxxopts::value<std::string>(m_SubCommand)->default_value(""), ""); + m_Options.parse_positional({"subcommand"}); + + AddSubCommand(m_UpSubCmd); + AddSubCommand(m_DownSubCmd); + AddSubCommand(m_ProvisionSubCmd); + AddSubCommand(m_DeprovisionSubCmd); + AddSubCommand(m_HibernateSubCmd); + AddSubCommand(m_WakeSubCmd); + AddSubCommand(m_StatusSubCmd); +} + +HubCommand::~HubCommand() = default; + +} // namespace zen diff --git a/src/zen/cmds/hub_cmd.h b/src/zen/cmds/hub_cmd.h new file mode 100644 index 000000000..e3da1ee42 --- /dev/null +++ b/src/zen/cmds/hub_cmd.h @@ -0,0 +1,118 @@ +// Copyright Epic Games, Inc. All Rights Reserved. 
+ +#pragma once + +#include "../zen.h" + +#include <filesystem> +#include <string> + +namespace zen { + +class HubUpSubCmd : public ZenSubCmdBase +{ +public: + HubUpSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + uint16_t m_Port = 0; + bool m_ShowConsole = false; + bool m_ShowLog = false; + std::filesystem::path m_ProgramBaseDir; +}; + +class HubDownSubCmd : public ZenSubCmdBase +{ +public: + HubDownSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + uint16_t m_Port = 0; + bool m_All = false; + bool m_ForceTerminate = false; + std::filesystem::path m_ProgramBaseDir; + std::filesystem::path m_DataDir; +}; + +class HubProvisionSubCmd : public ZenSubCmdBase +{ +public: + HubProvisionSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + std::string m_HostName; + std::string m_ModuleId; +}; + +class HubDeprovisionSubCmd : public ZenSubCmdBase +{ +public: + HubDeprovisionSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + std::string m_HostName; + std::string m_ModuleId; +}; + +class HubHibernateSubCmd : public ZenSubCmdBase +{ +public: + HubHibernateSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + std::string m_HostName; + std::string m_ModuleId; +}; + +class HubWakeSubCmd : public ZenSubCmdBase +{ +public: + HubWakeSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + std::string m_HostName; + std::string m_ModuleId; +}; + +class HubStatusSubCmd : public ZenSubCmdBase +{ +public: + HubStatusSubCmd(); + void Run(const ZenCliOptions& GlobalOptions) override; + +private: + std::string m_HostName; + std::string m_ModuleId; +}; + +class HubCommand : public ZenCmdWithSubCommands +{ +public: + static constexpr char Name[] = "hub"; + static constexpr char Description[] = "Manage zen hub server and its module instances"; + + HubCommand(); + ~HubCommand(); + + cxxopts::Options& Options() override { return 
m_Options; } + ZenCmdCategory& CommandCategory() const override { return g_UtilitiesCategory; } + +private: + cxxopts::Options m_Options{Name, Description}; + std::string m_SubCommand; + HubUpSubCmd m_UpSubCmd; + HubDownSubCmd m_DownSubCmd; + HubProvisionSubCmd m_ProvisionSubCmd; + HubDeprovisionSubCmd m_DeprovisionSubCmd; + HubHibernateSubCmd m_HibernateSubCmd; + HubWakeSubCmd m_WakeSubCmd; + HubStatusSubCmd m_StatusSubCmd; +}; + +} // namespace zen diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp index db2c77b6b..809a41bb6 100644 --- a/src/zen/cmds/up_cmd.cpp +++ b/src/zen/cmds/up_cmd.cpp @@ -8,25 +8,21 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/process.h> -#include <zencore/timer.h> #include <zenutil/zenserverprocess.h> -#include <memory> - namespace zen { UpCommand::UpCommand() { m_Options.add_option("", "p", "port", "Host port", cxxopts::value(m_Port)->default_value("0"), "<hostport>"); m_Options.add_option("", "b", "base-dir", "Parent folder of server executable", cxxopts::value(m_ProgramBaseDir), "<directory>"); - m_Options - .add_option("", "c", "show-console", "Open a console window for the zenserver process", cxxopts::value(m_ShowConsole), "<console>"); + m_Options.add_option("", "c", "show-console", "Open a console window for the zenserver process", cxxopts::value(m_ShowConsole), ""); m_Options.add_option("", "l", "show-log", "Show the output log of the zenserver process after successful start", cxxopts::value(m_ShowLog), - "<showlog>"); + ""); } UpCommand::~UpCommand() = default; @@ -34,10 +30,6 @@ UpCommand::~UpCommand() = default; void UpCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { - using namespace std::literals; - - ZEN_UNUSED(GlobalOptions); - if (!ParseOptions(argc, argv)) { return; @@ -45,91 +37,26 @@ UpCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_ShowConsole && m_ShowLog) { - throw OptionParseException("'--show-console' conficts with 
'--show-log'", m_Options.help()); - } - - { - ZenServerState State; - if (State.InitializeReadOnly()) - { - struct EntryInfo - { - uint32_t Pid = 0; - uint16_t DesiredPort = 0; - uint16_t EffectivePort = 0; - }; - std::vector<EntryInfo> RunningEntries; - State.Snapshot([&RunningEntries, DesiredPort = this->m_Port](const zen::ZenServerState::ZenServerEntry& Entry) { - if (DesiredPort == 0 || Entry.DesiredListenPort.load() == DesiredPort) - { - RunningEntries.push_back(EntryInfo{.Pid = Entry.Pid.load(), - .DesiredPort = Entry.DesiredListenPort.load(), - .EffectivePort = Entry.EffectiveListenPort.load()}); - } - }); - if (RunningEntries.size() > 0) - { - ZEN_CONSOLE("Zen server already running with base port {}. First instance at port {}, pid {}", - RunningEntries[0].DesiredPort, - RunningEntries[0].EffectivePort, - RunningEntries[0].Pid); - return; - } - } + throw OptionParseException("'--show-console' conflicts with '--show-log'", m_Options.help()); } - if (m_ProgramBaseDir.empty()) + std::optional<int> StartResult = StartupZenServer(ConsoleLog(), + {.ProgramBaseDir = m_ProgramBaseDir, + .Port = m_Port, + .OpenConsole = m_ShowConsole, + .ShowLog = m_ShowLog, + .ExtraArgs = GlobalOptions.PassthroughCommandLine}); + if (!StartResult.has_value()) { - std::filesystem::path ExePath = zen::GetRunningExecutablePath(); - m_ProgramBaseDir = ExePath.parent_path(); + ZEN_CONSOLE("Zen server already running"); + return; } - ZenServerEnvironment ServerEnvironment; - ServerEnvironment.Initialize(m_ProgramBaseDir); - ZenServerInstance Server(ServerEnvironment); - std::string ServerArguments = GlobalOptions.PassthroughCommandLine; - if ((m_Port != 0) && (ServerArguments.find("--port"sv) == std::string::npos)) + if (*StartResult != 0) { - ServerArguments.append(fmt::format(" --port {}", m_Port)); + throw ErrorWithReturnCode("Zen server failed to start", *StartResult); } - Server.SpawnServer(ServerArguments, m_ShowConsole, /*WaitTimeoutMs*/ 0); - int Timeout = 10000; - - if 
(!Server.WaitUntilReady(Timeout)) - { - if (Server.IsRunning()) - { - ZEN_CONSOLE_WARN("Zen server launch failed (timed out), terminating"); - Server.Terminate(); - if (!m_ShowConsole) - { - ZEN_CONSOLE("{}", Server.GetLogOutput()); - } - throw std::runtime_error("Zen server launch failed (timed out), launched process was terminated"); - } - int ServerExitCode = Server.Shutdown(); - if (!m_ShowConsole) - { - ZEN_CONSOLE("{}", Server.GetLogOutput()); - } - if (ServerExitCode != 0) - { - throw ErrorWithReturnCode( - fmt::format("Zen server failed to get to a ready state and exited with return code {}", ServerExitCode), - ServerExitCode); - } - } - else - { - if (m_ShowLog) - { - ZEN_CONSOLE("{}", Server.GetLogOutput()); - } - else - { - ZEN_CONSOLE("Zen server up"); - } - } + ZEN_CONSOLE("Zen server up"); } ////////////////////////////////////////////////////////////////////////// @@ -211,70 +138,6 @@ DownCommand::DownCommand() DownCommand::~DownCommand() = default; -bool -DownCommand::ShutdownEntry(ZenServerState& Instance, ZenServerState::ZenServerEntry* Entry) -{ - int EntryPort = (int)Entry->DesiredListenPort.load(); - const uint32_t ServerProcessPid = Entry->Pid.load(); - try - { - ZenServerEnvironment ServerEnvironment; - ServerEnvironment.Initialize(m_ProgramBaseDir); - ZenServerInstance Server(ServerEnvironment); - Server.AttachToRunningServer(EntryPort); - - ZEN_CONSOLE("attached to server on port {} (pid {}), requesting shutdown", EntryPort, ServerProcessPid); - - std::error_code Ec; - if (Server.SignalShutdown(Ec) && !Ec) - { - Stopwatch Timer; - while (Timer.GetElapsedTimeMs() < 10000) - { - if (Server.WaitUntilExited(100, Ec) && !Ec) - { - ZEN_CONSOLE("shutdown complete"); - return true; - } - else if (Ec) - { - ZEN_CONSOLE("Waiting for server on port {} (pid {}) failed. Reason: '{}'", EntryPort, ServerProcessPid, Ec.message()); - } - } - } - else if (Ec) - { - ZEN_CONSOLE_WARN("Requesting shutdown of server on port {} failed. 
Reason: '{}'", EntryPort, Ec.message()); - } - } - catch (const std::exception& Ex) - { - ZEN_DEBUG("Exception caught when requesting shutdown: {}", Ex.what()); - } - - // Since we cannot obtain a handle to the process we are unable to block on the process - // handle to determine when the server has shut down. Thus we signal that we would like - // a shutdown via the shutdown flag and the check if the entry is still running. - - ZEN_CONSOLE("Requesting detached shutdown of server on port {}", EntryPort); - Entry->SignalShutdownRequest(); - - Stopwatch Timer; - while (Timer.GetElapsedTimeMs() < 10000) - { - Instance.Sweep(); - Entry = Instance.Lookup(EntryPort); - if (Entry == nullptr || Entry->Pid.load() != ServerProcessPid) - { - ZEN_CONSOLE("Shutdown complete"); - return true; - } - Sleep(100); - } - - return false; -} - void DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { @@ -325,7 +188,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZenServerState::ZenServerEntry* Entry = Instance.Lookup(Info.Port); if (Entry && Entry->Pid.load() == Info.Pid) { - if (!ShutdownEntry(Instance, Entry)) + if (!ShutdownZenServer(ConsoleLog(), Instance, Entry, m_ProgramBaseDir)) { ZEN_CONSOLE_WARN("Failed to shutdown server on port {} (pid {})", Info.Port, Info.Pid); ++FailCount; @@ -370,7 +233,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (Entry) { - if (ShutdownEntry(Instance, Entry)) + if (ShutdownZenServer(ConsoleLog(), Instance, Entry, m_ProgramBaseDir)) { return; } @@ -381,7 +244,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // Try to find the running executable by path name std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; ProcessHandle RunningProcess; - if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec, /*IncludeSelf*/ false) + if (std::error_code Ec = 
FindProcess(ServerExePath, RunningProcess, /*IncludeSelf*/ false); !Ec) { ZEN_CONSOLE_WARN("Attempting hard terminate of zen process with pid ({})", RunningProcess.Pid()); if (RunningProcess.Terminate(0)) @@ -399,7 +262,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) else if (Entry) { throw std::runtime_error( - fmt::format("Failed to shutdown of server on port {}, use --force to hard terminate process", Entry->DesiredListenPort.load())); + fmt::format("Failed to shut down server on port {}, use --force to hard terminate process", Entry->DesiredListenPort.load())); } ZEN_CONSOLE("No zen server to bring down"); diff --git a/src/zen/cmds/up_cmd.h b/src/zen/cmds/up_cmd.h index c88ce6bb9..f904fe0d9 100644 --- a/src/zen/cmds/up_cmd.h +++ b/src/zen/cmds/up_cmd.h @@ -62,8 +62,6 @@ public: virtual cxxopts::Options& Options() override { return m_Options; } private: - bool ShutdownEntry(ZenServerState& Instance, ZenServerState::ZenServerEntry* Entry); - cxxopts::Options m_Options{Name, Description}; uint16_t m_Port = 0; bool m_All = false; diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 91cdca4a2..cbaf64e31 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -12,6 +12,7 @@ #include "cmds/copy_cmd.h" #include "cmds/dedup_cmd.h" #include "cmds/exec_cmd.h" +#include "cmds/hub_cmd.h" #include "cmds/info_cmd.h" #include "cmds/print_cmd.h" #include "cmds/projectstore_cmd.h" @@ -594,6 +595,7 @@ main(int argc, char** argv) GcCommand GcCmd; GcStatusCommand GcStatusCmd; GcStopCommand GcStopCmd; + HubCommand HubCmd; ImportOplogCommand ImportOplogCmd; InfoCommand InfoCmd; JobCommand JobCmd; @@ -652,6 +654,7 @@ main(int argc, char** argv) {GcStatusCommand::Name, &GcStatusCmd, GcStatusCommand::Description}, {GcStopCommand::Name, &GcStopCmd, GcStopCommand::Description}, {GcCommand::Name, &GcCmd, GcCommand::Description}, + {HubCommand::Name, &HubCmd, HubCommand::Description}, {InfoCommand::Name, &InfoCmd, InfoCommand::Description}, {JobCommand::Name, 
&JobCmd, JobCommand::Description}, {LoggingCommand::Name, &LoggingCmd, LoggingCommand::Description}, diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index a372b11e5..dbe6fa785 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -31,216 +31,238 @@ namespace zen::tests::hub { using namespace std::literals; +static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; + TEST_SUITE_BEGIN("server.hub"); -TEST_CASE("hub.lifecycle.basic") +TEST_CASE("hub.lifecycle.children") { - { - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(); - CHECK(PortNumber != 0); + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6"); + REQUIRE(PortNumber != 0); - HttpClient Client(Instance.GetBaseUri() + "/hub/"); + HttpClient Client(Instance.GetBaseUri() + "/hub/", kFastTimeout); + // Verify the hub starts with no modules + { HttpClient::Response Result = Client.Get("status"); REQUIRE(Result); CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u); } -} -TEST_CASE("hub.lifecycle.children") -{ - ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + HttpClient::Response Result; - const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6"); - REQUIRE(PortNumber != 0); + uint16_t AbcPort = 0; + uint16_t DefPort = 0; - SUBCASE("spawn") { - HttpClient Client(Instance.GetBaseUri() + "/hub/"); - - HttpClient::Response Result = Client.Get("status"); + Result = Client.Post("modules/abc/provision"); REQUIRE(Result); - { - Result = Client.Post("modules/abc/provision"); - REQUIRE(Result); + CbObject AbcResult = Result.AsObject(); + 
CHECK(AbcResult["moduleId"].AsString() == "abc"sv); + AbcPort = AbcResult["port"].AsUInt16(0); + CHECK_NE(AbcPort, 0); - CbObject AbcResult = Result.AsObject(); - CHECK(AbcResult["moduleId"].AsString() == "abc"sv); - const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); - CHECK_NE(AbcPort, 0); + Result = Client.Get("modules/abc"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); - Result = Client.Get("modules/abc"); - REQUIRE(Result); - CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + // This should be a fresh instance with no contents - // This should be a fresh instance with no contents + HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); + CHECK(AbcClient.Get("/health/")); - HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); + Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); - Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + Result = AbcClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("abcdef"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } - Result = AbcClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", - IoBufferBuilder::MakeFromMemory(MakeMemoryView("abcdef"sv))); - CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); - } + { + Result = Client.Post("modules/def/provision"); + REQUIRE(Result); - { - Result = Client.Post("modules/def/provision"); - REQUIRE(Result); + CbObject DefResult = Result.AsObject(); + CHECK(DefResult["moduleId"].AsString() == "def"sv); + DefPort = DefResult["port"].AsUInt16(0); + REQUIRE_NE(DefPort, 0); - CbObject DefResult = Result.AsObject(); - CHECK(DefResult["moduleId"].AsString() == "def"sv); - const uint16_t DefPort = DefResult["port"].AsUInt16(0); - 
REQUIRE_NE(DefPort, 0); + // This should be a fresh instance with no contents - // This should be a fresh instance with no contents + HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); + CHECK(DefClient.Get("/health/")); - HttpClient DefClient(fmt::format("http://localhost:{}", DefPort)); + Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); - Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + Result = DefClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("AbcDef"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } - Result = DefClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", - IoBufferBuilder::MakeFromMemory(MakeMemoryView("AbcDef"sv))); - CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); - } + // this should be rejected because of the invalid module id + Result = Client.Post("modules/!!!!!/provision"); + CHECK(!Result); - // this should be rejected because of the invalid module id - Result = Client.Post("modules/!!!!!/provision"); - CHECK(!Result); + Result = Client.Post("modules/ghi/provision"); + REQUIRE(Result); - Result = Client.Post("modules/ghi/provision"); - REQUIRE(Result); + // Tear down instances - // Tear down instances + Result = Client.Post("modules/abc/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - Result = Client.Post("modules/abc/deprovision"); - REQUIRE(Result); + Result = Client.Post("modules/def/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - Result = Client.Post("modules/def/deprovision"); - REQUIRE(Result); + 
Result = Client.Post("modules/ghi/deprovision"); + REQUIRE(Result); - Result = Client.Post("modules/ghi/deprovision"); + // re-provision to verify that (de)hydration preserved state + { + Result = Client.Post("modules/abc/provision"); REQUIRE(Result); - // re-provision to verify that (de)hydration preserved state - { - Result = Client.Post("modules/abc/provision"); - REQUIRE(Result); + CbObject AbcResult = Result.AsObject(); + CHECK(AbcResult["moduleId"].AsString() == "abc"sv); + AbcPort = AbcResult["port"].AsUInt16(0); + REQUIRE_NE(AbcPort, 0); - CbObject AbcResult = Result.AsObject(); - CHECK(AbcResult["moduleId"].AsString() == "abc"sv); - const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); - REQUIRE_NE(AbcPort, 0); + // This should contain the content from the previous run - // This should contain the content from the previous run + HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); + CHECK(AbcClient.Get("/health/")); - HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); + Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); - Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + CHECK_EQ(Result.AsText(), "abcdef"sv); - CHECK_EQ(Result.AsText(), "abcdef"sv); + Result = AbcClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("ghijklmnop"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } - Result = AbcClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567", - IoBufferBuilder::MakeFromMemory(MakeMemoryView("ghijklmnop"sv))); - CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); - } + { + Result = Client.Post("modules/def/provision"); + REQUIRE(Result); - { - Result = Client.Post("modules/def/provision"); - REQUIRE(Result); + CbObject DefResult = Result.AsObject(); + 
CHECK(DefResult["moduleId"].AsString() == "def"sv); + DefPort = DefResult["port"].AsUInt16(0); + REQUIRE_NE(DefPort, 0); - CbObject DefResult = Result.AsObject(); - CHECK(DefResult["moduleId"].AsString() == "def"sv); - const uint16_t DefPort = DefResult["port"].AsUInt16(0); - REQUIRE_NE(DefPort, 0); + // This should contain the content from the previous run - // This should contain the content from the previous run + HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); + CHECK(DefClient.Get("/health/")); - HttpClient DefClient(fmt::format("http://localhost:{}", DefPort)); + Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); - Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + CHECK_EQ(Result.AsText(), "AbcDef"sv); - CHECK_EQ(Result.AsText(), "AbcDef"sv); + Result = DefClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("GhijklmNop"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } - Result = DefClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567", - IoBufferBuilder::MakeFromMemory(MakeMemoryView("GhijklmNop"sv))); - CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); - } + Result = Client.Post("modules/abc/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - Result = Client.Post("modules/abc/deprovision"); - REQUIRE(Result); + Result = Client.Post("modules/def/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - Result = Client.Post("modules/def/deprovision"); + // re-provision to verify that (de)hydration preserved state, including + // state which was generated 
after the very first dehydration + { + Result = Client.Post("modules/abc/provision"); REQUIRE(Result); - // re-provision to verify that (de)hydration preserved state, including - // state which was generated after the very first dehydration - { - Result = Client.Post("modules/abc/provision"); - REQUIRE(Result); - - CbObject AbcResult = Result.AsObject(); - CHECK(AbcResult["moduleId"].AsString() == "abc"sv); - const uint16_t AbcPort = AbcResult["port"].AsUInt16(0); - REQUIRE_NE(AbcPort, 0); + CbObject AbcResult = Result.AsObject(); + CHECK(AbcResult["moduleId"].AsString() == "abc"sv); + AbcPort = AbcResult["port"].AsUInt16(0); + REQUIRE_NE(AbcPort, 0); - // This should contain the content from the previous two runs + // This should contain the content from the previous two runs - HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort)); + HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); + CHECK(AbcClient.Get("/health/")); - Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); - CHECK_EQ(Result.AsText(), "abcdef"sv); + CHECK_EQ(Result.AsText(), "abcdef"sv); - Result = AbcClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + Result = AbcClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); - CHECK_EQ(Result.AsText(), "ghijklmnop"sv); - } - - { - Result = Client.Post("modules/def/provision"); - REQUIRE(Result); + CHECK_EQ(Result.AsText(), "ghijklmnop"sv); + } - CbObject DefResult = Result.AsObject(); - REQUIRE(DefResult["moduleId"].AsString() == "def"sv); - const uint16_t DefPort = DefResult["port"].AsUInt16(0); - REQUIRE_NE(DefPort, 0); + { + Result = Client.Post("modules/def/provision"); + 
REQUIRE(Result); - // This should contain the content from the previous two runs + CbObject DefResult = Result.AsObject(); + REQUIRE(DefResult["moduleId"].AsString() == "def"sv); + DefPort = DefResult["port"].AsUInt16(0); + REQUIRE_NE(DefPort, 0); - HttpClient DefClient(fmt::format("http://localhost:{}", DefPort)); + // This should contain the content from the previous two runs - Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); + CHECK(DefClient.Get("/health/")); - CHECK_EQ(Result.AsText(), "AbcDef"sv); + Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); - Result = DefClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567"); - CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + CHECK_EQ(Result.AsText(), "AbcDef"sv); - CHECK_EQ(Result.AsText(), "GhijklmNop"sv); - } + Result = DefClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); - Result = Client.Post("modules/abc/deprovision"); - REQUIRE(Result); + CHECK_EQ(Result.AsText(), "GhijklmNop"sv); + } - Result = Client.Post("modules/def/deprovision"); - REQUIRE(Result); + Result = Client.Post("modules/abc/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - // final sanity check that the hub is still responsive and all modules are gone - Result = Client.Get("status"); - REQUIRE(Result); - CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u); + Result = Client.Post("modules/def/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); } + + // final sanity check that the hub is still 
responsive and all modules are gone + Result = Client.Get("status"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u); } static bool @@ -339,7 +361,7 @@ TEST_CASE("hub.consul.hub.registration.token") "--consul-token-env=ZEN_TEST_CONSUL_TOKEN"); REQUIRE(PortNumber != 0); - // Use a plain client — dev-mode Consul doesn't enforce ACLs, but the + // Use a plain client -- dev-mode Consul doesn't enforce ACLs, but the // server has exercised the ConsulTokenEnv -> GetEnvVariable -> ConsulClient path. consul::ConsulClient Client("http://localhost:8500/"); @@ -366,7 +388,7 @@ TEST_CASE("hub.consul.provision.registration") REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000)); - HttpClient HubClient(Instance.GetBaseUri() + "/hub/"); + HttpClient HubClient(Instance.GetBaseUri() + "/hub/", kFastTimeout); HttpClient::Response Result = HubClient.Post("modules/testmod/provision"); REQUIRE(Result); @@ -376,6 +398,11 @@ TEST_CASE("hub.consul.provision.registration") const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0); REQUIRE(ModulePort != 0); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + std::string JsonError; CbFieldIterator ServicesRoot = LoadCompactBinaryFromJson(Client.GetAgentServicesJson(), JsonError); REQUIRE(JsonError.empty()); @@ -427,10 +454,15 @@ TEST_CASE("hub.consul.provision.registration") CHECK_EQ(HubService["Service"sv].AsString(), "zen-hub"sv); CHECK_EQ(HubService["Port"sv].AsDouble(0), double(PortNumber)); } - } - Result = HubClient.Post("modules/testmod/deprovision"); - REQUIRE(Result); + Result = HubClient.Post("modules/testmod/deprovision"); + REQUIRE(Result); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + } CHECK(!Client.HasService("testmod")); @@ -439,6 +471,129 @@ TEST_CASE("hub.consul.provision.registration") 
ConsulProc.StopConsulAgent(); } +TEST_CASE("hub.hibernate.lifecycle") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6"); + REQUIRE(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/hub/", kFastTimeout); + + // Provision + HttpClient::Response Result = Client.Post("modules/testmod/provision"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); + const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0); + REQUIRE_NE(ModulePort, 0); + + Result = Client.Get("modules/testmod"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); + CHECK(ModClient.Get("/health/")); + + // Write data to verify it survives the hibernate/wake cycle + Result = ModClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567", + IoBufferBuilder::MakeFromMemory(MakeMemoryView("hibernatetest"sv))); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Created); + } + + // Hibernate - state should become "hibernated", server should be unreachable + Result = Client.Post("modules/testmod/hibernate"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); + + Result = Client.Get("modules/testmod"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["state"].AsString(), "hibernated"sv); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Wake - state should return to "provisioned", server should be reachable, data should be intact + Result = Client.Post("modules/testmod/wake"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); + + Result = Client.Get("modules/testmod"); + REQUIRE(Result); + 
CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); + CHECK(ModClient.Get("/health/")); + + Result = ModClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567"); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + CHECK_EQ(Result.AsText(), "hibernatetest"sv); + } + + // Deprovision - server should become unreachable + Result = Client.Post("modules/testmod/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Re-provision - server should be reachable on its (potentially new) port + Result = Client.Post("modules/testmod/provision"); + REQUIRE(Result); + CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); + const uint16_t ModulePort2 = Result.AsObject()["port"].AsUInt16(0); + REQUIRE_NE(ModulePort2, 0); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Final deprovision - server should become unreachable + Result = Client.Post("modules/testmod/deprovision"); + REQUIRE(Result); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } +} + +TEST_CASE("hub.hibernate.errors") +{ + ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer); + const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6"); + REQUIRE(PortNumber != 0); + + HttpClient Client(Instance.GetBaseUri() + "/hub/", kFastTimeout); + + // Hibernate/wake on an unknown module id should return 404 + HttpClient::Response Result = Client.Post("modules/unknown/hibernate"); + CHECK(!Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + + Result = Client.Post("modules/unknown/wake"); + CHECK(!Result); + 
CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + + // Double-hibernate: first call succeeds, second returns 400 (wrong state) + Result = Client.Post("modules/errmod/provision"); + REQUIRE(Result); + + Result = Client.Post("modules/errmod/hibernate"); + REQUIRE(Result); + + Result = Client.Post("modules/errmod/hibernate"); + CHECK(!Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + + // Wake on provisioned: succeeds (state restored), then waking again returns 400 + Result = Client.Post("modules/errmod/wake"); + REQUIRE(Result); + + Result = Client.Post("modules/errmod/wake"); + CHECK(!Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); +} + TEST_SUITE_END(); } // namespace zen::tests::hub diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index 5497bcf2b..03be6e85d 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -152,6 +152,78 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) HttpVerb::kPost); m_Router.RegisterRoute( + "modules/{moduleid}/hibernate", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + std::string FailureReason = "unknown"; + + try + { + if (!m_Hub.Hibernate(std::string(ModuleId), /* out */ FailureReason)) + { + if (FailureReason.empty()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + else + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); + } + } + + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + + return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception while hibernating module '{}': {}", ModuleId, Ex.what()); + + FailureReason = Ex.what(); + } + + Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); + }, + HttpVerb::kPost); + 
+ m_Router.RegisterRoute( + "modules/{moduleid}/wake", + [this](HttpRouterRequest& Req) { + std::string_view ModuleId = Req.GetCapture(1); + std::string FailureReason = "unknown"; + + try + { + if (!m_Hub.Wake(std::string(ModuleId), /* out */ FailureReason)) + { + if (FailureReason.empty()) + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); + } + else + { + return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); + } + } + + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + + return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Exception while waking module '{}': {}", ModuleId, Ex.what()); + + FailureReason = Ex.what(); + } + + Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "stats", [this](HttpRouterRequest& Req) { CbObjectWriter Obj; diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index b0208db1f..54f45e511 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -340,7 +340,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s catch (const std::exception& Ex) { ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); - Instance = {}; + Instance = {}; + OutReason = Ex.what(); if (IsNewInstance) { @@ -371,6 +372,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s return false; } + OutReason.clear(); OutInfo.Port = AllocatedPort; // TODO: base URI? 
Would need to know what host name / IP to use @@ -398,7 +400,7 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) { RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end()) + if (m_ProvisioningModules.contains(ModuleId)) { OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); @@ -411,7 +413,7 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - // Not found, OutReason left empty + OutReason.clear(); // empty = not found (-> 404) return false; } else @@ -463,7 +465,126 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) Instance.Deprovision(); Instance = {}; + OutReason.clear(); + + return true; +} + +bool +Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || + m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId)) + { + OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); + return false; + } + + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + OutReason.clear(); // empty = not found (-> 404) + return false; + } + + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + m_HibernatingModules.emplace(ModuleId); + } + + ZEN_ASSERT(Instance); + + auto RemoveHibernatingModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_HibernatingModules.erase(ModuleId); + }); + + // NOTE: done while not holding the hub lock, as hibernation may take time. 
+ // m_HibernatingModules tracks which modules are being hibernated, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + try + { + if (!Instance.Hibernate()) + { + OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState())); + return false; + } + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + OutReason = Ex.what(); + return false; + } + OutReason.clear(); + return true; +} + +bool +Hub::Wake(const std::string& ModuleId, std::string& OutReason) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || + m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId)) + { + OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); + return false; + } + + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + OutReason.clear(); // empty = not found (-> 404) + return false; + } + + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + m_WakingModules.emplace(ModuleId); + } + + ZEN_ASSERT(Instance); + + auto RemoveWakingModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_WakingModules.erase(ModuleId); + }); + + // NOTE: done while not holding the hub lock, as waking may take time. + // m_WakingModules tracks which modules are being woken, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. 
+ try + { + if (!Instance.Wake()) + { + OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState())); + return false; + } + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + OutReason = Ex.what(); + return false; + } + + OutReason.clear(); return true; } @@ -547,14 +668,14 @@ Hub::UpdateStats() bool Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) { - if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end()) + if (m_DeprovisioningModules.contains(std::string(ModuleId))) { OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId); return false; } - if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end()) + if (m_ProvisioningModules.contains(std::string(ModuleId))) { OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId); @@ -632,10 +753,10 @@ Hub::WatchDog() } else if (LockedInstance.GetState() == HubInstanceState::Provisioned) { - // Process is not running but state says it should be — instance died unexpectedly. + // Process is not running but state says it should be - instance died unexpectedly. // TODO: Track and attempt recovery. } - // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) — expected, skip. + // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) - expected, skip. 
LockedInstance = {}; } } @@ -652,6 +773,8 @@ Hub::WatchDog() TEST_SUITE_BEGIN("server.hub"); +static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; + namespace hub_testutils { ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) @@ -681,16 +804,28 @@ TEST_CASE("hub.provision_basic") std::string Reason; const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); REQUIRE_MESSAGE(ProvisionResult, Reason); + CHECK(Reason.empty()); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; REQUIRE(HubInstance->Find("module_a", &InstanceInfo)); CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); CHECK(DeprovisionResult); + CHECK(Reason.empty()); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } } TEST_CASE("hub.provision_config") @@ -728,10 +863,20 @@ TEST_CASE("hub.provision_config") CHECK(TestResponse.IsSuccess()); CHECK(TestResponse.AsObject()["ok"].AsBool()); + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); CHECK(DeprovisionResult); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + + { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } } TEST_CASE("hub.provision_callbacks") @@ -770,10 +915,20 @@ TEST_CASE("hub.provision_callbacks") CHECK_NE(ProvisionRecords[0].Port, 0); } + { + HttpClient 
ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); CHECK(DeprovisionResult); { + HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + { RwLock::SharedLockScope _(CallbackMutex); REQUIRE_EQ(DeprovisionRecords.size(), 1u); CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module"); @@ -816,18 +971,6 @@ TEST_CASE("hub.instance_limit") CHECK_EQ(HubInstance->GetInstanceCount(), 2); } -TEST_CASE("hub.deprovision_nonexistent") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - std::string Reason; - const bool Result = HubInstance->Deprovision("never_provisioned", Reason); - CHECK_FALSE(Result); - CHECK(Reason.empty()); - CHECK_EQ(HubInstance->GetInstanceCount(), 0); -} - TEST_CASE("hub.enumerate_modules") { ScopedTemporaryDirectory TempDir; @@ -840,11 +983,16 @@ TEST_CASE("hub.enumerate_modules") REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); std::vector<std::string> Ids; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) { + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { Ids.push_back(std::string(ModuleId)); - CHECK_EQ(Info.State, HubInstanceState::Provisioned); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } }); CHECK_EQ(Ids.size(), 2u); + CHECK_EQ(ProvisionedCount, 2); const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); CHECK(FoundA); @@ -852,12 +1000,17 @@ TEST_CASE("hub.enumerate_modules") HubInstance->Deprovision("enum_a", Reason); Ids.clear(); - HubInstance->EnumerateModules([&](std::string_view 
ModuleId, const Hub::InstanceInfo& Info) { + ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { Ids.push_back(std::string(ModuleId)); - CHECK_EQ(Info.State, HubInstanceState::Provisioned); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } }); REQUIRE_EQ(Ids.size(), 1u); CHECK_EQ(Ids[0], "enum_b"); + CHECK_EQ(ProvisionedCount, 1); } TEST_CASE("hub.max_instance_count") @@ -883,84 +1036,6 @@ TEST_CASE("hub.max_instance_count") CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); } -TEST_CASE("hub.concurrent") -{ - ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.BasePortNumber = 22000; - Config.InstanceLimit = 10; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - constexpr int kHalf = 3; - - // Serially pre-provision kHalf modules - for (int I = 0; I < kHalf; ++I) - { - HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); - } - CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); - - // Simultaneously: - // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N") - // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N") - // The two pools use distinct OS threads, so provisions and deprovisions are interleaved. - - // Use int rather than bool to avoid std::vector<bool> bitfield packing, - // which would cause data races on concurrent per-index writes. 
- std::vector<int> ProvisionResults(kHalf, 0); - std::vector<std::string> ProvisionReasons(kHalf); - std::vector<int> DeprovisionResults(kHalf, 0); - - { - WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners"); - WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers"); - - std::vector<std::future<void>> ProvisionFutures(kHalf); - std::vector<std::future<void>> DeprovisionFutures(kHalf); - - for (int I = 0; I < kHalf; ++I) - { - ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { - HubProvisionedInstanceInfo Info; - std::string Reason; - const bool Result = - HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); - ProvisionResults[I] = Result ? 1 : 0; - ProvisionReasons[I] = Reason; - }), - WorkerThreadPool::EMode::EnableBacklog); - - DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { - std::string Reason; - const bool Result = - HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); - DeprovisionResults[I] = Result ? 
1 : 0; - }), - WorkerThreadPool::EMode::EnableBacklog); - } - - for (std::future<void>& F : ProvisionFutures) - { - F.get(); - } - for (std::future<void>& F : DeprovisionFutures) - { - F.get(); - } - } - - for (int I = 0; I < kHalf; ++I) - { - CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]); - CHECK(DeprovisionResults[I] != 0); - } - // Only the newly provisioned modules should remain - CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); -} - TEST_CASE("hub.concurrent_callbacks") { ScopedTemporaryDirectory TempDir; @@ -1122,57 +1197,98 @@ TEST_CASE("hub.job_object") } # endif // ZEN_PLATFORM_WINDOWS -TEST_CASE("hub.instance_state_basic") +TEST_CASE("hub.hibernate_wake") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; - Config.BasePortNumber = 22400; + Config.BasePortNumber = 22600; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("state_a", ProvInfo, Reason), Reason); + // Provision + REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason); + CHECK(Reason.empty()); + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Hibernate + const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason); + REQUIRE_MESSAGE(HibernateResult, Reason); + CHECK(Reason.empty()); + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Hibernated); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } - REQUIRE(HubInstance->Find("state_a", &Info)); + // Wake + const bool WakeResult = HubInstance->Wake("hib_a", Reason); + REQUIRE_MESSAGE(WakeResult, Reason); + CHECK(Reason.empty()); + 
REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } - HubInstance->Deprovision("state_a", Reason); - CHECK_FALSE(HubInstance->Find("state_a")); + // Deprovision + const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason); + CHECK(DeprovisionResult); + CHECK(Reason.empty()); + CHECK_FALSE(HubInstance->Find("hib_a")); + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } } -TEST_CASE("hub.instance_state_enumerate") +TEST_CASE("hub.hibernate_wake_errors") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; - Config.BasePortNumber = 22500; + Config.BasePortNumber = 22700; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("estate_a", ProvInfo, Reason), Reason); - REQUIRE_MESSAGE(HubInstance->Provision("estate_b", ProvInfo, Reason), Reason); - int ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) { - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(ProvisionedCount, 2); + // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404) + CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason)); + CHECK(Reason.empty()); - HubInstance->Deprovision("estate_a", Reason); + CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason)); + CHECK(Reason.empty()); - ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) { - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(ProvisionedCount, 1); + // 
Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400) + REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason); + CHECK(Reason.empty()); + REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason); + CHECK(Reason.empty()); + + Reason.clear(); + CHECK_FALSE(HubInstance->Hibernate("err_b", Reason)); + CHECK_FALSE(Reason.empty()); + + // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400) + REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason); + CHECK(Reason.empty()); + + Reason.clear(); + CHECK_FALSE(HubInstance->Wake("err_b", Reason)); + CHECK_FALSE(Reason.empty()); + + // Deprovision not-found - should return false with empty reason (-> 404) + CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason)); + CHECK(Reason.empty()); } TEST_SUITE_END(); diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 7094378f6..9a84f7744 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -90,6 +90,25 @@ public: bool Deprovision(const std::string& ModuleId, std::string& OutReason); /** + * Hibernate a storage server instance for the given module ID. + * The instance is shut down but its data is preserved; it can be woken later. + * + * @param ModuleId The ID of the module to hibernate. + * @param OutReason If unsuccessful, the reason will be returned here (empty = not found). + * @return true if the instance was hibernated, false otherwise. + */ + bool Hibernate(const std::string& ModuleId, std::string& OutReason); + + /** + * Wake a hibernated storage server instance for the given module ID. + * + * @param ModuleId The ID of the module to wake. + * @param OutReason If unsuccessful, the reason will be returned here (empty = not found). + * @return true if the instance was woken, false otherwise. 
+ */ + bool Wake(const std::string& ModuleId, std::string& OutReason); + + /** * Find info about storage server instance for the given module ID. * * @param ModuleId The ID of the module to find. @@ -128,6 +147,8 @@ private: std::unordered_map<std::string, size_t> m_InstanceLookup; std::unordered_set<std::string> m_DeprovisioningModules; std::unordered_set<std::string> m_ProvisioningModules; + std::unordered_set<std::string> m_HibernatingModules; + std::unordered_set<std::string> m_WakingModules; std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances; std::vector<size_t> m_FreeActiveInstanceIndexes; ResourceMetrics m_ResourceLimits; diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp index bab501429..7933cfa70 100644 --- a/src/zenserver/hub/storageserverinstance.cpp +++ b/src/zenserver/hub/storageserverinstance.cpp @@ -123,7 +123,7 @@ StorageServerInstance::DeprovisionLocked() m_State = HubInstanceState::Unprovisioned; } -void +bool StorageServerInstance::HibernateLocked() { // Signal server to shut down, but keep data around for later wake @@ -133,16 +133,13 @@ StorageServerInstance::HibernateLocked() ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not provisioned (state: '{}')", m_ModuleId, ToString(m_State.load())); - return; + return false; } if (!m_ServerInstance.IsRunning()) { ZEN_WARN("Attempted to hibernate storage server instance for module '{}' which is not running", m_ModuleId); - - // This is an unexpected state. Should consider the instance invalid? 
- - return; + return false; } m_State = HubInstanceState::Hibernating; @@ -150,12 +147,13 @@ StorageServerInstance::HibernateLocked() { m_ServerInstance.Shutdown(); m_State = HubInstanceState::Hibernated; - return; + return true; } catch (const std::exception& Ex) { ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what()); m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running + return false; } } @@ -166,9 +164,10 @@ StorageServerInstance::WakeLocked() if (m_State.load() != HubInstanceState::Hibernated) { - ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated", m_ModuleId); - - return true; // Instance is already usable (noop success) + ZEN_WARN("Attempted to wake storage server instance for module '{}' which is not hibernated (state: '{}')", + m_ModuleId, + ToString(m_State.load())); + return false; } ZEN_ASSERT_FORMAT(!m_ServerInstance.IsRunning(), "Storage server instance for module '{}' is already running", m_ModuleId); @@ -389,11 +388,11 @@ StorageServerInstance::ExclusiveLockedPtr::Deprovision() m_Instance->DeprovisionLocked(); } -void +bool StorageServerInstance::ExclusiveLockedPtr::Hibernate() { ZEN_ASSERT(m_Instance != nullptr); - m_Instance->HibernateLocked(); + return m_Instance->HibernateLocked(); } bool diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h index bf6fff8a0..ba15133bf 100644 --- a/src/zenserver/hub/storageserverinstance.h +++ b/src/zenserver/hub/storageserverinstance.h @@ -118,10 +118,10 @@ public: return m_Instance->m_ResourceMetrics; } - void Provision(); - void Deprovision(); - void Hibernate(); - bool Wake(); + void Provision(); + void Deprovision(); + [[nodiscard]] bool Hibernate(); + [[nodiscard]] bool Wake(); private: RwLock* m_Lock = nullptr; @@ -134,7 +134,7 @@ private: void ProvisionLocked(); void DeprovisionLocked(); - void HibernateLocked(); + [[nodiscard]] bool 
HibernateLocked(); [[nodiscard]] bool WakeLocked(); void UpdateMetricsLocked(); diff --git a/src/zenutil/include/zenutil/zenserverprocess.h b/src/zenutil/include/zenutil/zenserverprocess.h index 308ae0ef2..daa07a1e1 100644 --- a/src/zenutil/include/zenutil/zenserverprocess.h +++ b/src/zenutil/include/zenutil/zenserverprocess.h @@ -12,6 +12,8 @@ #include <filesystem> #include <functional> #include <optional> +#include <stdexcept> +#include <string> namespace zen { @@ -330,4 +332,28 @@ CbObject MakeLockFilePayload(const LockFileInfo& Info); LockFileInfo ReadLockFilePayload(const CbObject& Payload); bool ValidateLockFileInfo(const LockFileInfo& Info, std::string& OutReason); +struct StartupZenServerOptions +{ + std::filesystem::path ProgramBaseDir; // empty = auto-resolve from running executable + uint16_t Port = 0; + bool OpenConsole = false; // open a console window for the server process + bool ShowLog = false; // emit captured server log to LogRef on successful start + std::string ExtraArgs; // e.g. GlobalOptions.PassthroughCommandLine + ZenServerInstance::ServerMode Mode = ZenServerInstance::ServerMode::kStorageServer; +}; + +// Returns std::nullopt if a matching server is already running (no action taken); logs instance info via LogRef. +// Returns 0 if the server was successfully started and is ready. +// Returns a non-zero exit code if startup failed; the captured server log is emitted via LogRef before returning. +std::optional<int> StartupZenServer(LoggerRef LogRef, const StartupZenServerOptions& Options); + +// Attempts graceful shutdown of a running server entry. +// First tries ZenServerInstance::SignalShutdown; falls back to +// ZenServerEntry::SignalShutdownRequest + polling. +// Returns true on successful shutdown, false if it timed out. 
+bool ShutdownZenServer(LoggerRef LogRef, + ZenServerState& State, + ZenServerState::ZenServerEntry* Entry, + const std::filesystem::path& ProgramBaseDir); + } // namespace zen diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index 8eaf2cf5b..a2ab4c291 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -15,6 +15,7 @@ #include <zencore/timer.h> #include <atomic> +#include <string> #include <gsl/gsl-lite.hpp> @@ -964,6 +965,7 @@ ZenServerInstance::Shutdown() ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid()); int ExitCode = m_Process.GetExitCode(); m_Process.Reset(); + m_ShutdownEvent.reset(); return ExitCode; } @@ -993,6 +995,7 @@ ZenServerInstance::Shutdown() ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid()); int ExitCode = m_Process.GetExitCode(); m_Process.Reset(); + m_ShutdownEvent.reset(); return ExitCode; } else if (Ec) @@ -1020,6 +1023,7 @@ ZenServerInstance::Shutdown() int ExitCode = m_Process.GetExitCode(); ZEN_DEBUG("zenserver process {} ({}) exited", m_Name, m_Process.Pid()); m_Process.Reset(); + m_ShutdownEvent.reset(); return ExitCode; } ZEN_DEBUG("Detached from zenserver process {} ({})", m_Name, m_Process.Pid()); @@ -1553,4 +1557,135 @@ ValidateLockFileInfo(const LockFileInfo& Info, std::string& OutReason) return true; } +std::optional<int> +StartupZenServer(LoggerRef LogRef, const StartupZenServerOptions& Options) +{ + auto Log = [&LogRef]() { return LogRef; }; + + // Check if a matching server is already running + { + ZenServerState State; + if (State.InitializeReadOnly()) + { + uint32_t RunningPid = 0; + uint16_t RunningEffectivePort = 0; + State.Snapshot([&, DesiredPort = Options.Port](const ZenServerState::ZenServerEntry& Entry) { + if (RunningPid == 0 && (DesiredPort == 0 || Entry.DesiredListenPort.load() == DesiredPort)) + { + RunningPid = Entry.Pid.load(); + RunningEffectivePort = Entry.EffectiveListenPort.load(); + } + }); + if 
(RunningPid != 0) + { + ZEN_INFO("Zen server already running at port {}, pid {}", RunningEffectivePort, RunningPid); + return std::nullopt; + } + } + } + + std::filesystem::path ProgramBaseDir = Options.ProgramBaseDir; + if (ProgramBaseDir.empty()) + { + ProgramBaseDir = GetRunningExecutablePath().parent_path(); + } + + ZenServerEnvironment ServerEnvironment; + ServerEnvironment.Initialize(ProgramBaseDir); + ZenServerInstance Server(ServerEnvironment, Options.Mode); + + std::string ServerArguments(Options.ExtraArgs); + if ((Options.Port != 0) && (ServerArguments.find("--port") == std::string::npos)) + { + ServerArguments.append(fmt::format(" --port {}", Options.Port)); + } + Server.SpawnServer(ServerArguments, Options.OpenConsole, /*WaitTimeoutMs*/ 0); + + constexpr int Timeout = 10000; + + if (!Server.WaitUntilReady(Timeout)) + { + ZEN_WARN("{}", Server.GetLogOutput()); + if (Server.IsRunning()) + { + ZEN_WARN("Zen server launch failed (timed out), terminating"); + Server.Terminate(); + return 1; + } + int ExitCode = Server.Shutdown(); + ZEN_WARN("Zen server failed to get to a ready state and exited with return code {}", ExitCode); + return ExitCode != 0 ? 
ExitCode : 1; + } + + if (Options.ShowLog) + { + ZEN_INFO("{}", Server.GetLogOutput()); + } + return 0; +} + +bool +ShutdownZenServer(LoggerRef LogRef, + ZenServerState& State, + ZenServerState::ZenServerEntry* Entry, + const std::filesystem::path& ProgramBaseDir) +{ + auto Log = [&LogRef]() { return LogRef; }; + int EntryPort = (int)Entry->DesiredListenPort.load(); + const uint32_t ServerProcessPid = Entry->Pid.load(); + try + { + ZenServerEnvironment ServerEnvironment; + ServerEnvironment.Initialize(ProgramBaseDir); + ZenServerInstance Server(ServerEnvironment); + Server.AttachToRunningServer(EntryPort); + + ZEN_INFO("attached to server on port {} (pid {}), requesting shutdown", EntryPort, ServerProcessPid); + + std::error_code Ec; + if (Server.SignalShutdown(Ec) && !Ec) + { + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 10000) + { + if (Server.WaitUntilExited(100, Ec) && !Ec) + { + ZEN_INFO("shutdown complete"); + return true; + } + else if (Ec) + { + ZEN_WARN("Waiting for server on port {} (pid {}) failed. Reason: '{}'", EntryPort, ServerProcessPid, Ec.message()); + } + } + } + else if (Ec) + { + ZEN_WARN("Requesting shutdown of server on port {} failed. Reason: '{}'", EntryPort, Ec.message()); + } + } + catch (const std::exception& Ex) + { + ZEN_DEBUG("Exception caught when requesting shutdown: {}", Ex.what()); + } + + ZEN_INFO("Requesting detached shutdown of server on port {}", EntryPort); + Entry->SignalShutdownRequest(); + + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < 10000) + { + State.Sweep(); + Entry = State.Lookup(EntryPort); + if (Entry == nullptr || Entry->Pid.load() != ServerProcessPid) + { + ZEN_INFO("Shutdown complete"); + return true; + } + Sleep(100); + } + + return false; +} + } // namespace zen |