aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-03-24 18:50:59 +0100
committerGitHub Enterprise <[email protected]>2026-03-24 18:50:59 +0100
commitb730eebe53d1d6827d4b6c320ccfd80566a629a6 (patch)
treed8df23d781b5fb3b1d7bd170fa7d81e2501ab901 /src
parentSubprocess Manager (#889) (diff)
downloadzen-b730eebe53d1d6827d4b6c320ccfd80566a629a6.tar.xz
zen-b730eebe53d1d6827d4b6c320ccfd80566a629a6.zip
hub async provision/deprovision/hibernate/wake (#891)HEADmain
- Improvement: Hub provision, deprovision, hibernate, and wake operations are now async. HTTP requests return 202 Accepted while the operation completes in the background - Improvement: Hub returns 202 Accepted (instead of 409 Conflict) when the same async operation is already in progress for a module - Improvement: Hub returns 200 OK when a requested state transition is already satisfied
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/thread.h7
-rw-r--r--src/zenserver-test/hub-tests.cpp204
-rw-r--r--src/zenserver/hub/httphubservice.cpp165
-rw-r--r--src/zenserver/hub/hub.cpp1090
-rw-r--r--src/zenserver/hub/hub.h54
-rw-r--r--src/zenserver/hub/zenhubserver.cpp2
6 files changed, 1126 insertions, 396 deletions
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h
index d7262324f..56ce5904b 100644
--- a/src/zencore/include/zencore/thread.h
+++ b/src/zencore/include/zencore/thread.h
@@ -190,6 +190,13 @@ class Latch
public:
Latch(std::ptrdiff_t Count) : Counter(Count) {}
+ void Reset(std::ptrdiff_t Count)
+ {
+ ZEN_ASSERT(Counter.load() == 0);
+ Complete.Reset();
+ Counter.store(Count);
+ }
+
void CountDown()
{
std::ptrdiff_t Old = Counter.fetch_sub(1);
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index dbe6fa785..f86bdc5c7 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -33,6 +33,77 @@ using namespace std::literals;
static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)};
+static bool
+WaitForModuleState(HttpClient& Client, std::string_view ModuleId, std::string_view ExpectedState, int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
+ {
+ HttpClient::Response R = Client.Get(fmt::format("modules/{}", ModuleId));
+ if (R && R.AsObject()["state"].AsString() == ExpectedState)
+ {
+ return true;
+ }
+ Sleep(100);
+ }
+ HttpClient::Response R = Client.Get(fmt::format("modules/{}", ModuleId));
+ return R && R.AsObject()["state"].AsString() == ExpectedState;
+}
+
+// Provision a module, retrying on 409 Conflict to handle the window where an async
+// deprovision has removed the module from InstanceLookup but not yet from
+// DeprovisioningModules (which CanProvisionInstance checks).
+static HttpClient::Response
+ProvisionModule(HttpClient& Client, std::string_view ModuleId, int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ HttpClient::Response Result;
+ do
+ {
+ Result = Client.Post(fmt::format("modules/{}/provision", ModuleId));
+ if (Result || Result.StatusCode != HttpResponseCode::Conflict)
+ {
+ return Result;
+ }
+ Sleep(100);
+ } while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs));
+ return Result;
+}
+
+// Wait for a port to stop accepting connections (i.e. the process has terminated).
+// Needed after async deprovision: WaitForModuleGone returns as soon as the module
+// leaves m_InstanceLookup (synchronous), but the background worker that kills the
+// process may not have run yet.
+static bool
+WaitForPortUnreachable(HttpClient& Client, std::string_view Path = "/health/", int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
+ {
+ if (!Client.Get(Path))
+ {
+ return true;
+ }
+ Sleep(100);
+ }
+ return !Client.Get(Path);
+}
+
+static bool
+WaitForModuleGone(HttpClient& Client, std::string_view ModuleId, int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
+ {
+ if (Client.Get(fmt::format("modules/{}", ModuleId)).StatusCode == HttpResponseCode::NotFound)
+ {
+ return true;
+ }
+ Sleep(100);
+ }
+ return Client.Get(fmt::format("modules/{}", ModuleId)).StatusCode == HttpResponseCode::NotFound;
+}
+
TEST_SUITE_BEGIN("server.hub");
TEST_CASE("hub.lifecycle.children")
@@ -65,9 +136,7 @@ TEST_CASE("hub.lifecycle.children")
AbcPort = AbcResult["port"].AsUInt16(0);
CHECK_NE(AbcPort, 0);
- Result = Client.Get("modules/abc");
- REQUIRE(Result);
- CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv);
+ REQUIRE(WaitForModuleState(Client, "abc", "provisioned"));
// This should be a fresh instance with no contents
@@ -91,6 +160,8 @@ TEST_CASE("hub.lifecycle.children")
DefPort = DefResult["port"].AsUInt16(0);
REQUIRE_NE(DefPort, 0);
+ REQUIRE(WaitForModuleState(Client, "def", "provisioned"));
+
// This should be a fresh instance with no contents
HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
@@ -110,21 +181,24 @@ TEST_CASE("hub.lifecycle.children")
Result = Client.Post("modules/ghi/provision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleState(Client, "ghi", "provisioned"));
// Tear down instances
Result = Client.Post("modules/abc/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "abc"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
Result = Client.Post("modules/def/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
Result = Client.Post("modules/ghi/deprovision");
@@ -132,7 +206,7 @@ TEST_CASE("hub.lifecycle.children")
// re-provision to verify that (de)hydration preserved state
{
- Result = Client.Post("modules/abc/provision");
+ Result = ProvisionModule(Client, "abc");
REQUIRE(Result);
CbObject AbcResult = Result.AsObject();
@@ -140,6 +214,8 @@ TEST_CASE("hub.lifecycle.children")
AbcPort = AbcResult["port"].AsUInt16(0);
REQUIRE_NE(AbcPort, 0);
+ REQUIRE(WaitForModuleState(Client, "abc", "provisioned"));
+
// This should contain the content from the previous run
HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
@@ -156,7 +232,7 @@ TEST_CASE("hub.lifecycle.children")
}
{
- Result = Client.Post("modules/def/provision");
+ Result = ProvisionModule(Client, "def");
REQUIRE(Result);
CbObject DefResult = Result.AsObject();
@@ -164,6 +240,8 @@ TEST_CASE("hub.lifecycle.children")
DefPort = DefResult["port"].AsUInt16(0);
REQUIRE_NE(DefPort, 0);
+ REQUIRE(WaitForModuleState(Client, "def", "provisioned"));
+
// This should contain the content from the previous run
HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
@@ -181,22 +259,24 @@ TEST_CASE("hub.lifecycle.children")
Result = Client.Post("modules/abc/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "abc"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
Result = Client.Post("modules/def/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
// re-provision to verify that (de)hydration preserved state, including
// state which was generated after the very first dehydration
{
- Result = Client.Post("modules/abc/provision");
+ Result = ProvisionModule(Client, "abc");
REQUIRE(Result);
CbObject AbcResult = Result.AsObject();
@@ -204,6 +284,8 @@ TEST_CASE("hub.lifecycle.children")
AbcPort = AbcResult["port"].AsUInt16(0);
REQUIRE_NE(AbcPort, 0);
+ REQUIRE(WaitForModuleState(Client, "abc", "provisioned"));
+
// This should contain the content from the previous two runs
HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
@@ -221,7 +303,7 @@ TEST_CASE("hub.lifecycle.children")
}
{
- Result = Client.Post("modules/def/provision");
+ Result = ProvisionModule(Client, "def");
REQUIRE(Result);
CbObject DefResult = Result.AsObject();
@@ -229,6 +311,8 @@ TEST_CASE("hub.lifecycle.children")
DefPort = DefResult["port"].AsUInt16(0);
REQUIRE_NE(DefPort, 0);
+ REQUIRE(WaitForModuleState(Client, "def", "provisioned"));
+
// This should contain the content from the previous two runs
HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
@@ -247,16 +331,18 @@ TEST_CASE("hub.lifecycle.children")
Result = Client.Post("modules/abc/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "abc"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
Result = Client.Post("modules/def/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "def"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
// final sanity check that the hub is still responsive and all modules are gone
@@ -393,7 +479,7 @@ TEST_CASE("hub.consul.provision.registration")
HttpClient::Response Result = HubClient.Post("modules/testmod/provision");
REQUIRE(Result);
- CHECK(Client.HasService("testmod"));
+ REQUIRE(WaitForConsulService(Client, "testmod", true, 10000));
{
const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0);
REQUIRE(ModulePort != 0);
@@ -457,6 +543,7 @@ TEST_CASE("hub.consul.provision.registration")
Result = HubClient.Post("modules/testmod/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForConsulService(Client, "testmod", false, 10000));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
@@ -482,13 +569,12 @@ TEST_CASE("hub.hibernate.lifecycle")
// Provision
HttpClient::Response Result = Client.Post("modules/testmod/provision");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0);
REQUIRE_NE(ModulePort, 0);
- Result = Client.Get("modules/testmod");
- REQUIRE(Result);
- CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv);
+ REQUIRE(WaitForModuleState(Client, "testmod", "provisioned"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
CHECK(ModClient.Get("/health/"));
@@ -502,11 +588,10 @@ TEST_CASE("hub.hibernate.lifecycle")
// Hibernate - state should become "hibernated", server should be unreachable
Result = Client.Post("modules/testmod/hibernate");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
- Result = Client.Get("modules/testmod");
- REQUIRE(Result);
- CHECK_EQ(Result.AsObject()["state"].AsString(), "hibernated"sv);
+ REQUIRE(WaitForModuleState(Client, "testmod", "hibernated"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
CHECK(!ModClient.Get("/health/"));
@@ -515,11 +600,10 @@ TEST_CASE("hub.hibernate.lifecycle")
// Wake - state should return to "provisioned", server should be reachable, data should be intact
Result = Client.Post("modules/testmod/wake");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
- Result = Client.Get("modules/testmod");
- REQUIRE(Result);
- CHECK_EQ(Result.AsObject()["state"].AsString(), "provisioned"sv);
+ REQUIRE(WaitForModuleState(Client, "testmod", "provisioned"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
CHECK(ModClient.Get("/health/"));
@@ -532,17 +616,20 @@ TEST_CASE("hub.hibernate.lifecycle")
// Deprovision - server should become unreachable
Result = Client.Post("modules/testmod/deprovision");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ REQUIRE(WaitForModuleGone(Client, "testmod"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
// Re-provision - server should be reachable on its (potentially new) port
- Result = Client.Post("modules/testmod/provision");
+ Result = ProvisionModule(Client, "testmod");
REQUIRE(Result);
CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
const uint16_t ModulePort2 = Result.AsObject()["port"].AsUInt16(0);
REQUIRE_NE(ModulePort2, 0);
+ REQUIRE(WaitForModuleState(Client, "testmod", "provisioned"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout);
CHECK(ModClient.Get("/health/"));
@@ -551,9 +638,10 @@ TEST_CASE("hub.hibernate.lifecycle")
// Final deprovision - server should become unreachable
Result = Client.Post("modules/testmod/deprovision");
REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "testmod"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout);
- CHECK(!ModClient.Get("/health/"));
+ CHECK(WaitForPortUnreachable(ModClient));
}
}
@@ -574,24 +662,76 @@ TEST_CASE("hub.hibernate.errors")
CHECK(!Result);
CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
- // Double-hibernate: first call succeeds, second returns 400 (wrong state)
+ Result = Client.Post("modules/unknown/deprovision");
+ CHECK(!Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+
+ Result = Client.Delete("modules/unknown");
+ CHECK(!Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+
+ // Double-provision: second call while first is in-flight returns 202 Accepted with the same port.
Result = Client.Post("modules/errmod/provision");
REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ const uint16_t ErrmodPort = Result.AsObject()["port"].AsUInt16(0);
+ REQUIRE_NE(ErrmodPort, 0);
+
+ // Provisioning the same module while in-flight returns 202 Accepted with the allocated port.
+ // Evaluated synchronously before WorkerPool dispatch, so safe regardless of timing.
+ Result = Client.Post("modules/errmod/provision");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ CHECK_EQ(Result.AsObject()["port"].AsUInt16(0), ErrmodPort);
+
+ REQUIRE(WaitForModuleState(Client, "errmod", "provisioned"));
+
+ // Already provisioned: provision and wake both return 200 Completed.
+ Result = Client.Post("modules/errmod/provision");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+
+ Result = Client.Post("modules/errmod/wake");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ // Double-hibernate: second call while first is in-flight returns 202 Accepted.
Result = Client.Post("modules/errmod/hibernate");
REQUIRE(Result);
Result = Client.Post("modules/errmod/hibernate");
- CHECK(!Result);
- CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+
+ REQUIRE(WaitForModuleState(Client, "errmod", "hibernated"));
- // Wake on provisioned: succeeds (state restored), then waking again returns 400
+ // Already hibernated: hibernate returns 200 Completed.
+ Result = Client.Post("modules/errmod/hibernate");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+
+ // Double-wake: second call while first is in-flight returns 202 Accepted.
Result = Client.Post("modules/errmod/wake");
REQUIRE(Result);
Result = Client.Post("modules/errmod/wake");
- CHECK(!Result);
- CHECK_EQ(Result.StatusCode, HttpResponseCode::BadRequest);
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+
+ // Double-deprovision: second call while first is in-flight returns 202 Accepted.
+ // errmod2 is a fresh module to avoid waiting on the still-waking errmod.
+ Result = Client.Post("modules/errmod2/provision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ REQUIRE(WaitForModuleState(Client, "errmod2", "provisioned"));
+
+ Result = Client.Post("modules/errmod2/deprovision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+
+ Result = Client.Post("modules/errmod2/deprovision");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
}
TEST_SUITE_END();
diff --git a/src/zenserver/hub/httphubservice.cpp b/src/zenserver/hub/httphubservice.cpp
index 34f4294e4..032a61f08 100644
--- a/src/zenserver/hub/httphubservice.cpp
+++ b/src/zenserver/hub/httphubservice.cpp
@@ -11,6 +11,37 @@
namespace zen {
+namespace {
+ bool HandleFailureResults(HttpServerRequest& Request, const Hub::Response& Resp)
+ {
+ if (Resp.ResponseCode == Hub::EResponseCode::Rejected)
+ {
+ if (Resp.Message.empty())
+ {
+ Request.WriteResponse(HttpResponseCode::Conflict);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::Conflict, HttpContentType::kText, Resp.Message);
+ }
+ return true;
+ }
+ if (Resp.ResponseCode == Hub::EResponseCode::NotFound)
+ {
+ if (Resp.Message.empty())
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound);
+ }
+ else
+ {
+ Request.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, Resp.Message);
+ }
+ return true;
+ }
+ return false;
+ }
+} // namespace
+
HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
{
using namespace std::literals;
@@ -83,144 +114,113 @@ HttpHubService::HttpHubService(Hub& Hub) : m_Hub(Hub)
[this](HttpRouterRequest& Req) {
std::string_view ModuleId = Req.GetCapture(1);
- std::string FailureReason = "unknown";
- HttpResponseCode ResponseCode = HttpResponseCode::OK;
-
try
{
HubProvisionedInstanceInfo Info;
- if (m_Hub.Provision(ModuleId, /* out */ Info, /* out */ FailureReason))
- {
- CbObjectWriter Obj;
- Obj << "moduleId" << ModuleId;
- Obj << "baseUri" << Info.BaseUri;
- Obj << "port" << Info.Port;
- Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ Hub::Response Resp = m_Hub.Provision(ModuleId, Info);
- return;
- }
- else
+ if (HandleFailureResults(Req.ServerRequest(), Resp))
{
- ResponseCode = HttpResponseCode::BadRequest;
+ return;
}
+
+ const HttpResponseCode HttpCode =
+ (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
+ CbObjectWriter Obj;
+ Obj << "moduleId" << ModuleId;
+ Obj << "baseUri" << Info.BaseUri;
+ Obj << "port" << Info.Port;
+ return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save());
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Exception while provisioning module '{}': {}", ModuleId, Ex.what());
-
- FailureReason = Ex.what();
- ResponseCode = HttpResponseCode::InternalServerError;
+ throw;
}
-
- Req.ServerRequest().WriteResponse(ResponseCode, HttpContentType::kText, FailureReason);
},
HttpVerb::kPost);
m_Router.RegisterRoute(
"modules/{moduleid}/deprovision",
[this](HttpRouterRequest& Req) {
- std::string_view ModuleId = Req.GetCapture(1);
- std::string FailureReason = "unknown";
+ std::string_view ModuleId = Req.GetCapture(1);
try
{
- if (!m_Hub.Deprovision(std::string(ModuleId), /* out */ FailureReason))
+ Hub::Response Resp = m_Hub.Deprovision(std::string(ModuleId));
+
+ if (HandleFailureResults(Req.ServerRequest(), Resp))
{
- if (FailureReason.empty())
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason);
- }
+ return;
}
+ const HttpResponseCode HttpCode =
+ (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
CbObjectWriter Obj;
Obj << "moduleId" << ModuleId;
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save());
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what());
-
- FailureReason = Ex.what();
+ throw;
}
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason);
},
HttpVerb::kPost);
m_Router.RegisterRoute(
"modules/{moduleid}/hibernate",
[this](HttpRouterRequest& Req) {
- std::string_view ModuleId = Req.GetCapture(1);
- std::string FailureReason = "unknown";
+ std::string_view ModuleId = Req.GetCapture(1);
try
{
- if (!m_Hub.Hibernate(std::string(ModuleId), /* out */ FailureReason))
+ Hub::Response Resp = m_Hub.Hibernate(std::string(ModuleId));
+
+ if (HandleFailureResults(Req.ServerRequest(), Resp))
{
- if (FailureReason.empty())
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason);
- }
+ return;
}
+ const HttpResponseCode HttpCode =
+ (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
CbObjectWriter Obj;
Obj << "moduleId" << ModuleId;
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save());
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Exception while hibernating module '{}': {}", ModuleId, Ex.what());
-
- FailureReason = Ex.what();
+ throw;
}
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason);
},
HttpVerb::kPost);
m_Router.RegisterRoute(
"modules/{moduleid}/wake",
[this](HttpRouterRequest& Req) {
- std::string_view ModuleId = Req.GetCapture(1);
- std::string FailureReason = "unknown";
+ std::string_view ModuleId = Req.GetCapture(1);
try
{
- if (!m_Hub.Wake(std::string(ModuleId), /* out */ FailureReason))
+ Hub::Response Resp = m_Hub.Wake(std::string(ModuleId));
+
+ if (HandleFailureResults(Req.ServerRequest(), Resp))
{
- if (FailureReason.empty())
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- return Req.ServerRequest().WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason);
- }
+ return;
}
+ const HttpResponseCode HttpCode =
+ (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
CbObjectWriter Obj;
Obj << "moduleId" << ModuleId;
-
- return Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Obj.Save());
+ return Req.ServerRequest().WriteResponse(HttpCode, Obj.Save());
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Exception while waking module '{}': {}", ModuleId, Ex.what());
-
- FailureReason = Ex.what();
+ throw;
}
-
- Req.ServerRequest().WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, FailureReason);
},
HttpVerb::kPost);
@@ -288,27 +288,27 @@ HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view
if (InstanceInfo.State == HubInstanceState::Provisioned || InstanceInfo.State == HubInstanceState::Hibernated ||
InstanceInfo.State == HubInstanceState::Crashed)
{
- std::string FailureReason;
try
{
- if (!m_Hub.Deprovision(std::string(ModuleId), FailureReason))
+ Hub::Response Resp = m_Hub.Deprovision(std::string(ModuleId));
+
+ if (HandleFailureResults(Request, Resp))
{
- if (FailureReason.empty())
- {
- Request.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- Request.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, FailureReason);
- }
return;
}
+
+ // TODO: nuke all related storage
+
+ const HttpResponseCode HttpCode =
+ (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? HttpResponseCode::Accepted : HttpResponseCode::OK;
+ CbObjectWriter Obj;
+ Obj << "moduleId" << ModuleId;
+ return Request.WriteResponse(HttpCode, Obj.Save());
}
catch (const std::exception& Ex)
{
ZEN_ERROR("Exception while deprovisioning module '{}': {}", ModuleId, Ex.what());
- Request.WriteResponse(HttpResponseCode::InternalServerError, HttpContentType::kText, Ex.what());
- return;
+ throw;
}
}
@@ -316,7 +316,6 @@ HttpHubService::HandleModuleDelete(HttpServerRequest& Request, std::string_view
CbObjectWriter Obj;
Obj << "moduleId" << ModuleId;
- Obj << "state" << ToString(InstanceInfo.State);
Request.WriteResponse(HttpResponseCode::OK, Obj.Save());
}
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 6a2609443..ceba21d8d 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -10,6 +10,7 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/timer.h>
+#include <zencore/workthreadpool.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -20,7 +21,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <zencore/filesystem.h>
# include <zencore/testing.h>
# include <zencore/testutils.h>
-# include <zencore/workthreadpool.h>
# include <zenhttp/httpclient.h>
#endif
@@ -122,9 +122,14 @@ private:
//////////////////////////////////////////////////////////////////////////
-Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
+Hub::Hub(const Configuration& Config,
+ ZenServerEnvironment&& RunEnvironment,
+ WorkerThreadPool* OptionalWorkerPool,
+ AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback)
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
+, m_WorkerPool(OptionalWorkerPool)
+, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
{
m_HostMetrics = GetSystemMetrics();
@@ -196,42 +201,44 @@ Hub::Shutdown()
m_WatchDog = {};
- m_Lock.WithExclusiveLock([this] {
- for (auto& [ModuleId, ActiveInstanceIndex] : m_InstanceLookup)
- {
- std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- {
- StorageServerInstance::ExclusiveLockedPtr Instance(InstanceRaw->LockExclusive(/*Wait*/ true));
+ bool Expected = false;
+ bool WaitForBackgroundWork = m_ShutdownFlag.compare_exchange_strong(Expected, true);
+ if (WaitForBackgroundWork && m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.CountDown();
+ m_BackgroundWorkLatch.Wait();
+ // Shutdown flag is set and all background work is drained, safe to shut down remaining instances
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+ m_BackgroundWorkLatch.Reset(1);
+ }
- try
- {
- (void)Instance.Deprovision();
- }
- catch (const std::exception& Ex)
- {
- ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
- }
- // Instance is being destroyed; always report Unprovisioned so callbacks (e.g. Consul) fire.
- NewState = HubInstanceState::Unprovisioned;
- Instance = {};
+ EnumerateModules([&](std::string_view ModuleId, const InstanceInfo& Info) {
+ ZEN_UNUSED(Info); // This might need to be checked to avoid spurious non-relevant warnings...
+ try
+ {
+ const Response DepResp = InternalDeprovision(std::string(ModuleId));
+ if (DepResp.ResponseCode != EResponseCode::Completed && DepResp.ResponseCode != EResponseCode::Accepted)
+ {
+ ZEN_WARN("Deprovision instance for module '{}' during hub shutdown rejected: {}", ModuleId, DepResp.Message);
}
- InstanceRaw.reset();
}
- m_InstanceLookup.clear();
- m_ActiveInstances.clear();
- m_FreeActiveInstanceIndexes.clear();
+ catch (const std::exception& Ex)
+ {
+ ZEN_WARN("Failed to deprovision instance for module '{}' during hub shutdown: {}", ModuleId, Ex.what());
+ }
});
+
+ if (WaitForBackgroundWork && m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.CountDown();
+ m_BackgroundWorkLatch.Wait();
+ }
}
-bool
-Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason)
+Hub::Response
+Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo)
{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
StorageServerInstance::ExclusiveLockedPtr Instance;
bool IsNewInstance = false;
uint16_t AllocatedPort = 0;
@@ -245,6 +252,15 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
}
});
+ if (auto It = m_ProvisioningModules.find(std::string(ModuleId)); It != m_ProvisioningModules.end())
+ {
+ // Same operation already in flight -- return the already-allocated port.
+ // RestoreAllocatedPort is a no-op here because IsNewInstance is still false
+ // (we return before line 273 where it is set), so no port is double-freed.
+ OutInfo.Port = It->second;
+ return Response{EResponseCode::Accepted};
+ }
+
if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It == m_InstanceLookup.end())
{
std::string Reason;
@@ -252,9 +268,7 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
{
ZEN_WARN("Cannot provision new storage server instance for module '{}': {}", ModuleId, Reason);
- OutReason = Reason;
-
- return false;
+ return Response{EResponseCode::Rejected, Reason};
}
IsNewInstance = true;
@@ -313,21 +327,100 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
if (m_RecoveringModules.contains(std::string(ModuleId)))
{
- OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId);
ZEN_WARN("Attempted to provision module '{}' which is currently recovering", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
}
std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
- Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
- AllocatedPort = InstanceRaw->GetBasePort();
+ ZEN_ASSERT(InstanceRaw);
+
+ if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
+ {
+ OutInfo.Port = InstanceRaw->GetBasePort();
+ return Response{EResponseCode::Completed};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ AllocatedPort = InstanceRaw->GetBasePort();
}
- m_ProvisioningModules.emplace(std::string(ModuleId));
+ m_ProvisioningModules.emplace(std::string(ModuleId), AllocatedPort);
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_ProvisioningModules tracks which modules are being provisioned, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(Instance);
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ AllocatedPort,
+ IsNewInstance,
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteProvision(*Instance, AllocatedPort, IsNewInstance);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async provision of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed async dispatch provision of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ if (IsNewInstance)
+ {
+ try
+ {
+ AbortProvision(ModuleId);
+ }
+ catch (const std::exception& DestroyEx)
+ {
+ ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
+ }
+ }
+
+ Instance = {};
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_ProvisioningModules.erase(std::string(ModuleId));
+ if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
+ {
+ m_FreePorts.push_back(AllocatedPort);
+ }
+ }
+
+ throw;
+ }
+ }
+ else
+ {
+ CompleteProvision(Instance, AllocatedPort, IsNewInstance);
+ }
+
+ OutInfo.Port = AllocatedPort;
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
@@ -340,47 +433,34 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
if (IsNewInstance && AllocatedPort != 0 && !m_InstanceLookup.contains(std::string(ModuleId)))
{
m_FreePorts.push_back(AllocatedPort);
- AllocatedPort = 0;
}
});
- // NOTE: this is done while not holding the hub lock, as provisioning may take time
- // and we don't want to block other operations. We track which modules are being
- // provisioned using m_ProvisioningModules, and reject attempts to provision/deprovision
- // those modules while in this state.
-
- try
+ if (m_ShutdownFlag.load() == false)
{
- (void)Instance.Provision(); // false = already in target state (idempotent); not an error
- NewState = Instance.GetState();
- Instance = {};
+ try
+ {
+ (void)Instance.Provision(); // false = already in target state (idempotent); not an error
+ NewState = Instance.GetState();
+ AllocatedPort = 0;
+ Instance = {};
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
+ }
}
- catch (const std::exception& Ex)
+
+ if (Instance)
{
- ZEN_ERROR("Failed to provision storage server instance for module '{}': {}", ModuleId, Ex.what());
NewState = Instance.GetState();
Instance = {};
if (IsNewInstance)
{
- // Clean up failed instance provisioning
- std::unique_ptr<StorageServerInstance> DestroyInstance;
- {
- RwLock::ExclusiveLockScope _(m_Lock);
- if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
- {
- const size_t ActiveInstanceIndex = It->second;
- ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
- ZEN_ASSERT(DestroyInstance);
- ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
- m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
- m_InstanceLookup.erase(It);
- }
- }
try
{
- DestroyInstance.reset();
+ AbortProvision(ModuleId);
NewState = HubInstanceState::Unprovisioned;
}
catch (const std::exception& DestroyEx)
@@ -388,18 +468,42 @@ Hub::Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, s
ZEN_ERROR("Failed to destroy instance for failed provision module '{}': {}", ModuleId, DestroyEx.what());
}
}
- throw;
}
+}
- OutReason.clear();
- OutInfo.Port = AllocatedPort;
- // TODO: base URI? Would need to know what host name / IP to use
+void
+Hub::AbortProvision(std::string_view ModuleId)
+{
+ std::unique_ptr<StorageServerInstance> DestroyInstance;
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ if (auto It = m_InstanceLookup.find(std::string(ModuleId)); It != m_InstanceLookup.end())
+ {
+ const size_t ActiveInstanceIndex = It->second;
+ ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
+ DestroyInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
+ ZEN_ASSERT(DestroyInstance);
+ ZEN_ASSERT(!m_ActiveInstances[ActiveInstanceIndex]);
+ m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
+ m_InstanceLookup.erase(It);
+ }
+ else
+ {
+ ZEN_WARN("AbortProvision called for unknown module '{}'", ModuleId);
+ }
+ }
+ DestroyInstance.reset();
+}
- return true;
+Hub::Response
+Hub::Deprovision(const std::string& ModuleId)
+{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+ return InternalDeprovision(ModuleId);
}
-bool
-Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
+Hub::Response
+Hub::InternalDeprovision(const std::string& ModuleId)
{
std::unique_ptr<StorageServerInstance> RawInstance;
StorageServerInstance::ExclusiveLockedPtr Instance;
@@ -409,26 +513,27 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
if (m_ProvisioningModules.contains(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently being provisioned", ModuleId);
-
ZEN_WARN("Attempted to deprovision module '{}' which is currently being provisioned", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently being provisioned", ModuleId)};
}
if (m_RecoveringModules.contains(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently recovering from a crash", ModuleId);
ZEN_WARN("Attempted to deprovision module '{}' which is currently recovering", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently recovering from a crash", ModuleId)};
+ }
+
+ if (m_DeprovisioningModules.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
}
if (auto It = m_InstanceLookup.find(ModuleId); It == m_InstanceLookup.end())
{
ZEN_WARN("Attempted to deprovision non-existent module '{}'", ModuleId);
- OutReason.clear(); // empty = not found (-> 404)
- return false;
+ return Response{EResponseCode::NotFound};
}
else
{
@@ -436,33 +541,90 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
RawInstance = std::move(m_ActiveInstances[ActiveInstanceIndex]);
ZEN_ASSERT(RawInstance != nullptr);
+
m_FreeActiveInstanceIndexes.push_back(ActiveInstanceIndex);
m_InstanceLookup.erase(It);
- m_DeprovisioningModules.emplace(ModuleId);
+ m_DeprovisioningModules.emplace(ModuleId, RawInstance->GetBasePort());
Instance = RawInstance->LockExclusive(/*Wait*/ true);
}
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_DeprovisioningModules tracks which modules are being deprovisioned, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(RawInstance);
ZEN_ASSERT(Instance);
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance)),
+ RawInstance = std::shared_ptr<StorageServerInstance>(std::move(RawInstance))]() mutable {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteDeprovision(*Instance);
+ RawInstance.reset();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async deprovision of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ ZEN_ERROR("Failed async dispatch deprovision of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
+ uint16_t BasePort = Instance.GetBasePort();
+ InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, /*BaseUri*/ {});
+
+ // The module is already removed from m_InstanceLookup; treat as gone so callbacks fire correctly.
+ Instance = {};
+ NewState = HubInstanceState::Unprovisioned;
+
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_DeprovisioningModules.erase(std::string(ModuleId));
+ m_FreePorts.push_back(BasePort);
+ }
+ throw;
+ }
+ }
+ else
+ {
+ CompleteDeprovision(Instance);
+ RawInstance.reset();
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
HubInstanceState NewState = OldState;
InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
- // The module is deprovisioned outside the hub lock to avoid blocking other operations.
- //
- // To ensure that no new provisioning can occur while we're deprovisioning,
- // we add the module ID to m_DeprovisioningModules and remove it once
- // deprovisioning is complete.
-
auto _ = MakeGuard([&] {
{
RwLock::ExclusiveLockScope _(m_Lock);
- m_DeprovisioningModules.erase(ModuleId);
+ m_DeprovisioningModules.erase(std::string(ModuleId));
m_FreePorts.push_back(BasePort);
}
});
@@ -470,6 +632,8 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
try
{
(void)Instance.Deprovision();
+ NewState = Instance.GetState();
+ Instance = {};
}
catch (const std::exception& Ex)
{
@@ -479,42 +643,116 @@ Hub::Deprovision(const std::string& ModuleId, std::string& OutReason)
Instance = {};
throw;
}
- NewState = Instance.GetState();
- Instance = {};
- OutReason.clear();
-
- return true;
}
-bool
-Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
+Hub::Response
+Hub::Hibernate(const std::string& ModuleId)
{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
StorageServerInstance::ExclusiveLockedPtr Instance;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) ||
- m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId))
+ if (m_HibernatingModules.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
+ }
+
+ if (IsModuleInFlightLocked(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently changing state", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
}
auto It = m_InstanceLookup.find(ModuleId);
if (It == m_InstanceLookup.end())
{
- OutReason.clear(); // empty = not found (-> 404)
- return false;
+ return Response{EResponseCode::NotFound};
}
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
- m_HibernatingModules.emplace(ModuleId);
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
+ ZEN_ASSERT(InstanceRaw);
+
+ if (InstanceRaw->GetState() == HubInstanceState::Hibernated)
+ {
+ return Response{EResponseCode::Completed};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ m_HibernatingModules.emplace(ModuleId, Instance.GetBasePort());
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_HibernatingModules tracks which modules are being hibernated, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(Instance);
+
+ // Validate state while holding the exclusive instance lock (state cannot change).
+ // This gives a synchronous error response for invalid-state calls on both the sync
+ // and async paths, matching the existing behaviour.
+ if (Instance.GetState() != HubInstanceState::Provisioned)
+ {
+ const Response HibernateResp =
+ Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()))};
+ Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_HibernatingModules.erase(ModuleId);
+ return HibernateResp;
+ }
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteHibernate(*Instance);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async hibernate of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Dispatch failed: undo the latch increment and tracking-set membership.
+ // State has not changed so no callback is needed.
+ // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ ZEN_ERROR("Failed async dispatch hibernate of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_HibernatingModules.erase(std::string(ModuleId));
+ }
+ throw;
+ }
+ }
+ else
+ {
+ CompleteHibernate(Instance);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
@@ -523,19 +761,17 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
auto RemoveHibernatingModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
- m_HibernatingModules.erase(ModuleId);
+ m_HibernatingModules.erase(std::string(ModuleId));
});
- // NOTE: done while not holding the hub lock, as hibernation may take time.
- // m_HibernatingModules tracks which modules are being hibernated, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
try
{
if (!Instance.Hibernate())
{
- OutReason = fmt::format("Module '{}' cannot be hibernated from state '{}'", ModuleId, ToString(Instance.GetState()));
- NewState = Instance.GetState();
- return false;
+ ZEN_WARN("Hibernate returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
+ NewState = Instance.GetState();
+ Instance = {};
+ return;
}
NewState = Instance.GetState();
Instance = {};
@@ -547,42 +783,116 @@ Hub::Hibernate(const std::string& ModuleId, std::string& OutReason)
Instance = {};
throw;
}
-
- OutReason.clear();
-
- return true;
}
-bool
-Hub::Wake(const std::string& ModuleId, std::string& OutReason)
+Hub::Response
+Hub::Wake(const std::string& ModuleId)
{
+ ZEN_ASSERT(!m_ShutdownFlag.load());
+
StorageServerInstance::ExclusiveLockedPtr Instance;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_ProvisioningModules.contains(ModuleId) || m_DeprovisioningModules.contains(ModuleId) ||
- m_HibernatingModules.contains(ModuleId) || m_WakingModules.contains(ModuleId) || m_RecoveringModules.contains(ModuleId))
+ if (m_WakingModules.contains(ModuleId))
+ {
+ return Response{EResponseCode::Accepted};
+ }
+
+ if (IsModuleInFlightLocked(ModuleId))
{
- OutReason = fmt::format("Module '{}' is currently changing state", ModuleId);
- return false;
+ return Response{EResponseCode::Rejected, fmt::format("Module '{}' is currently changing state", ModuleId)};
}
auto It = m_InstanceLookup.find(ModuleId);
if (It == m_InstanceLookup.end())
{
- OutReason.clear(); // empty = not found (-> 404)
- return false;
+ return Response{EResponseCode::NotFound};
}
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
- m_WakingModules.emplace(ModuleId);
+ std::unique_ptr<StorageServerInstance>& InstanceRaw = m_ActiveInstances[ActiveInstanceIndex];
+ ZEN_ASSERT(InstanceRaw);
+
+ if (InstanceRaw->GetState() == HubInstanceState::Provisioned)
+ {
+ return Response{EResponseCode::Completed};
+ }
+
+ Instance = InstanceRaw->LockExclusive(/*Wait*/ true);
+ m_WakingModules.emplace(ModuleId, Instance.GetBasePort());
}
+ // NOTE: done while not holding the hub lock, to avoid blocking other operations.
+ // m_WakingModules tracks which modules are being woken, blocking
+ // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
+
ZEN_ASSERT(Instance);
+ // Validate state while holding the exclusive instance lock (state cannot change).
+ // This gives a synchronous error response for invalid-state calls on both the sync
+ // and async paths, matching the existing behaviour.
+ if (Instance.GetState() != HubInstanceState::Hibernated)
+ {
+ const Response WakeResp =
+ Response{EResponseCode::Rejected,
+ fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()))};
+ Instance = {}; // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_WakingModules.erase(ModuleId);
+ return WakeResp;
+ }
+
+ if (m_WorkerPool)
+ {
+ m_BackgroundWorkLatch.AddCount(1);
+ try
+ {
+ m_WorkerPool->ScheduleWork(
+ [this,
+ ModuleId = std::string(ModuleId),
+ Instance = std::make_shared<StorageServerInstance::ExclusiveLockedPtr>(std::move(Instance))]() {
+ auto _ = MakeGuard([this]() { m_BackgroundWorkLatch.CountDown(); });
+ try
+ {
+ CompleteWake(*Instance);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed async wake of module '{}': {}", ModuleId, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ catch (const std::exception& DispatchEx)
+ {
+ // Dispatch failed: undo the latch increment and tracking-set membership.
+ // State has not changed so no callback is needed.
+ // Release instance lock before acquiring hub lock (established ordering: hub lock -> instance lock)
+ ZEN_ERROR("Failed async dispatch wake of module '{}': {}", ModuleId, DispatchEx.what());
+ m_BackgroundWorkLatch.CountDown();
+ Instance = {};
+ {
+ RwLock::ExclusiveLockScope _(m_Lock);
+ m_WakingModules.erase(std::string(ModuleId));
+ }
+ throw;
+ }
+ }
+ else
+ {
+ CompleteWake(Instance);
+ }
+
+ return Response{m_WorkerPool ? EResponseCode::Accepted : EResponseCode::Completed};
+}
+
+void
+Hub::CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance)
+{
+ const std::string ModuleId(Instance.GetModuleId());
uint16_t BasePort = Instance.GetBasePort();
std::string BaseUri; // TODO?
HubInstanceState OldState = Instance.GetState();
@@ -591,19 +901,17 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason)
auto RemoveWakingModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
- m_WakingModules.erase(ModuleId);
+ m_WakingModules.erase(std::string(ModuleId));
});
- // NOTE: done while not holding the hub lock, as waking may take time.
- // m_WakingModules tracks which modules are being woken, blocking
- // concurrent Hibernate/Wake/Provision/Deprovision attempts on the same module.
try
{
if (!Instance.Wake())
{
- OutReason = fmt::format("Module '{}' cannot be woken from state '{}'", ModuleId, ToString(Instance.GetState()));
- NewState = Instance.GetState();
- return false;
+ ZEN_WARN("Wake returned false unexpectedly for module '{}' in state '{}'", ModuleId, ToString(Instance.GetState()));
+ NewState = Instance.GetState();
+ Instance = {};
+ return;
}
NewState = Instance.GetState();
Instance = {};
@@ -615,10 +923,6 @@ Hub::Wake(const std::string& ModuleId, std::string& OutReason)
Instance = {};
throw;
}
-
- OutReason.clear();
-
- return true;
}
bool
@@ -739,18 +1043,23 @@ Hub::CanProvisionInstance(std::string_view ModuleId, std::string& OutReason)
return true;
}
+bool
+Hub::IsModuleInFlightLocked(std::string_view ModuleId) const
+{
+ const std::string Key(ModuleId);
+ return m_ProvisioningModules.contains(Key) || m_DeprovisioningModules.contains(Key) || m_HibernatingModules.contains(Key) ||
+ m_WakingModules.contains(Key) || m_RecoveringModules.contains(Key);
+}
+
void
Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
StorageServerInstance::ExclusiveLockedPtr Instance;
- StorageServerInstance* RawInstance = nullptr;
{
RwLock::ExclusiveLockScope _(m_Lock);
- if (m_RecoveringModules.contains(std::string(ModuleId)) || m_ProvisioningModules.contains(std::string(ModuleId)) ||
- m_DeprovisioningModules.contains(std::string(ModuleId)) || m_HibernatingModules.contains(std::string(ModuleId)) ||
- m_WakingModules.contains(std::string(ModuleId)))
+ if (IsModuleInFlightLocked(ModuleId))
{
return;
}
@@ -763,38 +1072,42 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
const size_t ActiveInstanceIndex = It->second;
ZEN_ASSERT(ActiveInstanceIndex < m_ActiveInstances.size());
- RawInstance = m_ActiveInstances[ActiveInstanceIndex].get();
- Instance = RawInstance->LockExclusive(/*Wait*/ true);
+ Instance = m_ActiveInstances[ActiveInstanceIndex]->LockExclusive(/*Wait*/ true);
+
+ // Definitive check while hub lock is held: exclusive instance lock is also held,
+ // so state cannot change under us. Bail if state changed (e.g. concurrent deprovision)
+ // or the process restarted since the watchdog fired.
+ if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning())
+ {
+ return;
+ }
+
m_RecoveringModules.emplace(std::string(ModuleId));
}
ZEN_ASSERT(Instance);
- uint16_t BasePort = Instance.GetBasePort();
- std::string BaseUri; // TODO?
- HubInstanceState OldState = Instance.GetState();
- HubInstanceState NewState = OldState;
- InstanceStateUpdateGuard StateGuard(*this, ModuleId, OldState, NewState, BasePort, BaseUri);
+ const uint16_t BasePort = Instance.GetBasePort();
+ std::string BaseUri; // TODO?
+ HubInstanceState OldState = Instance.GetState();
+ HubInstanceState NewState = OldState;
auto RemoveRecoveringModule = MakeGuard([&] {
RwLock::ExclusiveLockScope _(m_Lock);
m_RecoveringModules.erase(std::string(ModuleId));
});
- // Re-validate: state may have changed between releasing shared lock and acquiring exclusive lock
- if (Instance.GetState() != HubInstanceState::Provisioned || Instance.IsRunning())
- {
- return;
- }
-
if (Instance.RecoverFromCrash())
{
+ // Spawn succeeded -- instance is back in Provisioned state.
NewState = Instance.GetState();
Instance = {};
+ OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
return;
}
- // Restart threw but data dir is intact - run Dehydrate via Deprovision before cleanup.
+ // Spawn failed -- instance is now in Crashed state. Dehydrate before tearing down
+ // so any salvageable data is preserved.
try
{
(void)Instance.Deprovision();
@@ -803,7 +1116,6 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
ZEN_ERROR("Failed to deprovision instance for module '{}' during crash recovery cleanup: {}", ModuleId, Ex.what());
}
- NewState = Instance.GetState();
Instance = {};
std::unique_ptr<StorageServerInstance> DestroyInstance;
@@ -831,6 +1143,10 @@ Hub::AttemptRecoverInstance(std::string_view ModuleId)
{
ZEN_ERROR("Failed to destroy recovered instance for module '{}': {}", ModuleId, Ex.what());
}
+
+ // Notify after all cleanup -- port is back in m_FreePorts and the callback sees
+ // a consistent end-state: module gone, transition complete.
+ OnStateUpdate(ModuleId, OldState, NewState, BasePort, BaseUri);
}
void
@@ -942,9 +1258,10 @@ namespace hub_testutils {
std::unique_ptr<Hub> MakeHub(const std::filesystem::path& BaseDir,
Hub::Configuration Config = {},
- Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {})
+ Hub::AsyncModuleStateChangeCallbackFunc StateChangeCallback = {},
+ WorkerThreadPool* WorkerPool = nullptr)
{
- return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), std::move(StateChangeCallback));
+ return std::make_unique<Hub>(Config, MakeHubEnvironment(BaseDir), WorkerPool, std::move(StateChangeCallback));
}
struct CallbackRecord
@@ -989,10 +1306,8 @@ TEST_CASE("hub.provision_basic")
CHECK_FALSE(HubInstance->Find("module_a"));
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
- CHECK(Reason.empty());
+ const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
Hub::InstanceInfo InstanceInfo;
@@ -1004,9 +1319,8 @@ TEST_CASE("hub.provision_basic")
CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
- CHECK(DeprovisionResult);
- CHECK(Reason.empty());
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
@@ -1037,9 +1351,8 @@ TEST_CASE("hub.provision_config")
CHECK_FALSE(HubInstance->Find("module_a"));
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("module_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
Hub::InstanceInfo InstanceInfo;
@@ -1056,8 +1369,8 @@ TEST_CASE("hub.provision_config")
CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("module_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("module_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("module_a"));
@@ -1076,10 +1389,9 @@ TEST_CASE("hub.provision_callbacks")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), {}, CaptureInstance.CaptureFunc());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("cb_module", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("cb_module", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
{
RwLock::SharedLockScope _(CaptureInstance.CallbackMutex);
@@ -1094,8 +1406,8 @@ TEST_CASE("hub.provision_callbacks")
CHECK(ModClient.Get("/health/"));
}
- const bool DeprovisionResult = HubInstance->Deprovision("cb_module", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("cb_module");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
{
HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
@@ -1121,27 +1433,24 @@ TEST_CASE("hub.instance_limit")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool FirstResult = HubInstance->Provision("limit_a", Info, Reason);
- REQUIRE_MESSAGE(FirstResult, Reason);
+ const Hub::Response FirstResult = HubInstance->Provision("limit_a", Info);
+ REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Completed, FirstResult.Message);
- const bool SecondResult = HubInstance->Provision("limit_b", Info, Reason);
- REQUIRE_MESSAGE(SecondResult, Reason);
+ const Hub::Response SecondResult = HubInstance->Provision("limit_b", Info);
+ REQUIRE_MESSAGE(SecondResult.ResponseCode == Hub::EResponseCode::Completed, SecondResult.Message);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
- Reason.clear();
- const bool ThirdResult = HubInstance->Provision("limit_c", Info, Reason);
- CHECK_FALSE(ThirdResult);
+ const Hub::Response ThirdResult = HubInstance->Provision("limit_c", Info);
+ CHECK(ThirdResult.ResponseCode == Hub::EResponseCode::Rejected);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
- CHECK_NE(Reason.find("instance limit"), std::string::npos);
+ CHECK_NE(ThirdResult.Message.find("instance limit"), std::string::npos);
- HubInstance->Deprovision("limit_a", Reason);
+ HubInstance->Deprovision("limit_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
- Reason.clear();
- const bool FourthResult = HubInstance->Provision("limit_d", Info, Reason);
- CHECK_MESSAGE(FourthResult, Reason);
+ const Hub::Response FourthResult = HubInstance->Provision("limit_d", Info);
+ CHECK_MESSAGE(FourthResult.ResponseCode == Hub::EResponseCode::Completed, FourthResult.Message);
CHECK_EQ(HubInstance->GetInstanceCount(), 2);
}
@@ -1151,10 +1460,15 @@ TEST_CASE("hub.enumerate_modules")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("enum_a", Info, Reason), Reason);
- REQUIRE_MESSAGE(HubInstance->Provision("enum_b", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("enum_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ {
+ const Hub::Response R = HubInstance->Provision("enum_b", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
std::vector<std::string> Ids;
int ProvisionedCount = 0;
@@ -1172,7 +1486,7 @@ TEST_CASE("hub.enumerate_modules")
CHECK(FoundA);
CHECK(FoundB);
- HubInstance->Deprovision("enum_a", Reason);
+ HubInstance->Deprovision("enum_a");
Ids.clear();
ProvisionedCount = 0;
HubInstance->EnumerateModules([&](std::string_view ModuleId, const Hub::InstanceInfo& InstanceInfo) {
@@ -1195,17 +1509,22 @@ TEST_CASE("hub.max_instance_count")
CHECK_EQ(HubInstance->GetMaxInstanceCount(), 0);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("max_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("max_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_GE(HubInstance->GetMaxInstanceCount(), 1);
- REQUIRE_MESSAGE(HubInstance->Provision("max_b", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("max_b", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_GE(HubInstance->GetMaxInstanceCount(), 2);
const int MaxAfterTwo = HubInstance->GetMaxInstanceCount();
- HubInstance->Deprovision("max_a", Reason);
+ HubInstance->Deprovision("max_a");
CHECK_EQ(HubInstance->GetInstanceCount(), 1);
CHECK_EQ(HubInstance->GetMaxInstanceCount(), MaxAfterTwo);
}
@@ -1228,8 +1547,8 @@ TEST_CASE("hub.concurrent_callbacks")
for (int I = 0; I < kHalf; ++I)
{
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision(fmt::format("pre_{}", I), Info, Reason), Reason);
+ const Hub::Response ProvR = HubInstance->Provision(fmt::format("pre_{}", I), Info);
+ REQUIRE_MESSAGE(ProvR.ResponseCode == Hub::EResponseCode::Completed, ProvR.Message);
}
CHECK_EQ(HubInstance->GetInstanceCount(), kHalf);
@@ -1253,23 +1572,21 @@ TEST_CASE("hub.concurrent_callbacks")
for (int I = 0; I < kHalf; ++I)
{
- ProvisionFutures[I] = Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
- HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool Result =
- HubInstance->Provision(fmt::format("new_{}", I), Info, Reason);
- ProvisionResults[I] = Result ? 1 : 0;
- ProvisionReasons[I] = Reason;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
-
- DeprovisionFutures[I] = Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
- std::string Reason;
- const bool Result =
- HubInstance->Deprovision(fmt::format("pre_{}", I), Reason);
- DeprovisionResults[I] = Result ? 1 : 0;
- }),
- WorkerThreadPool::EMode::EnableBacklog);
+ ProvisionFutures[I] =
+ Provisioners.EnqueueTask(std::packaged_task<void()>([&, I] {
+ HubProvisionedInstanceInfo Info;
+ const Hub::Response Result = HubInstance->Provision(fmt::format("new_{}", I), Info);
+ ProvisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
+ ProvisionReasons[I] = Result.Message;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+
+ DeprovisionFutures[I] =
+ Deprovisioneers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ const Hub::Response Result = HubInstance->Deprovision(fmt::format("pre_{}", I));
+ DeprovisionResults[I] = (Result.ResponseCode == Hub::EResponseCode::Completed) ? 1 : 0;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
}
for (std::future<void>& F : ProvisionFutures)
@@ -1324,14 +1641,13 @@ TEST_CASE("hub.job_object")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("jobobj_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("jobobj_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
- const bool DeprovisionResult = HubInstance->Deprovision("jobobj_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("jobobj_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}
@@ -1344,14 +1660,13 @@ TEST_CASE("hub.job_object")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo Info;
- std::string Reason;
- const bool ProvisionResult = HubInstance->Provision("nojobobj_a", Info, Reason);
- REQUIRE_MESSAGE(ProvisionResult, Reason);
+ const Hub::Response ProvisionResult = HubInstance->Provision("nojobobj_a", Info);
+ REQUIRE_MESSAGE(ProvisionResult.ResponseCode == Hub::EResponseCode::Completed, ProvisionResult.Message);
CHECK_NE(Info.Port, 0);
- const bool DeprovisionResult = HubInstance->Deprovision("nojobobj_a", Reason);
- CHECK(DeprovisionResult);
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("nojobobj_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
}
}
@@ -1366,11 +1681,12 @@ TEST_CASE("hub.hibernate_wake")
HubProvisionedInstanceInfo ProvInfo;
Hub::InstanceInfo Info;
- std::string Reason;
// Provision
- REQUIRE_MESSAGE(HubInstance->Provision("hib_a", ProvInfo, Reason), Reason);
- CHECK(Reason.empty());
+ {
+ const Hub::Response R = HubInstance->Provision("hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
{
@@ -1379,9 +1695,8 @@ TEST_CASE("hub.hibernate_wake")
}
// Hibernate
- const bool HibernateResult = HubInstance->Hibernate("hib_a", Reason);
- REQUIRE_MESSAGE(HibernateResult, Reason);
- CHECK(Reason.empty());
+ const Hub::Response HibernateResult = HubInstance->Hibernate("hib_a");
+ REQUIRE_MESSAGE(HibernateResult.ResponseCode == Hub::EResponseCode::Completed, HibernateResult.Message);
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Hibernated);
{
@@ -1390,9 +1705,8 @@ TEST_CASE("hub.hibernate_wake")
}
// Wake
- const bool WakeResult = HubInstance->Wake("hib_a", Reason);
- REQUIRE_MESSAGE(WakeResult, Reason);
- CHECK(Reason.empty());
+ const Hub::Response WakeResult = HubInstance->Wake("hib_a");
+ REQUIRE_MESSAGE(WakeResult.ResponseCode == Hub::EResponseCode::Completed, WakeResult.Message);
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
{
@@ -1401,9 +1715,8 @@ TEST_CASE("hub.hibernate_wake")
}
// Deprovision
- const bool DeprovisionResult = HubInstance->Deprovision("hib_a", Reason);
- CHECK(DeprovisionResult);
- CHECK(Reason.empty());
+ const Hub::Response DeprovisionResult = HubInstance->Deprovision("hib_a");
+ CHECK(DeprovisionResult.ResponseCode == Hub::EResponseCode::Completed);
CHECK_FALSE(HubInstance->Find("hib_a"));
{
HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
@@ -1419,36 +1732,131 @@ TEST_CASE("hub.hibernate_wake_errors")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config);
HubProvisionedInstanceInfo ProvInfo;
- std::string Reason;
- // Hibernate/wake on a non-existent module - should return false with empty reason (-> 404)
- CHECK_FALSE(HubInstance->Hibernate("never_provisioned", Reason));
- CHECK(Reason.empty());
+ // Hibernate/wake on a non-existent module - returns NotFound (-> 404)
+ CHECK(HubInstance->Hibernate("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+ CHECK(HubInstance->Wake("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
- CHECK_FALSE(HubInstance->Wake("never_provisioned", Reason));
- CHECK(Reason.empty());
+ // Double-hibernate: second hibernate on already-hibernated module returns Completed (idempotent)
+ {
+ const Hub::Response R = HubInstance->Provision("err_b", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
+ {
+ const Hub::Response R = HubInstance->Hibernate("err_b");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
- // Double-hibernate: first hibernate succeeds, second returns false with non-empty reason (-> 400)
- REQUIRE_MESSAGE(HubInstance->Provision("err_b", ProvInfo, Reason), Reason);
- CHECK(Reason.empty());
- REQUIRE_MESSAGE(HubInstance->Hibernate("err_b", Reason), Reason);
- CHECK(Reason.empty());
+ {
+ const Hub::Response HibResp = HubInstance->Hibernate("err_b");
+ CHECK(HibResp.ResponseCode == Hub::EResponseCode::Completed);
+ }
- Reason.clear();
- CHECK_FALSE(HubInstance->Hibernate("err_b", Reason));
- CHECK_FALSE(Reason.empty());
+ // Wake on provisioned: succeeds (-> Provisioned), then wake again returns Completed (idempotent)
+ {
+ const Hub::Response R = HubInstance->Wake("err_b");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
- // Wake on provisioned: succeeds (-> Provisioned), then wake again returns false (-> 400)
- REQUIRE_MESSAGE(HubInstance->Wake("err_b", Reason), Reason);
- CHECK(Reason.empty());
+ {
+ const Hub::Response WakeResp = HubInstance->Wake("err_b");
+ CHECK(WakeResp.ResponseCode == Hub::EResponseCode::Completed);
+ }
- Reason.clear();
- CHECK_FALSE(HubInstance->Wake("err_b", Reason));
- CHECK_FALSE(Reason.empty());
+ // Deprovision not-found - returns NotFound (-> 404)
+ CHECK(HubInstance->Deprovision("never_provisioned").ResponseCode == Hub::EResponseCode::NotFound);
+}
+
+TEST_CASE("hub.async_hibernate_wake")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ Hub::Configuration Config;
+ Config.BasePortNumber = 23000;
+
+ WorkerThreadPool WorkerPool(2, "hub_async_hib_wake");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ HubProvisionedInstanceInfo ProvInfo;
+ Hub::InstanceInfo Info;
- // Deprovision not-found - should return false with empty reason (-> 404)
- CHECK_FALSE(HubInstance->Deprovision("never_provisioned", Reason));
- CHECK(Reason.empty());
+ constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kTimeout = std::chrono::seconds(30);
+
+ // Provision and wait until Provisioned
+ {
+ const Hub::Response R = HubInstance->Provision("async_hib_a", ProvInfo);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Ready = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
+ {
+ Ready = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Ready, "Instance did not reach Provisioned state within timeout");
+ }
+
+ // Hibernate asynchronously and poll until Hibernated
+ {
+ const Hub::Response R = HubInstance->Hibernate("async_hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Hibernated = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Hibernated)
+ {
+ Hibernated = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout");
+ }
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Wake asynchronously and poll until Provisioned
+ {
+ const Hub::Response R = HubInstance->Wake("async_hib_a");
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ {
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+ bool Woken = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ if (HubInstance->Find("async_hib_a", &Info) && Info.State == HubInstanceState::Provisioned)
+ {
+ Woken = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout");
+ }
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Deprovision
+ {
+ const Hub::Response R = HubInstance->Deprovision("async_hib_a");
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Accepted, R.Message);
+ }
+ CHECK_FALSE(HubInstance->Find("async_hib_a"));
}
TEST_CASE("hub.recover_process_crash")
@@ -1457,8 +1865,10 @@ TEST_CASE("hub.recover_process_crash")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
// Kill the child process to simulate a crash, then poll until the watchdog detects it,
// recovers the instance, and the new process is serving requests.
@@ -1494,8 +1904,10 @@ TEST_CASE("hub.recover_process_crash_then_deprovision")
std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path());
HubProvisionedInstanceInfo Info;
- std::string Reason;
- REQUIRE_MESSAGE(HubInstance->Provision("module_a", Info, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("module_a", Info);
+ REQUIRE_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
// Kill the child process, wait for the watchdog to detect and recover the instance.
HubInstance->TerminateModuleForTesting("module_a");
@@ -1518,16 +1930,166 @@ TEST_CASE("hub.recover_process_crash_then_deprovision")
REQUIRE_MESSAGE(Recovered, "Instance did not recover within timeout");
// After recovery, deprovision should succeed and a re-provision should work.
- CHECK_MESSAGE(HubInstance->Deprovision("module_a", Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Deprovision("module_a");
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
HubProvisionedInstanceInfo NewInfo;
- CHECK_MESSAGE(HubInstance->Provision("module_a", NewInfo, Reason), Reason);
+ {
+ const Hub::Response R = HubInstance->Provision("module_a", NewInfo);
+ CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
+ }
CHECK_NE(NewInfo.Port, 0);
HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout);
CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests");
}
+TEST_CASE("hub.async_provision_concurrent")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ constexpr int kModuleCount = 8;
+
+ Hub::Configuration Config;
+ Config.BasePortNumber = 22800;
+ Config.InstanceLimit = kModuleCount;
+
+ WorkerThreadPool WorkerPool(4, "hub_async_concurrent");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
+ std::vector<std::string> Reasons(kModuleCount);
+ std::vector<int> Results(kModuleCount, 0);
+
+ {
+ WorkerThreadPool Callers(kModuleCount, "hub_async_callers");
+ std::vector<std::future<void>> Futures(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Futures[I] = Callers.EnqueueTask(std::packaged_task<void()>([&, I] {
+ const Hub::Response Resp = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
+ Results[I] = (Resp.ResponseCode == Hub::EResponseCode::Accepted) ? 1 : 0;
+ Reasons[I] = Resp.Message;
+ }),
+ WorkerThreadPool::EMode::EnableBacklog);
+ }
+ for (std::future<void>& F : Futures)
+ {
+ F.get();
+ }
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ REQUIRE_MESSAGE(Results[I] != 0, Reasons[I]);
+ CHECK_NE(Infos[I].Port, 0);
+ }
+
+ // Poll until all instances reach Provisioned state
+ constexpr auto kPollInterval = std::chrono::milliseconds(200);
+ constexpr auto kTimeout = std::chrono::seconds(30);
+ const auto Deadline = std::chrono::steady_clock::now() + kTimeout;
+
+ bool AllProvisioned = false;
+ while (std::chrono::steady_clock::now() < Deadline)
+ {
+ int ProvisionedCount = 0;
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ Hub::InstanceInfo InstanceInfo;
+ if (HubInstance->Find(fmt::format("async_c{}", I), &InstanceInfo) && InstanceInfo.State == HubInstanceState::Provisioned)
+ {
+ ++ProvisionedCount;
+ }
+ }
+ if (ProvisionedCount == kModuleCount)
+ {
+ AllProvisioned = true;
+ break;
+ }
+ std::this_thread::sleep_for(kPollInterval);
+ }
+ CHECK_MESSAGE(AllProvisioned, "Not all instances reached Provisioned state within timeout");
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I));
+ }
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ const Hub::Response DepResp = HubInstance->Deprovision(fmt::format("async_c{}", I));
+ CHECK_MESSAGE(DepResp.ResponseCode == Hub::EResponseCode::Accepted, DepResp.Message);
+ }
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+}
+
+TEST_CASE("hub.async_provision_shutdown_waits")
+{
+ ScopedTemporaryDirectory TempDir;
+
+ constexpr int kModuleCount = 8;
+
+ Hub::Configuration Config;
+ Config.InstanceLimit = kModuleCount;
+ Config.BasePortNumber = 22900;
+
+ WorkerThreadPool WorkerPool(2, "hub_async_shutdown");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ std::vector<HubProvisionedInstanceInfo> Infos(kModuleCount);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ const Hub::Response ProvResult = HubInstance->Provision(fmt::format("async_c{}", I), Infos[I]);
+ REQUIRE_MESSAGE(ProvResult.ResponseCode == Hub::EResponseCode::Accepted, ProvResult.Message);
+ REQUIRE_NE(Infos[I].Port, 0);
+ }
+
+ // Shut down without polling for Provisioned; Shutdown() must drain the latch and clean up.
+ HubInstance->Shutdown();
+
+ CHECK_EQ(HubInstance->GetInstanceCount(), 0);
+
+ for (int I = 0; I < kModuleCount; ++I)
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ CHECK_FALSE(ModClient.Get("/health/"));
+ }
+}
+
+TEST_CASE("hub.async_provision_rejected")
+{
+ // Rejection from CanProvisionInstance fires synchronously even when a WorkerPool is present.
+ ScopedTemporaryDirectory TempDir;
+
+ Hub::Configuration Config;
+ Config.InstanceLimit = 1;
+ Config.BasePortNumber = 23100;
+
+ WorkerThreadPool WorkerPool(2, "hub_async_rejected");
+ std::unique_ptr<Hub> HubInstance = hub_testutils::MakeHub(TempDir.Path(), Config, {}, &WorkerPool);
+
+ HubProvisionedInstanceInfo Info;
+
+ // First provision: dispatched to WorkerPool, returns Accepted
+ const Hub::Response FirstResult = HubInstance->Provision("async_r1", Info);
+ REQUIRE_MESSAGE(FirstResult.ResponseCode == Hub::EResponseCode::Accepted, FirstResult.Message);
+ REQUIRE_NE(Info.Port, 0);
+
+ // Second provision: CanProvisionInstance rejects synchronously (limit reached), returns Rejected
+ HubProvisionedInstanceInfo Info2;
+ const Hub::Response SecondResult = HubInstance->Provision("async_r2", Info2);
+ CHECK(SecondResult.ResponseCode == Hub::EResponseCode::Rejected);
+ CHECK_FALSE(SecondResult.Message.empty());
+ CHECK_NE(SecondResult.Message.find("instance limit"), std::string::npos);
+ CHECK_EQ(HubInstance->GetInstanceCount(), 1);
+}
+
TEST_SUITE_END();
void
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 28e77e729..f0bde10fc 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -4,6 +4,7 @@
#include "hubinstancestate.h"
#include "resourcemetrics.h"
+#include "storageserverinstance.h"
#include <zencore/system.h>
#include <zenutil/zenserverprocess.h>
@@ -18,7 +19,7 @@
namespace zen {
-class StorageServerInstance;
+class WorkerThreadPool;
/**
* Hub
@@ -59,6 +60,7 @@ public:
Hub(const Configuration& Config,
ZenServerEnvironment&& RunEnvironment,
+ WorkerThreadPool* OptionalWorkerPool = nullptr,
AsyncModuleStateChangeCallbackFunc&& ModuleStateChangeCallback = {});
~Hub();
@@ -78,42 +80,49 @@ public:
*/
void Shutdown();
+ enum class EResponseCode
+ {
+ NotFound,
+ Rejected,
+ Accepted,
+ Completed
+ };
+
+ struct Response
+ {
+ EResponseCode ResponseCode = EResponseCode::Rejected;
+ std::string Message;
+ };
+
/**
* Provision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to provision.
- * @param OutInfo If successful, information about the provisioned instance will be returned here.
- * @param OutReason If unsuccessful, the reason will be returned here.
+ * @param OutInfo On success, information about the provisioned instance is returned here.
*/
- bool Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo, std::string& OutReason);
+ Response Provision(std::string_view ModuleId, HubProvisionedInstanceInfo& OutInfo);
/**
* Deprovision a storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to deprovision.
- * @param OutReason If unsuccessful, the reason will be returned here.
- * @return true if the instance was found and deprovisioned, false otherwise.
*/
- bool Deprovision(const std::string& ModuleId, std::string& OutReason);
+ Response Deprovision(const std::string& ModuleId);
/**
* Hibernate a storage server instance for the given module ID.
* The instance is shut down but its data is preserved; it can be woken later.
*
* @param ModuleId The ID of the module to hibernate.
- * @param OutReason If unsuccessful, the reason will be returned here (empty = not found).
- * @return true if the instance was hibernated, false otherwise.
*/
- bool Hibernate(const std::string& ModuleId, std::string& OutReason);
+ Response Hibernate(const std::string& ModuleId);
/**
* Wake a hibernated storage server instance for the given module ID.
*
* @param ModuleId The ID of the module to wake.
- * @param OutReason If unsuccessful, the reason will be returned here (empty = not found).
- * @return true if the instance was woken, false otherwise.
*/
- bool Wake(const std::string& ModuleId, std::string& OutReason);
+ Response Wake(const std::string& ModuleId);
/**
* Find info about storage server instance for the given module ID.
@@ -144,6 +153,9 @@ public:
private:
const Configuration m_Config;
ZenServerEnvironment m_RunEnvironment;
+ WorkerThreadPool* m_WorkerPool = nullptr;
+ Latch m_BackgroundWorkLatch;
+ std::atomic<bool> m_ShutdownFlag = false;
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;
@@ -155,10 +167,10 @@ private:
#endif
RwLock m_Lock;
std::unordered_map<std::string, size_t> m_InstanceLookup;
- std::unordered_set<std::string> m_DeprovisioningModules;
- std::unordered_set<std::string> m_ProvisioningModules;
- std::unordered_set<std::string> m_HibernatingModules;
- std::unordered_set<std::string> m_WakingModules;
+ std::unordered_map<std::string, uint16_t> m_DeprovisioningModules;
+ std::unordered_map<std::string, uint16_t> m_ProvisioningModules;
+ std::unordered_map<std::string, uint16_t> m_HibernatingModules;
+ std::unordered_map<std::string, uint16_t> m_WakingModules;
std::unordered_set<std::string> m_RecoveringModules;
std::vector<std::unique_ptr<StorageServerInstance>> m_ActiveInstances;
std::vector<size_t> m_FreeActiveInstanceIndexes;
@@ -175,6 +187,14 @@ private:
void UpdateStats();
void UpdateCapacityMetrics();
bool CanProvisionInstance(std::string_view ModuleId, std::string& OutReason);
+ bool IsModuleInFlightLocked(std::string_view ModuleId) const;
+
+ Response InternalDeprovision(const std::string& ModuleId);
+ void CompleteProvision(StorageServerInstance::ExclusiveLockedPtr& Instance, uint16_t AllocatedPort, bool IsNewInstance);
+ void AbortProvision(std::string_view ModuleId);
+ void CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance);
+ void CompleteHibernate(StorageServerInstance::ExclusiveLockedPtr& Instance);
+ void CompleteWake(StorageServerInstance::ExclusiveLockedPtr& Instance);
class InstanceStateUpdateGuard
{
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index f9ff655ec..4d70fbddd 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -16,6 +16,7 @@
#include <zencore/windows.h>
#include <zenhttp/httpapiservice.h>
#include <zenutil/service.h>
+#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <cxxopts.hpp>
@@ -315,6 +316,7 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
ServerConfig.HubInstanceHttpClass),
+ &GetMediumWorkerPool(EWorkloadType::Background),
m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
std::string_view ModuleId,
const HubProvisionedInstanceInfo& Info,