diff options
| author | Stefan Boberg <[email protected]> | 2026-04-13 16:38:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 16:38:16 +0200 |
| commit | 795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch) | |
| tree | 7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zenserver/compute/computeserver.cpp | |
| parent | 5.8.4-pre2 (diff) | |
| download | zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip | |
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control.
### Async Horde Agent Rewrite
- Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool
- Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport`
- Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers
- Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed)
### Provisioner Drain / Scale-Down
- `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain`
- Configurable `--horde-drain-grace-period` (default 300s) before force-kill
- Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer
- Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers
### OIDC Authentication
- `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token`
- Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode
- New `--horde-oidctoken-exe-path` CLI option for explicit path override
### Orchestrator & Scheduler
- Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers
- New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment
- Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock
- Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings
- New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions)
- New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target`
### Remote Execution
- Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits
- Track in-flight submissions to prevent over-queuing
- Show remote runner hostname in `GetDisplayName()`
- `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address)
### Frontend Dashboard
- Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules
- Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count
- Editable target-cores input with debounced POST to `/orch/provisioner/target`
- Per-agent provisioning status badges (active / draining / deallocated) in the agents table
- Active vs total CPU counts in agents summary row
### CLI
- New `zen compute record-start` / `record-stop` subcommands
- `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet)
### Other
- `DataDir` supports environment variable expansion
- Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories
- Linux/Mac runners `nice(5)` child processes to avoid starving the main server
- `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset`
- Curl HTTP client logs effective URL on failure
- `MachineInfo` carries `Pool` and `Mode` from Horde response
- Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zenserver/compute/computeserver.cpp')
| -rw-r--r-- | src/zenserver/compute/computeserver.cpp | 108 |
1 files changed, 104 insertions, 4 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 7296098e0..8cd8b4cfe 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -22,6 +22,8 @@ # if ZEN_WITH_HORDE # include <zenhorde/hordeconfig.h> # include <zenhorde/hordeprovisioner.h> +# include <zenhttp/httpclientauth.h> +# include <zenutil/authutils.h> # endif # if ZEN_WITH_NOMAD # include <zennomad/nomadconfig.h> @@ -67,6 +69,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("compute", "", + "coordinator-session", + "Session ID of the orchestrator (for stale-instance rejection)", + cxxopts::value<std::string>(m_ServerOptions.CoordinatorSession)->default_value(""), + ""); + + Options.add_option("compute", + "", + "announce-url", + "Override URL announced to the coordinator (e.g. relay-visible endpoint)", + cxxopts::value<std::string>(m_ServerOptions.AnnounceUrl)->default_value(""), + ""); + + Options.add_option("compute", + "", "idms", "Enable IDMS cloud detection; optionally specify a custom probe endpoint", cxxopts::value<std::string>(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"), @@ -79,6 +95,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value<bool>(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"), ""); + Options.add_option("compute", + "", + "provision-clean", + "Pass --clean to provisioned worker instances so they wipe state on startup", + cxxopts::value<bool>(m_ServerOptions.ProvisionClean)->default_value("false"), + ""); + + Options.add_option("compute", + "", + "provision-tracehost", + "Pass --tracehost to provisioned worker instances for remote trace collection", + cxxopts::value<std::string>(m_ServerOptions.ProvisionTraceHost)->default_value(""), + ""); + # if ZEN_WITH_HORDE // Horde provisioning options Options.add_option("horde", @@ -139,6 +169,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) Options.add_option("horde", "", + "horde-drain-grace-period", + "Grace period in seconds for draining agents before force-kill", + cxxopts::value<int>(m_ServerOptions.HordeConfig.DrainGracePeriodSeconds)->default_value("300"), + ""); + + Options.add_option("horde", + "", "horde-host", "Host address for Horde agents to connect back to", cxxopts::value<std::string>(m_ServerOptions.HordeConfig.HostAddress)->default_value(""), @@ -164,6 +201,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Port number for Zen service communication", cxxopts::value<uint16_t>(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"), ""); + + Options.add_option("horde", + "", + "horde-oidctoken-exe-path", + "Path to OidcToken executable for automatic Horde authentication", + cxxopts::value<std::string>(m_HordeOidcTokenExePath)->default_value(""), + ""); # endif # if ZEN_WITH_NOMAD @@ -313,6 +357,30 @@ ZenComputeServerConfigurator::ValidateOptions() # if ZEN_WITH_HORDE horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr); horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr); + + // Set up OidcToken-based authentication if no static token was provided + if (m_ServerOptions.HordeConfig.AuthToken.empty() && !m_ServerOptions.HordeConfig.ServerUrl.empty()) + { + std::filesystem::path OidcExePath = FindOidcTokenExePath(m_HordeOidcTokenExePath); + if (!OidcExePath.empty()) + { + ZEN_INFO("using OidcToken executable for Horde authentication: {}", OidcExePath); + auto Provider = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath, + m_ServerOptions.HordeConfig.ServerUrl, + /*Quiet=*/true, + /*Unattended=*/false, + /*Hidden=*/true, + /*IsHordeUrl=*/true); + if (Provider) + { + m_ServerOptions.HordeConfig.AccessTokenProvider = std::move(*Provider); + } + else + { + ZEN_WARN("OidcToken authentication failed; Horde requests will be unauthenticated"); + } + } + } # endif # if ZEN_WITH_NOMAD @@ -347,6 +415,8 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ } m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint; + m_CoordinatorSession = ServerConfig.CoordinatorSession; + m_AnnounceUrl = ServerConfig.AnnounceUrl; m_InstanceId = ServerConfig.InstanceId; m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket; @@ -379,7 +449,14 @@ ZenComputeServer::Cleanup() m_AnnounceTimer.cancel(); # if ZEN_WITH_HORDE - // Shut down Horde provisioner first — this signals all agent threads + // Disconnect the provisioner state provider before destroying the + // provisioner so the orchestrator HTTP layer cannot call into it. + if (m_OrchestratorService) + { + m_OrchestratorService->SetProvisionerStateProvider(nullptr); + } + + // Shut down Horde provisioner — this signals all agent threads // to exit and joins them before we tear down HTTP services. m_HordeProvisioner.reset(); # endif @@ -482,6 +559,7 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) m_StatsService, ServerConfig.DataDir / "functions", ServerConfig.MaxConcurrentActions); + m_ComputeService->SetShutdownCallback([this] { RequestExit(0); }); m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService); @@ -506,7 +584,11 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) OrchestratorEndpoint << '/'; } - m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, OrchestratorEndpoint); + m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); } } # endif @@ -537,7 +619,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) : std::filesystem::path(HordeConfig.BinariesPath); std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde"; - m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint); + m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, + BinariesPath, + WorkingDir, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); + m_OrchestratorService->SetProvisionerStateProvider(m_HordeProvisioner.get()); } } # endif @@ -565,6 +654,10 @@ ZenComputeServer::GetInstanceId() const std::string ZenComputeServer::GetAnnounceUrl() const { + if (!m_AnnounceUrl.empty()) + { + return m_AnnounceUrl; + } return m_Http->GetServiceUri(nullptr); } @@ -635,6 +728,11 @@ ZenComputeServer::BuildAnnounceBody() << "nomad"; } + if (!m_CoordinatorSession.empty()) + { + AnnounceBody << "coordinator_session" << m_CoordinatorSession; + } + ResolveCloudMetadata(); if (m_CloudMetadata) { @@ -781,8 +879,10 @@ ZenComputeServer::ProvisionerMaintenanceTick() # if ZEN_WITH_HORDE if (m_HordeProvisioner) { - m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX); + // Re-apply current target to spawn agent threads for any that have + // exited since the last tick, without overwriting a user-set target. auto Stats = m_HordeProvisioner->GetStats(); + m_HordeProvisioner->SetTargetCoreCount(Stats.TargetCoreCount); ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}", Stats.TargetCoreCount, Stats.EstimatedCoreCount, |