diff options
Diffstat (limited to 'src/zenhorde/hordecomputesocket.h')
| -rw-r--r-- | src/zenhorde/hordecomputesocket.h | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/src/zenhorde/hordecomputesocket.h b/src/zenhorde/hordecomputesocket.h new file mode 100644 index 000000000..0c3cb4195 --- /dev/null +++ b/src/zenhorde/hordecomputesocket.h @@ -0,0 +1,79 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include "hordecomputebuffer.h" +#include "hordecomputechannel.h" +#include "hordetransport.h" + +#include <zencore/logbase.h> + +#include <condition_variable> +#include <memory> +#include <mutex> +#include <thread> +#include <unordered_map> +#include <vector> + +namespace zen::horde { + +/** Multiplexed socket that routes data between multiple channels over a single transport. + * + * Each channel is identified by an integer ID and backed by a pair of ComputeBuffers. + * A recv thread demultiplexes incoming frames to channel-specific buffers, while + * per-channel send threads multiplex outgoing data onto the shared transport. + * + * Wire format per frame: [channelId (4B)][size (4B)][data] + * Control messages use negative sizes: -2 = detach (channel closed), -3 = ping. + */ +class ComputeSocket +{ +public: + explicit ComputeSocket(std::unique_ptr<ComputeTransport> Transport); + ~ComputeSocket(); + + ComputeSocket(const ComputeSocket&) = delete; + ComputeSocket& operator=(const ComputeSocket&) = delete; + + /** Create a channel with the given ID. + * Allocates anonymous in-process buffers and spawns a send thread for the channel. */ + Ref<ComputeChannel> CreateChannel(int ChannelId); + + /** Start the recv pump and ping threads. Must be called after all channels are created. */ + void StartCommunication(); + +private: + struct FrameHeader + { + int32_t Channel = 0; + int32_t Size = 0; + }; + + static constexpr int32_t ControlDetach = -2; + static constexpr int32_t ControlPing = -3; + + LoggerRef Log() { return m_Log; } + + void RecvThreadProc(); + void SendThreadProc(int Channel, ComputeBufferReader Reader); + void PingThreadProc(); + + LoggerRef m_Log; + std::unique_ptr<ComputeTransport> m_Transport; + std::mutex m_SendMutex; ///< Serializes writes to the transport + + std::mutex m_WritersMutex; + std::unordered_map<int, ComputeBufferWriter> m_Writers; ///< Recv-side: writers keyed by channel ID + + std::vector<ComputeBufferReader> m_Readers; ///< Send-side: readers for join on destruction + std::unordered_map<int, std::thread> m_SendThreads; ///< One send thread per channel + + std::thread m_RecvThread; + std::thread m_PingThread; + + bool m_PingShouldStop = false; + std::mutex m_PingMutex; + std::condition_variable m_PingCV; +}; + +} // namespace zen::horde |