aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver-test/hub-tests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver-test/hub-tests.cpp')
-rw-r--r--src/zenserver-test/hub-tests.cpp737
1 files changed, 595 insertions, 142 deletions
diff --git a/src/zenserver-test/hub-tests.cpp b/src/zenserver-test/hub-tests.cpp
index 958a0b050..82dfd7e91 100644
--- a/src/zenserver-test/hub-tests.cpp
+++ b/src/zenserver-test/hub-tests.cpp
@@ -21,28 +21,91 @@
# include <zenutil/consul.h>
# include <zencore/thread.h>
# include <zencore/timer.h>
+# if ZEN_PLATFORM_WINDOWS
+# include <zencore/windows.h>
+# else
+# include <cstdlib>
+# endif
namespace zen::tests::hub {
using namespace std::literals;
-TEST_SUITE_BEGIN("server.hub");
+static const HttpClientSettings kFastTimeout{.ConnectTimeout = std::chrono::milliseconds(200)};
-TEST_CASE("hub.lifecycle.basic")
+static bool
+WaitForModuleState(HttpClient& Client, std::string_view ModuleId, std::string_view ExpectedState, int TimeoutMs = 10000)
{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
{
- ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
+ HttpClient::Response R = Client.Get(fmt::format("modules/{}", ModuleId));
+ if (R && R.AsObject()["state"].AsString() == ExpectedState)
+ {
+ return true;
+ }
+ Sleep(100);
+ }
+ HttpClient::Response R = Client.Get(fmt::format("modules/{}", ModuleId));
+ return R && R.AsObject()["state"].AsString() == ExpectedState;
+}
- const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady();
- CHECK(PortNumber != 0);
+// Provision a module, retrying on 409 Conflict to handle the window where an async
+// deprovision has removed the module from InstanceLookup but not yet from
+// DeprovisioningModules (which CanProvisionInstance checks).
+static HttpClient::Response
+ProvisionModule(HttpClient& Client, std::string_view ModuleId, int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ HttpClient::Response Result;
+ do
+ {
+ Result = Client.Post(fmt::format("modules/{}/provision", ModuleId));
+ if (Result || Result.StatusCode != HttpResponseCode::Conflict)
+ {
+ return Result;
+ }
+ Sleep(100);
+ } while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs));
+ return Result;
+}
- HttpClient Client(Instance.GetBaseUri() + "/hub/");
+// Wait for a port to stop accepting connections (i.e. the process has terminated).
+// Needed after async deprovision: WaitForModuleGone returns as soon as the module
+// leaves m_InstanceLookup (synchronous), but the background worker that kills the
+// process may not have run yet.
+static bool
+WaitForPortUnreachable(HttpClient& Client, std::string_view Path = "/health/", int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
+ {
+ if (!Client.Get(Path))
+ {
+ return true;
+ }
+ Sleep(100);
+ }
+ return !Client.Get(Path);
+}
- HttpClient::Response Result = Client.Get("status");
- CHECK(Result);
+static bool
+WaitForModuleGone(HttpClient& Client, std::string_view ModuleId, int TimeoutMs = 10000)
+{
+ Stopwatch Timer;
+ while (Timer.GetElapsedTimeMs() < static_cast<uint64_t>(TimeoutMs))
+ {
+ if (Client.Get(fmt::format("modules/{}", ModuleId)).StatusCode == HttpResponseCode::NotFound)
+ {
+ return true;
+ }
+ Sleep(100);
}
+ return Client.Get(fmt::format("modules/{}", ModuleId)).StatusCode == HttpResponseCode::NotFound;
}
+TEST_SUITE_BEGIN("server.hub");
+
TEST_CASE("hub.lifecycle.children")
{
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
@@ -50,186 +113,242 @@ TEST_CASE("hub.lifecycle.children")
const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6");
REQUIRE(PortNumber != 0);
- SUBCASE("spawn")
- {
- HttpClient Client(Instance.GetBaseUri() + "/hub/");
+ HttpClient Client(Instance.GetBaseUri() + "/hub/", kFastTimeout);
+ // Verify the hub starts with no modules
+ {
HttpClient::Response Result = Client.Get("status");
REQUIRE(Result);
+ CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u);
+ }
- {
- Result = Client.Post("modules/abc/provision");
- REQUIRE(Result);
+ HttpClient::Response Result;
- CbObject AbcResult = Result.AsObject();
- CHECK(AbcResult["moduleId"].AsString() == "abc"sv);
- const uint16_t AbcPort = AbcResult["port"].AsUInt16(0);
- CHECK_NE(AbcPort, 0);
+ uint16_t AbcPort = 0;
+ uint16_t DefPort = 0;
- // This should be a fresh instance with no contents
+ {
+ Result = Client.Post("modules/abc/provision");
+ REQUIRE(Result);
- HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort));
+ CbObject AbcResult = Result.AsObject();
+ CHECK(AbcResult["moduleId"].AsString() == "abc"sv);
+ AbcPort = AbcResult["port"].AsUInt16(0);
+ CHECK_NE(AbcPort, 0);
- Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+ REQUIRE(WaitForModuleState(Client, "abc", "provisioned"));
- Result = AbcClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567",
- IoBufferBuilder::MakeFromMemory(MakeMemoryView("abcdef"sv)));
- CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
- }
+ // This should be a fresh instance with no contents
- {
- Result = Client.Post("modules/def/provision");
- REQUIRE(Result);
+ HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
+ CHECK(AbcClient.Get("/health/"));
- CbObject DefResult = Result.AsObject();
- CHECK(DefResult["moduleId"].AsString() == "def"sv);
- const uint16_t DefPort = DefResult["port"].AsUInt16(0);
- REQUIRE_NE(DefPort, 0);
+ Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
- // This should be a fresh instance with no contents
+ Result = AbcClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567",
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView("abcdef"sv)));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
+ }
- HttpClient DefClient(fmt::format("http://localhost:{}", DefPort));
+ {
+ Result = Client.Post("modules/def/provision");
+ REQUIRE(Result);
- Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+ CbObject DefResult = Result.AsObject();
+ CHECK(DefResult["moduleId"].AsString() == "def"sv);
+ DefPort = DefResult["port"].AsUInt16(0);
+ REQUIRE_NE(DefPort, 0);
- Result = DefClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567",
- IoBufferBuilder::MakeFromMemory(MakeMemoryView("AbcDef"sv)));
- CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
- }
+ REQUIRE(WaitForModuleState(Client, "def", "provisioned"));
- // this should be rejected because of the invalid module id
- Result = Client.Post("modules/!!!!!/provision");
- CHECK(!Result);
+ // This should be a fresh instance with no contents
- Result = Client.Post("modules/ghi/provision");
- REQUIRE(Result);
+ HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
+ CHECK(DefClient.Get("/health/"));
- // Tear down instances
+ Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
- Result = Client.Post("modules/abc/deprovision");
- REQUIRE(Result);
+ Result = DefClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567",
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView("AbcDef"sv)));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
+ }
- Result = Client.Post("modules/def/deprovision");
- REQUIRE(Result);
+ // this should be rejected because of the invalid module id
+ Result = Client.Post("modules/!!!!!/provision");
+ CHECK(!Result);
+
+ Result = Client.Post("modules/ghi/provision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleState(Client, "ghi", "provisioned"));
+
+ // Tear down instances
+
+ Result = Client.Post("modules/abc/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "abc"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
- Result = Client.Post("modules/ghi/deprovision");
+ Result = Client.Post("modules/def/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "def"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
+
+ Result = Client.Post("modules/ghi/deprovision");
+ REQUIRE(Result);
+
+ // re-provision to verify that (de)hydration preserved state
+ {
+ Result = ProvisionModule(Client, "abc");
REQUIRE(Result);
- // re-provision to verify that (de)hydration preserved state
- {
- Result = Client.Post("modules/abc/provision");
- REQUIRE(Result);
+ CbObject AbcResult = Result.AsObject();
+ CHECK(AbcResult["moduleId"].AsString() == "abc"sv);
+ AbcPort = AbcResult["port"].AsUInt16(0);
+ REQUIRE_NE(AbcPort, 0);
- CbObject AbcResult = Result.AsObject();
- CHECK(AbcResult["moduleId"].AsString() == "abc"sv);
- const uint16_t AbcPort = AbcResult["port"].AsUInt16(0);
- REQUIRE_NE(AbcPort, 0);
+ REQUIRE(WaitForModuleState(Client, "abc", "provisioned"));
- // This should contain the content from the previous run
+ // This should contain the content from the previous run
- HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort));
+ HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
+ CHECK(AbcClient.Get("/health/"));
- Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
- CHECK_EQ(Result.AsText(), "abcdef"sv);
+ CHECK_EQ(Result.AsText(), "abcdef"sv);
- Result = AbcClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567",
- IoBufferBuilder::MakeFromMemory(MakeMemoryView("ghijklmnop"sv)));
- CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
- }
+ Result = AbcClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567",
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView("ghijklmnop"sv)));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
+ }
- {
- Result = Client.Post("modules/def/provision");
- REQUIRE(Result);
+ {
+ Result = ProvisionModule(Client, "def");
+ REQUIRE(Result);
- CbObject DefResult = Result.AsObject();
- CHECK(DefResult["moduleId"].AsString() == "def"sv);
- const uint16_t DefPort = DefResult["port"].AsUInt16(0);
- REQUIRE_NE(DefPort, 0);
+ CbObject DefResult = Result.AsObject();
+ CHECK(DefResult["moduleId"].AsString() == "def"sv);
+ DefPort = DefResult["port"].AsUInt16(0);
+ REQUIRE_NE(DefPort, 0);
- // This should contain the content from the previous run
+ REQUIRE(WaitForModuleState(Client, "def", "provisioned"));
- HttpClient DefClient(fmt::format("http://localhost:{}", DefPort));
+ // This should contain the content from the previous run
- Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
+ CHECK(DefClient.Get("/health/"));
- CHECK_EQ(Result.AsText(), "AbcDef"sv);
+ Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
- Result = DefClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567",
- IoBufferBuilder::MakeFromMemory(MakeMemoryView("GhijklmNop"sv)));
- CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
- }
+ CHECK_EQ(Result.AsText(), "AbcDef"sv);
- Result = Client.Post("modules/abc/deprovision");
- REQUIRE(Result);
+ Result = DefClient.Put("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567",
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView("GhijklmNop"sv)));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
+ }
+
+ Result = Client.Post("modules/abc/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "abc"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
+
+ Result = Client.Post("modules/def/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "def"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
- Result = Client.Post("modules/def/deprovision");
+ // re-provision to verify that (de)hydration preserved state, including
+ // state which was generated after the very first dehydration
+ {
+ Result = ProvisionModule(Client, "abc");
REQUIRE(Result);
- // re-provision to verify that (de)hydration preserved state, including
- // state which was generated after the very first dehydration
- {
- Result = Client.Post("modules/abc/provision");
- REQUIRE(Result);
+ CbObject AbcResult = Result.AsObject();
+ CHECK(AbcResult["moduleId"].AsString() == "abc"sv);
+ AbcPort = AbcResult["port"].AsUInt16(0);
+ REQUIRE_NE(AbcPort, 0);
- CbObject AbcResult = Result.AsObject();
- CHECK(AbcResult["moduleId"].AsString() == "abc"sv);
- const uint16_t AbcPort = AbcResult["port"].AsUInt16(0);
- REQUIRE_NE(AbcPort, 0);
+ REQUIRE(WaitForModuleState(Client, "abc", "provisioned"));
- // This should contain the content from the previous two runs
+ // This should contain the content from the previous two runs
- HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort));
+ HttpClient AbcClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
+ CHECK(AbcClient.Get("/health/"));
- Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ Result = AbcClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
- CHECK_EQ(Result.AsText(), "abcdef"sv);
+ CHECK_EQ(Result.AsText(), "abcdef"sv);
- Result = AbcClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ Result = AbcClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
- CHECK_EQ(Result.AsText(), "ghijklmnop"sv);
- }
+ CHECK_EQ(Result.AsText(), "ghijklmnop"sv);
+ }
- {
- Result = Client.Post("modules/def/provision");
- REQUIRE(Result);
+ {
+ Result = ProvisionModule(Client, "def");
+ REQUIRE(Result);
- CbObject DefResult = Result.AsObject();
- REQUIRE(DefResult["moduleId"].AsString() == "def"sv);
- const uint16_t DefPort = DefResult["port"].AsUInt16(0);
- REQUIRE_NE(DefPort, 0);
+ CbObject DefResult = Result.AsObject();
+ REQUIRE(DefResult["moduleId"].AsString() == "def"sv);
+ DefPort = DefResult["port"].AsUInt16(0);
+ REQUIRE_NE(DefPort, 0);
- // This should contain the content from the previous two runs
+ REQUIRE(WaitForModuleState(Client, "def", "provisioned"));
- HttpClient DefClient(fmt::format("http://localhost:{}", DefPort));
+ // This should contain the content from the previous two runs
- Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ HttpClient DefClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
+ CHECK(DefClient.Get("/health/"));
- CHECK_EQ(Result.AsText(), "AbcDef"sv);
+ Result = DefClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
- Result = DefClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567");
- CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ CHECK_EQ(Result.AsText(), "AbcDef"sv);
- CHECK_EQ(Result.AsText(), "GhijklmNop"sv);
- }
+ Result = DefClient.Get("/z$/ns1/b/1123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
- Result = Client.Post("modules/abc/deprovision");
- REQUIRE(Result);
+ CHECK_EQ(Result.AsText(), "GhijklmNop"sv);
+ }
- Result = Client.Post("modules/def/deprovision");
- REQUIRE(Result);
+ Result = Client.Post("modules/abc/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "abc"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", AbcPort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
- // final sanity check that the hub is still responsive
- Result = Client.Get("status");
- CHECK(Result);
+ Result = Client.Post("modules/def/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "def"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", DefPort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
}
+
+ // final sanity check that the hub is still responsive and all modules are gone
+ Result = Client.Get("status");
+ REQUIRE(Result);
+ CHECK_EQ(Result.AsObject()["modules"].AsArrayView().Num(), 0u);
}
static bool
@@ -258,7 +377,7 @@ TEST_CASE("hub.consul.kv")
consul::ConsulProcess ConsulProc;
ConsulProc.SpawnConsulAgent();
- consul::ConsulClient Client("http://localhost:8500/");
+ consul::ConsulClient Client({.BaseUri = "http://localhost:8500/"});
Client.SetKeyValue("zen/hub/testkey", "testvalue");
std::string RetrievedValue = Client.GetKeyValue("zen/hub/testkey");
@@ -275,11 +394,93 @@ TEST_CASE("hub.consul.hub.registration")
ConsulProc.SpawnConsulAgent();
ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
- const uint16_t PortNumber =
- Instance.SpawnServerAndWaitUntilReady("--consul-endpoint=http://localhost:8500/ --instance-id=test-instance");
+ const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(
+ "--consul-endpoint=http://localhost:8500/ --instance-id=test-instance "
+ "--consul-health-interval-seconds=5 --consul-deregister-after-seconds=60");
+ REQUIRE(PortNumber != 0);
+
+ consul::ConsulClient Client({.BaseUri = "http://localhost:8500/"});
+ REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000));
+
+ // Verify custom intervals flowed through to the registered check
+ {
+ std::string JsonError;
+ CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), JsonError);
+ REQUIRE(JsonError.empty());
+
+ CbObjectView HubCheck;
+ for (CbFieldView F : ChecksRoot)
+ {
+ if (!F.IsObject())
+ {
+ continue;
+ }
+ for (CbFieldView C : F.AsObjectView())
+ {
+ CbObjectView Check = C.AsObjectView();
+ if (Check["ServiceID"sv].AsString() == "zen-hub-test-instance"sv)
+ {
+ HubCheck = Check;
+ break;
+ }
+ }
+ }
+ REQUIRE(HubCheck);
+ // Consul does not reflect DeregisterCriticalServiceAfter back in /v1/agent/checks for
+ // service-embedded checks; Definition is always an empty object. Only Type and Interval
+ // are accessible at the top level.
+ CHECK_EQ(HubCheck["Type"sv].AsString(), "http"sv);
+ CHECK_EQ(HubCheck["Interval"sv].AsString(), "5s"sv);
+ }
+
+ Instance.Shutdown();
+
+ CHECK(!Client.HasService("zen-hub-test-instance"));
+
+ ConsulProc.StopConsulAgent();
+}
+
+TEST_CASE("hub.consul.hub.registration.token")
+{
+ // Set an env var that the server will read its Consul token from.
+ // Children inherit parent environment, so the spawned server will see it.
+ constexpr const char* TokenEnvVarName = "ZEN_TEST_CONSUL_TOKEN";
+ constexpr const char* TokenValue = "test-token-value";
+# if ZEN_PLATFORM_WINDOWS
+ char PrevBuf[1024] = {};
+ DWORD PrevLen = GetEnvironmentVariableA(TokenEnvVarName, PrevBuf, sizeof(PrevBuf));
+ REQUIRE(PrevLen < sizeof(PrevBuf));
+ SetEnvironmentVariableA(TokenEnvVarName, TokenValue);
+ auto EnvCleanup = MakeGuard([PrevEnvValue = std::string(PrevBuf, PrevLen), HadPrevToken = PrevLen > 0] {
+ SetEnvironmentVariableA(TokenEnvVarName, HadPrevToken ? PrevEnvValue.c_str() : nullptr);
+ });
+# else
+ const char* PrevEnvPtr = getenv(TokenEnvVarName);
+ setenv(TokenEnvVarName, TokenValue, /*overwrite=*/1);
+ auto EnvCleanup = MakeGuard([PrevEnvValue = std::string(PrevEnvPtr ? PrevEnvPtr : ""), HadPrevToken = PrevEnvPtr != nullptr] {
+ if (HadPrevToken)
+ {
+ setenv(TokenEnvVarName, PrevEnvValue.c_str(), /*overwrite=*/1);
+ }
+ else
+ {
+ unsetenv(TokenEnvVarName);
+ }
+ });
+# endif
+
+ consul::ConsulProcess ConsulProc;
+ ConsulProc.SpawnConsulAgent();
+
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
+ const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady(
+ "--consul-endpoint=http://localhost:8500/ --instance-id=test-instance "
+ "--consul-token-env=ZEN_TEST_CONSUL_TOKEN");
REQUIRE(PortNumber != 0);
- consul::ConsulClient Client("http://localhost:8500/");
+ // Use a plain client -- dev-mode Consul doesn't enforce ACLs, but the
+ // server has exercised the ConsulTokenEnv -> GetEnvVariable -> ConsulClient path.
+ consul::ConsulClient Client({.BaseUri = "http://localhost:8500/"});
REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000));
@@ -300,20 +501,24 @@ TEST_CASE("hub.consul.provision.registration")
Instance.SpawnServerAndWaitUntilReady("--consul-endpoint=http://localhost:8500/ --instance-id=test-instance");
REQUIRE(PortNumber != 0);
- consul::ConsulClient Client("http://localhost:8500/");
+ consul::ConsulClient Client({.BaseUri = "http://localhost:8500/"});
REQUIRE(WaitForConsulService(Client, "zen-hub-test-instance", true, 5000));
- HttpClient HubClient(Instance.GetBaseUri() + "/hub/");
+ HttpClient HubClient(Instance.GetBaseUri() + "/hub/", kFastTimeout);
HttpClient::Response Result = HubClient.Post("modules/testmod/provision");
REQUIRE(Result);
- CHECK(Client.HasService("testmod"));
- {
- const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0);
- REQUIRE(ModulePort != 0);
+ // Service is registered in Consul during Provisioning (before the child process starts),
+ // so this returns as soon as the state transition fires, not when the server is ready.
+ REQUIRE(WaitForConsulService(Client, "testmod", true, 10000));
+ const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0);
+ REQUIRE(ModulePort != 0);
+
+ // Consul fields are set during Provisioning and can be verified before the server is ready.
+ {
std::string JsonError;
CbFieldIterator ServicesRoot = LoadCompactBinaryFromJson(Client.GetAgentServicesJson(), JsonError);
REQUIRE(JsonError.empty());
@@ -328,7 +533,7 @@ TEST_CASE("hub.consul.provision.registration")
}
REQUIRE(ServicesMap);
- // Verify fields registered by OnProvisioned
+ // Verify fields registered by OnModuleStateChanged
{
CbObjectView ModService = ServicesMap["testmod"].AsObjectView();
CHECK_EQ(ModService["ID"sv].AsString(), "testmod"sv);
@@ -365,10 +570,82 @@ TEST_CASE("hub.consul.provision.registration")
CHECK_EQ(HubService["Service"sv].AsString(), "zen-hub"sv);
CHECK_EQ(HubService["Port"sv].AsDouble(0), double(PortNumber));
}
+
+ // Verify hub health check endpoint URL (registered from startup with an active interval)
+ {
+ std::string ChecksJsonError;
+ CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), ChecksJsonError);
+ REQUIRE(ChecksJsonError.empty());
+
+ CbObjectView HubCheck;
+ for (CbFieldView F : ChecksRoot)
+ {
+ if (!F.IsObject())
+ {
+ continue;
+ }
+ for (CbFieldView C : F.AsObjectView())
+ {
+ CbObjectView Check = C.AsObjectView();
+ if (Check["ServiceID"sv].AsString() == "zen-hub-test-instance"sv)
+ {
+ HubCheck = Check;
+ }
+ }
+ }
+ REQUIRE(HubCheck);
+ // Consul does not reflect HTTP URL back in /v1/agent/checks for service-embedded checks.
+ CHECK_EQ(HubCheck["Type"sv].AsString(), "http"sv);
+ }
}
- Result = HubClient.Post("modules/testmod/deprovision");
- REQUIRE(Result);
+ // Wait for Provisioned before touching the module's HTTP endpoint.
+ REQUIRE(WaitForModuleState(HubClient, "testmod", "provisioned"));
+
+ // Verify module health check endpoint URL. No health check is registered during Provisioning
+ // (to avoid Consul marking the service critical before the child process is ready); it is added
+ // on transition to Provisioned.
+ {
+ std::string ChecksJsonError;
+ CbFieldIterator ChecksRoot = LoadCompactBinaryFromJson(Client.GetAgentChecksJson(), ChecksJsonError);
+ REQUIRE(ChecksJsonError.empty());
+
+ CbObjectView ModCheck;
+ for (CbFieldView F : ChecksRoot)
+ {
+ if (!F.IsObject())
+ {
+ continue;
+ }
+ for (CbFieldView C : F.AsObjectView())
+ {
+ CbObjectView Check = C.AsObjectView();
+ if (Check["ServiceID"sv].AsString() == "testmod"sv)
+ {
+ ModCheck = Check;
+ }
+ }
+ }
+ REQUIRE(ModCheck);
+ // Consul does not reflect HTTP URL back in /v1/agent/checks for service-embedded checks.
+ CHECK_EQ(ModCheck["Type"sv].AsString(), "http"sv);
+ }
+
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ {
+ Result = HubClient.Post("modules/testmod/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForConsulService(Client, "testmod", false, 10000));
+
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+ }
CHECK(!Client.HasService("testmod"));
@@ -377,6 +654,182 @@ TEST_CASE("hub.consul.provision.registration")
ConsulProc.StopConsulAgent();
}
+TEST_CASE("hub.hibernate.lifecycle")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
+ const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6");
+ REQUIRE(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/hub/", kFastTimeout);
+
+ // Provision
+ HttpClient::Response Result = Client.Post("modules/testmod/provision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
+ const uint16_t ModulePort = Result.AsObject()["port"].AsUInt16(0);
+ REQUIRE_NE(ModulePort, 0);
+
+ REQUIRE(WaitForModuleState(Client, "testmod", "provisioned"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+
+ // Write data to verify it survives the hibernate/wake cycle
+ Result = ModClient.Put("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567",
+ IoBufferBuilder::MakeFromMemory(MakeMemoryView("hibernatetest"sv)));
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Created);
+ }
+
+ // Hibernate - state should become "hibernated", server should be unreachable
+ Result = Client.Post("modules/testmod/hibernate");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
+
+ REQUIRE(WaitForModuleState(Client, "testmod", "hibernated"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
+ CHECK(!ModClient.Get("/health/"));
+ }
+
+ // Wake - state should return to "provisioned", server should be reachable, data should be intact
+ Result = Client.Post("modules/testmod/wake");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
+
+ REQUIRE(WaitForModuleState(Client, "testmod", "provisioned"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+
+ Result = ModClient.Get("/z$/ns1/b/0123456789abcdef0123456789abcdef01234567");
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+ CHECK_EQ(Result.AsText(), "hibernatetest"sv);
+ }
+
+ // Deprovision - server should become unreachable
+ Result = Client.Post("modules/testmod/deprovision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ REQUIRE(WaitForModuleGone(Client, "testmod"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
+
+ // Re-provision - server should be reachable on its (potentially new) port
+ Result = ProvisionModule(Client, "testmod");
+ REQUIRE(Result);
+ CHECK_EQ(Result.AsObject()["moduleId"].AsString(), "testmod"sv);
+ const uint16_t ModulePort2 = Result.AsObject()["port"].AsUInt16(0);
+ REQUIRE_NE(ModulePort2, 0);
+ REQUIRE(WaitForModuleState(Client, "testmod", "provisioned"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout);
+ CHECK(ModClient.Get("/health/"));
+ }
+
+ // Final deprovision - server should become unreachable
+ Result = Client.Post("modules/testmod/deprovision");
+ REQUIRE(Result);
+ REQUIRE(WaitForModuleGone(Client, "testmod"));
+ {
+ HttpClient ModClient(fmt::format("http://localhost:{}", ModulePort2), kFastTimeout);
+ CHECK(WaitForPortUnreachable(ModClient));
+ }
+}
+
+TEST_CASE("hub.hibernate.errors")
+{
+ ZenServerInstance Instance(TestEnv, ZenServerInstance::ServerMode::kHubServer);
+ const uint16_t PortNumber = Instance.SpawnServerAndWaitUntilReady("--hub-instance-corelimit=2 --hub-instance-http-threads=6");
+ REQUIRE(PortNumber != 0);
+
+ HttpClient Client(Instance.GetBaseUri() + "/hub/", kFastTimeout);
+
+ // Hibernate/wake on an unknown module id should return 404
+ HttpClient::Response Result = Client.Post("modules/unknown/hibernate");
+ CHECK(!Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+
+ Result = Client.Post("modules/unknown/wake");
+ CHECK(!Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+
+ Result = Client.Post("modules/unknown/deprovision");
+ CHECK(!Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+
+ Result = Client.Delete("modules/unknown");
+ CHECK(!Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::NotFound);
+
+ // Double-provision: second call while first is in-flight returns 202 Accepted with the same port.
+ Result = Client.Post("modules/errmod/provision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ const uint16_t ErrmodPort = Result.AsObject()["port"].AsUInt16(0);
+ REQUIRE_NE(ErrmodPort, 0);
+
+ // Provisioning the same module while in-flight returns 202 Accepted with the allocated port.
+ // Evaluated synchronously before WorkerPool dispatch, so safe regardless of timing.
+ Result = Client.Post("modules/errmod/provision");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ CHECK_EQ(Result.AsObject()["port"].AsUInt16(0), ErrmodPort);
+
+ REQUIRE(WaitForModuleState(Client, "errmod", "provisioned"));
+
+ // Already provisioned: provision and wake both return 200 Completed.
+ Result = Client.Post("modules/errmod/provision");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+
+ Result = Client.Post("modules/errmod/wake");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+
+ // Double-hibernate: second call while first is in-flight returns 202 Accepted.
+ Result = Client.Post("modules/errmod/hibernate");
+ REQUIRE(Result);
+
+ Result = Client.Post("modules/errmod/hibernate");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+
+ REQUIRE(WaitForModuleState(Client, "errmod", "hibernated"));
+
+ // Already hibernated: hibernate returns 200 Completed.
+ Result = Client.Post("modules/errmod/hibernate");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::OK);
+
+ // Double-wake: second call while first is in-flight returns 202 Accepted.
+ Result = Client.Post("modules/errmod/wake");
+ REQUIRE(Result);
+
+ Result = Client.Post("modules/errmod/wake");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+
+ // Double-deprovision: second call while first is in-flight returns 202 Accepted.
+ // errmod2 is a fresh module to avoid waiting on the still-waking errmod.
+ Result = Client.Post("modules/errmod2/provision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+ REQUIRE(WaitForModuleState(Client, "errmod2", "provisioned"));
+
+ Result = Client.Post("modules/errmod2/deprovision");
+ REQUIRE(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+
+ Result = Client.Post("modules/errmod2/deprovision");
+ CHECK(Result);
+ CHECK_EQ(Result.StatusCode, HttpResponseCode::Accepted);
+}
+
TEST_SUITE_END();
} // namespace zen::tests::hub