aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/hub.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-21 23:13:34 +0100
committerGitHub Enterprise <[email protected]>2026-03-21 23:13:34 +0100
commite3388acaca0ce6f1a2d4cb17e535497f2689118a (patch)
tree817948a42b57ebd07f31d8317065c2667eddb699 /src/zenserver/hub/hub.cpp
parentInterprocess pipe support (for stdout/stderr capture) (#866) (diff)
downloadzen-e3388acaca0ce6f1a2d4cb17e535497f2689118a.tar.xz
zen-e3388acaca0ce6f1a2d4cb17e535497f2689118a.zip
zen hub command (#877)
- Feature: Added `zen hub` command for managing a hub server and its provisioned module instances: - `zen hub up` - Start a hub server (equivalent to `zen up` in hub mode) - `zen hub down` - Shut down a hub server - `zen hub provision <moduleid>` - Provision a storage server instance for a module - `zen hub deprovision <moduleid>` - Deprovision a storage server instance - `zen hub hibernate <moduleid>` - Hibernate a provisioned instance (shut down, data preserved) - `zen hub wake <moduleid>` - Wake a hibernated instance - `zen hub status [moduleid]` - Show state of all instances or a specific module - Feature: Added new hub HTTP endpoints for instance lifecycle management: - `POST /hub/modules/{moduleid}/hibernate` - Hibernate the instance for the given module - `POST /hub/modules/{moduleid}/wake` - Wake a hibernated instance for the given module - Improvement: `zen up` refactored to use shared `StartupZenServer`/`ShutdownZenServer` helpers (also used by `zen hub up`/`zen hub down`) - Bugfix: Fixed shutdown event not being cleared after the server process exits in `ZenServerInstance::Shutdown()`, which could cause stale state on reuse
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
-rw-r--r--src/zenserver/hub/hub.cpp372
1 files changed, 244 insertions, 128 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index b0208db1f..54f45e511 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -340,7 +340,8 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
- Instance = {};
+ Instance = {};
+ OutReason = Ex.what();
if (IsNewInstance)
{
@@ -371,6 +372,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
return false;
}
+ OutReason.clear();
OutInfo.Port = AllocatedPort;
// TODO: base URI? Would need to know what host name / IP to use
@@ -398,7 +400,7 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_ProvisioningModules.find(ModuleId); It != m_ProvisioningModules.end())
+ if (m_ProvisioningModules.contains(ModuleId))
{
OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
@@ -411,7 +413,7 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
{
ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
- // Not found, OutReason left empty
+ OutReason.clear(); // empty = not found (-> 404)
return false;
}
else
@@ -463,7 +465,126 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
Instance.Deprovision();
Instance = {};
+ OutReason.clear();
+
+ return true;
+}
+
+bool
+Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
+{
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) ||
+ m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId))
+ {
+ OutReason = fmt::format("Module '{}' is currently changing state", ModuleId);
+ return false;
+ }
+
+ auto It = m_InstanceLookup.find(ModuleId);
+ if (It == m_InstanceLookup.end())
+ {
+ OutReason.clear(); // empty = not found (-> 404)
+ return false;
+ }
+
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
+ m_HibernatingModules.emplace(ModuleId);
+ }
+
+ ZEN_ASSERT(Instance);
+
+ auto RemoveHibernatingModule = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_HibernatingModules.erase(ModuleId);
+ });
+
+ // NOTE: done while not holding the hub lock, as hibernation may take time.
+ // m_HibernatingModules tracks which modules are being hibernated, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+ try
+ {
+ if (!Instance.Hibernate())
+ {
+ OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()));
+ return false;
+ }
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what());
+ Instance = {};
+ OutReason = Ex.what();
+ return false;
+ }
+ OutReason.clear();
+ return true;
+}
+
+bool
+Hub::Wake(const std::string& ModuleId, std::string& OutReason)
+{
+ StorageServerInstance::ExclusiveLockedPtr Instance;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+
+ if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) ||
+ m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId))
+ {
+ OutReason = fmt::format("Module '{}' is currently changing state", ModuleId);
+ return false;
+ }
+
+ auto It = m_InstanceLookup.find(ModuleId);
+ if (It == m_InstanceLookup.end())
+ {
+ OutReason.clear(); // empty = not found (-> 404)
+ return false;
+ }
+
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
+ m_WakingModules.emplace(ModuleId);
+ }
+
+ ZEN_ASSERT(Instance);
+
+ auto RemoveWakingModule = MakeGuard([&] {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_WakingModules.erase(ModuleId);
+ });
+
+ // NOTE: done while not holding the hub lock, as waking may take time.
+ // m_WakingModules tracks which modules are being woken, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+ try
+ {
+ if (!Instance.Wake())
+ {
+ OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()));
+ return false;
+ }
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what());
+ Instance = {};
+ OutReason = Ex.what();
+ return false;
+ }
+
+ OutReason.clear();
return true;
}
@@ -547,14 +668,14 @@ Hub::UpdateStats()
bool
Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
{
- if (m_DeprovisioningModules.find(std::string(ModuleId)) != m_DeprovisioningModules.end())
+ if (m_DeprovisioningModules.contains(std::string(ModuleId)))
{
OutReason = fmt::format("module '{}' is currently being deprovisioned", ModuleId);
return false;
}
- if (m_ProvisioningModules.find(std::string(ModuleId)) != m_ProvisioningModules.end())
+ if (m_ProvisioningModules.contains(std::string(ModuleId)))
{
OutReason = fmt::format("module '{}' is currently being provisioned", ModuleId);
@@ -632,10 +753,10 @@ Hub::WatchDog()
}
else if (LockedInstance.GetState() == HubInstanceState::Provisioned)
{
- // Process is not running but state says it should be — instance died unexpectedly.
+ // Process is not running but state says it should be - instance died unexpectedly.
// TODO: Track and attempt recovery.
}
- // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) — expected, skip.
+ // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) - expected, skip.
LockedInstance = {};
}
}
@@ -652,6 +773,8 @@ Hub::WatchDog()
TEST_SUITE_BEGIN("server.hub");
+static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)};
+
namespace hub_testutils {
ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir)
@@ -681,16 +804,28 @@ TEST_CASE("hub.provision_basic")
std::string Reason;
const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
REQUIRE_MESSAGE(ProvisionResult, Reason);
+ CHECK(Reason.empty());
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
Hub::InstanceInfo InstanceInfo;
REQUIRE(HubInstance->Find("module_a", &InstanceInfo));
CHECK_EQ(InstanceInfo.State, HubInstanceState::Provisioned);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
CHECK(DeprovisionResult);
+ CHECK(Reason.empty());
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
+
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
}
TEST_CASE("hub.provision_config")
@@ -728,10 +863,20 @@ TEST_CASE("hub.provision_config")
CHECK(TestResponse.IsSuccess());
CHECK(TestResponse.AsObject()["ok"].AsBool());
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
CHECK(DeprovisionResult);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
+
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
}
TEST_CASE("hub.provision_callbacks")
@@ -770,10 +915,20 @@ TEST_CASE("hub.provision_callbacks")
CHECK_NE(ProvisionRecords[0].Port, 0);
}
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason);
CHECK(DeprovisionResult);
{
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ {
RwLock::SharedLockScope _(CallbackMutex);
REQUIRE_EQ(DeprovisionRecords.size(), 1u);
CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module");
@@ -816,18 +971,6 @@ TEST_CASE("hub.instance_limit")
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
}
-TEST_CASE("hub.deprovision_nonexistent")
-{
- ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
-
- std::string Reason;
- const bool Result = HubInstance->Deprovision("never_provisioned", Reason);
- CHECK_FALSE(Result);
- CHECK(Reason.empty());
- CHECK_EQ(HubInstance->GetInstanceCount(), 0);
-}
-
TEST_CASE("hub.enumerate_modules")
{
ScopedTemporaryDirectory TempDir;
@@ -840,11 +983,16 @@ TEST_CASE("hub.enumerate_modules")
REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason);
std::vector<std::string> Ids;
- HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) {
+ int ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
Ids.push_back(std::string(ModuleId));
- CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
});
CHECK_EQ(Ids.size(), 2u);
+ CHECK_EQ(ProvisionedCount, 2);
const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end();
const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end();
CHECK(FoundA);
@@ -852,12 +1000,17 @@ TEST_CASE("hub.enumerate_modules")
HubInstance->Deprovision("enum_a", Reason);
Ids.clear();
- HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& Info) {
+ ProvisionedCount = 0;
+ HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
Ids.push_back(std::string(ModuleId));
- CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ if (InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ProvisionedCount++;
+ }
});
REQUIRE_EQ(Ids.size(), 1u);
CHECK_EQ(Ids[0], "enum_b");
+ CHECK_EQ(ProvisionedCount, 1);
}
TEST_CASE("hub.max_instance_count")
@@ -883,84 +1036,6 @@ TEST_CASE("hub.max_instance_count")
CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
}
-TEST_CASE("hub.concurrent")
-{
- ScopedTemporaryDirectory TempDir;
- Hub::Configuration Config;
- Config.BasePortNumber = 22000;
- Config.InstanceLimit = 10;
-
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- constexpr int kHalf = 3;
-
- // Serially pre-provision kHalf modules
- for (int I = 0; I < kHalf; ++I)
- {
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
- }
- CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
-
- // Simultaneously:
- // Provisioner pool → provisions kHalf new modules ("new_0" .. "new_N")
- // Deprovisioner pool → deprovisions the kHalf pre-provisioned modules ("pre_0" .. "pre_N")
- // The two pools use distinct OS threads, so provisions and deprovisions are interleaved.
-
- // Use int rather than bool to avoid std::vector<bool> bitfield packing,
- // which would cause data races on concurrent per-index writes.
- std::vector<int> ProvisionResults(kHalf, 0);
- std::vector<std::string> ProvisionReasons(kHalf);
- std::vector<int> DeprovisionResults(kHalf, 0);
-
- {
- WorkerThreadPool Provisioners(kHalf, "hub_test_provisioners");
- WorkerThreadPool Deprovisioneers(kHalf, "hub_test_deprovisioneers");
-
- std::vector<std::future<void>> ProvisionFutures(kHalf);
- std::vector<std::future<void>> DeprovisionFutures(kHalf);
-
- for (int I = 0; I < kHalf; ++I)
- {
- ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool Result =
- HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
- ProvisionResults[I] = Result ? 1 : 0;
- ProvisionReasons[I] = Reason;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
-
- DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
- std::string Reason;
- const bool Result =
- HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
- DeprovisionResults[I] = Result ? 1 : 0;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
- }
-
- for (std::future<void>& F : ProvisionFutures)
- {
- F.get();
- }
- for (std::future<void>& F : DeprovisionFutures)
- {
- F.get();
- }
- }
-
- for (int I = 0; I < kHalf; ++I)
- {
- CHECK_MESSAGE(ProvisionResults[I] != 0, ProvisionReasons[I]);
- CHECK(DeprovisionResults[I] != 0);
- }
- // Only the newly provisioned modules should remain
- CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
-}
-
TEST_CASE("hub.concurrent_callbacks")
{
ScopedTemporaryDirectory TempDir;
@@ -1122,57 +1197,98 @@ TEST_CASE("hub.job_object")
}
# endif // ZEN_PLATFORM_WINDOWS
-TEST_CASE("hub.instance_state_basic")
+TEST_CASE("hub.hibernate_wake")
{
ScopedTemporaryDirectory TempDir;
Hub::Configuration Config;
- Config.BasePortNumber = 22400;
+ Config.BasePortNumber = 22600;
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("state_a", ProvInfo, Reason), Reason);
+ // Provision
+ REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason);
+ CHECK(Reason.empty());
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Hibernate
+ const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason);
+ REQUIRE_MESSAGE(HibernateResult, Reason);
+ CHECK(Reason.empty());
+ REQUIRE(HubInstance->Find("hib_a", &Info));
+ CHECK_EQ(Info.State, HubInstanceState::Hibernated);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
- REQUIRE(HubInstance->Find("state_a", &Info));
+ // Wake
+ const bool WakeResult = HubInstance->Wake("hib_a", Reason);
+ REQUIRE_MESSAGE(WakeResult, Reason);
+ CHECK(Reason.empty());
+ REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
- HubInstance->Deprovision("state_a", Reason);
- CHECK_FALSE(HubInstance->Find("state_a"));
+ // Deprovision
+ const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason);
+ CHECK(DeprovisionResult);
+ CHECK(Reason.empty());
+ CHECK_FALSE(HubInstance->Find("hib_a"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
}
-TEST_CASE("hub.instance_state_enumerate")
+TEST_CASE("hub.hibernate_wake_errors")
{
ScopedTemporaryDirectory TempDir;
Hub::Configuration Config;
- Config.BasePortNumber = 22500;
+ Config.BasePortNumber = 22700;
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo ProvInfo;
std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("estate_a", ProvInfo, Reason), Reason);
- REQUIRE_MESSAGE(HubInstance->Provision("estate_b", ProvInfo, Reason), Reason);
- int ProvisionedCount = 0;
- HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) {
- if (InstanceInfo.State == HubInstanceState::Provisioned)
- {
- ProvisionedCount++;
- }
- });
- CHECK_EQ(ProvisionedCount, 2);
+ // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404)
+ CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason));
+ CHECK(Reason.empty());
- HubInstance->Deprovision("estate_a", Reason);
+ CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason));
+ CHECK(Reason.empty());
- ProvisionedCount = 0;
- HubInstance->EnumerateModules([&](std::string_view, const Hub::InstanceInfo& InstanceInfo) {
- if (InstanceInfo.State == HubInstanceState::Provisioned)
- {
- ProvisionedCount++;
- }
- });
- CHECK_EQ(ProvisionedCount, 1);
+ // Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400)
+ REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason);
+ CHECK(Reason.empty());
+ REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason);
+ CHECK(Reason.empty());
+
+ Reason.clear();
+ CHECK_FALSE(HubInstance->Hibernate("err_b", Reason));
+ CHECK_FALSE(Reason.empty());
+
+ // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400)
+ REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason);
+ CHECK(Reason.empty());
+
+ Reason.clear();
+ CHECK_FALSE(HubInstance->Wake("err_b", Reason));
+ CHECK_FALSE(Reason.empty());
+
+ // Deprovision not-found - should return false with empty reason (-> 404)
+ CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason));
+ CHECK(Reason.empty());
}
TEST_SUITE_END();