aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/compute')
-rw-r--r--src/zenserver/compute/computeserver.cpp134
-rw-r--r--src/zenserver/compute/computeserver.h16
2 files changed, 131 insertions, 19 deletions
diff --git a/src/zenserver/compute/computeserver.cpp b/src/zenserver/compute/computeserver.cpp
index d1875f41a..b110f7538 100644
--- a/src/zenserver/compute/computeserver.cpp
+++ b/src/zenserver/compute/computeserver.cpp
@@ -22,6 +22,8 @@
# if ZEN_WITH_HORDE
# include <zenhorde/hordeconfig.h>
# include <zenhorde/hordeprovisioner.h>
+# include <zenhttp/httpclientauth.h>
+# include <zenutil/authutils.h>
# endif
# if ZEN_WITH_NOMAD
# include <zennomad/nomadconfig.h>
@@ -67,6 +69,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("compute",
"",
+ "coordinator-session",
+ "Session ID of the orchestrator (for stale-instance rejection)",
+ cxxopts::value<std::string>(m_ServerOptions.CoordinatorSession)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "announce-url",
+ "Override URL announced to the coordinator (e.g. relay-visible endpoint)",
+ cxxopts::value<std::string>(m_ServerOptions.AnnounceUrl)->default_value(""),
+ "");
+
+ Options.add_option("compute",
+ "",
"idms",
"Enable IDMS cloud detection; optionally specify a custom probe endpoint",
cxxopts::value<std::string>(m_ServerOptions.IdmsEndpoint)->default_value("")->implicit_value("auto"),
@@ -79,6 +95,20 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
cxxopts::value<bool>(m_ServerOptions.EnableWorkerWebSocket)->default_value("false"),
"");
+ Options.add_option("compute",
+ "",
+ "provision-clean",
+ "Pass --clean to provisioned worker instances so they wipe state on startup",
+ cxxopts::value<bool>(m_ServerOptions.ProvisionClean)->default_value("false"),
+ "");
+
+ Options.add_option("compute",
+ "",
+ "provision-tracehost",
+ "Pass --tracehost to provisioned worker instances for remote trace collection",
+ cxxopts::value<std::string>(m_ServerOptions.ProvisionTraceHost)->default_value(""),
+ "");
+
# if ZEN_WITH_HORDE
// Horde provisioning options
Options.add_option("horde",
@@ -139,6 +169,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
Options.add_option("horde",
"",
+ "horde-drain-grace-period",
+ "Grace period in seconds for draining agents before force-kill",
+ cxxopts::value<int>(m_ServerOptions.HordeConfig.DrainGracePeriodSeconds)->default_value("300"),
+ "");
+
+ Options.add_option("horde",
+ "",
"horde-host",
"Host address for Horde agents to connect back to",
cxxopts::value<std::string>(m_ServerOptions.HordeConfig.HostAddress)->default_value(""),
@@ -164,6 +201,13 @@ ZenComputeServerConfigurator::AddCliOptions(cxxopts::Options& Options)
"Port number for Zen service communication",
cxxopts::value<uint16_t>(m_ServerOptions.HordeConfig.ZenServicePort)->default_value("8558"),
"");
+
+ Options.add_option("horde",
+ "",
+ "horde-oidctoken-exe-path",
+ "Path to OidcToken executable for automatic Horde authentication",
+ cxxopts::value<std::string>(m_HordeOidcTokenExePath)->default_value(""),
+ "");
# endif
# if ZEN_WITH_NOMAD
@@ -313,6 +357,30 @@ ZenComputeServerConfigurator::ValidateOptions()
# if ZEN_WITH_HORDE
horde::FromString(m_ServerOptions.HordeConfig.Mode, m_HordeModeStr);
horde::FromString(m_ServerOptions.HordeConfig.EncryptionMode, m_HordeEncryptionStr);
+
+ // Set up OidcToken-based authentication if no static token was provided
+ if (m_ServerOptions.HordeConfig.AuthToken.empty() && !m_ServerOptions.HordeConfig.ServerUrl.empty())
+ {
+ std::filesystem::path OidcExePath = FindOidcTokenExePath(m_HordeOidcTokenExePath);
+ if (!OidcExePath.empty())
+ {
+ ZEN_INFO("using OidcToken executable for Horde authentication: {}", OidcExePath);
+ auto Provider = httpclientauth::CreateFromOidcTokenExecutable(OidcExePath,
+ m_ServerOptions.HordeConfig.ServerUrl,
+ /*Quiet=*/true,
+ /*Unattended=*/false,
+ /*Hidden=*/true,
+ /*IsHordeUrl=*/true);
+ if (Provider)
+ {
+ m_ServerOptions.HordeConfig.AccessTokenProvider = std::move(*Provider);
+ }
+ else
+ {
+ ZEN_WARN("OidcToken authentication failed; Horde requests will be unauthenticated");
+ }
+ }
+ }
# endif
# if ZEN_WITH_NOMAD
@@ -347,6 +415,8 @@ ZenComputeServer::Initialize(const ZenComputeServerConfig& ServerConfig, ZenServ
}
m_CoordinatorEndpoint = ServerConfig.CoordinatorEndpoint;
+ m_CoordinatorSession = ServerConfig.CoordinatorSession;
+ m_AnnounceUrl = ServerConfig.AnnounceUrl;
m_InstanceId = ServerConfig.InstanceId;
m_EnableWorkerWebSocket = ServerConfig.EnableWorkerWebSocket;
@@ -379,13 +449,20 @@ ZenComputeServer::Cleanup()
m_AnnounceTimer.cancel();
# if ZEN_WITH_HORDE
- // Shut down Horde provisioner first — this signals all agent threads
+ // Disconnect the provisioner state provider before destroying the
+ // provisioner so the orchestrator HTTP layer cannot call into it.
+ if (m_OrchestratorService)
+ {
+ m_OrchestratorService->SetProvisionerStateProvider(nullptr);
+ }
+
+ // Shut down Horde provisioner - this signals all agent threads
// to exit and joins them before we tear down HTTP services.
m_HordeProvisioner.reset();
# endif
# if ZEN_WITH_NOMAD
- // Shut down Nomad provisioner — stops the management thread and
+ // Shut down Nomad provisioner - stops the management thread and
// sends stop requests for all tracked jobs.
m_NomadProvisioner.reset();
# endif
@@ -419,12 +496,12 @@ ZenComputeServer::Cleanup()
m_IoRunner.join();
}
- ShutdownServices();
-
if (m_Http)
{
m_Http->Close();
}
+
+ ShutdownServices();
}
catch (const std::exception& Ex)
{
@@ -444,11 +521,12 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
ZEN_TRACE_CPU("ZenComputeServer::InitializeServices");
ZEN_INFO("initializing compute services");
- CidStoreConfiguration Config;
- Config.RootDirectory = m_DataRoot / "cas";
+ m_ActionStore = std::make_unique<MemoryCidStore>();
- m_CidStore = std::make_unique<CidStore>(m_GcManager);
- m_CidStore->Initialize(Config);
+ CidStoreConfiguration WorkerStoreConfig;
+ WorkerStoreConfig.RootDirectory = m_DataRoot / "cas";
+ m_WorkerStore = std::make_unique<CidStore>(m_GcManager);
+ m_WorkerStore->Initialize(WorkerStoreConfig);
if (!ServerConfig.IdmsEndpoint.empty())
{
@@ -476,12 +554,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
std::make_unique<zen::compute::HttpOrchestratorService>(ServerConfig.DataDir / "orch", ServerConfig.EnableWorkerWebSocket);
ZEN_INFO("instantiating function service");
- m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_CidStore,
+ m_ComputeService = std::make_unique<zen::compute::HttpComputeService>(*m_ActionStore,
+ *m_WorkerStore,
m_StatsService,
ServerConfig.DataDir / "functions",
ServerConfig.MaxConcurrentActions);
+ m_ComputeService->SetShutdownCallback([this] { RequestExit(0); });
- m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatusService);
+ m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot, m_StatsService, m_StatusService);
# if ZEN_WITH_NOMAD
// Nomad provisioner
@@ -504,7 +584,11 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
OrchestratorEndpoint << '/';
}
- m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg, OrchestratorEndpoint);
+ m_NomadProvisioner = std::make_unique<nomad::NomadProvisioner>(NomadCfg,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
}
}
# endif
@@ -535,7 +619,14 @@ ZenComputeServer::InitializeServices(const ZenComputeServerConfig& ServerConfig)
: std::filesystem::path(HordeConfig.BinariesPath);
std::filesystem::path WorkingDir = ServerConfig.DataDir / "horde";
- m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig, BinariesPath, WorkingDir, OrchestratorEndpoint);
+ m_HordeProvisioner = std::make_unique<horde::HordeProvisioner>(HordeConfig,
+ BinariesPath,
+ WorkingDir,
+ OrchestratorEndpoint,
+ m_OrchestratorService->GetSessionId().ToString(),
+ ServerConfig.ProvisionClean,
+ ServerConfig.ProvisionTraceHost);
+ m_OrchestratorService->SetProvisionerStateProvider(m_HordeProvisioner.get());
}
}
# endif
@@ -563,6 +654,10 @@ ZenComputeServer::GetInstanceId() const
std::string
ZenComputeServer::GetAnnounceUrl() const
{
+ if (!m_AnnounceUrl.empty())
+ {
+ return m_AnnounceUrl;
+ }
return m_Http->GetServiceUri(nullptr);
}
@@ -633,6 +728,11 @@ ZenComputeServer::BuildAnnounceBody()
<< "nomad";
}
+ if (!m_CoordinatorSession.empty())
+ {
+ AnnounceBody << "coordinator_session" << m_CoordinatorSession;
+ }
+
ResolveCloudMetadata();
if (m_CloudMetadata)
{
@@ -779,8 +879,10 @@ ZenComputeServer::ProvisionerMaintenanceTick()
# if ZEN_WITH_HORDE
if (m_HordeProvisioner)
{
- m_HordeProvisioner->SetTargetCoreCount(UINT32_MAX);
+ // Re-apply current target to spawn agent threads for any that have
+ // exited since the last tick, without overwriting a user-set target.
auto Stats = m_HordeProvisioner->GetStats();
+ m_HordeProvisioner->SetTargetCoreCount(Stats.TargetCoreCount);
ZEN_DEBUG("Horde maintenance: target={}, estimated={}, active={}",
Stats.TargetCoreCount,
Stats.EstimatedCoreCount,
@@ -882,12 +984,14 @@ ZenComputeServer::Run()
OnReady();
+ StartSelfSession("zencompute");
+
PostAnnounce();
EnqueueAnnounceTimer();
InitializeOrchestratorWebSocket();
# if ZEN_WITH_HORDE
- // Start Horde provisioning if configured — request maximum allowed cores.
+ // Start Horde provisioning if configured - request maximum allowed cores.
// SetTargetCoreCount clamps to HordeConfig::MaxCores internally.
if (m_HordeProvisioner)
{
@@ -899,7 +1003,7 @@ ZenComputeServer::Run()
# endif
# if ZEN_WITH_NOMAD
- // Start Nomad provisioning if configured — request maximum allowed cores.
+ // Start Nomad provisioning if configured - request maximum allowed cores.
// SetTargetCoreCount clamps to NomadConfig::MaxCores internally.
if (m_NomadProvisioner)
{
diff --git a/src/zenserver/compute/computeserver.h b/src/zenserver/compute/computeserver.h
index 8f4edc0f0..aa9c1a5b3 100644
--- a/src/zenserver/compute/computeserver.h
+++ b/src/zenserver/compute/computeserver.h
@@ -10,6 +10,7 @@
# include <zencore/system.h>
# include <zenhttp/httpwsclient.h>
# include <zenstore/gc.h>
+# include <zenstore/memorycidstore.h>
# include "frontend/frontend.h"
namespace cxxopts {
@@ -41,7 +42,6 @@ class NomadProvisioner;
namespace zen {
-class CidStore;
class HttpApiService;
struct ZenComputeServerConfig : public ZenServerConfig
@@ -49,9 +49,13 @@ struct ZenComputeServerConfig : public ZenServerConfig
std::string UpstreamNotificationEndpoint;
std::string InstanceId; // For use in notifications
std::string CoordinatorEndpoint;
+ std::string CoordinatorSession; ///< Session ID for stale-instance rejection
+ std::string AnnounceUrl; ///< Override for self-announced URL (e.g. relay-visible endpoint)
std::string IdmsEndpoint;
int32_t MaxConcurrentActions = 0; // 0 = auto (LogicalProcessorCount * 2)
- bool EnableWorkerWebSocket = false; // Use WebSocket for worker↔orchestrator link
+ bool EnableWorkerWebSocket = false; // Use WebSocket for worker<->orchestrator link
+ bool ProvisionClean = false; // Pass --clean to provisioned workers
+ std::string ProvisionTraceHost; // Pass --tracehost to provisioned workers
# if ZEN_WITH_HORDE
horde::HordeConfig HordeConfig;
@@ -84,6 +88,7 @@ private:
# if ZEN_WITH_HORDE
std::string m_HordeModeStr = "direct";
std::string m_HordeEncryptionStr = "none";
+ std::string m_HordeOidcTokenExePath;
# endif
# if ZEN_WITH_NOMAD
@@ -131,7 +136,8 @@ public:
private:
GcManager m_GcManager;
GcScheduler m_GcScheduler{m_GcManager};
- std::unique_ptr<CidStore> m_CidStore;
+ std::unique_ptr<MemoryCidStore> m_ActionStore;
+ std::unique_ptr<CidStore> m_WorkerStore;
std::unique_ptr<HttpApiService> m_ApiService;
std::unique_ptr<zen::compute::HttpComputeService> m_ComputeService;
std::unique_ptr<zen::compute::HttpOrchestratorService> m_OrchestratorService;
@@ -146,6 +152,8 @@ private:
# endif
SystemMetricsTracker m_MetricsTracker;
std::string m_CoordinatorEndpoint;
+ std::string m_CoordinatorSession;
+ std::string m_AnnounceUrl;
std::string m_InstanceId;
asio::steady_timer m_AnnounceTimer{m_IoContext};
@@ -163,7 +171,7 @@ private:
std::string GetInstanceId() const;
CbObject BuildAnnounceBody();
- // Worker→orchestrator WebSocket client
+ // Worker->orchestrator WebSocket client
struct OrchestratorWsHandler : public IWsClientHandler
{
ZenComputeServer& Server;