aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub')
-rw-r--r--src/zenserver/hub/httphubservice.cpp46
-rw-r--r--src/zenserver/hub/httphubservice.h19
-rw-r--r--src/zenserver/hub/hub.cpp348
-rw-r--r--src/zenserver/hub/hub.h29
-rw-r--r--src/zenserver/hub/zenhubserver.cpp88
-rw-r--r--src/zenserver/hub/zenhubserver.h13
6 files changed, 485 insertions, 58 deletions
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index 032a61f08..dcab50f2d 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -8,6 +8,7 @@
#include <zencore/compactbinarybuilder.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zenhttp/httpstats.h>
namespace zen {
@@ -42,7 +43,10 @@ namespace {
}
} // namespace
-HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
+HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService)
+: m_Hub(Hub)
+, m_StatsService(StatsService)
+, m_StatusService(StatusService)
{
using namespace std::literals;
@@ -234,10 +238,15 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
},
HttpVerb::kGet);
+
+ m_StatsService.RegisterHandler("hub", *this);
+ m_StatusService.RegisterHandler("hub", *this);
}
HttpHubService::~HttpHubService()
{
+ m_StatusService.UnregisterHandler("hub", *this);
+ m_StatsService.UnregisterHandler("hub", *this);
}
const char*
@@ -254,9 +263,40 @@ HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEnd
}
void
-HttpHubService::HandleRequest(zen::HttpServerRequest& Request)
+HttpHubService::HandleRequest(HttpServerRequest& Request)
+{
+ using namespace std::literals;
+
+ metrics::OperationTiming::Scope $(m_HttpRequests);
+ if (m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
+ return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv);
+ }
+}
+
+void
+HttpHubService::HandleStatusRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ Cbo << "ok" << true;
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpHubService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+
+ EmitSnapshot("requests", m_HttpRequests, Cbo);
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+uint64_t
+HttpHubService::GetActivityCounter()
{
- m_Router.HandleRequest(Request);
+ return m_HttpRequests.Count();
}
void
diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h
index d08eeea2a..5f940017e 100644
--- a/src/zenserver/hub/httphubservice.h
+++ b/src/zenserver/hub/httphubservice.h
@@ -3,9 +3,11 @@
#pragma once
#include <zenhttp/httpserver.h>
+#include <zenhttp/httpstatus.h>
namespace zen {
+class HttpStatsService;
class Hub;
/** ZenServer Hub Service
@@ -14,25 +16,32 @@ class Hub;
* use in UEFN content worker style scenarios.
*
*/
-class HttpHubService : public zen::HttpService
+class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider
{
public:
- HttpHubService(Hub& Hub);
+ HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService);
~HttpHubService();
HttpHubService(const HttpHubService&) = delete;
HttpHubService& operator=(const HttpHubService&) = delete;
virtual const char* BaseUri() const override;
- virtual void HandleRequest(zen::HttpServerRequest& Request) override;
+ virtual void HandleRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatusRequest(HttpServerRequest& Request) override;
+ virtual void HandleStatsRequest(HttpServerRequest& Request) override;
+ virtual uint64_t GetActivityCounter() override;
void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId);
private:
- HttpRequestRouter m_Router;
-
Hub& m_Hub;
+ HttpRequestRouter m_Router;
+ metrics::OperationTiming m_HttpRequests;
+
+ HttpStatsService& m_StatsService;
+ HttpStatusService& m_StatusService;
+
void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId);
void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId);
};
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index db406947a..6c44e2333 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -11,6 +11,7 @@
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
+#include <zenhttp/httpclient.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -21,7 +22,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/filesystem.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zenhttp/httpclient.h>
#endif
#include <numeric>
@@ -179,7 +179,10 @@ Hub::~Hub()
try
{
// Safety call - should normally be properly Shutdown by owner
- Shutdown();
+ if (!m_ShutdownFlag.load())
+ {
+ Shutdown();
+ }
}
catch (const std::exception& e)
{
@@ -212,10 +215,13 @@ Hub::Shutdown()
}
EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
- ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings...
+ ZEN_UNUSED(Info);
try
{
- const Response DepResp = InternalDeprovision(std::string(ModuleId));
+ const Response DepResp = InternalDeprovision(std::string(ModuleId), [](ActiveInstance& Instance) {
+ ZEN_UNUSED(Instance);
+ return true;
+ });
if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
{
ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
@@ -499,11 +505,14 @@ Hub::Response
Hub::Deprovision(const std::string& ModuleId)
{
ZEN_ASSERT(!m_ShutdownFlag.load());
- return InternalDeprovision(ModuleId);
+ return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) {
+ ZEN_UNUSED(Instance);
+ return true;
+ });
}
Hub::Response
-Hub::InternalDeprovision(const std::string& ModuleId)
+Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
size_t ActiveInstanceIndex = (size_t)-1;
@@ -521,6 +530,11 @@ Hub::InternalDeprovision(const std::string& ModuleId)
ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ if (!DeprovisionGate(m_ActiveInstances[ActiveInstanceIndex]))
+ {
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' deprovision denied by gate", ModuleId)};
+ }
+
HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load();
switch (CurrentState)
@@ -1051,6 +1065,8 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS
}
return false;
}(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState));
+ m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0);
+ m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now());
return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState);
}
@@ -1152,14 +1168,165 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
}
}
+bool
+Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex)
+{
+ HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load();
+ if (LockedInstance.IsRunning())
+ {
+ LockedInstance.UpdateMetrics();
+ if (InstanceState == HubInstanceState::Provisioned)
+ {
+ const std::string ModuleId(LockedInstance.GetModuleId());
+
+ const uint16_t Port = LockedInstance.GetBasePort();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+
+ // We do the activity check without holding a lock to the instance
+ LockedInstance = {};
+
+ uint64_t ActivitySum = PreviousActivitySum;
+
+ std::chrono::system_clock::time_point NextCheckTime =
+ LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - m_Config.WatchDog.InactivityCheckMargin;
+ if (Now >= NextCheckTime)
+ {
+ ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port));
+ HttpClient::Response Result =
+ ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject));
+ if (Result.IsSuccess())
+ {
+ CbObject Response = Result.AsObject();
+ if (Response)
+ {
+ ActivitySum = Response["sum"].AsUInt64();
+ }
+ }
+ }
+
+ if (ActivitySum != PreviousActivitySum)
+ {
+ m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() {
+ if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end())
+ {
+ const uint64_t ActiveInstanceIndex = It->second;
+ ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex];
+
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState == InstanceState)
+ {
+ if (Instance.LastActivityTime.load() == LastActivityTime &&
+ Instance.LastKnownActivitySum.load() == PreviousActivitySum)
+ {
+ Instance.LastActivityTime.store(Now);
+ Instance.LastKnownActivitySum.store(ActivitySum);
+ }
+ }
+ }
+ });
+ }
+ else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime ||
+ Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ }
+
+ return true;
+ }
+ else if (InstanceState == HubInstanceState::Provisioned)
+ {
+ // Process is not running but state says it should be - instance died unexpectedly.
+ const std::string ModuleId(LockedInstance.GetModuleId());
+ const uint16_t Port = LockedInstance.GetBasePort();
+ UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed);
+ NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
+ LockedInstance = {};
+
+ return false;
+ }
+ else if (InstanceState == HubInstanceState::Hibernated)
+ {
+ // Process is not running - no HTTP activity check is possible.
+ // Use a pure time-based check; the margin window does not apply here.
+ const std::string ModuleId = std::string(LockedInstance.GetModuleId());
+ const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load();
+ const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load();
+ const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now();
+ LockedInstance = {};
+
+ if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now)
+ {
+ ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...",
+ ModuleId,
+ NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count()));
+ (void)InternalDeprovision(
+ ModuleId,
+ [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool {
+ HubInstanceState CurrentState = Instance.State.load();
+ if (CurrentState != InstanceState)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState));
+ return false;
+ }
+ if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum)
+ {
+ ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId);
+ return false;
+ }
+ return true;
+ });
+ }
+ return true;
+ }
+ else
+ {
+ // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
+ // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
+ // lock was busy on a previous cycle and recovery is already pending.
+ return true;
+ }
+}
+
void
Hub::WatchDog()
{
- constexpr uint64_t WatchDogWakeupTimeMs = 3000;
- constexpr uint64_t WatchDogProcessingTimeMs = 500;
+ const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count();
+ const uint64_t CycleProcessingBudgetMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleProcessingBudget).count();
+ const uint64_t InstanceCheckThrottleMs =
+ std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count();
+
+ HttpClient ActivityCheckClient("http://localhost",
+ HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout,
+ .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout},
+ [&]() -> bool { return m_WatchDogEvent.Wait(0); });
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
- while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs))
+ while (!m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs)))
{
try
{
@@ -1169,7 +1336,7 @@ Hub::WatchDog()
Stopwatch Timer;
bool ShuttingDown = false;
- while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !ShuttingDown)
+ while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown)
{
StorageServerInstance::SharedLockedPtr LockedInstance;
m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() {
@@ -1197,27 +1364,18 @@ Hub::WatchDog()
continue;
}
- if (LockedInstance.IsRunning())
+ std::string ModuleId(LockedInstance.GetModuleId());
+
+ bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex);
+ if (InstanceIsOk)
{
- LockedInstance.UpdateMetrics();
+ ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs));
}
- else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned)
+ else
{
- // Process is not running but state says it should be - instance died unexpectedly.
- const std::string ModuleId(LockedInstance.GetModuleId());
- const uint16_t Port = LockedInstance.GetBasePort();
- UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed);
- NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {});
- LockedInstance = {};
+ ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId);
AttemptRecoverInstance(ModuleId);
}
- // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip.
- // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance
- // lock was busy on a previous cycle and recovery is already pending.
- LockedInstance = {};
-
- // Rate-limit: pause briefly between live-instance checks and respond to shutdown.
- ShuttingDown = m_WatchDogEvent.Wait(5);
}
}
catch (const std::exception& Ex)
@@ -1306,7 +1464,7 @@ namespace hub_testutils {
// Poll until Find() returns false for the given module (i.e. async deprovision completes).
static bool WaitForInstanceGone(Hub& HubInstance,
std::string_view ModuleId,
- std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50),
std::chrono::seconds Timeout = std::chrono::seconds(30))
{
const auto Deadline = std::chrono::steady_clock::now() + Timeout;
@@ -1324,7 +1482,7 @@ namespace hub_testutils {
// Poll until GetInstanceCount() reaches ExpectedCount (i.e. all async deprovisions complete).
static bool WaitForInstanceCount(Hub& HubInstance,
int ExpectedCount,
- std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200),
+ std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50),
std::chrono::seconds Timeout = std::chrono::seconds(30))
{
const auto Deadline = std::chrono::steady_clock::now() + Timeout;
@@ -1867,7 +2025,7 @@ TEST_CASE("hub.async_hibernate_wake")
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
- constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kPollInterval = std::chrono::milliseconds(50);
constexpr auto kTimeout = std::chrono::seconds(30);
// Provision and wait until Provisioned
@@ -1961,7 +2119,12 @@ TEST_CASE("hub.recover_process_crash")
CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); });
};
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc));
+ // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
+ Hub::Configuration Config;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(CaptureFunc));
HubProvisionedInstanceInfo Info;
{
@@ -1973,8 +2136,8 @@ TEST_CASE("hub.recover_process_crash")
// recovers the instance, and the new process is serving requests.
HubInstance->TerminateModuleForTesting("module_a");
- constexpr auto kPollIntervalMs = std::chrono::milliseconds(200);
- constexpr auto kTimeoutMs = std::chrono::seconds(20);
+ constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
+ constexpr auto kTimeoutMs = std::chrono::seconds(15);
const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
// A successful HTTP health check on the same port confirms the new process is up.
@@ -2020,7 +2183,13 @@ TEST_CASE("hub.recover_process_crash")
TEST_CASE("hub.recover_process_crash_then_deprovision")
{
ScopedTemporaryDirectory TempDir;
- std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
+
+ // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default.
+ Hub::Configuration Config;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
{
@@ -2031,8 +2200,8 @@ TEST_CASE("hub.recover_process_crash_then_deprovision")
// Kill the child process, wait for the watchdog to detect and recover the instance.
HubInstance->TerminateModuleForTesting("module_a");
- constexpr auto kPollIntervalMs = std::chrono::milliseconds(200);
- constexpr auto kTimeoutMs = std::chrono::seconds(20);
+ constexpr auto kPollIntervalMs = std::chrono::milliseconds(50);
+ constexpr auto kTimeoutMs = std::chrono::seconds(15);
const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
bool Recovered = false;
@@ -2108,7 +2277,7 @@ TEST_CASE("hub.async_provision_concurrent")
}
// Poll until all instances reach Provisioned state
- constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kPollInterval = std::chrono::milliseconds(50);
constexpr auto kTimeout = std::chrono::seconds(30);
const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
@@ -2209,6 +2378,113 @@ TEST_CASE("hub.async_provision_rejected")
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
}
+TEST_CASE("hub.instance.inactivity.deprovision")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ // Aggressive watchdog settings to keep test duration short.
+ // Provisioned timeout (2s) > Hibernated timeout (1s) - this is the key invariant under test.
+ // Margin (1s) means the HTTP activity check fires at LastActivityTime+1s for Provisioned instances.
+ // The Hibernated branch ignores the margin and uses a direct time-based check.
+ Hub::Configuration Config;
+ Config.BasePortNumber = 23200;
+ Config.InstanceLimit = 3;
+ Config.WatchDog.CycleInterval = std::chrono::milliseconds(10);
+ Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1);
+ Config.WatchDog.ProvisionedInactivityTimeout = std::chrono::seconds(2);
+ Config.WatchDog.HibernatedInactivityTimeout = std::chrono::seconds(1);
+ Config.WatchDog.InactivityCheckMargin = std::chrono::seconds(1);
+ Config.WatchDog.ActivityCheckConnectTimeout = std::chrono::milliseconds(200);
+ Config.WatchDog.ActivityCheckRequestTimeout = std::chrono::milliseconds(500);
+
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
+
+ // Provision in order: idle first, idle_hib second (then hibernate), persistent last.
+ // idle_hib uses the shorter Hibernated timeout (1s) and expires before idle (2s provisioned).
+ // persistent gets real HTTP PUTs so its activity timer is reset; it must still be alive
+ // after both idle instances are gone.
+
+ HubProvisionedInstanceInfo IdleInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("idle", IdleInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ HubProvisionedInstanceInfo IdleHibInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("idle_hib", IdleHibInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ const Hub::Response H = HubInstance->Hibernate("idle_hib");
+ REQUIRE_MESSAGE(H.ResponseCode == Hub::EResponseCode::Completed, H.Message);
+ }
+
+ HubProvisionedInstanceInfo PersistentInfo;
+ {
+ const Hub::Response R = HubInstance->Provision("persistent", PersistentInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+
+ auto PokeInstance = [&](uint16_t Port) {
+ // Make a real storage request to increment the instance's activity sum.
+ // The watchdog detects the changed sum on the next cycle and resets LastActivityTime.
+ {
+ HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
+ HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)});
+ uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
+ std::chrono::steady_clock::time_point::min())
+ .count();
+ IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
+ const HttpClient::Response PutResult =
+ PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key),
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive"))));
+ CHECK(PutResult);
+ }
+ };
+
+ PokeInstance(IdleInfo.Port);
+ PokeInstance(PersistentInfo.Port);
+
+ Sleep(100);
+
+ // Phase 1: immediately after setup all three instances must still be alive.
+ // No timeout has elapsed yet (only 100ms have passed).
+ CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed");
+
+ // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout.
+ // idle must remain alive - its 2s provisioned timeout has not elapsed yet.
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle_hib", std::chrono::milliseconds(100), std::chrono::seconds(3)),
+ "idle_hib was not deprovisioned within its 1s hibernated timeout");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should be gone after its 1s hibernated timeout elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("idle"),
+ "idle was deprovisioned before its 2s provisioned timeout - only idle_hib's 1s hibernated timeout has elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
+
+ PokeInstance(PersistentInfo.Port);
+
+ // Phase 3: idle must be deprovisioned by the watchdog within its 2s provisioned timeout.
+ // persistent must remain alive - its activity timer was reset by PokeInstance.
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle", std::chrono::milliseconds(100), std::chrono::seconds(4)),
+ "idle was not deprovisioned within its 2s provisioned timeout");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2");
+
+ CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed");
+
+ CHECK_MESSAGE(HubInstance->Find("persistent"),
+ "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance");
+
+ HubInstance->Shutdown();
+}
+
TEST_SUITE_END();
void
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index eb2a06587..c343b19e2 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -9,16 +9,17 @@
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
+#include <chrono>
#include <deque>
#include <filesystem>
#include <functional>
#include <memory>
#include <thread>
#include <unordered_map>
-#include <unordered_set>
namespace zen {
+class HttpClient;
class WorkerThreadPool;
/**
@@ -36,6 +37,19 @@ struct HubProvisionedInstanceInfo
class Hub
{
public:
+ struct WatchDogConfiguration
+ {
+ std::chrono::milliseconds CycleInterval = std::chrono::seconds(3);
+ std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500);
+ std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5);
+ std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10);
+ std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30);
+ std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1);
+
+ std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100);
+ std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200);
+ };
+
struct Configuration
{
/** Enable or disable the use of a Windows Job Object for child process management.
@@ -52,6 +66,8 @@ public:
int InstanceCoreLimit = 0; // Automatic
std::filesystem::path InstanceConfigPath;
std::string HydrationTargetSpecification;
+
+ WatchDogConfiguration WatchDog;
};
typedef std::function<
@@ -177,11 +193,15 @@ private:
std::unique_ptr<StorageServerInstance> Instance;
std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned;
// TODO: We should move current metrics here (from StorageServerInstance)
+
+ // Read and updated by WatchDog, updates to State triggers a reset of both
+ std::atomic<uint64_t> LastKnownActivitySum = 0;
+ std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min();
};
// UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive
// lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State.
- // State mutation and notification (NotifyStateUpdate) are intentionally decoupled — see NotifyStateUpdate below.
+ // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below.
HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance,
size_t ActiveInstanceIndex,
@@ -213,6 +233,9 @@ private:
Event m_WatchDogEvent;
void WatchDog();
+ bool CheckInstanceStatus(HttpClient& ActivityHttpClient,
+ StorageServerInstance::SharedLockedPtr&& LockedInstance,
+ size_t ActiveInstanceIndex);
void AttemptRecoverInstance(std::string_view ModuleId);
void UpdateStats();
@@ -220,7 +243,7 @@ private:
bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const;
- Response InternalDeprovision(const std::string& ModuleId);
+ Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate);
void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance,
size_t ActiveInstanceIndex,
HubInstanceState OldState,
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index 269de28c2..314031246 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -147,6 +147,62 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"),
"");
#endif // ZEN_PLATFORM_WINDOWS
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-cycle-interval-ms",
+ "Interval between watchdog cycles in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-cycle-processing-budget-ms",
+ "Maximum processing time budget per watchdog cycle in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs)->default_value("500"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-instance-check-throttle-ms",
+ "Delay between checking successive instances per watchdog cycle in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs)->default_value("5"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-provisioned-inactivity-timeout-seconds",
+ "Seconds of inactivity after which a provisioned instance is deprovisioned",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds)->default_value("600"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-hibernated-inactivity-timeout-seconds",
+ "Seconds of inactivity after which a hibernated instance is deprovisioned",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds)->default_value("1800"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-inactivity-check-margin-seconds",
+ "Margin in seconds subtracted from inactivity timeout before triggering an activity check",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds)->default_value("60"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-activity-check-connect-timeout-ms",
+ "Connect timeout in milliseconds for instance activity check requests",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs)->default_value("100"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-activity-check-request-timeout-ms",
+ "Request timeout in milliseconds for instance activity check requests",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"),
+ "<ms>");
}
void
@@ -320,17 +376,27 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig)
void
ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
{
- ZEN_UNUSED(ServerConfig);
-
ZEN_INFO("instantiating Hub");
m_Hub = std::make_unique<Hub>(
- Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject,
- .BasePortNumber = ServerConfig.HubBasePortNumber,
- .InstanceLimit = ServerConfig.HubInstanceLimit,
- .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
- .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
- .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
- .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification},
+ Hub::Configuration{
+ .UseJobObject = ServerConfig.HubUseJobObject,
+ .BasePortNumber = ServerConfig.HubBasePortNumber,
+ .InstanceLimit = ServerConfig.HubInstanceLimit,
+ .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
+ .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
+ .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
+ .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
+ .WatchDog =
+ {
+ .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
+ .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs),
+ .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs),
+ .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds),
+ .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds),
+ .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds),
+ .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs),
+ .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs),
+ }},
ZenServerEnvironment(ZenServerEnvironment::Hub,
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
@@ -349,10 +415,10 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
ZEN_INFO("instantiating hub service");
- m_HubService = std::make_unique<HttpHubService>(*m_Hub);
+ m_HubService = std::make_unique<HttpHubService>(*m_Hub, m_StatsService, m_StatusService);
m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId);
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
}
void
diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h
index c4a45a8fe..77df3eaa3 100644
--- a/src/zenserver/hub/zenhubserver.h
+++ b/src/zenserver/hub/zenhubserver.h
@@ -20,6 +20,18 @@ class HttpApiService;
class HttpFrontendService;
class HttpHubService;
+struct ZenHubWatchdogConfig
+{
+ uint32_t CycleIntervalMs = 3000;
+ uint32_t CycleProcessingBudgetMs = 500;
+ uint32_t InstanceCheckThrottleMs = 5;
+ uint32_t ProvisionedInactivityTimeoutSeconds = 600;
+ uint32_t HibernatedInactivityTimeoutSeconds = 1800;
+ uint32_t InactivityCheckMarginSeconds = 60; // Activity check is triggered this far before the inactivity timeout
+ uint32_t ActivityCheckConnectTimeoutMs = 100;
+ uint32_t ActivityCheckRequestTimeoutMs = 200;
+};
+
struct ZenHubServerConfig : public ZenServerConfig
{
std::string UpstreamNotificationEndpoint;
@@ -36,6 +48,7 @@ struct ZenHubServerConfig : public ZenServerConfig
int HubInstanceCoreLimit = 0; // Automatic
std::filesystem::path HubInstanceConfigPath; // Path to Lua config file
std::string HydrationTargetSpecification; // hydration/dehydration target specification
+ ZenHubWatchdogConfig WatchdogConfig;
};
class Hub;