diff options
Diffstat (limited to 'src/zenserver/hub/zenhubserver.cpp')
| -rw-r--r-- | src/zenserver/hub/zenhubserver.cpp | 483 |
1 files changed, 446 insertions, 37 deletions
diff --git a/src/zenserver/hub/zenhubserver.cpp b/src/zenserver/hub/zenhubserver.cpp index b36a0778e..a32c1f1d1 100644 --- a/src/zenserver/hub/zenhubserver.cpp +++ b/src/zenserver/hub/zenhubserver.cpp @@ -2,17 +2,26 @@ #include "zenhubserver.h" +#include "config/luaconfig.h" #include "frontend/frontend.h" #include "httphubservice.h" +#include "httpproxyhandler.h" #include "hub.h" +#include <zencore/compactbinary.h> #include <zencore/config.h> +#include <zencore/except.h> +#include <zencore/except_fmt.h> +#include <zencore/filesystem.h> #include <zencore/fmtutils.h> +#include <zencore/intmath.h> #include <zencore/memory/llm.h> #include <zencore/memory/memorytrace.h> #include <zencore/memory/tagtrace.h> #include <zencore/scopeguard.h> #include <zencore/sentryintegration.h> +#include <zencore/system.h> +#include <zencore/thread.h> #include <zencore/windows.h> #include <zenhttp/httpapiservice.h> #include <zenutil/service.h> @@ -23,6 +32,13 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +const std::string& +GetDefaultConsulTokenEnvVariableName() +{ + static const std::string Name = "CONSUL_HTTP_TOKEN"; + return Name; +} + void ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) { @@ -45,12 +61,19 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", "instance-id", - "Instance ID for use in notifications", + "Instance ID for use in notifications (deprecated, use --upstream-notification-instance-id)", cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""), ""); Options.add_option("hub", "", + "upstream-notification-instance-id", + "Instance ID for use in notifications", + cxxopts::value<std::string>(m_ServerOptions.InstanceId), + ""); + + Options.add_option("hub", + "", "consul-endpoint", "Consul endpoint URL for service registration (empty = disabled)", cxxopts::value<std::string>(m_ServerOptions.ConsulEndpoint)->default_value(""), @@ -58,13 +81,49 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("hub", "", + "consul-token-env", + fmt::format("Name of environment variable that holds the consul access token (defaults to '{}')", + GetDefaultConsulTokenEnvVariableName()), + cxxopts::value<std::string>(m_ServerOptions.ConsulTokenEnv)->default_value(""), + "<envvariable>"); + + Options.add_option("hub", + "", + "consul-health-interval-seconds", + "Interval in seconds between Consul health checks", + cxxopts::value<uint32_t>(m_ServerOptions.ConsulHealthIntervalSeconds)->default_value("10"), + "<seconds>"); + + Options.add_option("hub", + "", + "consul-deregister-after-seconds", + "Seconds after which Consul deregisters an unhealthy service", + cxxopts::value<uint32_t>(m_ServerOptions.ConsulDeregisterAfterSeconds)->default_value("30"), + "<seconds>"); + + Options.add_option("hub", + "", + "consul-register-hub", + "Register the hub parent service with Consul (instance registration is unaffected)", + cxxopts::value<bool>(m_ServerOptions.ConsulRegisterHub)->default_value("true"), + ""); + + Options.add_option("hub", + "", "hub-base-port-number", - "Base port number for provisioned instances", + "Base port number for provisioned instances (deprecated, use --hub-instance-base-port-number)", cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber)->default_value("21000"), ""); Options.add_option("hub", "", + "hub-instance-base-port-number", + "Base port number for provisioned instances", + cxxopts::value<uint16_t>(m_ServerOptions.HubBasePortNumber), + ""); + + Options.add_option("hub", + "", "hub-instance-limit", "Maximum number of provisioned instances for this hub", cxxopts::value<int>(m_ServerOptions.HubInstanceLimit)->default_value("1000"), @@ -101,6 +160,42 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HubInstanceConfigPath), "<instance config>"); + const uint32_t DefaultHubInstanceProvisionThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option("hub", + "", + "hub-instance-provision-threads", + fmt::format("Number of threads for instance provisioning (default {})", DefaultHubInstanceProvisionThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubInstanceProvisionThreadCount) + ->default_value(fmt::format("{}", DefaultHubInstanceProvisionThreadCount)), + "<threads>"); + + Options.add_option("hub", + "", + "hub-hydration-target-spec", + "Specification for hydration target. 'file://<path>' prefix indicates file storage at <path>. Defaults to " + "<data-dir>/servers/hydration_storage", + cxxopts::value(m_ServerOptions.HydrationTargetSpecification), + "<hydration-target-spec>"); + + Options.add_option("hub", + "", + "hub-hydration-target-config", + "Path to JSON file specifying the hydration target (mutually exclusive with " + "--hub-hydration-target-spec). Supported types: 'file', 's3'.", + cxxopts::value(m_ServerOptions.HydrationTargetConfigPath), + "<path>"); + + const uint32_t DefaultHubHydrationThreadCount = Max(GetHardwareConcurrency() / 4u, 2u); + + Options.add_option( + "hub", + "", + "hub-hydration-threads", + fmt::format("Number of threads for hydration/dehydration (default {})", DefaultHubHydrationThreadCount), + cxxopts::value<uint32_t>(m_ServerOptions.HubHydrationThreadCount)->default_value(fmt::format("{}", DefaultHubHydrationThreadCount)), + "<threads>"); + #if ZEN_PLATFORM_WINDOWS Options.add_option("hub", "", @@ -109,12 +204,164 @@ ZenHubServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value<bool>(m_ServerOptions.HubUseJobObject)->default_value("true"), ""); #endif // ZEN_PLATFORM_WINDOWS + + Options.add_option("hub", + "", + "hub-watchdog-cycle-interval-ms", + "Interval between watchdog cycles in milliseconds", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleIntervalMs)->default_value("3000"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-cycle-processing-budget-ms", + "Maximum processing time budget per watchdog cycle in milliseconds", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs)->default_value("500"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-instance-check-throttle-ms", + "Delay between checking successive instances per watchdog cycle in milliseconds", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs)->default_value("5"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-provisioned-inactivity-timeout-seconds", + "Seconds of inactivity after which a provisioned instance is deprovisioned", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds)->default_value("600"), + "<seconds>"); + + Options.add_option("hub", + "", + "hub-watchdog-hibernated-inactivity-timeout-seconds", + "Seconds of inactivity after which a hibernated instance is deprovisioned", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds)->default_value("1800"), + "<seconds>"); + + Options.add_option("hub", + "", + "hub-watchdog-inactivity-check-margin-seconds", + "Margin in seconds subtracted from inactivity timeout before triggering an activity check", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds)->default_value("60"), + "<seconds>"); + + Options.add_option("hub", + "", + "hub-watchdog-activity-check-connect-timeout-ms", + "Connect timeout in milliseconds for instance activity check requests", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs)->default_value("100"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-watchdog-activity-check-request-timeout-ms", + "Request timeout in milliseconds for instance activity check requests", + cxxopts::value<uint32_t>(m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs)->default_value("200"), + "<ms>"); + + Options.add_option("hub", + "", + "hub-provision-disk-limit-bytes", + "Reject provisioning when used disk bytes exceed this value (0 = no limit).", + cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionDiskLimitBytes), + "<bytes>"); + + Options.add_option("hub", + "", + "hub-provision-disk-limit-percent", + "Reject provisioning when used disk exceeds this percentage of total disk (0 = no limit).", + cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionDiskLimitPercent), + "<percent>"); + + Options.add_option("hub", + "", + "hub-provision-memory-limit-bytes", + "Reject provisioning when used memory bytes exceed this value (0 = no limit).", + cxxopts::value<uint64_t>(m_ServerOptions.HubProvisionMemoryLimitBytes), + "<bytes>"); + + Options.add_option("hub", + "", + "hub-provision-memory-limit-percent", + "Reject provisioning when used memory exceeds this percentage of total RAM (0 = no limit).", + cxxopts::value<uint32_t>(m_ServerOptions.HubProvisionMemoryLimitPercent), + "<percent>"); } void ZenHubServerConfigurator::AddConfigOptions(LuaConfig::Options& Options) { - ZEN_UNUSED(Options); + using namespace std::literals; + + Options.AddOption("hub.upstreamnotification.endpoint"sv, + m_ServerOptions.UpstreamNotificationEndpoint, + "upstream-notification-endpoint"sv); + Options.AddOption("hub.upstreamnotification.instanceid"sv, m_ServerOptions.InstanceId, "upstream-notification-instance-id"sv); + + Options.AddOption("hub.consul.endpoint"sv, m_ServerOptions.ConsulEndpoint, "consul-endpoint"sv); + Options.AddOption("hub.consul.tokenenv"sv, m_ServerOptions.ConsulTokenEnv, "consul-token-env"sv); + Options.AddOption("hub.consul.healthintervalseconds"sv, + m_ServerOptions.ConsulHealthIntervalSeconds, + "consul-health-interval-seconds"sv); + Options.AddOption("hub.consul.deregisterafterseconds"sv, + m_ServerOptions.ConsulDeregisterAfterSeconds, + "consul-deregister-after-seconds"sv); + Options.AddOption("hub.consul.registerhub"sv, m_ServerOptions.ConsulRegisterHub, "consul-register-hub"sv); + + Options.AddOption("hub.instance.baseportnumber"sv, m_ServerOptions.HubBasePortNumber, "hub-instance-base-port-number"sv); + Options.AddOption("hub.instance.http"sv, m_ServerOptions.HubInstanceHttpClass, "hub-instance-http"sv); + Options.AddOption("hub.instance.httpthreads"sv, m_ServerOptions.HubInstanceHttpThreadCount, "hub-instance-http-threads"sv); + Options.AddOption("hub.instance.corelimit"sv, m_ServerOptions.HubInstanceCoreLimit, "hub-instance-corelimit"sv); + Options.AddOption("hub.instance.config"sv, m_ServerOptions.HubInstanceConfigPath, "hub-instance-config"sv); + Options.AddOption("hub.instance.limits.count"sv, m_ServerOptions.HubInstanceLimit, "hub-instance-limit"sv); + Options.AddOption("hub.instance.limits.disklimitbytes"sv, + m_ServerOptions.HubProvisionDiskLimitBytes, + "hub-provision-disk-limit-bytes"sv); + Options.AddOption("hub.instance.limits.disklimitpercent"sv, + m_ServerOptions.HubProvisionDiskLimitPercent, + "hub-provision-disk-limit-percent"sv); + Options.AddOption("hub.instance.limits.memorylimitbytes"sv, + m_ServerOptions.HubProvisionMemoryLimitBytes, + "hub-provision-memory-limit-bytes"sv); + Options.AddOption("hub.instance.limits.memorylimitpercent"sv, + m_ServerOptions.HubProvisionMemoryLimitPercent, + "hub-provision-memory-limit-percent"sv); + Options.AddOption("hub.instance.provisionthreads"sv, + m_ServerOptions.HubInstanceProvisionThreadCount, + "hub-instance-provision-threads"sv); + + Options.AddOption("hub.hydration.targetspec"sv, m_ServerOptions.HydrationTargetSpecification, "hub-hydration-target-spec"sv); + Options.AddOption("hub.hydration.targetconfig"sv, m_ServerOptions.HydrationTargetConfigPath, "hub-hydration-target-config"sv); + Options.AddOption("hub.hydration.threads"sv, m_ServerOptions.HubHydrationThreadCount, "hub-hydration-threads"sv); + + Options.AddOption("hub.watchdog.cycleintervalms"sv, m_ServerOptions.WatchdogConfig.CycleIntervalMs, "hub-watchdog-cycle-interval-ms"sv); + Options.AddOption("hub.watchdog.cycleprocessingbudgetms"sv, + m_ServerOptions.WatchdogConfig.CycleProcessingBudgetMs, + "hub-watchdog-cycle-processing-budget-ms"sv); + Options.AddOption("hub.watchdog.instancecheckthrottlems"sv, + m_ServerOptions.WatchdogConfig.InstanceCheckThrottleMs, + "hub-watchdog-instance-check-throttle-ms"sv); + Options.AddOption("hub.watchdog.provisionedinactivitytimeoutseconds"sv, + m_ServerOptions.WatchdogConfig.ProvisionedInactivityTimeoutSeconds, + "hub-watchdog-provisioned-inactivity-timeout-seconds"sv); + Options.AddOption("hub.watchdog.hibernatedinactivitytimeoutseconds"sv, + m_ServerOptions.WatchdogConfig.HibernatedInactivityTimeoutSeconds, + "hub-watchdog-hibernated-inactivity-timeout-seconds"sv); + Options.AddOption("hub.watchdog.inactivitycheckmarginseconds"sv, + m_ServerOptions.WatchdogConfig.InactivityCheckMarginSeconds, + "hub-watchdog-inactivity-check-margin-seconds"sv); + Options.AddOption("hub.watchdog.activitycheckconnecttimeoutms"sv, + m_ServerOptions.WatchdogConfig.ActivityCheckConnectTimeoutMs, + "hub-watchdog-activity-check-connect-timeout-ms"sv); + Options.AddOption("hub.watchdog.activitycheckrequesttimeoutms"sv, + m_ServerOptions.WatchdogConfig.ActivityCheckRequestTimeoutMs, + "hub-watchdog-activity-check-request-timeout-ms"sv); + +#if ZEN_PLATFORM_WINDOWS + Options.AddOption("hub.usejobobject"sv, m_ServerOptions.HubUseJobObject, "hub-use-job-object"sv); +#endif } void @@ -132,6 +379,28 @@ ZenHubServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) void ZenHubServerConfigurator::ValidateOptions() { + if (m_ServerOptions.HubProvisionDiskLimitPercent > 100) + { + throw OptionParseException( + fmt::format("'--hub-provision-disk-limit-percent' ({}) must be in range 0..100", m_ServerOptions.HubProvisionDiskLimitPercent), + {}); + } + if (m_ServerOptions.HubProvisionMemoryLimitPercent > 100) + { + throw OptionParseException(fmt::format("'--hub-provision-memory-limit-percent' ({}) must be in range 0..100", + m_ServerOptions.HubProvisionMemoryLimitPercent), + {}); + } + if (!m_ServerOptions.HydrationTargetSpecification.empty() && !m_ServerOptions.HydrationTargetConfigPath.empty()) + { + throw OptionParseException("'--hub-hydration-target-spec' and '--hub-hydration-target-config' are mutually exclusive", {}); + } + if (!m_ServerOptions.HydrationTargetConfigPath.empty() && !std::filesystem::exists(m_ServerOptions.HydrationTargetConfigPath)) + { + throw OptionParseException( + fmt::format("'--hub-hydration-target-config': file not found: '{}'", m_ServerOptions.HydrationTargetConfigPath.string()), + {}); + } } /////////////////////////////////////////////////////////////////////////// @@ -146,21 +415,40 @@ ZenHubServer::~ZenHubServer() } void -ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) +ZenHubServer::OnModuleStateChanged(std::string_view HubInstanceId, + std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { - if (m_ConsulClient) + ZEN_UNUSED(PreviousState); + + if (NewState == HubInstanceState::Deprovisioning || NewState == HubInstanceState::Hibernating) + { + if (Info.Port != 0) + { + m_Proxy->PrunePort(Info.Port); + } + } + + if (!m_ConsulClient) + { + return; + } + + if (NewState == HubInstanceState::Provisioning || NewState == HubInstanceState::Provisioned) { consul::ServiceRegistrationInfo ServiceInfo{ - .ServiceId = std::string(ModuleId), - .ServiceName = "zen-storage", - // .Address = "localhost", // Let the consul agent figure out out external address // TODO: Info.BaseUri? + .ServiceId = std::string(ModuleId), + .ServiceName = "zen-storage", .Port = Info.Port, .HealthEndpoint = "health", .Tags = std::vector<std::pair<std::string, std::string>>{std::make_pair("module", std::string(ModuleId)), std::make_pair("zen-hub", std::string(HubInstanceId)), std::make_pair("version", std::string(ZEN_CFG_VERSION))}, - .HealthIntervalSeconds = 10, - .DeregisterAfterSeconds = 30}; + .HealthIntervalSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulHealthIntervalSeconds, + .DeregisterAfterSeconds = NewState == HubInstanceState::Provisioning ? 0u : m_ConsulDeregisterAfterSeconds, + .InitialStatus = NewState == HubInstanceState::Provisioned ? "passing" : ""}; if (!m_ConsulClient->RegisterService(ServiceInfo)) { @@ -174,13 +462,7 @@ ZenHubServer::OnProvisioned(std::string_view HubInstanceId, std::string_view Mod ServiceInfo.ServiceName); } } -} - -void -ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view ModuleId, const HubProvisionedInstanceInfo& Info) -{ - ZEN_UNUSED(HubInstanceId); - if (m_ConsulClient) + else if (NewState == HubInstanceState::Unprovisioned) { if (!m_ConsulClient->DeregisterService(ModuleId)) { @@ -193,6 +475,8 @@ ZenHubServer::OnDeprovisioned(std::string_view HubInstanceId, std::string_view M ZEN_INFO("Deregistered storage server instance for module '{}' at port {} from Consul", ModuleId, Info.Port); } } + // Transitional states (Waking, Recovering, Crashed) and stable states + // not handled above (Hibernated) are intentionally ignored by Consul. } int @@ -214,6 +498,10 @@ ZenHubServer::Initialize(const ZenHubServerConfig& ServerConfig, ZenServerState: // the main test range. ZenServerEnvironment::SetBaseChildId(1000); + m_ProvisionWorkerPool = + std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubInstanceProvisionThreadCount), "hub_provision"); + m_HydrationWorkerPool = std::make_unique<WorkerThreadPool>(gsl::narrow<int>(ServerConfig.HubHydrationThreadCount), "hub_hydration"); + m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; InitializeState(ServerConfig); @@ -245,10 +533,21 @@ ZenHubServer::Cleanup() m_Http->Close(); } + if (m_Proxy) + { + m_Proxy->Shutdown(); + } + + if (m_Hub) + { + m_Hub->Shutdown(); + } + m_FrontendService.reset(); m_HubService.reset(); m_ApiService.reset(); m_Hub.reset(); + m_Proxy.reset(); m_ConsulRegistration.reset(); m_ConsulClient.reset(); @@ -265,40 +564,120 @@ ZenHubServer::InitializeState(const ZenHubServerConfig& ServerConfig) ZEN_UNUSED(ServerConfig); } +ResourceMetrics +ZenHubServer::ResolveLimits(const ZenHubServerConfig& ServerConfig) +{ + uint64_t DiskTotal = 0; + uint64_t MemoryTotal = 0; + + if (ServerConfig.HubProvisionDiskLimitPercent > 0) + { + DiskSpace Disk; + if (DiskSpaceInfo(ServerConfig.DataDir, Disk)) + { + DiskTotal = Disk.Total; + } + else + { + ZEN_WARN("Failed to query disk space for '{}'; disk percent limit will not be applied", ServerConfig.DataDir); + } + } + if (ServerConfig.HubProvisionMemoryLimitPercent > 0) + { + MemoryTotal = GetSystemMetrics().SystemMemoryMiB * 1024 * 1024; + } + + auto Resolve = [](uint64_t Bytes, uint32_t Pct, uint64_t Total) -> uint64_t { + const uint64_t PctBytes = Pct > 0 ? (Total * Pct) / 100 : 0; + if (Bytes > 0 && PctBytes > 0) + { + return Min(Bytes, PctBytes); + } + return Bytes > 0 ? Bytes : PctBytes; + }; + + return { + .DiskUsageBytes = Resolve(ServerConfig.HubProvisionDiskLimitBytes, ServerConfig.HubProvisionDiskLimitPercent, DiskTotal), + .MemoryUsageBytes = Resolve(ServerConfig.HubProvisionMemoryLimitBytes, ServerConfig.HubProvisionMemoryLimitPercent, MemoryTotal), + }; +} + void ZenHubServer::InitializeServices(const ZenHubServerConfig& ServerConfig) { - ZEN_UNUSED(ServerConfig); - ZEN_INFO("instantiating Hub"); + Hub::Configuration HubConfig{ + .UseJobObject = ServerConfig.HubUseJobObject, + .BasePortNumber = ServerConfig.HubBasePortNumber, + .InstanceLimit = ServerConfig.HubInstanceLimit, + .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, + .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, + .InstanceConfigPath = ServerConfig.HubInstanceConfigPath, + .HydrationTargetSpecification = ServerConfig.HydrationTargetSpecification, + .WatchDog = + { + .CycleInterval = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleIntervalMs), + .CycleProcessingBudget = std::chrono::milliseconds(ServerConfig.WatchdogConfig.CycleProcessingBudgetMs), + .InstanceCheckThrottle = std::chrono::milliseconds(ServerConfig.WatchdogConfig.InstanceCheckThrottleMs), + .ProvisionedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.ProvisionedInactivityTimeoutSeconds), + .HibernatedInactivityTimeout = std::chrono::seconds(ServerConfig.WatchdogConfig.HibernatedInactivityTimeoutSeconds), + .InactivityCheckMargin = std::chrono::seconds(ServerConfig.WatchdogConfig.InactivityCheckMarginSeconds), + .ActivityCheckConnectTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckConnectTimeoutMs), + .ActivityCheckRequestTimeout = std::chrono::milliseconds(ServerConfig.WatchdogConfig.ActivityCheckRequestTimeoutMs), + }, + .ResourceLimits = ResolveLimits(ServerConfig), + .OptionalProvisionWorkerPool = m_ProvisionWorkerPool.get(), + .OptionalHydrationWorkerPool = m_HydrationWorkerPool.get()}; + + if (!ServerConfig.HydrationTargetConfigPath.empty()) + { + FileContents Contents = ReadFile(ServerConfig.HydrationTargetConfigPath); + if (!Contents) + { + throw zen::runtime_error("Failed to read hydration config '{}': {}", + ServerConfig.HydrationTargetConfigPath.string(), + Contents.ErrorCode.message()); + } + IoBuffer Buffer(Contents.Flatten()); + std::string_view JsonText(static_cast<const char*>(Buffer.GetData()), Buffer.GetSize()); + + std::string ParseError; + CbFieldIterator Root = LoadCompactBinaryFromJson(JsonText, ParseError); + if (!ParseError.empty() || !Root.IsObject()) + { + throw zen::runtime_error("Failed to parse hydration config '{}': {}", + ServerConfig.HydrationTargetConfigPath.string(), + ParseError.empty() ? "root must be a JSON object" : ParseError); + } + HubConfig.HydrationOptions = std::move(Root).AsObject(); + } + + m_Proxy = std::make_unique<HttpProxyHandler>(); + m_Hub = std::make_unique<Hub>( - Hub::Configuration{.UseJobObject = ServerConfig.HubUseJobObject, - .BasePortNumber = ServerConfig.HubBasePortNumber, - .InstanceLimit = ServerConfig.HubInstanceLimit, - .InstanceHttpThreadCount = ServerConfig.HubInstanceHttpThreadCount, - .InstanceCoreLimit = ServerConfig.HubInstanceCoreLimit, - .InstanceConfigPath = ServerConfig.HubInstanceConfigPath}, + std::move(HubConfig), ZenServerEnvironment(ZenServerEnvironment::Hub, ServerConfig.DataDir / "hub", ServerConfig.DataDir / "servers", ServerConfig.HubInstanceHttpClass), - m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( - std::string_view ModuleId, - const HubProvisionedInstanceInfo& Info) { OnProvisioned(HubInstanceId, ModuleId, Info); } - : Hub::ProvisionModuleCallbackFunc{}, - m_ConsulClient ? [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)]( - std::string_view ModuleId, - const HubProvisionedInstanceInfo& Info) { OnDeprovisioned(HubInstanceId, ModuleId, Info); } - : Hub::ProvisionModuleCallbackFunc{}); + Hub::AsyncModuleStateChangeCallbackFunc{ + [this, HubInstanceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId)](std::string_view ModuleId, + const HubProvisionedInstanceInfo& Info, + HubInstanceState PreviousState, + HubInstanceState NewState) { + OnModuleStateChanged(HubInstanceId, ModuleId, Info, PreviousState, NewState); + }}); + + m_Proxy->SetPortValidator([Hub = m_Hub.get()](uint16_t Port) { return Hub->IsInstancePort(Port); }); ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http); ZEN_INFO("instantiating hub service"); - m_HubService = std::make_unique<HttpHubService>(*m_Hub); + m_HubService = std::make_unique<HttpHubService>(*m_Hub, *m_Proxy, m_StatsService, m_StatusService); m_HubService->SetNotificationEndpoint(ServerConfig.UpstreamNotificationEndpoint, ServerConfig.InstanceId); - m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService); } void @@ -333,22 +712,52 @@ ZenHubServer::InitializeConsulRegistration(const ZenHubServerConfig& ServerConfi ZEN_INFO("Initializing Consul registration with endpoint: {}", ServerConfig.ConsulEndpoint); + std::string ConsulAccessTokenEnvName = + ServerConfig.ConsulTokenEnv.empty() ? GetDefaultConsulTokenEnvVariableName() : ServerConfig.ConsulTokenEnv; + std::string ConsulAccessToken = GetEnvVariable(ConsulAccessTokenEnvName); + if (ConsulAccessToken.empty()) + { + if (!ServerConfig.ConsulTokenEnv.empty()) + { + ZEN_WARN("Consul token environment variable '{}' is not set or empty", ServerConfig.ConsulTokenEnv); + } + } + else + { + ZEN_INFO("Consul token will be read from environment variable '{}'", ConsulAccessTokenEnvName); + } + try { - m_ConsulClient = std::make_unique<consul::ConsulClient>(ServerConfig.ConsulEndpoint); + m_ConsulClient = std::make_unique<consul::ConsulClient>(consul::ConsulClient::Configuration{ + .BaseUri = ServerConfig.ConsulEndpoint, + .TokenEnvName = ConsulAccessTokenEnvName, + }); + m_ConsulHealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds; + m_ConsulDeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds; + + if (!ServerConfig.ConsulRegisterHub) + { + ZEN_INFO( + "Hub parent Consul registration skipped (consul-register-hub is false); " + "instance registration remains enabled"); + return; + } consul::ServiceRegistrationInfo Info; Info.ServiceId = fmt::format("zen-hub-{}", ServerConfig.InstanceId); Info.ServiceName = "zen-hub"; // Info.Address = "localhost"; // Let the consul agent figure out out external address // TODO: Info.BaseUri? Info.Port = static_cast<uint16_t>(EffectivePort); - Info.HealthEndpoint = "hub/health"; + Info.HealthEndpoint = "health"; Info.Tags = std::vector<std::pair<std::string, std::string>>{ std::make_pair("zen-hub", Info.ServiceId), std::make_pair("version", std::string(ZEN_CFG_VERSION)), std::make_pair("base-port-number", fmt::format("{}", ServerConfig.HubBasePortNumber)), std::make_pair("instance-limit", fmt::format("{}", ServerConfig.HubInstanceLimit)), std::make_pair("use-job-object", fmt::format("{}", ServerConfig.HubUseJobObject))}; + Info.HealthIntervalSeconds = ServerConfig.ConsulHealthIntervalSeconds; + Info.DeregisterAfterSeconds = ServerConfig.ConsulDeregisterAfterSeconds; m_ConsulRegistration = std::make_unique<consul::ServiceRegistration>(m_ConsulClient.get(), Info); |