aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-13 16:38:16 +0200
committerGitHub Enterprise <[email protected]>2026-04-13 16:38:16 +0200
commit795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch)
tree7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zenserver/compute
parent5.8.4-pre2 (diff)
downloadzen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz
zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control. ### Async Horde Agent Rewrite - Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool - Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport` - Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers - Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed) ### Provisioner Drain / Scale-Down - `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain` - Configurable `--horde-drain-grace-period` (default 300s) before force-kill - Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer - Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers ### OIDC Authentication - `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token` - Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode - New `--horde-oidctoken-exe-path` CLI option for explicit path override ### Orchestrator & Scheduler - Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers - New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment - Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock - Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings - New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions) - New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target` ### Remote Execution - Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits - Track in-flight submissions to prevent over-queuing - Show remote runner hostname in `GetDisplayName()` - `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address) ### Frontend Dashboard - Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules - Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count - Editable target-cores input with debounced POST to `/orch/provisioner/target` - Per-agent provisioning status badges (active / draining / deallocated) in the agents table - Active vs total CPU counts in agents summary row ### CLI - New `zen compute record-start` / `record-stop` subcommands - `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet) ### Other - `DataDir` supports environment variable expansion - Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories - Linux/Mac runners `nice(5)` child processes to avoid starving the main server - `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset` - Curl HTTP client logs effective URL on failure - `MachineInfo` carries `Pool` and `Mode` from Horde response - Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zenserver/compute')
-rw-r--r--src/zenserver/compute/computeserver.cpp108
-rw-r--r--src/zenserver/compute/computeserver.h7
2 files changed, 111 insertions, 4 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index 7296098e0..8cd8b4cfe 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -22,6 +22,8 @@
# if ZEN_WITH_HORDE
# include <zenhorde/hordeconfig.h>
# include <zenhorde/hordeprovisioner.h>
+# include <zenhttp/httpclientauth.h>
+# include <zenutil/authutils.h>
# endif
# if ZEN_WITH_NOMAD
# include <zennomad/nomadconfig.h>
@@ -67,6 +69,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("compute",
"",
+ "coordinator-session",
+ "Session ID of the orchestrator (for stale-instance rejection)",
+ cxxopts::value<std::string>(m_ServerOptions.CoordinatorSession)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "announce-url",
+ "Override URL announced to the coordinator (e.g. relay-visible endpoint)",
+ cxxopts::value<std::string>(m_ServerOptions.AnnounceUrl)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
"idms",
"Enable IDMS cloud detection; optionally specify a custom probe endpoint",
cxxopts::value<std::string>(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"),
@@ -79,6 +95,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<bool>(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"),
"");
+ Options.add_option("compute",
+ "",
+ "provision-clean",
+ "Pass --clean to provisioned worker instances so they wipe state on startup",
+ cxxopts::value<bool>(m_ServerOptions.ProvisionClean)->default_value("false"),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "provision-tracehost",
+ "Pass --tracehost to provisioned worker instances for remote trace collection",
+ cxxopts::value<std::string>(m_ServerOptions.ProvisionTraceHost)->default_value(""),
+ "");
+
# if ZEN_WITH_HORDE
// Horde provisioning options
Options.add_option("horde",
@@ -139,6 +169,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("horde",
"",
+ "horde-drain-grace-period",
+ "Grace period in seconds for draining agents before force-kill",
+ cxxopts::value<int>(m_ServerOptions.HordeConfig.DrainGracePeriodSeconds)->default_value("300"),
+ "");
+
+ Options.add_option("horde",
+ "",
"horde-host",
"Host address for Horde agents to connect back to",
cxxopts::value<std::string>(m_ServerOptions.HordeConfig.HostAddress)->default_value(""),
@@ -164,6 +201,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Port number for Zen service communication",
cxxopts::value<uint16_t>(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"),
"");
+
+ Options.add_option("horde",
+ "",
+ "horde-oidctoken-exe-path",
+ "Path to OidcToken executable for automatic Horde authentication",
+ cxxopts::value<std::string>(m_HordeOidcTokenExePath)->default_value(""),
+ "");
# endif
# if ZEN_WITH_NOMAD
@@ -313,6 +357,30 @@ ZenComputeServerConfigurator::ValidateOptions()
# if ZEN_WITH_HORDE
horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr);
horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr);
+
+ // Set up OidcToken-based authentication if no static token was provided
+ if (m_ServerOptions.HordeConfig.AuthToken.empty() && !m_ServerOptions.HordeConfig.ServerUrl.empty())
+ {
+ std::filesystem::path OidcExePath = FindOidcTokenExePath(m_HordeOidcTokenExePath);
+ if (!OidcExePath.empty())
+ {
+ ZEN_INFO("using OidcToken executable for Horde authentication: {}", OidcExePath);
+ auto Provider = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath,
+ m_ServerOptions.HordeConfig.ServerUrl,
+ /*Quiet=*/true,
+ /*Unattended=*/false,
+ /*Hidden=*/true,
+ /*IsHordeUrl=*/true);
+ if (Provider)
+ {
+ m_ServerOptions.HordeConfig.AccessTokenProvider = std::move(*Provider);
+ }
+ else
+ {
+ ZEN_WARN("OidcToken authentication failed; Horde requests will be unauthenticated");
+ }
+ }
+ }
# endif
# if ZEN_WITH_NOMAD
@@ -347,6 +415,8 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ
}
m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint;
+ m_CoordinatorSession = ServerConfig.CoordinatorSession;
+ m_AnnounceUrl = ServerConfig.AnnounceUrl;
m_InstanceId = ServerConfig.InstanceId;
m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket;
@@ -379,7 +449,14 @@ ZenComputeServer::Cleanup()
m_AnnounceTimer.cancel();
# if ZEN_WITH_HORDE
- // Shut down Horde provisioner first — this signals all agent threads
+ // Disconnect the provisioner state provider before destroying the
+ // provisioner so the orchestrator HTTP layer cannot call into it.
+ if (m_OrchestratorService)
+ {
+ m_OrchestratorService->SetProvisionerStateProvider(nullptr);
+ }
+
+ // Shut down Horde provisioner — this signals all agent threads
// to exit and joins them before we tear down HTTP services.
m_HordeProvisioner.reset();
# endif
@@ -482,6 +559,7 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
m_StatsService,
ServerConfig.DataDir / "functions",
ServerConfig.MaxConcurrentActions);
+ m_ComputeService->SetShutdownCallback([this] { RequestExit(0); });
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
@@ -506,7 +584,11 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
OrchestratorEndpoint << '/';
}
- m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, OrchestratorEndpoint);
+ m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
}
}
# endif
@@ -537,7 +619,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
: std::filesystem::path(HordeConfig.BinariesPath);
std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde";
- m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint);
+ m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig,
+ BinariesPath,
+ WorkingDir,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
+ m_OrchestratorService->SetProvisionerStateProvider(m_HordeProvisioner.get());
}
}
# endif
@@ -565,6 +654,10 @@ ZenComputeServer::GetInstanceId() const
std::string
ZenComputeServer::GetAnnounceUrl() const
{
+ if (!m_AnnounceUrl.empty())
+ {
+ return m_AnnounceUrl;
+ }
return m_Http->GetServiceUri(nullptr);
}
@@ -635,6 +728,11 @@ ZenComputeServer::BuildAnnounceBody()
<< "nomad";
}
+ if (!m_CoordinatorSession.empty())
+ {
+ AnnounceBody << "coordinator_session" << m_CoordinatorSession;
+ }
+
ResolveCloudMetadata();
if (m_CloudMetadata)
{
@@ -781,8 +879,10 @@ ZenComputeServer::ProvisionerMaintenanceTick()
# if ZEN_WITH_HORDE
if (m_HordeProvisioner)
{
- m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX);
+ // Re-apply current target to spawn agent threads for any that have
+ // exited since the last tick, without overwriting a user-set target.
auto Stats = m_HordeProvisioner->GetStats();
+ m_HordeProvisioner->SetTargetCoreCount(Stats.TargetCoreCount);
ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}",
Stats.TargetCoreCount,
Stats.EstimatedCoreCount,
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
index 38f93bc36..63db7e9b3 100644
--- a/src/zenserver/compute/computeserver.h
+++ b/src/zenserver/compute/computeserver.h
@@ -49,9 +49,13 @@ struct ZenComputeServerConfig : public ZenServerConfig
std::string UpstreamNotificationEndpoint;
std::string InstanceId; // For use in notifications
std::string CoordinatorEndpoint;
+ std::string CoordinatorSession; ///< Session ID for stale-instance rejection
+ std::string AnnounceUrl; ///< Override for self-announced URL (e.g. relay-visible endpoint)
std::string IdmsEndpoint;
int32_t MaxConcurrentActions = 0; // 0 = auto (LogicalProcessorCount * 2)
bool EnableWorkerWebSocket = false; // Use WebSocket for worker↔orchestrator link
+ bool ProvisionClean = false; // Pass --clean to provisioned workers
+ std::string ProvisionTraceHost; // Pass --tracehost to provisioned workers
# if ZEN_WITH_HORDE
horde::HordeConfig HordeConfig;
@@ -84,6 +88,7 @@ private:
# if ZEN_WITH_HORDE
std::string m_HordeModeStr = "direct";
std::string m_HordeEncryptionStr = "none";
+ std::string m_HordeOidcTokenExePath;
# endif
# if ZEN_WITH_NOMAD
@@ -147,6 +152,8 @@ private:
# endif
SystemMetricsTracker m_MetricsTracker;
std::string m_CoordinatorEndpoint;
+ std::string m_CoordinatorSession;
+ std::string m_AnnounceUrl;
std::string m_InstanceId;
asio::steady_timer m_AnnounceTimer{m_IoContext};