// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "zenserver.h" #if ZEN_WITH_COMPUTE_SERVICES # include # include # include # include # include "frontend/frontend.h" namespace cxxopts { class Options; } namespace zen::LuaConfig { struct Options; } namespace zen::compute { class CloudMetadata; class HttpComputeService; class HttpOrchestratorService; } // namespace zen::compute # if ZEN_WITH_HORDE # include namespace zen::horde { class HordeProvisioner; } // namespace zen::horde # endif # if ZEN_WITH_NOMAD # include namespace zen::nomad { class NomadProvisioner; } // namespace zen::nomad # endif namespace zen { class CidStore; class HttpApiService; struct ZenComputeServerConfig : public ZenServerConfig { std::string UpstreamNotificationEndpoint; std::string InstanceId; // For use in notifications std::string CoordinatorEndpoint; std::string IdmsEndpoint; int32_t MaxConcurrentActions = 0; // 0 = auto (LogicalProcessorCount * 2) bool EnableWorkerWebSocket = false; // Use WebSocket for worker↔orchestrator link # if ZEN_WITH_HORDE horde::HordeConfig HordeConfig; # endif # if ZEN_WITH_NOMAD nomad::NomadConfig NomadConfig; # endif }; struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase { ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions) : ZenServerConfiguratorBase(ServerOptions) , m_ServerOptions(ServerOptions) { } ~ZenComputeServerConfigurator() = default; private: virtual void AddCliOptions(cxxopts::Options& Options) override; virtual void AddConfigOptions(LuaConfig::Options& Options) override; virtual void ApplyOptions(cxxopts::Options& Options) override; virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override; virtual void ValidateOptions() override; ZenComputeServerConfig& m_ServerOptions; # if ZEN_WITH_HORDE std::string m_HordeModeStr = "direct"; std::string m_HordeEncryptionStr = "none"; # endif # if ZEN_WITH_NOMAD std::string m_NomadDriverStr = "raw_exec"; std::string m_NomadDistributionStr = "predeployed"; # endif }; class ZenComputeServerMain : public ZenServerMain { public: ZenComputeServerMain(ZenComputeServerConfig& ServerOptions); virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override; ZenComputeServerMain(const ZenComputeServerMain&) = delete; ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete; typedef ZenComputeServerConfig Config; typedef ZenComputeServerConfigurator Configurator; private: ZenComputeServerConfig& m_ServerOptions; }; /** * The compute server handles DDC build function execution requests * only. It's intended to be used on a pure compute resource and does * not handle any storage tasks. The actual scheduling happens upstream * in a storage server instance. */ class ZenComputeServer : public ZenServerBase { ZenComputeServer& operator=(ZenComputeServer&&) = delete; ZenComputeServer(ZenComputeServer&&) = delete; public: ZenComputeServer(); ~ZenComputeServer(); int Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry); void Run(); void Cleanup(); private: GcManager m_GcManager; GcScheduler m_GcScheduler{m_GcManager}; std::unique_ptr m_CidStore; std::unique_ptr m_ApiService; std::unique_ptr m_ComputeService; std::unique_ptr m_OrchestratorService; std::unique_ptr m_CloudMetadata; std::future> m_CloudMetadataFuture; std::unique_ptr m_FrontendService; # if ZEN_WITH_HORDE std::unique_ptr m_HordeProvisioner; # endif # if ZEN_WITH_NOMAD std::unique_ptr m_NomadProvisioner; # endif SystemMetricsTracker m_MetricsTracker; std::string m_CoordinatorEndpoint; std::string m_InstanceId; asio::steady_timer m_AnnounceTimer{m_IoContext}; asio::steady_timer m_ProvisionerMaintenanceTimer{m_IoContext}; void InitializeState(const ZenComputeServerConfig& ServerConfig); void InitializeServices(const ZenComputeServerConfig& ServerConfig); void RegisterServices(const ZenComputeServerConfig& ServerConfig); void ResolveCloudMetadata(); void PostAnnounce(); void EnqueueAnnounceTimer(); void EnqueueProvisionerMaintenanceTimer(); void ProvisionerMaintenanceTick(); std::string GetAnnounceUrl() const; std::string GetInstanceId() const; CbObject BuildAnnounceBody(); // Worker→orchestrator WebSocket client struct OrchestratorWsHandler : public IWsClientHandler { ZenComputeServer& Server; explicit OrchestratorWsHandler(ZenComputeServer& S) : Server(S) {} void OnWsOpen() override; void OnWsMessage(const WebSocketMessage& Msg) override; void OnWsClose(uint16_t Code, std::string_view Reason) override; }; std::unique_ptr m_OrchestratorWsHandler; std::unique_ptr m_OrchestratorWsClient; asio::steady_timer m_WsReconnectTimer{m_IoContext}; bool m_EnableWorkerWebSocket = false; void InitializeOrchestratorWebSocket(); void EnqueueWsReconnect(); }; } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES