aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordecomputesocket.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenhorde/hordecomputesocket.h')
-rw-r--r--src/zenhorde/hordecomputesocket.h79
1 files changed, 79 insertions, 0 deletions
diff --git a/src/zenhorde/hordecomputesocket.h b/src/zenhorde/hordecomputesocket.h
new file mode 100644
index 000000000..0c3cb4195
--- /dev/null
+++ b/src/zenhorde/hordecomputesocket.h
@@ -0,0 +1,79 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include "hordecomputebuffer.h"
+#include "hordecomputechannel.h"
+#include "hordetransport.h"
+
+#include <zencore/logbase.h>
+
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+namespace zen::horde {
+
+/** Multiplexed socket that routes data between multiple channels over a single transport.
+ *
+ * Each channel is identified by an integer ID and backed by a pair of ComputeBuffers.
+ * A recv thread demultiplexes incoming frames to channel-specific buffers, while
+ * per-channel send threads multiplex outgoing data onto the shared transport.
+ *
+ * Wire format per frame: [channelId (4B)][size (4B)][data]
+ * Control messages use negative sizes: -2 = detach (channel closed), -3 = ping.
+ */
+class ComputeSocket
+{
+public:
+ explicit ComputeSocket(std::unique_ptr<ComputeTransport> Transport);
+ ~ComputeSocket();
+
+ ComputeSocket(const ComputeSocket&) = delete;
+ ComputeSocket& operator=(const ComputeSocket&) = delete;
+
+ /** Create a channel with the given ID.
+ * Allocates anonymous in-process buffers and spawns a send thread for the channel. */
+ Ref<ComputeChannel> CreateChannel(int ChannelId);
+
+ /** Start the recv pump and ping threads. Must be called after all channels are created. */
+ void StartCommunication();
+
+private:
+ struct FrameHeader
+ {
+ int32_t Channel = 0;
+ int32_t Size = 0;
+ };
+
+ static constexpr int32_t ControlDetach = -2;
+ static constexpr int32_t ControlPing = -3;
+
+ LoggerRef Log() { return m_Log; }
+
+ void RecvThreadProc();
+ void SendThreadProc(int Channel, ComputeBufferReader Reader);
+ void PingThreadProc();
+
+ LoggerRef m_Log;
+ std::unique_ptr<ComputeTransport> m_Transport;
+ std::mutex m_SendMutex; ///< Serializes writes to the transport
+
+ std::mutex m_WritersMutex;
+ std::unordered_map<int, ComputeBufferWriter> m_Writers; ///< Recv-side: writers keyed by channel ID
+
+ std::vector<ComputeBufferReader> m_Readers; ///< Send-side: readers for join on destruction
+ std::unordered_map<int, std::thread> m_SendThreads; ///< One send thread per channel
+
+ std::thread m_RecvThread;
+ std::thread m_PingThread;
+
+ bool m_PingShouldStop = false;
+ std::mutex m_PingMutex;
+ std::condition_variable m_PingCV;
+};
+
+} // namespace zen::horde