diff options
| author | Dan Engelbrecht <[email protected]> | 2026-05-04 22:37:09 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-05-04 22:37:09 +0200 |
| commit | ae745c4e90500f827ec83ce238a22d10ec1ea74d (patch) | |
| tree | d6e7e11f61ca5c3f54d3afb3fa23b44bb62e6c16 /src/zenserver | |
| parent | zenhttp improvements (robustness / correctness) (#968) (diff) | |
| download | archived-zen-ae745c4e90500f827ec83ce238a22d10ec1ea74d.tar.xz archived-zen-ae745c4e90500f827ec83ce238a22d10ec1ea74d.zip | |
watchdog ephemeral port exhaust (#1022)
- Improvement: Hub pools HTTP connections to managed instances so provision/deprovision churn no longer exhausts Windows ephemeral ports
Diffstat (limited to 'src/zenserver')
| -rw-r--r-- | src/zenserver/hub/hub.cpp | 57 | ||||
| -rw-r--r-- | src/zenserver/hub/hub.h | 21 |
2 files changed, 55 insertions, 23 deletions
diff --git a/src/zenserver/hub/hub.cpp b/src/zenserver/hub/hub.cpp index 4ae8d0457..b4e9de2f0 100644 --- a/src/zenserver/hub/hub.cpp +++ b/src/zenserver/hub/hub.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> +#include <zenhttp/httpclientshare.h> ZEN_THIRD_PARTY_INCLUDES_START #include <EASTL/fixed_vector.h> @@ -176,6 +177,7 @@ Hub::Hub(const Configuration& Config, ZenServerEnvironment&& RunEnvironment, Asy : m_Config(Config) , m_RunEnvironment(std::move(RunEnvironment)) , m_WorkerPool(Config.OptionalProvisionWorkerPool) +, m_InstanceClientShare(std::make_unique<HttpClientShare>()) , m_BackgroundWorkLatch(1) , m_ModuleStateChangeCallback(std::move(ModuleStateChangeCallback)) , m_ActiveInstances(Config.InstanceLimit) @@ -246,6 +248,13 @@ Hub::~Hub() } } +HttpClient +Hub::MakeInstanceClient(uint16_t Port, HttpClientSettings Settings) +{ + Settings.OptionalShare = m_InstanceClientShare.get(); + return HttpClient(fmt::format("http://localhost:{}", Port), Settings); +} + void Hub::Shutdown() { @@ -895,7 +904,7 @@ Hub::CompleteDeprovision(StorageServerInstance::ExclusiveLockedPtr& Instance, si ZEN_INFO("Triggering GC for module {}", ModuleId); Stopwatch GcTimer; - HttpClient GcClient(fmt::format("http://localhost:{}", Port)); + HttpClient GcClient = MakeInstanceClient(Port, HttpClientSettings{}); HttpClient::KeyValueMap Params; Params.Entries.insert({"smallobjects", "true"}); @@ -1709,9 +1718,15 @@ Hub::WatchDog() const uint64_t InstanceCheckThrottleMs = std::chrono::duration_cast<std::chrono::milliseconds>(m_Config.WatchDog.InstanceCheckThrottle).count(); + // RetryCount=1 absorbs RST-on-stale-conn after a port is recycled to a + // fresh instance: the share's connection cache cannot be invalidated + // per-port on deprovision, so the first request to a recycled port may + // fail mid-transfer if libcurl's liveness probe missed the RST. HttpClient ActivityCheckClient("http://localhost", HttpClientSettings{.ConnectTimeout = m_Config.WatchDog.ActivityCheckConnectTimeout, - .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout}, + .Timeout = m_Config.WatchDog.ActivityCheckRequestTimeout, + .RetryCount = 1, + .OptionalShare = m_InstanceClientShare.get()}, [&]() -> bool { return m_WatchDogEvent.Wait(0); }); size_t CheckInstanceIndex = SIZE_MAX; // first increment wraps to 0 @@ -1950,7 +1965,7 @@ TEST_CASE("hub.provision") CHECK_LE(InstanceInfo.StateChangeTime, std::chrono::system_clock::now()); { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout); CHECK(ModClient.Get("/health/")); } @@ -1969,7 +1984,7 @@ TEST_CASE("hub.provision") CHECK_FALSE(HubInstance->Find("module_a")); { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout); CHECK(!ModClient.Get("/health/")); } @@ -2031,7 +2046,7 @@ TEST_CASE("hub.provision_config") CHECK(TestResponse.AsObject()["ok"].AsBool()); { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout); CHECK(ModClient.Get("/health/")); } @@ -2041,7 +2056,7 @@ TEST_CASE("hub.provision_config") CHECK_FALSE(HubInstance->Find("module_a")); { - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout); CHECK(!ModClient.Get("/health/")); } } @@ -2308,7 +2323,7 @@ TEST_CASE("hub.hibernate_wake_obliterate") CHECK_NE(ProvisionedTime, std::chrono::system_clock::time_point::min()); CHECK_LE(ProvisionedTime, std::chrono::system_clock::now()); { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(ModClient.Get("/health/")); } @@ -2325,7 +2340,7 @@ TEST_CASE("hub.hibernate_wake_obliterate") const std::chrono::system_clock::time_point HibernatedTime = Info.StateChangeTime; CHECK_GE(HibernatedTime, ProvisionedTime); { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(!ModClient.Get("/health/")); } @@ -2341,7 +2356,7 @@ TEST_CASE("hub.hibernate_wake_obliterate") CHECK_EQ(Info.State, HubInstanceState::Provisioned); CHECK_GE(Info.StateChangeTime, HibernatedTime); { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(ModClient.Get("/health/")); } @@ -2369,7 +2384,7 @@ TEST_CASE("hub.hibernate_wake_obliterate") REQUIRE(HubInstance->Find("hib_a", &Info)); CHECK_EQ(Info.State, HubInstanceState::Provisioned); { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(ModClient.Get("/health/")); } @@ -2381,7 +2396,7 @@ TEST_CASE("hub.hibernate_wake_obliterate") CHECK_EQ(HubInstance->GetInstanceCount(), 0); CHECK_FALSE(HubInstance->Find("hib_a")); { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(!ModClient.Get("/health/")); } @@ -2460,7 +2475,7 @@ TEST_CASE("hub.async_hibernate_wake") REQUIRE_MESSAGE(Hibernated, "Instance did not reach Hibernated state within timeout"); } { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(!ModClient.Get("/health/")); } @@ -2484,7 +2499,7 @@ TEST_CASE("hub.async_hibernate_wake") REQUIRE_MESSAGE(Woken, "Instance did not reach Provisioned state after wake within timeout"); } { - HttpClient ModClient(fmt::format("http://localhost:{}", ProvInfo.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(ProvInfo.Port, kFastTimeout); CHECK(ModClient.Get("/health/")); } @@ -2533,7 +2548,7 @@ TEST_CASE("hub.recover_process_crash") const auto Deadline = std::chrono::steady_clock::now() + kTimeoutMs; // A successful HTTP health check on the same port confirms the new process is up. - HttpClient ModClient(fmt::format("http://localhost:{}", Info.Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Info.Port, kFastTimeout); bool Recovered = false; while (std::chrono::steady_clock::now() < Deadline) { @@ -2580,7 +2595,7 @@ TEST_CASE("hub.recover_process_crash") CHECK_MESSAGE(R.ResponseCode == Hub::EResponseCode::Completed, R.Message); } CHECK_NE(NewInfo.Port, 0); - HttpClient NewClient(fmt::format("http://localhost:{}", NewInfo.Port), kFastTimeout); + HttpClient NewClient = HubInstance->MakeInstanceClient(NewInfo.Port, kFastTimeout); CHECK_MESSAGE(NewClient.Get("/health/"), "Re-provisioned instance is not serving requests"); } @@ -2654,7 +2669,7 @@ TEST_CASE("hub.async_provision_concurrent") for (int I = 0; I < kModuleCount; ++I) { - HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Infos[I].Port, kFastTimeout); CHECK_MESSAGE(ModClient.Get("/health/"), fmt::format("async_c{} not serving requests", I)); } @@ -2695,7 +2710,7 @@ TEST_CASE("hub.async_provision_shutdown_waits") for (int I = 0; I < kModuleCount; ++I) { - HttpClient ModClient(fmt::format("http://localhost:{}", Infos[I].Port), kFastTimeout); + HttpClient ModClient = HubInstance->MakeInstanceClient(Infos[I].Port, kFastTimeout); CHECK_FALSE(ModClient.Get("/health/")); } } @@ -2779,10 +2794,10 @@ TEST_CASE("hub.instance.inactivity.deprovision") // The watchdog detects the changed sum on the next cycle and resets LastActivityTime. // Per-attempt connect is kept tight (200ms) so a genuinely dead endpoint fails fast; // RetryCount=3 absorbs transient localhost-accept slowness on loaded CI runners. - HttpClient PersistentClient(fmt::format("http://localhost:{}", Port), - HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3}); - uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - - std::chrono::steady_clock::time_point::min()) + HttpClient PersistentClient = + HubInstance->MakeInstanceClient(Port, HttpClientSettings{.ConnectTimeout = std::chrono::milliseconds(200), .RetryCount = 3}); + uint64_t Tick = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::steady_clock::now() - + std::chrono::steady_clock::time_point::min()) .count(); IoHash Key = IoHash::HashBuffer(&Tick, sizeof(Tick)); const HttpClient::Response PutResult = diff --git a/src/zenserver/hub/hub.h b/src/zenserver/hub/hub.h index 1ce9bc876..fa504de33 100644 --- a/src/zenserver/hub/hub.h +++ b/src/zenserver/hub/hub.h @@ -23,6 +23,8 @@ namespace zen { class HttpClient; +struct HttpClientSettings; +class HttpClientShare; class WorkerThreadPool; /** @@ -191,6 +193,15 @@ public: const Configuration& GetConfig() const { return m_Config; } + /** + * Construct a sync HttpClient targeting a hub-managed instance on + * localhost. Routes through m_InstanceClientShare so DNS and keep-alive + * TCP connections are reused across all hub->instance traffic, avoiding + * Windows ephemeral-port exhaustion under provision/deprovision churn. + * Settings.OptionalShare is unconditionally overwritten. + */ + HttpClient MakeInstanceClient(uint16_t Port, HttpClientSettings Settings); + #if ZEN_WITH_TESTS void TerminateModuleForTesting(const std::string& ModuleId); #endif @@ -199,8 +210,14 @@ private: const Configuration m_Config; ZenServerEnvironment m_RunEnvironment; WorkerThreadPool* m_WorkerPool = nullptr; - Latch m_BackgroundWorkLatch; - std::atomic<bool> m_ShutdownFlag = false; + // Declared early so it destructs late: every HttpClient referencing the + // share (watchdog ActivityCheckClient, GC client, in-flight worker + // tasks) is required to be gone before this member runs its dtor. + // Hub::Shutdown enforces that by joining the watchdog and draining + // background work before Hub destruction begins. + std::unique_ptr<HttpClientShare> m_InstanceClientShare; + Latch m_BackgroundWorkLatch; + std::atomic<bool> m_ShutdownFlag = false; AsyncModuleStateChangeCallbackFunc m_ModuleStateChangeCallback; |