diff options
Diffstat (limited to 'src/zenserver/compute')
| -rw-r--r-- | src/zenserver/compute/computeserver.cpp | 134 | ||||
| -rw-r--r-- | src/zenserver/compute/computeserver.h | 16 |
2 files changed, 131 insertions, 19 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index d1875f41a..b110f7538 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -22,6 +22,8 @@ # if ZEN_WITH_HORDE # include <zenhorde/hordeconfig.h> # include <zenhorde/hordeprovisioner.h> +# include <zenhttp/httpclientauth.h> +# include <zenutil/authutils.h> # endif # if ZEN_WITH_NOMAD # include <zennomad/nomadconfig.h> @@ -67,6 +69,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("compute", "", + "coordinator-session", + "Session ID of the orchestrator (for stale-instance rejection)", + cxxopts::value<std::string>(m_ServerOptions.CoordinatorSession)->default_value(""), + ""); + + Options.add_option("compute", + "", + "announce-url", + "Override URL announced to the coordinator (e.g. relay-visible endpoint)", + cxxopts::value<std::string>(m_ServerOptions.AnnounceUrl)->default_value(""), + ""); + + Options.add_option("compute", + "", "idms", "Enable IDMS cloud detection; optionally specify a custom probe endpoint", cxxopts::value<std::string>(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"), @@ -79,6 +95,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value<bool>(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"), ""); + Options.add_option("compute", + "", + "provision-clean", + "Pass --clean to provisioned worker instances so they wipe state on startup", + cxxopts::value<bool>(m_ServerOptions.ProvisionClean)->default_value("false"), + ""); + + Options.add_option("compute", + "", + "provision-tracehost", + "Pass --tracehost to provisioned worker instances for remote trace collection", + cxxopts::value<std::string>(m_ServerOptions.ProvisionTraceHost)->default_value(""), + ""); + # if ZEN_WITH_HORDE // Horde provisioning options Options.add_option("horde", @@ -139,6 +169,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("horde", "", + "horde-drain-grace-period", + "Grace period in seconds for draining agents before force-kill", + cxxopts::value<int>(m_ServerOptions.HordeConfig.DrainGracePeriodSeconds)->default_value("300"), + ""); + + Options.add_option("horde", + "", "horde-host", "Host address for Horde agents to connect back to", cxxopts::value<std::string>(m_ServerOptions.HordeConfig.HostAddress)->default_value(""), @@ -164,6 +201,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Port number for Zen service communication", cxxopts::value<uint16_t>(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"), ""); + + Options.add_option("horde", + "", + "horde-oidctoken-exe-path", + "Path to OidcToken executable for automatic Horde authentication", + cxxopts::value<std::string>(m_HordeOidcTokenExePath)->default_value(""), + ""); # endif # if ZEN_WITH_NOMAD @@ -313,6 +357,30 @@ ZenComputeServerConfigurator::ValidateOptions() # if ZEN_WITH_HORDE horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr); horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr); + + // Set up OidcToken-based authentication if no static token was provided + if (m_ServerOptions.HordeConfig.AuthToken.empty() && !m_ServerOptions.HordeConfig.ServerUrl.empty()) + { + std::filesystem::path OidcExePath = FindOidcTokenExePath(m_HordeOidcTokenExePath); + if (!OidcExePath.empty()) + { + ZEN_INFO("using OidcToken executable for Horde authentication: {}", OidcExePath); + auto Provider = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath, + m_ServerOptions.HordeConfig.ServerUrl, + /*Quiet=*/true, + /*Unattended=*/false, + /*Hidden=*/true, + /*IsHordeUrl=*/true); + if (Provider) + { + m_ServerOptions.HordeConfig.AccessTokenProvider = std::move(*Provider); + } + else + { + ZEN_WARN("OidcToken authentication failed; Horde requests will be unauthenticated"); + } + } + } # endif # if ZEN_WITH_NOMAD @@ -347,6 +415,8 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ } m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint; + m_CoordinatorSession = ServerConfig.CoordinatorSession; + m_AnnounceUrl = ServerConfig.AnnounceUrl; m_InstanceId = ServerConfig.InstanceId; m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket; @@ -379,13 +449,20 @@ ZenComputeServer::Cleanup() m_AnnounceTimer.cancel(); # if ZEN_WITH_HORDE - // Shut down Horde provisioner first — this signals all agent threads + // Disconnect the provisioner state provider before destroying the + // provisioner so the orchestrator HTTP layer cannot call into it. + if (m_OrchestratorService) + { + m_OrchestratorService->SetProvisionerStateProvider(nullptr); + } + + // Shut down Horde provisioner - this signals all agent threads // to exit and joins them before we tear down HTTP services. m_HordeProvisioner.reset(); # endif # if ZEN_WITH_NOMAD - // Shut down Nomad provisioner — stops the management thread and + // Shut down Nomad provisioner - stops the management thread and // sends stop requests for all tracked jobs. m_NomadProvisioner.reset(); # endif @@ -419,12 +496,12 @@ ZenComputeServer::Cleanup() m_IoRunner.join(); } - ShutdownServices(); - if (m_Http) { m_Http->Close(); } + + ShutdownServices(); } catch (const std::exception& Ex) { @@ -444,11 +521,12 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) ZEN_TRACE_CPU("ZenComputeServer::InitializeServices"); ZEN_INFO("initializing compute services"); - CidStoreConfiguration Config; - Config.RootDirectory = m_DataRoot / "cas"; + m_ActionStore = std::make_unique<MemoryCidStore>(); - m_CidStore = std::make_unique<CidStore>(m_GcManager); - m_CidStore->Initialize(Config); + CidStoreConfiguration WorkerStoreConfig; + WorkerStoreConfig.RootDirectory = m_DataRoot / "cas"; + m_WorkerStore = std::make_unique<CidStore>(m_GcManager); + m_WorkerStore->Initialize(WorkerStoreConfig); if (!ServerConfig.IdmsEndpoint.empty()) { @@ -476,12 +554,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) std::make_unique<zen::compute::HttpOrchestratorService>(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket); ZEN_INFO("instantiating function service"); - m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_CidStore, + m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_ActionStore, + *m_WorkerStore, m_StatsService, ServerConfig.DataDir / "functions", ServerConfig.MaxConcurrentActions); + m_ComputeService->SetShutdownCallback([this] { RequestExit(0); }); - m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService); + m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService); # if ZEN_WITH_NOMAD // Nomad provisioner @@ -504,7 +584,11 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) OrchestratorEndpoint << '/'; } - m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, OrchestratorEndpoint); + m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); } } # endif @@ -535,7 +619,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) : std::filesystem::path(HordeConfig.BinariesPath); std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde"; - m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint); + m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, + BinariesPath, + WorkingDir, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); + m_OrchestratorService->SetProvisionerStateProvider(m_HordeProvisioner.get()); } } # endif @@ -563,6 +654,10 @@ ZenComputeServer::GetInstanceId() const std::string ZenComputeServer::GetAnnounceUrl() const { + if (!m_AnnounceUrl.empty()) + { + return m_AnnounceUrl; + } return m_Http->GetServiceUri(nullptr); } @@ -633,6 +728,11 @@ ZenComputeServer::BuildAnnounceBody() << "nomad"; } + if (!m_CoordinatorSession.empty()) + { + AnnounceBody << "coordinator_session" << m_CoordinatorSession; + } + ResolveCloudMetadata(); if (m_CloudMetadata) { @@ -779,8 +879,10 @@ ZenComputeServer::ProvisionerMaintenanceTick() # if ZEN_WITH_HORDE if (m_HordeProvisioner) { - m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX); + // Re-apply current target to spawn agent threads for any that have + // exited since the last tick, without overwriting a user-set target. auto Stats = m_HordeProvisioner->GetStats(); + m_HordeProvisioner->SetTargetCoreCount(Stats.TargetCoreCount); ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}", Stats.TargetCoreCount, Stats.EstimatedCoreCount, @@ -882,12 +984,14 @@ ZenComputeServer::Run() OnReady(); + StartSelfSession("zencompute"); + PostAnnounce(); EnqueueAnnounceTimer(); InitializeOrchestratorWebSocket(); # if ZEN_WITH_HORDE - // Start Horde provisioning if configured — request maximum allowed cores. + // Start Horde provisioning if configured - request maximum allowed cores. // SetTargetCoreCount clamps to HordeConfig::MaxCores internally. if (m_HordeProvisioner) { @@ -899,7 +1003,7 @@ ZenComputeServer::Run() # endif # if ZEN_WITH_NOMAD - // Start Nomad provisioning if configured — request maximum allowed cores. + // Start Nomad provisioning if configured - request maximum allowed cores. // SetTargetCoreCount clamps to NomadConfig::MaxCores internally. if (m_NomadProvisioner) { diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h index 8f4edc0f0..aa9c1a5b3 100644 --- a/src/zenserver/compute/computeserver.h +++ b/src/zenserver/compute/computeserver.h @@ -10,6 +10,7 @@ # include <zencore/system.h> # include <zenhttp/httpwsclient.h> # include <zenstore/gc.h> +# include <zenstore/memorycidstore.h> # include "frontend/frontend.h" namespace cxxopts { @@ -41,7 +42,6 @@ class NomadProvisioner; namespace zen { -class CidStore; class HttpApiService; struct ZenComputeServerConfig : public ZenServerConfig @@ -49,9 +49,13 @@ struct ZenComputeServerConfig : public ZenServerConfig std::string UpstreamNotificationEndpoint; std::string InstanceId; // For use in notifications std::string CoordinatorEndpoint; + std::string CoordinatorSession; ///< Session ID for stale-instance rejection + std::string AnnounceUrl; ///< Override for self-announced URL (e.g. relay-visible endpoint) std::string IdmsEndpoint; int32_t MaxConcurrentActions = 0; // 0 = auto (LogicalProcessorCount * 2) - bool EnableWorkerWebSocket = false; // Use WebSocket for worker↔orchestrator link + bool EnableWorkerWebSocket = false; // Use WebSocket for worker<->orchestrator link + bool ProvisionClean = false; // Pass --clean to provisioned workers + std::string ProvisionTraceHost; // Pass --tracehost to provisioned workers # if ZEN_WITH_HORDE horde::HordeConfig HordeConfig; @@ -84,6 +88,7 @@ private: # if ZEN_WITH_HORDE std::string m_HordeModeStr = "direct"; std::string m_HordeEncryptionStr = "none"; + std::string m_HordeOidcTokenExePath; # endif # if ZEN_WITH_NOMAD @@ -131,7 +136,8 @@ public: private: GcManager m_GcManager; GcScheduler m_GcScheduler{m_GcManager}; - std::unique_ptr<CidStore> m_CidStore; + std::unique_ptr<MemoryCidStore> m_ActionStore; + std::unique_ptr<CidStore> m_WorkerStore; std::unique_ptr<HttpApiService> m_ApiService; std::unique_ptr<zen::compute::HttpComputeService> m_ComputeService; std::unique_ptr<zen::compute::HttpOrchestratorService> m_OrchestratorService; @@ -146,6 +152,8 @@ private: # endif SystemMetricsTracker m_MetricsTracker; std::string m_CoordinatorEndpoint; + std::string m_CoordinatorSession; + std::string m_AnnounceUrl; std::string m_InstanceId; asio::steady_timer m_AnnounceTimer{m_IoContext}; @@ -163,7 +171,7 @@ private: std::string GetInstanceId() const; CbObject BuildAnnounceBody(); - // Worker→orchestrator WebSocket client + // Worker->orchestrator WebSocket client struct OrchestratorWsHandler : public IWsClientHandler { ZenComputeServer& Server; |