From 0763d09a81e5a1d3df11763a7ec75e7860c9510a Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Wed, 4 Mar 2026 14:13:46 +0100 Subject: compute orchestration (#763) - Added local process runners for Linux/Wine, Mac with some sandboxing support - Horde & Nomad provisioning for development and testing - Client session queues with lifecycle management (active/draining/cancelled), automatic retry with configurable limits, and manual reschedule API - Improved web UI for orchestrator, compute, and hub dashboards with WebSocket push updates - Some security hardening - Improved scalability and `zen exec` command Still experimental - compute support is disabled by default --- src/zenserver/compute/computeserver.cpp | 725 +++++++++++++++++++++++++++++++- 1 file changed, 708 insertions(+), 17 deletions(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 0f9ef0287..802d06caf 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -1,9 +1,9 @@ // Copyright Epic Games, Inc. All Rights Reserved. #include "computeserver.h" -#include -#include "computeservice.h" - +#include +#include +#include #if ZEN_WITH_COMPUTE_SERVICES # include @@ -13,10 +13,20 @@ # include # include # include +# include # include +# include # include # include # include +# if ZEN_WITH_HORDE +# include +# include +# endif +# if ZEN_WITH_NOMAD +# include +# include +# endif ZEN_THIRD_PARTY_INCLUDES_START # include @@ -27,6 +37,13 @@ namespace zen { void ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) { + Options.add_option("compute", + "", + "max-actions", + "Maximum number of concurrent local actions (0 = auto)", + cxxopts::value(m_ServerOptions.MaxConcurrentActions)->default_value("0"), + ""); + Options.add_option("compute", "", "upstream-notification-endpoint", @@ -40,6 +57,236 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options) "Instance ID for use in notifications", cxxopts::value(m_ServerOptions.InstanceId)->default_value(""), ""); + + Options.add_option("compute", + "", + "coordinator-endpoint", + "Endpoint URL for coordinator service", + cxxopts::value(m_ServerOptions.CoordinatorEndpoint)->default_value(""), + ""); + + Options.add_option("compute", + "", + "idms", + "Enable IDMS cloud detection; optionally specify a custom probe endpoint", + cxxopts::value(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"), + ""); + + Options.add_option("compute", + "", + "worker-websocket", + "Use WebSocket for worker-orchestrator link (instant reachability detection)", + cxxopts::value(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"), + ""); + +# if ZEN_WITH_HORDE + // Horde provisioning options + Options.add_option("horde", + "", + "horde-enabled", + "Enable Horde worker provisioning", + cxxopts::value(m_ServerOptions.HordeConfig.Enabled)->default_value("false"), + ""); + + Options.add_option("horde", + "", + "horde-server", + "Horde server URL", + cxxopts::value(m_ServerOptions.HordeConfig.ServerUrl)->default_value(""), + ""); + + Options.add_option("horde", + "", + "horde-token", + "Horde authentication token", + cxxopts::value(m_ServerOptions.HordeConfig.AuthToken)->default_value(""), + ""); + + Options.add_option("horde", + "", + "horde-pool", + "Horde pool name", + cxxopts::value(m_ServerOptions.HordeConfig.Pool)->default_value(""), + ""); + + Options.add_option("horde", + "", + "horde-cluster", + "Horde cluster ID ('default' or '_auto' for auto-resolve)", + cxxopts::value(m_ServerOptions.HordeConfig.Cluster)->default_value("default"), + ""); + + Options.add_option("horde", + "", + "horde-mode", + "Horde connection mode (direct, tunnel, relay)", + cxxopts::value(m_HordeModeStr)->default_value("direct"), + ""); + + Options.add_option("horde", + "", + "horde-encryption", + "Horde transport encryption (none, aes)", + cxxopts::value(m_HordeEncryptionStr)->default_value("none"), + ""); + + Options.add_option("horde", + "", + "horde-max-cores", + "Maximum number of Horde cores to provision", + cxxopts::value(m_ServerOptions.HordeConfig.MaxCores)->default_value("2048"), + ""); + + Options.add_option("horde", + "", + "horde-host", + "Host address for Horde agents to connect back to", + cxxopts::value(m_ServerOptions.HordeConfig.HostAddress)->default_value(""), + ""); + + Options.add_option("horde", + "", + "horde-condition", + "Additional Horde agent filter condition", + cxxopts::value(m_ServerOptions.HordeConfig.Condition)->default_value(""), + ""); + + Options.add_option("horde", + "", + "horde-binaries", + "Path to directory containing zenserver binary for remote upload", + cxxopts::value(m_ServerOptions.HordeConfig.BinariesPath)->default_value(""), + ""); + + Options.add_option("horde", + "", + "horde-zen-service-port", + "Port number for Zen service communication", + cxxopts::value(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"), + ""); +# endif + +# if ZEN_WITH_NOMAD + // Nomad provisioning options + Options.add_option("nomad", + "", + "nomad-enabled", + "Enable Nomad worker provisioning", + cxxopts::value(m_ServerOptions.NomadConfig.Enabled)->default_value("false"), + ""); + + Options.add_option("nomad", + "", + "nomad-server", + "Nomad HTTP API URL", + cxxopts::value(m_ServerOptions.NomadConfig.ServerUrl)->default_value(""), + ""); + + Options.add_option("nomad", + "", + "nomad-token", + "Nomad ACL token", + cxxopts::value(m_ServerOptions.NomadConfig.AclToken)->default_value(""), + ""); + + Options.add_option("nomad", + "", + "nomad-datacenter", + "Nomad target datacenter", + cxxopts::value(m_ServerOptions.NomadConfig.Datacenter)->default_value("dc1"), + ""); + + Options.add_option("nomad", + "", + "nomad-namespace", + "Nomad namespace", + cxxopts::value(m_ServerOptions.NomadConfig.Namespace)->default_value("default"), + ""); + + Options.add_option("nomad", + "", + "nomad-region", + "Nomad region (empty for server default)", + cxxopts::value(m_ServerOptions.NomadConfig.Region)->default_value(""), + ""); + + Options.add_option("nomad", + "", + "nomad-driver", + "Nomad task driver (raw_exec, docker)", + cxxopts::value(m_NomadDriverStr)->default_value("raw_exec"), + ""); + + Options.add_option("nomad", + "", + "nomad-distribution", + "Binary distribution mode (predeployed, artifact)", + cxxopts::value(m_NomadDistributionStr)->default_value("predeployed"), + ""); + + Options.add_option("nomad", + "", + "nomad-binary-path", + "Path to zenserver on Nomad clients (predeployed mode)", + cxxopts::value(m_ServerOptions.NomadConfig.BinaryPath)->default_value(""), + ""); + + Options.add_option("nomad", + "", + "nomad-artifact-source", + "URL to download zenserver binary (artifact mode)", + cxxopts::value(m_ServerOptions.NomadConfig.ArtifactSource)->default_value(""), + ""); + + Options.add_option("nomad", + "", + "nomad-docker-image", + "Docker image for zenserver (docker driver)", + cxxopts::value(m_ServerOptions.NomadConfig.DockerImage)->default_value(""), + ""); + + Options.add_option("nomad", + "", + "nomad-max-jobs", + "Maximum concurrent Nomad jobs", + cxxopts::value(m_ServerOptions.NomadConfig.MaxJobs)->default_value("64"), + ""); + + Options.add_option("nomad", + "", + "nomad-cpu-mhz", + "CPU MHz allocated per Nomad task", + cxxopts::value(m_ServerOptions.NomadConfig.CpuMhz)->default_value("1000"), + ""); + + Options.add_option("nomad", + "", + "nomad-memory-mb", + "Memory MB allocated per Nomad task", + cxxopts::value(m_ServerOptions.NomadConfig.MemoryMb)->default_value("2048"), + ""); + + Options.add_option("nomad", + "", + "nomad-cores-per-job", + "Estimated cores per Nomad job (for scaling)", + cxxopts::value(m_ServerOptions.NomadConfig.CoresPerJob)->default_value("32"), + ""); + + Options.add_option("nomad", + "", + "nomad-max-cores", + "Maximum total cores to provision via Nomad", + cxxopts::value(m_ServerOptions.NomadConfig.MaxCores)->default_value("2048"), + ""); + + Options.add_option("nomad", + "", + "nomad-job-prefix", + "Prefix for generated Nomad job IDs", + cxxopts::value(m_ServerOptions.NomadConfig.JobPrefix)->default_value("zenserver-worker"), + ""); +# endif } void @@ -63,6 +310,15 @@ ZenComputeServerConfigurator::OnConfigFileParsed(LuaConfig::Options& LuaOptions) void ZenComputeServerConfigurator::ValidateOptions() { +# if ZEN_WITH_HORDE + horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr); + horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr); +# endif + +# if ZEN_WITH_NOMAD + nomad::FromString(m_ServerOptions.NomadConfig.TaskDriver, m_NomadDriverStr); + nomad::FromString(m_ServerOptions.NomadConfig.BinDistribution, m_NomadDistributionStr); +# endif } /////////////////////////////////////////////////////////////////////////// @@ -90,10 +346,14 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ return EffectiveBasePort; } + m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint; + m_InstanceId = ServerConfig.InstanceId; + m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket; + // This is a workaround to make sure we can have automated tests. Without // this the ranges for different child zen compute processes could overlap with // the main test range. - ZenServerEnvironment::SetBaseChildId(1000); + ZenServerEnvironment::SetBaseChildId(2000); m_DebugOptionForcedCrash = ServerConfig.ShouldCrash; @@ -113,6 +373,46 @@ ZenComputeServer::Cleanup() ZEN_INFO(ZEN_APP_NAME " cleaning up"); try { + // Cancel the maintenance timer so it stops re-enqueuing before we + // tear down the provisioners it references. + m_ProvisionerMaintenanceTimer.cancel(); + m_AnnounceTimer.cancel(); + +# if ZEN_WITH_HORDE + // Shut down Horde provisioner first — this signals all agent threads + // to exit and joins them before we tear down HTTP services. + m_HordeProvisioner.reset(); +# endif + +# if ZEN_WITH_NOMAD + // Shut down Nomad provisioner — stops the management thread and + // sends stop requests for all tracked jobs. + m_NomadProvisioner.reset(); +# endif + + // Close the orchestrator WebSocket client before stopping the io_context + m_WsReconnectTimer.cancel(); + if (m_OrchestratorWsClient) + { + m_OrchestratorWsClient->Close(); + m_OrchestratorWsClient.reset(); + } + m_OrchestratorWsHandler.reset(); + + ResolveCloudMetadata(); + m_CloudMetadata.reset(); + + // Shut down services that own threads or use the io_context before we + // stop the io_context and close the HTTP server. + if (m_OrchestratorService) + { + m_OrchestratorService->Shutdown(); + } + if (m_ComputeService) + { + m_ComputeService->Shutdown(); + } + m_IoContext.stop(); if (m_IoRunner.joinable()) { @@ -139,7 +439,8 @@ ZenComputeServer::InitializeState(const ZenComputeServerConfig& ServerConfig) void ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) { - ZEN_INFO("initializing storage"); + ZEN_TRACE_CPU("ZenComputeServer::InitializeServices"); + ZEN_INFO("initializing compute services"); CidStoreConfiguration Config; Config.RootDirectory = m_DataRoot / "cas"; @@ -147,46 +448,405 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig) m_CidStore = std::make_unique(m_GcManager); m_CidStore->Initialize(Config); + if (!ServerConfig.IdmsEndpoint.empty()) + { + ZEN_INFO("detecting cloud environment (async)"); + if (ServerConfig.IdmsEndpoint == "auto") + { + m_CloudMetadataFuture = std::async(std::launch::async, [DataDir = ServerConfig.DataDir] { + return std::make_unique(DataDir / "cloud"); + }); + } + else + { + ZEN_INFO("using custom IDMS endpoint: {}", ServerConfig.IdmsEndpoint); + m_CloudMetadataFuture = std::async(std::launch::async, [DataDir = ServerConfig.DataDir, Endpoint = ServerConfig.IdmsEndpoint] { + return std::make_unique(DataDir / "cloud", Endpoint); + }); + } + } + ZEN_INFO("instantiating API service"); m_ApiService = std::make_unique(*m_Http); - ZEN_INFO("instantiating compute service"); - m_ComputeService = std::make_unique(ServerConfig.DataDir / "compute"); + ZEN_INFO("instantiating orchestrator service"); + m_OrchestratorService = + std::make_unique(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket); + + ZEN_INFO("instantiating function service"); + m_ComputeService = std::make_unique(*m_CidStore, + m_StatsService, + ServerConfig.DataDir / "functions", + ServerConfig.MaxConcurrentActions); - // Ref Runner; - // Runner = zen::compute::CreateLocalRunner(*m_CidStore, ServerConfig.DataDir / "runner"); + m_FrontendService = std::make_unique(m_ContentRoot, m_StatusService); - // TODO: (re)implement default configuration here +# if ZEN_WITH_NOMAD + // Nomad provisioner + if (ServerConfig.NomadConfig.Enabled && !ServerConfig.NomadConfig.ServerUrl.empty()) + { + ZEN_INFO("instantiating Nomad provisioner (server: {})", ServerConfig.NomadConfig.ServerUrl); - ZEN_INFO("instantiating function service"); - m_FunctionService = - std::make_unique(*m_CidStore, m_StatsService, ServerConfig.DataDir / "functions"); + const auto& NomadCfg = ServerConfig.NomadConfig; + + if (!NomadCfg.Validate()) + { + ZEN_ERROR("invalid Nomad configuration"); + } + else + { + ExtendableStringBuilder<256> OrchestratorEndpoint; + OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get()); + if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/') + { + OrchestratorEndpoint << '/'; + } + + m_NomadProvisioner = std::make_unique(NomadCfg, OrchestratorEndpoint); + } + } +# endif + +# if ZEN_WITH_HORDE + // Horde provisioner + if (ServerConfig.HordeConfig.Enabled && !ServerConfig.HordeConfig.ServerUrl.empty()) + { + ZEN_INFO("instantiating Horde provisioner (server: {})", ServerConfig.HordeConfig.ServerUrl); + + const auto& HordeConfig = ServerConfig.HordeConfig; + + if (!HordeConfig.Validate()) + { + ZEN_ERROR("invalid Horde configuration"); + } + else + { + ExtendableStringBuilder<256> OrchestratorEndpoint; + OrchestratorEndpoint << m_Http->GetServiceUri(m_OrchestratorService.get()); + if (auto View = OrchestratorEndpoint.ToView(); !View.empty() && View.back() != '/') + { + OrchestratorEndpoint << '/'; + } + + // If no binaries path is specified, just use the running executable's directory + std::filesystem::path BinariesPath = HordeConfig.BinariesPath.empty() ? GetRunningExecutablePath().parent_path() + : std::filesystem::path(HordeConfig.BinariesPath); + std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde"; + + m_HordeProvisioner = std::make_unique(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint); + } + } +# endif +} + +void +ZenComputeServer::ResolveCloudMetadata() +{ + if (m_CloudMetadataFuture.valid()) + { + m_CloudMetadata = m_CloudMetadataFuture.get(); + } +} + +std::string +ZenComputeServer::GetInstanceId() const +{ + if (!m_InstanceId.empty()) + { + return m_InstanceId; + } + return fmt::format("{}-{}", GetMachineName(), GetCurrentProcessId()); +} + +std::string +ZenComputeServer::GetAnnounceUrl() const +{ + return m_Http->GetServiceUri(nullptr); } void ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig) { + ZEN_TRACE_CPU("ZenComputeServer::RegisterServices"); ZEN_UNUSED(ServerConfig); + m_Http->RegisterService(m_StatsService); + + if (m_ApiService) + { + m_Http->RegisterService(*m_ApiService); + } + + if (m_OrchestratorService) + { + m_Http->RegisterService(*m_OrchestratorService); + } + if (m_ComputeService) { m_Http->RegisterService(*m_ComputeService); } - if (m_ApiService) + if (m_FrontendService) { - m_Http->RegisterService(*m_ApiService); + m_Http->RegisterService(*m_FrontendService); + } +} + +CbObject +ZenComputeServer::BuildAnnounceBody() +{ + CbObjectWriter AnnounceBody; + AnnounceBody << "id" << GetInstanceId(); + AnnounceBody << "uri" << GetAnnounceUrl(); + AnnounceBody << "hostname" << GetMachineName(); + AnnounceBody << "platform" << GetRuntimePlatformName(); + + ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query()); + + AnnounceBody.BeginObject("metrics"); + Describe(Sm, AnnounceBody); + AnnounceBody.EndObject(); + + AnnounceBody << "cpu_usage" << Sm.CpuUsagePercent; + AnnounceBody << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024; + AnnounceBody << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024; + + AnnounceBody << "bytes_received" << m_Http->GetTotalBytesReceived(); + AnnounceBody << "bytes_sent" << m_Http->GetTotalBytesSent(); + + auto Actions = m_ComputeService->GetActionCounts(); + AnnounceBody << "actions_pending" << Actions.Pending; + AnnounceBody << "actions_running" << Actions.Running; + AnnounceBody << "actions_completed" << Actions.Completed; + AnnounceBody << "active_queues" << Actions.ActiveQueues; + + // Derive provisioner from instance ID prefix (e.g. "horde-xxx" or "nomad-xxx") + if (m_InstanceId.starts_with("horde-")) + { + AnnounceBody << "provisioner" + << "horde"; + } + else if (m_InstanceId.starts_with("nomad-")) + { + AnnounceBody << "provisioner" + << "nomad"; + } + + ResolveCloudMetadata(); + if (m_CloudMetadata) + { + m_CloudMetadata->Describe(AnnounceBody); + } + + return AnnounceBody.Save(); +} + +void +ZenComputeServer::PostAnnounce() +{ + ZEN_TRACE_CPU("ZenComputeServer::PostAnnounce"); + + if (!m_ComputeService || m_CoordinatorEndpoint.empty()) + { + return; + } + + ZEN_INFO("notifying coordinator at '{}' of our availability at '{}'", m_CoordinatorEndpoint, GetAnnounceUrl()); + + try + { + CbObject Body = BuildAnnounceBody(); + + // If we have an active WebSocket connection, send via that instead of HTTP POST + if (m_OrchestratorWsClient && m_OrchestratorWsClient->IsOpen()) + { + MemoryView View = Body.GetView(); + m_OrchestratorWsClient->SendBinary(std::span(reinterpret_cast(View.GetData()), View.GetSize())); + ZEN_INFO("announced to coordinator via WebSocket"); + return; + } + + HttpClient CoordinatorHttp(m_CoordinatorEndpoint); + HttpClient::Response Result = CoordinatorHttp.Post("announce", std::move(Body)); + + if (Result.Error) + { + ZEN_ERROR("failed to notify coordinator at '{}': HTTP error {} - {}", + m_CoordinatorEndpoint, + Result.Error->ErrorCode, + Result.Error->ErrorMessage); + } + else if (!IsHttpOk(Result.StatusCode)) + { + ZEN_ERROR("failed to notify coordinator at '{}': unexpected HTTP status code {}", + m_CoordinatorEndpoint, + static_cast(Result.StatusCode)); + } + else + { + ZEN_INFO("successfully notified coordinator at '{}'", m_CoordinatorEndpoint); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("failed to notify coordinator at '{}': {}", m_CoordinatorEndpoint, Ex.what()); + } +} + +void +ZenComputeServer::EnqueueAnnounceTimer() +{ + if (!m_ComputeService || m_CoordinatorEndpoint.empty()) + { + return; + } + + m_AnnounceTimer.expires_after(std::chrono::seconds(15)); + m_AnnounceTimer.async_wait([this](const asio::error_code& Ec) { + if (!Ec) + { + PostAnnounce(); + EnqueueAnnounceTimer(); + } + }); + EnsureIoRunner(); +} + +void +ZenComputeServer::InitializeOrchestratorWebSocket() +{ + if (!m_EnableWorkerWebSocket || m_CoordinatorEndpoint.empty()) + { + return; + } + + // Convert http://host:port → ws://host:port/orch/ws + std::string WsUrl = m_CoordinatorEndpoint; + if (WsUrl.starts_with("http://")) + { + WsUrl = "ws://" + WsUrl.substr(7); + } + else if (WsUrl.starts_with("https://")) + { + WsUrl = "wss://" + WsUrl.substr(8); + } + if (!WsUrl.empty() && WsUrl.back() != '/') + { + WsUrl += '/'; + } + WsUrl += "orch/ws"; + + ZEN_INFO("establishing WebSocket link to orchestrator at {}", WsUrl); + + m_OrchestratorWsHandler = std::make_unique(*this); + m_OrchestratorWsClient = + std::make_unique(WsUrl, *m_OrchestratorWsHandler, m_IoContext, HttpWsClientSettings{.LogCategory = "orch_ws"}); + + m_OrchestratorWsClient->Connect(); + EnsureIoRunner(); +} + +void +ZenComputeServer::EnqueueWsReconnect() +{ + m_WsReconnectTimer.expires_after(std::chrono::seconds(5)); + m_WsReconnectTimer.async_wait([this](const asio::error_code& Ec) { + if (!Ec && m_OrchestratorWsClient) + { + ZEN_INFO("attempting WebSocket reconnect to orchestrator"); + m_OrchestratorWsClient->Connect(); + } + }); + EnsureIoRunner(); +} + +void +ZenComputeServer::OrchestratorWsHandler::OnWsOpen() +{ + ZEN_INFO("WebSocket link to orchestrator established"); + + // Send initial announce immediately over the WebSocket + Server.PostAnnounce(); +} + +void +ZenComputeServer::OrchestratorWsHandler::OnWsMessage([[maybe_unused]] const WebSocketMessage& Msg) +{ + // Orchestrator does not push messages to workers; ignore +} + +void +ZenComputeServer::OrchestratorWsHandler::OnWsClose([[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + ZEN_WARN("WebSocket link to orchestrator closed (code {}), falling back to HTTP announce", Code); + + // Trigger an immediate HTTP announce so the orchestrator has fresh state, + // then schedule a reconnect attempt. + Server.PostAnnounce(); + Server.EnqueueWsReconnect(); +} + +void +ZenComputeServer::ProvisionerMaintenanceTick() +{ +# if ZEN_WITH_HORDE + if (m_HordeProvisioner) + { + m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX); + auto Stats = m_HordeProvisioner->GetStats(); + ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}", + Stats.TargetCoreCount, + Stats.EstimatedCoreCount, + Stats.ActiveCoreCount); + } +# endif + +# if ZEN_WITH_NOMAD + if (m_NomadProvisioner) + { + m_NomadProvisioner->SetTargetCoreCount(UINT32_MAX); + auto Stats = m_NomadProvisioner->GetStats(); + ZEN_DEBUG("Nomad maintenance: target={}, estimated={}, running jobs={}", + Stats.TargetCoreCount, + Stats.EstimatedCoreCount, + Stats.RunningJobCount); } +# endif +} + +void +ZenComputeServer::EnqueueProvisionerMaintenanceTimer() +{ + bool HasProvisioner = false; +# if ZEN_WITH_HORDE + HasProvisioner = HasProvisioner || (m_HordeProvisioner != nullptr); +# endif +# if ZEN_WITH_NOMAD + HasProvisioner = HasProvisioner || (m_NomadProvisioner != nullptr); +# endif - if (m_FunctionService) + if (!HasProvisioner) { - m_Http->RegisterService(*m_FunctionService); + return; } + + m_ProvisionerMaintenanceTimer.expires_after(std::chrono::seconds(15)); + m_ProvisionerMaintenanceTimer.async_wait([this](const asio::error_code& Ec) { + if (!Ec) + { + ProvisionerMaintenanceTick(); + EnqueueProvisionerMaintenanceTimer(); + } + }); + EnsureIoRunner(); } void ZenComputeServer::Run() { + ZEN_TRACE_CPU("ZenComputeServer::Run"); + if (m_ProcessMonitor.IsActive()) { CheckOwnerPid(); @@ -236,6 +896,35 @@ ZenComputeServer::Run() OnReady(); + PostAnnounce(); + EnqueueAnnounceTimer(); + InitializeOrchestratorWebSocket(); + +# if ZEN_WITH_HORDE + // Start Horde provisioning if configured — request maximum allowed cores. + // SetTargetCoreCount clamps to HordeConfig::MaxCores internally. + if (m_HordeProvisioner) + { + ZEN_INFO("Horde provisioning starting"); + m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX); + auto Stats = m_HordeProvisioner->GetStats(); + ZEN_INFO("Horde provisioning started (target cores: {})", Stats.TargetCoreCount); + } +# endif + +# if ZEN_WITH_NOMAD + // Start Nomad provisioning if configured — request maximum allowed cores. + // SetTargetCoreCount clamps to NomadConfig::MaxCores internally. + if (m_NomadProvisioner) + { + m_NomadProvisioner->SetTargetCoreCount(UINT32_MAX); + auto Stats = m_NomadProvisioner->GetStats(); + ZEN_INFO("Nomad provisioning started (target cores: {})", Stats.TargetCoreCount); + } +# endif + + EnqueueProvisionerMaintenanceTimer(); + m_Http->Run(IsInteractiveMode); SetNewState(kShuttingDown); @@ -254,6 +943,8 @@ ZenComputeServerMain::ZenComputeServerMain(ZenComputeServerConfig& ServerOptions void ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) { + ZEN_TRACE_CPU("ZenComputeServerMain::DoRun"); + ZenComputeServer Server; Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); -- cgit v1.2.3 From b37b34ea6ad906f54e8104526e77ba66aed997da Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Mon, 9 Mar 2026 17:43:08 +0100 Subject: Dashboard overhaul, compute integration (#814) - **Frontend dashboard overhaul**: Unified compute/main dashboards into a single shared UI. Added new pages for cache, projects, metrics, sessions, info (build/runtime config, system stats). Added live-update via WebSockets with pause control, sortable detail tables, themed styling. Refactored compute/hub/orchestrator pages into modular JS. - **HTTP server fixes and stats**: Fixed http.sys local-only fallback when default port is in use, implemented root endpoint redirect for http.sys, fixed Linux/Mac port reuse. Added /stats endpoint exposing HTTP server metrics (bytes transferred, request rates). Added WebSocket stats tracking. - **OTEL/diagnostics hardening**: Improved OTLP HTTP exporter with better error handling and resilience. Extended diagnostics services configuration. - **Session management**: Added new sessions service with HTTP endpoints for registering, updating, querying, and removing sessions. Includes session log file support. This is still WIP. - **CLI subcommand support**: Added support for commands with subcommands in the zen CLI tool, with improved command dispatch. - **Misc**: Exposed CPU usage/hostname to frontend, fixed JS compact binary float32/float64 decoding, limited projects displayed on front page to 25 sorted by last access, added vscode:// link support. Also contains some fixes from TSAN analysis. --- src/zenserver/compute/computeserver.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 802d06caf..c64f081b3 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -419,6 +419,8 @@ ZenComputeServer::Cleanup() m_IoRunner.join(); } + ShutdownServices(); + if (m_Http) { m_Http->Close(); @@ -570,8 +572,6 @@ ZenComputeServer::RegisterServices(const ZenComputeServerConfig& ServerConfig) ZEN_TRACE_CPU("ZenComputeServer::RegisterServices"); ZEN_UNUSED(ServerConfig); - m_Http->RegisterService(m_StatsService); - if (m_ApiService) { m_Http->RegisterService(*m_ApiService); -- cgit v1.2.3 From d0a07e555577dcd4a8f55f1b45d9e8e4e6366ab7 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Tue, 10 Mar 2026 17:27:26 +0100 Subject: HttpClient using libcurl, Unix Sockets for HTTP. HTTPS support (#770) The main goal of this change is to eliminate the cpr back-end altogether and replace it with the curl implementation. I would expect to drop cpr as soon as we feel happy with the libcurl back-end. That would leave us with a direct dependency on libcurl only, and cpr can be eliminated as a dependency. ### HttpClient Backend Overhaul - Implemented a new **libcurl-based HttpClient** backend (`httpclientcurl.cpp`, ~2000 lines) as an alternative to the cpr-based one - Made HttpClient backend **configurable at runtime** via constructor arguments and `-httpclient=...` CLI option (for zen, zenserver, and tests) - Extended HttpClient test suite to cover multipart/content-range scenarios ### Unix Domain Socket Support - Added Unix domain socket support to **httpasio** (server side) - Added Unix domain socket support to **HttpClient** - Added Unix domain socket support to **HttpWsClient** (WebSocket client) - Templatized `HttpServerConnectionT` and `WsAsioConnectionT` to handle TCP, Unix, and SSL sockets uniformly via `if constexpr` dispatch ### HTTPS Support - Added **preliminary HTTPS support to httpasio** (for Mac/Linux via OpenSSL) - Added **basic HTTPS support for http.sys** (Windows) - Implemented HTTPS test for httpasio - Split `InitializeServer` into smaller sub-functions for http.sys ### Other Notable Changes - Improved **zenhttp-test stability** with dynamic port allocation - Enhanced port retry logic in http.sys (handles ERROR_ACCESS_DENIED) - Fatal signal/exception handlers for backtrace generation in tests - Added `zen bench http` subcommand to exercise network + HTTP client/server communication stack --- src/zenserver/compute/computeserver.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index c64f081b3..2ac3de599 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -674,7 +674,7 @@ ZenComputeServer::PostAnnounce() { ZEN_ERROR("failed to notify coordinator at '{}': HTTP error {} - {}", m_CoordinatorEndpoint, - Result.Error->ErrorCode, + static_cast(Result.Error->ErrorCode), Result.Error->ErrorMessage); } else if (!IsHttpOk(Result.StatusCode)) -- cgit v1.2.3 From 81bc43aa96f0059cecb28d1bd88338b7d84667f9 Mon Sep 17 00:00:00 2001 From: Stefan Boberg Date: Thu, 12 Mar 2026 15:03:03 +0100 Subject: Transparent proxy mode (#823) Adds a **transparent TCP proxy mode** to zenserver (activated via `zenserver proxy`), allowing it to sit between clients and upstream Zen servers to inspect and monitor HTTP/1.x traffic in real time. Primarily useful during development, to be able to observe multi-server/client interactions in one place. - **Dedicated proxy port** -- Proxy mode defaults to port 8118 with its own data directory to avoid collisions with a normal zenserver instance. - **TCP proxy core** (`src/zenserver/proxy/`) -- A new transparent TCP proxy that forwards connections to upstream targets, with support for both TCP/IP and Unix socket listeners. Multi-threaded I/O for connection handling. Supports Unix domain sockets for both upstream/downstream. - **HTTP traffic inspection** -- Parses HTTP/1.x request/response streams inline to extract method, path, status, content length, and WebSocket upgrades without breaking the proxied data. - **Proxy dashboard** -- A web UI showing live connection stats, per-target request counts, active connections, bytes transferred, and client IP/session ID rollups. - **Server mode display** -- Dashboard banner now shows the running server mode (Zen Proxy, Zen Compute, etc.). Supporting changes included in this branch: - **Wildcard log level matching** -- Log levels can now be set per-category using wildcard patterns (e.g. `proxy.*=debug`). - **`zen down --all`** -- New flag to shut down all running zenserver instances; also used by the new `xmake kill` task. - Minor test stability fixes (flaky hash collisions, per-thread RNG seeds). - Support ZEN_MALLOC environment variable for default allocator selection and switch default to rpmalloc - Fixed sentry-native build to allow LTO on Windows --- src/zenserver/compute/computeserver.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'src/zenserver/compute/computeserver.cpp') diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp index 2ac3de599..a02ca7be3 100644 --- a/src/zenserver/compute/computeserver.cpp +++ b/src/zenserver/compute/computeserver.cpp @@ -946,6 +946,7 @@ ZenComputeServerMain::DoRun(ZenServerState::ZenServerEntry* Entry) ZEN_TRACE_CPU("ZenComputeServerMain::DoRun"); ZenComputeServer Server; + Server.SetServerMode("Compute"); Server.SetDataRoot(m_ServerOptions.DataDir); Server.SetContentRoot(m_ServerOptions.ContentDir); Server.SetTestMode(m_ServerOptions.IsTest); -- cgit v1.2.3