diff options
| author | Dan Engelbrecht <[email protected]> | 2026-03-24 18:50:59 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-24 18:50:59 +0100 |
| commit | b730eebe53d1d6827d4b6c320ccfd80566a629a6 (patch) | |
| tree | d8df23d781b5fb3b1d7bd170fa7d81e2501ab901 /src | |
| parent | Subprocess Manager (#889) (diff) | |
| download | zen-b730eebe53d1d6827d4b6c320ccfd80566a629a6.tar.xz zen-b730eebe53d1d6827d4b6c320ccfd80566a629a6.zip | |
- Improvement: Hub provision, deprovision, hibernate, and wake operations are now async. HTTP requests return 202 Accepted while the operation completes in the background
- Improvement: Hub returns 202 Accepted (instead of 409 Conflict) when the same async operation is already in progress for a module
- Improvement: Hub returns 200 OK when a requested state transition is already satisfied
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/thread.h | 7 | ||||
| -rw-r--r-- | src/zenserver-test/hub-tests.cpp | 204 | ||||
| -rw-r--r-- | src/zenserver/hub/httphubservice.cpp | 165 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 1090 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 54 | ||||
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 2 |
6 files changed, 1126 insertions, 396 deletions
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h index d7262324f..56ce5904b 100644 --- a/src/zencore/include/zencore/thread.h +++ b/src/zencore/include/zencore/thread.h @@ -190,6 +190,13 @@ class Latch public: Latch(std::ptrdiff_t Count) : Counter(Count) {} + void Reset(std::ptrdiff_t Count) + { + ZEN_ASSERT(Counter.load() == 0); + Complete.Reset(); + Counter.store(Count); + } + void CountDown() { std::ptrdiff_t Old = Counter.fetch_sub(1); diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp index dbe6fa785..f86bdc5c7 100644 --- a/src/zenserver-test/hub-tests.cpp +++ b/src/zenserver-test/hub-tests.cpp @@ -33,6 +33,77 @@ using namespace std::literals; static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)}; +static bool +WaitForModuleState(HttpClient& Client, std::string_view ModuleId, std::string_view ExpectedState, int TimeoutMs = 10000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)) + { + HttpClient::Response R = Client.Get(fmt::format("modules/{}", ModuleId)); + if (R && R.AsObject()["state"].AsString() == ExpectedState) + { + return true; + } + Sleep(100); + } + HttpClient::Response R = Client.Get(fmt::format("modules/{}", ModuleId)); + return R && R.AsObject()["state"].AsString() == ExpectedState; +} + +// Provision a module, retrying on 409 Conflict to handle the window where an async +// deprovision has removed the module from InstanceLookup but not yet from +// DeprovisioningModules (which CanProvisionInstance checks). 
+static HttpClient::Response +ProvisionModule(HttpClient& Client, std::string_view ModuleId, int TimeoutMs = 10000) +{ + Stopwatch Timer; + HttpClient::Response Result; + do + { + Result = Client.Post(fmt::format("modules/{}/provision", ModuleId)); + if (Result || Result.StatusCode != HttpResponseCode::Conflict) + { + return Result; + } + Sleep(100); + } while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)); + return Result; +} + +// Wait for a port to stop accepting connections (i.e. the process has terminated). +// Needed after async deprovision: WaitForModuleGone returns as soon as the module +// leaves m_InstanceLookup (synchronous), but the background worker that kills the +// process may not have run yet. +static bool +WaitForPortUnreachable(HttpClient& Client, std::string_view Path = "/health/", int TimeoutMs = 10000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)) + { + if (!Client.Get(Path)) + { + return true; + } + Sleep(100); + } + return !Client.Get(Path); +} + +static bool +WaitForModuleGone(HttpClient& Client, std::string_view ModuleId, int TimeoutMs = 10000) +{ + Stopwatch Timer; + while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs)) + { + if (Client.Get(fmt::format("modules/{}", ModuleId)).StatusCode == HttpResponseCode::NotFound) + { + return true; + } + Sleep(100); + } + return Client.Get(fmt::format("modules/{}", ModuleId)).StatusCode == HttpResponseCode::NotFound; +} + TEST_SUITE_BEGIN("server.hub"); TEST_CASE("hub.lifecycle.children") @@ -65,9 +136,7 @@ TEST_CASE("hub.lifecycle.children") AbcPort = AbcResult["port"].AsUInt16(0); CHECK_NE(AbcPort, 0); - Result = Client.Get("modules/abc"); - REQUIRE(Result); - CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + REQUIRE(WaitForModuleState(Client, "abc", "provisioned")); // This should be a fresh instance with no contents @@ -91,6 +160,8 @@ TEST_CASE("hub.lifecycle.children") DefPort = 
DefResult["port"].AsUInt16(0); REQUIRE_NE(DefPort, 0); + REQUIRE(WaitForModuleState(Client, "def", "provisioned")); + // This should be a fresh instance with no contents HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); @@ -110,21 +181,24 @@ TEST_CASE("hub.lifecycle.children") Result = Client.Post("modules/ghi/provision"); REQUIRE(Result); + REQUIRE(WaitForModuleState(Client, "ghi", "provisioned")); // Tear down instances Result = Client.Post("modules/abc/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "abc")); { HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } Result = Client.Post("modules/def/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "def")); { HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } Result = Client.Post("modules/ghi/deprovision"); @@ -132,7 +206,7 @@ TEST_CASE("hub.lifecycle.children") // re-provision to verify that (de)hydration preserved state { - Result = Client.Post("modules/abc/provision"); + Result = ProvisionModule(Client, "abc"); REQUIRE(Result); CbObject AbcResult = Result.AsObject(); @@ -140,6 +214,8 @@ TEST_CASE("hub.lifecycle.children") AbcPort = AbcResult["port"].AsUInt16(0); REQUIRE_NE(AbcPort, 0); + REQUIRE(WaitForModuleState(Client, "abc", "provisioned")); + // This should contain the content from the previous run HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); @@ -156,7 +232,7 @@ TEST_CASE("hub.lifecycle.children") } { - Result = Client.Post("modules/def/provision"); + Result = ProvisionModule(Client, "def"); REQUIRE(Result); CbObject DefResult = Result.AsObject(); @@ -164,6 +240,8 @@ TEST_CASE("hub.lifecycle.children") DefPort = DefResult["port"].AsUInt16(0); REQUIRE_NE(DefPort, 0); + 
REQUIRE(WaitForModuleState(Client, "def", "provisioned")); + // This should contain the content from the previous run HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); @@ -181,22 +259,24 @@ TEST_CASE("hub.lifecycle.children") Result = Client.Post("modules/abc/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "abc")); { HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } Result = Client.Post("modules/def/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "def")); { HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } // re-provision to verify that (de)hydration preserved state, including // state which was generated after the very first dehydration { - Result = Client.Post("modules/abc/provision"); + Result = ProvisionModule(Client, "abc"); REQUIRE(Result); CbObject AbcResult = Result.AsObject(); @@ -204,6 +284,8 @@ TEST_CASE("hub.lifecycle.children") AbcPort = AbcResult["port"].AsUInt16(0); REQUIRE_NE(AbcPort, 0); + REQUIRE(WaitForModuleState(Client, "abc", "provisioned")); + // This should contain the content from the previous two runs HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); @@ -221,7 +303,7 @@ TEST_CASE("hub.lifecycle.children") } { - Result = Client.Post("modules/def/provision"); + Result = ProvisionModule(Client, "def"); REQUIRE(Result); CbObject DefResult = Result.AsObject(); @@ -229,6 +311,8 @@ TEST_CASE("hub.lifecycle.children") DefPort = DefResult["port"].AsUInt16(0); REQUIRE_NE(DefPort, 0); + REQUIRE(WaitForModuleState(Client, "def", "provisioned")); + // This should contain the content from the previous two runs HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); @@ -247,16 +331,18 @@ 
TEST_CASE("hub.lifecycle.children") Result = Client.Post("modules/abc/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "abc")); { HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } Result = Client.Post("modules/def/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "def")); { HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } // final sanity check that the hub is still responsive and all modules are gone @@ -393,7 +479,7 @@ TEST_CASE("hub.consul.provision.registration") HttpClient::Response Result = HubClient.Post("modules/testmod/provision"); REQUIRE(Result); - CHECK(Client.HasService("testmod")); + REQUIRE(WaitForConsulService(Client, "testmod", true, 10000)); { const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0); REQUIRE(ModulePort != 0); @@ -457,6 +543,7 @@ TEST_CASE("hub.consul.provision.registration") Result = HubClient.Post("modules/testmod/deprovision"); REQUIRE(Result); + REQUIRE(WaitForConsulService(Client, "testmod", false, 10000)); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); @@ -482,13 +569,12 @@ TEST_CASE("hub.hibernate.lifecycle") // Provision HttpClient::Response Result = Client.Post("modules/testmod/provision"); REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0); REQUIRE_NE(ModulePort, 0); - Result = Client.Get("modules/testmod"); - REQUIRE(Result); - CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + REQUIRE(WaitForModuleState(Client, "testmod", "provisioned")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); 
CHECK(ModClient.Get("/health/")); @@ -502,11 +588,10 @@ TEST_CASE("hub.hibernate.lifecycle") // Hibernate - state should become "hibernated", server should be unreachable Result = Client.Post("modules/testmod/hibernate"); REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); - Result = Client.Get("modules/testmod"); - REQUIRE(Result); - CHECK_EQ(Result.AsObject()["state"].AsString(), "hibernated"sv); + REQUIRE(WaitForModuleState(Client, "testmod", "hibernated")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); CHECK(!ModClient.Get("/health/")); @@ -515,11 +600,10 @@ TEST_CASE("hub.hibernate.lifecycle") // Wake - state should return to "provisioned", server should be reachable, data should be intact Result = Client.Post("modules/testmod/wake"); REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); - Result = Client.Get("modules/testmod"); - REQUIRE(Result); - CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv); + REQUIRE(WaitForModuleState(Client, "testmod", "provisioned")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); CHECK(ModClient.Get("/health/")); @@ -532,17 +616,20 @@ TEST_CASE("hub.hibernate.lifecycle") // Deprovision - server should become unreachable Result = Client.Post("modules/testmod/deprovision"); REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + REQUIRE(WaitForModuleGone(Client, "testmod")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } // Re-provision - server should be reachable on its (potentially new) port - Result = Client.Post("modules/testmod/provision"); + Result = ProvisionModule(Client, "testmod"); REQUIRE(Result); 
CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv); const uint16_t ModulePort2 = Result.AsObject()["port"].AsUInt16(0); REQUIRE_NE(ModulePort2, 0); + REQUIRE(WaitForModuleState(Client, "testmod", "provisioned")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout); CHECK(ModClient.Get("/health/")); @@ -551,9 +638,10 @@ TEST_CASE("hub.hibernate.lifecycle") // Final deprovision - server should become unreachable Result = Client.Post("modules/testmod/deprovision"); REQUIRE(Result); + REQUIRE(WaitForModuleGone(Client, "testmod")); { HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout); - CHECK(!ModClient.Get("/health/")); + CHECK(WaitForPortUnreachable(ModClient)); } } @@ -574,24 +662,76 @@ TEST_CASE("hub.hibernate.errors") CHECK(!Result); CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); - // Double-hibernate: first call succeeds, second returns 400 (wrong state) + Result = Client.Post("modules/unknown/deprovision"); + CHECK(!Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + + Result = Client.Delete("modules/unknown"); + CHECK(!Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound); + + // Double-provision: second call while first is in-flight returns 202 Accepted with the same port. Result = Client.Post("modules/errmod/provision"); REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + const uint16_t ErrmodPort = Result.AsObject()["port"].AsUInt16(0); + REQUIRE_NE(ErrmodPort, 0); + + // Provisioning the same module while in-flight returns 202 Accepted with the allocated port. + // Evaluated synchronously before WorkerPool dispatch, so safe regardless of timing. 
+ Result = Client.Post("modules/errmod/provision"); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + CHECK_EQ(Result.AsObject()["port"].AsUInt16(0), ErrmodPort); + + REQUIRE(WaitForModuleState(Client, "errmod", "provisioned")); + + // Already provisioned: provision and wake both return 200 Completed. + Result = Client.Post("modules/errmod/provision"); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + Result = Client.Post("modules/errmod/wake"); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + // Double-hibernate: second call while first is in-flight returns 202 Accepted. Result = Client.Post("modules/errmod/hibernate"); REQUIRE(Result); Result = Client.Post("modules/errmod/hibernate"); - CHECK(!Result); - CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + + REQUIRE(WaitForModuleState(Client, "errmod", "hibernated")); - // Wake on provisioned: succeeds (state restored), then waking again returns 400 + // Already hibernated: hibernate returns 200 Completed. + Result = Client.Post("modules/errmod/hibernate"); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::OK); + + // Double-wake: second call while first is in-flight returns 202 Accepted. Result = Client.Post("modules/errmod/wake"); REQUIRE(Result); Result = Client.Post("modules/errmod/wake"); - CHECK(!Result); - CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + + // Double-deprovision: second call while first is in-flight returns 202 Accepted. + // errmod2 is a fresh module to avoid waiting on the still-waking errmod. 
+ Result = Client.Post("modules/errmod2/provision"); + REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + REQUIRE(WaitForModuleState(Client, "errmod2", "provisioned")); + + Result = Client.Post("modules/errmod2/deprovision"); + REQUIRE(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); + + Result = Client.Post("modules/errmod2/deprovision"); + CHECK(Result); + CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted); } TEST_SUITE_END(); diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp index 34f4294e4..032a61f08 100644 --- a/src/zenserver/hub/httphubservice.cpp +++ b/src/zenserver/hub/httphubservice.cpp @@ -11,6 +11,37 @@ namespace zen { +namespace { + bool HandleFailureResults(HttpServerRequest& Request, const Hub::Response& Resp) + { + if (Resp.ResponseCode == Hub::EResponseCode::Rejected) + { + if (Resp.Message.empty()) + { + Request.WriteResponse(HttpResponseCode::Conflict); + } + else + { + Request.WriteResponse(HttpResponseCode::Conflict, HttpContentType::kText, Resp.Message); + } + return true; + } + if (Resp.ResponseCode == Hub::EResponseCode::NotFound) + { + if (Resp.Message.empty()) + { + Request.WriteResponse(HttpResponseCode::NotFound); + } + else + { + Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, Resp.Message); + } + return true; + } + return false; + } +} // namespace + HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) { using namespace std::literals; @@ -83,144 +114,113 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub) [this](HttpRouterRequest& Req) { std::string_view ModuleId = Req.GetCapture(1); - std::string FailureReason = "unknown"; - HttpResponseCode ResponseCode = HttpResponseCode::OK; - try { HubProvisionedInstanceInfo Info; - if (m_Hub.Provision(ModuleId, /* out */ Info, /* out */ FailureReason)) - { - CbObjectWriter Obj; - Obj << "moduleId" << ModuleId; - Obj << "baseUri" << Info.BaseUri; - Obj << "port" << Info.Port; - 
Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + Hub::Response Resp = m_Hub.Provision(ModuleId, Info); - return; - } - else + if (HandleFailureResults(Req.ServerRequest(), Resp)) { - ResponseCode = HttpResponseCode::BadRequest; + return; } + + const HttpResponseCode HttpCode = + (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK; + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + Obj << "baseUri" << Info.BaseUri; + Obj << "port" << Info.Port; + return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save()); } catch (const std::exception& Ex) { ZEN_ERROR("Exception while provisioning module '{}': {}", ModuleId, Ex.what()); - - FailureReason = Ex.what(); - ResponseCode = HttpResponseCode::InternalServerError; + throw; } - - Req.ServerRequest().WriteResponse(ResponseCode, HttpContentType::kText, FailureReason); }, HttpVerb::kPost); m_Router.RegisterRoute( "modules/{moduleid}/deprovision", [this](HttpRouterRequest& Req) { - std::string_view ModuleId = Req.GetCapture(1); - std::string FailureReason = "unknown"; + std::string_view ModuleId = Req.GetCapture(1); try { - if (!m_Hub.Deprovision(std::string(ModuleId), /* out */ FailureReason)) + Hub::Response Resp = m_Hub.Deprovision(std::string(ModuleId)); + + if (HandleFailureResults(Req.ServerRequest(), Resp)) { - if (FailureReason.empty()) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); - } - else - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); - } + return; } + const HttpResponseCode HttpCode = + (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 
HttpResponseCode::Accepted : HttpResponseCode::OK; CbObjectWriter Obj; Obj << "moduleId" << ModuleId; - - return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save()); } catch (const std::exception& Ex) { ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); - - FailureReason = Ex.what(); + throw; } - - Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); }, HttpVerb::kPost); m_Router.RegisterRoute( "modules/{moduleid}/hibernate", [this](HttpRouterRequest& Req) { - std::string_view ModuleId = Req.GetCapture(1); - std::string FailureReason = "unknown"; + std::string_view ModuleId = Req.GetCapture(1); try { - if (!m_Hub.Hibernate(std::string(ModuleId), /* out */ FailureReason)) + Hub::Response Resp = m_Hub.Hibernate(std::string(ModuleId)); + + if (HandleFailureResults(Req.ServerRequest(), Resp)) { - if (FailureReason.empty()) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); - } - else - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); - } + return; } + const HttpResponseCode HttpCode = + (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 
HttpResponseCode::Accepted : HttpResponseCode::OK; CbObjectWriter Obj; Obj << "moduleId" << ModuleId; - - return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save()); } catch (const std::exception& Ex) { ZEN_ERROR("Exception while hibernating module '{}': {}", ModuleId, Ex.what()); - - FailureReason = Ex.what(); + throw; } - - Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); }, HttpVerb::kPost); m_Router.RegisterRoute( "modules/{moduleid}/wake", [this](HttpRouterRequest& Req) { - std::string_view ModuleId = Req.GetCapture(1); - std::string FailureReason = "unknown"; + std::string_view ModuleId = Req.GetCapture(1); try { - if (!m_Hub.Wake(std::string(ModuleId), /* out */ FailureReason)) + Hub::Response Resp = m_Hub.Wake(std::string(ModuleId)); + + if (HandleFailureResults(Req.ServerRequest(), Resp)) { - if (FailureReason.empty()) - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound); - } - else - { - return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); - } + return; } + const HttpResponseCode HttpCode = + (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 
HttpResponseCode::Accepted : HttpResponseCode::OK; CbObjectWriter Obj; Obj << "moduleId" << ModuleId; - - return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save()); + return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save()); } catch (const std::exception& Ex) { ZEN_ERROR("Exception while waking module '{}': {}", ModuleId, Ex.what()); - - FailureReason = Ex.what(); + throw; } - - Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason); }, HttpVerb::kPost); @@ -288,27 +288,27 @@ HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated || InstanceInfo.State == HubInstanceState::Crashed) { - std::string FailureReason; try { - if (!m_Hub.Deprovision(std::string(ModuleId), FailureReason)) + Hub::Response Resp = m_Hub.Deprovision(std::string(ModuleId)); + + if (HandleFailureResults(Request, Resp)) { - if (FailureReason.empty()) - { - Request.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason); - } return; } + + // TODO: nuke all related storage + + const HttpResponseCode HttpCode = + (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 
HttpResponseCode::Accepted : HttpResponseCode::OK; + CbObjectWriter Obj; + Obj << "moduleId" << ModuleId; + return Request.WriteResponse(HttpCode, Obj.Save()); } catch (const std::exception& Ex) { ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what()); - Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Ex.what()); - return; + throw; } } @@ -316,7 +316,6 @@ HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view CbObjectWriter Obj; Obj << "moduleId" << ModuleId; - Obj << "state" << ToString(InstanceInfo.State); Request.WriteResponse(HttpResponseCode::OK, Obj.Save()); } diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 6a2609443..ceba21d8d 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -10,6 +10,7 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> +#include <zencore/workthreadpool.h> ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/fixed_vector.h> @@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <zencore/filesystem.h> # include <zencore/testing.h> # include <zencore/testutils.h> -# include <zencore/workthreadpool.h> # include <zenhttp/httpclient.h> #endif @@ -122,9 +122,14 @@ private: ////////////////////////////////////////////////////////////////////////// -Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) +Hub::Hub(const Configuration& Config, + ZenServerEnvironment&& RunEnvironment, + WorkerThreadPool* OptionalWorkerPool, + AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback) : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) +, m_WorkerPool(OptionalWorkerPool) +, m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) { m_HostMetrics = GetSystemMetrics(); @@ -196,42 +201,44 @@ Hub::Shutdown() m_WatchDog = {}; - 
m_Lock.WithExclusiveLock([this] { - for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup) - { - std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - { - StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true)); + bool Expected = false; + bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true); + if (WaitForBackgroundWork && m_WorkerPool) + { + m_BackgroundWorkLatch.CountDown(); + m_BackgroundWorkLatch.Wait(); + // Shutdown flag is set and all background work is drained, safe to shut down remaining instances - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + m_BackgroundWorkLatch.Reset(1); + } - try - { - (void)Instance.Deprovision(); - } - catch (const std::exception& Ex) - { - ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); - } - // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire. - NewState = HubInstanceState::Unprovisioned; - Instance = {}; + EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) { + ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings... 
+ try + { + const Response DepResp = InternalDeprovision(std::string(ModuleId)); + if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted) + { + ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message); } - InstanceRaw.reset(); } - m_InstanceLookup.clear(); - m_ActiveInstances.clear(); - m_FreeActiveInstanceIndexes.clear(); + catch (const std::exception& Ex) + { + ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what()); + } }); + + if (WaitForBackgroundWork && m_WorkerPool) + { + m_BackgroundWorkLatch.CountDown(); + m_BackgroundWorkLatch.Wait(); + } } -bool -Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason) +Hub::Response +Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo) { + ZEN_ASSERT(!m_ShutdownFlag.load()); StorageServerInstance::ExclusiveLockedPtr Instance; bool IsNewInstance = false; uint16_t AllocatedPort = 0; @@ -245,6 +252,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s } }); + if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end()) + { + // Same operation already in flight -- return the already-allocated port. + // RestoreAllocatedPort is a no-op here because IsNewInstance is still false + // (we return before line 273 where it is set), so no port is double-freed. 
+ OutInfo.Port = It->second; + return Response{EResponseCode::Accepted}; + } + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end()) { std::string Reason; @@ -252,9 +268,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s { ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason); - OutReason = Reason; - - return false; + return Response{EResponseCode::Rejected, Reason}; } IsNewInstance = true; @@ -313,21 +327,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s if (m_RecoveringModules.contains(std::string(ModuleId))) { - OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId); ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; } std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; - Instance = InstanceRaw->LockExclusive(/*Wait*/ true); - AllocatedPort = InstanceRaw->GetBasePort(); + ZEN_ASSERT(InstanceRaw); + + if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + { + OutInfo.Port = InstanceRaw->GetBasePort(); + return Response{EResponseCode::Completed}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + AllocatedPort = InstanceRaw->GetBasePort(); } - m_ProvisioningModules.emplace(std::string(ModuleId)); + m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort); } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_ProvisioningModules tracks which modules are being provisioned, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. 
+ ZEN_ASSERT(Instance); + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + AllocatedPort, + IsNewInstance, + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteProvision(*Instance, AllocatedPort, IsNewInstance); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + if (IsNewInstance) + { + try + { + AbortProvision(ModuleId); + } + catch (const std::exception& DestroyEx) + { + ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); + } + } + + Instance = {}; + + { + RwLock::ExclusiveLockScope _(m_Lock); + m_ProvisioningModules.erase(std::string(ModuleId)); + if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) + { + m_FreePorts.push_back(AllocatedPort); + } + } + + throw; + } + } + else + { + CompleteProvision(Instance, AllocatedPort, IsNewInstance); + } + + OutInfo.Port = AllocatedPort; + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? 
HubInstanceState OldState = Instance.GetState(); @@ -340,47 +433,34 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId))) { m_FreePorts.push_back(AllocatedPort); - AllocatedPort = 0; } }); - // NOTE: this is done while not holding the hub lock, as provisioning may take time - // and we don't want to block other operations. We track which modules are being - // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision - // those modules while in this state. - - try + if (m_ShutdownFlag.load() == false) { - (void)Instance.Provision(); // false = already in target state (idempotent); not an error - NewState = Instance.GetState(); - Instance = {}; + try + { + (void)Instance.Provision(); // false = already in target state (idempotent); not an error + NewState = Instance.GetState(); + AllocatedPort = 0; + Instance = {}; + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); + } } - catch (const std::exception& Ex) + + if (Instance) { - ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what()); NewState = Instance.GetState(); Instance = {}; if (IsNewInstance) { - // Clean up failed instance provisioning - std::unique_ptr<StorageServerInstance> DestroyInstance; - { - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) - { - const size_t ActiveInstanceIndex = It->second; - ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); - ZEN_ASSERT(DestroyInstance); - ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); - m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); - m_InstanceLookup.erase(It); - } - } try { - DestroyInstance.reset(); + 
AbortProvision(ModuleId); NewState = HubInstanceState::Unprovisioned; } catch (const std::exception& DestroyEx) @@ -388,18 +468,42 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what()); } } - throw; } +} - OutReason.clear(); - OutInfo.Port = AllocatedPort; - // TODO: base URI? Would need to know what host name / IP to use +void +Hub::AbortProvision(std::string_view ModuleId) +{ + std::unique_ptr<StorageServerInstance> DestroyInstance; + { + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end()) + { + const size_t ActiveInstanceIndex = It->second; + ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); + DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); + ZEN_ASSERT(DestroyInstance); + ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); + m_InstanceLookup.erase(It); + } + else + { + ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId); + } + } + DestroyInstance.reset(); +} - return true; +Hub::Response +Hub::Deprovision(const std::string& ModuleId) +{ + ZEN_ASSERT(!m_ShutdownFlag.load()); + return InternalDeprovision(ModuleId); } -bool -Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) +Hub::Response +Hub::InternalDeprovision(const std::string& ModuleId) { std::unique_ptr<StorageServerInstance> RawInstance; StorageServerInstance::ExclusiveLockedPtr Instance; @@ -409,26 +513,27 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) if (m_ProvisioningModules.contains(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId); - ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId); - return false; + return Response{EResponseCode::Rejected, 
fmt::format("Module '{}' is currently being provisioned", ModuleId)}; } if (m_RecoveringModules.contains(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId); ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)}; + } + + if (m_DeprovisioningModules.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; } if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end()) { ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId); - OutReason.clear(); // empty = not found (-> 404) - return false; + return Response{EResponseCode::NotFound}; } else { @@ -436,33 +541,90 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]); ZEN_ASSERT(RawInstance != nullptr); + m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex); m_InstanceLookup.erase(It); - m_DeprovisioningModules.emplace(ModuleId); + m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort()); Instance = RawInstance->LockExclusive(/*Wait*/ true); } } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. 
+ ZEN_ASSERT(RawInstance); ZEN_ASSERT(Instance); + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)), + RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteDeprovision(*Instance); + RawInstance.reset(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; + uint16_t BasePort = Instance.GetBasePort(); + InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {}); + + // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly. + Instance = {}; + NewState = HubInstanceState::Unprovisioned; + + { + RwLock::ExclusiveLockScope _(m_Lock); + m_DeprovisioningModules.erase(std::string(ModuleId)); + m_FreePorts.push_back(BasePort); + } + throw; + } + } + else + { + CompleteDeprovision(Instance); + RawInstance.reset(); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? 
HubInstanceState OldState = Instance.GetState(); HubInstanceState NewState = OldState; InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); - // The module is deprovisioned outside the hub lock to avoid blocking other operations. - // - // To ensure that no new provisioning can occur while we're deprovisioning, - // we add the module ID to m_DeprovisioningModules and remove it once - // deprovisioning is complete. - auto _ = MakeGuard([&] { { RwLock::ExclusiveLockScope _(m_Lock); - m_DeprovisioningModules.erase(ModuleId); + m_DeprovisioningModules.erase(std::string(ModuleId)); m_FreePorts.push_back(BasePort); } }); @@ -470,6 +632,8 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) try { (void)Instance.Deprovision(); + NewState = Instance.GetState(); + Instance = {}; } catch (const std::exception& Ex) { @@ -479,42 +643,116 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason) Instance = {}; throw; } - NewState = Instance.GetState(); - Instance = {}; - OutReason.clear(); - - return true; } -bool -Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) +Hub::Response +Hub::Hibernate(const std::string& ModuleId) { + ZEN_ASSERT(!m_ShutdownFlag.load()); + StorageServerInstance::ExclusiveLockedPtr Instance; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || - m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId)) + if (m_HibernatingModules.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + if (IsModuleInFlightLocked(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; } auto It = m_InstanceLookup.find(ModuleId); if (It == 
m_InstanceLookup.end()) { - OutReason.clear(); // empty = not found (-> 404) - return false; + return Response{EResponseCode::NotFound}; } const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); - m_HibernatingModules.emplace(ModuleId); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + ZEN_ASSERT(InstanceRaw); + + if (InstanceRaw->GetState() == HubInstanceState::Hibernated) + { + return Response{EResponseCode::Completed}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort()); } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_HibernatingModules tracks which modules are being hibernated, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + ZEN_ASSERT(Instance); + + // Validate state while holding the exclusive instance lock (state cannot change). + // This gives a synchronous error response for invalid-state calls on both the sync + // and async paths, matching the existing behaviour. 
+ if (Instance.GetState() != HubInstanceState::Provisioned) + { + const Response HibernateResp = + Response{EResponseCode::Rejected, + fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))}; + Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + RwLock::ExclusiveLockScope _(m_Lock); + m_HibernatingModules.erase(ModuleId); + return HibernateResp; + } + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteHibernate(*Instance); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Dispatch failed: undo the latch increment and tracking-set membership. + // State has not changed so no callback is needed. + // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + Instance = {}; + { + RwLock::ExclusiveLockScope _(m_Lock); + m_HibernatingModules.erase(std::string(ModuleId)); + } + throw; + } + } + else + { + CompleteHibernate(Instance); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? 
HubInstanceState OldState = Instance.GetState(); @@ -523,19 +761,17 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) auto RemoveHibernatingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); - m_HibernatingModules.erase(ModuleId); + m_HibernatingModules.erase(std::string(ModuleId)); }); - // NOTE: done while not holding the hub lock, as hibernation may take time. - // m_HibernatingModules tracks which modules are being hibernated, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. try { if (!Instance.Hibernate()) { - OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - return false; + ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); + Instance = {}; + return; } NewState = Instance.GetState(); Instance = {}; @@ -547,42 +783,116 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason) Instance = {}; throw; } - - OutReason.clear(); - - return true; } -bool -Hub::Wake(const std::string& ModuleId, std::string& OutReason) +Hub::Response +Hub::Wake(const std::string& ModuleId) { + ZEN_ASSERT(!m_ShutdownFlag.load()); + StorageServerInstance::ExclusiveLockedPtr Instance; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) || - m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId)) + if (m_WakingModules.contains(ModuleId)) + { + return Response{EResponseCode::Accepted}; + } + + if (IsModuleInFlightLocked(ModuleId)) { - OutReason = fmt::format("Module '{}' is currently changing state", ModuleId); - return false; + return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)}; } auto It = 
m_InstanceLookup.find(ModuleId); if (It == m_InstanceLookup.end()) { - OutReason.clear(); // empty = not found (-> 404) - return false; + return Response{EResponseCode::NotFound}; } const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); - m_WakingModules.emplace(ModuleId); + std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex]; + ZEN_ASSERT(InstanceRaw); + + if (InstanceRaw->GetState() == HubInstanceState::Provisioned) + { + return Response{EResponseCode::Completed}; + } + + Instance = InstanceRaw->LockExclusive(/*Wait*/ true); + m_WakingModules.emplace(ModuleId, Instance.GetBasePort()); } + // NOTE: done while not holding the hub lock, to avoid blocking other operations. + // m_WakingModules tracks which modules are being woken, blocking + // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. + ZEN_ASSERT(Instance); + // Validate state while holding the exclusive instance lock (state cannot change). + // This gives a synchronous error response for invalid-state calls on both the sync + // and async paths, matching the existing behaviour. 
+ if (Instance.GetState() != HubInstanceState::Hibernated) + { + const Response WakeResp = + Response{EResponseCode::Rejected, + fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))}; + Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + RwLock::ExclusiveLockScope _(m_Lock); + m_WakingModules.erase(ModuleId); + return WakeResp; + } + + if (m_WorkerPool) + { + m_BackgroundWorkLatch.AddCount(1); + try + { + m_WorkerPool->ScheduleWork( + [this, + ModuleId = std::string(ModuleId), + Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() { + auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); }); + try + { + CompleteWake(*Instance); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what()); + } + }, + WorkerThreadPool::EMode::EnableBacklog); + } + catch (const std::exception& DispatchEx) + { + // Dispatch failed: undo the latch increment and tracking-set membership. + // State has not changed so no callback is needed. + // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock) + ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what()); + m_BackgroundWorkLatch.CountDown(); + Instance = {}; + { + RwLock::ExclusiveLockScope _(m_Lock); + m_WakingModules.erase(std::string(ModuleId)); + } + throw; + } + } + else + { + CompleteWake(Instance); + } + + return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed}; +} + +void +Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance) +{ + const std::string ModuleId(Instance.GetModuleId()); uint16_t BasePort = Instance.GetBasePort(); std::string BaseUri; // TODO? 
HubInstanceState OldState = Instance.GetState(); @@ -591,19 +901,17 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) auto RemoveWakingModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); - m_WakingModules.erase(ModuleId); + m_WakingModules.erase(std::string(ModuleId)); }); - // NOTE: done while not holding the hub lock, as waking may take time. - // m_WakingModules tracks which modules are being woken, blocking - // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module. try { if (!Instance.Wake()) { - OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState())); - NewState = Instance.GetState(); - return false; + ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState())); + NewState = Instance.GetState(); + Instance = {}; + return; } NewState = Instance.GetState(); Instance = {}; @@ -615,10 +923,6 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason) Instance = {}; throw; } - - OutReason.clear(); - - return true; } bool @@ -739,18 +1043,23 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason) return true; } +bool +Hub::IsModuleInFlightLocked(std::string_view ModuleId) const +{ + const std::string Key(ModuleId); + return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) || + m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key); +} + void Hub::AttemptRecoverInstance(std::string_view ModuleId) { StorageServerInstance::ExclusiveLockedPtr Instance; - StorageServerInstance* RawInstance = nullptr; { RwLock::ExclusiveLockScope _(m_Lock); - if (m_RecoveringModules.contains(std::string(ModuleId)) || m_ProvisioningModules.contains(std::string(ModuleId)) || - m_DeprovisioningModules.contains(std::string(ModuleId)) || m_HibernatingModules.contains(std::string(ModuleId)) || - 
m_WakingModules.contains(std::string(ModuleId))) + if (IsModuleInFlightLocked(ModuleId)) { return; } @@ -763,38 +1072,42 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) const size_t ActiveInstanceIndex = It->second; ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size()); - RawInstance = m_ActiveInstances[ActiveInstanceIndex].get(); - Instance = RawInstance->LockExclusive(/*Wait*/ true); + Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true); + + // Definitive check while hub lock is held: exclusive instance lock is also held, + // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision) + // or the process restarted since the watchdog fired. + if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) + { + return; + } + m_RecoveringModules.emplace(std::string(ModuleId)); } ZEN_ASSERT(Instance); - uint16_t BasePort = Instance.GetBasePort(); - std::string BaseUri; // TODO? - HubInstanceState OldState = Instance.GetState(); - HubInstanceState NewState = OldState; - InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri); + const uint16_t BasePort = Instance.GetBasePort(); + std::string BaseUri; // TODO? + HubInstanceState OldState = Instance.GetState(); + HubInstanceState NewState = OldState; auto RemoveRecoveringModule = MakeGuard([&] { RwLock::ExclusiveLockScope _(m_Lock); m_RecoveringModules.erase(std::string(ModuleId)); }); - // Re-validate: state may have changed between releasing shared lock and acquiring exclusive lock - if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning()) - { - return; - } - if (Instance.RecoverFromCrash()) { + // Spawn succeeded -- instance is back in Provisioned state. NewState = Instance.GetState(); Instance = {}; + OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); return; } - // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup. 
+ // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down + // so any salvageable data is preserved. try { (void)Instance.Deprovision(); @@ -803,7 +1116,6 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what()); } - NewState = Instance.GetState(); Instance = {}; std::unique_ptr<StorageServerInstance> DestroyInstance; @@ -831,6 +1143,10 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId) { ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what()); } + + // Notify after all cleanup -- port is back in m_FreePorts and the callback sees + // a consistent end-state: module gone, transition complete. + OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri); } void @@ -942,9 +1258,10 @@ namespace hub_testutils { std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir, Hub::Configuration Config = {}, - Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}) + Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {}, + WorkerThreadPool* WorkerPool = nullptr) { - return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback)); + return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback)); } struct CallbackRecord @@ -989,10 +1306,8 @@ TEST_CASE("hub.provision_basic") CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); - CHECK(Reason.empty()); + const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); 
Hub::InstanceInfo InstanceInfo; @@ -1004,9 +1319,8 @@ TEST_CASE("hub.provision_basic") CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); - CHECK(DeprovisionResult); - CHECK(Reason.empty()); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); @@ -1037,9 +1351,8 @@ TEST_CASE("hub.provision_config") CHECK_FALSE(HubInstance->Find("module_a")); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); CHECK_EQ(HubInstance->GetInstanceCount(), 1); Hub::InstanceInfo InstanceInfo; @@ -1056,8 +1369,8 @@ TEST_CASE("hub.provision_config") CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("module_a")); @@ -1076,10 +1389,9 @@ TEST_CASE("hub.provision_callbacks") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc()); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == 
Hub::EResponseCode::Completed, ProvisionResult.Message); { RwLock::SharedLockScope _(CaptureInstance.CallbackMutex); @@ -1094,8 +1406,8 @@ TEST_CASE("hub.provision_callbacks") CHECK(ModClient.Get("/health/")); } - const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); { HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); @@ -1121,27 +1433,24 @@ TEST_CASE("hub.instance_limit") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason); - REQUIRE_MESSAGE(FirstResult, Reason); + const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info); + REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message); - const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason); - REQUIRE_MESSAGE(SecondResult, Reason); + const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info); + REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); - Reason.clear(); - const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason); - CHECK_FALSE(ThirdResult); + const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info); + CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected); CHECK_EQ(HubInstance->GetInstanceCount(), 2); - CHECK_NE(Reason.find("instance limit"), std::string::npos); + CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos); - HubInstance->Deprovision("limit_a", Reason); + HubInstance->Deprovision("limit_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); - Reason.clear(); - const 
bool FourthResult = HubInstance->Provision("limit_d", Info, Reason); - CHECK_MESSAGE(FourthResult, Reason); + const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info); + CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message); CHECK_EQ(HubInstance->GetInstanceCount(), 2); } @@ -1151,10 +1460,15 @@ TEST_CASE("hub.enumerate_modules") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason); - REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("enum_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Provision("enum_b", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } std::vector<std::string> Ids; int ProvisionedCount = 0; @@ -1172,7 +1486,7 @@ TEST_CASE("hub.enumerate_modules") CHECK(FoundA); CHECK(FoundB); - HubInstance->Deprovision("enum_a", Reason); + HubInstance->Deprovision("enum_a"); Ids.clear(); ProvisionedCount = 0; HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) { @@ -1195,17 +1509,22 @@ TEST_CASE("hub.max_instance_count") CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("max_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_GE(HubInstance->GetMaxInstanceCount(), 1); - REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("max_b", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, 
R.Message); + } CHECK_GE(HubInstance->GetMaxInstanceCount(), 2); const int MaxAfterTwo = HubInstance->GetMaxInstanceCount(); - HubInstance->Deprovision("max_a", Reason); + HubInstance->Deprovision("max_a"); CHECK_EQ(HubInstance->GetInstanceCount(), 1); CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo); } @@ -1228,8 +1547,8 @@ TEST_CASE("hub.concurrent_callbacks") for (int I = 0; I < kHalf; ++I) { HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason); + const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info); + REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message); } CHECK_EQ(HubInstance->GetInstanceCount(), kHalf); @@ -1253,23 +1572,21 @@ TEST_CASE("hub.concurrent_callbacks") for (int I = 0; I < kHalf; ++I) { - ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { - HubProvisionedInstanceInfo Info; - std::string Reason; - const bool Result = - HubInstance->Provision(fmt::format("new_{}", I), Info, Reason); - ProvisionResults[I] = Result ? 1 : 0; - ProvisionReasons[I] = Reason; - }), - WorkerThreadPool::EMode::EnableBacklog); - - DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { - std::string Reason; - const bool Result = - HubInstance->Deprovision(fmt::format("pre_{}", I), Reason); - DeprovisionResults[I] = Result ? 1 : 0; - }), - WorkerThreadPool::EMode::EnableBacklog); + ProvisionFutures[I] = + Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] { + HubProvisionedInstanceInfo Info; + const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info); + ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 
1 : 0; + ProvisionReasons[I] = Result.Message; + }), + WorkerThreadPool::EMode::EnableBacklog); + + DeprovisionFutures[I] = + Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] { + const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I)); + DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0; + }), + WorkerThreadPool::EMode::EnableBacklog); } for (std::future<void>& F : ProvisionFutures) @@ -1324,14 +1641,13 @@ TEST_CASE("hub.job_object") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); - const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } @@ -1344,14 +1660,13 @@ TEST_CASE("hub.job_object") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo Info; - std::string Reason; - const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason); - REQUIRE_MESSAGE(ProvisionResult, Reason); + const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info); + REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message); CHECK_NE(Info.Port, 0); - const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason); - CHECK(DeprovisionResult); + const Hub::Response DeprovisionResult = 
HubInstance->Deprovision("nojobobj_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); CHECK_EQ(HubInstance->GetInstanceCount(), 0); } } @@ -1366,11 +1681,12 @@ TEST_CASE("hub.hibernate_wake") HubProvisionedInstanceInfo ProvInfo; Hub::InstanceInfo Info; - std::string Reason; // Provision - REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason); - CHECK(Reason.empty()); + { + const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { @@ -1379,9 +1695,8 @@ TEST_CASE("hub.hibernate_wake") } // Hibernate - const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason); - REQUIRE_MESSAGE(HibernateResult, Reason); - CHECK(Reason.empty()); + const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a"); + REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message); REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Hibernated); { @@ -1390,9 +1705,8 @@ TEST_CASE("hub.hibernate_wake") } // Wake - const bool WakeResult = HubInstance->Wake("hib_a", Reason); - REQUIRE_MESSAGE(WakeResult, Reason); - CHECK(Reason.empty()); + const Hub::Response WakeResult = HubInstance->Wake("hib_a"); + REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message); REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { @@ -1401,9 +1715,8 @@ TEST_CASE("hub.hibernate_wake") } // Deprovision - const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason); - CHECK(DeprovisionResult); - CHECK(Reason.empty()); + const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a"); + CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed); 
CHECK_FALSE(HubInstance->Find("hib_a")); { HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); @@ -1419,36 +1732,131 @@ TEST_CASE("hub.hibernate_wake_errors") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config); HubProvisionedInstanceInfo ProvInfo; - std::string Reason; - // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404) - CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason)); - CHECK(Reason.empty()); + // Hibernate/wake on a non-existent module - returns NotFound (-> 404) + CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); + CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); - CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason)); - CHECK(Reason.empty()); + // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent) + { + const Hub::Response R = HubInstance->Provision("err_b", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } + { + const Hub::Response R = HubInstance->Hibernate("err_b"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } - // Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400) - REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason); - CHECK(Reason.empty()); - REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason); - CHECK(Reason.empty()); + { + const Hub::Response HibResp = HubInstance->Hibernate("err_b"); + CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed); + } - Reason.clear(); - CHECK_FALSE(HubInstance->Hibernate("err_b", Reason)); - CHECK_FALSE(Reason.empty()); + // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent) + { + const Hub::Response R = HubInstance->Wake("err_b"); + 
REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } - // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400) - REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason); - CHECK(Reason.empty()); + { + const Hub::Response WakeResp = HubInstance->Wake("err_b"); + CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed); + } - Reason.clear(); - CHECK_FALSE(HubInstance->Wake("err_b", Reason)); - CHECK_FALSE(Reason.empty()); + // Deprovision not-found - returns NotFound (-> 404) + CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound); +} + +TEST_CASE("hub.async_hibernate_wake") +{ + ScopedTemporaryDirectory TempDir; + + Hub::Configuration Config; + Config.BasePortNumber = 23000; + + WorkerThreadPool WorkerPool(2, "hub_async_hib_wake"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + HubProvisionedInstanceInfo ProvInfo; + Hub::InstanceInfo Info; - // Deprovision not-found - should return false with empty reason (-> 404) - CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason)); - CHECK(Reason.empty()); + constexpr auto kPollInterval = std::chrono::milliseconds(200); + constexpr auto kTimeout = std::chrono::seconds(30); + + // Provision and wait until Provisioned + { + const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Ready = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) + { + Ready = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout"); + } + + // Hibernate asynchronously and poll until Hibernated + 
{ + const Hub::Response R = HubInstance->Hibernate("async_hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Hibernated = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated) + { + Hibernated = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout"); + } + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(!ModClient.Get("/health/")); + } + + // Wake asynchronously and poll until Provisioned + { + const Hub::Response R = HubInstance->Wake("async_hib_a"); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + { + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + bool Woken = false; + while (std::chrono::steady_clock::now() < Deadline) + { + if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned) + { + Woken = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout"); + } + { + HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + CHECK(ModClient.Get("/health/")); + } + + // Deprovision + { + const Hub::Response R = HubInstance->Deprovision("async_hib_a"); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message); + } + CHECK_FALSE(HubInstance->Find("async_hib_a")); } TEST_CASE("hub.recover_process_crash") @@ -1457,8 +1865,10 @@ TEST_CASE("hub.recover_process_crash") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), 
Reason); + { + const Hub::Response R = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } // Kill the child process to simulate a crash, then poll until the watchdog detects it, // recovers the instance, and the new process is serving requests. @@ -1494,8 +1904,10 @@ TEST_CASE("hub.recover_process_crash_then_deprovision") std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path()); HubProvisionedInstanceInfo Info; - std::string Reason; - REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("module_a", Info); + REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } // Kill the child process, wait for the watchdog to detect and recover the instance. HubInstance->TerminateModuleForTesting("module_a"); @@ -1518,16 +1930,166 @@ TEST_CASE("hub.recover_process_crash_then_deprovision") REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout"); // After recovery, deprovision should succeed and a re-provision should work. 
- CHECK_MESSAGE(HubInstance->Deprovision("module_a", Reason), Reason); + { + const Hub::Response R = HubInstance->Deprovision("module_a"); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_EQ(HubInstance->GetInstanceCount(), 0); HubProvisionedInstanceInfo NewInfo; - CHECK_MESSAGE(HubInstance->Provision("module_a", NewInfo, Reason), Reason); + { + const Hub::Response R = HubInstance->Provision("module_a", NewInfo); + CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); + } CHECK_NE(NewInfo.Port, 0); HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout); CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests"); } +TEST_CASE("hub.async_provision_concurrent") +{ + ScopedTemporaryDirectory TempDir; + + constexpr int kModuleCount = 8; + + Hub::Configuration Config; + Config.BasePortNumber = 22800; + Config.InstanceLimit = kModuleCount; + + WorkerThreadPool WorkerPool(4, "hub_async_concurrent"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); + std::vector<std::string> Reasons(kModuleCount); + std::vector<int> Results(kModuleCount, 0); + + { + WorkerThreadPool Callers(kModuleCount, "hub_async_callers"); + std::vector<std::future<void>> Futures(kModuleCount); + + for (int I = 0; I < kModuleCount; ++I) + { + Futures[I] = Callers.EnqueueTask(std::packaged_task<void()>([&, I] { + const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); + Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 
1 : 0; + Reasons[I] = Resp.Message; + }), + WorkerThreadPool::EMode::EnableBacklog); + } + for (std::future<void>& F : Futures) + { + F.get(); + } + } + + for (int I = 0; I < kModuleCount; ++I) + { + REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]); + CHECK_NE(Infos[I].Port, 0); + } + + // Poll until all instances reach Provisioned state + constexpr auto kPollInterval = std::chrono::milliseconds(200); + constexpr auto kTimeout = std::chrono::seconds(30); + const auto Deadline = std::chrono::steady_clock::now() + kTimeout; + + bool AllProvisioned = false; + while (std::chrono::steady_clock::now() < Deadline) + { + int ProvisionedCount = 0; + for (int I = 0; I < kModuleCount; ++I) + { + Hub::InstanceInfo InstanceInfo; + if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned) + { + ++ProvisionedCount; + } + } + if (ProvisionedCount == kModuleCount) + { + AllProvisioned = true; + break; + } + std::this_thread::sleep_for(kPollInterval); + } + CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout"); + + for (int I = 0; I < kModuleCount; ++I) + { + HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I)); + } + + for (int I = 0; I < kModuleCount; ++I) + { + const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I)); + CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message); + } + CHECK_EQ(HubInstance->GetInstanceCount(), 0); +} + +TEST_CASE("hub.async_provision_shutdown_waits") +{ + ScopedTemporaryDirectory TempDir; + + constexpr int kModuleCount = 8; + + Hub::Configuration Config; + Config.InstanceLimit = kModuleCount; + Config.BasePortNumber = 22900; + + WorkerThreadPool WorkerPool(2, "hub_async_shutdown"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, 
&WorkerPool); + + std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount); + + for (int I = 0; I < kModuleCount; ++I) + { + const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]); + REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message); + REQUIRE_NE(Infos[I].Port, 0); + } + + // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up. + HubInstance->Shutdown(); + + CHECK_EQ(HubInstance->GetInstanceCount(), 0); + + for (int I = 0; I < kModuleCount; ++I) + { + HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + CHECK_FALSE(ModClient.Get("/health/")); + } +} + +TEST_CASE("hub.async_provision_rejected") +{ + // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present. + ScopedTemporaryDirectory TempDir; + + Hub::Configuration Config; + Config.InstanceLimit = 1; + Config.BasePortNumber = 23100; + + WorkerThreadPool WorkerPool(2, "hub_async_rejected"); + std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool); + + HubProvisionedInstanceInfo Info; + + // First provision: dispatched to WorkerPool, returns Accepted + const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info); + REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message); + REQUIRE_NE(Info.Port, 0); + + // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected + HubProvisionedInstanceInfo Info2; + const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2); + CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected); + CHECK_FALSE(SecondResult.Message.empty()); + CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos); + CHECK_EQ(HubInstance->GetInstanceCount(), 1); +} + TEST_SUITE_END(); void diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h 
index 28e77e729..f0bde10fc 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -4,6 +4,7 @@ #include "hubinstancestate.h" #include "resourcemetrics.h" +#include "storageserverinstance.h" #include <zencore/system.h> #include <zenutil/zenserverprocess.h> @@ -18,7 +19,7 @@ namespace zen { -class StorageServerInstance; +class WorkerThreadPool; /** * Hub @@ -59,6 +60,7 @@ public: Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, + WorkerThreadPool* OptionalWorkerPool = nullptr, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {}); ~Hub(); @@ -78,42 +80,49 @@ public: */ void Shutdown(); + enum class EResponseCode + { + NotFound, + Rejected, + Accepted, + Completed + }; + + struct Response + { + EResponseCode ResponseCode = EResponseCode::Rejected; + std::string Message; + }; + /** * Provision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to provision. - * @param OutInfo If successful, information about the provisioned instance will be returned here. - * @param OutReason If unsuccessful, the reason will be returned here. + * @param OutInfo On success, information about the provisioned instance is returned here. */ - bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason); + Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo); /** * Deprovision a storage server instance for the given module ID. * * @param ModuleId The ID of the module to deprovision. - * @param OutReason If unsuccessful, the reason will be returned here. - * @return true if the instance was found and deprovisioned, false otherwise. */ - bool Deprovision(const std::string& ModuleId, std::string& OutReason); + Response Deprovision(const std::string& ModuleId); /** * Hibernate a storage server instance for the given module ID. * The instance is shut down but its data is preserved; it can be woken later. 
* * @param ModuleId The ID of the module to hibernate. - * @param OutReason If unsuccessful, the reason will be returned here (empty = not found). - * @return true if the instance was hibernated, false otherwise. */ - bool Hibernate(const std::string& ModuleId, std::string& OutReason); + Response Hibernate(const std::string& ModuleId); /** * Wake a hibernated storage server instance for the given module ID. * * @param ModuleId The ID of the module to wake. - * @param OutReason If unsuccessful, the reason will be returned here (empty = not found). - * @return true if the instance was woken, false otherwise. */ - bool Wake(const std::string& ModuleId, std::string& OutReason); + Response Wake(const std::string& ModuleId); /** * Find info about storage server instance for the given module ID. @@ -144,6 +153,9 @@ public: private: const Configuration m_Config; ZenServerEnvironment m_RunEnvironment; + WorkerThreadPool* m_WorkerPool = nullptr; + Latch m_BackgroundWorkLatch; + std::atomic<bool> m_ShutdownFlag = false; AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; @@ -155,10 +167,10 @@ private: #endif RwLock m_Lock; std::unordered_map<std::string, size_t> m_InstanceLookup; - std::unordered_set<std::string> m_DeprovisioningModules; - std::unordered_set<std::string> m_ProvisioningModules; - std::unordered_set<std::string> m_HibernatingModules; - std::unordered_set<std::string> m_WakingModules; + std::unordered_map<std::string, uint16_t> m_DeprovisioningModules; + std::unordered_map<std::string, uint16_t> m_ProvisioningModules; + std::unordered_map<std::string, uint16_t> m_HibernatingModules; + std::unordered_map<std::string, uint16_t> m_WakingModules; std::unordered_set<std::string> m_RecoveringModules; std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances; std::vector<size_t> m_FreeActiveInstanceIndexes; @@ -175,6 +187,14 @@ private: void UpdateStats(); void UpdateCapacityMetrics(); bool CanProvisionInstance(std::string_view ModuleId, 
std::string& OutReason); + bool IsModuleInFlightLocked(std::string_view ModuleId) const; + + Response InternalDeprovision(const std::string& ModuleId); + void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance); + void AbortProvision(std::string_view ModuleId); + void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance); + void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance); + void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance); class InstanceStateUpdateGuard { diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index f9ff655ec..4d70fbddd 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -16,6 +16,7 @@ #include <zencore/windows.h> #include <zenhttp/httpapiservice.h> #include <zenutil/service.h> +#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cxxopts.hpp> @@ -315,6 +316,7 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", ServerConfig.HubInstanceHttpClass), + &GetMediumWorkerPool(EWorkloadType::Background), m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( std::string_view ModuleId, const HubProvisionedInstanceInfo& Info, |