aboutsummaryrefslogtreecommitdiff
path: root/src/zenhorde/hordecomputebuffer.h
blob: 64ef91b7aba6e1e0f6566ccb427a0a09057eecdc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

#include <zenbase/refcount.h>

#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

namespace zen::horde {

class ComputeBufferReader;
class ComputeBufferWriter;

/** Simplified in-process ring buffer for the Horde compute protocol.
 *
 *  Unlike the UE FComputeBuffer which supports shared-memory and memory-mapped files,
 *  this implementation uses plain heap-allocated memory since we only need in-process
 *  communication between channel and transport threads. The buffer is divided into
 *  fixed-size chunks; readers and writers block when no space is available.
 */
class ComputeBuffer
{
public:
	struct Params
	{
		size_t NumChunks   = 2;
		size_t ChunkLength = 512 * 1024;
	};

	ComputeBuffer();
	~ComputeBuffer();

	ComputeBuffer(const ComputeBuffer&) = delete;
	ComputeBuffer& operator=(const ComputeBuffer&) = delete;

	bool CreateNew(const Params& InParams);
	void Close();

	bool IsValid() const;

	ComputeBufferReader CreateReader();
	ComputeBufferWriter CreateWriter();

private:
	struct Detail;
	Ref<Detail> m_Detail;

	friend class ComputeBufferReader;
	friend class ComputeBufferWriter;
};

/** Read endpoint for a ComputeBuffer.
 *
 *  Provides blocking reads from the ring buffer. WaitToRead() returns a pointer
 *  directly into the buffer memory (zero-copy); the caller must call
 *  AdvanceReadPosition() after consuming the data.
 */
class ComputeBufferReader
{
public:
	ComputeBufferReader();
	ComputeBufferReader(const ComputeBufferReader&);
	ComputeBufferReader(ComputeBufferReader&&) noexcept;
	~ComputeBufferReader();

	ComputeBufferReader& operator=(const ComputeBufferReader&);
	ComputeBufferReader& operator=(ComputeBufferReader&&) noexcept;

	void Close();
	void Detach();
	bool IsValid() const;
	bool IsComplete() const;

	void   AdvanceReadPosition(size_t Size);
	size_t GetMaxReadSize() const;

	/** Copy up to MaxSize bytes from the buffer into Buffer. Blocks until data is available. */
	size_t Read(void* Buffer, size_t MaxSize, int TimeoutMs = -1, bool* OutTimedOut = nullptr);

	/** Wait until at least MinSize bytes are available and return a direct pointer.
	 *  Returns nullptr on timeout or if the writer has completed. */
	const uint8_t* WaitToRead(size_t MinSize, int TimeoutMs = -1, bool* OutTimedOut = nullptr);

private:
	friend class ComputeBuffer;
	explicit ComputeBufferReader(Ref<ComputeBuffer::Detail> InDetail);

	Ref<ComputeBuffer::Detail> m_Detail;
};

/** Write endpoint for a ComputeBuffer.
 *
 *  Provides blocking writes into the ring buffer. WaitToWrite() returns a pointer
 *  directly into the buffer memory (zero-copy); the caller must call
 *  AdvanceWritePosition() after filling the data. Call MarkComplete() to signal
 *  that no more data will be written.
 */
class ComputeBufferWriter
{
public:
	ComputeBufferWriter();
	ComputeBufferWriter(const ComputeBufferWriter&);
	ComputeBufferWriter(ComputeBufferWriter&&) noexcept;
	~ComputeBufferWriter();

	ComputeBufferWriter& operator=(const ComputeBufferWriter&);
	ComputeBufferWriter& operator=(ComputeBufferWriter&&) noexcept;

	void Close();
	bool IsValid() const;

	/** Signal that no more data will be written. Unblocks any waiting readers. */
	void MarkComplete();

	void   AdvanceWritePosition(size_t Size);
	size_t GetMaxWriteSize() const;
	size_t GetChunkMaxLength() const;

	/** Copy up to MaxSize bytes from Buffer into the ring buffer. Blocks until space is available. */
	size_t Write(const void* Buffer, size_t MaxSize, int TimeoutMs = -1);

	/** Wait until at least MinSize bytes of write space are available and return a direct pointer.
	 *  Returns nullptr on timeout. */
	uint8_t* WaitToWrite(size_t MinSize, int TimeoutMs = -1);

private:
	friend class ComputeBuffer;
	explicit ComputeBufferWriter(Ref<ComputeBuffer::Detail> InDetail);

	Ref<ComputeBuffer::Detail> m_Detail;
};

}  // namespace zen::horde