// Copyright Epic Games, Inc. All Rights Reserved.

#pragma once

// NOTE(review): the five #include directives below carry no header names in this
// view of the file (angle-bracketed text appears to have been lost) - restore the
// targets from the original header before building.
#include
#include
#include
#include
#include

namespace zen::horde {

class ComputeBufferReader;
class ComputeBufferWriter;

/** Simplified in-process ring buffer for the Horde compute protocol.
 *
 * Unlike the UE FComputeBuffer, which supports shared memory and memory-mapped files,
 * this implementation uses plain heap-allocated memory, since only in-process
 * communication between channel and transport threads is needed. The buffer is divided
 * into fixed-size chunks; readers and writers block when no data or space is available.
 *
 * The class is non-copyable (copy construction and copy assignment are deleted).
 * Data is accessed through the endpoints returned by CreateReader() and CreateWriter().
 */
class ComputeBuffer
{
public:
    /** Sizing parameters passed to CreateNew(). */
    struct Params
    {
        /** Number of chunks the buffer is divided into (default 2). */
        size_t NumChunks = 2;
        /** Length of each chunk (default 512 * 1024); presumably in bytes - TODO confirm. */
        size_t ChunkLength = 512 * 1024;
    };

    ComputeBuffer();
    ~ComputeBuffer();

    // Non-copyable.
    ComputeBuffer(const ComputeBuffer&) = delete;
    ComputeBuffer& operator=(const ComputeBuffer&) = delete;

    /** Set up the buffer using the sizes given in InParams.
     * NOTE(review): presumably returns true on success and false on failure - confirm
     * against the implementation.
     */
    bool CreateNew(const Params& InParams);

    /** NOTE(review): presumably releases the buffer held by this object - confirm. */
    void Close();

    /** NOTE(review): presumably true once CreateNew() has succeeded and until Close() - confirm. */
    bool IsValid() const;

    /** Create a read endpoint for this buffer (see ComputeBufferReader). */
    ComputeBufferReader CreateReader();

    /** Create a write endpoint for this buffer (see ComputeBufferWriter). */
    ComputeBufferWriter CreateWriter();

private:
    /** Implementation state; only forward-declared in this header. */
    struct Detail;

    // NOTE(review): `Ref` appears without a template argument in this view
    // (presumably Ref<Detail>) - confirm against the original header.
    Ref m_Detail;

    // The endpoints are friends so they can refer to the private Detail type.
    friend class ComputeBufferReader;
    friend class ComputeBufferWriter;
};

/** Read endpoint for a ComputeBuffer.
 *
 * Provides blocking reads from the ring buffer. WaitToRead() returns a pointer
 * directly into the buffer memory (zero-copy); the caller must call
 * AdvanceReadPosition() after consuming the data.
 *
 * Instances are default-constructible, copyable and movable. Endpoints attached to
 * a buffer are obtained from ComputeBuffer::CreateReader().
 */
class ComputeBufferReader
{
public:
    ComputeBufferReader();
    ComputeBufferReader(const ComputeBufferReader&);
    ComputeBufferReader(ComputeBufferReader&&) noexcept;
    ~ComputeBufferReader();

    ComputeBufferReader& operator=(const ComputeBufferReader&);
    ComputeBufferReader& operator=(ComputeBufferReader&&) noexcept;

    /** NOTE(review): presumably drops this endpoint's reference to the buffer - confirm. */
    void Close();

    /** NOTE(review): semantics are not visible from this header; verify against the implementation. */
    void Detach();

    /** NOTE(review): presumably true while this endpoint refers to a buffer - confirm. */
    bool IsValid() const;

    /** NOTE(review): presumably true once the writer has called MarkComplete() and all
     * data has been consumed - confirm.
     */
    bool IsComplete() const;

    /** Mark Size bytes as consumed. Must be called after using data returned by WaitToRead(). */
    void AdvanceReadPosition(size_t Size);

    /** NOTE(review): presumably the number of bytes currently readable without blocking - confirm. */
    size_t GetMaxReadSize() const;

    /** Copy up to MaxSize bytes from the buffer into Buffer. Blocks until data is available.
     *
     * @param Buffer      Destination memory; must have room for at least MaxSize bytes.
     * @param MaxSize     Upper bound on the number of bytes to copy.
     * @param TimeoutMs   Wait limit in milliseconds. NOTE(review): the default of -1
     *                    presumably means wait indefinitely - confirm.
     * @param OutTimedOut Optional (may be nullptr). NOTE(review): presumably set to true
     *                    when the call gave up because of the timeout - confirm.
     * @return NOTE(review): presumably the number of bytes actually copied - confirm.
     */
    size_t Read(void* Buffer, size_t MaxSize, int TimeoutMs = -1, bool* OutTimedOut = nullptr);

