blob: 8f4edc0f0068b0e1e0567ec4c7d5b78075fee0e9 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "zenserver.h"
#if ZEN_WITH_COMPUTE_SERVICES
# include <future>
# include <zencore/system.h>
# include <zenhttp/httpwsclient.h>
# include <zenstore/gc.h>
# include "frontend/frontend.h"
// Forward declarations. The option types are only taken by reference in the
// configurator overrides below, and the compute service types are only held
// through std::unique_ptr, so their full definitions are not needed here.
namespace cxxopts {
class Options;
} // namespace cxxopts

namespace zen::LuaConfig {
struct Options;
} // namespace zen::LuaConfig

namespace zen::compute {
class CloudMetadata;
class HttpComputeService;
class HttpOrchestratorService;
} // namespace zen::compute

// Optional provisioner backends. Their config headers are included in full
// because ZenComputeServerConfig stores the config objects by value; the
// provisioners themselves are only held through std::unique_ptr.
# if ZEN_WITH_HORDE
# include <zenhorde/hordeconfig.h>
namespace zen::horde {
class HordeProvisioner;
} // namespace zen::horde
# endif // ZEN_WITH_HORDE

# if ZEN_WITH_NOMAD
# include <zennomad/nomadconfig.h>
namespace zen::nomad {
class NomadProvisioner;
} // namespace zen::nomad
# endif // ZEN_WITH_NOMAD
namespace zen {
// Both types are only owned through std::unique_ptr members of
// ZenComputeServer below, so forward declarations suffice in this header.
class CidStore;
class HttpApiService;
/**
 * Settings for a compute server instance: the shared ZenServerConfig options
 * plus compute-specific endpoints, concurrency limits and, when compiled in,
 * the Horde / Nomad provisioner settings.
 */
struct ZenComputeServerConfig : public ZenServerConfig
{
	// Upstream notification target. NOTE(review): protocol/format not visible here.
	std::string UpstreamNotificationEndpoint;
	// Instance identifier used when sending notifications.
	std::string InstanceId;
	// NOTE(review): presumably the coordinator this instance announces to; confirm.
	std::string CoordinatorEndpoint;
	// NOTE(review): presumably the cloud instance-metadata service endpoint; confirm.
	std::string IdmsEndpoint;

	// Maximum number of concurrently executing actions.
	// 0 selects the automatic default (LogicalProcessorCount * 2).
	int32_t MaxConcurrentActions{0};
	// When true, the worker <-> orchestrator link uses a WebSocket.
	bool EnableWorkerWebSocket{false};

# if ZEN_WITH_HORDE
	horde::HordeConfig HordeConfig; // Horde provisioner settings.
# endif
# if ZEN_WITH_NOMAD
	nomad::NomadConfig NomadConfig; // Nomad provisioner settings.
# endif
};
struct ZenComputeServerConfigurator : public ZenServerConfiguratorBase
{
ZenComputeServerConfigurator(ZenComputeServerConfig& ServerOptions)
: ZenServerConfiguratorBase(ServerOptions)
, m_ServerOptions(ServerOptions)
{
}
~ZenComputeServerConfigurator() = default;
private:
virtual void AddCliOptions(cxxopts::Options& Options) override;
virtual void AddConfigOptions(LuaConfig::Options& Options) override;
virtual void ApplyOptions(cxxopts::Options& Options) override;
virtual void OnConfigFileParsed(LuaConfig::Options& LuaOptions) override;
virtual void ValidateOptions() override;
ZenComputeServerConfig& m_ServerOptions;
# if ZEN_WITH_HORDE
std::string m_HordeModeStr = "direct";
std::string m_HordeEncryptionStr = "none";
# endif
# if ZEN_WITH_NOMAD
std::string m_NomadDriverStr = "raw_exec";
std::string m_NomadDistributionStr = "predeployed";
# endif
};
class ZenComputeServerMain : public ZenServerMain
{
public:
ZenComputeServerMain(ZenComputeServerConfig& ServerOptions);
virtual void DoRun(ZenServerState::ZenServerEntry* Entry) override;
ZenComputeServerMain(const ZenComputeServerMain&) = delete;
ZenComputeServerMain& operator=(const ZenComputeServerMain&) = delete;
typedef ZenComputeServerConfig Config;
typedef ZenComputeServerConfigurator Configurator;
private:
ZenComputeServerConfig& m_ServerOptions;
};
/**
 * Compute server: services DDC build-function execution requests and is
 * intended to run on a dedicated compute resource, with the actual
 * scheduling performed upstream by a storage server instance.
 *
 * In addition to the compute service, this class owns an orchestrator
 * service, cloud-metadata resolution, a frontend, optional (compile-time)
 * Horde / Nomad provisioners, timer-driven announce requests (see
 * GetAnnounceUrl / BuildAnnounceBody), and an optional WebSocket link from
 * this worker to the orchestrator (m_EnableWorkerWebSocket).
 *
 * NOTE(review): this server owns a CidStore and a GcManager/GcScheduler
 * pair, so it is not entirely free of storage work; confirm the role of
 * that local store.
 */
class ZenComputeServer : public ZenServerBase
{
public:
	ZenComputeServer();
	~ZenComputeServer();

