From 87248f58b4551870af8f08aac3e38e8887e32073 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 13 Apr 2026 12:57:51 +0200 Subject: Add MemoryCidStore and ChunkStore interface (#940) This PR introduces an in-memory `CidStore` option primarily for use with compute, to avoid hitting disk for ephemeral data which is not really worth persisting. And in particular not worth paying the critical path cost of persistence. - **MemoryCidStore**: In-memory CidStore implementation backed by a hash map, optionally layered over a standard CidStore. Writes to the backing store are dispatched asynchronously via a dedicated flush thread to avoid blocking callers on disk I/O. Reads check memory first, then fall back to the backing store without caching the result. - **ChunkStore interface**: Extract `ChunkStore` abstract class (`AddChunk`, `ContainsChunk`, `FilterChunks`) and `FallbackChunkResolver` into `zenstore.h` so `HttpComputeService` can accept different storage backends for action inputs vs worker binaries. `CidStore` and `MemoryCidStore` both implement `ChunkStore`. - **Compute service wiring**: `HttpComputeService` takes two `ChunkStore&` params (action + worker). The compute server uses `MemoryCidStore` for actions (no disk persistence needed) and disk-backed `CidStore` for workers (cross-action reuse). The storage server passes its `CidStore` for both (unchanged behavior). --- src/zenserver/compute/computeserver.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 1673cea6c..7296098e0 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -444,11 +444,12 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) ZEN_TRACE_CPU("ZenComputeServer::InitializeServices"); ZEN_INFO("initializing compute services"); - CidStoreConfiguration Config; - Config.RootDirectory = m_DataRoot / "cas"; + m_ActionStore = std::make_unique(); - m_CidStore = std::make_unique(m_GcManager); - m_CidStore->Initialize(Config); + CidStoreConfiguration WorkerStoreConfig; + WorkerStoreConfig.RootDirectory = m_DataRoot / "cas"; + m_WorkerStore = std::make_unique(m_GcManager); + m_WorkerStore->Initialize(WorkerStoreConfig); if (!ServerConfig.IdmsEndpoint.empty()) { @@ -476,7 +477,8 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) std::make_unique(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket); ZEN_INFO("instantiating function service"); - m_ComputeService = std::make_unique(*m_CidStore, + m_ComputeService = std::make_unique(*m_ActionStore, + *m_WorkerStore, m_StatsService, ServerConfig.DataDir / "functions", ServerConfig.MaxConcurrentActions); -- cgit v1.2.3 From 795345e5fd7974a1f5227d507a58bb3ed75eafd5 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 13 Apr 2026 16:38:16 +0200 Subject: Compute OIDC auth, async Horde agents, and orchestrator improvements (#913) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control. ### Async Horde Agent Rewrite - Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool - Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport` - Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers - Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed) ### Provisioner Drain / Scale-Down - `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain` - Configurable `--horde-drain-grace-period` (default 300s) before force-kill - Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer - Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers ### OIDC Authentication - `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token` - Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode - New `--horde-oidctoken-exe-path` CLI option for explicit path override ### Orchestrator & Scheduler - Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers - New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment - Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock - Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings - New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions) - New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target` ### Remote Execution - Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits - Track in-flight submissions to prevent over-queuing - Show remote runner hostname in `GetDisplayName()` - `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address) ### Frontend Dashboard - Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules - Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count - Editable target-cores input with debounced POST to `/orch/provisioner/target` - Per-agent provisioning status badges (active / draining / deallocated) in the agents table - Active vs total CPU counts in agents summary row ### CLI - New `zen compute record-start` / `record-stop` subcommands - `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet) ### Other - `DataDir` supports environment variable expansion - Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories - Linux/Mac runners `nice(5)` child processes to avoid starving the main server - `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset` - Curl HTTP client logs effective URL on failure - `MachineInfo` carries `Pool` and `Mode` from Horde response - Horde bundle creation includes `.pdb` on Windows --- src/zenserver/compute/computeserver.cpp | 108 ++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 4 deletions(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 7296098e0..8cd8b4cfe 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -22,6 +22,8 @@ # if ZEN_WITH_HORDE # include # include +# include +# include # endif # if ZEN_WITH_NOMAD # include @@ -65,6 +67,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.CoordinatorEndpoint)->default_value(""), ""); + Options.add_option("compute", + "", + "coordinator-session", + "Session ID of the orchestrator (for stale-instance rejection)", + cxxopts::value(m_ServerOptions.CoordinatorSession)->default_value(""), + ""); + + Options.add_option("compute", + "", + "announce-url", + "Override URL announced to the coordinator (e.g. relay-visible endpoint)", + cxxopts::value(m_ServerOptions.AnnounceUrl)->default_value(""), + ""); + Options.add_option("compute", "", "idms", @@ -79,6 +95,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"), ""); + Options.add_option("compute", + "", + "provision-clean", + "Pass --clean to provisioned worker instances so they wipe state on startup", + cxxopts::value(m_ServerOptions.ProvisionClean)->default_value("false"), + ""); + + Options.add_option("compute", + "", + "provision-tracehost", + "Pass --tracehost to provisioned worker instances for remote trace collection", + cxxopts::value(m_ServerOptions.ProvisionTraceHost)->default_value(""), + ""); + # if ZEN_WITH_HORDE // Horde provisioning options Options.add_option("horde", @@ -137,6 +167,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) cxxopts::value(m_ServerOptions.HordeConfig.MaxCores)->default_value("2048"), ""); + Options.add_option("horde", + "", + "horde-drain-grace-period", + "Grace period in seconds for draining agents before force-kill", + cxxopts::value(m_ServerOptions.HordeConfig.DrainGracePeriodSeconds)->default_value("300"), + ""); + Options.add_option("horde", "", "horde-host", @@ -164,6 +201,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Port number for Zen service communication", cxxopts::value(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"), ""); + + Options.add_option("horde", + "", + "horde-oidctoken-exe-path", + "Path to OidcToken executable for automatic Horde authentication", + cxxopts::value(m_HordeOidcTokenExePath)->default_value(""), + ""); # endif # if ZEN_WITH_NOMAD @@ -313,6 +357,30 @@ ZenComputeServerConfigurator::ValidateOptions() # if ZEN_WITH_HORDE horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr); horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr); + + // Set up OidcToken-based authentication if no static token was provided + if (m_ServerOptions.HordeConfig.AuthToken.empty() && !m_ServerOptions.HordeConfig.ServerUrl.empty()) + { + std::filesystem::path OidcExePath = FindOidcTokenExePath(m_HordeOidcTokenExePath); + if (!OidcExePath.empty()) + { + ZEN_INFO("using OidcToken executable for Horde authentication: {}", OidcExePath); + auto Provider = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath, + m_ServerOptions.HordeConfig.ServerUrl, + /*Quiet=*/true, + /*Unattended=*/false, + /*Hidden=*/true, + /*IsHordeUrl=*/true); + if (Provider) + { + m_ServerOptions.HordeConfig.AccessTokenProvider = std::move(*Provider); + } + else + { + ZEN_WARN("OidcToken authentication failed; Horde requests will be unauthenticated"); + } + } + } # endif # if ZEN_WITH_NOMAD @@ -347,6 +415,8 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ } m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint; + m_CoordinatorSession = ServerConfig.CoordinatorSession; + m_AnnounceUrl = ServerConfig.AnnounceUrl; m_InstanceId = ServerConfig.InstanceId; m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket; @@ -379,7 +449,14 @@ ZenComputeServer::Cleanup() m_AnnounceTimer.cancel(); # if ZEN_WITH_HORDE - // Shut down Horde provisioner first — this signals all agent threads + // Disconnect the provisioner state provider before destroying the + // provisioner so the orchestrator HTTP layer cannot call into it. + if (m_OrchestratorService) + { + m_OrchestratorService->SetProvisionerStateProvider(nullptr); + } + + // Shut down Horde provisioner — this signals all agent threads // to exit and joins them before we tear down HTTP services. m_HordeProvisioner.reset(); # endif @@ -482,6 +559,7 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) m_StatsService, ServerConfig.DataDir / "functions", ServerConfig.MaxConcurrentActions); + m_ComputeService->SetShutdownCallback([this] { RequestExit(0); }); m_FrontendService = std::make_unique(m_ContentRoot, m_StatsService, m_StatusService); @@ -506,7 +584,11 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) OrchestratorEndpoint << '/'; } - m_NomadProvisioner = std::make_unique(NomadCfg, OrchestratorEndpoint); + m_NomadProvisioner = std::make_unique(NomadCfg, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); } } # endif @@ -537,7 +619,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) : std::filesystem::path(HordeConfig.BinariesPath); std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde"; - m_HordeProvisioner = std::make_unique(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint); + m_HordeProvisioner = std::make_unique(HordeConfig, + BinariesPath, + WorkingDir, + OrchestratorEndpoint, + m_OrchestratorService->GetSessionId().ToString(), + ServerConfig.ProvisionClean, + ServerConfig.ProvisionTraceHost); + m_OrchestratorService->SetProvisionerStateProvider(m_HordeProvisioner.get()); } } # endif @@ -565,6 +654,10 @@ ZenComputeServer::GetInstanceId() const std::string ZenComputeServer::GetAnnounceUrl() const { + if (!m_AnnounceUrl.empty()) + { + return m_AnnounceUrl; + } return m_Http->GetServiceUri(nullptr); } @@ -635,6 +728,11 @@ ZenComputeServer::BuildAnnounceBody() << "nomad"; } + if (!m_CoordinatorSession.empty()) + { + AnnounceBody << "coordinator_session" << m_CoordinatorSession; + } + ResolveCloudMetadata(); if (m_CloudMetadata) { @@ -781,8 +879,10 @@ ZenComputeServer::ProvisionerMaintenanceTick() # if ZEN_WITH_HORDE if (m_HordeProvisioner) { - m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX); + // Re-apply current target to spawn agent threads for any that have + // exited since the last tick, without overwriting a user-set target. auto Stats = m_HordeProvisioner->GetStats(); + m_HordeProvisioner->SetTargetCoreCount(Stats.TargetCoreCount); ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}", Stats.TargetCoreCount, Stats.EstimatedCoreCount, -- cgit v1.2.3 From 3d59b5d7036c35fe484d052ff32dbdc9d0a75cf7 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 13 Apr 2026 19:17:09 +0200 Subject: fix utf characters in source code (#953) --- src/zenserver/compute/computeserver.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 8cd8b4cfe..f35fe0f97 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -456,13 +456,13 @@ ZenComputeServer::Cleanup() m_OrchestratorService->SetProvisionerStateProvider(nullptr); } - // Shut down Horde provisioner — this signals all agent threads + // Shut down Horde provisioner - this signals all agent threads // to exit and joins them before we tear down HTTP services. m_HordeProvisioner.reset(); # endif # if ZEN_WITH_NOMAD - // Shut down Nomad provisioner — stops the management thread and + // Shut down Nomad provisioner - stops the management thread and // sends stop requests for all tracked jobs. m_NomadProvisioner.reset(); # endif @@ -989,7 +989,7 @@ ZenComputeServer::Run() InitializeOrchestratorWebSocket(); # if ZEN_WITH_HORDE - // Start Horde provisioning if configured — request maximum allowed cores. + // Start Horde provisioning if configured - request maximum allowed cores. // SetTargetCoreCount clamps to HordeConfig::MaxCores internally. if (m_HordeProvisioner) { @@ -1001,7 +1001,7 @@ ZenComputeServer::Run() # endif # if ZEN_WITH_NOMAD - // Start Nomad provisioning if configured — request maximum allowed cores. + // Start Nomad provisioning if configured - request maximum allowed cores. // SetTargetCoreCount clamps to NomadConfig::MaxCores internally. if (m_NomadProvisioner) { -- cgit v1.2.3 From 5a48e941b6f7e41ff6f0e86e6999f8b0a15d5c5b Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 15 Apr 2026 19:12:44 +0200 Subject: add sessions to hub and proxy (#960) * move session service to zenserver base class and make it available in all zenserver modes * fix deadlock in sessionsclient shutdown --- src/zenserver/compute/computeserver.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index f35fe0f97..b110f7538 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -496,12 +496,12 @@ ZenComputeServer::Cleanup() m_IoRunner.join(); } - ShutdownServices(); - if (m_Http) { m_Http->Close(); } + + ShutdownServices(); } catch (const std::exception& Ex) { @@ -984,6 +984,8 @@ ZenComputeServer::Run() OnReady(); + StartSelfSession("zencompute"); + PostAnnounce(); EnqueueAnnounceTimer(); InitializeOrchestratorWebSocket(); -- cgit v1.2.3