// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "hordecomputebuffer.h"
#include "hordecomputechannel.h"
#include "hordetransport.h"

// NOTE(review): the targets of the seven #include directives below are not visible in this
// copy of the file (the bracketed header names appear to have been stripped). Restore them
// from the original header; do not build from this text as-is.
#include
#include
#include
#include
#include
#include
#include

namespace zen::horde {

/** Multiplexed socket that routes data between multiple channels over a single transport.
 *
 * Each channel is identified by an integer ID and is backed by a pair of ComputeBuffers.
 * One recv thread demultiplexes incoming frames into the channel-specific buffers, while
 * one send thread per channel multiplexes outgoing data onto the shared transport.
 *
 * Wire format per frame: [channelId (4B)][size (4B)][data]
 * Control messages are encoded as negative sizes: -2 = detach (channel closed), -3 = ping.
 *
 * Typical usage, as described by the member documentation below: construct the socket with
 * a transport, create every channel with CreateChannel(), then call StartCommunication().
 *
 * NOTE(review): several template argument lists (on std::unique_ptr, Ref,
 * std::unordered_map and std::vector) are missing in this copy of the file; they are marked
 * individually below and must be confirmed against the original header.
 */
class ComputeSocket
{
public:
    /** Construct a socket on top of the given transport.
     *  The transport is passed as a std::unique_ptr, i.e. the socket takes ownership of it.
     *  NOTE(review): template argument of std::unique_ptr not visible here - presumably the
     *  transport type declared in hordetransport.h; confirm.
     */
    explicit ComputeSocket(std::unique_ptr Transport);

    /** NOTE(review): presumably stops and joins the worker threads (m_Readers is documented
     *  as being kept "for join on destruction") - confirm in the implementation file.
     */
    ~ComputeSocket();

    // Non-copyable: copy construction and copy assignment are deleted.
    ComputeSocket(const ComputeSocket&) = delete;
    ComputeSocket& operator=(const ComputeSocket&) = delete;

    /** Create a channel with the given ID.
     *  Allocates anonymous in-process buffers and spawns a send thread for the channel.
     *
     *  @param ChannelId Integer ID that identifies the channel in frame headers.
     *  @return Reference-counted handle to the new channel.
     *  NOTE(review): template argument of Ref not visible here - presumably the channel type
     *  declared in hordecomputechannel.h; confirm.
     */
    Ref CreateChannel(int ChannelId);

    /** Start the recv pump and ping threads.
     *  Must be called after all channels are created.
     */
    void StartCommunication();

private:
    /** Header that precedes every frame on the wire: two 32-bit fields, channel ID then size. */
    struct FrameHeader
    {
        int32_t Channel = 0;  ///< ID of the channel the frame belongs to
        int32_t Size = 0;     ///< Payload size in bytes; negative values are control codes
    };

    static constexpr int32_t ControlDetach = -2;  ///< Size value signalling "detach" (channel closed)
    static constexpr int32_t ControlPing = -3;    ///< Size value signalling "ping"

    /** Accessor for the logger member; returns m_Log. */
    LoggerRef Log() { return m_Log; }

    /** Body of the recv thread: demultiplexes incoming frames to the per-channel buffers. */
    void RecvThreadProc();

    /** Body of a per-channel send thread: multiplexes data from Reader onto the transport.
     *  @param Channel ID of the channel this thread serves.
     *  @param Reader  Reader for that channel's outgoing buffer.
     */
    void SendThreadProc(int Channel, ComputeBufferReader Reader);

    /** Body of the ping thread started by StartCommunication().
     *  NOTE(review): presumably emits ControlPing frames periodically - confirm in the
     *  implementation file.
     */
    void PingThreadProc();

    LoggerRef m_Log;

    // NOTE(review): template argument missing in this view - presumably the same transport
    // type the constructor receives; confirm.
    std::unique_ptr m_Transport;
    std::mutex m_SendMutex;  ///< Serializes writes to the transport

    std::mutex m_WritersMutex;  ///< NOTE(review): presumably guards m_Writers - confirm
    // NOTE(review): template arguments missing on the three containers below. Presumably
    // m_Writers and m_SendThreads are keyed by the int channel ID, and m_Readers holds
    // ComputeBufferReader (the type SendThreadProc takes); confirm against the original.
    std::unordered_map m_Writers;      ///< Recv-side: writers keyed by channel ID
    std::vector m_Readers;             ///< Send-side: readers for join on destruction
    std::unordered_map m_SendThreads;  ///< One send thread per channel
    std::thread m_RecvThread;          ///< Runs RecvThreadProc (the recv pump)

    std::thread m_PingThread;  ///< Runs PingThreadProc
    // NOTE(review): the three members below presumably form the stop signal for the ping
    // thread (flag read under m_PingMutex, waiters woken through m_PingCV) - confirm in the
    // implementation file. The flag is a plain bool, so it needs that mutex to be race-free.
    bool m_PingShouldStop = false;
    std::mutex m_PingMutex;
    std::condition_variable m_PingCV;
};

}  // namespace zen::horde