aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordecomputesocket.h
blob: 0c3cb419544d0d8cfd3541fdab412bf94632c2ce (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include "hordecomputebuffer.h"
#include "hordecomputechannel.h"
#include "hordetransport.h"

#include <zencore/logbase.h>

#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

namespace zen::horde {

/** Multiplexed socket that routes data between multiple channels over a single transport.
 *
 *  Each channel is identified by an integer ID and backed by a pair of ComputeBuffers.
 *  A recv thread demultiplexes incoming frames to channel-specific buffers, while
 *  per-channel send threads multiplex outgoing data onto the shared transport.
 *
 *  Wire format per frame: [channelId (4B)][size (4B)][data]
 *  Control messages use negative sizes: -2 = detach (channel closed), -3 = ping.
 */
class ComputeSocket
{
public:
	explicit ComputeSocket(std::unique_ptr<ComputeTransport> Transport);
	~ComputeSocket();

	ComputeSocket(const ComputeSocket&) = delete;
	ComputeSocket& operator=(const ComputeSocket&) = delete;

	/** Create a channel with the given ID.
	 *  Allocates anonymous in-process buffers and spawns a send thread for the channel. */
	Ref<ComputeChannel> CreateChannel(int ChannelId);

	/** Start the recv pump and ping threads. Must be called after all channels are created. */
	void StartCommunication();

private:
	struct FrameHeader
	{
		int32_t Channel = 0;
		int32_t Size	= 0;
	};

	static constexpr int32_t ControlDetach = -2;
	static constexpr int32_t ControlPing   = -3;

	LoggerRef Log() { return m_Log; }

	void RecvThreadProc();
	void SendThreadProc(int Channel, ComputeBufferReader Reader);
	void PingThreadProc();

	LoggerRef						  m_Log;
	std::unique_ptr<ComputeTransport> m_Transport;
	std::mutex						  m_SendMutex;	///< Serializes writes to the transport

	std::mutex									 m_WritersMutex;
	std::unordered_map<int, ComputeBufferWriter> m_Writers;	 ///< Recv-side: writers keyed by channel ID

	std::vector<ComputeBufferReader>	 m_Readers;		 ///< Send-side: readers for join on destruction
	std::unordered_map<int, std::thread> m_SendThreads;	 ///< One send thread per channel

	std::thread m_RecvThread;
	std::thread m_PingThread;

	bool					m_PingShouldStop = false;
	std::mutex				m_PingMutex;
	std::condition_variable m_PingCV;
};

}  // namespace zen::horde