blob: 0c3cb419544d0d8cfd3541fdab412bf94632c2ce (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
// Copyright Epic Games, Inc. All Rights Reserved.
#pragma once
#include "hordecomputebuffer.h"
#include "hordecomputechannel.h"
#include "hordetransport.h"
#include <zencore/logbase.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>
namespace zen::horde {
/** Multiplexed socket that routes data between multiple channels over a single transport.
*
* Each channel is identified by an integer ID and backed by a pair of ComputeBuffers.
* A recv thread demultiplexes incoming frames to channel-specific buffers, while
* per-channel send threads multiplex outgoing data onto the shared transport.
*
* Wire format per frame: [channelId (4B)][size (4B)][data]
* Control messages use negative sizes: -2 = detach (channel closed), -3 = ping.
*/
class ComputeSocket
{
public:
explicit ComputeSocket(std::unique_ptr<ComputeTransport> Transport);
~ComputeSocket();
ComputeSocket(const ComputeSocket&) = delete;
ComputeSocket& operator=(const ComputeSocket&) = delete;
/** Create a channel with the given ID.
* Allocates anonymous in-process buffers and spawns a send thread for the channel. */
Ref<ComputeChannel> CreateChannel(int ChannelId);
/** Start the recv pump and ping threads. Must be called after all channels are created. */
void StartCommunication();
private:
struct FrameHeader
{
int32_t Channel = 0;
int32_t Size = 0;
};
static constexpr int32_t ControlDetach = -2;
static constexpr int32_t ControlPing = -3;
LoggerRef Log() { return m_Log; }
void RecvThreadProc();
void SendThreadProc(int Channel, ComputeBufferReader Reader);
void PingThreadProc();
LoggerRef m_Log;
std::unique_ptr<ComputeTransport> m_Transport;
std::mutex m_SendMutex; ///< Serializes writes to the transport
std::mutex m_WritersMutex;
std::unordered_map<int, ComputeBufferWriter> m_Writers; ///< Recv-side: writers keyed by channel ID
std::vector<ComputeBufferReader> m_Readers; ///< Send-side: readers for join on destruction
std::unordered_map<int, std::thread> m_SendThreads; ///< One send thread per channel
std::thread m_RecvThread;
std::thread m_PingThread;
bool m_PingShouldStop = false;
std::mutex m_PingMutex;
std::condition_variable m_PingCV;
};
} // namespace zen::horde
|