aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/hub/zenhubserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/hub/zenhubserver.cpp')
-rw-r--r--src/zenserver/hub/zenhubserver.cpp483
1 files changed, 446 insertions, 37 deletions
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp
index b36a0778e..a32c1f1d1 100644
--- a/src/zenserver/hub/zenhubserver.cpp
+++ b/src/zenserver/hub/zenhubserver.cpp
@@ -2,17 +2,26 @@
#include "zenhubserver.h"
+#include "config/luaconfig.h"
#include "frontend/frontend.h"
#include "httphubservice.h"
+#include "httpproxyhandler.h"
#include "hub.h"
+#include <zencore/compactbinary.h>
#include <zencore/config.h>
+#include <zencore/except.h>
+#include <zencore/except_fmt.h>
+#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
+#include <zencore/intmath.h>
#include <zencore/memory/llm.h>
#include <zencore/memory/memorytrace.h>
#include <zencore/memory/tagtrace.h>
#include <zencore/scopeguard.h>
#include <zencore/sentryintegration.h>
+#include <zencore/system.h>
+#include <zencore/thread.h>
#include <zencore/windows.h>
#include <zenhttp/httpapiservice.h>
#include <zenutil/service.h>
@@ -23,6 +32,13 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+const std::string&
+GetDefaultConsulTokenEnvVariableName()
+{
+ static const std::string Name = "CONSUL_HTTP_TOKEN";
+ return Name;
+}
+
void
ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
{
@@ -45,12 +61,19 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
"instance-id",
- "Instance ID for use in notifications",
+ "Instance ID for use in notifications (deprecated, use --upstream-notification-instance-id)",
cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
"");
Options.add_option("hub",
"",
+ "upstream-notification-instance-id",
+ "Instance ID for use in notifications",
+ cxxopts::value<std::string>(m_ServerOptions.InstanceId),
+ "");
+
+ Options.add_option("hub",
+ "",
"consul-endpoint",
"Consul endpoint URL for service registration (empty = disabled)",
cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""),
@@ -58,13 +81,49 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("hub",
"",
+ "consul-token-env",
+ fmt::format("Name of environment variable that holds the consul access token (defaults to '{}')",
+ GetDefaultConsulTokenEnvVariableName()),
+ cxxopts::value<std::string>(m_ServerOptions.ConsulTokenEnv)->default_value(""),
+ "<envvariable>");
+
+ Options.add_option("hub",
+ "",
+ "consul-health-interval-seconds",
+ "Interval in seconds between Consul health checks",
+ cxxopts::value<uint32_t>(m_ServerOptions.ConsulHealthIntervalSeconds)->default_value("10"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "consul-deregister-after-seconds",
+ "Seconds after which Consul deregisters an unhealthy service",
+ cxxopts::value<uint32_t>(m_ServerOptions.ConsulDeregisterAfterSeconds)->default_value("30"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "consul-register-hub",
+ "Register the hub parent service with Consul (instance registration is unaffected)",
+ cxxopts::value<bool>(m_ServerOptions.ConsulRegisterHub)->default_value("true"),
+ "");
+
+ Options.add_option("hub",
+ "",
"hub-base-port-number",
- "Base port number for provisioned instances",
+ "Base port number for provisioned instances (deprecated, use --hub-instance-base-port-number)",
cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"),
"");
Options.add_option("hub",
"",
+ "hub-instance-base-port-number",
+ "Base port number for provisioned instances",
+ cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber),
+ "");
+
+ Options.add_option("hub",
+ "",
"hub-instance-limit",
"Maximum number of provisioned instances for this hub",
cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"),
@@ -101,6 +160,42 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value(m_ServerOptions.HubInstanceConfigPath),
"<instance config>");
+ const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u);
+
+ Options.add_option("hub",
+ "",
+ "hub-instance-provision-threads",
+ fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount),
+ cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount)
+ ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)),
+ "<threads>");
+
+ Options.add_option("hub",
+ "",
+ "hub-hydration-target-spec",
+ "Specification for hydration target. 'file://<path>' prefix indicates file storage at <path>. Defaults to "
+ "<data-dir>/servers/hydration_storage",
+ cxxopts::value(m_ServerOptions.HydrationTargetSpecification),
+ "<hydration-target-spec>");
+
+ Options.add_option("hub",
+ "",
+ "hub-hydration-target-config",
+ "Path to JSON file specifying the hydration target (mutually exclusive with "
+ "--hub-hydration-target-spec). Supported types: 'file', 's3'.",
+ cxxopts::value(m_ServerOptions.HydrationTargetConfigPath),
+ "<path>");
+
+ const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u);
+
+ Options.add_option(
+ "hub",
+ "",
+ "hub-hydration-threads",
+ fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount),
+ cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)),
+ "<threads>");
+
#if ZEN_PLATFORM_WINDOWS
Options.add_option("hub",
"",
@@ -109,12 +204,164 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"),
"");
#endif // ZEN_PLATFORM_WINDOWS
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-cycle-interval-ms",
+ "Interval between watchdog cycles in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-cycle-processing-budget-ms",
+ "Maximum processing time budget per watchdog cycle in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs)->default_value("500"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-instance-check-throttle-ms",
+ "Delay between checking successive instances per watchdog cycle in milliseconds",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs)->default_value("5"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-provisioned-inactivity-timeout-seconds",
+ "Seconds of inactivity after which a provisioned instance is deprovisioned",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds)->default_value("600"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-hibernated-inactivity-timeout-seconds",
+ "Seconds of inactivity after which a hibernated instance is deprovisioned",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds)->default_value("1800"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-inactivity-check-margin-seconds",
+ "Margin in seconds subtracted from inactivity timeout before triggering an activity check",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds)->default_value("60"),
+ "<seconds>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-activity-check-connect-timeout-ms",
+ "Connect timeout in milliseconds for instance activity check requests",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs)->default_value("100"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-watchdog-activity-check-request-timeout-ms",
+ "Request timeout in milliseconds for instance activity check requests",
+ cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"),
+ "<ms>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-disk-limit-bytes",
+ "Reject provisioning when used disk bytes exceed this value (0 = no limit).",
+ cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionDiskLimitBytes),
+ "<bytes>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-disk-limit-percent",
+ "Reject provisioning when used disk exceeds this percentage of total disk (0 = no limit).",
+ cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionDiskLimitPercent),
+ "<percent>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-memory-limit-bytes",
+ "Reject provisioning when used memory bytes exceed this value (0 = no limit).",
+ cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionMemoryLimitBytes),
+ "<bytes>");
+
+ Options.add_option("hub",
+ "",
+ "hub-provision-memory-limit-percent",
+ "Reject provisioning when used memory exceeds this percentage of total RAM (0 = no limit).",
+ cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionMemoryLimitPercent),
+ "<percent>");
}
void
ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options)
{
- ZEN_UNUSED(Options);
+ using namespace std::literals;
+
+ Options.AddOption("hub.upstreamnotification.endpoint"sv,
+ m_ServerOptions.UpstreamNotificationEndpoint,
+ "upstream-notification-endpoint"sv);
+ Options.AddOption("hub.upstreamnotification.instanceid"sv, m_ServerOptions.InstanceId, "upstream-notification-instance-id"sv);
+
+ Options.AddOption("hub.consul.endpoint"sv, m_ServerOptions.ConsulEndpoint, "consul-endpoint"sv);
+ Options.AddOption("hub.consul.tokenenv"sv, m_ServerOptions.ConsulTokenEnv, "consul-token-env"sv);
+ Options.AddOption("hub.consul.healthintervalseconds"sv,
+ m_ServerOptions.ConsulHealthIntervalSeconds,
+ "consul-health-interval-seconds"sv);
+ Options.AddOption("hub.consul.deregisterafterseconds"sv,
+ m_ServerOptions.ConsulDeregisterAfterSeconds,
+ "consul-deregister-after-seconds"sv);
+ Options.AddOption("hub.consul.registerhub"sv, m_ServerOptions.ConsulRegisterHub, "consul-register-hub"sv);
+
+ Options.AddOption("hub.instance.baseportnumber"sv, m_ServerOptions.HubBasePortNumber, "hub-instance-base-port-number"sv);
+ Options.AddOption("hub.instance.http"sv, m_ServerOptions.HubInstanceHttpClass, "hub-instance-http"sv);
+ Options.AddOption("hub.instance.httpthreads"sv, m_ServerOptions.HubInstanceHttpThreadCount, "hub-instance-http-threads"sv);
+ Options.AddOption("hub.instance.corelimit"sv, m_ServerOptions.HubInstanceCoreLimit, "hub-instance-corelimit"sv);
+ Options.AddOption("hub.instance.config"sv, m_ServerOptions.HubInstanceConfigPath, "hub-instance-config"sv);
+ Options.AddOption("hub.instance.limits.count"sv, m_ServerOptions.HubInstanceLimit, "hub-instance-limit"sv);
+ Options.AddOption("hub.instance.limits.disklimitbytes"sv,
+ m_ServerOptions.HubProvisionDiskLimitBytes,
+ "hub-provision-disk-limit-bytes"sv);
+ Options.AddOption("hub.instance.limits.disklimitpercent"sv,
+ m_ServerOptions.HubProvisionDiskLimitPercent,
+ "hub-provision-disk-limit-percent"sv);
+ Options.AddOption("hub.instance.limits.memorylimitbytes"sv,
+ m_ServerOptions.HubProvisionMemoryLimitBytes,
+ "hub-provision-memory-limit-bytes"sv);
+ Options.AddOption("hub.instance.limits.memorylimitpercent"sv,
+ m_ServerOptions.HubProvisionMemoryLimitPercent,
+ "hub-provision-memory-limit-percent"sv);
+ Options.AddOption("hub.instance.provisionthreads"sv,
+ m_ServerOptions.HubInstanceProvisionThreadCount,
+ "hub-instance-provision-threads"sv);
+
+ Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv);
+ Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv);
+ Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv);
+
+ Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv);
+ Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv,
+ m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs,
+ "hub-watchdog-cycle-processing-budget-ms"sv);
+ Options.AddOption("hub.watchdog.instancecheckthrottlems"sv,
+ m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs,
+ "hub-watchdog-instance-check-throttle-ms"sv);
+ Options.AddOption("hub.watchdog.provisionedinactivitytimeoutseconds"sv,
+ m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds,
+ "hub-watchdog-provisioned-inactivity-timeout-seconds"sv);
+ Options.AddOption("hub.watchdog.hibernatedinactivitytimeoutseconds"sv,
+ m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds,
+ "hub-watchdog-hibernated-inactivity-timeout-seconds"sv);
+ Options.AddOption("hub.watchdog.inactivitycheckmarginseconds"sv,
+ m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds,
+ "hub-watchdog-inactivity-check-margin-seconds"sv);
+ Options.AddOption("hub.watchdog.activitycheckconnecttimeoutms"sv,
+ m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs,
+ "hub-watchdog-activity-check-connect-timeout-ms"sv);
+ Options.AddOption("hub.watchdog.activitycheckrequesttimeoutms"sv,
+ m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs,
+ "hub-watchdog-activity-check-request-timeout-ms"sv);
+
+#if ZEN_PLATFORM_WINDOWS
+ Options.AddOption("hub.usejobobject"sv, m_ServerOptions.HubUseJobObject, "hub-use-job-object"sv);
+#endif
}
void
@@ -132,6 +379,28 @@ ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
void
ZenHubServerConfigurator::ValidateOptions()
{
+ if (m_ServerOptions.HubProvisionDiskLimitPercent > 100)
+ {
+ throw OptionParseException(
+ fmt::format("'--hub-provision-disk-limit-percent' ({}) must be in range 0..100", m_ServerOptions.HubProvisionDiskLimitPercent),
+ {});
+ }
+ if (m_ServerOptions.HubProvisionMemoryLimitPercent > 100)
+ {
+ throw OptionParseException(fmt::format("'--hub-provision-memory-limit-percent' ({}) must be in range 0..100",
+ m_ServerOptions.HubProvisionMemoryLimitPercent),
+ {});
+ }
+ if (!m_ServerOptions.HydrationTargetSpecification.empty() && !m_ServerOptions.HydrationTargetConfigPath.empty())
+ {
+ throw OptionParseException("'--hub-hydration-target-spec' and '--hub-hydration-target-config' are mutually exclusive", {});
+ }
+ if (!m_ServerOptions.HydrationTargetConfigPath.empty() && !std::filesystem::exists(m_ServerOptions.HydrationTargetConfigPath))
+ {
+ throw OptionParseException(
+ fmt::format("'--hub-hydration-target-config': file not found: '{}'", m_ServerOptions.HydrationTargetConfigPath.string()),
+ {});
+ }
}
///////////////////////////////////////////////////////////////////////////
@@ -146,21 +415,40 @@ ZenHubServer::~ZenHubServer()
}
void
-ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
+ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId,
+ std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState)
{
- if (m_ConsulClient)
+ ZEN_UNUSED(PreviousState);
+
+ if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating)
+ {
+ if (Info.Port != 0)
+ {
+ m_Proxy->PrunePort(Info.Port);
+ }
+ }
+
+ if (!m_ConsulClient)
+ {
+ return;
+ }
+
+ if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned)
{
consul::ServiceRegistrationInfo ServiceInfo{
- .ServiceId = std::string(ModuleId),
- .ServiceName = "zen-storage",
- // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri?
+ .ServiceId = std::string(ModuleId),
+ .ServiceName = "zen-storage",
.Port = Info.Port,
.HealthEndpoint = "health",
.Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)),
std::make_pair("zen-hub", std::string(HubInstanceId)),
std::make_pair("version", std::string(ZEN_CFG_VERSION))},
- .HealthIntervalSeconds = 10,
- .DeregisterAfterSeconds = 30};
+ .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulHealthIntervalSeconds,
+ .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulDeregisterAfterSeconds,
+ .InitialStatus = NewState == HubInstanceState::Provisioned ? "passing" : ""};
if (!m_ConsulClient->RegisterService(ServiceInfo))
{
@@ -174,13 +462,7 @@ ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view Mod
ServiceInfo.ServiceName);
}
}
-}
-
-void
-ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info)
-{
- ZEN_UNUSED(HubInstanceId);
- if (m_ConsulClient)
+ else if (NewState == HubInstanceState::Unprovisioned)
{
if (!m_ConsulClient->DeregisterService(ModuleId))
{
@@ -193,6 +475,8 @@ ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view M
ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port);
}
}
+ // Transitional states (Waking, Recovering, Crashed) and stable states
+ // not handled above (Hibernated) are intentionally ignored by Consul.
}
int
@@ -214,6 +498,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState:
// the main test range.
ZenServerEnvironment::SetBaseChildId(1000);
+ m_ProvisionWorkerPool =
+ std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision");
+ m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration");
+
m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
InitializeState(ServerConfig);
@@ -245,10 +533,21 @@ ZenHubServer::Cleanup()
m_Http->Close();
}
+ if (m_Proxy)
+ {
+ m_Proxy->Shutdown();
+ }
+
+ if (m_Hub)
+ {
+ m_Hub->Shutdown();
+ }
+
m_FrontendService.reset();
m_HubService.reset();
m_ApiService.reset();
m_Hub.reset();
+ m_Proxy.reset();
m_ConsulRegistration.reset();
m_ConsulClient.reset();
@@ -265,40 +564,120 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig)
ZEN_UNUSED(ServerConfig);
}
+ResourceMetrics
+ZenHubServer::ResolveLimits(const ZenHubServerConfig& ServerConfig)
+{
+ uint64_t DiskTotal = 0;
+ uint64_t MemoryTotal = 0;
+
+ if (ServerConfig.HubProvisionDiskLimitPercent > 0)
+ {
+ DiskSpace Disk;
+ if (DiskSpaceInfo(ServerConfig.DataDir, Disk))
+ {
+ DiskTotal = Disk.Total;
+ }
+ else
+ {
+ ZEN_WARN("Failed to query disk space for '{}'; disk percent limit will not be applied", ServerConfig.DataDir);
+ }
+ }
+ if (ServerConfig.HubProvisionMemoryLimitPercent > 0)
+ {
+ MemoryTotal = GetSystemMetrics().SystemMemoryMiB * 1024 * 1024;
+ }
+
+ auto Resolve = [](uint64_t Bytes, uint32_t Pct, uint64_t Total) -> uint64_t {
+ const uint64_t PctBytes = Pct > 0 ? (Total * Pct) / 100 : 0;
+ if (Bytes > 0 && PctBytes > 0)
+ {
+ return Min(Bytes, PctBytes);
+ }
+ return Bytes > 0 ? Bytes : PctBytes;
+ };
+
+ return {
+ .DiskUsageBytes = Resolve(ServerConfig.HubProvisionDiskLimitBytes, ServerConfig.HubProvisionDiskLimitPercent, DiskTotal),
+ .MemoryUsageBytes = Resolve(ServerConfig.HubProvisionMemoryLimitBytes, ServerConfig.HubProvisionMemoryLimitPercent, MemoryTotal),
+ };
+}
+
void
ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig)
{
- ZEN_UNUSED(ServerConfig);
-
ZEN_INFO("instantiating Hub");
+ Hub::Configuration HubConfig{
+ .UseJobObject = ServerConfig.HubUseJobObject,
+ .BasePortNumber = ServerConfig.HubBasePortNumber,
+ .InstanceLimit = ServerConfig.HubInstanceLimit,
+ .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
+ .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
+ .InstanceConfigPath = ServerConfig.HubInstanceConfigPath,
+ .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification,
+ .WatchDog =
+ {
+ .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs),
+ .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs),
+ .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs),
+ .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds),
+ .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds),
+ .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds),
+ .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs),
+ .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs),
+ },
+ .ResourceLimits = ResolveLimits(ServerConfig),
+ .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(),
+ .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()};
+
+ if (!ServerConfig.HydrationTargetConfigPath.empty())
+ {
+ FileContents Contents = ReadFile(ServerConfig.HydrationTargetConfigPath);
+ if (!Contents)
+ {
+ throw zen::runtime_error("Failed to read hydration config '{}': {}",
+ ServerConfig.HydrationTargetConfigPath.string(),
+ Contents.ErrorCode.message());
+ }
+ IoBuffer Buffer(Contents.Flatten());
+ std::string_view JsonText(static_cast<const char*>(Buffer.GetData()), Buffer.GetSize());
+
+ std::string ParseError;
+ CbFieldIterator Root = LoadCompactBinaryFromJson(JsonText, ParseError);
+ if (!ParseError.empty() || !Root.IsObject())
+ {
+ throw zen::runtime_error("Failed to parse hydration config '{}': {}",
+ ServerConfig.HydrationTargetConfigPath.string(),
+ ParseError.empty() ? "root must be a JSON object" : ParseError);
+ }
+ HubConfig.HydrationOptions = std::move(Root).AsObject();
+ }
+
+ m_Proxy = std::make_unique<HttpProxyHandler>();
+
m_Hub = std::make_unique<Hub>(
- Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject,
- .BasePortNumber = ServerConfig.HubBasePortNumber,
- .InstanceLimit = ServerConfig.HubInstanceLimit,
- .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount,
- .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit,
- .InstanceConfigPath = ServerConfig.HubInstanceConfigPath},
+ std::move(HubConfig),
ZenServerEnvironment(ZenServerEnvironment::Hub,
ServerConfig.DataDir / "hub",
ServerConfig.DataDir / "servers",
ServerConfig.HubInstanceHttpClass),
- m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
- std::string_view ModuleId,
- const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); }
- : Hub::ProvisionModuleCallbackFunc{},
- m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](
- std::string_view ModuleId,
- const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); }
- : Hub::ProvisionModuleCallbackFunc{});
+ Hub::AsyncModuleStateChangeCallbackFunc{
+ [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId,
+ const HubProvisionedInstanceInfo& Info,
+ HubInstanceState PreviousState,
+ HubInstanceState NewState) {
+ OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState);
+ }});
+
+ m_Proxy->SetPortValidator([Hub = m_Hub.get()](uint16_t Port) { return Hub->IsInstancePort(Port); });
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
ZEN_INFO("instantiating hub service");
- m_HubService = std::make_unique<HttpHubService>(*m_Hub);
+ m_HubService = std::make_unique<HttpHubService>(*m_Hub, *m_Proxy, m_StatsService, m_StatusService);
m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId);
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
}
void
@@ -333,22 +712,52 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi
ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint);
+ std::string ConsulAccessTokenEnvName =
+ ServerConfig.ConsulTokenEnv.empty() ? GetDefaultConsulTokenEnvVariableName() : ServerConfig.ConsulTokenEnv;
+ std::string ConsulAccessToken = GetEnvVariable(ConsulAccessTokenEnvName);
+ if (ConsulAccessToken.empty())
+ {
+ if (!ServerConfig.ConsulTokenEnv.empty())
+ {
+ ZEN_WARN("Consul token environment variable '{}' is not set or empty", ServerConfig.ConsulTokenEnv);
+ }
+ }
+ else
+ {
+ ZEN_INFO("Consul token will be read from environment variable '{}'", ConsulAccessTokenEnvName);
+ }
+
try
{
- m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint);
+ m_ConsulClient = std::make_unique<consul::ConsulClient>(consul::ConsulClient::Configuration{
+ .BaseUri = ServerConfig.ConsulEndpoint,
+ .TokenEnvName = ConsulAccessTokenEnvName,
+ });
+ m_ConsulHealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds;
+ m_ConsulDeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds;
+
+ if (!ServerConfig.ConsulRegisterHub)
+ {
+ ZEN_INFO(
+ "Hub parent Consul registration skipped (consul-register-hub is false); "
+ "instance registration remains enabled");
+ return;
+ }
consul::ServiceRegistrationInfo Info;
Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId);
Info.ServiceName = "zen-hub";
// Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri?
Info.Port = static_cast<uint16_t>(EffectivePort);
- Info.HealthEndpoint = "hub/health";
+ Info.HealthEndpoint = "health";
Info.Tags = std::vector<std::pair<std::string, std::string>>{
std::make_pair("zen-hub", Info.ServiceId),
std::make_pair("version", std::string(ZEN_CFG_VERSION)),
std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)),
std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)),
std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))};
+ Info.HealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds;
+ Info.DeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds;
m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info);