// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

// NOTE(review): the targets of the bare #include directives below (and the template
// arguments of several standard types further down) are not visible in this view of
// the file — confirm against the original header before relying on this text.
#include

#include "hordecomputesocket.h"

#include
#include
#include
#include
#include
#include
#include
#include
#include

// Forward declaration so this header can name asio::io_context by reference
// without requiring its full definition here.
namespace asio {
class io_context;
}  // namespace asio

namespace zen::horde {

/** Agent message types matching the UE EAgentMessageType byte values.
 *  These are the message opcodes exchanged over the agent/child channels.
 *
 *  The underlying type is uint8_t, so every opcode is a single byte. The enumerators
 *  are not listed in numeric order: ExecuteV2 (0x22) sits next to the other Execute*
 *  opcodes (0x17, 0x18) although its value follows ReadBlobResponse (0x21).
 *  The values must not be renumbered, since they are stated to mirror the UE enum.
 */
enum class AgentMessageType : uint8_t
{
	None			   = 0x00,
	Ping			   = 0x01,
	Exception		   = 0x02,
	Fork			   = 0x03,
	Attach			   = 0x04,
	WriteFiles		   = 0x10,
	WriteFilesResponse = 0x11,
	DeleteFiles		   = 0x12,
	ExecuteV2		   = 0x22,
	ExecuteOutput	   = 0x17,
	ExecuteResult	   = 0x18,
	ReadBlob		   = 0x20,
	ReadBlobResponse   = 0x21,
};

/** Flags for the ExecuteV2 message.
 *  Passed to AsyncAgentMessageChannel::Execute; the default there is None.
 */
enum class ExecuteProcessFlags : uint8_t
{
	None	= 0,
	UseWine = 1,  ///< Run the executable under Wine on Linux agents
};

/** Parsed exception information from an Exception message.
 *  Filled in by AsyncAgentMessageChannel::ReadException.
 *  Both fields are non-owning views; presumably they refer into the payload buffer
 *  handed to ReadException, so that buffer would need to outlive them — verify
 *  against the implementation.
 */
struct ExceptionInfo
{
	std::string_view Message;
	std::string_view Description;
};

/** Parsed blob read request from a ReadBlob message.
 *  Filled in by AsyncAgentMessageChannel::ReadBlobRequest, which rejects locators
 *  that would not be safe to use as a path component.
 *  Locator is a non-owning view (presumably into the parsed payload — verify).
 *  Offset and Length default to 0; presumably a byte range within the blob — TODO confirm.
 */
struct BlobRequest
{
	std::string_view Locator;
	size_t			 Offset = 0;
	size_t			 Length = 0;
};

/** Handler for async response reads. Receives the message type and a view of the payload data.
 *  The payload vector is valid until the next AsyncReadResponse call, so a handler that
 *  needs the data for longer must copy it.
 */
using AsyncResponseHandler = std::function;

/** Async channel for sending and receiving agent messages over an AsyncComputeSocket.
 *
 *  Send methods build messages into vectors and submit them via AsyncComputeSocket.
 *  Receives are delivered via the socket's FrameHandler callback and queued internally.
 *  AsyncReadResponse checks the queue and invokes the handler, with optional timeout.
 *
 *  All operations must be externally serialized (e.g. via the socket's strand).
 *  The class declares no synchronization of its own in this header.
 *
 *  Instances are non-copyable (copy construction and copy assignment are deleted).
 */
class AsyncAgentMessageChannel
{
public:
	/** Create a channel bound to the given socket and channel id.
	 *  @param Socket     Shared socket that messages are submitted to.
	 *  @param ChannelId  Identifier of this channel on the socket.
	 *  @param IoContext  Stored by reference (see m_IoContext), so it must outlive this object.
	 */
	AsyncAgentMessageChannel(std::shared_ptr Socket, int ChannelId, asio::io_context& IoContext);
	~AsyncAgentMessageChannel();

	AsyncAgentMessageChannel(const AsyncAgentMessageChannel&) = delete;
	AsyncAgentMessageChannel& operator=(const AsyncAgentMessageChannel&) = delete;

	// --- Requests (fire-and-forget sends) ---
	// None of these return a status or take a completion callback. Which
	// AgentMessageType each one emits is defined by the implementation, which is
	// not visible in this header.

	/** Close the channel. NOTE(review): exact wire behaviour is not visible here. */
	void Close();

	/** Send a ping; presumably AgentMessageType::Ping — confirm in the implementation. */
	void Ping();

	/** Request a fork; presumably AgentMessageType::Fork — confirm in the implementation.
	 *  @param ChannelId   Channel id carried in the request (presumably the new child channel).
	 *  @param BufferSize  Buffer size carried in the request (units not visible here).
	 */
	void Fork(int ChannelId, int BufferSize);

	/** Send an attach request; presumably AgentMessageType::Attach — confirm. */
	void Attach();

	/** Request a file upload identified by a path and a locator.
	 *  Presumably maps to AgentMessageType::WriteFiles — confirm in the implementation.
	 */
	void UploadFiles(const char* Path, const char* Locator);