	// Neither copyable nor movable.
	ZenComputeServer(const ZenComputeServer&) = delete;
	ZenComputeServer& operator=(const ZenComputeServer&) = delete;
	ZenComputeServer(ZenComputeServer&&) = delete;
	ZenComputeServer& operator=(ZenComputeServer&&) = delete;

	int Initialize(const ZenComputeServerConfig& ServerConfig, ZenServerState::ZenServerEntry* ServerEntry);
	void Run();
	void Cleanup();

private:
	// Startup helpers.
	void InitializeState(const ZenComputeServerConfig& ServerConfig);
	void InitializeServices(const ZenComputeServerConfig& ServerConfig);
	void RegisterServices(const ZenComputeServerConfig& ServerConfig);
	void ResolveCloudMetadata();

	// Announce and provisioner-maintenance timers.
	void PostAnnounce();
	void EnqueueAnnounceTimer();
	void EnqueueProvisionerMaintenanceTimer();
	void ProvisionerMaintenanceTick();
	std::string GetAnnounceUrl() const;
	std::string GetInstanceId() const;
	CbObject BuildAnnounceBody();

	// Worker -> orchestrator WebSocket link.
	void InitializeOrchestratorWebSocket();
	void EnqueueWsReconnect();

	// Receives the WebSocket client callbacks; keeps a reference back to the
	// owning server.
	struct OrchestratorWsHandler : public IWsClientHandler
	{
		explicit OrchestratorWsHandler(ZenComputeServer& S) : Server(S) {}

		void OnWsOpen() override;
		void OnWsMessage(const WebSocketMessage& Msg) override;
		void OnWsClose(uint16_t Code, std::string_view Reason) override;

		ZenComputeServer& Server;
	};

	// Data members. Declaration order is significant:
	// - m_GcScheduler is constructed from m_GcManager;
	// - the timers are constructed from m_IoContext (not declared here,
	//   presumably inherited from ZenServerBase);
	// - members are destroyed in reverse order, so the WebSocket client is
	//   torn down before its handler.
	GcManager m_GcManager;
	GcScheduler m_GcScheduler{m_GcManager};
	std::unique_ptr<CidStore> m_CidStore;
	std::unique_ptr<HttpApiService> m_ApiService;
	std::unique_ptr<zen::compute::HttpComputeService> m_ComputeService;
	std::unique_ptr<zen::compute::HttpOrchestratorService> m_OrchestratorService;
	std::unique_ptr<zen::compute::CloudMetadata> m_CloudMetadata;
	std::future<std::unique_ptr<zen::compute::CloudMetadata>> m_CloudMetadataFuture;
	std::unique_ptr<HttpFrontendService> m_FrontendService;
# if ZEN_WITH_HORDE
	std::unique_ptr<zen::horde::HordeProvisioner> m_HordeProvisioner;
# endif
# if ZEN_WITH_NOMAD
	std::unique_ptr<zen::nomad::NomadProvisioner> m_NomadProvisioner;
# endif
	SystemMetricsTracker m_MetricsTracker;
	std::string m_CoordinatorEndpoint;
	std::string m_InstanceId;
	asio::steady_timer m_AnnounceTimer{m_IoContext};
	asio::steady_timer m_ProvisionerMaintenanceTimer{m_IoContext};
	std::unique_ptr<OrchestratorWsHandler> m_OrchestratorWsHandler;
	std::unique_ptr<HttpWsClient> m_OrchestratorWsClient;
	asio::steady_timer m_WsReconnectTimer{m_IoContext};
	bool m_EnableWorkerWebSocket = false;
};
} // namespace zen
#endif // ZEN_WITH_COMPUTE_SERVICES
|