// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include #include "hordecomputechannel.h" #include #include #include #include #include namespace zen::horde { /** Agent message types matching the UE EAgentMessageType byte values. * These are the message opcodes exchanged over the agent/child channels. */ enum class AgentMessageType : uint8_t { None = 0x00, Ping = 0x01, Exception = 0x02, Fork = 0x03, Attach = 0x04, WriteFiles = 0x10, WriteFilesResponse = 0x11, DeleteFiles = 0x12, ExecuteV2 = 0x22, ExecuteOutput = 0x17, ExecuteResult = 0x18, ReadBlob = 0x20, ReadBlobResponse = 0x21, }; /** Flags for the ExecuteV2 message. */ enum class ExecuteProcessFlags : uint8_t { None = 0, UseWine = 1, ///< Run the executable under Wine on Linux agents }; /** Parsed exception information from an Exception message. */ struct ExceptionInfo { std::string_view Message; std::string_view Description; }; /** Parsed blob read request from a ReadBlob message. */ struct BlobRequest { std::string_view Locator; size_t Offset = 0; size_t Length = 0; }; /** Channel for sending and receiving agent messages over a ComputeChannel. * * Implements the Horde agent message protocol, matching the UE * FAgentMessageChannel serialization format exactly. Messages are framed as * [type (1B)][payload length (4B)][payload]. Strings use length-prefixed UTF-8; * integers use variable-length encoding. * * The protocol has two directions: * - Requests (initiator -> remote): Close, Ping, Fork, Attach, UploadFiles, Execute, Blob * - Responses (remote -> initiator): ReadResponse returns the type, then call the * appropriate Read* method to parse the payload. */ class AgentMessageChannel { public: explicit AgentMessageChannel(Ref Channel); ~AgentMessageChannel(); AgentMessageChannel(const AgentMessageChannel&) = delete; AgentMessageChannel& operator=(const AgentMessageChannel&) = delete; // --- Requests (Initiator -> Remote) --- /** Close the channel. */ void Close(); /** Send a keepalive ping. */ void Ping(); /** Fork communication to a new channel with the given ID and buffer size. */ void Fork(int ChannelId, int BufferSize); /** Send an attach request (used during channel setup handshake). */ void Attach(); /** Request the remote agent to write files from the given bundle locator. */ void UploadFiles(const char* Path, const char* Locator); /** Execute a process on the remote machine. */ void Execute(const char* Exe, const char* const* Args, size_t NumArgs, const char* WorkingDir, const char* const* EnvVars, size_t NumEnvVars, ExecuteProcessFlags Flags = ExecuteProcessFlags::None); /** Send blob data in response to a ReadBlob request. */ void Blob(const uint8_t* Data, size_t Length); // --- Responses (Remote -> Initiator) --- /** Read the next response message. Returns the message type, or None on timeout. * After this returns, use GetResponseData()/GetResponseSize() or the typed * Read* methods to access the payload. */ AgentMessageType ReadResponse(int32_t TimeoutMs = -1, bool* OutTimedOut = nullptr); const void* GetResponseData() const { return m_ResponseData; } size_t GetResponseSize() const { return m_ResponseLength; } /** Parse an Exception response payload. */ void ReadException(ExceptionInfo& Ex); /** Parse an ExecuteResult response payload. Returns the exit code. */ int ReadExecuteResult(); /** Parse a ReadBlob response payload into a BlobRequest. */ void ReadBlobRequest(BlobRequest& Req); private: static constexpr size_t MessageHeaderLength = 5; ///< [type(1B)][length(4B)] Ref m_Channel; uint8_t* m_RequestData = nullptr; size_t m_RequestSize = 0; size_t m_MaxRequestSize = 0; AgentMessageType m_ResponseType = AgentMessageType::None; const uint8_t* m_ResponseData = nullptr; size_t m_ResponseLength = 0; void CreateMessage(AgentMessageType Type, size_t MaxLength); void FlushMessage(); void WriteInt32(int Value); static int ReadInt32(const uint8_t** Pos); void WriteFixedLengthBytes(const uint8_t* Data, size_t Length); static const uint8_t* ReadFixedLengthBytes(const uint8_t** Pos, size_t Length); static size_t MeasureUnsignedVarInt(size_t Value); void WriteUnsignedVarInt(size_t Value); static size_t ReadUnsignedVarInt(const uint8_t** Pos); size_t MeasureString(const char* Text) const; void WriteString(const char* Text); void WriteString(std::string_view Text); static std::string_view ReadString(const uint8_t** Pos); void WriteOptionalString(const char* Text); }; } // namespace zen::horde