aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute/computeserver.h
blob: 8f4edc0f0068b0e1e0567ec4c7d5b78075fee0e9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "zenserver.h"

#if ZEN_WITH_COMPUTE_SERVICES

#	include <future>
#	include <zencore/system.h>
#	include <zenhttp/httpwsclient.h>
#	include <zenstore/gc.h>
#	include "frontend/frontend.h"

namespace cxxopts {
class Options;
}
namespace zen::LuaConfig {
struct Options;
}

namespace zen::compute {
class CloudMetadata;
class HttpComputeService;
class HttpOrchestratorService;
}  // namespace zen::compute

#	if ZEN_WITH_HORDE
#		include <zenhorde/hordeconfig.h>
namespace zen::horde {
class HordeProvisioner;
}  // namespace zen::horde
#	endif

#	if ZEN_WITH_NOMAD
#		include <zennomad/nomadconfig.h>
namespace zen::nomad {
class NomadProvisioner;
}  // namespace zen::nomad
#	endif

namespace zen {

class CidStore;
class HttpApiService;

/// Settings for running zenserver as a compute node.
///
/// Extends the common ZenServerConfig with coordinator / notification
/// endpoints, action concurrency and worker WebSocket settings, plus Horde
/// and Nomad provisioner settings when those integrations are compiled in.
struct ZenComputeServerConfig : public ZenServerConfig
{
	// -- Endpoints and identity ------------------------------------------
	std::string UpstreamNotificationEndpoint;
	std::string InstanceId;	 // For use in notifications
	std::string CoordinatorEndpoint;
	std::string IdmsEndpoint;  // NOTE(review): presumably the cloud instance-metadata endpoint -- confirm

	// -- Execution -------------------------------------------------------
	int32_t MaxConcurrentActions{0};		// 0 = auto (LogicalProcessorCount * 2)
	bool	EnableWorkerWebSocket{false};	// Use a WebSocket for the worker<->orchestrator link

	// -- Optional provisioner integrations -------------------------------
#	if ZEN_WITH_HORDE
	horde::HordeConfig HordeConfig;
#	endif

#	if ZEN_WITH_NOMAD
	nomad::NomadConfig NomadConfig;
#	endif
};

struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase
{
	ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions)
	: ZenServerConfiguratorBase(ServerOptions)
	, m_ServerOptions(ServerOptions)
	{
	}

	~ZenComputeServerConfigurator() = default;

private:
	virtual void AddCliOptions(cxxopts::Options& Options) override;
	virtual void AddConfigOptions(LuaConfig::Options& Options) override;
	virtual void ApplyOptions(cxxopts::Options& Options) override;
	virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override;
	virtual void ValidateOptions() override;

	ZenComputeServerConfig& m_ServerOptions;

#	if ZEN_WITH_HORDE
	std::string m_HordeModeStr		 = "direct";
	std::string m_HordeEncryptionStr = "none";
#	endif

#	if ZEN_WITH_NOMAD
	std::string m_NomadDriverStr	   = "raw_exec";
	std::string m_NomadDistributionStr = "predeployed";
#	endif
};

class ZenComputeServerMain : public ZenServerMain
{
public:
	ZenComputeServerMain(ZenComputeServerConfig& ServerOptions);
	virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override;

	ZenComputeServerMain(const ZenComputeServerMain&) = delete;
	ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete;

	typedef ZenComputeServerConfig		 Config;
	typedef ZenComputeServerConfigurator Configurator;

private:
	ZenComputeServerConfig& m_ServerOptions;
};

/**
 * The compute server handles DDC build function execution requests
 * only. It's intended to be used on a pure compute resource and does
 * not handle any storage tasks. The actual scheduling happens upstream
 * in a storage server instance.
 *
 * NOTE(review): the members below also include a CidStore with GC, an
 * orchestrator service and (optionally) Horde / Nomad provisioners, so the
 * "no storage tasks" / "scheduling happens upstream" description above may
 * be out of date -- confirm against the implementation.
 *
 * Member declaration order is significant. It fixes construction order
 * (e.g. m_GcScheduler is built from m_GcManager) and the reverse
 * destruction order (e.g. m_OrchestratorWsClient is destroyed before
 * m_OrchestratorWsHandler). Do not reorder members casually.
 */

class ZenComputeServer : public ZenServerBase
{
	// Non-movable. Because these move operations are user-declared (even as
	// deleted), the implicit copy operations are deleted as well.
	ZenComputeServer& operator=(ZenComputeServer&&) = delete;
	ZenComputeServer(ZenComputeServer&&)			= delete;

public:
	ZenComputeServer();
	// Defined out of line, presumably so the unique_ptr members can refer to
	// types that are only forward-declared in this header.
	~ZenComputeServer();

