diff options
Diffstat (limited to 'src/zenhorde/hordeagentmessage.h')
| -rw-r--r-- | src/zenhorde/hordeagentmessage.h | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/src/zenhorde/hordeagentmessage.h b/src/zenhorde/hordeagentmessage.h new file mode 100644 index 000000000..38c4375fd --- /dev/null +++ b/src/zenhorde/hordeagentmessage.h @@ -0,0 +1,161 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/zenbase.h> + +#include "hordecomputechannel.h" + +#include <cstddef> +#include <cstdint> +#include <string> +#include <string_view> +#include <vector> + +namespace zen::horde { + +/** Agent message types matching the UE EAgentMessageType byte values. + * These are the message opcodes exchanged over the agent/child channels. */ +enum class AgentMessageType : uint8_t +{ + None = 0x00, + Ping = 0x01, + Exception = 0x02, + Fork = 0x03, + Attach = 0x04, + WriteFiles = 0x10, + WriteFilesResponse = 0x11, + DeleteFiles = 0x12, + ExecuteV2 = 0x22, + ExecuteOutput = 0x17, + ExecuteResult = 0x18, + ReadBlob = 0x20, + ReadBlobResponse = 0x21, +}; + +/** Flags for the ExecuteV2 message. */ +enum class ExecuteProcessFlags : uint8_t +{ + None = 0, + UseWine = 1, ///< Run the executable under Wine on Linux agents +}; + +/** Parsed exception information from an Exception message. */ +struct ExceptionInfo +{ + std::string_view Message; + std::string_view Description; +}; + +/** Parsed blob read request from a ReadBlob message. */ +struct BlobRequest +{ + std::string_view Locator; + size_t Offset = 0; + size_t Length = 0; +}; + +/** Channel for sending and receiving agent messages over a ComputeChannel. + * + * Implements the Horde agent message protocol, matching the UE + * FAgentMessageChannel serialization format exactly. Messages are framed as + * [type (1B)][payload length (4B)][payload]. Strings use length-prefixed UTF-8; + * integers use variable-length encoding. + * + * The protocol has two directions: + * - Requests (initiator -> remote): Close, Ping, Fork, Attach, UploadFiles, Execute, Blob + * - Responses (remote -> initiator): ReadResponse returns the type, then call the + * appropriate Read* method to parse the payload. + */ +class AgentMessageChannel +{ +public: + explicit AgentMessageChannel(Ref<ComputeChannel> Channel); + ~AgentMessageChannel(); + + AgentMessageChannel(const AgentMessageChannel&) = delete; + AgentMessageChannel& operator=(const AgentMessageChannel&) = delete; + + // --- Requests (Initiator -> Remote) --- + + /** Close the channel. */ + void Close(); + + /** Send a keepalive ping. */ + void Ping(); + + /** Fork communication to a new channel with the given ID and buffer size. */ + void Fork(int ChannelId, int BufferSize); + + /** Send an attach request (used during channel setup handshake). */ + void Attach(); + + /** Request the remote agent to write files from the given bundle locator. */ + void UploadFiles(const char* Path, const char* Locator); + + /** Execute a process on the remote machine. */ + void Execute(const char* Exe, + const char* const* Args, + size_t NumArgs, + const char* WorkingDir, + const char* const* EnvVars, + size_t NumEnvVars, + ExecuteProcessFlags Flags = ExecuteProcessFlags::None); + + /** Send blob data in response to a ReadBlob request. */ + void Blob(const uint8_t* Data, size_t Length); + + // --- Responses (Remote -> Initiator) --- + + /** Read the next response message. Returns the message type, or None on timeout. + * After this returns, use GetResponseData()/GetResponseSize() or the typed + * Read* methods to access the payload. */ + AgentMessageType ReadResponse(int32_t TimeoutMs = -1, bool* OutTimedOut = nullptr); + + const void* GetResponseData() const { return m_ResponseData; } + size_t GetResponseSize() const { return m_ResponseLength; } + + /** Parse an Exception response payload. */ + void ReadException(ExceptionInfo& Ex); + + /** Parse an ExecuteResult response payload. Returns the exit code. */ + int ReadExecuteResult(); + + /** Parse a ReadBlob response payload into a BlobRequest. */ + void ReadBlobRequest(BlobRequest& Req); + +private: + static constexpr size_t MessageHeaderLength = 5; ///< [type(1B)][length(4B)] + + Ref<ComputeChannel> m_Channel; + + uint8_t* m_RequestData = nullptr; + size_t m_RequestSize = 0; + size_t m_MaxRequestSize = 0; + + AgentMessageType m_ResponseType = AgentMessageType::None; + const uint8_t* m_ResponseData = nullptr; + size_t m_ResponseLength = 0; + + void CreateMessage(AgentMessageType Type, size_t MaxLength); + void FlushMessage(); + + void WriteInt32(int Value); + static int ReadInt32(const uint8_t** Pos); + + void WriteFixedLengthBytes(const uint8_t* Data, size_t Length); + static const uint8_t* ReadFixedLengthBytes(const uint8_t** Pos, size_t Length); + + static size_t MeasureUnsignedVarInt(size_t Value); + void WriteUnsignedVarInt(size_t Value); + static size_t ReadUnsignedVarInt(const uint8_t** Pos); + + size_t MeasureString(const char* Text) const; + void WriteString(const char* Text); + void WriteString(std::string_view Text); + static std::string_view ReadString(const uint8_t** Pos); + + void WriteOptionalString(const char* Text); +}; + +} // namespace zen::horde |