1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include <zenbase/zenbase.h>
#include "hordecomputechannel.h"
#include <cstddef>
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>
namespace zen::horde {
/** Opcodes of the messages exchanged over the agent/child channels.
 *
 * Each enumerator carries the same byte value as the corresponding entry of
 * UE's EAgentMessageType, so the numbers are part of the wire format and must
 * not be changed. Entries are listed in ascending numeric order; note that
 * ExecuteV2 (0x22) is numbered apart from the other execute opcodes (0x17/0x18).
 */
enum class AgentMessageType : uint8_t
{
    // 0x00 - 0x04: channel-level messages
    None      = 0x00,  ///< No message; also what ReadResponse() reports on timeout
    Ping      = 0x01,  ///< Keepalive ping
    Exception = 0x02,  ///< Exception report (see ExceptionInfo)
    Fork      = 0x03,  ///< Fork communication to a new channel
    Attach    = 0x04,  ///< Attach request used during the channel setup handshake

    // 0x10 - 0x12: file management
    WriteFiles         = 0x10,
    WriteFilesResponse = 0x11,
    DeleteFiles        = 0x12,

    // 0x17 - 0x18: process execution results (the request itself is ExecuteV2 below)
    ExecuteOutput = 0x17,
    ExecuteResult = 0x18,

    // 0x20 - 0x21: blob reads
    ReadBlob         = 0x20,
    ReadBlobResponse = 0x21,

    // 0x22: process execution request (see ExecuteProcessFlags)
    ExecuteV2 = 0x22,
};
/** Option flags carried by the ExecuteV2 message.
 *
 * Stored in a single byte. Only one flag is currently defined.
 */
enum class ExecuteProcessFlags : uint8_t
{
    /** No special handling requested. */
    None = 0x00,

    /** Run the executable under Wine on Linux agents. */
    UseWine = 0x01,
};
/** Result of parsing the payload of an Exception message.
 *
 * Both fields are non-owning views: they do not keep the underlying bytes
 * alive. NOTE(review): presumably they point into the buffer the message was
 * parsed from, so copy them into std::string if they must outlive it - confirm
 * against the parser.
 */
struct ExceptionInfo
{
    /** Exception message text; empty until filled in by the parser. */
    std::string_view Message{};

    /** Additional description text; empty until filled in by the parser. */
    std::string_view Description{};
};
/** Result of parsing a blob read request carried by a ReadBlob message.
 *
 * Locator is a non-owning view and does not keep the underlying bytes alive.
 * NOTE(review): Offset and Length presumably describe a byte range within the
 * blob named by Locator - confirm against the ReadBlob wire format.
 */
struct BlobRequest
{
    /** Locator of the requested blob; empty by default. */
    std::string_view Locator{};

    /** Requested offset; zero by default. */
    size_t Offset{0};

