diff options
Diffstat (limited to 'src/zenhorde/hordecomputesocket.h')
| -rw-r--r-- | src/zenhorde/hordecomputesocket.h | 109 |
1 files changed, 71 insertions, 38 deletions
diff --git a/src/zenhorde/hordecomputesocket.h b/src/zenhorde/hordecomputesocket.h index 0c3cb4195..6c494603a 100644 --- a/src/zenhorde/hordecomputesocket.h +++ b/src/zenhorde/hordecomputesocket.h @@ -2,45 +2,74 @@ #pragma once -#include "hordecomputebuffer.h" -#include "hordecomputechannel.h" #include "hordetransport.h" #include <zencore/logbase.h> -#include <condition_variable> +ZEN_THIRD_PARTY_INCLUDES_START +#include <asio.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +#if ZEN_PLATFORM_WINDOWS +# undef SendMessage +#endif + +#include <deque> +#include <functional> #include <memory> -#include <mutex> -#include <thread> +#include <system_error> #include <unordered_map> #include <vector> namespace zen::horde { -/** Multiplexed socket that routes data between multiple channels over a single transport. +class AsyncComputeTransport; + +/** Handler called when a data frame arrives for a channel. */ +using FrameHandler = std::function<void(std::vector<uint8_t> Data)>; + +/** Handler called when a channel is detached by the remote peer. */ +using DetachHandler = std::function<void()>; + +/** Handler for async send completion. */ +using SendHandler = std::function<void(const std::error_code&)>; + +/** Async multiplexed socket that routes data between channels over a single transport. * - * Each channel is identified by an integer ID and backed by a pair of ComputeBuffers. - * A recv thread demultiplexes incoming frames to channel-specific buffers, while - * per-channel send threads multiplex outgoing data onto the shared transport. + * Uses an async recv pump, a serialized send queue, and a periodic ping timer - + * all running on a shared io_context. * - * Wire format per frame: [channelId (4B)][size (4B)][data] - * Control messages use negative sizes: -2 = detach (channel closed), -3 = ping. + * Wire format per frame: [channelId(4B)][size(4B)][data]. + * Control messages use negative sizes: -2 = detach, -3 = ping. */ -class ComputeSocket +class AsyncComputeSocket : public std::enable_shared_from_this<AsyncComputeSocket> { public: - explicit ComputeSocket(std::unique_ptr<ComputeTransport> Transport); - ~ComputeSocket(); + AsyncComputeSocket(std::unique_ptr<AsyncComputeTransport> Transport, asio::io_context& IoContext); + ~AsyncComputeSocket(); + + AsyncComputeSocket(const AsyncComputeSocket&) = delete; + AsyncComputeSocket& operator=(const AsyncComputeSocket&) = delete; + + /** Register callbacks for a channel. Must be called before StartRecvPump(). */ + void RegisterChannel(int ChannelId, FrameHandler OnFrame, DetachHandler OnDetach); - ComputeSocket(const ComputeSocket&) = delete; - ComputeSocket& operator=(const ComputeSocket&) = delete; + /** Begin the async recv pump and ping timer. */ + void StartRecvPump(); - /** Create a channel with the given ID. - * Allocates anonymous in-process buffers and spawns a send thread for the channel. */ - Ref<ComputeChannel> CreateChannel(int ChannelId); + /** Enqueue a data frame for async transmission. */ + void AsyncSendFrame(int ChannelId, std::vector<uint8_t> Data, SendHandler Handler = {}); - /** Start the recv pump and ping threads. Must be called after all channels are created. */ - void StartCommunication(); + /** Send a control frame (detach) for a channel. */ + void AsyncSendDetach(int ChannelId, SendHandler Handler = {}); + + /** Close the transport and cancel all pending operations. */ + void Close(); + + /** The strand on which all socket I/O callbacks run. Channels that need to serialize + * their own state with OnFrame/OnDetach (which are invoked from this strand) should + * bind their timers and async operations to it as well. */ + asio::strand<asio::any_io_executor>& GetStrand() { return m_Strand; } private: struct FrameHeader @@ -49,31 +78,35 @@ private: int32_t Size = 0; }; + struct PendingWrite + { + FrameHeader Header; + std::vector<uint8_t> Data; + SendHandler Handler; + }; + static constexpr int32_t ControlDetach = -2; static constexpr int32_t ControlPing = -3; LoggerRef Log() { return m_Log; } - void RecvThreadProc(); - void SendThreadProc(int Channel, ComputeBufferReader Reader); - void PingThreadProc(); - - LoggerRef m_Log; - std::unique_ptr<ComputeTransport> m_Transport; - std::mutex m_SendMutex; ///< Serializes writes to the transport - - std::mutex m_WritersMutex; - std::unordered_map<int, ComputeBufferWriter> m_Writers; ///< Recv-side: writers keyed by channel ID + void DoRecvHeader(); + void DoRecvPayload(FrameHeader Header); + void FlushNextSend(); + void StartPingTimer(); + void HandleError(); - std::vector<ComputeBufferReader> m_Readers; ///< Send-side: readers for join on destruction - std::unordered_map<int, std::thread> m_SendThreads; ///< One send thread per channel + LoggerRef m_Log; + std::unique_ptr<AsyncComputeTransport> m_Transport; + asio::strand<asio::any_io_executor> m_Strand; + asio::steady_timer m_PingTimer; - std::thread m_RecvThread; - std::thread m_PingThread; + std::unordered_map<int, FrameHandler> m_FrameHandlers; + std::unordered_map<int, DetachHandler> m_DetachHandlers; - bool m_PingShouldStop = false; - std::mutex m_PingMutex; - std::condition_variable m_PingCV; + FrameHeader m_RecvHeader; + std::deque<PendingWrite> m_SendQueue; + bool m_Closed = false; }; } // namespace zen::horde |