diff options
Diffstat (limited to 'src/zenserver/hub/hub.cpp')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 768 |
1 files changed, 497 insertions, 271 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 76c7a8f6d..c03c1a9a0 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -9,6 +9,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/string.h> #include <zencore/timer.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> @@ -170,32 +171,35 @@ Hub::GetMachineMetrics(SystemMetrics& OutSystemMetrict, DiskSpace& OutDiskSpace) ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, - ZenServerEnvironment&& RunEnvironment, - WorkerThreadPool* OptionalWorkerPool, - AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) +Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) -, m_WorkerPool(OptionalWorkerPool) +, m_WorkerPool(Config.OptionalProvisionWorkerPool) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) , m_ActiveInstances(Config.InstanceLimit) , m_FreeActiveInstanceIndexes(Config.InstanceLimit) { + ZEN_ASSERT_FORMAT( + Config.OptionalProvisionWorkerPool != Config.OptionalHydrationWorkerPool || Config.OptionalProvisionWorkerPool == nullptr, + "Provision and hydration worker pools must be distinct to avoid deadlocks"); + + HydrationBase::Configuration HydrationConfig; if (!m_Config.HydrationTargetSpecification.empty()) { - m_HydrationTargetSpecification = m_Config.HydrationTargetSpecification; + HydrationConfig.TargetSpecification = m_Config.HydrationTargetSpecification; } else if (!m_Config.HydrationOptions) { std::filesystem::path FileHydrationPath = m_RunEnvironment.CreateChildDir("hydration_storage"); ZEN_INFO("using file hydration path: '{}'", FileHydrationPath); - m_HydrationTargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); + HydrationConfig.TargetSpecification = fmt::format("file://{}", WideToUtf8(FileHydrationPath.native())); } else { - m_HydrationOptions = m_Config.HydrationOptions; + HydrationConfig.Options = m_Config.HydrationOptions; } + m_Hydration = InitHydration(HydrationConfig); m_HydrationTempPath = m_RunEnvironment.CreateChildDir("hydration_temp"); ZEN_INFO("using hydration temp path: '{}'", m_HydrationTempPath); @@ -323,13 +327,18 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { auto NewInstance = std::make_unique<StorageServerInstance>( m_RunEnvironment, - StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), - .HydrationTempPath = m_HydrationTempPath, - .HydrationTargetSpecification = m_HydrationTargetSpecification, - .HydrationOptions = m_HydrationOptions, - .HttpThreadCount = m_Config.InstanceHttpThreadCount, - .CoreLimit = m_Config.InstanceCoreLimit, - .ConfigPath = m_Config.InstanceConfigPath}, + *m_Hydration, + StorageServerInstance::Configuration{.BasePort = GetInstanceIndexAssignedPort(ActiveInstanceIndex), + .StateDir = m_RunEnvironment.CreateChildDir(ModuleId), + .TempDir = m_HydrationTempPath / ModuleId, + .HttpThreadCount = m_Config.InstanceHttpThreadCount, + .CoreLimit = m_Config.InstanceCoreLimit, + .ConfigPath = m_Config.InstanceConfigPath, + .Malloc = m_Config.InstanceMalloc, + .Trace = m_Config.InstanceTrace, + .TraceHost = m_Config.InstanceTraceHost, + .TraceFile = m_Config.InstanceTraceFile, + .OptionalWorkerPool = m_Config.OptionalHydrationWorkerPool}, ModuleId); #if ZEN_PLATFORM_WINDOWS @@ -383,11 +392,14 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) switch (CurrentState) { case HubInstanceState::Provisioning: + case HubInstanceState::Recovering: + case HubInstanceState::Waking: return Response{EResponseCode::Accepted}; case HubInstanceState::Crashed: case HubInstanceState::Unprovisioned: break; case HubInstanceState::Provisioned: + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); return Response{EResponseCode::Completed}; case HubInstanceState::Hibernated: _.ReleaseNow(); @@ -408,6 +420,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) Instance = {}; if (ActualState == HubInstanceState::Provisioned) { + m_ActiveInstances[ActiveInstanceIndex].LastActivityTime.store(std::chrono::system_clock::now()); return Response{EResponseCode::Completed}; } if (ActualState == HubInstanceState::Provisioning) @@ -594,6 +607,7 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI switch (CurrentState) { case HubInstanceState::Deprovisioning: + case HubInstanceState::Obliterating: return Response{EResponseCode::Accepted}; case HubInstanceState::Crashed: case HubInstanceState::Hibernated: @@ -639,11 +653,11 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI try { m_WorkerPool->ScheduleWork( - [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr), OldState]() mutable { auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); try { - CompleteDeprovision(*Instance, ActiveInstanceIndex); + CompleteDeprovision(*Instance, ActiveInstanceIndex, OldState); } catch (const std::exception& Ex) { @@ -671,20 +685,235 @@ Hub::InternalDeprovision(const std::string& ModuleId, std::function<bool(ActiveI } else { - CompleteDeprovision(Instance, ActiveInstanceIndex); + CompleteDeprovision(Instance, ActiveInstanceIndex, OldState); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +Hub::Response +Hub::Obliterate(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + + StorageServerInstance::ExclusiveLockedPtr Instance; + size_t ActiveInstanceIndex = (size_t)-1; + { + RwLock::ExclusiveLockScope Lock(m_Lock); + + if (auto It = m_InstanceLookup.find(ModuleId); It != m_InstanceLookup.end()) + { + ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + + HubInstanceState CurrentState = m_ActiveInstances[ActiveInstanceIndex].State.load(); + + switch (CurrentState) + { + case HubInstanceState::Obliterating: + return Response{EResponseCode::Accepted}; + case HubInstanceState::Provisioned: + case HubInstanceState::Hibernated: + case HubInstanceState::Crashed: + break; + case HubInstanceState::Deprovisioning: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is being deprovisioned, retry after completion", ModuleId)}; + case HubInstanceState::Recovering: + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + case HubInstanceState::Unprovisioned: + return Response{EResponseCode::Completed}; + default: + return Response{EResponseCode::Rejected, + fmt::format("Module '{}' is currently in state '{}'", ModuleId, ToString(CurrentState))}; + } + + std::unique_ptr<StorageServerInstance>& RawInstance = m_ActiveInstances[ActiveInstanceIndex].Instance; + ZEN_ASSERT(RawInstance != nullptr); + + Instance = RawInstance->LockExclusive(/*Wait*/ true); + } + else + { + // Module not tracked by hub - obliterate backend data directly. + // Covers the deprovisioned case where data was preserved via dehydration. + if (m_ObliteratingInstances.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + m_ObliteratingInstances.insert(ModuleId); + Lock.ReleaseNow(); + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId)]() { + auto Guard = MakeGuard([this, ModuleId]() { + m_Lock.WithExclusiveLock([this, ModuleId]() { m_ObliteratingInstances.erase(ModuleId); }); + m_BackgroundWorkLatch.CountDown(); + }); + try + { + ObliterateBackendData(ModuleId); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async obliterate of untracked module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed to dispatch async obliterate of untracked module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ObliteratingInstances.erase(ModuleId); + } + throw; + } + + return Response{EResponseCode::Accepted}; + } + + auto _ = MakeGuard([this, &ModuleId]() { + RwLock::ExclusiveLockScope _(m_Lock); + m_ObliteratingInstances.erase(ModuleId); + }); + + ObliterateBackendData(ModuleId); + + return Response{EResponseCode::Completed}; + } + } + + HubInstanceState OldState = UpdateInstanceState(Instance, ActiveInstanceIndex, HubInstanceState::Obliterating); + const uint16_t Port = Instance.GetBasePort(); + NotifyStateUpdate(ModuleId, OldState, HubInstanceState::Obliterating, Port, {}); + + if (m_WorkerPool) + { + std::shared_ptr<StorageServerInstance::ExclusiveLockedPtr> SharedInstancePtr = + std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)); + + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, ModuleId = std::string(ModuleId), ActiveInstanceIndex, Instance = std::move(SharedInstancePtr)]() mutable { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteObliterate(*Instance, ActiveInstanceIndex); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async obliterate of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch obliterate of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, OldState, Port, {}); + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId)) != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(m_InstanceLookup.find(std::string(ModuleId))->second == ActiveInstanceIndex); + UpdateInstanceState(HubLock, ActiveInstanceIndex, OldState); + } + + throw; + } + } + else + { + CompleteObliterate(Instance, ActiveInstanceIndex); } return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; } void -Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) +Hub::CompleteObliterate(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex) { const std::string ModuleId(Instance.GetModuleId()); const uint16_t Port = Instance.GetBasePort(); try { + Instance.Obliterate(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to obliterate storage server instance for module '{}': {}", ModuleId, Ex.what()); + Instance = {}; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Crashed); + } + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Crashed, Port, {}); + throw; + } + + NotifyStateUpdate(ModuleId, HubInstanceState::Obliterating, HubInstanceState::Unprovisioned, Port, {}); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); +} + +void +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, HubInstanceState OldState) +{ + const std::string ModuleId(Instance.GetModuleId()); + const uint16_t Port = Instance.GetBasePort(); + + try + { + if (OldState == HubInstanceState::Provisioned) + { + ZEN_INFO("Triggering GC for module {}", ModuleId); + Stopwatch GcTimer; + + HttpClient GcClient(fmt::format("http://localhost:{}", Port)); + + HttpClient::KeyValueMap Params; + Params.Entries.insert({"smallobjects", "true"}); + Params.Entries.insert({"skipcid", "false"}); + HttpClient::Response Response = GcClient.Post("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject), Params); + bool GcCompleted = false; + Stopwatch DeadlineTimer; + while (Response && DeadlineTimer.GetElapsedTimeMs() < 5000) + { + Response = GcClient.Get("/admin/gc", HttpClient::Accept(HttpContentType::kCbObject)); + if (Response) + { + bool Complete = Response.AsObject()["Status"].AsString() != "Running"; + if (Complete) + { + GcCompleted = true; + break; + } + Sleep(50); + } + } + if (GcCompleted) + { + ZEN_INFO("GC for module {} completed in {}", ModuleId, NiceLatencyNs(GcTimer.GetElapsedTimeUs() * 1000)); + } + else + { + ZEN_WARN("GC for module {} did not complete after {}, proceeding with shutdown", + ModuleId, + NiceLatencyNs(GcTimer.GetElapsedTimeUs() * 1000)); + } + } Instance.Deprovision(); } catch (const std::exception& Ex) @@ -703,20 +932,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si } NotifyStateUpdate(ModuleId, HubInstanceState::Deprovisioning, HubInstanceState::Unprovisioned, Port, {}); - Instance = {}; - - std::unique_ptr<StorageServerInstance> DeleteInstance; - { - RwLock::ExclusiveLockScope HubLock(m_Lock); - auto It = m_InstanceLookup.find(std::string(ModuleId)); - ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); - ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); - DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); - } - DeleteInstance.reset(); + RemoveInstance(Instance, ActiveInstanceIndex, ModuleId); } Hub::Response @@ -989,6 +1205,46 @@ Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t Ac } } +void +Hub::RemoveInstance(StorageServerInstance::ExclusiveLockedPtr& Instance, size_t ActiveInstanceIndex, std::string_view ModuleId) +{ + Instance = {}; + + std::unique_ptr<StorageServerInstance> DeleteInstance; + { + RwLock::ExclusiveLockScope HubLock(m_Lock); + auto It = m_InstanceLookup.find(std::string(ModuleId)); + ZEN_ASSERT_SLOW(It != m_InstanceLookup.end()); + ZEN_ASSERT_SLOW(It->second == ActiveInstanceIndex); + DeleteInstance = std::move(m_ActiveInstances[ActiveInstanceIndex].Instance); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + UpdateInstanceState(HubLock, ActiveInstanceIndex, HubInstanceState::Unprovisioned); + } + DeleteInstance.reset(); +} + +void +Hub::ObliterateBackendData(std::string_view ModuleId) +{ + std::filesystem::path ServerStateDir = m_RunEnvironment.GetChildBaseDir() / ModuleId; + std::filesystem::path TempDir = m_HydrationTempPath / ModuleId; + + std::atomic<bool> AbortFlag{false}; + std::atomic<bool> PauseFlag{false}; + + HydrationConfig Config{.ServerStateDir = ServerStateDir, .TempDir = TempDir, .ModuleId = std::string(ModuleId)}; + if (m_Config.OptionalHydrationWorkerPool) + { + Config.Threading.emplace(HydrationConfig::ThreadingOptions{.WorkerPool = m_Config.OptionalHydrationWorkerPool, + .AbortFlag = &AbortFlag, + .PauseFlag = &PauseFlag}); + } + + std::unique_ptr<HydrationStrategyBase> Hydrator = m_Hydration->CreateHydrator(Config); + Hydrator->Obliterate(); +} + bool Hub::Find(std::string_view ModuleId, InstanceInfo* OutInstanceInfo) { @@ -1047,7 +1303,12 @@ Hub::GetInstanceCount() bool Hub::CanProvisionInstanceLocked(std::string_view ModuleId, std::string& OutReason) { - ZEN_UNUSED(ModuleId); + if (m_ObliteratingInstances.contains(std::string(ModuleId))) + { + OutReason = fmt::format("module '{}' is being obliterated", ModuleId); + return false; + } + if (m_FreeActiveInstanceIndexes.empty()) { OutReason = fmt::format("instance limit ({}) exceeded", m_Config.InstanceLimit); @@ -1083,6 +1344,21 @@ Hub::GetInstanceIndexAssignedPort(size_t ActiveInstanceIndex) const return gsl::narrow<uint16_t>(m_Config.BasePortNumber + ActiveInstanceIndex); } +bool +Hub::IsInstancePort(uint16_t Port) const +{ + if (Port < m_Config.BasePortNumber) + { + return false; + } + size_t Index = Port - m_Config.BasePortNumber; + if (Index >= m_ActiveInstances.size()) + { + return false; + } + return m_ActiveInstances[Index].State.load(std::memory_order_relaxed) != HubInstanceState::Unprovisioned; +} + HubInstanceState Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewState) { @@ -1093,11 +1369,13 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS case HubInstanceState::Unprovisioned: return To == HubInstanceState::Provisioning; case HubInstanceState::Provisioned: - return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed; + return To == HubInstanceState::Hibernating || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Crashed || + To == HubInstanceState::Obliterating; case HubInstanceState::Hibernated: - return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning; + return To == HubInstanceState::Waking || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Obliterating; case HubInstanceState::Crashed: - return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || To == HubInstanceState::Recovering; + return To == HubInstanceState::Provisioning || To == HubInstanceState::Deprovisioning || + To == HubInstanceState::Recovering || To == HubInstanceState::Obliterating; case HubInstanceState::Provisioning: return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; case HubInstanceState::Hibernating: @@ -1109,6 +1387,8 @@ Hub::UpdateInstanceStateLocked(size_t ActiveInstanceIndex, HubInstanceState NewS To == HubInstanceState::Crashed; case HubInstanceState::Recovering: return To == HubInstanceState::Provisioned || To == HubInstanceState::Unprovisioned; + case HubInstanceState::Obliterating: + return To == HubInstanceState::Unprovisioned || To == HubInstanceState::Crashed; } return false; }(m_ActiveInstances[ActiveInstanceIndex].State.load(), NewState)); @@ -1124,10 +1404,14 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; size_t ActiveInstanceIndex = (size_t)-1; - { RwLock::ExclusiveLockScope _(m_Lock); + if (m_ShutdownFlag.load()) + { + return; + } + auto It = m_InstanceLookup.find(std::string(ModuleId)); if (It == m_InstanceLookup.end()) { @@ -1351,7 +1635,7 @@ Hub::CheckInstanceStatus(HttpClient& ActivityCheckClient, } else { - // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering) - expected, skip. + // transitional state (Provisioning, Deprovisioning, Hibernating, Waking, Recovering, Obliterating) - expected, skip. // Crashed is handled above via AttemptRecoverInstance; it appears here only when the instance // lock was busy on a previous cycle and recovery is already pending. return true; @@ -1511,6 +1795,14 @@ static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::mill namespace hub_testutils { + struct TestHubPools + { + WorkerThreadPool ProvisionPool; + WorkerThreadPool HydrationPool; + + explicit TestHubPools(int ThreadCount) : ProvisionPool(ThreadCount, "hub_test_prov"), HydrationPool(ThreadCount, "hub_test_hydr") {} + }; + ZenServerEnvironment MakeHubEnvironment(const std::filesystem::path& BaseDir) { return ZenServerEnvironment(ZenServerEnvironment::Hub, GetRunningExecutablePath().parent_path(), BaseDir); @@ -1519,9 +1811,14 @@ namespace hub_testutils { std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, - WorkerThreadPool* WorkerPool = nullptr) + TestHubPools* Pools = nullptr) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); + if (Pools) + { + Config.OptionalProvisionWorkerPool = &Pools->ProvisionPool; + Config.OptionalHydrationWorkerPool = &Pools->HydrationPool; + } + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); } struct CallbackRecord @@ -1593,14 +1890,32 @@ namespace hub_testutils { } // namespace hub_testutils -TEST_CASE("hub.provision_basic") +TEST_CASE("hub.provision") { ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); + + struct TransitionRecord + { + HubInstanceState OldState; + HubInstanceState NewState; + }; + RwLock CaptureMutex; + std::vector<TransitionRecord> Transitions; + + hub_testutils::StateChangeCapture CaptureInstance; + + auto CaptureFunc = + [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { + CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); + CaptureInstance.CaptureFunc()(ModuleId, Info, OldState, NewState); + }; + + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); + // Provision HubProvisionedInstanceInfo Info; const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); @@ -1617,6 +1932,15 @@ TEST_CASE("hub.provision_basic") CHECK(ModClient.Get("/health/")); } + // Verify provision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); + } + + // Deprovision const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); @@ -1626,6 +1950,28 @@ TEST_CASE("hub.provision_basic") HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); CHECK(!ModClient.Get("/health/")); } + + // Verify deprovision callback + { + RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); + REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "module_a"); + CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); + } + + // Verify full transition sequence + { + RwLock::SharedLockScope _(CaptureMutex); + REQUIRE_EQ(Transitions.size(), 4u); + CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); + CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); + CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); + CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); + CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); + } } TEST_CASE("hub.provision_config") @@ -1678,92 +2024,6 @@ TEST_CASE("hub.provision_config") } } -TEST_CASE("hub.provision_callbacks") -{ - ScopedTemporaryDirectory TempDir; - - hub_testutils::StateChangeCapture CaptureInstance; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); - - HubProvisionedInstanceInfo Info; - - const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info); - REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); - - { - RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); - REQUIRE_EQ(CaptureInstance.ProvisionCallbacks.size(), 1u); - CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].ModuleId, "cb_module"); - CHECK_EQ(CaptureInstance.ProvisionCallbacks[0].Port, Info.Port); - CHECK_NE(CaptureInstance.ProvisionCallbacks[0].Port, 0); - } - - { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); - CHECK(ModClient.Get("/health/")); - } - - const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module"); - CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); - - { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); - CHECK(!ModClient.Get("/health/")); - } - - { - RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); - REQUIRE_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].ModuleId, "cb_module"); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks[0].Port, Info.Port); - CHECK_EQ(CaptureInstance.DeprovisionCallbacks.size(), 1u); - } -} - -TEST_CASE("hub.provision_callback_sequence") -{ - ScopedTemporaryDirectory TempDir; - - struct TransitionRecord - { - HubInstanceState OldState; - HubInstanceState NewState; - }; - RwLock CaptureMutex; - std::vector<TransitionRecord> Transitions; - - auto CaptureFunc = - [&](std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, HubInstanceState OldState, HubInstanceState NewState) { - ZEN_UNUSED(ModuleId); - ZEN_UNUSED(Info); - CaptureMutex.WithExclusiveLock([&]() { Transitions.push_back({OldState, NewState}); }); - }; - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, std::move(CaptureFunc)); - - HubProvisionedInstanceInfo Info; - { - const Hub::Response R = HubInstance->Provision("seq_module", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Deprovision("seq_module"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - RwLock::SharedLockScope _(CaptureMutex); - REQUIRE_EQ(Transitions.size(), 4u); - CHECK_EQ(Transitions[0].OldState, HubInstanceState::Unprovisioned); - CHECK_EQ(Transitions[0].NewState, HubInstanceState::Provisioning); - CHECK_EQ(Transitions[1].OldState, HubInstanceState::Provisioning); - CHECK_EQ(Transitions[1].NewState, HubInstanceState::Provisioned); - CHECK_EQ(Transitions[2].OldState, HubInstanceState::Provisioned); - CHECK_EQ(Transitions[2].NewState, HubInstanceState::Deprovisioning); - CHECK_EQ(Transitions[3].OldState, HubInstanceState::Deprovisioning); - CHECK_EQ(Transitions[3].NewState, HubInstanceState::Unprovisioned); -} - TEST_CASE("hub.instance_limit") { ScopedTemporaryDirectory TempDir; @@ -1795,54 +2055,7 @@ TEST_CASE("hub.instance_limit") CHECK_EQ(HubInstance->GetInstanceCount(), 2); } -TEST_CASE("hub.enumerate_modules") -{ - ScopedTemporaryDirectory TempDir; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); - - HubProvisionedInstanceInfo Info; - - { - const Hub::Response R = HubInstance->Provision("enum_a", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - { - const Hub::Response R = HubInstance->Provision("enum_b", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - std::vector<std::string> Ids; - int ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { - Ids.push_back(std::string(ModuleId)); - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - CHECK_EQ(Ids.size(), 2u); - CHECK_EQ(ProvisionedCount, 2); - const bool FoundA = std::find(Ids.begin(), Ids.end(), "enum_a") != Ids.end(); - const bool FoundB = std::find(Ids.begin(), Ids.end(), "enum_b") != Ids.end(); - CHECK(FoundA); - CHECK(FoundB); - - HubInstance->Deprovision("enum_a"); - Ids.clear(); - ProvisionedCount = 0; - HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { - Ids.push_back(std::string(ModuleId)); - if (InstanceInfo.State == HubInstanceState::Provisioned) - { - ProvisionedCount++; - } - }); - REQUIRE_EQ(Ids.size(), 1u); - CHECK_EQ(Ids[0], "enum_b"); - CHECK_EQ(ProvisionedCount, 1); -} - -TEST_CASE("hub.max_instance_count") +TEST_CASE("hub.enumerate_and_instance_tracking") { ScopedTemporaryDirectory TempDir; std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); @@ -1852,22 +2065,56 @@ TEST_CASE("hub.max_instance_count") HubProvisionedInstanceInfo Info; { - const Hub::Response R = HubInstance->Provision("max_a", Info); + const Hub::Response R = HubInstance->Provision("track_a", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); { - const Hub::Response R = HubInstance->Provision("max_b", Info); + const Hub::Response R = HubInstance->Provision("track_b", Info); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); + // Enumerate both modules + { + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + CHECK_EQ(Ids.size(), 2u); + CHECK_EQ(ProvisionedCount, 2); + CHECK(std::find(Ids.begin(), Ids.end(), "track_a") != Ids.end()); + CHECK(std::find(Ids.begin(), Ids.end(), "track_b") != Ids.end()); + } + const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - HubInstance->Deprovision("max_a"); + // Deprovision one - max instance count must not decrease + HubInstance->Deprovision("track_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); + + // Enumerate after deprovision + { + std::vector<std::string> Ids; + int ProvisionedCount = 0; + HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { + Ids.push_back(std::string(ModuleId)); + if (InstanceInfo.State == HubInstanceState::Provisioned) + { + ProvisionedCount++; + } + }); + REQUIRE_EQ(Ids.size(), 1u); + CHECK_EQ(Ids[0], "track_b"); + CHECK_EQ(ProvisionedCount, 1); + } } TEST_CASE("hub.concurrent_callbacks") @@ -2013,7 +2260,7 @@ TEST_CASE("hub.job_object") } # endif // ZEN_PLATFORM_WINDOWS -TEST_CASE("hub.hibernate_wake") +TEST_CASE("hub.hibernate_wake_obliterate") { ScopedTemporaryDirectory TempDir; Hub::Configuration Config; @@ -2023,6 +2270,11 @@ TEST_CASE("hub.hibernate_wake") HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; + // Error cases on non-existent modules (no provision needed) + CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Provision { const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); @@ -2038,9 +2290,14 @@ TEST_CASE("hub.hibernate_wake") CHECK(ModClient.Get("/health/")); } + // Double-wake on provisioned module is idempotent + CHECK(HubInstance->Wake("hib_a").ResponseCode == Hub::EResponseCode::Completed); + // Hibernate - const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a"); - REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message); + { + const Hub::Response R = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Hibernated); const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime; @@ -2050,9 +2307,14 @@ TEST_CASE("hub.hibernate_wake") CHECK(!ModClient.Get("/health/")); } + // Double-hibernate on already-hibernated module is idempotent + CHECK(HubInstance->Hibernate("hib_a").ResponseCode == Hub::EResponseCode::Completed); + // Wake - const Hub::Response WakeResult = HubInstance->Wake("hib_a"); - REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message); + { + const Hub::Response R = HubInstance->Wake("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); CHECK_GE(Info.StateChangeTime, HibernatedTime); @@ -2061,57 +2323,63 @@ TEST_CASE("hub.hibernate_wake") CHECK(ModClient.Get("/health/")); } - // Deprovision - const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a"); - CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); - CHECK_FALSE(HubInstance->Find("hib_a")); + // Hibernate again for obliterate-from-hibernated test { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + const Hub::Response R = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } -} - -TEST_CASE("hub.hibernate_wake_errors") -{ - ScopedTemporaryDirectory TempDir; - Hub::Configuration Config; - Config.BasePortNumber = 22700; - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - HubProvisionedInstanceInfo ProvInfo; + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Hibernated); - // Hibernate/wake on a non-existent module - returns NotFound (-> 404) - CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); - CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Obliterate from hibernated + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("hib_a")); - // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent) + // Re-provision for obliterate-from-provisioned test { - const Hub::Response R = HubInstance->Provision("err_b", ProvInfo); + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } + REQUIRE(HubInstance->Find("hib_a", &Info)); + CHECK_EQ(Info.State, HubInstanceState::Provisioned); { - const Hub::Response R = HubInstance->Hibernate("err_b"); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); } + // Obliterate from provisioned + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + CHECK_FALSE(HubInstance->Find("hib_a")); { - const Hub::Response HibResp = HubInstance->Hibernate("err_b"); - CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed); + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); } - // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent) + // Obliterate deprovisioned module (not tracked by hub, backend data may exist) { - const Hub::Response R = HubInstance->Wake("err_b"); + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } - { - const Hub::Response WakeResp = HubInstance->Wake("err_b"); - CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed); + const Hub::Response R = HubInstance->Deprovision("hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + CHECK_FALSE(HubInstance->Find("hib_a")); + { + const Hub::Response R = HubInstance->Obliterate("hib_a"); + CHECK(R.ResponseCode == Hub::EResponseCode::Completed); } - // Deprovision not-found - returns NotFound (-> 404) - CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + // Obliterate of a never-provisioned module also succeeds (no-op backend cleanup) + CHECK(HubInstance->Obliterate("never_existed").ResponseCode == Hub::EResponseCode::Completed); } TEST_CASE("hub.async_hibernate_wake") @@ -2121,8 +2389,8 @@ TEST_CASE("hub.async_hibernate_wake") Hub::Configuration Config; Config.BasePortNumber = 23000; - WorkerThreadPool WorkerPool(2, "hub_async_hib_wake"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; @@ -2252,25 +2520,21 @@ TEST_CASE("hub.recover_process_crash") if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned && ModClient.Get("/health/")) { - // Recovery must reuse the same port - the instance was never removed from the hub's - // port table during recovery, so AttemptRecoverInstance reuses m_Config.BasePort. CHECK_EQ(InstanceInfo.Port, Info.Port); Recovered = true; break; } } - CHECK_MESSAGE(Recovered, "Instance did not recover within timeout"); + REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // Verify the full crash/recovery callback sequence { RwLock::SharedLockScope _(CaptureMutex); REQUIRE_GE(Transitions.size(), 3u); - // Find the Provisioned->Crashed transition const auto CrashedIt = std::find_if(Transitions.begin(), Transitions.end(), [](const TransitionRecord& R) { return R.OldState == HubInstanceState::Provisioned && R.NewState == HubInstanceState::Crashed; }); REQUIRE_NE(CrashedIt, Transitions.end()); - // Recovery sequence follows: Crashed->Recovering, Recovering->Provisioned const auto RecoveringIt = CrashedIt + 1; REQUIRE_NE(RecoveringIt, Transitions.end()); CHECK_EQ(RecoveringIt->OldState, HubInstanceState::Crashed); @@ -2280,44 +2544,6 @@ TEST_CASE("hub.recover_process_crash") CHECK_EQ(RecoveredIt->OldState, HubInstanceState::Recovering); CHECK_EQ(RecoveredIt->NewState, HubInstanceState::Provisioned); } -} - -TEST_CASE("hub.recover_process_crash_then_deprovision") -{ - ScopedTemporaryDirectory TempDir; - - // Fast watchdog cycle so crash detection is near-instant instead of waiting up to the 3s default. - Hub::Configuration Config; - Config.WatchDog.CycleInterval = std::chrono::milliseconds(10); - Config.WatchDog.InstanceCheckThrottle = std::chrono::milliseconds(1); - - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); - - HubProvisionedInstanceInfo Info; - { - const Hub::Response R = HubInstance->Provision("module_a", Info); - REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); - } - - // Kill the child process, wait for the watchdog to detect and recover the instance. - HubInstance->TerminateModuleForTesting("module_a"); - - constexpr auto kPollIntervalMs = std::chrono::milliseconds(50); - constexpr auto kTimeoutMs = std::chrono::seconds(15); - const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; - - bool Recovered = false; - while (std::chrono::steady_clock::now() < Deadline) - { - std::this_thread::sleep_for(kPollIntervalMs); - Hub::InstanceInfo InstanceInfo; - if (HubInstance->Find("module_a", &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) - { - Recovered = true; - break; - } - } - REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // After recovery, deprovision should succeed and a re-provision should work. { @@ -2346,8 +2572,8 @@ TEST_CASE("hub.async_provision_concurrent") Config.BasePortNumber = 22800; Config.InstanceLimit = kModuleCount; - WorkerThreadPool WorkerPool(4, "hub_async_concurrent"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(4); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); std::vector<std::string> Reasons(kModuleCount); @@ -2428,8 +2654,8 @@ TEST_CASE("hub.async_provision_shutdown_waits") Config.InstanceLimit = kModuleCount; Config.BasePortNumber = 22900; - WorkerThreadPool WorkerPool(2, "hub_async_shutdown"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); @@ -2461,8 +2687,8 @@ TEST_CASE("hub.async_provision_rejected") Config.InstanceLimit = 1; Config.BasePortNumber = 23100; - WorkerThreadPool WorkerPool(2, "hub_async_rejected"); - std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + hub_testutils::TestHubPools Pools(2); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &Pools); HubProvisionedInstanceInfo Info; @@ -2550,12 +2776,12 @@ TEST_CASE("hub.instance.inactivity.deprovision") // Phase 1: immediately after setup all three instances must still be alive. // No timeout has elapsed yet (only 100ms have passed). - CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + CHECK_MESSAGE(HubInstance->Find("idle"), "idle was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); CHECK_MESSAGE(HubInstance->Find("idle_hib"), "idle_hib was deprovisioned within 100ms - its 1s hibernated timeout has not elapsed"); CHECK_MESSAGE(HubInstance->Find("persistent"), - "persistent was deprovisioned within 100ms - its 2s provisioned timeout has not elapsed"); + "persistent was deprovisioned within 100ms - its 4s provisioned timeout has not elapsed"); // Phase 2: idle_hib must be deprovisioned by the watchdog within its 1s hibernated timeout. // idle must remain alive - its 2s provisioned timeout has not elapsed yet. @@ -2579,7 +2805,7 @@ TEST_CASE("hub.instance.inactivity.deprovision") CHECK_MESSAGE(!HubInstance->Find("idle_hib"), "idle_hib should still be gone - it was deprovisioned in phase 2"); - CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 3s provisioned timeout elapsed"); + CHECK_MESSAGE(!HubInstance->Find("idle"), "idle should be gone after its 4s provisioned timeout elapsed"); CHECK_MESSAGE(HubInstance->Find("persistent"), "persistent was incorrectly deprovisioned - its activity timer was reset by PokeInstance"); |