	// Server lifecycle entry points.
	// NOTE(review): the meaning of Initialize's int result is not visible
	// here -- confirm in the .cpp.
	int	 Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry);
	void Run();
	void Cleanup();

private:
	// Storage and garbage collection. m_GcScheduler is constructed from
	// m_GcManager, so m_GcManager must remain declared before it.
	GcManager												  m_GcManager;
	GcScheduler												  m_GcScheduler{m_GcManager};
	std::unique_ptr<CidStore>								  m_CidStore;
	// HTTP services owned by this server (presumably registered in
	// RegisterServices -- confirm).
	std::unique_ptr<HttpApiService>							  m_ApiService;
	std::unique_ptr<zen::compute::HttpComputeService>		  m_ComputeService;
	std::unique_ptr<zen::compute::HttpOrchestratorService>	  m_OrchestratorService;
	// Cloud metadata. Presumably produced asynchronously through
	// m_CloudMetadataFuture and moved into m_CloudMetadata by
	// ResolveCloudMetadata() -- confirm.
	std::unique_ptr<zen::compute::CloudMetadata>			  m_CloudMetadata;
	std::future<std::unique_ptr<zen::compute::CloudMetadata>> m_CloudMetadataFuture;
	std::unique_ptr<HttpFrontendService>					  m_FrontendService;
	// Optional provisioners, compiled in per integration.
#	if ZEN_WITH_HORDE
	std::unique_ptr<zen::horde::HordeProvisioner> m_HordeProvisioner;
#	endif
#	if ZEN_WITH_NOMAD
	std::unique_ptr<zen::nomad::NomadProvisioner> m_NomadProvisioner;
#	endif
	SystemMetricsTracker m_MetricsTracker;
	std::string			 m_CoordinatorEndpoint;
	std::string			 m_InstanceId;

	// Timers bound to m_IoContext (presumably provided by ZenServerBase).
	asio::steady_timer m_AnnounceTimer{m_IoContext};
	asio::steady_timer m_ProvisionerMaintenanceTimer{m_IoContext};

	// Startup phases (presumably invoked from Initialize -- confirm).
	void		InitializeState(const ZenComputeServerConfig& ServerConfig);
	void		InitializeServices(const ZenComputeServerConfig& ServerConfig);
	void		RegisterServices(const ZenComputeServerConfig& ServerConfig);
	void		ResolveCloudMetadata();
	// Announcement and periodic maintenance. Presumably PostAnnounce() sends
	// BuildAnnounceBody() to GetAnnounceUrl(), and the Enqueue* helpers
	// re-arm m_AnnounceTimer / m_ProvisionerMaintenanceTimer -- confirm.
	void		PostAnnounce();
	void		EnqueueAnnounceTimer();
	void		EnqueueProvisionerMaintenanceTimer();
	void		ProvisionerMaintenanceTick();
	std::string GetAnnounceUrl() const;
	std::string GetInstanceId() const;
	CbObject	BuildAnnounceBody();

	// Worker→orchestrator WebSocket client
	// Receives WebSocket callbacks for that link. Holds a non-owning
	// reference to the server, which owns it via m_OrchestratorWsHandler.
	// NOTE(review): presumably the callbacks forward into the server -- confirm.
	struct OrchestratorWsHandler : public IWsClientHandler
	{
		ZenComputeServer& Server;
		explicit OrchestratorWsHandler(ZenComputeServer& S) : Server(S) {}

		void OnWsOpen() override;
		void OnWsMessage(const WebSocketMessage& Msg) override;
		void OnWsClose(uint16_t Code, std::string_view Reason) override;
	};

	// The handler is declared before the client, so the client is destroyed
	// first (presumably the client holds a reference to the handler).
	std::unique_ptr<OrchestratorWsHandler> m_OrchestratorWsHandler;
	std::unique_ptr<HttpWsClient>		   m_OrchestratorWsClient;
	asio::steady_timer					   m_WsReconnectTimer{m_IoContext};
	// Defaults to false (WebSocket link disabled). Presumably set from
	// ZenComputeServerConfig::EnableWorkerWebSocket -- confirm.
	bool								   m_EnableWorkerWebSocket = false;

	void InitializeOrchestratorWebSocket();
	void EnqueueWsReconnect();
};

}  // namespace zen

#endif	// ZEN_WITH_COMPUTE_SERVICES