aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute/computeserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/compute/computeserver.cpp')
-rw-r--r--src/zenserver/compute/computeserver.cpp108
1 files changed, 104 insertions, 4 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index 7296098e0..8cd8b4cfe 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -22,6 +22,8 @@
# if ZEN_WITH_HORDE
# include <zenhorde/hordeconfig.h>
# include <zenhorde/hordeprovisioner.h>
+# include <zenhttp/httpclientauth.h>
+# include <zenutil/authutils.h>
# endif
# if ZEN_WITH_NOMAD
# include <zennomad/nomadconfig.h>
@@ -67,6 +69,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("compute",
"",
+ "coordinator-session",
+ "Session ID of the orchestrator (for stale-instance rejection)",
+ cxxopts::value<std::string>(m_ServerOptions.CoordinatorSession)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "announce-url",
+ "Override URL announced to the coordinator (e.g. relay-visible endpoint)",
+ cxxopts::value<std::string>(m_ServerOptions.AnnounceUrl)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
"idms",
"Enable IDMS cloud detection; optionally specify a custom probe endpoint",
cxxopts::value<std::string>(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"),
@@ -79,6 +95,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<bool>(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"),
"");
+ Options.add_option("compute",
+ "",
+ "provision-clean",
+ "Pass --clean to provisioned worker instances so they wipe state on startup",
+ cxxopts::value<bool>(m_ServerOptions.ProvisionClean)->default_value("false"),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "provision-tracehost",
+ "Pass --tracehost to provisioned worker instances for remote trace collection",
+ cxxopts::value<std::string>(m_ServerOptions.ProvisionTraceHost)->default_value(""),
+ "");
+
# if ZEN_WITH_HORDE
// Horde provisioning options
Options.add_option("horde",
@@ -139,6 +169,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("horde",
"",
+ "horde-drain-grace-period",
+ "Grace period in seconds for draining agents before force-kill",
+ cxxopts::value<int>(m_ServerOptions.HordeConfig.DrainGracePeriodSeconds)->default_value("300"),
+ "");
+
+ Options.add_option("horde",
+ "",
"horde-host",
"Host address for Horde agents to connect back to",
cxxopts::value<std::string>(m_ServerOptions.HordeConfig.HostAddress)->default_value(""),
@@ -164,6 +201,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Port number for Zen service communication",
cxxopts::value<uint16_t>(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"),
"");
+
+ Options.add_option("horde",
+ "",
+ "horde-oidctoken-exe-path",
+ "Path to OidcToken executable for automatic Horde authentication",
+ cxxopts::value<std::string>(m_HordeOidcTokenExePath)->default_value(""),
+ "");
# endif
# if ZEN_WITH_NOMAD
@@ -313,6 +357,30 @@ ZenComputeServerConfigurator::ValidateOptions()
# if ZEN_WITH_HORDE
horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr);
horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr);
+
+ // Set up OidcToken-based authentication if no static token was provided
+ if (m_ServerOptions.HordeConfig.AuthToken.empty() && !m_ServerOptions.HordeConfig.ServerUrl.empty())
+ {
+ std::filesystem::path OidcExePath = FindOidcTokenExePath(m_HordeOidcTokenExePath);
+ if (!OidcExePath.empty())
+ {
+ ZEN_INFO("using OidcToken executable for Horde authentication: {}", OidcExePath);
+ auto Provider = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath,
+ m_ServerOptions.HordeConfig.ServerUrl,
+ /*Quiet=*/true,
+ /*Unattended=*/false,
+ /*Hidden=*/true,
+ /*IsHordeUrl=*/true);
+ if (Provider)
+ {
+ m_ServerOptions.HordeConfig.AccessTokenProvider = std::move(*Provider);
+ }
+ else
+ {
+ ZEN_WARN("OidcToken authentication failed; Horde requests will be unauthenticated");
+ }
+ }
+ }
# endif
# if ZEN_WITH_NOMAD
@@ -347,6 +415,8 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ
}
m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint;
+ m_CoordinatorSession = ServerConfig.CoordinatorSession;
+ m_AnnounceUrl = ServerConfig.AnnounceUrl;
m_InstanceId = ServerConfig.InstanceId;
m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket;
@@ -379,7 +449,14 @@ ZenComputeServer::Cleanup()
m_AnnounceTimer.cancel();
# if ZEN_WITH_HORDE
- // Shut down Horde provisioner first — this signals all agent threads
+ // Disconnect the provisioner state provider before destroying the
+ // provisioner so the orchestrator HTTP layer cannot call into it.
+ if (m_OrchestratorService)
+ {
+ m_OrchestratorService->SetProvisionerStateProvider(nullptr);
+ }
+
+ // Shut down Horde provisioner — this signals all agent threads
// to exit and joins them before we tear down HTTP services.
m_HordeProvisioner.reset();
# endif
@@ -482,6 +559,7 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
m_StatsService,
ServerConfig.DataDir / "functions",
ServerConfig.MaxConcurrentActions);
+ m_ComputeService->SetShutdownCallback([this] { RequestExit(0); });
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
@@ -506,7 +584,11 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
OrchestratorEndpoint << '/';
}
- m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, OrchestratorEndpoint);
+ m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
}
}
# endif
@@ -537,7 +619,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
: std::filesystem::path(HordeConfig.BinariesPath);
std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde";
- m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint);
+ m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig,
+ BinariesPath,
+ WorkingDir,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
+ m_OrchestratorService->SetProvisionerStateProvider(m_HordeProvisioner.get());
}
}
# endif
@@ -565,6 +654,10 @@ ZenComputeServer::GetInstanceId() const
std::string
ZenComputeServer::GetAnnounceUrl() const
{
+ if (!m_AnnounceUrl.empty())
+ {
+ return m_AnnounceUrl;
+ }
return m_Http->GetServiceUri(nullptr);
}
@@ -635,6 +728,11 @@ ZenComputeServer::BuildAnnounceBody()
<< "nomad";
}
+ if (!m_CoordinatorSession.empty())
+ {
+ AnnounceBody << "coordinator_session" << m_CoordinatorSession;
+ }
+
ResolveCloudMetadata();
if (m_CloudMetadata)
{
@@ -781,8 +879,10 @@ ZenComputeServer::ProvisionerMaintenanceTick()
# if ZEN_WITH_HORDE
if (m_HordeProvisioner)
{
- m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX);
+ // Re-apply current target to spawn agent threads for any that have
+ // exited since the last tick, without overwriting a user-set target.
auto Stats = m_HordeProvisioner->GetStats();
+ m_HordeProvisioner->SetTargetCoreCount(Stats.TargetCoreCount);
ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}",
Stats.TargetCoreCount,
Stats.EstimatedCoreCount,