diff options
Diffstat (limited to 'src/zenserver/hub/zenhubserver.cpp')
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 314 |
1 files changed, 253 insertions, 61 deletions
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index 499586abc..ebc2cf2f1 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -2,8 +2,10 @@ #include "zenhubserver.h" +#include "config/luaconfig.h" #include "frontend/frontend.h" #include "httphubservice.h" +#include "httpproxyhandler.h" #include "hub.h" #include <zencore/compactbinary.h> @@ -12,16 +14,17 @@ #include <zencore/except_fmt.h> #include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/intmath.h> #include <zencore/memory/llm.h> #include <zencore/memory/memorytrace.h> #include <zencore/memory/tagtrace.h> #include <zencore/scopeguard.h> #include <zencore/sentryintegration.h> #include <zencore/system.h> +#include <zencore/thread.h> #include <zencore/windows.h> #include <zenhttp/httpapiservice.h> #include <zenutil/service.h> -#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <cxxopts.hpp> @@ -58,12 +61,19 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", "instance-id", - "Instance ID for use in notifications", + "Instance ID for use in notifications (deprecated, use --upstream-notification-instance-id)", cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), ""); Options.add_option("hub", "", + "upstream-notification-instance-id", + "Instance ID for use in notifications", + cxxopts::value<std::string>(m_ServerOptions.InstanceId), + ""); + + Options.add_option("hub", + "", "consul-endpoint", "Consul endpoint URL for service registration (empty = disabled)", cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""), @@ -93,13 +103,27 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "consul-register-hub", + "Register the hub parent service with Consul (instance registration is unaffected)", + cxxopts::value<bool>(m_ServerOptions.ConsulRegisterHub)->default_value("true"), + ""); + + Options.add_option("hub", + "", "hub-base-port-number", - "Base port number for provisioned instances", + "Base port number for provisioned instances (deprecated, use --hub-instance-base-port-number)", cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"), ""); Options.add_option("hub", "", + "hub-instance-base-port-number", + "Base port number for provisioned instances", + cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber), + ""); + + Options.add_option("hub", + "", "hub-instance-limit", "Maximum number of provisioned instances for this hub", cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"), @@ -118,6 +142,34 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "hub-instance-malloc", + "Select memory allocator for provisioned instances (ansi|stomp|rpmalloc|mimalloc)", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceMalloc)->default_value(""), + "<allocator>"); + + Options.add_option("hub", + "", + "hub-instance-trace", + "Trace channel specification for provisioned instances (e.g. default, cpu,log, memory)", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceTrace)->default_value(""), + "<channels>"); + + Options.add_option("hub", + "", + "hub-instance-tracehost", + "Trace host for provisioned instances", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceTraceHost)->default_value(""), + "<host>"); + + Options.add_option("hub", + "", + "hub-instance-tracefile", + "Trace file path for provisioned instances", + cxxopts::value<std::string>(m_ServerOptions.HubInstanceTraceFile)->default_value(""), + "<path>"); + + Options.add_option("hub", + "", "hub-instance-http-threads", "Number of http server connection threads for provisioned instances", cxxopts::value<unsigned int>(m_ServerOptions.HubInstanceHttpThreadCount), @@ -136,6 +188,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HubInstanceConfigPath), "<instance config>"); + const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option("hub", + "", + "hub-instance-provision-threads", + fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount) + ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)), + "<threads>"); + Options.add_option("hub", "", "hub-hydration-target-spec", @@ -152,6 +214,16 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HydrationTargetConfigPath), "<path>"); + const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option( + "hub", + "", + "hub-hydration-threads", + fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)), + "<threads>"); + #if ZEN_PLATFORM_WINDOWS Options.add_option("hub", "", @@ -249,7 +321,79 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) void ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { - ZEN_UNUSED(Options); + using namespace std::literals; + + Options.AddOption("hub.upstreamnotification.endpoint"sv, + m_ServerOptions.UpstreamNotificationEndpoint, + "upstream-notification-endpoint"sv); + Options.AddOption("hub.upstreamnotification.instanceid"sv, m_ServerOptions.InstanceId, "upstream-notification-instance-id"sv); + + Options.AddOption("hub.consul.endpoint"sv, m_ServerOptions.ConsulEndpoint, "consul-endpoint"sv); + Options.AddOption("hub.consul.tokenenv"sv, m_ServerOptions.ConsulTokenEnv, "consul-token-env"sv); + Options.AddOption("hub.consul.healthintervalseconds"sv, + m_ServerOptions.ConsulHealthIntervalSeconds, + "consul-health-interval-seconds"sv); + Options.AddOption("hub.consul.deregisterafterseconds"sv, + m_ServerOptions.ConsulDeregisterAfterSeconds, + "consul-deregister-after-seconds"sv); + Options.AddOption("hub.consul.registerhub"sv, m_ServerOptions.ConsulRegisterHub, "consul-register-hub"sv); + + Options.AddOption("hub.instance.baseportnumber"sv, m_ServerOptions.HubBasePortNumber, "hub-instance-base-port-number"sv); + Options.AddOption("hub.instance.http"sv, m_ServerOptions.HubInstanceHttpClass, "hub-instance-http"sv); + Options.AddOption("hub.instance.malloc"sv, m_ServerOptions.HubInstanceMalloc, "hub-instance-malloc"sv); + Options.AddOption("hub.instance.trace"sv, m_ServerOptions.HubInstanceTrace, "hub-instance-trace"sv); + Options.AddOption("hub.instance.tracehost"sv, m_ServerOptions.HubInstanceTraceHost, "hub-instance-tracehost"sv); + Options.AddOption("hub.instance.tracefile"sv, m_ServerOptions.HubInstanceTraceFile, "hub-instance-tracefile"sv); + Options.AddOption("hub.instance.httpthreads"sv, m_ServerOptions.HubInstanceHttpThreadCount, "hub-instance-http-threads"sv); + Options.AddOption("hub.instance.corelimit"sv, m_ServerOptions.HubInstanceCoreLimit, "hub-instance-corelimit"sv); + Options.AddOption("hub.instance.config"sv, m_ServerOptions.HubInstanceConfigPath, "hub-instance-config"sv); + Options.AddOption("hub.instance.limits.count"sv, m_ServerOptions.HubInstanceLimit, "hub-instance-limit"sv); + Options.AddOption("hub.instance.limits.disklimitbytes"sv, + m_ServerOptions.HubProvisionDiskLimitBytes, + "hub-provision-disk-limit-bytes"sv); + Options.AddOption("hub.instance.limits.disklimitpercent"sv, + m_ServerOptions.HubProvisionDiskLimitPercent, + "hub-provision-disk-limit-percent"sv); + Options.AddOption("hub.instance.limits.memorylimitbytes"sv, + m_ServerOptions.HubProvisionMemoryLimitBytes, + "hub-provision-memory-limit-bytes"sv); + Options.AddOption("hub.instance.limits.memorylimitpercent"sv, + m_ServerOptions.HubProvisionMemoryLimitPercent, + "hub-provision-memory-limit-percent"sv); + Options.AddOption("hub.instance.provisionthreads"sv, + m_ServerOptions.HubInstanceProvisionThreadCount, + "hub-instance-provision-threads"sv); + + Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv); + Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv); + Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv); + + Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv); + Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv, + m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs, + "hub-watchdog-cycle-processing-budget-ms"sv); + Options.AddOption("hub.watchdog.instancecheckthrottlems"sv, + m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs, + "hub-watchdog-instance-check-throttle-ms"sv); + Options.AddOption("hub.watchdog.provisionedinactivitytimeoutseconds"sv, + m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds, + "hub-watchdog-provisioned-inactivity-timeout-seconds"sv); + Options.AddOption("hub.watchdog.hibernatedinactivitytimeoutseconds"sv, + m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds, + "hub-watchdog-hibernated-inactivity-timeout-seconds"sv); + Options.AddOption("hub.watchdog.inactivitycheckmarginseconds"sv, + m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds, + "hub-watchdog-inactivity-check-margin-seconds"sv); + Options.AddOption("hub.watchdog.activitycheckconnecttimeoutms"sv, + m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs, + "hub-watchdog-activity-check-connect-timeout-ms"sv); + Options.AddOption("hub.watchdog.activitycheckrequesttimeoutms"sv, + m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs, + "hub-watchdog-activity-check-request-timeout-ms"sv); + +#if ZEN_PLATFORM_WINDOWS + Options.AddOption("hub.usejobobject"sv, m_ServerOptions.HubUseJobObject, "hub-use-job-object"sv); +#endif } void @@ -310,55 +454,71 @@ ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, HubInstanceState NewState) { ZEN_UNUSED(PreviousState); - if (!m_ConsulClient) - { - return; - } - if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned) + if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating) { - consul::ServiceRegistrationInfo ServiceInfo{ - .ServiceId = std::string(ModuleId), - .ServiceName = "zen-storage", - .Port = Info.Port, - .HealthEndpoint = "health", - .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), - std::make_pair("zen-hub", std::string(HubInstanceId)), - std::make_pair("version", std::string(ZEN_CFG_VERSION))}, - .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning - ? 0u - : m_ConsulHealthIntervalSeconds, // Disable health checks while not finished provisioning - .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning - ? 0u - : m_ConsulDeregisterAfterSeconds}; // Disable health checks while not finished provisioning - - if (!m_ConsulClient->RegisterService(ServiceInfo)) - { - ZEN_WARN("Failed to register storage server instance for module '{}' with Consul, continuing anyway", ModuleId); - } - else + if (Info.Port != 0) { - ZEN_INFO("Registered storage server instance for module '{}' at port {} with Consul as '{}'", - ModuleId, - Info.Port, - ServiceInfo.ServiceName); + m_Proxy->PrunePort(Info.Port); } } - else if (NewState == HubInstanceState::Unprovisioned) + + if (!m_ConsulClient) { - if (!m_ConsulClient->DeregisterService(ModuleId)) - { - ZEN_WARN("Failed to deregister storage server instance for module '{}' at port {} from Consul, continuing anyway", - ModuleId, - Info.Port); - } - else - { - ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); - } + return; + } + + switch (NewState) + { + case HubInstanceState::Provisioning: + case HubInstanceState::Waking: + case HubInstanceState::Recovering: + case HubInstanceState::Provisioned: + { + const bool IsProvisioned = NewState == HubInstanceState::Provisioned; + + consul::ServiceRegistrationInfo ServiceInfo{ + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", + .Port = Info.Port, + .HealthEndpoint = "health", + .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), + std::make_pair("zen-hub", std::string(HubInstanceId)), + std::make_pair("version", std::string(ZEN_CFG_VERSION))}, + .HealthIntervalSeconds = IsProvisioned ? m_ConsulHealthIntervalSeconds : 0u, + .DeregisterAfterSeconds = IsProvisioned ? m_ConsulDeregisterAfterSeconds : 0u, + .InitialStatus = IsProvisioned ? "passing" : ""}; + + m_ConsulClient->RegisterService(ServiceInfo); + ZEN_INFO("Submitted Consul registration for storage server instance for module '{}' at port {} as '{}'", + ModuleId, + Info.Port, + ServiceInfo.ServiceName); + break; + } + case HubInstanceState::Deprovisioning: + case HubInstanceState::Hibernating: + case HubInstanceState::Obliterating: + case HubInstanceState::Crashed: + case HubInstanceState::Hibernated: + case HubInstanceState::Unprovisioned: + { + // A Consul registration is "live" while the module is in a register-state + // (Provisioning / Waking / Recovering / Provisioned). Deregister once when + // we leave a register-state into any non-register-state + const bool WasRegisteredState = + PreviousState == HubInstanceState::Provisioning || PreviousState == HubInstanceState::Waking || + PreviousState == HubInstanceState::Recovering || PreviousState == HubInstanceState::Provisioned; + if (WasRegisteredState) + { + m_ConsulClient->DeregisterService(ModuleId); + ZEN_INFO("Submitted Consul deregistration for storage server instance for module '{}' at port {}", ModuleId, Info.Port); + } + } + break; + default: + break; } - // Transitional states (Deprovisioning, Hibernating, Waking, Recovering, Crashed) - // and Hibernated are intentionally ignored. } int @@ -380,6 +540,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: // the main test range. ZenServerEnvironment::SetBaseChildId(1000); + m_ProvisionWorkerPool = + std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision"); + m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration"); + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); @@ -405,12 +569,18 @@ ZenHubServer::Cleanup() m_IoRunner.join(); } - ShutdownServices(); if (m_Http) { m_Http->Close(); } + ShutdownServices(); + + if (m_Proxy) + { + m_Proxy->Shutdown(); + } + if (m_Hub) { m_Hub->Shutdown(); @@ -420,6 +590,7 @@ ZenHubServer::Cleanup() m_HubService.reset(); m_ApiService.reset(); m_Hub.reset(); + m_Proxy.reset(); m_ConsulRegistration.reset(); m_ConsulClient.reset(); @@ -484,6 +655,10 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) .InstanceLimit = ServerConfig.HubInstanceLimit, .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, + .InstanceMalloc = ServerConfig.HubInstanceMalloc, + .InstanceTrace = ServerConfig.HubInstanceTrace, + .InstanceTraceHost = ServerConfig.HubInstanceTraceHost, + .InstanceTraceFile = ServerConfig.HubInstanceTraceFile, .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, .WatchDog = @@ -497,7 +672,9 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs), .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs), }, - .ResourceLimits = ResolveLimits(ServerConfig)}; + .ResourceLimits = ResolveLimits(ServerConfig), + .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(), + .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()}; if (!ServerConfig.HydrationTargetConfigPath.empty()) { @@ -522,27 +699,29 @@ ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) HubConfig.HydrationOptions = std::move(Root).AsObject(); } + m_Proxy = std::make_unique<HttpProxyHandler>(); + m_Hub = std::make_unique<Hub>( std::move(HubConfig), ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", ServerConfig.HubInstanceHttpClass), - &GetMediumWorkerPool(EWorkloadType::Background), - m_ConsulClient ? Hub::AsyncModuleStateChangeCallbackFunc{[this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( - std::string_view ModuleId, - const HubProvisionedInstanceInfo& Info, - HubInstanceState PreviousState, - HubInstanceState NewState) { - OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState); - }} - : Hub::AsyncModuleStateChangeCallbackFunc{}); + Hub::AsyncModuleStateChangeCallbackFunc{ + [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState); + }}); + + m_Proxy->SetPortValidator([Hub = m_Hub.get()](uint16_t Port) { return Hub->IsInstancePort(Port); }); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); ZEN_INFO("instantiating hub service"); - m_HubService = std::make_unique<HttpHubService>(*m_Hub, m_StatsService, m_StatusService); + m_HubService = std::make_unique<HttpHubService>(*m_Hub, *m_Proxy, m_StatsService, m_StatusService); m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService); @@ -592,21 +771,32 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi } else { - ZEN_INFO("Consul token read from environment variable '{}'", ConsulAccessTokenEnvName); + ZEN_INFO("Consul token will be read from environment variable '{}'", ConsulAccessTokenEnvName); } try { - m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint, ConsulAccessToken); + m_ConsulClient = std::make_unique<consul::ConsulClient>(consul::ConsulClient::Configuration{ + .BaseUri = ServerConfig.ConsulEndpoint, + .TokenEnvName = ConsulAccessTokenEnvName, + }); m_ConsulHealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds; m_ConsulDeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds; + if (!ServerConfig.ConsulRegisterHub) + { + ZEN_INFO( + "Hub parent Consul registration skipped (consul-register-hub is false); " + "instance registration remains enabled"); + return; + } + consul::ServiceRegistrationInfo Info; Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); Info.ServiceName = "zen-hub"; // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri? Info.Port = static_cast<uint16_t>(EffectivePort); - Info.HealthEndpoint = "hub/health"; + Info.HealthEndpoint = "health"; Info.Tags = std::vector<std::pair<std::string, std::string>>{ std::make_pair("zen-hub", Info.ServiceId), std::make_pair("version", std::string(ZEN_CFG_VERSION)), @@ -696,6 +886,8 @@ ZenHubServer::Run() OnReady(); + StartSelfSession("zenhub"); + m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); |