aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-05-04 22:37:09 +0200
committerGitHub Enterprise <[email protected]>2026-05-04 22:37:09 +0200
commitae745c4e90500f827ec83ce238a22d10ec1ea74d (patch)
treed6e7e11f61ca5c3f54d3afb3fa23b44bb62e6c16 /src/zenserver
parentzenhttp improvements (robustness / correctness) (#968) (diff)
downloadarchived-zen-ae745c4e90500f827ec83ce238a22d10ec1ea74d.tar.xz
archived-zen-ae745c4e90500f827ec83ce238a22d10ec1ea74d.zip
watchdog ephemeral port exhaust (#1022)
- Improvement: Hub pools HTTP connections to managed instances so provision/deprovision churn no longer exhausts Windows ephemeral ports
Diffstat (limited to 'src/zenserver')
-rw-r--r--src/zenserver/hub/hub.cpp57
-rw-r--r--src/zenserver/hub/hub.h21
2 files changed, 55 insertions, 23 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp
index 4ae8d0457..b4e9de2f0 100644
--- a/src/zenserver/hub/hub.cpp
+++ b/src/zenserver/hub/hub.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
+#include <zenhttp/httpclientshare.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <EASTL/fixed_vector.h>
@@ -176,6 +177,7 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy
: m_Config(Config)
, m_RunEnvironment(std::move(RunEnvironment))
, m_WorkerPool(Config.OptionalProvisionWorkerPool)
+, m_InstanceClientShare(std::make_unique<HttpClientShare>())
, m_BackgroundWorkLatch(1)
, m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback))
, m_ActiveInstances(Config.InstanceLimit)
@@ -246,6 +248,13 @@ Hub::~Hub()
}
}
+HttpClient
+Hub::MakeInstanceClient(uint16_t Port, HttpClientSettings Settings)
+{
+ Settings.OptionalShare = m_InstanceClientShare.get();
+ return HttpClient(fmt::format("http://localhost:{}", Port), Settings);
+}
+
void
Hub::Shutdown()
{
@@ -895,7 +904,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si
ZEN_INFO("Triggering GC for module {}", ModuleId);
Stopwatch GcTimer;
- HttpClient GcClient(fmt::format("http://localhost:{}", Port));
+ HttpClient GcClient = MakeInstanceClient(Port, HttpClientSettings{});
HttpClient::KeyValueMap Params;
Params.Entries.insert({"smallobjects", "true"});
@@ -1709,9 +1718,15 @@ Hub::WatchDog()
const uint64_t InstanceCheckThrottleMs =
std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count();
+ // RetryCount=1 absorbs RST-on-stale-conn after a port is recycled to a
+ // fresh instance: the share's connection cache cannot be invalidated
+ // per-port on deprovision, so the first request to a recycled port may
+ // fail mid-transfer if libcurl's liveness probe missed the RST.
HttpClient ActivityCheckClient("http://localhost",
HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout,
- .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout},
+ .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout,
+ .RetryCount = 1,
+ .OptionalShare = m_InstanceClientShare.get()},
[&]() -> bool { return m_WatchDogEvent.Wait(0); });
size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0
@@ -1950,7 +1965,7 @@ TEST_CASE("hub.provision")
CHECK_LE(InstanceInfo.StateChangeTime, std::chrono::system_clock::now());
{
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
@@ -1969,7 +1984,7 @@ TEST_CASE("hub.provision")
CHECK_FALSE(HubInstance->Find("module_a"));
{
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
@@ -2031,7 +2046,7 @@ TEST_CASE("hub.provision_config")
CHECK(TestResponse.AsObject()["ok"].AsBool());
{
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
@@ -2041,7 +2056,7 @@ TEST_CASE("hub.provision_config")
CHECK_FALSE(HubInstance->Find("module_a"));
{
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
}
@@ -2308,7 +2323,7 @@ TEST_CASE("hub.hibernate_wake_obliterate")
CHECK_NE(ProvisionedTime, std::chrono::system_clock::time_point::min());
CHECK_LE(ProvisionedTime, std::chrono::system_clock::now());
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
@@ -2325,7 +2340,7 @@ TEST_CASE("hub.hibernate_wake_obliterate")
const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime;
CHECK_GE(HibernatedTime, ProvisionedTime);
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
@@ -2341,7 +2356,7 @@ TEST_CASE("hub.hibernate_wake_obliterate")
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
CHECK_GE(Info.StateChangeTime, HibernatedTime);
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
@@ -2369,7 +2384,7 @@ TEST_CASE("hub.hibernate_wake_obliterate")
REQUIRE(HubInstance->Find("hib_a", &Info));
CHECK_EQ(Info.State, HubInstanceState::Provisioned);
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
@@ -2381,7 +2396,7 @@ TEST_CASE("hub.hibernate_wake_obliterate")
CHECK_EQ(HubInstance->GetInstanceCount(), 0);
CHECK_FALSE(HubInstance->Find("hib_a"));
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
@@ -2460,7 +2475,7 @@ TEST_CASE("hub.async_hibernate_wake")
REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout");
}
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(!ModClient.Get("/health/"));
}
@@ -2484,7 +2499,7 @@ TEST_CASE("hub.async_hibernate_wake")
REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout");
}
{
- HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout);
CHECK(ModClient.Get("/health/"));
}
@@ -2533,7 +2548,7 @@ TEST_CASE("hub.recover_process_crash")
const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs;
// A successful HTTP health check on the same port confirms the new process is up.
- HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout);
bool Recovered = false;
while (std::chrono::steady_clock::now() < Deadline)
{
@@ -2580,7 +2595,7 @@ TEST_CASE("hub.recover_process_crash")
CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message);
}
CHECK_NE(NewInfo.Port, 0);
- HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout);
+ HttpClient NewClient = HubInstance->MakeInstanceClient(NewInfo.Port, kFastTimeout);
CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests");
}
@@ -2654,7 +2669,7 @@ TEST_CASE("hub.async_provision_concurrent")
for (int I = 0; I < kModuleCount; ++I)
{
- HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Infos[I].Port, kFastTimeout);
CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I));
}
@@ -2695,7 +2710,7 @@ TEST_CASE("hub.async_provision_shutdown_waits")
for (int I = 0; I < kModuleCount; ++I)
{
- HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout);
+ HttpClient ModClient = HubInstance->MakeInstanceClient(Infos[I].Port, kFastTimeout);
CHECK_FALSE(ModClient.Get("/health/"));
}
}
@@ -2779,10 +2794,10 @@ TEST_CASE("hub.instance.inactivity.deprovision")
// The watchdog detects the changed sum on the next cycle and resets LastActivityTime.
// Per-attempt connect is kept tight (200ms) so a genuinely dead endpoint fails fast;
// RetryCount=3 absorbs transient localhost-accept slowness on loaded CI runners.
- HttpClient PersistentClient(fmt::format("http://localhost:{}", Port),
- HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3});
- uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
- std::chrono::steady_clock::time_point::min())
+ HttpClient PersistentClient =
+ HubInstance->MakeInstanceClient(Port, HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3});
+ uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() -
+ std::chrono::steady_clock::time_point::min())
.count();
IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick));
const HttpClient::Response PutResult =
diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h
index 1ce9bc876..fa504de33 100644
--- a/src/zenserver/hub/hub.h
+++ b/src/zenserver/hub/hub.h
@@ -23,6 +23,8 @@
namespace zen {
class HttpClient;
+struct HttpClientSettings;
+class HttpClientShare;
class WorkerThreadPool;
/**
@@ -191,6 +193,15 @@ public:
const Configuration& GetConfig() const { return m_Config; }
+ /**
+ * Construct a sync HttpClient targeting a hub-managed instance on
+ * localhost. Routes through m_InstanceClientShare so DNS and keep-alive
+ * TCP connections are reused across all hub->instance traffic, avoiding
+ * Windows ephemeral-port exhaustion under provision/deprovision churn.
+ * Settings.OptionalShare is unconditionally overwritten.
+ */
+ HttpClient MakeInstanceClient(uint16_t Port, HttpClientSettings Settings);
+
#if ZEN_WITH_TESTS
void TerminateModuleForTesting(const std::string& ModuleId);
#endif
@@ -199,8 +210,14 @@ private:
const Configuration m_Config;
ZenServerEnvironment m_RunEnvironment;
WorkerThreadPool* m_WorkerPool = nullptr;
- Latch m_BackgroundWorkLatch;
- std::atomic<bool> m_ShutdownFlag = false;
+ // Declared early so it destructs late: every HttpClient referencing the
+ // share (watchdog ActivityCheckClient, GC client, in-flight worker
+ // tasks) is required to be gone before this member runs its dtor.
+ // Hub::Shutdown enforces that by joining the watchdog and draining
+ // background work before Hub destruction begins.
+ std::unique_ptr<HttpClientShare> m_InstanceClientShare;
+ Latch m_BackgroundWorkLatch;
+ std::atomic<bool> m_ShutdownFlag = false;
AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback;