aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordecomputesocket.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhorde/hordecomputesocket.h')
-rw-r--r--src/zenhorde/hordecomputesocket.h109
1 files changed, 71 insertions, 38 deletions
diff --git a/src/zenhorde/hordecomputesocket.h b/src/zenhorde/hordecomputesocket.h
index 0c3cb4195..6c494603a 100644
--- a/src/zenhorde/hordecomputesocket.h
+++ b/src/zenhorde/hordecomputesocket.h
@@ -2,45 +2,74 @@
#pragma once
-#include "hordecomputebuffer.h"
-#include "hordecomputechannel.h"
#include "hordetransport.h"
#include <zencore/logbase.h>
-#include <condition_variable>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <asio.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+#if ZEN_PLATFORM_WINDOWS
+# undef SendMessage
+#endif
+
+#include <deque>
+#include <functional>
#include <memory>
-#include <mutex>
-#include <thread>
+#include <system_error>
#include <unordered_map>
#include <vector>
namespace zen::horde {
-/** Multiplexed socket that routes data between multiple channels over a single transport.
+class AsyncComputeTransport;
+
+/** Handler called when a data frame arrives for a channel. */
+using FrameHandler = std::function<void(std::vector<uint8_t> Data)>;
+
+/** Handler called when a channel is detached by the remote peer. */
+using DetachHandler = std::function<void()>;
+
+/** Handler for async send completion. */
+using SendHandler = std::function<void(const std::error_code&)>;
+
+/** Async multiplexed socket that routes data between channels over a single transport.
*
- * Each channel is identified by an integer ID and backed by a pair of ComputeBuffers.
- * A recv thread demultiplexes incoming frames to channel-specific buffers, while
- * per-channel send threads multiplex outgoing data onto the shared transport.
+ * Uses an async recv pump, a serialized send queue, and a periodic ping timer -
+ * all running on a shared io_context.
*
- * Wire format per frame: [channelId (4B)][size (4B)][data]
- * Control messages use negative sizes: -2 = detach (channel closed), -3 = ping.
+ * Wire format per frame: [channelId(4B)][size(4B)][data].
+ * Control messages use negative sizes: -2 = detach, -3 = ping.
*/
-class ComputeSocket
+class AsyncComputeSocket : public std::enable_shared_from_this<AsyncComputeSocket>
{
public:
- explicit ComputeSocket(std::unique_ptr<ComputeTransport> Transport);
- ~ComputeSocket();
+ AsyncComputeSocket(std::unique_ptr<AsyncComputeTransport> Transport, asio::io_context& IoContext);
+ ~AsyncComputeSocket();
+
+ AsyncComputeSocket(const AsyncComputeSocket&) = delete;
+ AsyncComputeSocket& operator=(const AsyncComputeSocket&) = delete;
+
+ /** Register callbacks for a channel. Must be called before StartRecvPump(). */
+ void RegisterChannel(int ChannelId, FrameHandler OnFrame, DetachHandler OnDetach);
- ComputeSocket(const ComputeSocket&) = delete;
- ComputeSocket& operator=(const ComputeSocket&) = delete;
+ /** Begin the async recv pump and ping timer. */
+ void StartRecvPump();
- /** Create a channel with the given ID.
- * Allocates anonymous in-process buffers and spawns a send thread for the channel. */
- Ref<ComputeChannel> CreateChannel(int ChannelId);
+ /** Enqueue a data frame for async transmission. */
+ void AsyncSendFrame(int ChannelId, std::vector<uint8_t> Data, SendHandler Handler = {});
- /** Start the recv pump and ping threads. Must be called after all channels are created. */
- void StartCommunication();
+ /** Send a control frame (detach) for a channel. */
+ void AsyncSendDetach(int ChannelId, SendHandler Handler = {});
+
+ /** Close the transport and cancel all pending operations. */
+ void Close();
+
+ /** The strand on which all socket I/O callbacks run. Channels that need to serialize
+ * their own state with OnFrame/OnDetach (which are invoked from this strand) should
+ * bind their timers and async operations to it as well. */
+ asio::strand<asio::any_io_executor>& GetStrand() { return m_Strand; }
private:
struct FrameHeader
@@ -49,31 +78,35 @@ private:
int32_t Size = 0;
};
+ struct PendingWrite
+ {
+ FrameHeader Header;
+ std::vector<uint8_t> Data;
+ SendHandler Handler;
+ };
+
static constexpr int32_t ControlDetach = -2;
static constexpr int32_t ControlPing = -3;
LoggerRef Log() { return m_Log; }
- void RecvThreadProc();
- void SendThreadProc(int Channel, ComputeBufferReader Reader);
- void PingThreadProc();
-
- LoggerRef m_Log;
- std::unique_ptr<ComputeTransport> m_Transport;
- std::mutex m_SendMutex; ///< Serializes writes to the transport
-
- std::mutex m_WritersMutex;
- std::unordered_map<int, ComputeBufferWriter> m_Writers; ///< Recv-side: writers keyed by channel ID
+ void DoRecvHeader();
+ void DoRecvPayload(FrameHeader Header);
+ void FlushNextSend();
+ void StartPingTimer();
+ void HandleError();
- std::vector<ComputeBufferReader> m_Readers; ///< Send-side: readers for join on destruction
- std::unordered_map<int, std::thread> m_SendThreads; ///< One send thread per channel
+ LoggerRef m_Log;
+ std::unique_ptr<AsyncComputeTransport> m_Transport;
+ asio::strand<asio::any_io_executor> m_Strand;
+ asio::steady_timer m_PingTimer;
- std::thread m_RecvThread;
- std::thread m_PingThread;
+ std::unordered_map<int, FrameHandler> m_FrameHandlers;
+ std::unordered_map<int, DetachHandler> m_DetachHandlers;
- bool m_PingShouldStop = false;
- std::mutex m_PingMutex;
- std::condition_variable m_PingCV;
+ FrameHeader m_RecvHeader;
+ std::deque<PendingWrite> m_SendQueue;
+ bool m_Closed = false;
};
} // namespace zen::horde