diff options
Diffstat (limited to 'src/zenserver/hub')
-rw-r--r--  src/zenserver/hub/httphubservice.cpp   46
-rw-r--r--  src/zenserver/hub/httphubservice.h     19
-rw-r--r--  src/zenserver/hub/hub.cpp             348
-rw-r--r--  src/zenserver/hub/hub.h                29
-rw-r--r--  src/zenserver/hub/zenhubserver.cpp     88
-rw-r--r--  src/zenserver/hub/zenhubserver.h       13
6 files changed, 485 insertions, 58 deletions
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index 032a61f08..dcab50f2d 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -8,6 +8,7 @@ #include <zencore/compactbinarybuilder.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zenhttp/httpstats.h> namespace zen { @@ -42,7 +43,10 @@ namespace { } } // namespace -HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) +HttpHubService::HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService) +: m_Hub(Hub) +, m_StatsService(StatsService) +, m_StatusService(StatusService) { using namespace std::literals; @@ -234,10 +238,15 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); }, HttpVerb::kGet); + + m_StatsService.RegisterHandler("hub", *this); + m_StatusService.RegisterHandler("hub", *this); } HttpHubService::~HttpHubService() { + m_StatusService.UnregisterHandler("hub", *this); + m_StatsService.UnregisterHandler("hub", *this); } const char* @@ -254,9 +263,40 @@ HttpHubService::SetNotificationEndpoint(std::string_view UpstreamNotificationEnd } void -HttpHubService::HandleRequest(zen::HttpServerRequest& Request) +HttpHubService::HandleRequest(HttpServerRequest& Request) +{ + using namespace std::literals; + + metrics::OperationTiming::Scope $(m_HttpRequests); + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + return Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "Not found"sv); + } +} + +void +HttpHubService::HandleStatusRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + Cbo << "ok" << true; + Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +void +HttpHubService::HandleStatsRequest(HttpServerRequest& Request) +{ + CbObjectWriter Cbo; + + EmitSnapshot("requests", m_HttpRequests, Cbo); + + 
Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); +} + +uint64_t +HttpHubService::GetActivityCounter() { - m_Router.HandleRequest(Request); + return m_HttpRequests.Count(); } void diff --git a/src/zenserver/hub/httphubservice.h b/src/zenserver/hub/httphubservice.h index d08eeea2a..5f940017e 100644 --- a/src/zenserver/hub/httphubservice.h +++ b/src/zenserver/hub/httphubservice.h @@ -3,9 +3,11 @@ #pragma once #include <zenhttp/httpserver.h> +#include <zenhttp/httpstatus.h> namespace zen { +class HttpStatsService; class Hub; /** ZenServer Hub Service @@ -14,25 +16,32 @@ class Hub; * use in UEFN content worker style scenarios. * */ -class HttpHubService : public zen::HttpService +class HttpHubService : public HttpService, public IHttpStatusProvider, public IHttpStatsProvider { public: - HttpHubService(Hub& Hub); + HttpHubService(Hub& Hub, HttpStatsService& StatsService, HttpStatusService& StatusService); ~HttpHubService(); HttpHubService(const HttpHubService&) = delete; HttpHubService& operator=(const HttpHubService&) = delete; virtual const char* BaseUri() const override; - virtual void HandleRequest(zen::HttpServerRequest& Request) override; + virtual void HandleRequest(HttpServerRequest& Request) override; + virtual void HandleStatusRequest(HttpServerRequest& Request) override; + virtual void HandleStatsRequest(HttpServerRequest& Request) override; + virtual uint64_t GetActivityCounter() override; void SetNotificationEndpoint(std::string_view UpstreamNotificationEndpoint, std::string_view InstanceId); private: - HttpRequestRouter m_Router; - Hub& m_Hub; + HttpRequestRouter m_Router; + metrics::OperationTiming m_HttpRequests; + + HttpStatsService& m_StatsService; + HttpStatusService& m_StatusService; + void HandleModuleGet(HttpServerRequest& Request, std::string_view ModuleId); void HandleModuleDelete(HttpServerRequest& Request, std::string_view ModuleId); }; diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index db406947a..6c44e2333 100644 
--- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -11,6 +11,7 @@ #include <zencore/scopeguard.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/fixed_vector.h> @@ -21,7 +22,6 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/filesystem.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zenhttp/httpclient.h> #endif #include <numeric> @@ -179,7 +179,10 @@ Hub::~Hub() try { // Safety call - should normally be properly Shutdown by owner - Shutdown(); + if (!m_ShutdownFlag.load()) + { + Shutdown(); + } } catch (const std::exception& e) { @@ -212,10 +215,13 @@ Hub::Shutdown() } EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) { - ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings... + ZEN_UNUSED(Info); try { - const Response DepResp = InternalDeprovision(std::string(ModuleId)); + const Response DepResp = InternalDeprovision(std::string(ModuleId), [](ActiveInstance& Instance) { + ZEN_UNUSED(Instance); + return true; + }); if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted) { ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message); @@ -499,11 +505,14 @@ Hub::Response Hub::Deprovision(const std::string& ModuleId) { ZEN_ASSERT(!m_ShutdownFlag.load()); - return InternalDeprovision(ModuleId); + return InternalDeprovision(ModuleId, [](ActiveInstance& Instance) { + ZEN_UNUSED(Instance); + return true; + }); } Hub::Response -Hub::InternalDeprovision(const std::string& ModuleId) +Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate) { StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; @@ -521,6 +530,11 @@ Hub::InternalDeprovision(const std::string& ModuleId) 
ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + if (!DeprovisionGate(m_ActiveInstances[ActiveInstanceIndex])) + { + return Response{EResponseCode::Rejected, fmt::format("Module '{}' deprovision denied by gate", ModuleId)}; + } + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); switch (CurrentState) @@ -1051,6 +1065,8 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS } return false; }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState)); + m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.store(0); + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); return m_ActiveInstances[ActiveInstanceIndex].State.exchange(NewState); } @@ -1152,14 +1168,165 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) } } +bool +Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, + StorageServerInstance::SharedLockedPtr&& LockedInstance, + size_t ActiveInstanceIndex) +{ + HubInstanceState InstanceState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + if (LockedInstance.IsRunning()) + { + LockedInstance.UpdateMetrics(); + if (InstanceState == HubInstanceState::Provisioned) + { + const std::string ModuleId(LockedInstance.GetModuleId()); + + const uint16_t Port = LockedInstance.GetBasePort(); + const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); + const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); + + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + + // We do the activity check without holding a lock to the instance + LockedInstance = {}; + + uint64_t ActivitySum = PreviousActivitySum; + + std::chrono::system_clock::time_point NextCheckTime = + LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout - 
m_Config.WatchDog.InactivityCheckMargin; + if (Now >= NextCheckTime) + { + ActivityCheckClient.SetBaseUri(fmt::format("http://localhost:{}", Port)); + HttpClient::Response Result = + ActivityCheckClient.Get("/stats/activity_counters", HttpClient::Accept(HttpContentType::kCbObject)); + if (Result.IsSuccess()) + { + CbObject Response = Result.AsObject(); + if (Response) + { + ActivitySum = Response["sum"].AsUInt64(); + } + } + } + + if (ActivitySum != PreviousActivitySum) + { + m_Lock.WithSharedLock([this, InstanceState, PreviousActivitySum, &LastActivityTime, ActivitySum, Now, ModuleId]() { + if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) + { + const uint64_t ActiveInstanceIndex = It->second; + ActiveInstance& Instance = m_ActiveInstances[ActiveInstanceIndex]; + + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState == InstanceState) + { + if (Instance.LastActivityTime.load() == LastActivityTime && + Instance.LastKnownActivitySum.load() == PreviousActivitySum) + { + Instance.LastActivityTime.store(Now); + Instance.LastKnownActivitySum.store(ActivitySum); + } + } + } + }); + } + else if (LastActivityTime + m_Config.WatchDog.ProvisionedInactivityTimeout < Now) + { + ZEN_INFO("Instance {} has not been active for {}, attempting deprovision...", + ModuleId, + NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); + (void)InternalDeprovision( + ModuleId, + [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool { + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState != InstanceState) + { + ZEN_INFO("Instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); + return false; + } + if (Instance.LastActivityTime.load() != LastActivityTime || + Instance.LastKnownActivitySum.load() != PreviousActivitySum) + { + ZEN_INFO("Instance {} idle deprovision aborted due to activity", 
ModuleId); + return false; + } + return true; + }); + } + } + + return true; + } + else if (InstanceState == HubInstanceState::Provisioned) + { + // Process is not running but state says it should be - instance died unexpectedly. + const std::string ModuleId(LockedInstance.GetModuleId()); + const uint16_t Port = LockedInstance.GetBasePort(); + UpdateInstanceState(LockedInstance, ActiveInstanceIndex, HubInstanceState::Crashed); + NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); + LockedInstance = {}; + + return false; + } + else if (InstanceState == HubInstanceState::Hibernated) + { + // Process is not running - no HTTP activity check is possible. + // Use a pure time-based check; the margin window does not apply here. + const std::string ModuleId = std::string(LockedInstance.GetModuleId()); + const std::chrono::system_clock::time_point LastActivityTime = m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.load(); + const uint64_t PreviousActivitySum = m_ActiveInstances[ActiveInstanceIndex].LastKnownActivitySum.load(); + const std::chrono::system_clock::time_point Now = std::chrono::system_clock::now(); + LockedInstance = {}; + + if (LastActivityTime + m_Config.WatchDog.HibernatedInactivityTimeout < Now) + { + ZEN_INFO("Hibernated instance {} has not been active for {}, attempting deprovision...", + ModuleId, + NiceTimeSpanMs(std::chrono::duration_cast<std::chrono::milliseconds>(Now - LastActivityTime).count())); + (void)InternalDeprovision( + ModuleId, + [ModuleId, InstanceState, LastActivityTime, PreviousActivitySum](ActiveInstance& Instance) -> bool { + HubInstanceState CurrentState = Instance.State.load(); + if (CurrentState != InstanceState) + { + ZEN_INFO("Hibernated instance {} idle deprovision aborted - state changed to {}", ModuleId, ToString(CurrentState)); + return false; + } + if (Instance.LastActivityTime.load() != LastActivityTime || Instance.LastKnownActivitySum.load() != PreviousActivitySum) + { + 
ZEN_INFO("Hibernated instance {} idle deprovision aborted due to activity", ModuleId); + return false; + } + return true; + }); + } + return true; + } + else + { + // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. + // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance + // lock was busy on a previous cycle and recovery is already pending. + return true; + } +} + void Hub::WatchDog() { - constexpr uint64_t WatchDogWakeupTimeMs = 3000; - constexpr uint64_t WatchDogProcessingTimeMs = 500; + const uint64_t CycleIntervalMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleInterval).count(); + const uint64_t CycleProcessingBudgetMs = + std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.CycleProcessingBudget).count(); + const uint64_t InstanceCheckThrottleMs = + std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count(); + + HttpClient ActivityCheckClient("http://localhost", + HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout, + .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout}, + [&]() -> bool { return m_WatchDogEvent.Wait(0); }); size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 - while (!m_WatchDogEvent.Wait(WatchDogWakeupTimeMs)) + while (!m_WatchDogEvent.Wait(gsl::narrow<int>(CycleIntervalMs))) { try { @@ -1169,7 +1336,7 @@ Hub::WatchDog() Stopwatch Timer; bool ShuttingDown = false; - while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < WatchDogProcessingTimeMs && !ShuttingDown) + while (SlotsRemaining > 0 && Timer.GetElapsedTimeMs() < CycleProcessingBudgetMs && !ShuttingDown) { StorageServerInstance::SharedLockedPtr LockedInstance; m_Lock.WithSharedLock([this, &CheckInstanceIndex, &LockedInstance, &SlotsRemaining]() { @@ -1197,27 +1364,18 @@ Hub::WatchDog() continue; } - if (LockedInstance.IsRunning()) + std::string 
ModuleId(LockedInstance.GetModuleId()); + + bool InstanceIsOk = CheckInstanceStatus(ActivityCheckClient, std::move(LockedInstance), CheckInstanceIndex); + if (InstanceIsOk) { - LockedInstance.UpdateMetrics(); + ShuttingDown = m_WatchDogEvent.Wait(gsl::narrow<int>(InstanceCheckThrottleMs)); } - else if (m_ActiveInstances[CheckInstanceIndex].State.load() == HubInstanceState::Provisioned) + else { - // Process is not running but state says it should be - instance died unexpectedly. - const std::string ModuleId(LockedInstance.GetModuleId()); - const uint16_t Port = LockedInstance.GetBasePort(); - UpdateInstanceState(LockedInstance, CheckInstanceIndex, HubInstanceState::Crashed); - NotifyStateUpdate(ModuleId, HubInstanceState::Provisioned, HubInstanceState::Crashed, Port, {}); - LockedInstance = {}; + ZEN_WARN("Instance for module '{}' is not running, attempting recovery", ModuleId); AttemptRecoverInstance(ModuleId); } - // else: transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. - // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance - // lock was busy on a previous cycle and recovery is already pending. - LockedInstance = {}; - - // Rate-limit: pause briefly between live-instance checks and respond to shutdown. - ShuttingDown = m_WatchDogEvent.Wait(5); } } catch (const std::exception& Ex) @@ -1306,7 +1464,7 @@ namespace hub_testutils { // Poll until Find() returns false for the given module (i.e. async deprovision completes). static bool WaitForInstanceGone(Hub& HubInstance, std::string_view ModuleId, - std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50), std::chrono::seconds Timeout = std::chrono::seconds(30)) { const auto Deadline = std::chrono::steady_clock::now() + Timeout; @@ -1324,7 +1482,7 @@ namespace hub_testutils { // Poll until GetInstanceCount() reaches ExpectedCount (i.e. 
all async deprovisions complete). static bool WaitForInstanceCount(Hub& HubInstance, int ExpectedCount, - std::chrono::milliseconds PollInterval = std::chrono::milliseconds(200), + std::chrono::milliseconds PollInterval = std::chrono::milliseconds(50), std::chrono::seconds Timeout = std::chrono::seconds(30)) { const auto Deadline = std::chrono::steady_clock::now() + Timeout; @@ -1867,7 +2025,7 @@ TEST_CASE("hub.async_hibernate_wake") HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; - constexpr auto kPollInterval = std::chrono::milliseconds(200); + constexpr auto kPollInterval = std::chrono::milliseconds(50); constexpr auto kTimeout = std::chrono::seconds(30); // Provision and wait until Provisioned @@ -1961,7 +2119,12 @@ TEST_CASE("hub.recover_process_crash") CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); }; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); + // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default. + Hub::Configuration Config; + Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); + Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, std::move(CaptureFunc)); HubProvisionedInstanceInfo Info; { @@ -1973,8 +2136,8 @@ TEST_CASE("hub.recover_process_crash") // recovers the instance, and the new process is serving requests. HubInstance->TerminateModuleForTesting("module_a"); - constexpr auto kPollIntervalMs = std::chrono::milliseconds(200); - constexpr auto kTimeoutMs = std::chrono::seconds(20); + constexpr auto kPollIntervalMs = std::chrono::milliseconds(50); + constexpr auto kTimeoutMs = std::chrono::seconds(15); const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; // A successful HTTP health check on the same port confirms the new process is up. 
@@ -2020,7 +2183,13 @@ TEST_CASE("hub.recover_process_crash") TEST_CASE("hub.recover_process_crash_then_deprovision") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default. + Hub::Configuration Config; + Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); + Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; { @@ -2031,8 +2200,8 @@ TEST_CASE("hub.recover_process_crash_then_deprovision") // Kill the child process, wait for the watchdog to detect and recover the instance. HubInstance->TerminateModuleForTesting("module_a"); - constexpr auto kPollIntervalMs = std::chrono::milliseconds(200); - constexpr auto kTimeoutMs = std::chrono::seconds(20); + constexpr auto kPollIntervalMs = std::chrono::milliseconds(50); + constexpr auto kTimeoutMs = std::chrono::seconds(15); const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; bool Recovered = false; @@ -2108,7 +2277,7 @@ TEST_CASE("hub.async_provision_concurrent") } // Poll until all instances reach Provisioned state - constexpr auto kPollInterval = std::chrono::milliseconds(200); + constexpr auto kPollInterval = std::chrono::milliseconds(50); constexpr auto kTimeout = std::chrono::seconds(30); const auto Deadline = std::chrono::steady_clock::now() + kTimeout; @@ -2209,6 +2378,113 @@ TEST_CASE("hub.async_provision_rejected") CHECK_EQ(HubInstance->GetInstanceCount(), 1); } +TEST_CASE("hub.instance.inactivity.deprovision") +{ + ScopedTemporaryDirectory TempDir; + + // Aggressive watchdog settings to keep test duration short. + // Provisioned timeout (2s) > Hibernated timeout (1s) - this is the key invariant under test. 
+ // Margin (1s) means the HTTP activity check fires at LastActivityTime+1s for Provisioned instances. + // The Hibernated branch ignores the margin and uses a direct time-based check. + Hub::Configuration Config; + Config.BasePortNumber = 23200; + Config.InstanceLimit = 3; + Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); + Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); + Config.WatchDog.ProvisionedInactivityTimeout = std::chrono::seconds(2); + Config.WatchDog.HibernatedInactivityTimeout = std::chrono::seconds(1); + Config.WatchDog.InactivityCheckMargin = std::chrono::seconds(1); + Config.WatchDog.ActivityCheckConnectTimeout = std::chrono::milliseconds(200); + Config.WatchDog.ActivityCheckRequestTimeout = std::chrono::milliseconds(500); + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); + + // Provision in order: idle first, idle_hib second (then hibernate), persistent last. + // idle_hib uses the shorter Hibernated timeout (1s) and expires before idle (2s provisioned). + // persistent gets real HTTP PUTs so its activity timer is reset; it must still be alive + // after both idle instances are gone. 
+ + HubProvisionedInstanceInfo IdleInfo; + { + const Hub::Response R = HubInstance->Provision("idle", IdleInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + HubProvisionedInstanceInfo IdleHibInfo; + { + const Hub::Response R = HubInstance->Provision("idle_hib", IdleHibInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + const Hub::Response H = HubInstance->Hibernate("idle_hib"); + REQUIRE_MESSAGE(H.ResponseCode == Hub::EResponseCode::Completed, H.Message); + } + + HubProvisionedInstanceInfo PersistentInfo; + { + const Hub::Response R = HubInstance->Provision("persistent", PersistentInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + + auto PokeInstance = [&](uint16_t Port) { + // Make a real storage request to increment the instance's activity sum. + // The watchdog detects the changed sum on the next cycle and resets LastActivityTime. + { + HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), + HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200)}); + uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - + std::chrono::steady_clock::time_point::min()) + .count(); + IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); + const HttpClient::Response PutResult = + PersistentClient.Put(fmt::format("/z$/ns1/b/{}", Key), + IoBufferBuilder::MakeFromMemory(MakeMemoryView(std::string_view("keepalive")))); + CHECK(PutResult); + } + }; + + PokeInstance(IdleInfo.Port); + PokeInstance(PersistentInfo.Port); + + Sleep(100); + + // Phase 1: immediately after setup all three instances must still be alive. + // No timeout has elapsed yet (only 100ms have passed). 
+ CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + + CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed"); + + CHECK_MESSAGE(HubInstance->Find("persistent"), + "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + + // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout. + // idle must remain alive - its 2s provisioned timeout has not elapsed yet. + CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle_hib", std::chrono::milliseconds(100), std::chrono::seconds(3)), + "idle_hib was not deprovisioned within its 1s hibernated timeout"); + + CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should be gone after its 1s hibernated timeout elapsed"); + + CHECK_MESSAGE(HubInstance->Find("idle"), + "idle was deprovisioned before its 2s provisioned timeout - only idle_hib's 1s hibernated timeout has elapsed"); + + CHECK_MESSAGE(HubInstance->Find("persistent"), + "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); + + PokeInstance(PersistentInfo.Port); + + // Phase 3: idle must be deprovisioned by the watchdog within its 2s provisioned timeout. + // persistent must remain alive - its activity timer was reset by PokeInstance. 
+ CHECK_MESSAGE(hub_testutils::WaitForInstanceGone(*HubInstance, "idle", std::chrono::milliseconds(100), std::chrono::seconds(4)), + "idle was not deprovisioned within its 2s provisioned timeout"); + + CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2"); + + CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed"); + + CHECK_MESSAGE(HubInstance->Find("persistent"), + "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); + + HubInstance->Shutdown(); +} + TEST_SUITE_END(); void diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index eb2a06587..c343b19e2 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -9,16 +9,17 @@ #include <zencore/system.h> #include <zenutil/zenserverprocess.h> +#include <chrono> #include <deque> #include <filesystem> #include <functional> #include <memory> #include <thread> #include <unordered_map> -#include <unordered_set> namespace zen { +class HttpClient; class WorkerThreadPool; /** @@ -36,6 +37,19 @@ struct HubProvisionedInstanceInfo class Hub { public: + struct WatchDogConfiguration + { + std::chrono::milliseconds CycleInterval = std::chrono::seconds(3); + std::chrono::milliseconds CycleProcessingBudget = std::chrono::milliseconds(500); + std::chrono::milliseconds InstanceCheckThrottle = std::chrono::milliseconds(5); + std::chrono::seconds ProvisionedInactivityTimeout = std::chrono::minutes(10); + std::chrono::seconds HibernatedInactivityTimeout = std::chrono::minutes(30); + std::chrono::seconds InactivityCheckMargin = std::chrono::minutes(1); + + std::chrono::milliseconds ActivityCheckConnectTimeout = std::chrono::milliseconds(100); + std::chrono::milliseconds ActivityCheckRequestTimeout = std::chrono::milliseconds(200); + }; + struct Configuration { /** Enable or disable the use of a Windows Job Object for child process management. 
@@ -52,6 +66,8 @@ public: int InstanceCoreLimit = 0; // Automatic std::filesystem::path InstanceConfigPath; std::string HydrationTargetSpecification; + + WatchDogConfiguration WatchDog; }; typedef std::function< @@ -177,11 +193,15 @@ private: std::unique_ptr<StorageServerInstance> Instance; std::atomic<HubInstanceState> State = HubInstanceState::Unprovisioned; // TODO: We should move current metrics here (from StorageServerInstance) + + // Read and updated by WatchDog, updates to State triggers a reset of both + std::atomic<uint64_t> LastKnownActivitySum = 0; + std::atomic<std::chrono::system_clock::time_point> LastActivityTime = std::chrono::system_clock::time_point::min(); }; // UpdateInstanceState is overloaded to accept a locked instance pointer (exclusive or shared) or the hub exclusive // lock scope as a proof token that the caller holds an appropriate lock before mutating ActiveInstance::State. - // State mutation and notification (NotifyStateUpdate) are intentionally decoupled — see NotifyStateUpdate below. + // State mutation and notification (NotifyStateUpdate) are intentionally decoupled - see NotifyStateUpdate below. 
HubInstanceState UpdateInstanceState(const StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, @@ -213,6 +233,9 @@ private: Event m_WatchDogEvent; void WatchDog(); + bool CheckInstanceStatus(HttpClient& ActivityHttpClient, + StorageServerInstance::SharedLockedPtr&& LockedInstance, + size_t ActiveInstanceIndex); void AttemptRecoverInstance(std::string_view ModuleId); void UpdateStats(); @@ -220,7 +243,7 @@ private: bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason); uint16_t GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const; - Response InternalDeprovision(const std::string& ModuleId); + Response InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveInstance& Instance)>&& DeprovisionGate); void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState, diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 269de28c2..314031246 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -147,6 +147,62 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"), ""); #endif // ZEN_PLATFORM_WINDOWS + + Options.add_option("hub", + "", + "hub-watchdog-cycle-interval-ms", + "Interval between watchdog cycles in milliseconds", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-cycle-processing-budget-ms", + "Maximum processing time budget per watchdog cycle in milliseconds", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs)->default_value("500"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-instance-check-throttle-ms", + "Delay between checking successive instances per watchdog cycle in milliseconds", + 
cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs)->default_value("5"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-provisioned-inactivity-timeout-seconds", + "Seconds of inactivity after which a provisioned instance is deprovisioned", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds)->default_value("600"), + "<seconds>"); + + Options.add_option("hub", + "", + "hub-watchdog-hibernated-inactivity-timeout-seconds", + "Seconds of inactivity after which a hibernated instance is deprovisioned", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds)->default_value("1800"), + "<seconds>"); + + Options.add_option("hub", + "", + "hub-watchdog-inactivity-check-margin-seconds", + "Margin in seconds subtracted from inactivity timeout before triggering an activity check", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds)->default_value("60"), + "<seconds>"); + + Options.add_option("hub", + "", + "hub-watchdog-activity-check-connect-timeout-ms", + "Connect timeout in milliseconds for instance activity check requests", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs)->default_value("100"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-activity-check-request-timeout-ms", + "Request timeout in milliseconds for instance activity check requests", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"), + "<ms>"); } void @@ -320,17 +376,27 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig) void ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) { - ZEN_UNUSED(ServerConfig); - ZEN_INFO("instantiating Hub"); m_Hub = std::make_unique<Hub>( - Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject, - .BasePortNumber = ServerConfig.HubBasePortNumber, - 
.InstanceLimit = ServerConfig.HubInstanceLimit, - .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, - .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, - .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, - .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification}, + Hub::Configuration{ + .UseJobObject = ServerConfig.HubUseJobObject, + .BasePortNumber = ServerConfig.HubBasePortNumber, + .InstanceLimit = ServerConfig.HubInstanceLimit, + .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, + .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, + .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, + .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, + .WatchDog = + { + .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs), + .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs), + .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs), + .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds), + .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds), + .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds), + .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs), + .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs), + }}, ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", @@ -349,10 +415,10 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); ZEN_INFO("instantiating hub service"); - m_HubService = 
std::make_unique<HttpHubService>(*m_Hub); + m_HubService = std::make_unique<HttpHubService>(*m_Hub, m_StatsService, m_StatusService); m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); - m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService); } void diff --git a/src/zenserver/hub/zenhubserver.h b/src/zenserver/hub/zenhubserver.h index c4a45a8fe..77df3eaa3 100644 --- a/src/zenserver/hub/zenhubserver.h +++ b/src/zenserver/hub/zenhubserver.h @@ -20,6 +20,18 @@ class HttpApiService; class HttpFrontendService; class HttpHubService; +struct ZenHubWatchdogConfig +{ + uint32_t CycleIntervalMs = 3000; + uint32_t CycleProcessingBudgetMs = 500; + uint32_t InstanceCheckThrottleMs = 5; + uint32_t ProvisionedInactivityTimeoutSeconds = 600; + uint32_t HibernatedInactivityTimeoutSeconds = 1800; + uint32_t InactivityCheckMarginSeconds = 60; // Activity check is triggered this far before the inactivity timeout + uint32_t ActivityCheckConnectTimeoutMs = 100; + uint32_t ActivityCheckRequestTimeoutMs = 200; +}; + struct ZenHubServerConfig : public ZenServerConfig { std::string UpstreamNotificationEndpoint; @@ -36,6 +48,7 @@ struct ZenHubServerConfig : public ZenServerConfig int HubInstanceCoreLimit = 0; // Automatic std::filesystem::path HubInstanceConfigPath; // Path to Lua config file std::string HydrationTargetSpecification; // hydration/dehydration target specification + ZenHubWatchdogConfig WatchdogConfig; }; class Hub; |