about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Dan Engelbrecht <[email protected]> 2026-03-23 23:52:27 +0100
committer: GitHub Enterprise <[email protected]> 2026-03-23 23:52:27 +0100
commit: 658a5fea740d97033cd12aa37bd6ecd32b15a924 (patch)
tree: 517f69ebeb87fc2e995eb6d158acd3d17bc25cca /src
parent: Cross-platform process metrics support (#887) (diff)
download: zen-658a5fea740d97033cd12aa37bd6ecd32b15a924.tar.xz
          zen-658a5fea740d97033cd12aa37bd6ecd32b15a924.zip
refactor hub notifications (#888)
* refactor hub callbacks
* improve http responses
Diffstat (limited to 'src')
-rw-r--r-- src/zenserver/hub/httphubservice.cpp 28
-rw-r--r-- src/zenserver/hub/hub.cpp 353
-rw-r--r-- src/zenserver/hub/hub.h 55
-rw-r--r-- src/zenserver/hub/storageserverinstance.cpp 79
-rw-r--r-- src/zenserver/hub/storageserverinstance.h 22
-rw-r--r-- src/zenserver/hub/zenhubserver.cpp 49
-rw-r--r-- src/zenserver/hub/zenhubserver.h 8
7 files changed, 371 insertions, 223 deletions
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index a91e36128..34f4294e4 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -269,9 +269,6 @@ HttpHubService::HandleModuleGet(HttpServerRequest& Request, std::string_view Mod
return;
}
- // TODO: A separate http request for the modules/{moduleid}/deprovision endpoint can be called and deprovision the instance leaving us
- // with a dangling pointer...
-
CbObjectWriter Obj;
Obj << "moduleId" << ModuleId;
Obj << "state" << ToString(InstanceInfo.State);
@@ -288,12 +285,29 @@ HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view
return;
}
- if (InstanceInfo.State == HubInstanceState::Provisioned)
+ if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated ||
+ InstanceInfo.State == HubInstanceState::Crashed)
{
- std::string Reason;
- if (!m_Hub.Deprovision(std::string(ModuleId), Reason))
+ std::string FailureReason;
+ try
+ {
+ if (!m_Hub.Deprovision(std::string(ModuleId), FailureReason))
+ {
+ if (FailureReason.empty())
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason);
+ }
+ return;
+ }
+ }
+ catch (const std::exception& Ex)
{
- Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Reason);
+ ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what());
+ Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Ex.what());
return;
}
}
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index e8487d7d9..6a2609443 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -122,14 +122,10 @@ private:
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config,
- ZenServerEnvironment&& RunEnvironment,
- ProvisionModuleCallbackFunc&& ProvisionedModuleCallback,
- ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback)
+Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
-, m_ProvisionedModuleCallback(std::move(ProvisionedModuleCallback))
-, m_DeprovisionedModuleCallback(std::move(DeprovisionedModuleCallback))
+, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
{
m_HostMetrics = GetSystemMetrics();
m_ResourceLimits.DiskUsageBytes = 1000ull * 1024 * 1024 * 1024;
@@ -178,51 +174,59 @@ Hub::~Hub()
{
try
{
- ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+ // Safety call - should normally be properly Shutdown by owner
+ Shutdown();
+ }
+ catch (const std::exception& e)
+ {
+ ZEN_WARN("Exception during hub service shutdown: {}", e.what());
+ }
+}
- m_WatchDogEvent.Set();
- if (m_WatchDog.joinable())
- {
- m_WatchDog.join();
- }
+void
+Hub::Shutdown()
+{
+ ZEN_INFO("Hub service shutting down, deprovisioning any current instances");
+
+ m_WatchDogEvent.Set();
+ if (m_WatchDog.joinable())
+ {
+ m_WatchDog.join();
+ }
- m_WatchDog = {};
+ m_WatchDog = {};
- // WatchDog has been joined; no concurrent access is possible
- m_Lock.WithExclusiveLock([this] {
- for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
+ m_Lock.WithExclusiveLock([this] {
+ for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
+ {
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
{
- std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- {
- StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true));
+ StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true));
- uint16_t BasePort = InstanceRaw->GetBasePort();
- std::string BaseUri; // TODO?
+ uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
- if (m_DeprovisionedModuleCallback)
- {
- try
- {
- m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
- }
- }
- Instance.Deprovision();
+ try
+ {
+ (void)Instance.Deprovision();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
}
- InstanceRaw.reset();
+ // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire.
+ NewState = HubInstanceState::Unprovisioned;
+ Instance = {};
}
- m_InstanceLookup.clear();
- m_ActiveInstances.clear();
- m_FreeActiveInstanceIndexes.clear();
- });
- }
- catch (const std::exception& e)
- {
- ZEN_WARN("Exception during hub service shutdown: {}", e.what());
- }
+ InstanceRaw.reset();
+ }
+ m_InstanceLookup.clear();
+ m_ActiveInstances.clear();
+ m_FreeActiveInstanceIndexes.clear();
+ });
}
bool
@@ -324,6 +328,12 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
ZEN_ASSERT(Instance);
+ uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+
auto RemoveProvisioningModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_ProvisioningModules.erase(std::string(ModuleId));
@@ -341,14 +351,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
try
{
- Instance.Provision();
+ (void)Instance.Provision(); // false = already in target state (idempotent); not an error
+ NewState = Instance.GetState();
Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
- Instance = {};
- OutReason = Ex.what();
+ NewState = Instance.GetState();
+ Instance = {};
if (IsNewInstance)
{
@@ -370,31 +381,20 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
try
{
DestroyInstance.reset();
+ NewState = HubInstanceState::Unprovisioned;
}
- catch (const std::exception& Ex)
+ catch (const std::exception& DestroyEx)
{
- ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, Ex.what());
+ ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
}
}
- return false;
+ throw;
}
OutReason.clear();
OutInfo.Port = AllocatedPort;
// TODO: base URI? Would need to know what host name / IP to use
- if (m_ProvisionedModuleCallback)
- {
- try
- {
- m_ProvisionedModuleCallback(ModuleId, OutInfo);
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Provision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
- }
- }
-
return true;
}
@@ -447,20 +447,11 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
ZEN_ASSERT(RawInstance);
ZEN_ASSERT(Instance);
- uint16_t BasePort = RawInstance->GetBasePort();
- std::string BaseUri; // TODO?
-
- if (m_DeprovisionedModuleCallback)
- {
- try
- {
- m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = BasePort});
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Deprovision callback for module {} failed. Reason: '{}'", ModuleId, Ex.what());
- }
- }
+ uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
// The module is deprovisioned outside the hub lock to avoid blocking other operations.
//
@@ -476,8 +467,19 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
}
});
- Instance.Deprovision();
-
+ try
+ {
+ (void)Instance.Deprovision();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to deprovision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
+ NewState = HubInstanceState::Unprovisioned;
+ Instance = {};
+ throw;
+ }
+ NewState = Instance.GetState();
Instance = {};
OutReason.clear();
@@ -513,6 +515,11 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
}
ZEN_ASSERT(Instance);
+ uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
auto RemoveHibernatingModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
@@ -527,19 +534,22 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
if (!Instance.Hibernate())
{
OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()));
+ NewState = Instance.GetState();
return false;
}
+ NewState = Instance.GetState();
Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", ModuleId, Ex.what());
- Instance = {};
- OutReason = Ex.what();
- return false;
+ NewState = Instance.GetState();
+ Instance = {};
+ throw;
}
OutReason.clear();
+
return true;
}
@@ -573,6 +583,12 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason)
ZEN_ASSERT(Instance);
+ uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+
auto RemoveWakingModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_WakingModules.erase(ModuleId);
@@ -586,19 +602,22 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason)
if (!Instance.Wake())
{
OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()));
+ NewState = Instance.GetState();
return false;
}
+ NewState = Instance.GetState();
Instance = {};
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", ModuleId, Ex.what());
- Instance = {};
- OutReason = Ex.what();
- return false;
+ NewState = Instance.GetState();
+ Instance = {};
+ throw;
}
OutReason.clear();
+
return true;
}
@@ -751,6 +770,12 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
ZEN_ASSERT(Instance);
+ uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+
auto RemoveRecoveringModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_RecoveringModules.erase(std::string(ModuleId));
@@ -762,30 +787,24 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
return;
}
- const uint16_t Port = RawInstance->GetBasePort();
- std::string BaseUri; // TODO?
-
if (Instance.RecoverFromCrash())
{
+ NewState = Instance.GetState();
Instance = {};
return;
}
// Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup.
- Instance.Deprovision();
- Instance = {};
-
- if (m_DeprovisionedModuleCallback)
+ try
{
- try
- {
- m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = Port});
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Deprovision callback for recovered module {} failed. Reason: '{}'", ModuleId, Ex.what());
- }
+ (void)Instance.Deprovision();
}
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
+ }
+ NewState = Instance.GetState();
+ Instance = {};
std::unique_ptr<StorageServerInstance> DestroyInstance;
{
@@ -798,7 +817,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
m_InstanceLookup.erase(It);
}
- m_FreePorts.push_back(Port);
+ m_FreePorts.push_back(BasePort);
m_RecoveringModules.erase(std::string(ModuleId));
}
RemoveRecoveringModule.Dismiss();
@@ -806,6 +825,7 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
try
{
DestroyInstance.reset();
+ NewState = HubInstanceState::Unprovisioned;
}
catch (const std::exception& Ex)
{
@@ -884,6 +904,29 @@ Hub::WatchDog()
}
}
+void
+Hub::OnStateUpdate(std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState& NewState,
+ uint16_t BasePort,
+ std::string_view BaseUri)
+{
+ if (m_ModuleStateChangeCallback && OldState != NewState)
+ {
+ try
+ {
+ m_ModuleStateChangeCallback(ModuleId,
+ HubProvisionedInstanceInfo{.BaseUri = std::string(BaseUri), .Port = BasePort},
+ OldState,
+ NewState);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Module state change callback for module '{}' failed. Reason: '{}'", ModuleId, Ex.what());
+ }
+ }
+}
+
#if ZEN_WITH_TESTS
TEST_SUITE_BEGIN("server.hub");
@@ -897,14 +940,44 @@ namespace hub_testutils {
return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir);
}
- std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
- Hub::Configuration Config = {},
- Hub::ProvisionModuleCallbackFunc ProvisionCallback = {},
- Hub::ProvisionModuleCallbackFunc DeprovisionCallback = {})
+ std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
+ Hub::Configuration Config = {},
+ Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {})
{
- return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(ProvisionCallback), std::move(DeprovisionCallback));
+ return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
}
+ struct CallbackRecord
+ {
+ std::string ModuleId;
+ uint16_t Port;
+ };
+
+ struct StateChangeCapture
+ {
+ RwLock CallbackMutex;
+ std::vector<CallbackRecord> ProvisionCallbacks;
+ std::vector<CallbackRecord> DeprovisionCallbacks;
+
+ auto CaptureFunc()
+ {
+ return [this](std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState) {
+ ZEN_UNUSED(PreviousState);
+ if (NewState == HubInstanceState::Provisioned)
+ {
+ CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
+ }
+ else if (NewState == HubInstanceState::Unprovisioned)
+ {
+ CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
+ }
+ };
+ }
+ };
+
} // namespace hub_testutils
TEST_CASE("hub.provision_basic")
@@ -998,23 +1071,9 @@ TEST_CASE("hub.provision_callbacks")
{
ScopedTemporaryDirectory TempDir;
- struct CallbackRecord
- {
- std::string ModuleId;
- uint16_t Port;
- };
- RwLock CallbackMutex;
- std::vector<CallbackRecord> ProvisionRecords;
- std::vector<CallbackRecord> DeprovisionRecords;
-
- auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { ProvisionRecords.push_back({std::string(ModuleId), Info.Port}); });
- };
- auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { DeprovisionRecords.push_back({std::string(ModuleId), Info.Port}); });
- };
+ hub_testutils::StateChangeCapture CaptureInstance;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(ProvisionCb), std::move(DeprovisionCb));
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc());
HubProvisionedInstanceInfo Info;
std::string Reason;
@@ -1023,11 +1082,11 @@ TEST_CASE("hub.provision_callbacks")
REQUIRE_MESSAGE(ProvisionResult, Reason);
{
- RwLock::SharedLockScope _(CallbackMutex);
- REQUIRE_EQ(ProvisionRecords.size(), 1u);
- CHECK_EQ(ProvisionRecords[0].ModuleId, "cb_module");
- CHECK_EQ(ProvisionRecords[0].Port, Info.Port);
- CHECK_NE(ProvisionRecords[0].Port, 0);
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module");
+ CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port);
+ CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0);
}
{
@@ -1044,11 +1103,11 @@ TEST_CASE("hub.provision_callbacks")
}
{
- RwLock::SharedLockScope _(CallbackMutex);
- REQUIRE_EQ(DeprovisionRecords.size(), 1u);
- CHECK_EQ(DeprovisionRecords[0].ModuleId, "cb_module");
- CHECK_NE(DeprovisionRecords[0].Port, 0);
- CHECK_EQ(ProvisionRecords.size(), 1u);
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module");
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port);
+ CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u);
}
}
@@ -1158,23 +1217,9 @@ TEST_CASE("hub.concurrent_callbacks")
Config.BasePortNumber = 22300;
Config.InstanceLimit = 10;
- struct CallbackRecord
- {
- std::string ModuleId;
- uint16_t Port;
- };
- RwLock CallbackMutex;
- std::vector<CallbackRecord> ProvisionCallbacks;
- std::vector<CallbackRecord> DeprovisionCallbacks;
-
- auto ProvisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { ProvisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
- };
- auto DeprovisionCb = [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) {
- CallbackMutex.WithExclusiveLock([&]() { DeprovisionCallbacks.push_back({std::string(ModuleId), Info.Port}); });
- };
+ hub_testutils::StateChangeCapture CaptureInstance;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(ProvisionCb), std::move(DeprovisionCb));
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, CaptureInstance.CaptureFunc());
constexpr int kHalf = 3;
@@ -1189,9 +1234,9 @@ TEST_CASE("hub.concurrent_callbacks")
CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
{
- RwLock::ExclusiveLockScope _(CallbackMutex);
- REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
- ProvisionCallbacks.clear();
+ RwLock::ExclusiveLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
+ CaptureInstance.ProvisionCallbacks.clear();
}
// Concurrently provision kHalf new modules while deprovisioning the pre-provisioned ones.
@@ -1248,17 +1293,17 @@ TEST_CASE("hub.concurrent_callbacks")
// Each new_* module must have triggered exactly one provision callback with a non-zero port.
// Each pre_* module must have triggered exactly one deprovision callback with a non-zero port.
{
- RwLock::SharedLockScope _(CallbackMutex);
- REQUIRE_EQ(ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
- REQUIRE_EQ(DeprovisionCallbacks.size(), static_cast<size_t>(kHalf));
+ RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
+ REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), static_cast<size_t>(kHalf));
+ REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), static_cast<size_t>(kHalf));
- for (const CallbackRecord& Record : ProvisionCallbacks)
+ for (const hub_testutils::CallbackRecord& Record : CaptureInstance.ProvisionCallbacks)
{
CHECK_NE(Record.Port, 0);
const bool IsNewModule = Record.ModuleId.rfind("new_", 0) == 0;
CHECK_MESSAGE(IsNewModule, Record.ModuleId);
}
- for (const CallbackRecord& Record : DeprovisionCallbacks)
+ for (const hub_testutils::CallbackRecord& Record : CaptureInstance.DeprovisionCallbacks)
{
CHECK_NE(Record.Port, 0);
const bool IsPreModule = Record.ModuleId.rfind("pre_", 0) == 0;
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index f880dab52..28e77e729 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -53,12 +53,13 @@ public:
std::string HydrationTargetSpecification;
};
- typedef std::function<void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)> ProvisionModuleCallbackFunc;
+ typedef std::function<
+ void(std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState)>
+ AsyncModuleStateChangeCallbackFunc;
- Hub(const Configuration& Config,
- ZenServerEnvironment&& RunEnvironment,
- ProvisionModuleCallbackFunc&& ProvisionedModuleCallback = {},
- ProvisionModuleCallbackFunc&& DeprovisionedModuleCallback = {});
+ Hub(const Configuration& Config,
+ ZenServerEnvironment&& RunEnvironment,
+ AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {});
~Hub();
Hub(const Hub&) = delete;
@@ -73,6 +74,11 @@ public:
};
/**
+ * Deprovision all running instances
+ */
+ void Shutdown();
+
+ /**
* Provision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to provision.
@@ -139,8 +145,7 @@ private:
const Configuration m_Config;
ZenServerEnvironment m_RunEnvironment;
- ProvisionModuleCallbackFunc m_ProvisionedModuleCallback;
- ProvisionModuleCallbackFunc m_DeprovisionedModuleCallback;
+ AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
std::string m_HydrationTargetSpecification;
std::filesystem::path m_HydrationTempPath;
@@ -170,6 +175,42 @@ private:
void UpdateStats();
void UpdateCapacityMetrics();
bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+
+ class InstanceStateUpdateGuard
+ {
+ public:
+ InstanceStateUpdateGuard(Hub& InHub,
+ std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState& NewState,
+ uint16_t BasePort,
+ const std::string& BaseUri)
+ : m_Hub(InHub)
+ , m_ModuleId(ModuleId)
+ , m_OldState(OldState)
+ , m_NewState(NewState)
+ , m_BasePort(BasePort)
+ , m_BaseUri(BaseUri)
+ {
+ }
+ ~InstanceStateUpdateGuard() { m_Hub.OnStateUpdate(m_ModuleId, m_OldState, m_NewState, m_BasePort, m_BaseUri); }
+
+ private:
+ Hub& m_Hub;
+ const std::string m_ModuleId;
+ HubInstanceState m_OldState;
+ HubInstanceState& m_NewState;
+ uint16_t m_BasePort;
+ const std::string m_BaseUri;
+ };
+
+ void OnStateUpdate(std::string_view ModuleId,
+ HubInstanceState OldState,
+ HubInstanceState& NewState,
+ uint16_t BasePort,
+ std::string_view BaseUri);
+
+ friend class InstanceStateUpdateGuard;
};
#if ZEN_WITH_TESTS
diff --git a/src/zenserver/hub/storageserverinstance.cpp b/src/zenserver/hub/storageserverinstance.cpp
index 0a9efcc44..99f0c29f3 100644
--- a/src/zenserver/hub/storageserverinstance.cpp
+++ b/src/zenserver/hub/storageserverinstance.cpp
@@ -69,13 +69,13 @@ StorageServerInstance::GetProcessMetrics(ProcessMetrics& OutMetrics) const
OutMetrics.PeakPagefileUsage = m_PeakPagefileUsage.load();
}
-void
+bool
StorageServerInstance::ProvisionLocked()
{
if (m_State.load() == HubInstanceState::Provisioned)
{
ZEN_WARN("Storage server instance for module '{}' is already provisioned", m_ModuleId);
- return;
+ return false;
}
if (m_State.load() == HubInstanceState::Crashed)
@@ -86,39 +86,45 @@ StorageServerInstance::ProvisionLocked()
if (m_State.load() == HubInstanceState::Hibernated)
{
- if (WakeLocked())
- {
- return;
- }
- // Wake failed; proceed with a fresh provision (discards hibernated data)
- m_State = HubInstanceState::Unprovisioned;
+ return WakeLocked();
}
- else if (m_State.load() != HubInstanceState::Unprovisioned)
+
+ if (m_State.load() != HubInstanceState::Unprovisioned)
{
ZEN_WARN("Storage server instance for module '{}' is in unexpected state '{}', cannot provision",
m_ModuleId,
ToString(m_State.load()));
- return;
+ return false;
}
ZEN_INFO("Provisioning storage server instance for module '{}', at '{}'", m_ModuleId, m_BaseDir);
m_State = HubInstanceState::Provisioning;
- Hydrate();
- SpawnServerProcess();
- m_State = HubInstanceState::Provisioned;
+ try
+ {
+ Hydrate();
+ SpawnServerProcess();
+ m_State = HubInstanceState::Provisioned;
+ return true;
+ }
+ catch (...)
+ {
+ m_State = HubInstanceState::Unprovisioned;
+ throw;
+ }
}
-void
+bool
StorageServerInstance::DeprovisionLocked()
{
const HubInstanceState CurrentState = m_State.load();
- if (CurrentState != HubInstanceState::Provisioned && CurrentState != HubInstanceState::Crashed)
+ if (CurrentState != HubInstanceState::Provisioned && CurrentState != HubInstanceState::Crashed &&
+ CurrentState != HubInstanceState::Hibernated)
{
ZEN_WARN("Attempted to deprovision storage server instance for module '{}' which is not provisioned (state: '{}')",
m_ModuleId,
ToString(CurrentState));
- return;
+ return false;
}
ZEN_INFO("Deprovisioning storage server instance for module '{}'", m_ModuleId);
@@ -126,13 +132,30 @@ StorageServerInstance::DeprovisionLocked()
m_State = HubInstanceState::Deprovisioning;
if (CurrentState == HubInstanceState::Provisioned)
{
- m_ServerInstance.Shutdown(); // Graceful; process may still be running
+ try
+ {
+ m_ServerInstance.Shutdown();
+ }
+ catch (...)
+ {
+ m_State = HubInstanceState::Provisioned; // Shutdown failed; process may still be running
+ throw;
+ }
}
- // Crashed: process already dead; skip Shutdown
+ // Crashed or Hibernated: process already dead; skip Shutdown
- Dehydrate();
+ try
+ {
+ Dehydrate();
+ }
+ catch (...)
+ {
+ m_State = HubInstanceState::Crashed; // Dehydrate failed; process is already dead
+ throw;
+ }
m_State = HubInstanceState::Unprovisioned;
+ return true;
}
bool
@@ -161,11 +184,10 @@ StorageServerInstance::HibernateLocked()
m_State = HubInstanceState::Hibernated;
return true;
}
- catch (const std::exception& Ex)
+ catch (...)
{
- ZEN_ERROR("Failed to hibernate storage server instance for module '{}': {}", m_ModuleId, Ex.what());
m_State = HubInstanceState::Provisioned; // Shutdown failed; instance is still running
- return false;
+ throw;
}
}
@@ -191,11 +213,10 @@ StorageServerInstance::WakeLocked()
m_State = HubInstanceState::Provisioned;
return true;
}
- catch (const std::exception& Ex)
+ catch (...)
{
- ZEN_ERROR("Failed to wake storage server instance for module '{}': {}", m_ModuleId, Ex.what());
m_State = HubInstanceState::Hibernated;
- return false;
+ throw;
}
}
@@ -418,18 +439,18 @@ StorageServerInstance::ExclusiveLockedPtr::IsRunning() const
return m_Instance->m_State.load() == HubInstanceState::Provisioned && m_Instance->m_ServerInstance.IsRunning();
}
-void
+bool
StorageServerInstance::ExclusiveLockedPtr::Provision()
{
ZEN_ASSERT(m_Instance != nullptr);
- m_Instance->ProvisionLocked();
+ return m_Instance->ProvisionLocked();
}
-void
+bool
StorageServerInstance::ExclusiveLockedPtr::Deprovision()
{
ZEN_ASSERT(m_Instance != nullptr);
- m_Instance->DeprovisionLocked();
+ return m_Instance->DeprovisionLocked();
}
bool
diff --git a/src/zenserver/hub/storageserverinstance.h b/src/zenserver/hub/storageserverinstance.h
index 9963f640e..a0ca496dc 100644
--- a/src/zenserver/hub/storageserverinstance.h
+++ b/src/zenserver/hub/storageserverinstance.h
@@ -68,6 +68,11 @@ public:
ZEN_ASSERT(m_Instance);
return m_Instance->m_State.load();
}
+ uint16_t GetBasePort() const
+ {
+ ZEN_ASSERT(m_Instance);
+ return m_Instance->GetBasePort();
+ }
bool IsRunning() const;
const ResourceMetrics& GetResourceMetrics() const
@@ -114,6 +119,11 @@ public:
ZEN_ASSERT(m_Instance);
return m_Instance->m_State.load();
}
+ uint16_t GetBasePort() const
+ {
+ ZEN_ASSERT(m_Instance);
+ return m_Instance->GetBasePort();
+ }
bool IsRunning() const;
const ResourceMetrics& GetResourceMetrics() const
@@ -122,8 +132,12 @@ public:
return m_Instance->m_ResourceMetrics;
}
- void Provision();
- void Deprovision();
+ // For Provision, Deprovision, Hibernate, Wake:
+ // true = operation performed (state changed)
+ // false = precondition not met (wrong state), nothing attempted
+ // throws = operation attempted but failed; m_State corrected before throw
+ [[nodiscard]] bool Provision();
+ [[nodiscard]] bool Deprovision();
[[nodiscard]] bool Hibernate();
[[nodiscard]] bool Wake();
[[nodiscard]] bool RecoverFromCrash(); // true = recovered; false = spawn failed (Crashed), caller must Deprovision() + cleanup
@@ -136,8 +150,8 @@ public:
[[nodiscard]] ExclusiveLockedPtr LockExclusive(bool Wait) { return ExclusiveLockedPtr(m_Lock, this, Wait); }
private:
- void ProvisionLocked();
- void DeprovisionLocked();
+ [[nodiscard]] bool ProvisionLocked();
+ [[nodiscard]] bool DeprovisionLocked();
[[nodiscard]] bool HibernateLocked();
[[nodiscard]] bool WakeLocked();
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 45a3211b2..f9ff655ec 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -169,14 +169,22 @@ ZenHubServer::~ZenHubServer()
}
void
-ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
+ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId,
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState)
{
- if (m_ConsulClient)
+ ZEN_UNUSED(PreviousState);
+ if (!m_ConsulClient)
+ {
+ return;
+ }
+ if (NewState == HubInstanceState::Provisioned)
{
consul::ServiceRegistrationInfo ServiceInfo{
- .ServiceId = std::string(ModuleId),
- .ServiceName = "zen-storage",
- // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri?
+ .ServiceId = std::string(ModuleId),
+ .ServiceName = "zen-storage",
.Port = Info.Port,
.HealthEndpoint = "health",
.Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
@@ -197,13 +205,7 @@ ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view Mod
ServiceInfo.ServiceName);
}
}
-}
-
-void
-ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
-{
- ZEN_UNUSED(HubInstanceId);
- if (m_ConsulClient)
+ else if (NewState == HubInstanceState::Unprovisioned)
{
if (!m_ConsulClient->DeregisterService(ModuleId))
{
@@ -216,6 +218,8 @@ ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view M
ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port);
}
}
+ // Transitional states (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed)
+ // and Hibernated are intentionally ignored.
}
int
@@ -268,6 +272,11 @@ ZenHubServer::Cleanup()
m_Http->Close();
}
+ if (m_Hub)
+ {
+ m_Hub->Shutdown();
+ }
+
m_FrontendService.reset();
m_HubService.reset();
m_ApiService.reset();
@@ -306,14 +315,14 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
ServerConfig.HubInstanceHttpClass),
- m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
- std::string_view ModuleId,
- const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); }
- : Hub::ProvisionModuleCallbackFunc{},
- m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
- std::string_view ModuleId,
- const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); }
- : Hub::ProvisionModuleCallbackFunc{});
+ m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState) {
+ OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState);
+ }}
+ : Hub::AsyncModuleStateChangeCallbackFunc{});
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index d44a13d86..0fb192b9f 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -2,6 +2,7 @@
#pragma once
+#include "hubinstancestate.h"
#include "zenserver.h"
#include <zenutil/consul.h>
@@ -89,8 +90,11 @@ public:
void SetContentRoot(std::filesystem::path Root) { m_ContentRoot = Root; }
private:
- void OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info);
- void OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info);
+ void OnModuleStateChanged(std::string_view HubInstanceId,
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState);
std::filesystem::path m_DataRoot;
std::filesystem::path m_ContentRoot;