    /** Wait until at least MinSize bytes are available and return a direct pointer.
     * Returns nullptr on timeout or if the writer has completed.
     *
     * The returned pointer refers to buffer memory (no copy is made); call
     * AdvanceReadPosition() once the data has been consumed.
     *
     * @param MinSize     Minimum number of bytes that must be available before returning.
     * @param TimeoutMs   Wait limit in milliseconds. NOTE(review): the default of -1
     *                    presumably means wait indefinitely - confirm.
     * @param OutTimedOut Optional (may be nullptr). NOTE(review): presumably lets the caller
     *                    tell a timeout apart from writer completion - confirm.
     */
    const uint8_t* WaitToRead(size_t MinSize, int TimeoutMs = -1, bool* OutTimedOut = nullptr);

private:
    // ComputeBuffer is a friend so it can use the private constructor below.
    friend class ComputeBuffer;

    // NOTE(review): `Ref` appears without a template argument in this view
    // (presumably Ref<ComputeBuffer::Detail>) - confirm against the original header.
    explicit ComputeBufferReader(Ref InDetail);

    Ref m_Detail;
};

/** Write endpoint for a ComputeBuffer.
 *
 * Provides blocking writes into the ring buffer. WaitToWrite() returns a pointer
 * directly into the buffer memory (zero-copy); the caller must call
 * AdvanceWritePosition() after filling the data. Call MarkComplete() to signal
 * that no more data will be written.
 *
 * Instances are default-constructible, copyable and movable. Endpoints attached to
 * a buffer are obtained from ComputeBuffer::CreateWriter().
 */
class ComputeBufferWriter
{
public:
    ComputeBufferWriter();
    ComputeBufferWriter(const ComputeBufferWriter&);
    ComputeBufferWriter(ComputeBufferWriter&&) noexcept;
    ~ComputeBufferWriter();

    ComputeBufferWriter& operator=(const ComputeBufferWriter&);
    ComputeBufferWriter& operator=(ComputeBufferWriter&&) noexcept;

    /** NOTE(review): presumably drops this endpoint's reference to the buffer - confirm. */
    void Close();

    /** NOTE(review): presumably true while this endpoint refers to a buffer - confirm. */
    bool IsValid() const;

    /** Signal that no more data will be written. Unblocks any waiting readers. */
    void MarkComplete();

    /** Commit Size bytes as written. Must be called after filling memory returned by WaitToWrite(). */
    void AdvanceWritePosition(size_t Size);

    /** NOTE(review): presumably the number of bytes currently writable without blocking - confirm. */
    size_t GetMaxWriteSize() const;

    /** NOTE(review): presumably the chunk length the buffer was created with - confirm. */
    size_t GetChunkMaxLength() const;

    /** Copy up to MaxSize bytes from Buffer into the ring buffer. Blocks until space is available.
     *
     * @param Buffer    Source memory holding at least MaxSize bytes.
     * @param MaxSize   Upper bound on the number of bytes to copy.
     * @param TimeoutMs Wait limit in milliseconds. NOTE(review): the default of -1
     *                  presumably means wait indefinitely - confirm.
     * @return NOTE(review): presumably the number of bytes actually copied - confirm.
     */
    size_t Write(const void* Buffer, size_t MaxSize, int TimeoutMs = -1);

    /** Wait until at least MinSize bytes of write space are available and return a direct pointer.
     * Returns nullptr on timeout.
     *
     * The returned pointer refers to buffer memory (no copy is made); call
     * AdvanceWritePosition() once the data has been filled in.
     *
     * @param MinSize   Minimum number of writable bytes required before returning.
     * @param TimeoutMs Wait limit in milliseconds. NOTE(review): the default of -1
     *                  presumably means wait indefinitely - confirm.
     */
    uint8_t* WaitToWrite(size_t MinSize, int TimeoutMs = -1);

private:
    // ComputeBuffer is a friend so it can use the private constructor below.
    friend class ComputeBuffer;

    // NOTE(review): `Ref` appears without a template argument in this view
    // (presumably Ref<ComputeBuffer::Detail>) - confirm against the original header.
    explicit ComputeBufferWriter(Ref InDetail);

    Ref m_Detail;
};

} // namespace zen::horde