diff options
Diffstat (limited to 'src/zenhorde/hordecomputebuffer.h')
| -rw-r--r-- | src/zenhorde/hordecomputebuffer.h | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/src/zenhorde/hordecomputebuffer.h b/src/zenhorde/hordecomputebuffer.h new file mode 100644 index 000000000..64ef91b7a --- /dev/null +++ b/src/zenhorde/hordecomputebuffer.h @@ -0,0 +1,136 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zenbase/refcount.h> + +#include <cstddef> +#include <cstdint> +#include <mutex> +#include <vector> + +namespace zen::horde { + +class ComputeBufferReader; +class ComputeBufferWriter; + +/** Simplified in-process ring buffer for the Horde compute protocol. + * + * Unlike the UE FComputeBuffer which supports shared-memory and memory-mapped files, + * this implementation uses plain heap-allocated memory since we only need in-process + * communication between channel and transport threads. The buffer is divided into + * fixed-size chunks; readers and writers block when no space is available. + */ +class ComputeBuffer +{ +public: + struct Params + { + size_t NumChunks = 2; + size_t ChunkLength = 512 * 1024; + }; + + ComputeBuffer(); + ~ComputeBuffer(); + + ComputeBuffer(const ComputeBuffer&) = delete; + ComputeBuffer& operator=(const ComputeBuffer&) = delete; + + bool CreateNew(const Params& InParams); + void Close(); + + bool IsValid() const; + + ComputeBufferReader CreateReader(); + ComputeBufferWriter CreateWriter(); + +private: + struct Detail; + Ref<Detail> m_Detail; + + friend class ComputeBufferReader; + friend class ComputeBufferWriter; +}; + +/** Read endpoint for a ComputeBuffer. + * + * Provides blocking reads from the ring buffer. WaitToRead() returns a pointer + * directly into the buffer memory (zero-copy); the caller must call + * AdvanceReadPosition() after consuming the data. + */ +class ComputeBufferReader +{ +public: + ComputeBufferReader(); + ComputeBufferReader(const ComputeBufferReader&); + ComputeBufferReader(ComputeBufferReader&&) noexcept; + ~ComputeBufferReader(); + + ComputeBufferReader& operator=(const ComputeBufferReader&); + ComputeBufferReader& operator=(ComputeBufferReader&&) noexcept; + + void Close(); + void Detach(); + bool IsValid() const; + bool IsComplete() const; + + void AdvanceReadPosition(size_t Size); + size_t GetMaxReadSize() const; + + /** Copy up to MaxSize bytes from the buffer into Buffer. Blocks until data is available. */ + size_t Read(void* Buffer, size_t MaxSize, int TimeoutMs = -1, bool* OutTimedOut = nullptr); + + /** Wait until at least MinSize bytes are available and return a direct pointer. + * Returns nullptr on timeout or if the writer has completed. */ + const uint8_t* WaitToRead(size_t MinSize, int TimeoutMs = -1, bool* OutTimedOut = nullptr); + +private: + friend class ComputeBuffer; + explicit ComputeBufferReader(Ref<ComputeBuffer::Detail> InDetail); + + Ref<ComputeBuffer::Detail> m_Detail; +}; + +/** Write endpoint for a ComputeBuffer. + * + * Provides blocking writes into the ring buffer. WaitToWrite() returns a pointer + * directly into the buffer memory (zero-copy); the caller must call + * AdvanceWritePosition() after filling the data. Call MarkComplete() to signal + * that no more data will be written. + */ +class ComputeBufferWriter +{ +public: + ComputeBufferWriter(); + ComputeBufferWriter(const ComputeBufferWriter&); + ComputeBufferWriter(ComputeBufferWriter&&) noexcept; + ~ComputeBufferWriter(); + + ComputeBufferWriter& operator=(const ComputeBufferWriter&); + ComputeBufferWriter& operator=(ComputeBufferWriter&&) noexcept; + + void Close(); + bool IsValid() const; + + /** Signal that no more data will be written. Unblocks any waiting readers. */ + void MarkComplete(); + + void AdvanceWritePosition(size_t Size); + size_t GetMaxWriteSize() const; + size_t GetChunkMaxLength() const; + + /** Copy up to MaxSize bytes from Buffer into the ring buffer. Blocks until space is available. */ + size_t Write(const void* Buffer, size_t MaxSize, int TimeoutMs = -1); + + /** Wait until at least MinSize bytes of write space are available and return a direct pointer. + * Returns nullptr on timeout. */ + uint8_t* WaitToWrite(size_t MinSize, int TimeoutMs = -1); + +private: + friend class ComputeBuffer; + explicit ComputeBufferWriter(Ref<ComputeBuffer::Detail> InDetail); + + Ref<ComputeBuffer::Detail> m_Detail; +}; + +} // namespace zen::horde |