// Copyright Epic Games, Inc. All Rights Reserved. #include "hordecomputebuffer.h" #include #include #include #include #include namespace zen::horde { // Simplified ring buffer implementation for in-process use only. // Uses a single contiguous buffer with write/read cursors and // mutex+condvar for synchronization. This is simpler than the UE version // which uses lock-free atomics and shared memory, but sufficient for our // use case where we're the initiator side of the compute protocol. struct ComputeBuffer::Detail : TRefCounted { std::vector Data; size_t NumChunks = 0; size_t ChunkLength = 0; // Current write state size_t WriteChunkIdx = 0; size_t WriteOffset = 0; bool WriteComplete = false; // Current read state size_t ReadChunkIdx = 0; size_t ReadOffset = 0; bool Detached = false; // Per-chunk written length std::vector ChunkWrittenLength; std::vector ChunkFinished; // Writer moved to next chunk std::mutex Mutex; std::condition_variable ReadCV; ///< Signaled when new data is written or stream completes std::condition_variable WriteCV; ///< Signaled when reader advances past a chunk, freeing space bool HasWriter = false; bool HasReader = false; uint8_t* ChunkPtr(size_t ChunkIdx) { return Data.data() + ChunkIdx * ChunkLength; } const uint8_t* ChunkPtr(size_t ChunkIdx) const { return Data.data() + ChunkIdx * ChunkLength; } }; // ComputeBuffer ComputeBuffer::ComputeBuffer() { } ComputeBuffer::~ComputeBuffer() { } bool ComputeBuffer::CreateNew(const Params& InParams) { auto* NewDetail = new Detail(); NewDetail->NumChunks = InParams.NumChunks; NewDetail->ChunkLength = InParams.ChunkLength; NewDetail->Data.resize(InParams.NumChunks * InParams.ChunkLength, 0); NewDetail->ChunkWrittenLength.resize(InParams.NumChunks, 0); NewDetail->ChunkFinished.resize(InParams.NumChunks, false); m_Detail = NewDetail; return true; } void ComputeBuffer::Close() { m_Detail = nullptr; } bool ComputeBuffer::IsValid() const { return static_cast(m_Detail); } ComputeBufferReader ComputeBuffer::CreateReader() { assert(m_Detail); m_Detail->HasReader = true; return ComputeBufferReader(m_Detail); } ComputeBufferWriter ComputeBuffer::CreateWriter() { assert(m_Detail); m_Detail->HasWriter = true; return ComputeBufferWriter(m_Detail); } // ComputeBufferReader ComputeBufferReader::ComputeBufferReader() { } ComputeBufferReader::~ComputeBufferReader() { } ComputeBufferReader::ComputeBufferReader(const ComputeBufferReader& Other) = default; ComputeBufferReader::ComputeBufferReader(ComputeBufferReader&& Other) noexcept = default; ComputeBufferReader& ComputeBufferReader::operator=(const ComputeBufferReader& Other) = default; ComputeBufferReader& ComputeBufferReader::operator=(ComputeBufferReader&& Other) noexcept = default; ComputeBufferReader::ComputeBufferReader(Ref InDetail) : m_Detail(std::move(InDetail)) { } void ComputeBufferReader::Close() { m_Detail = nullptr; } void ComputeBufferReader::Detach() { if (m_Detail) { std::lock_guard Lock(m_Detail->Mutex); m_Detail->Detached = true; m_Detail->ReadCV.notify_all(); } } bool ComputeBufferReader::IsValid() const { return static_cast(m_Detail); } bool ComputeBufferReader::IsComplete() const { if (!m_Detail) { return true; } std::lock_guard Lock(m_Detail->Mutex); if (m_Detail->Detached) { return true; } return m_Detail->WriteComplete && m_Detail->ReadChunkIdx == m_Detail->WriteChunkIdx && m_Detail->ReadOffset >= m_Detail->ChunkWrittenLength[m_Detail->ReadChunkIdx]; } void ComputeBufferReader::AdvanceReadPosition(size_t Size) { if (!m_Detail) { return; } std::lock_guard Lock(m_Detail->Mutex); m_Detail->ReadOffset += Size; // Check if we need to move to next chunk const size_t ReadChunk = m_Detail->ReadChunkIdx; if (m_Detail->ChunkFinished[ReadChunk] && m_Detail->ReadOffset >= m_Detail->ChunkWrittenLength[ReadChunk]) { const size_t NextChunk = (ReadChunk + 1) % m_Detail->NumChunks; m_Detail->ReadChunkIdx = NextChunk; m_Detail->ReadOffset = 0; m_Detail->WriteCV.notify_all(); } m_Detail->ReadCV.notify_all(); } size_t ComputeBufferReader::GetMaxReadSize() const { if (!m_Detail) { return 0; } std::lock_guard Lock(m_Detail->Mutex); const size_t ReadChunk = m_Detail->ReadChunkIdx; return m_Detail->ChunkWrittenLength[ReadChunk] - m_Detail->ReadOffset; } const uint8_t* ComputeBufferReader::WaitToRead(size_t MinSize, int TimeoutMs, bool* OutTimedOut) { if (!m_Detail) { return nullptr; } std::unique_lock Lock(m_Detail->Mutex); auto Predicate = [&]() -> bool { if (m_Detail->Detached) { return true; } const size_t ReadChunk = m_Detail->ReadChunkIdx; const size_t Available = m_Detail->ChunkWrittenLength[ReadChunk] - m_Detail->ReadOffset; if (Available >= MinSize) { return true; } // If chunk is finished and we've read everything, try to move to next if (m_Detail->ChunkFinished[ReadChunk] && m_Detail->ReadOffset >= m_Detail->ChunkWrittenLength[ReadChunk]) { if (m_Detail->WriteComplete) { return true; // End of stream } // Move to next chunk const size_t NextChunk = (ReadChunk + 1) % m_Detail->NumChunks; m_Detail->ReadChunkIdx = NextChunk; m_Detail->ReadOffset = 0; m_Detail->WriteCV.notify_all(); return false; // Re-check with new chunk } if (m_Detail->WriteComplete) { return true; // End of stream } return false; }; if (TimeoutMs < 0) { m_Detail->ReadCV.wait(Lock, Predicate); } else { if (!m_Detail->ReadCV.wait_for(Lock, std::chrono::milliseconds(TimeoutMs), Predicate)) { if (OutTimedOut) { *OutTimedOut = true; } return nullptr; } } if (m_Detail->Detached) { return nullptr; } const size_t ReadChunk = m_Detail->ReadChunkIdx; const size_t Available = m_Detail->ChunkWrittenLength[ReadChunk] - m_Detail->ReadOffset; if (Available < MinSize) { return nullptr; // End of stream } return m_Detail->ChunkPtr(ReadChunk) + m_Detail->ReadOffset; } size_t ComputeBufferReader::Read(void* Buffer, size_t MaxSize, int TimeoutMs, bool* OutTimedOut) { const uint8_t* Data = WaitToRead(1, TimeoutMs, OutTimedOut); if (!Data) { return 0; } const size_t Available = GetMaxReadSize(); const size_t ToCopy = std::min(Available, MaxSize); memcpy(Buffer, Data, ToCopy); AdvanceReadPosition(ToCopy); return ToCopy; } // ComputeBufferWriter ComputeBufferWriter::ComputeBufferWriter() = default; ComputeBufferWriter::ComputeBufferWriter(const ComputeBufferWriter& Other) = default; ComputeBufferWriter::ComputeBufferWriter(ComputeBufferWriter&& Other) noexcept = default; ComputeBufferWriter::~ComputeBufferWriter() = default; ComputeBufferWriter& ComputeBufferWriter::operator=(const ComputeBufferWriter& Other) = default; ComputeBufferWriter& ComputeBufferWriter::operator=(ComputeBufferWriter&& Other) noexcept = default; ComputeBufferWriter::ComputeBufferWriter(Ref InDetail) : m_Detail(std::move(InDetail)) { } void ComputeBufferWriter::Close() { if (m_Detail) { { std::lock_guard Lock(m_Detail->Mutex); if (!m_Detail->WriteComplete) { m_Detail->WriteComplete = true; m_Detail->ReadCV.notify_all(); } } m_Detail = nullptr; } } bool ComputeBufferWriter::IsValid() const { return static_cast(m_Detail); } void ComputeBufferWriter::MarkComplete() { if (m_Detail) { std::lock_guard Lock(m_Detail->Mutex); m_Detail->WriteComplete = true; m_Detail->ReadCV.notify_all(); } } void ComputeBufferWriter::AdvanceWritePosition(size_t Size) { if (!m_Detail || Size == 0) { return; } std::lock_guard Lock(m_Detail->Mutex); const size_t WriteChunk = m_Detail->WriteChunkIdx; m_Detail->ChunkWrittenLength[WriteChunk] += Size; m_Detail->WriteOffset += Size; m_Detail->ReadCV.notify_all(); } size_t ComputeBufferWriter::GetMaxWriteSize() const { if (!m_Detail) { return 0; } std::lock_guard Lock(m_Detail->Mutex); const size_t WriteChunk = m_Detail->WriteChunkIdx; return m_Detail->ChunkLength - m_Detail->ChunkWrittenLength[WriteChunk]; } size_t ComputeBufferWriter::GetChunkMaxLength() const { if (!m_Detail) { return 0; } return m_Detail->ChunkLength; } size_t ComputeBufferWriter::Write(const void* Buffer, size_t MaxSize, int TimeoutMs) { uint8_t* Dest = WaitToWrite(1, TimeoutMs); if (!Dest) { return 0; } const size_t Available = GetMaxWriteSize(); const size_t ToCopy = std::min(Available, MaxSize); memcpy(Dest, Buffer, ToCopy); AdvanceWritePosition(ToCopy); return ToCopy; } uint8_t* ComputeBufferWriter::WaitToWrite(size_t MinSize, int TimeoutMs) { if (!m_Detail) { return nullptr; } std::unique_lock Lock(m_Detail->Mutex); if (m_Detail->WriteComplete) { return nullptr; } const size_t WriteChunk = m_Detail->WriteChunkIdx; const size_t Available = m_Detail->ChunkLength - m_Detail->ChunkWrittenLength[WriteChunk]; // If current chunk has enough space, return pointer if (Available >= MinSize) { return m_Detail->ChunkPtr(WriteChunk) + m_Detail->ChunkWrittenLength[WriteChunk]; } // Current chunk is full - mark it as finished and move to next. // The writer cannot advance until the reader has fully consumed the next chunk, // preventing the writer from overwriting data the reader hasn't processed yet. m_Detail->ChunkFinished[WriteChunk] = true; m_Detail->ReadCV.notify_all(); const size_t NextChunk = (WriteChunk + 1) % m_Detail->NumChunks; // Wait until reader has consumed the next chunk auto Predicate = [&]() -> bool { // Check if read has moved past this chunk return m_Detail->ReadChunkIdx != NextChunk || m_Detail->Detached; }; if (TimeoutMs < 0) { m_Detail->WriteCV.wait(Lock, Predicate); } else { if (!m_Detail->WriteCV.wait_for(Lock, std::chrono::milliseconds(TimeoutMs), Predicate)) { return nullptr; } } if (m_Detail->Detached) { return nullptr; } // Reset next chunk m_Detail->ChunkWrittenLength[NextChunk] = 0; m_Detail->ChunkFinished[NextChunk] = false; m_Detail->WriteChunkIdx = NextChunk; m_Detail->WriteOffset = 0; return m_Detail->ChunkPtr(NextChunk); } } // namespace zen::horde