1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "hordeagentmessage.h"
#include "hordecomputesocket.h"
#include <zenhorde/hordeclient.h>
#include <zencore/logbase.h>
#include <filesystem>
#include <functional>
#include <memory>
#include <string>
#include <vector>
// Forward declaration only: this header refers to asio::io_context solely by
// reference (constructor parameter and m_IoContext member), so the complete
// type is not required here.
namespace asio {
class io_context;
}
namespace zen::horde {
// Forward declaration; in this header the type is only held through a std::unique_ptr member.
class AsyncComputeTransport;
/** Outcome handed to the completion handler once an async agent has finished.
 *
 * A default-constructed result describes a failure: Success is false and
 * ExitCode is -1.
 */
struct AsyncAgentResult
{
    bool     Success{false};  ///< Defaults to false
    int32_t  ExitCode{-1};    ///< Defaults to -1; presumably the remote process exit code - confirm in the implementation
    uint16_t CoreCount{0};    ///< Logical cores on the provisioned machine
};
/** Completion handler for async agent lifecycle.
 *
 * Callable taking the final AsyncAgentResult by const reference and returning nothing.
 * AsyncHordeAgent::Start() documents that the handler is called exactly once.
 */
using AsyncAgentCompletionHandler = std::function<void(const AsyncAgentResult&)>;
/** Settings describing how a remote zenserver instance is launched through an async agent. */
struct AsyncAgentConfig
{
    /// Machine description; exposed again through AsyncHordeAgent::GetMachineInfo().
    MachineInfo Machine;

    /// (locator, bundleDir) pairs
    std::vector<std::pair<std::string, std::filesystem::path>> Bundles;

    /// Executable to launch - presumably resolved on the remote side; confirm in the implementation.
    std::string Executable;

    /// Arguments accompanying Executable.
    std::vector<std::string> Args;

    /// Off by default. Presumably requests running the executable under Wine - confirm in the implementation.
    bool UseWine{false};
};
/** Async agent that manages the full lifecycle of a single Horde compute connection.
 *
 * Driven by a state machine using callbacks on a shared io_context - no dedicated
 * threads. Call Start() to begin the connection/handshake/upload/execute/poll
 * sequence. The completion handler is invoked when the remote process exits or
 * an error occurs.
 *
 * The class derives from std::enable_shared_from_this, so instances are presumably
 * expected to be owned by a std::shared_ptr (e.g. created with std::make_shared) -
 * confirm against the call sites.
 */
class AsyncHordeAgent : public std::enable_shared_from_this<AsyncHordeAgent>
{
public:
    /** Construct an idle agent.
     *
     * Marked explicit so that an asio::io_context can never be converted to an
     * agent implicitly.
     *
     * @param IoContext  io_context the agent runs on. The class keeps a reference
     *                   member (m_IoContext), so the context presumably has to
     *                   outlive the agent.
     */
    explicit AsyncHordeAgent(asio::io_context& IoContext);
    ~AsyncHordeAgent();

    // Non-copyable.
    AsyncHordeAgent(const AsyncHordeAgent&) = delete;
    AsyncHordeAgent& operator=(const AsyncHordeAgent&) = delete;

    /** Start the full agent lifecycle. The completion handler is called exactly once. */
    void Start(AsyncAgentConfig Config, AsyncAgentCompletionHandler OnDone);

    /** Cancel in-flight operations. The completion handler is still called (with Success=false). */
    void Cancel();

    /** Machine description taken from the stored configuration. */
    const MachineInfo& GetMachineInfo() const { return m_Config.Machine; }

    /** Stages of the agent state machine; a new agent starts in Idle.
     *
     * The enumerators appear to follow the connection/handshake/upload/execute/poll
     * sequence described on the class - the exact transitions live in the implementation.
     */
    enum class State
    {
        Idle,
        Connecting,
        WaitAgentAttach,
        SentFork,
        WaitChildAttach,
        Uploading,
        Executing,
        Polling,
        Done
    };

private:
    LoggerRef Log() { return m_Log; }

    // State machine steps. Judging by the names, each Do* function starts a stage and
    // the matching On* function handles its response; see the implementation for details.
    void DoConnect();
    void OnConnected(const std::error_code& Ec);

    void DoWaitAgentAttach();
    void OnAgentResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);

    void DoSendFork();
    void DoWaitChildAttach();
    void OnChildAttachResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);

    void DoUploadNext();
    void OnUploadResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);

    void DoExecute();
    void DoPoll();
    void OnPollResponse(AgentMessageType Type, const uint8_t* Data, size_t Size);

    // Presumably the single exit point that reports the result through m_OnDone - confirm.
    void Finish(bool Success, int32_t ExitCode = -1);

    asio::io_context& m_IoContext;
    LoggerRef         m_Log;

    State m_State     = State::Idle;
    bool  m_Cancelled = false;

    AsyncAgentConfig            m_Config;
    AsyncAgentCompletionHandler m_OnDone;
    size_t                      m_CurrentBundleIndex = 0;  // presumably an index into m_Config.Bundles

    std::unique_ptr<AsyncTcpComputeTransport> m_TcpTransport;
    std::unique_ptr<AsyncComputeTransport>    m_Transport;
    std::shared_ptr<AsyncComputeSocket>       m_Socket;
    std::unique_ptr<AsyncAgentMessageChannel> m_AgentChannel;
    std::unique_ptr<AsyncAgentMessageChannel> m_ChildChannel;
};
} // namespace zen::horde
|