aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/zenserver/hub/httphubservice.cpp2
-rw-r--r--src/zenserver/hub/hub.cpp242
2 files changed, 113 insertions, 131 deletions
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index 5b0e14042..33f161a39 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -6,10 +6,10 @@
#include "storageserverinstance.h"
#include <zencore/compactbinarybuilder.h>
-
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zenhttp/httpstats.h>
+
#include <ctime>
namespace zen {
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index cdcba9902..2a1563c12 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -1241,117 +1241,66 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
StorageServerInstance::SharedLockedPtr&& LockedInstance,
size_t ActiveInstanceIndex)
{
- try
+ const std::string ModuleId(LockedInstance.GetModuleId());
+
+ HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (LockedInstance.IsRunning())
{
- HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
- if (LockedInstance.IsRunning())
+ m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics());
+ if (InstanceState == HubInstanceState::Provisioned)
{
- m_ActiveInstances[ActiveInstanceIndex].ProcessMetrics.Store(LockedInstance.GetProcessMetrics());
- if (InstanceState == HubInstanceState::Provisioned)
- {
- const std::string ModuleId(LockedInstance.GetModuleId());
-
- const uint16_t Port = LockedInstance.GetBasePort();
- const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
- const std::chrono::system_clock::time_point LastActivityTime =
- m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+ const uint16_t Port = LockedInstance.GetBasePort();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
- const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
- // We do the activity check without holding a lock to the instance
- LockedInstance = {};
+ // We do the activity check without holding a lock to the instance
+ LockedInstance = {};
- uint64_t ActivitySum = PreviousActivitySum;
+ uint64_t ActivitySum = PreviousActivitySum;
- std::chrono::system_clock::time_point NextCheckTime =
- LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin;
- if (Now >= NextCheckTime)
+ std::chrono::system_clock::time_point NextCheckTime =
+ LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin;
+ if (Now >= NextCheckTime)
+ {
+ ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port));
+ HttpClient::Response Result =
+ ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Result.IsSuccess())
{
- ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port));
- HttpClient::Response Result =
- ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject));
- if (Result.IsSuccess())
+ CbObject Response = Result.AsObject();
+ if (Response)
{
- CbObject Response = Result.AsObject();
- if (Response)
- {
- ActivitySum = Response["sum"].AsUInt64();
- }
+ ActivitySum = Response["sum"].AsUInt64();
}
}
+ }
- if (ActivitySum != PreviousActivitySum)
- {
- m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() {
- if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
- {
- const uint64_t ActiveInstanceIndex = It->second;
- ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex];
+ if (ActivitySum != PreviousActivitySum)
+ {
+ m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() {
+ if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
+ {
+ const uint64_t ActiveInstanceIndex = It->second;
+ ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex];
- HubInstanceState CurrentState = Instance.State.load();
- if (CurrentState == InstanceState)
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState == InstanceState)
+ {
+ if (Instance.LastActivityTime.load() == LastActivityTime &&
+ Instance.LastKnownActivitySum.load() == PreviousActivitySum)
{
- if (Instance.LastActivityTime.load() == LastActivityTime &&
- Instance.LastKnownActivitySum.load() == PreviousActivitySum)
- {
- Instance.LastActivityTime.store(Now);
- Instance.LastKnownActivitySum.store(ActivitySum);
- }
+ Instance.LastActivityTime.store(Now);
+ Instance.LastKnownActivitySum.store(ActivitySum);
}
}
- });
- }
- else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now)
- {
- ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...",
- ModuleId,
- NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
- (void)InternalDeprovision(
- ModuleId,
- [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
- HubInstanceState CurrentState = Instance.State.load();
- if (CurrentState != InstanceState)
- {
- ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
- return false;
- }
- if (Instance.LastActivityTime.load() != LastActivityTime ||
- Instance.LastKnownActivitySum.load() != PreviousActivitySum)
- {
- ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId);
- return false;
- }
- return true;
- });
- }
+ }
+ });
}
-
- return true;
- }
- else if (InstanceState == HubInstanceState::Provisioned)
- {
- // Process is not running but state says it should be - instance died unexpectedly.
- const std::string ModuleId(LockedInstance.GetModuleId());
- const uint16_t Port = LockedInstance.GetBasePort();
- UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed);
- NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
- LockedInstance = {};
-
- return false;
- }
- else if (InstanceState == HubInstanceState::Hibernated)
- {
- // Process is not running - no HTTP activity check is possible.
- // Use a pure time-based check; the margin window does not apply here.
- const std::string ModuleId = std::string(LockedInstance.GetModuleId());
- const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
- const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
- const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
- LockedInstance = {};
-
- if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now)
+ else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now)
{
- ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...",
+ ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...",
ModuleId,
NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
(void)InternalDeprovision(
@@ -1360,35 +1309,72 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
HubInstanceState CurrentState = Instance.State.load();
if (CurrentState != InstanceState)
{
- ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}",
- ModuleId,
- ToString(CurrentState));
+ ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
return false;
}
if (Instance.LastActivityTime.load() != LastActivityTime ||
Instance.LastKnownActivitySum.load() != PreviousActivitySum)
{
- ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId);
+ ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId);
return false;
}
return true;
});
}
- return true;
- }
- else
- {
- // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
- // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
- // lock was busy on a previous cycle and recovery is already pending.
- return true;
}
+
+ return true;
}
- catch (const std::exception& Ex)
+ else if (InstanceState == HubInstanceState::Provisioned)
{
- ZEN_WARN("Failed to check status of module {}. Reason: {}", LockedInstance.GetModuleId(), Ex.what());
+ // Process is not running but state says it should be - instance died unexpectedly.
+ const uint16_t Port = LockedInstance.GetBasePort();
+ UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
+ LockedInstance = {};
+
return false;
}
+ else if (InstanceState == HubInstanceState::Hibernated)
+ {
+ // Process is not running - no HTTP activity check is possible.
+ // Use a pure time-based check; the margin window does not apply here.
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ LockedInstance = {};
+
+ if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ return true;
+ }
+ else
+ {
+ // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
+ // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
+ // lock was busy on a previous cycle and recovery is already pending.
+ return true;
+ }
}
void
@@ -1483,16 +1469,24 @@ Hub::WatchDog()
std::string ModuleId(LockedInstance.GetModuleId());
- bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
- if (InstanceIsOk)
+ try
{
- ShuttingDown = m_ShutdownFlag.load() || m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
+ bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
+ if (InstanceIsOk)
+ {
+ ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
+ }
+ else
+ {
+ ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
+ AttemptRecoverInstance(ModuleId);
+ }
}
- else
+ catch (const std::exception& Ex)
{
- ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
- AttemptRecoverInstance(ModuleId);
+ ZEN_WARN("Failed to check status of module {}. Reason: {}", ModuleId, Ex.what());
}
+ ShuttingDown |= m_ShutdownFlag.load();
}
}
catch (const std::exception& Ex)
@@ -2614,23 +2608,11 @@ TEST_CASE("hub.machine_metrics")
{
ScopedTemporaryDirectory TempDir;
- Hub::Configuration Config;
- Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {});
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
-
- // Poll until the watchdog completes at least one cycle and populates disk metrics.
- const auto Deadline = std::chrono::steady_clock::now() + std::chrono::seconds(5);
- Hub::MachineMetrics Metrics;
- while (std::chrono::steady_clock::now() < Deadline)
- {
- Metrics = HubInstance->GetMachineMetrics();
- if (Metrics.DiskTotalBytes > 0)
- {
- break;
- }
- Sleep(10);
- }
+ // UpdateMachineMetrics() is called synchronously in the Hub constructor, so metrics
+ // are available immediately without waiting for a watchdog cycle.
+ const Hub::MachineMetrics Metrics = HubInstance->GetMachineMetrics();
CHECK_GT(Metrics.DiskTotalBytes, 0u);
CHECK_LE(Metrics.DiskFreeBytes, Metrics.DiskTotalBytes);