about | summary | refs | log | tree | commit | diff
path: root/src/zenserver/compute/computeserver.cpp
diff options
context:
space:
mode:
author: Stefan Boberg <[email protected]> 2026-03-04 14:13:46 +0100
committer: GitHub Enterprise <[email protected]> 2026-03-04 14:13:46 +0100
commit: 0763d09a81e5a1d3df11763a7ec75e7860c9510a (patch)
tree: 074575ba6ea259044a179eab0bb396d37268fb09 /src/zenserver/compute/computeserver.cpp
parent: native xmake toolchain definition for UE-clang (#805) (diff)
download: zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.tar.xz
zen-0763d09a81e5a1d3df11763a7ec75e7860c9510a.zip
compute orchestration (#763)
- Added local process runners for Linux/Wine, Mac with some sandboxing support
- Horde & Nomad provisioning for development and testing
- Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API
- Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates
- Some security hardening
- Improved scalability and `zen exec` command

Still experimental - compute support is disabled by default
Diffstat (limited to 'src/zenserver/compute/computeserver.cpp')
-rw-r--r-- src/zenserver/compute/computeserver.cpp 725
1 files changed, 708 insertions, 17 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index 0f9ef0287..802d06caf 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -1,9 +1,9 @@
// Copyright Epic Games, Inc. All Rights Reserved.
#include "computeserver.h"
-#include <zencompute/httpfunctionservice.h>
-#include "computeservice.h"
-
+#include <zencompute/cloudmetadata.h>
+#include <zencompute/httpcomputeservice.h>
+#include <zencompute/httporchestrator.h>
#if ZEN_WITH_COMPUTE_SERVICES
# include <zencore/fmtutils.h>
@@ -13,10 +13,20 @@
# include <zencore/scopeguard.h>
# include <zencore/sentryintegration.h>
# include <zencore/system.h>
+# include <zencore/compactbinarybuilder.h>
# include <zencore/windows.h>
+# include <zenhttp/httpclient.h>
# include <zenhttp/httpapiservice.h>
# include <zenstore/cidstore.h>
# include <zenutil/service.h>
+# if ZEN_WITH_HORDE
+# include <zenhorde/hordeconfig.h>
+# include <zenhorde/hordeprovisioner.h>
+# endif
+# if ZEN_WITH_NOMAD
+# include <zennomad/nomadconfig.h>
+# include <zennomad/nomadprovisioner.h>
+# endif
ZEN_THIRD_PARTY_INCLUDES_START
# include <cxxopts.hpp>
@@ -29,6 +39,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
{
Options.add_option("compute",
"",
+ "max-actions",
+ "Maximum number of concurrent local actions (0 = auto)",
+ cxxopts::value<int32_t>(m_ServerOptions.MaxConcurrentActions)->default_value("0"),
+ "");
+
+ Options.add_option("compute",
+ "",
"upstream-notification-endpoint",
"Endpoint URL for upstream notifications",
cxxopts::value<std::string>(m_ServerOptions.UpstreamNotificationEndpoint)->default_value(""),
@@ -40,6 +57,236 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Instance ID for use in notifications",
cxxopts::value<std::string>(m_ServerOptions.InstanceId)->default_value(""),
"");
+
+ Options.add_option("compute",
+ "",
+ "coordinator-endpoint",
+ "Endpoint URL for coordinator service",
+ cxxopts::value<std::string>(m_ServerOptions.CoordinatorEndpoint)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "idms",
+ "Enable IDMS cloud detection; optionally specify a custom probe endpoint",
+ cxxopts::value<std::string>(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "worker-websocket",
+ "Use WebSocket for worker-orchestrator link (instant reachability detection)",
+ cxxopts::value<bool>(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"),
+ "");
+
+# if ZEN_WITH_HORDE
+ // Horde provisioning options
+ Options.add_option("horde",
+ "",
+ "horde-enabled",
+ "Enable Horde worker provisioning",
+ cxxopts::value<bool>(m_ServerOptions.HordeConfig.Enabled)->default_value("false"),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-server",
+ "Horde server URL",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.ServerUrl)->default_value(""),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-token",
+ "Horde authentication token",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.AuthToken)->default_value(""),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-pool",
+ "Horde pool name",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.Pool)->default_value(""),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-cluster",
+ "Horde cluster ID ('default' or '_auto' for auto-resolve)",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.Cluster)->default_value("default"),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-mode",
+ "Horde connection mode (direct, tunnel, relay)",
+ cxxopts::value<std::string>(m_HordeModeStr)->default_value("direct"),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-encryption",
+ "Horde transport encryption (none, aes)",
+ cxxopts::value<std::string>(m_HordeEncryptionStr)->default_value("none"),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-max-cores",
+ "Maximum number of Horde cores to provision",
+ cxxopts::value<int>(m_ServerOptions.HordeConfig.MaxCores)->default_value("2048"),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-host",
+ "Host address for Horde agents to connect back to",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.HostAddress)->default_value(""),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-condition",
+ "Additional Horde agent filter condition",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.Condition)->default_value(""),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-binaries",
+ "Path to directory containing zenserver binary for remote upload",
+ cxxopts::value<std::string>(m_ServerOptions.HordeConfig.BinariesPath)->default_value(""),
+ "");
+
+ Options.add_option("horde",
+ "",
+ "horde-zen-service-port",
+ "Port number for Zen service communication",
+ cxxopts::value<uint16_t>(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"),
+ "");
+# endif
+
+# if ZEN_WITH_NOMAD
+ // Nomad provisioning options
+ Options.add_option("nomad",
+ "",
+ "nomad-enabled",
+ "Enable Nomad worker provisioning",
+ cxxopts::value<bool>(m_ServerOptions.NomadConfig.Enabled)->default_value("false"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-server",
+ "Nomad HTTP API URL",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.ServerUrl)->default_value(""),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-token",
+ "Nomad ACL token",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.AclToken)->default_value(""),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-datacenter",
+ "Nomad target datacenter",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.Datacenter)->default_value("dc1"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-namespace",
+ "Nomad namespace",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.Namespace)->default_value("default"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-region",
+ "Nomad region (empty for server default)",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.Region)->default_value(""),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-driver",
+ "Nomad task driver (raw_exec, docker)",
+ cxxopts::value<std::string>(m_NomadDriverStr)->default_value("raw_exec"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-distribution",
+ "Binary distribution mode (predeployed, artifact)",
+ cxxopts::value<std::string>(m_NomadDistributionStr)->default_value("predeployed"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-binary-path",
+ "Path to zenserver on Nomad clients (predeployed mode)",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.BinaryPath)->default_value(""),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-artifact-source",
+ "URL to download zenserver binary (artifact mode)",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.ArtifactSource)->default_value(""),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-docker-image",
+ "Docker image for zenserver (docker driver)",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.DockerImage)->default_value(""),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-max-jobs",
+ "Maximum concurrent Nomad jobs",
+ cxxopts::value<int>(m_ServerOptions.NomadConfig.MaxJobs)->default_value("64"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-cpu-mhz",
+ "CPU MHz allocated per Nomad task",
+ cxxopts::value<int>(m_ServerOptions.NomadConfig.CpuMhz)->default_value("1000"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-memory-mb",
+ "Memory MB allocated per Nomad task",
+ cxxopts::value<int>(m_ServerOptions.NomadConfig.MemoryMb)->default_value("2048"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-cores-per-job",
+ "Estimated cores per Nomad job (for scaling)",
+ cxxopts::value<int>(m_ServerOptions.NomadConfig.CoresPerJob)->default_value("32"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-max-cores",
+ "Maximum total cores to provision via Nomad",
+ cxxopts::value<int>(m_ServerOptions.NomadConfig.MaxCores)->default_value("2048"),
+ "");
+
+ Options.add_option("nomad",
+ "",
+ "nomad-job-prefix",
+ "Prefix for generated Nomad job IDs",
+ cxxopts::value<std::string>(m_ServerOptions.NomadConfig.JobPrefix)->default_value("zenserver-worker"),
+ "");
+# endif
}
void
@@ -63,6 +310,15 @@ ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions)
void
ZenComputeServerConfigurator::ValidateOptions()
{
+# if ZEN_WITH_HORDE
+ horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr);  // --horde-mode string -> HordeConfig.Mode. NOTE(review): any result from FromString is discarded here; confirm how an unrecognized value is handled
+ horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr);  // --horde-encryption string -> HordeConfig.EncryptionMode
+# endif
+
+# if ZEN_WITH_NOMAD
+ nomad::FromString(m_ServerOptions.NomadConfig.TaskDriver, m_NomadDriverStr);  // --nomad-driver string -> NomadConfig.TaskDriver (result likewise not checked)
+ nomad::FromString(m_ServerOptions.NomadConfig.BinDistribution, m_NomadDistributionStr);  // --nomad-distribution string -> NomadConfig.BinDistribution
+# endif
}
///////////////////////////////////////////////////////////////////////////
@@ -90,10 +346,14 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ
return EffectiveBasePort;
}
+ m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint;
+ m_InstanceId = ServerConfig.InstanceId;
+ m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket;
+
// This is a workaround to make sure we can have automated tests. Without
// this the ranges for different child zen compute processes could overlap with
// the main test range.
- ZenServerEnvironment::SetBaseChildId(1000);
+ ZenServerEnvironment::SetBaseChildId(2000);
m_DebugOptionForcedCrash = ServerConfig.ShouldCrash;
@@ -113,6 +373,46 @@ ZenComputeServer::Cleanup()
ZEN_INFO(ZEN_APP_NAME " cleaning up");
try
{
+ // Cancel the maintenance timer so it stops re-enqueuing before we
+ // tear down the provisioners it references.
+ m_ProvisionerMaintenanceTimer.cancel();
+ m_AnnounceTimer.cancel();
+
+# if ZEN_WITH_HORDE
+ // Shut down Horde provisioner first — this signals all agent threads
+ // to exit and joins them before we tear down HTTP services.
+ m_HordeProvisioner.reset();
+# endif
+
+# if ZEN_WITH_NOMAD
+ // Shut down Nomad provisioner — stops the management thread and
+ // sends stop requests for all tracked jobs.
+ m_NomadProvisioner.reset();
+# endif
+
+ // Close the orchestrator WebSocket client before stopping the io_context
+ m_WsReconnectTimer.cancel();
+ if (m_OrchestratorWsClient)
+ {
+ m_OrchestratorWsClient->Close();
+ m_OrchestratorWsClient.reset();
+ }
+ m_OrchestratorWsHandler.reset();
+
+ ResolveCloudMetadata();
+ m_CloudMetadata.reset();
+
+ // Shut down services that own threads or use the io_context before we
+ // stop the io_context and close the HTTP server.
+ if (m_OrchestratorService)
+ {
+ m_OrchestratorService->Shutdown();
+ }
+ if (m_ComputeService)
+ {
+ m_ComputeService->Shutdown();
+ }
+
m_IoContext.stop();
if (m_IoRunner.joinable())
{
@@ -139,7 +439,8 @@ ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig)
void
ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
{
- ZEN_INFO("initializing storage");
+ ZEN_TRACE_CPU("ZenComputeServer::InitializeServices");
+ ZEN_INFO("initializing compute services");
CidStoreConfiguration Config;
Config.RootDirectory = m_DataRoot / "cas";
@@ -147,46 +448,405 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
m_CidStore = std::make_unique<CidStore>(m_GcManager);
m_CidStore->Initialize(Config);
+ if (!ServerConfig.IdmsEndpoint.empty())
+ {
+ ZEN_INFO("detecting cloud environment (async)");
+ if (ServerConfig.IdmsEndpoint == "auto")
+ {
+ m_CloudMetadataFuture = std::async(std::launch::async, [DataDir = ServerConfig.DataDir] {
+ return std::make_unique<zen::compute::CloudMetadata>(DataDir / "cloud");
+ });
+ }
+ else
+ {
+ ZEN_INFO("using custom IDMS endpoint: {}", ServerConfig.IdmsEndpoint);
+ m_CloudMetadataFuture = std::async(std::launch::async, [DataDir = ServerConfig.DataDir, Endpoint = ServerConfig.IdmsEndpoint] {
+ return std::make_unique<zen::compute::CloudMetadata>(DataDir / "cloud", Endpoint);
+ });
+ }
+ }
+
ZEN_INFO("instantiating API service");
m_ApiService = std::make_unique<zen::HttpApiService>(*m_Http);
- ZEN_INFO("instantiating compute service");
- m_ComputeService = std::make_unique<HttpComputeService>(ServerConfig.DataDir / "compute");
+ ZEN_INFO("instantiating orchestrator service");
+ m_OrchestratorService =
+ std::make_unique<zen::compute::HttpOrchestratorService>(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket);
+
+ ZEN_INFO("instantiating function service");
+ m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_CidStore,
+ m_StatsService,
+ ServerConfig.DataDir / "functions",
+ ServerConfig.MaxConcurrentActions);
- // Ref<zen::compute::FunctionRunner> Runner;
- // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner");
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
- // TODO: (re)implement default configuration here
+# if ZEN_WITH_NOMAD
+ // Nomad provisioner
+ if (ServerConfig.NomadConfig.Enabled && !ServerConfig.NomadConfig.ServerUrl.empty())
+ {
+ ZEN_INFO("instantiating Nomad provisioner (server: {})", ServerConfig.NomadConfig.ServerUrl);
- ZEN_INFO("instantiating function service");
- m_FunctionService =
- std::make_unique<zen::compute::HttpFunctionService>(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions");
+ const auto& NomadCfg = ServerConfig.NomadConfig;
+
+ if (!NomadCfg.Validate())
+ {
+ ZEN_ERROR("invalid Nomad configuration");
+ }
+ else
+ {
+ ExtendableStringBuilder<256> OrchestratorEndpoint;
+ OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get());
+ if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/')
+ {
+ OrchestratorEndpoint << '/';
+ }
+
+ m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, OrchestratorEndpoint);
+ }
+ }
+# endif
+
+# if ZEN_WITH_HORDE
+ // Horde provisioner
+ if (ServerConfig.HordeConfig.Enabled && !ServerConfig.HordeConfig.ServerUrl.empty())
+ {
+ ZEN_INFO("instantiating Horde provisioner (server: {})", ServerConfig.HordeConfig.ServerUrl);
+
+ const auto& HordeConfig = ServerConfig.HordeConfig;
+
+ if (!HordeConfig.Validate())
+ {
+ ZEN_ERROR("invalid Horde configuration");
+ }
+ else
+ {
+ ExtendableStringBuilder<256> OrchestratorEndpoint;
+ OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get());
+ if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/')
+ {
+ OrchestratorEndpoint << '/';
+ }
+
+ // If no binaries path is specified, just use the running executable's directory
+ std::filesystem::path BinariesPath = HordeConfig.BinariesPath.empty() ? GetRunningExecutablePath().parent_path()
+ : std::filesystem::path(HordeConfig.BinariesPath);
+ std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde";
+
+ m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint);
+ }
+ }
+# endif
+}
+
+void
+ZenComputeServer::ResolveCloudMetadata()  // Transfers the result of the asynchronous cloud-metadata probe into m_CloudMetadata, if one is still outstanding.
+{
+ if (m_CloudMetadataFuture.valid())  // false when no probe was launched or its result has already been taken
+ {
+ m_CloudMetadata = m_CloudMetadataFuture.get();  // waits for the async task to finish; afterwards the future is no longer valid, so later calls are no-ops
+ }
+}
+
+std::string
+ZenComputeServer::GetInstanceId() const  // Returns the identifier this server announces itself under.
+{
+ if (!m_InstanceId.empty())  // an explicitly configured instance ID always wins
+ {
+ return m_InstanceId;
+ }
+ return fmt::format("{}-{}", GetMachineName(), GetCurrentProcessId());  // fallback: "<machine name>-<process id>"
+}
+
+std::string
+ZenComputeServer::GetAnnounceUrl() const  // Returns the URI reported to the coordinator as this server's address.
+{
+ return m_Http->GetServiceUri(nullptr);  // NOTE(review): nullptr presumably selects the server's base URI rather than a specific service's URI; confirm against GetServiceUri
}
void
ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig)
{
+ ZEN_TRACE_CPU("ZenComputeServer::RegisterServices");
ZEN_UNUSED(ServerConfig);
+ m_Http->RegisterService(m_StatsService);  // passed directly (not dereferenced), so it is registered unconditionally
+
+ if (m_ApiService)  // the remaining services are registered only if they were instantiated
+ {
+ m_Http->RegisterService(*m_ApiService);
+ }
+
+ if (m_OrchestratorService)
+ {
+ m_Http->RegisterService(*m_OrchestratorService);
+ }
+
if (m_ComputeService)
{
m_Http->RegisterService(*m_ComputeService);
}
- if (m_ApiService)
+ if (m_FrontendService)
{
- m_Http->RegisterService(*m_ApiService);
+ m_Http->RegisterService(*m_FrontendService);
+ }
+}
+
+CbObject
+ZenComputeServer::BuildAnnounceBody()  // Builds the compact-binary object sent to the coordinator: identity, system metrics, HTTP traffic totals, action counts and optional cloud metadata.
+{
+ CbObjectWriter AnnounceBody;
+ AnnounceBody << "id" << GetInstanceId();
+ AnnounceBody << "uri" << GetAnnounceUrl();
+ AnnounceBody << "hostname" << GetMachineName();
+ AnnounceBody << "platform" << GetRuntimePlatformName();
+
+ ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());  // one metrics sample, reused for every metric field below
+
+ AnnounceBody.BeginObject("metrics");
+ Describe(Sm, AnnounceBody);
+ AnnounceBody.EndObject();
+
+ AnnounceBody << "cpu_usage" << Sm.CpuUsagePercent;
+ AnnounceBody << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;  // MiB -> bytes. NOTE(review): confirm SystemMemoryMiB is a 64-bit type, otherwise this multiplication can overflow
+ AnnounceBody << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;  // used = total - available, in bytes
+
+ AnnounceBody << "bytes_received" << m_Http->GetTotalBytesReceived();
+ AnnounceBody << "bytes_sent" << m_Http->GetTotalBytesSent();
+
+ auto Actions = m_ComputeService->GetActionCounts();  // dereferenced without a null check; the visible caller PostAnnounce() returns early when m_ComputeService is null
+ AnnounceBody << "actions_pending" << Actions.Pending;
+ AnnounceBody << "actions_running" << Actions.Running;
+ AnnounceBody << "actions_completed" << Actions.Completed;
+ AnnounceBody << "active_queues" << Actions.ActiveQueues;
+
+ // Derive provisioner from instance ID prefix (e.g. "horde-xxx" or "nomad-xxx")
+ if (m_InstanceId.starts_with("horde-"))  // tests the configured m_InstanceId, not GetInstanceId(); any other ID emits no "provisioner" field
+ {
+ AnnounceBody << "provisioner"
+ << "horde";
+ }
+ else if (m_InstanceId.starts_with("nomad-"))
+ {
+ AnnounceBody << "provisioner"
+ << "nomad";
+ }
+
+ ResolveCloudMetadata();  // may wait for the async cloud probe the first time it is reached
+ if (m_CloudMetadata)
+ {
+ m_CloudMetadata->Describe(AnnounceBody);
+ }
+
+ return AnnounceBody.Save();
+}
+
+void
+ZenComputeServer::PostAnnounce()  // Sends one announce message to the coordinator, over the orchestrator WebSocket when it is open and via HTTP POST otherwise; failures are logged, not propagated.
+{
+ ZEN_TRACE_CPU("ZenComputeServer::PostAnnounce");
+
+ if (!m_ComputeService || m_CoordinatorEndpoint.empty())  // nothing to announce without a compute service or a coordinator endpoint
+ {
+ return;
+ }
+
+ ZEN_INFO("notifying coordinator at '{}' of our availability at '{}'", m_CoordinatorEndpoint, GetAnnounceUrl());
+
+ try
+ {
+ CbObject Body = BuildAnnounceBody();
+
+ // If we have an active WebSocket connection, send via that instead of HTTP POST
+ if (m_OrchestratorWsClient && m_OrchestratorWsClient->IsOpen())
+ {
+ MemoryView View = Body.GetView();
+ m_OrchestratorWsClient->SendBinary(std::span<const uint8_t>(reinterpret_cast<const uint8_t*>(View.GetData()), View.GetSize()));  // sends the serialized object bytes as a binary frame
+ ZEN_INFO("announced to coordinator via WebSocket");
+ return;
+ }
+
+ HttpClient CoordinatorHttp(m_CoordinatorEndpoint);  // a new client is constructed for every announce
+ HttpClient::Response Result = CoordinatorHttp.Post("announce", std::move(Body));  // POSTs to the "announce" path relative to the coordinator endpoint
+
+ if (Result.Error)  // an error object was reported, as opposed to a completed request with a bad status
+ {
+ ZEN_ERROR("failed to notify coordinator at '{}': HTTP error {} - {}",
+ m_CoordinatorEndpoint,
+ Result.Error->ErrorCode,
+ Result.Error->ErrorMessage);
+ }
+ else if (!IsHttpOk(Result.StatusCode))  // request completed but the status is not accepted by IsHttpOk
+ {
+ ZEN_ERROR("failed to notify coordinator at '{}': unexpected HTTP status code {}",
+ m_CoordinatorEndpoint,
+ static_cast<int>(Result.StatusCode));
+ }
+ else
+ {
+ ZEN_INFO("successfully notified coordinator at '{}'", m_CoordinatorEndpoint);
+ }
+ }
+ catch (const std::exception& Ex)  // announce is best-effort: log and continue so callers (including timer callbacks) never see an exception
+ {
+ ZEN_ERROR("failed to notify coordinator at '{}': {}", m_CoordinatorEndpoint, Ex.what());
+ }
+}
+
+void
+ZenComputeServer::EnqueueAnnounceTimer()  // Arms m_AnnounceTimer so that an announce is posted every 15 seconds for as long as the timer is not cancelled.
+{
+ if (!m_ComputeService || m_CoordinatorEndpoint.empty())  // same precondition as PostAnnounce(); without it the timer is never armed
+ {
+ return;
+ }
+
+ m_AnnounceTimer.expires_after(std::chrono::seconds(15));
+ m_AnnounceTimer.async_wait([this](const asio::error_code& Ec) {  // captures this: the timer must be cancelled before the server is destroyed
+ if (!Ec)  // an error code (e.g. after cancel()) ends the re-arming chain
+ {
+ PostAnnounce();
+ EnqueueAnnounceTimer();  // re-arm for the next period
+ }
+ });
+ EnsureIoRunner();  // presumably makes sure something is running m_IoContext so the wait can complete; confirm
+}
+
+void
+ZenComputeServer::InitializeOrchestratorWebSocket()  // Creates the WebSocket client/handler pair for the worker-orchestrator link and starts connecting, when the feature is enabled and a coordinator endpoint is set.
+{
+ if (!m_EnableWorkerWebSocket || m_CoordinatorEndpoint.empty())
+ {
+ return;
+ }
+
+ // Convert http://host:port → ws://host:port/orch/ws (and https:// → wss://); any other prefix is left unchanged
+ std::string WsUrl = m_CoordinatorEndpoint;
+ if (WsUrl.starts_with("http://"))
+ {
+ WsUrl = "ws://" + WsUrl.substr(7);  // 7 == length of "http://"
+ }
+ else if (WsUrl.starts_with("https://"))
+ {
+ WsUrl = "wss://" + WsUrl.substr(8);  // 8 == length of "https://"
+ }
+ if (!WsUrl.empty() && WsUrl.back() != '/')  // ensure exactly one separator before the path suffix
+ {
+ WsUrl += '/';
+ }
+ WsUrl += "orch/ws";
+
+ ZEN_INFO("establishing WebSocket link to orchestrator at {}", WsUrl);
+
+ m_OrchestratorWsHandler = std::make_unique<OrchestratorWsHandler>(*this);  // handler is created first because the client is constructed with a reference to it
+ m_OrchestratorWsClient =
+ std::make_unique<HttpWsClient>(WsUrl, *m_OrchestratorWsHandler, m_IoContext, HttpWsClientSettings{.LogCategory = "orch_ws"});
+
+ m_OrchestratorWsClient->Connect();
+ EnsureIoRunner();  // presumably makes sure something is running m_IoContext, which the client was given; confirm
+}
+
+void
+ZenComputeServer::EnqueueWsReconnect()  // Schedules a single WebSocket reconnect attempt 5 seconds from now.
+{
+ m_WsReconnectTimer.expires_after(std::chrono::seconds(5));
+ m_WsReconnectTimer.async_wait([this](const asio::error_code& Ec) {
+ if (!Ec && m_OrchestratorWsClient)  // skip when the wait was aborted or the client has already been released
+ {
+ ZEN_INFO("attempting WebSocket reconnect to orchestrator");
+ m_OrchestratorWsClient->Connect();  // this function does not re-arm itself; the only visible re-trigger is OnWsClose()
+ }
+ });
+ EnsureIoRunner();  // presumably makes sure something is running m_IoContext so the wait can complete; confirm
+}
+
+void
+ZenComputeServer::OrchestratorWsHandler::OnWsOpen()  // WebSocket-open callback: logs the event and announces right away.
+{
+ ZEN_INFO("WebSocket link to orchestrator established");
+
+ // Send initial announce immediately over the WebSocket
+ Server.PostAnnounce();  // PostAnnounce() takes the WebSocket path only if the client reports IsOpen() at this point
+}
+
+void
+ZenComputeServer::OrchestratorWsHandler::OnWsMessage([[maybe_unused]] const WebSocketMessage& Msg)  // WebSocket-message callback: intentionally a no-op, incoming messages are dropped.
+{
+ // Orchestrator does not push messages to workers; ignore
+}
+
+void
+ZenComputeServer::OrchestratorWsHandler::OnWsClose([[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)  // WebSocket-close callback: logs the close code, announces once more and schedules a reconnect. Reason is not used.
+{
+ ZEN_WARN("WebSocket link to orchestrator closed (code {}), falling back to HTTP announce", Code);
+
+ // Trigger an immediate HTTP announce so the orchestrator has fresh state,
+ // then schedule a reconnect attempt.
+ Server.PostAnnounce();  // takes the HTTP path only if the client already reports !IsOpen() here (TODO confirm)
+ Server.EnqueueWsReconnect();  // NOTE(review): confirm whether Close() during Cleanup() also invokes this callback, which would re-arm the reconnect timer after it was cancelled
+}
+
+void
+ZenComputeServer::ProvisionerMaintenanceTick()  // Periodic upkeep: re-requests the maximum core target from each active provisioner and logs its current stats at debug level.
+{
+# if ZEN_WITH_HORDE
+ if (m_HordeProvisioner)
+ {
+ m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX);  // asks for "as many as allowed"; per the comment in Run(), the provisioner clamps this to its MaxCores
+ auto Stats = m_HordeProvisioner->GetStats();
+ ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}",
+ Stats.TargetCoreCount,
+ Stats.EstimatedCoreCount,
+ Stats.ActiveCoreCount);
+ }
+# endif
+
+# if ZEN_WITH_NOMAD
+ if (m_NomadProvisioner)
+ {
+ m_NomadProvisioner->SetTargetCoreCount(UINT32_MAX);  // same request as for Horde above
+ auto Stats = m_NomadProvisioner->GetStats();
+ ZEN_DEBUG("Nomad maintenance: target={}, estimated={}, running jobs={}",
+ Stats.TargetCoreCount,
+ Stats.EstimatedCoreCount,
+ Stats.RunningJobCount);
}
+# endif
+}
+
+void
+ZenComputeServer::EnqueueProvisionerMaintenanceTimer()  // Arms the provisioner maintenance timer to run ProvisionerMaintenanceTick() every 15 seconds, but only when at least one provisioner exists.
+{
+ bool HasProvisioner = false;  // stays false when neither provisioner backend is compiled in
+# if ZEN_WITH_HORDE
+ HasProvisioner = HasProvisioner || (m_HordeProvisioner != nullptr);
+# endif
+# if ZEN_WITH_NOMAD
+ HasProvisioner = HasProvisioner || (m_NomadProvisioner != nullptr);
+# endif
- if (m_FunctionService)
+ if (!HasProvisioner)
{
- m_Http->RegisterService(*m_FunctionService);
+ return;
}
+
+ m_ProvisionerMaintenanceTimer.expires_after(std::chrono::seconds(15));
+ m_ProvisionerMaintenanceTimer.async_wait([this](const asio::error_code& Ec) {
+ if (!Ec)  // an error code (e.g. after cancel()) ends the re-arming chain
+ {
+ ProvisionerMaintenanceTick();
+ EnqueueProvisionerMaintenanceTimer();  // re-arm for the next period
+ }
+ });
+ EnsureIoRunner();  // presumably makes sure something is running m_IoContext so the wait can complete; confirm
}
void
ZenComputeServer::Run()
{
+ ZEN_TRACE_CPU("ZenComputeServer::Run");
+
if (m_ProcessMonitor.IsActive())
{
CheckOwnerPid();
@@ -236,6 +896,35 @@ ZenComputeServer::Run()
OnReady();
+ PostAnnounce();
+ EnqueueAnnounceTimer();
+ InitializeOrchestratorWebSocket();
+
+# if ZEN_WITH_HORDE
+ // Start Horde provisioning if configured — request maximum allowed cores.
+ // SetTargetCoreCount clamps to HordeConfig::MaxCores internally.
+ if (m_HordeProvisioner)
+ {
+ ZEN_INFO("Horde provisioning starting");
+ m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX);
+ auto Stats = m_HordeProvisioner->GetStats();
+ ZEN_INFO("Horde provisioning started (target cores: {})", Stats.TargetCoreCount);
+ }
+# endif
+
+# if ZEN_WITH_NOMAD
+ // Start Nomad provisioning if configured — request maximum allowed cores.
+ // SetTargetCoreCount clamps to NomadConfig::MaxCores internally.
+ if (m_NomadProvisioner)
+ {
+ m_NomadProvisioner->SetTargetCoreCount(UINT32_MAX);
+ auto Stats = m_NomadProvisioner->GetStats();
+ ZEN_INFO("Nomad provisioning started (target cores: {})", Stats.TargetCoreCount);
+ }
+# endif
+
+ EnqueueProvisionerMaintenanceTimer();
+
m_Http->Run(IsInteractiveMode);
SetNewState(kShuttingDown);
@@ -254,6 +943,8 @@ ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions
void
ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry)
{
+ ZEN_TRACE_CPU("ZenComputeServerMain::DoRun");
+
ZenComputeServer Server;
Server.SetDataRoot(m_ServerOptions.DataDir);
Server.SetContentRoot(m_ServerOptions.ContentDir);