// Copyright Epic Games, Inc. All Rights Reserved. #pragma once #include "hordetransport.h" #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END #if ZEN_PLATFORM_WINDOWS # undef SendMessage #endif #include #include #include #include #include #include namespace zen::horde { class AsyncComputeTransport; /** Handler called when a data frame arrives for a channel. */ using FrameHandler = std::function Data)>; /** Handler called when a channel is detached by the remote peer. */ using DetachHandler = std::function; /** Handler for async send completion. */ using SendHandler = std::function; /** Async multiplexed socket that routes data between channels over a single transport. * * Uses an async recv pump, a serialized send queue, and a periodic ping timer - * all running on a shared io_context. * * Wire format per frame: [channelId(4B)][size(4B)][data]. * Control messages use negative sizes: -2 = detach, -3 = ping. */ class AsyncComputeSocket : public std::enable_shared_from_this { public: AsyncComputeSocket(std::unique_ptr Transport, asio::io_context& IoContext); ~AsyncComputeSocket(); AsyncComputeSocket(const AsyncComputeSocket&) = delete; AsyncComputeSocket& operator=(const AsyncComputeSocket&) = delete; /** Register callbacks for a channel. Must be called before StartRecvPump(). */ void RegisterChannel(int ChannelId, FrameHandler OnFrame, DetachHandler OnDetach); /** Begin the async recv pump and ping timer. */ void StartRecvPump(); /** Enqueue a data frame for async transmission. */ void AsyncSendFrame(int ChannelId, std::vector Data, SendHandler Handler = {}); /** Send a control frame (detach) for a channel. */ void AsyncSendDetach(int ChannelId, SendHandler Handler = {}); /** Close the transport and cancel all pending operations. */ void Close(); /** The strand on which all socket I/O callbacks run. Channels that need to serialize * their own state with OnFrame/OnDetach (which are invoked from this strand) should * bind their timers and async operations to it as well. */ asio::strand& GetStrand() { return m_Strand; } private: struct FrameHeader { int32_t Channel = 0; int32_t Size = 0; }; struct PendingWrite { FrameHeader Header; std::vector Data; SendHandler Handler; }; static constexpr int32_t ControlDetach = -2; static constexpr int32_t ControlPing = -3; LoggerRef Log() { return m_Log; } void DoRecvHeader(); void DoRecvPayload(FrameHeader Header); void FlushNextSend(); void StartPingTimer(); void HandleError(); LoggerRef m_Log; std::unique_ptr m_Transport; asio::strand m_Strand; asio::steady_timer m_PingTimer; std::unordered_map m_FrameHandlers; std::unordered_map m_DetachHandlers; FrameHeader m_RecvHeader; std::deque m_SendQueue; bool m_Closed = false; }; } // namespace zen::horde