    /** Requested length; zero by default. */
    size_t Length{0};
};
/** Channel for sending and receiving agent messages over a ComputeChannel.
*
* Implements the Horde agent message protocol, matching the UE
* FAgentMessageChannel serialization format exactly. Messages are framed as
* [type (1B)][payload length (4B)][payload]. Strings use length-prefixed UTF-8;
* integers use variable-length encoding.
*
* The protocol has two directions:
* - Requests (initiator -> remote): Close, Ping, Fork, Attach, UploadFiles, Execute, Blob
* - Responses (remote -> initiator): ReadResponse returns the type, then call the
* appropriate Read* method to parse the payload.
*
* The class is non-copyable (copy construction and copy assignment are deleted).
* NOTE(review): nothing in this header indicates thread-safety; the single
* request/response state held in the members suggests one user at a time -
* confirm against the implementation.
*/
class AgentMessageChannel
{
public:
/** Construct a message channel on top of the given compute channel.
* The reference is taken by value and held in m_Channel. */
explicit AgentMessageChannel(Ref<ComputeChannel> Channel);
/** Destructor; defined out of line. */
~AgentMessageChannel();
// Copying is disabled.
AgentMessageChannel(const AgentMessageChannel&) = delete;
AgentMessageChannel& operator=(const AgentMessageChannel&) = delete;
// --- Requests (Initiator -> Remote) ---
/** Close the channel. */
void Close();
/** Send a keepalive ping. */
void Ping();
/** Fork communication to a new channel with the given ID and buffer size.
* @param ChannelId  ID of the new channel.
* @param BufferSize Buffer size for the new channel (units not visible here - TODO confirm). */
void Fork(int ChannelId, int BufferSize);
/** Send an attach request (used during channel setup handshake). */
void Attach();
/** Request the remote agent to write files from the given bundle locator.
* @param Path    Path argument sent with the request (presumably the remote target path - confirm).
* @param Locator Bundle locator identifying the files to write. */
void UploadFiles(const char* Path, const char* Locator);
/** Execute a process on the remote machine.
* @param Exe        Executable to run.
* @param Args       Array of NumArgs argument strings.
* @param NumArgs    Number of entries in Args.
* @param WorkingDir Working directory for the process (whether nullptr is accepted is not visible here - TODO confirm).
* @param EnvVars    Array of NumEnvVars environment variable strings (exact format not visible here - TODO confirm).
* @param NumEnvVars Number of entries in EnvVars.
* @param Flags      Execution flags; defaults to ExecuteProcessFlags::None. */
void Execute(const char* Exe,
const char* const* Args,
size_t NumArgs,
const char* WorkingDir,
const char* const* EnvVars,
size_t NumEnvVars,
ExecuteProcessFlags Flags = ExecuteProcessFlags::None);
/** Send blob data in response to a ReadBlob request.
* @param Data   Pointer to the bytes to send.
* @param Length Number of bytes at Data. */
void Blob(const uint8_t* Data, size_t Length);
// --- Responses (Remote -> Initiator) ---
/** Read the next response message. Returns the message type, or None on timeout.
* After this returns, use GetResponseData()/GetResponseSize() or the typed
* Read* methods to access the payload.
* @param TimeoutMs   Timeout in milliseconds; defaults to -1 (presumably "wait indefinitely" - TODO confirm).
* @param OutTimedOut Optional out-parameter, may be left as nullptr (presumably set to
*                    indicate whether the call timed out - TODO confirm). */
AgentMessageType ReadResponse(int32_t TimeoutMs = -1, bool* OutTimedOut = nullptr);
/** Raw pointer to the current response payload (returns m_ResponseData). */
const void* GetResponseData() const { return m_ResponseData; }
/** Size of the current response payload (returns m_ResponseLength). */
size_t GetResponseSize() const { return m_ResponseLength; }
/** Parse an Exception response payload.
* Ex holds std::string_view fields, which are non-owning.
* NOTE(review): presumably they view the current response buffer and are
* invalidated by the next ReadResponse() - confirm. */
void ReadException(ExceptionInfo& Ex);
/** Parse an ExecuteResult response payload. Returns the exit code. */
int ReadExecuteResult();
/** Parse the payload of a ReadBlob message into a BlobRequest.
* Req.Locator is a non-owning std::string_view; the same lifetime caveat as
* ReadException presumably applies - confirm. */
void ReadBlobRequest(BlobRequest& Req);
private:
static constexpr size_t MessageHeaderLength = 5; ///< [type(1B)][length(4B)]
/** Underlying compute channel supplied to the constructor. */
Ref<ComputeChannel> m_Channel;
// Outgoing message state.
// NOTE(review): presumably the buffer, bytes written so far, and capacity of the
// request being built between CreateMessage() and FlushMessage() - confirm in the .cpp.
uint8_t* m_RequestData = nullptr;
size_t m_RequestSize = 0;
size_t m_MaxRequestSize = 0;
// Incoming message state; m_ResponseData/m_ResponseLength are what
// GetResponseData()/GetResponseSize() return.
AgentMessageType m_ResponseType = AgentMessageType::None;
const uint8_t* m_ResponseData = nullptr;
size_t m_ResponseLength = 0;
// Message framing helpers.
// NOTE(review): presumably CreateMessage() starts a message of the given type with room
// for MaxLength payload bytes and FlushMessage() sends it - confirm in the .cpp.
void CreateMessage(AgentMessageType Type, size_t MaxLength);
void FlushMessage();
// Serialization primitives. The Write* members append to the outgoing request; the
// static Read* helpers take a pointer to a read cursor.
// NOTE(review): Read* presumably advance *Pos past the bytes consumed - confirm.
void WriteInt32(int Value);
static int ReadInt32(const uint8_t** Pos);
void WriteFixedLengthBytes(const uint8_t* Data, size_t Length);
static const uint8_t* ReadFixedLengthBytes(const uint8_t** Pos, size_t Length);
// Variable-length unsigned integers; MeasureUnsignedVarInt presumably returns the
// encoded size in bytes of Value - confirm.
static size_t MeasureUnsignedVarInt(size_t Value);
void WriteUnsignedVarInt(size_t Value);
static size_t ReadUnsignedVarInt(const uint8_t** Pos);
// Strings (length-prefixed UTF-8 per the class description).
size_t MeasureString(const char* Text) const;
void WriteString(const char* Text);
void WriteString(std::string_view Text);
static std::string_view ReadString(const uint8_t** Pos);
// NOTE(review): presumably tolerates a null Text, unlike WriteString - confirm.
void WriteOptionalString(const char* Text);
};
} // namespace zen::horde
|