aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordeagentmessage.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhorde/hordeagentmessage.h')
-rw-r--r--src/zenhorde/hordeagentmessage.h161
1 files changed, 161 insertions, 0 deletions
diff --git a/src/zenhorde/hordeagentmessage.h b/src/zenhorde/hordeagentmessage.h
new file mode 100644
index 000000000..38c4375fd
--- /dev/null
+++ b/src/zenhorde/hordeagentmessage.h
@@ -0,0 +1,161 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zenbase/zenbase.h>
+
+#include "hordecomputechannel.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <string_view>
+#include <vector>
+
+namespace zen::horde {
+
+/** Agent message types matching the UE EAgentMessageType byte values.
+ * These are the message opcodes exchanged over the agent/child channels. */
+enum class AgentMessageType : uint8_t
+{
+ None = 0x00,
+ Ping = 0x01,
+ Exception = 0x02,
+ Fork = 0x03,
+ Attach = 0x04,
+ WriteFiles = 0x10,
+ WriteFilesResponse = 0x11,
+ DeleteFiles = 0x12,
+ ExecuteV2 = 0x22,
+ ExecuteOutput = 0x17,
+ ExecuteResult = 0x18,
+ ReadBlob = 0x20,
+ ReadBlobResponse = 0x21,
+};
+
+/** Flags for the ExecuteV2 message. */
+enum class ExecuteProcessFlags : uint8_t
+{
+ None = 0,
+ UseWine = 1, ///< Run the executable under Wine on Linux agents
+};
+
+/** Parsed exception information from an Exception message. */
+struct ExceptionInfo
+{
+ std::string_view Message;
+ std::string_view Description;
+};
+
+/** Parsed blob read request from a ReadBlob message. */
+struct BlobRequest
+{
+ std::string_view Locator;
+ size_t Offset = 0;
+ size_t Length = 0;
+};
+
+/** Channel for sending and receiving agent messages over a ComputeChannel.
+ *
+ * Implements the Horde agent message protocol, matching the UE
+ * FAgentMessageChannel serialization format exactly. Messages are framed as
+ * [type (1B)][payload length (4B)][payload]. Strings use length-prefixed UTF-8;
+ * integers use variable-length encoding.
+ *
+ * The protocol has two directions:
+ * - Requests (initiator -> remote): Close, Ping, Fork, Attach, UploadFiles, Execute, Blob
+ * - Responses (remote -> initiator): ReadResponse returns the type, then call the
+ * appropriate Read* method to parse the payload.
+ */
+class AgentMessageChannel
+{
+public:
+ explicit AgentMessageChannel(Ref<ComputeChannel> Channel);
+ ~AgentMessageChannel();
+
+ AgentMessageChannel(const AgentMessageChannel&) = delete;
+ AgentMessageChannel& operator=(const AgentMessageChannel&) = delete;
+
+ // --- Requests (Initiator -> Remote) ---
+
+ /** Close the channel. */
+ void Close();
+
+ /** Send a keepalive ping. */
+ void Ping();
+
+ /** Fork communication to a new channel with the given ID and buffer size. */
+ void Fork(int ChannelId, int BufferSize);
+
+ /** Send an attach request (used during channel setup handshake). */
+ void Attach();
+
+ /** Request the remote agent to write files from the given bundle locator. */
+ void UploadFiles(const char* Path, const char* Locator);
+
+ /** Execute a process on the remote machine. */
+ void Execute(const char* Exe,
+ const char* const* Args,
+ size_t NumArgs,
+ const char* WorkingDir,
+ const char* const* EnvVars,
+ size_t NumEnvVars,
+ ExecuteProcessFlags Flags = ExecuteProcessFlags::None);
+
+ /** Send blob data in response to a ReadBlob request. */
+ void Blob(const uint8_t* Data, size_t Length);
+
+ // --- Responses (Remote -> Initiator) ---
+
+ /** Read the next response message. Returns the message type, or None on timeout.
+ * After this returns, use GetResponseData()/GetResponseSize() or the typed
+ * Read* methods to access the payload. */
+ AgentMessageType ReadResponse(int32_t TimeoutMs = -1, bool* OutTimedOut = nullptr);
+
+ const void* GetResponseData() const { return m_ResponseData; }
+ size_t GetResponseSize() const { return m_ResponseLength; }
+
+ /** Parse an Exception response payload. */
+ void ReadException(ExceptionInfo& Ex);
+
+ /** Parse an ExecuteResult response payload. Returns the exit code. */
+ int ReadExecuteResult();
+
+ /** Parse a ReadBlob response payload into a BlobRequest. */
+ void ReadBlobRequest(BlobRequest& Req);
+
+private:
+ static constexpr size_t MessageHeaderLength = 5; ///< [type(1B)][length(4B)]
+
+ Ref<ComputeChannel> m_Channel;
+
+ uint8_t* m_RequestData = nullptr;
+ size_t m_RequestSize = 0;
+ size_t m_MaxRequestSize = 0;
+
+ AgentMessageType m_ResponseType = AgentMessageType::None;
+ const uint8_t* m_ResponseData = nullptr;
+ size_t m_ResponseLength = 0;
+
+ void CreateMessage(AgentMessageType Type, size_t MaxLength);
+ void FlushMessage();
+
+ void WriteInt32(int Value);
+ static int ReadInt32(const uint8_t** Pos);
+
+ void WriteFixedLengthBytes(const uint8_t* Data, size_t Length);
+ static const uint8_t* ReadFixedLengthBytes(const uint8_t** Pos, size_t Length);
+
+ static size_t MeasureUnsignedVarInt(size_t Value);
+ void WriteUnsignedVarInt(size_t Value);
+ static size_t ReadUnsignedVarInt(const uint8_t** Pos);
+
+ size_t MeasureString(const char* Text) const;
+ void WriteString(const char* Text);
+ void WriteString(std::string_view Text);
+ static std::string_view ReadString(const uint8_t** Pos);
+
+ void WriteOptionalString(const char* Text);
+};
+
+} // namespace zen::horde