1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
|
// Copyright Epic Games, Inc. All Rights Reserved.
#include <zencore/fmtutils.h>
#include <zencore/iobuffer.h>
#include <zencore/logging.h>
#include <zencore/memoryview.h>
#include <zencore/trace.h>
#include <zenhorde/hordeclient.h>
#include <zenhttp/httpclient.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <json11.hpp>
ZEN_THIRD_PARTY_INCLUDES_END
namespace zen::horde {
// Initializes m_Config from the supplied configuration and binds the logger to the
// "horde.client" category. The HTTP client (m_Http) is not created here; that
// happens in Initialize().
HordeClient::HordeClient(const HordeConfig& Config)
: m_Config(Config)
, m_Log(zen::logging::Get("horde.client"))
{
}

// Destructor is defaulted here, out of line.
HordeClient::~HordeClient() = default;
bool
HordeClient::Initialize()
{
ZEN_TRACE_CPU("HordeClient::Initialize");
HttpClientSettings Settings;
Settings.LogCategory = "horde.http";
Settings.ConnectTimeout = std::chrono::milliseconds{10000};
Settings.Timeout = std::chrono::milliseconds{60000};
Settings.RetryCount = 1;
Settings.ExpectedErrorCodes = {HttpResponseCode::ServiceUnavailable, HttpResponseCode::TooManyRequests};
if (!m_Config.AuthToken.empty())
{
Settings.AccessTokenProvider = [token = m_Config.AuthToken]() -> HttpClientAccessToken {
HttpClientAccessToken Token;
Token.Value = token;
Token.ExpireTime = HttpClientAccessToken::Clock::now() + std::chrono::hours{24};
return Token;
};
}
m_Http = std::make_unique<zen::HttpClient>(m_Config.ServerUrl, Settings);
if (!m_Config.AuthToken.empty())
{
if (!m_Http->Authenticate())
{
ZEN_WARN("failed to authenticate with Horde server");
return false;
}
}
return true;
}
std::string
HordeClient::BuildRequestBody() const
{
json11::Json::object Requirements;
if (m_Config.Mode == ConnectionMode::Direct && !m_Config.Pool.empty())
{
Requirements["pool"] = m_Config.Pool;
}
std::string Condition;
#if ZEN_PLATFORM_WINDOWS
ExtendableStringBuilder<256> CondBuf;
CondBuf << "(OSFamily == 'Windows' || WineEnabled == '" << (m_Config.AllowWine ? "true" : "false") << "')";
Condition = std::string(CondBuf);
#elif ZEN_PLATFORM_MAC
Condition = "OSFamily == 'MacOS'";
#else
Condition = "OSFamily == 'Linux'";
#endif
if (!m_Config.Condition.empty())
{
Condition += " ";
Condition += m_Config.Condition;
}
Requirements["condition"] = Condition;
Requirements["exclusive"] = true;
json11::Json::object Connection;
Connection["modePreference"] = ToString(m_Config.Mode);
if (m_Config.EncryptionMode != Encryption::None)
{
Connection["encryption"] = ToString(m_Config.EncryptionMode);
}
// Request configured zen service port to be forwarded. The Horde agent will map this
// to a local port on the provisioned machine and report it back in the response.
json11::Json::object PortsObj;
PortsObj["ZenPort"] = json11::Json(m_Config.ZenServicePort);
Connection["ports"] = PortsObj;
json11::Json::object Root;
Root["requirements"] = Requirements;
Root["connection"] = Connection;
return json11::Json(Root).dump();
}
bool
HordeClient::ResolveCluster(const std::string& RequestBody, ClusterInfo& OutCluster)
{
ZEN_TRACE_CPU("HordeClient::ResolveCluster");
const IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{RequestBody.data(), RequestBody.size()}, ZenContentType::kJSON);
const HttpClient::Response Response = m_Http->Post("api/v2/compute/_cluster", Payload);
if (Response.Error)
{
ZEN_WARN("cluster resolution failed: {}", Response.Error->ErrorMessage);
return false;
}
const int StatusCode = static_cast<int>(Response.StatusCode);
if (StatusCode == 503 || StatusCode == 429)
{
ZEN_DEBUG("cluster resolution returned HTTP/{}: no resources", StatusCode);
return false;
}
if (StatusCode == 401)
{
ZEN_WARN("cluster resolution returned HTTP/401: token expired");
return false;
}
if (!Response.IsSuccess())
{
ZEN_WARN("cluster resolution failed with HTTP/{}", StatusCode);
return false;
}
const std::string Body(Response.AsText());
std::string Err;
const json11::Json Json = json11::Json::parse(Body, Err);
if (!Err.empty())
{
ZEN_WARN("invalid JSON response for cluster resolution: {}", Err);
return false;
}
const json11::Json ClusterIdVal = Json["clusterId"];
if (!ClusterIdVal.is_string() || ClusterIdVal.string_value().empty())
{
ZEN_WARN("missing 'clusterId' in cluster resolution response");
return false;
}
OutCluster.ClusterId = ClusterIdVal.string_value();
return true;
}
// Decodes a hexadecimal string into raw bytes.
//
// Parameters:
//   Hex     - hex digits (upper or lower case); must be exactly OutSize * 2 characters.
//   Out     - destination buffer with room for at least OutSize bytes.
//   OutSize - number of bytes to decode.
//
// Returns true when every character was a valid hex digit. Returns false on a length
// mismatch (nothing is written) or on the first invalid digit pair (bytes decoded
// before that pair have already been written to Out).
bool
HordeClient::ParseHexBytes(std::string_view Hex, uint8_t* Out, size_t OutSize)
{
	if (Hex.size() != OutSize * 2)
	{
		return false;
	}

	// Maps one hex digit to its value in [0, 15], or -1 when it is not a hex digit.
	const auto DecodeNibble = [](char Digit) -> int {
		int Value = -1;
		if (Digit >= '0' && Digit <= '9')
		{
			Value = Digit - '0';
		}
		else if (Digit >= 'a' && Digit <= 'f')
		{
			Value = Digit - 'a' + 10;
		}
		else if (Digit >= 'A' && Digit <= 'F')
		{
			Value = Digit - 'A' + 10;
		}
		return Value;
	};

	for (size_t ByteIndex = 0; ByteIndex < OutSize; ++ByteIndex)
	{
		const size_t CharIndex = ByteIndex * 2;
		const int	 High	   = DecodeNibble(Hex[CharIndex]);
		const int	 Low	   = DecodeNibble(Hex[CharIndex + 1]);
		if (High < 0 || Low < 0)
		{
			return false;
		}
		Out[ByteIndex] = static_cast<uint8_t>((High << 4) | Low);
	}

	return true;
}
// Requests a compute machine from Horde by POSTing RequestBody to
// "api/v2/compute/<ClusterId>" ("api/v2/compute/default" when ClusterId is empty)
// and decodes the JSON response into OutMachine.
//
// Parameters:
//   RequestBody - JSON request document sent as the POST payload
//                 (presumably the output of BuildRequestBody(); verify against caller).
//   ClusterId   - cluster to request from; an empty string selects "default".
//   OutMachine  - receives the assigned machine description.
//
// Returns true when the response contained a valid 'nonce', 'ip' and 'port'.
// Optional fields ('ports', 'connectionMode', 'connectionAddress', 'properties',
// 'encryption', 'key', 'leaseId') are applied when present and well-formed.
//
// On every failure path the function returns false and OutMachine is left in the
// reset state: value-initialized with Port == 0xFFFF.
bool
HordeClient::RequestMachine(const std::string& RequestBody, const std::string& ClusterId, MachineInfo& OutMachine)
{
	ZEN_TRACE_CPU("HordeClient::RequestMachine");

	const char* const ClusterName = ClusterId.empty() ? "default" : ClusterId.c_str();
	ZEN_INFO("requesting machine from Horde with cluster '{}'", ClusterName);

	ExtendableStringBuilder<128> ResourcePath;
	ResourcePath << "api/v2/compute/" << ClusterName;

	const IoBuffer Payload = IoBufferBuilder::MakeFromMemory(MemoryView{RequestBody.data(), RequestBody.size()}, ZenContentType::kJSON);
	const HttpClient::Response Response = m_Http->Post(ResourcePath.ToView(), Payload);

	// Puts the output into the "invalid" state that callers see on failure.
	const auto ResetOutput = [&OutMachine]() {
		OutMachine		= {};
		OutMachine.Port = 0xFFFF;
	};
	ResetOutput();

	if (Response.Error)
	{
		ZEN_WARN("machine request failed: {}", Response.Error->ErrorMessage);
		return false;
	}

	const int StatusCode = static_cast<int>(Response.StatusCode);
	if (StatusCode == 404 || StatusCode == 503 || StatusCode == 429)
	{
		ZEN_DEBUG("machine request returned HTTP/{}: no resources", StatusCode);
		return false;
	}
	if (StatusCode == 401)
	{
		ZEN_WARN("machine request returned HTTP/401: token expired");
		return false;
	}
	if (!Response.IsSuccess())
	{
		ZEN_WARN("machine request failed with HTTP/{}", StatusCode);
		return false;
	}

	const std::string Body(Response.AsText());
	std::string		  Err;
	const json11::Json Json = json11::Json::parse(Body, Err);
	if (!Err.empty())
	{
		ZEN_WARN("invalid JSON response for machine request: {}", Err);
		return false;
	}

	// A JSON number is only usable as a port when it fits in 16 bits; without this
	// check the narrowing cast below would silently wrap (e.g. 70000 -> 4464).
	const auto IsPortInRange = [](int Value) -> bool { return Value >= 0 && Value <= 0xFFFF; };

	// Required fields
	const json11::Json& NonceVal = Json["nonce"];
	const json11::Json& IpVal	 = Json["ip"];
	const json11::Json& PortVal	 = Json["port"];
	if (!NonceVal.is_string() || !IpVal.is_string() || !PortVal.is_number())
	{
		ZEN_WARN("missing 'nonce', 'ip', or 'port' in machine response");
		return false;
	}

	const int RawPort = PortVal.int_value();
	if (!IsPortInRange(RawPort))
	{
		ZEN_WARN("out-of-range 'port' {} in machine response", RawPort);
		return false;
	}

	// Validate the nonce before committing any required field so that a failure
	// never leaves a half-populated result behind.
	if (!ParseHexBytes(NonceVal.string_value(), OutMachine.Nonce, NonceSize))
	{
		ZEN_WARN("invalid nonce hex string in machine response");
		ResetOutput();	// ParseHexBytes may have written a partial nonce
		return false;
	}

	OutMachine.Ip	= IpVal.string_value();
	OutMachine.Port = static_cast<uint16_t>(RawPort);

	// Optional: forwarded port mappings, keyed by the name used in the request.
	if (const json11::Json& PortsVal = Json["ports"]; PortsVal.is_object())
	{
		for (const auto& [Key, Val] : PortsVal.object_items())
		{
			const std::string& PortName = Key;
			PortInfo		   Info;

			if (const json11::Json& PortField = Val["port"]; PortField.is_number())
			{
				const int RawValue = PortField.int_value();
				if (IsPortInRange(RawValue))
				{
					Info.Port = static_cast<uint16_t>(RawValue);
				}
				else
				{
					ZEN_WARN("ignoring out-of-range 'port' {} for forwarded port '{}'", RawValue, PortName);
				}
			}

			if (const json11::Json& AgentPortField = Val["agentPort"]; AgentPortField.is_number())
			{
				const int RawValue = AgentPortField.int_value();
				if (IsPortInRange(RawValue))
				{
					Info.AgentPort = static_cast<uint16_t>(RawValue);
				}
				else
				{
					ZEN_WARN("ignoring out-of-range 'agentPort' {} for forwarded port '{}'", RawValue, PortName);
				}
			}

			OutMachine.Ports[PortName] = Info;
		}
	}

	// Optional: connection mode. The connection address is only read when the mode
	// string was recognized.
	if (const json11::Json& ConnectionModeVal = Json["connectionMode"]; ConnectionModeVal.is_string())
	{
		if (FromString(OutMachine.Mode, ConnectionModeVal.string_value()))
		{
			if (const json11::Json& ConnectionAddressVal = Json["connectionAddress"]; ConnectionAddressVal.is_string())
			{
				OutMachine.ConnectionAddress = ConnectionAddressVal.string_value();
			}
		}
	}

	// Properties are a flat string array of "Key=Value" pairs describing the machine.
	// We extract OS family and core counts for sizing decisions. If neither core count
	// is available, we fall back to 16 as a conservative default.
	constexpr std::string_view OsFamilyPrefix	   = "OSFamily=";
	constexpr std::string_view LogicalCoresPrefix  = "LogicalCores=";
	constexpr std::string_view PhysicalCoresPrefix = "PhysicalCores=";

	// Parses a decimal core count; anything that is not in [1, 65535] is reported as 0
	// ("unknown") instead of being wrapped by the narrowing cast.
	const auto ParseCoreCount = [](const char* Text) -> uint16_t {
		const int Value = std::atoi(Text);
		if (Value <= 0 || Value > 0xFFFF)
		{
			return 0;
		}
		return static_cast<uint16_t>(Value);
	};

	uint16_t LogicalCores  = 0;
	uint16_t PhysicalCores = 0;
	if (const json11::Json& PropertiesVal = Json["properties"]; PropertiesVal.is_array())
	{
		for (const json11::Json& PropVal : PropertiesVal.array_items())
		{
			if (!PropVal.is_string())
			{
				continue;
			}

			const std::string& Prop = PropVal.string_value();
			if (Prop.starts_with(OsFamilyPrefix))
			{
				if (Prop.compare(OsFamilyPrefix.size(), std::string::npos, "Windows") == 0)
				{
					OutMachine.IsWindows = true;
				}
			}
			else if (Prop.starts_with(LogicalCoresPrefix))
			{
				LogicalCores = ParseCoreCount(Prop.c_str() + LogicalCoresPrefix.size());
			}
			else if (Prop.starts_with(PhysicalCoresPrefix))
			{
				PhysicalCores = ParseCoreCount(Prop.c_str() + PhysicalCoresPrefix.size());
			}
		}
	}

	if (LogicalCores > 0)
	{
		OutMachine.LogicalCores = LogicalCores;
	}
	else if (PhysicalCores > 0)
	{
		// Only the physical count is known; it is doubled to estimate logical cores.
		OutMachine.LogicalCores = PhysicalCores * 2;
	}
	else
	{
		OutMachine.LogicalCores = 16;
	}

	// Optional: encryption mode and, for AES, the hex-encoded key.
	// NOTE(review): a missing or malformed AES key is only logged and the function still
	// returns true with EncryptionMode == AES - confirm whether callers rely on that.
	if (const json11::Json& EncryptionVal = Json["encryption"]; EncryptionVal.is_string())
	{
		if (FromString(OutMachine.EncryptionMode, EncryptionVal.string_value()))
		{
			if (OutMachine.EncryptionMode == Encryption::AES)
			{
				const json11::Json& KeyVal = Json["key"];
				if (KeyVal.is_string() && !KeyVal.string_value().empty())
				{
					if (!ParseHexBytes(KeyVal.string_value(), OutMachine.Key, KeySize))
					{
						ZEN_WARN("invalid AES key in machine response");
					}
				}
				else
				{
					ZEN_WARN("AES encryption requested but no key provided");
				}
			}
		}
	}

	if (const json11::Json& LeaseIdVal = Json["leaseId"]; LeaseIdVal.is_string())
	{
		OutMachine.LeaseId = LeaseIdVal.string_value();
	}

	ZEN_INFO("Horde machine assigned [{}:{}] cores={} lease={}",
			 OutMachine.GetConnectionAddress(),
			 OutMachine.GetConnectionPort(),
			 OutMachine.LogicalCores,
			 OutMachine.LeaseId);

	return true;
}
} // namespace zen::horde
|