diff options
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 242 |
2 files changed, 113 insertions, 131 deletions
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index 5b0e14042..33f161a39 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -6,10 +6,10 @@ #include "storageserverinstance.h" #include <zencore/compactbinarybuilder.h> - #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zenhttp/httpstats.h> + #include <ctime> namespace zen { diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index cdcba9902..2a1563c12 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -1241,117 +1241,66 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, StorageServerInstance::SharedLockedPtr&& LockedInstance, size_t ActiveInstanceIndex) { - try + const std::string ModuleId(LockedInstance.GetModuleId()); + + HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (LockedInstance.IsRunning()) { - HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); - if (LockedInstance.IsRunning()) + m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics()); + if (InstanceState == HubInstanceState::Provisioned) { - m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics()); - if (InstanceState == HubInstanceState::Provisioned) - { - const std::string ModuleId(LockedInstance.GetModuleId()); - - const uint16_t Port = LockedInstance.GetBasePort(); - const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); - const std::chrono::system_clock::time_point LastActivityTime = - m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); + const uint16_t Port = LockedInstance.GetBasePort(); + const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); + const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); - const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); - // We do the activity check without holding a lock to the instance - LockedInstance = {}; + // We do the activity check without holding a lock to the instance + LockedInstance = {}; - uint64_t ActivitySum = PreviousActivitySum; + uint64_t ActivitySum = PreviousActivitySum; - std::chrono::system_clock::time_point NextCheckTime = - LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin; - if (Now >= NextCheckTime) + std::chrono::system_clock::time_point NextCheckTime = + LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin; + if (Now >= NextCheckTime) + { + ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port)); + HttpClient::Response Result = + ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject)); + if (Result.IsSuccess()) { - ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port)); - HttpClient::Response Result = - ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject)); - if (Result.IsSuccess()) + CbObject Response = Result.AsObject(); + if (Response) { - CbObject Response = Result.AsObject(); - if (Response) - { - ActivitySum = Response["sum"].AsUInt64(); - } + ActivitySum = Response["sum"].AsUInt64(); } } + } - if (ActivitySum != PreviousActivitySum) - { - m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() { - if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) - { - const uint64_t ActiveInstanceIndex = It->second; - ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex]; + if (ActivitySum != PreviousActivitySum) + { + m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() { + if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) + { + const uint64_t ActiveInstanceIndex = It->second; + ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex]; - HubInstanceState CurrentState = Instance.State.load(); - if (CurrentState == InstanceState) + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState == InstanceState) + { + if (Instance.LastActivityTime.load() == LastActivityTime && + Instance.LastKnownActivitySum.load() == PreviousActivitySum) { - if (Instance.LastActivityTime.load() == LastActivityTime && - Instance.LastKnownActivitySum.load() == PreviousActivitySum) - { - Instance.LastActivityTime.store(Now); - Instance.LastKnownActivitySum.store(ActivitySum); - } + Instance.LastActivityTime.store(Now); + Instance.LastKnownActivitySum.store(ActivitySum); } } - }); - } - else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now) - { - ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...", - ModuleId, - NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); - (void)InternalDeprovision( - ModuleId, - [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool { - HubInstanceState CurrentState = Instance.State.load(); - if (CurrentState != InstanceState) - { - ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); - return false; - } - if (Instance.LastActivityTime.load() != LastActivityTime || - Instance.LastKnownActivitySum.load() != PreviousActivitySum) - { - ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId); - return false; - } - return true; - }); - } + } + }); } - - return true; - } - else if (InstanceState == HubInstanceState::Provisioned) - { - // Process is not running but state says it should be - instance died unexpectedly. - const std::string ModuleId(LockedInstance.GetModuleId()); - const uint16_t Port = LockedInstance.GetBasePort(); - UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed); - NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); - LockedInstance = {}; - - return false; - } - else if (InstanceState == HubInstanceState::Hibernated) - { - // Process is not running - no HTTP activity check is possible. - // Use a pure time-based check; the margin window does not apply here. - const std::string ModuleId = std::string(LockedInstance.GetModuleId()); - const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); - const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); - const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); - LockedInstance = {}; - - if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now) + else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now) { - ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...", + ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...", ModuleId, NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); (void)InternalDeprovision( @@ -1360,35 +1309,72 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, HubInstanceState CurrentState = Instance.State.load(); if (CurrentState != InstanceState) { - ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", - ModuleId, - ToString(CurrentState)); + ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); return false; } if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum) { - ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId); + ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId); return false; } return true; }); } - return true; - } - else - { - // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. - // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance - // lock was busy on a previous cycle and recovery is already pending. - return true; } + + return true; } - catch (const std::exception& Ex) + else if (InstanceState == HubInstanceState::Provisioned) { - ZEN_WARN("Failed to check status of module {}. Reason: {}", LockedInstance.GetModuleId(), Ex.what()); + // Process is not running but state says it should be - instance died unexpectedly. + const uint16_t Port = LockedInstance.GetBasePort(); + UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); + LockedInstance = {}; + return false; } + else if (InstanceState == HubInstanceState::Hibernated) + { + // Process is not running - no HTTP activity check is possible. + // Use a pure time-based check; the margin window does not apply here. + const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); + const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + LockedInstance = {}; + + if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now) + { + ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...", + ModuleId, + NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); + (void)InternalDeprovision( + ModuleId, + [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool { + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState != InstanceState) + { + ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); + return false; + } + if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum) + { + ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId); + return false; + } + return true; + }); + } + return true; + } + else + { + // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. + // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance + // lock was busy on a previous cycle and recovery is already pending. + return true; + } } void @@ -1483,16 +1469,24 @@ Hub::WatchDog() std::string ModuleId(LockedInstance.GetModuleId()); - bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex); - if (InstanceIsOk) + try { - ShuttingDown = m_ShutdownFlag.load() || m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs)); + bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex); + if (InstanceIsOk) + { + ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs)); + } + else + { + ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId); + AttemptRecoverInstance(ModuleId); + } } - else + catch (const std::exception& Ex) { - ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId); - AttemptRecoverInstance(ModuleId); + ZEN_WARN("Failed to check status of module {}. Reason: {}", ModuleId, Ex.what()); } + ShuttingDown |= m_ShutdownFlag.load(); } } catch (const std::exception& Ex) @@ -2614,23 +2608,11 @@ TEST_CASE("hub.machine_metrics") { ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - // Poll until the watchdog completes at least one cycle and populates disk metrics. - const auto Deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5); - Hub::MachineMetrics Metrics; - while (std::chrono::steady_clock::now() < Deadline) - { - Metrics = HubInstance->GetMachineMetrics(); - if (Metrics.DiskTotalBytes > 0) - { - break; - } - Sleep(10); - } + // UpdateMachineMetrics() is called synchronously in the Hub constructor, so metrics + // are available immediately without waiting for a watchdog cycle. + const Hub::MachineMetrics Metrics = HubInstance->GetMachineMetrics(); CHECK_GT(Metrics.DiskTotalBytes, 0u); CHECK_LE(Metrics.DiskFreeBytes, Metrics.DiskTotalBytes); |