diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-23 14:09:46 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-23 14:09:46 +0100 |
| commit | d336e5937019dbab1924419624faa6ffc776cd7f (patch) | |
| tree | cee312dc86f8404bbd4f987cbcf2f4d96124fc19 /src/zenserver/hub/hub.cpp | |
| parent | Unique session/client tracking using HyperLogLog (#884) (diff) | |
| download | zen-d336e5937019dbab1924419624faa6ffc776cd7f.tar.xz zen-d336e5937019dbab1924419624faa6ffc776cd7f.zip | |
add hub instance crash recovery (#885)
* add hub instance crash recovery
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 268 |
1 file changed, 237 insertions, 31 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index ebbb9432a..e8487d7d9 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -307,6 +307,13 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(m_ActiveInstances.size() > ActiveInstanceIndex); + if (m_RecoveringModules.contains(std::string(ModuleId))) + { + OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId); + ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId); + return false; + } + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; Instance = InstanceRaw->LockExclusive(/*Wait*/ true); AllocatedPort = InstanceRaw->GetBasePort(); @@ -409,6 +416,13 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) return false; } + if (m_RecoveringModules.contains(ModuleId)) + { + OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId); + ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId); + return false; + } + if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); @@ -479,7 +493,7 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) RwLock::ExclusiveLockScope _(m_Lock); if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || - m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId)) + m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId)) { OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); return false; @@ -538,7 +552,7 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) RwLock::ExclusiveLockScope _(m_Lock); if 
(m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || - m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId)) + m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId)) { OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); return false; @@ -707,60 +721,159 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) } void +Hub::AttemptRecoverInstance(std::string_view ModuleId) +{ + StorageServerInstance::ExclusiveLockedPtr Instance; + StorageServerInstance* RawInstance = nullptr; + + { + RwLock::ExclusiveLockScope _(m_Lock); + + if (m_RecoveringModules.contains(std::string(ModuleId)) || m_ProvisioningModules.contains(std::string(ModuleId)) || + m_DeprovisioningModules.contains(std::string(ModuleId)) || m_HibernatingModules.contains(std::string(ModuleId)) || + m_WakingModules.contains(std::string(ModuleId))) + { + return; + } + + auto It = m_InstanceLookup.find(std::string(ModuleId)); + if (It == m_InstanceLookup.end()) + { + return; + } + + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + RawInstance = m_ActiveInstances[ActiveInstanceIndex].get(); + Instance = RawInstance->LockExclusive(/*Wait*/ true); + m_RecoveringModules.emplace(std::string(ModuleId)); + } + + ZEN_ASSERT(Instance); + + auto RemoveRecoveringModule = MakeGuard([&] { + RwLock::ExclusiveLockScope _(m_Lock); + m_RecoveringModules.erase(std::string(ModuleId)); + }); + + // Re-validate: state may have changed between releasing shared lock and acquiring exclusive lock + if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) + { + return; + } + + const uint16_t Port = RawInstance->GetBasePort(); + std::string BaseUri; // TODO? 
+ + if (Instance.RecoverFromCrash()) + { + Instance = {}; + return; + } + + // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup. + Instance.Deprovision(); + Instance = {}; + + if (m_DeprovisionedModuleCallback) + { + try + { + m_DeprovisionedModuleCallback(ModuleId, HubProvisionedInstanceInfo{.BaseUri = BaseUri, .Port = Port}); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Deprovision callback for recovered module {} failed. Reason: '{}'", ModuleId, Ex.what()); + } + } + + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) + { + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + } + m_FreePorts.push_back(Port); + m_RecoveringModules.erase(std::string(ModuleId)); + } + RemoveRecoveringModule.Dismiss(); + + try + { + DestroyInstance.reset(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what()); + } +} + +void Hub::WatchDog() { constexpr uint64_t WatchDogWakeupTimeMs = 5000; constexpr uint64_t WatchDogProcessingTimeMs = 500; - size_t CheckInstanceIndex = 0; + size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs)) { try { - size_t MaxCheckCount = m_Lock.WithSharedLock([this]() { return m_InstanceLookup.size(); }); + // Snapshot slot count. We iterate all slots (including freed nulls) so + // round-robin coverage is not skewed by deprovisioned entries. 
+ size_t SlotsRemaining = m_Lock.WithSharedLock([this]() { return m_ActiveInstances.size(); }); Stopwatch Timer; - while (MaxCheckCount-- > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !m_WatchDogEvent.Wait(5)) + bool ShuttingDown = false; + while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !ShuttingDown) { StorageServerInstance::SharedLockedPtr LockedInstance; - m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance]() { - if (m_InstanceLookup.empty()) - { - return; - } - - size_t MaxLoopCount = m_ActiveInstances.size(); - StorageServerInstance* Instance = nullptr; - while (MaxLoopCount-- > 0 && !Instance) + m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() { + // Advance through null (freed) slots under a single lock acquisition. + while (SlotsRemaining > 0) { + SlotsRemaining--; CheckInstanceIndex++; if (CheckInstanceIndex >= m_ActiveInstances.size()) { CheckInstanceIndex = 0; } - Instance = (CheckInstanceIndex < m_ActiveInstances.size()) ? m_ActiveInstances[CheckInstanceIndex].get() : nullptr; - } - - if (Instance) - { - LockedInstance = Instance->LockShared(/*Wait*/ false); + StorageServerInstance* Instance = m_ActiveInstances[CheckInstanceIndex].get(); + if (Instance) + { + LockedInstance = Instance->LockShared(/*Wait*/ false); + break; // Found a live slot (locked or busy); stop scanning this batch. + } } }); - if (LockedInstance) + if (!LockedInstance) { - if (LockedInstance.IsRunning()) - { - LockedInstance.UpdateMetrics(); - } - else if (LockedInstance.GetState() == HubInstanceState::Provisioned) - { - // Process is not running but state says it should be - instance died unexpectedly. - // TODO: Track and attempt recovery. - } - // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking) - expected, skip. + // Either all remaining slots were null, or the live slot's lock was busy -- move on. 
+ continue; + } + + if (LockedInstance.IsRunning()) + { + LockedInstance.UpdateMetrics(); + } + else if (LockedInstance.GetState() == HubInstanceState::Provisioned) + { + // Process is not running but state says it should be - instance died unexpectedly. + const std::string ModuleId(LockedInstance.GetModuleId()); LockedInstance = {}; + AttemptRecoverInstance(ModuleId); } + // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Crashed) - expected, skip. + LockedInstance = {}; + + // Rate-limit: pause briefly between live-instance checks and respond to shutdown. + ShuttingDown = m_WatchDogEvent.Wait(5); } } catch (const std::exception& Ex) @@ -1293,9 +1406,102 @@ TEST_CASE("hub.hibernate_wake_errors") CHECK(Reason.empty()); } +TEST_CASE("hub.recover_process_crash") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + HubProvisionedInstanceInfo Info; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason); + + // Kill the child process to simulate a crash, then poll until the watchdog detects it, + // recovers the instance, and the new process is serving requests. + HubInstance->TerminateModuleForTesting("module_a"); + + constexpr auto kPollIntervalMs = std::chrono::milliseconds(200); + constexpr auto kTimeoutMs = std::chrono::seconds(20); + const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; + + // A successful HTTP health check on the same port confirms the new process is up. 
+ HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + bool Recovered = false; + while (std::chrono::steady_clock::now() < Deadline) + { + std::this_thread::sleep_for(kPollIntervalMs); + Hub::InstanceInfo InstanceInfo; + if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && + ModClient.Get("/health/")) + { + // Recovery must reuse the same port - the instance was never removed from the hub's + // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort. + CHECK_EQ(InstanceInfo.Port, Info.Port); + Recovered = true; + break; + } + } + CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); +} + +TEST_CASE("hub.recover_process_crash_then_deprovision") +{ + ScopedTemporaryDirectory TempDir; + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + HubProvisionedInstanceInfo Info; + std::string Reason; + REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason); + + // Kill the child process, wait for the watchdog to detect and recover the instance. + HubInstance->TerminateModuleForTesting("module_a"); + + constexpr auto kPollIntervalMs = std::chrono::milliseconds(200); + constexpr auto kTimeoutMs = std::chrono::seconds(20); + const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; + + bool Recovered = false; + while (std::chrono::steady_clock::now() < Deadline) + { + std::this_thread::sleep_for(kPollIntervalMs); + Hub::InstanceInfo InstanceInfo; + if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) + { + Recovered = true; + break; + } + } + REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); + + // After recovery, deprovision should succeed and a re-provision should work. 
+ CHECK_MESSAGE(HubInstance->Deprovision("module_a", Reason), Reason); + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + + HubProvisionedInstanceInfo NewInfo; + CHECK_MESSAGE(HubInstance->Provision("module_a", NewInfo, Reason), Reason); + CHECK_NE(NewInfo.Port, 0); + HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout); + CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests"); +} + TEST_SUITE_END(); void +Hub::TerminateModuleForTesting(const std::string& ModuleId) +{ + RwLock::SharedLockScope _(m_Lock); + auto It = m_InstanceLookup.find(ModuleId); + if (It == m_InstanceLookup.end()) + { + return; + } + StorageServerInstance::SharedLockedPtr Locked = m_ActiveInstances[It->second]->LockShared(/*Wait*/ true); + if (Locked) + { + Locked.TerminateForTesting(); + } +} + +void hub_forcelink() { } |