	/** Request execution of a process on the agent.
	 *  @param Exe         Executable to run.
	 *  @param Args        Array of NumArgs C-string arguments.
	 *  @param NumArgs     Number of entries in Args.
	 *  @param WorkingDir  Working directory (whether nullptr is accepted is not visible here).
	 *  @param EnvVars     Array of NumEnvVars C-string environment entries.
	 *  @param NumEnvVars  Number of entries in EnvVars.
	 *  @param Flags       Execution flags; defaults to ExecuteProcessFlags::None.
	 */
	void Execute(const char*		 Exe,
				 const char* const*	 Args,
				 size_t				 NumArgs,
				 const char*		 WorkingDir,
				 const char* const*	 EnvVars,
				 size_t				 NumEnvVars,
				 ExecuteProcessFlags Flags = ExecuteProcessFlags::None);

	/** Send Length bytes of blob data starting at Data.
	 *  Presumably the reply to a ReadBlob request (AgentMessageType::ReadBlobResponse) — confirm.
	 */
	void Blob(const uint8_t* Data, size_t Length);

	// --- Async response reading ---

	/** Read the next response. If a frame is already queued, the handler is posted immediately.
	 *  Otherwise waits up to TimeoutMs for a frame to arrive. On timeout, invokes the handler
	 *  with AgentMessageType::None.
	 */
	void AsyncReadResponse(int32_t TimeoutMs, AsyncResponseHandler Handler);

	/** Called by the socket's FrameHandler when a frame arrives for this channel.
	 *  Data is taken by value, so the channel receives its own copy (or moved-in buffer).
	 */
	void OnFrame(std::vector Data);

	/** Called by the socket's DetachHandler. */
	void OnDetach();

	/** Returns true if the channel has been detached (connection lost). */
	bool IsDetached() const { return m_Detached; }

	// --- Response parsing helpers ---
	// All three are static and [[nodiscard]]: the bool result is the only error
	// signal, so callers must check it before using the output parameter.

	/** Parse an Exception message payload. Returns false on malformed/truncated input. */
	[[nodiscard]] static bool ReadException(const uint8_t* Data, size_t Size, ExceptionInfo& Ex);

	/** Parse an ExecuteResult message payload. Returns false on malformed/truncated input. */
	[[nodiscard]] static bool ReadExecuteResult(const uint8_t* Data, size_t Size, int32_t& OutExitCode);

	/** Parse a ReadBlob message payload. Returns false on malformed/truncated input or
	 *  if the Locator contains characters that would not be safe to use as a path component.
	 */
	[[nodiscard]] static bool ReadBlobRequest(const uint8_t* Data, size_t Size, BlobRequest& Req);

private:
	// Size in bytes of the per-message header.
	// NOTE(review): presumably 1 opcode byte + 4 length bytes — confirm in the implementation.
	static constexpr size_t MessageHeaderLength = 5;

	// Message building helpers
	// BeginMessage returns a new message buffer for the given type; ReservePayload is a
	// payload size hint. FinalizeAndSend takes ownership of the finished buffer.
	std::vector BeginMessage(AgentMessageType Type, size_t ReservePayload);
	void		FinalizeAndSend(std::vector Msg);

	/** Bounds-checked reader cursor. All Read* helpers set ParseError instead of reading past End.
	 *  Pos is the current read position and End is one past the last readable byte.
	 *  ParseError is sticky: once set, every later CheckAvailable call fails.
	 */
	struct ReadCursor
	{
		const uint8_t* Pos		  = nullptr;
		const uint8_t* End		  = nullptr;
		bool		   ParseError = false;

		/** Returns true when at least N bytes remain between Pos and End and no earlier
		 *  error has been recorded. Otherwise sets ParseError and returns false.
		 *  Does not advance Pos.
		 */
		[[nodiscard]] bool CheckAvailable(size_t N)
		{
			if (ParseError || static_cast(End - Pos) < N)
			{
				ParseError = true;
				return false;
			}
			return true;
		}
	};

	// Serialization primitives. Write* helpers append to Buf; Read* helpers consume
	// from a ReadCursor. Byte order and exact encodings are defined in the
	// implementation, which is not visible in this header.
	static void			  WriteInt32(std::vector& Buf, int Value);
	static int			  ReadInt32(ReadCursor& C);
	static void			  WriteFixedLengthBytes(std::vector& Buf, const uint8_t* Data, size_t Length);
	static const uint8_t* ReadFixedLengthBytes(ReadCursor& C, size_t Length);
	// Presumably the number of bytes WriteUnsignedVarInt would emit for Value — confirm.
	static size_t			MeasureUnsignedVarInt(size_t Value);
	static void				WriteUnsignedVarInt(std::vector& Buf, size_t Value);
	static size_t			ReadUnsignedVarInt(ReadCursor& C);
	static void				WriteString(std::vector& Buf, const char* Text);
	static void				WriteString(std::vector& Buf, std::string_view Text);
	static std::string_view ReadString(ReadCursor& C);
	// NOTE(review): presumably tolerates a null Text, unlike WriteString — confirm.
	static void WriteOptionalString(std::vector& Buf, const char* Text);

	std::shared_ptr		 m_Socket;			// Socket that outgoing messages are submitted to
	int					 m_ChannelId;		// Channel id this object is bound to
	asio::io_context&	 m_IoContext;		// Reference member: the io_context must outlive the channel
	std::deque>			 m_IncomingFrames;	// Frames received via OnFrame, queued until read
	AsyncResponseHandler m_PendingHandler;	// Presumably the handler of an outstanding AsyncReadResponse — confirm
	std::unique_ptr		 m_TimeoutTimer;	// Presumably the timer backing the AsyncReadResponse timeout — confirm
	bool				 m_Detached = false;	// Value reported by IsDetached()
};

}  // namespace zen::horde