// Copyright Epic Games, Inc. All Rights Reserved. #include "asyncpipereader.h" #include #include ZEN_THIRD_PARTY_INCLUDES_START #if ZEN_PLATFORM_WINDOWS # include # include # include #else # include # include # include # include #endif ZEN_THIRD_PARTY_INCLUDES_END namespace zen { static constexpr size_t kReadBufferSize = 4096; // ============================================================================ // POSIX: non-blocking pipe + stream_descriptor // ============================================================================ #if !ZEN_PLATFORM_WINDOWS struct AsyncPipeReader::Impl { asio::io_context& m_IoContext; std::unique_ptr m_Descriptor; std::function m_DataCallback; std::function m_EofCallback; std::array m_Buffer{}; explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} ~Impl() { Stop(); } void Start(StdoutPipeHandles&& Pipe, std::function DataCallback, std::function EofCallback) { m_DataCallback = std::move(DataCallback); m_EofCallback = std::move(EofCallback); int Fd = Pipe.ReadFd; // Close the write end — child already has it Pipe.CloseWriteEnd(); // Set non-blocking int Flags = fcntl(Fd, F_GETFL, 0); fcntl(Fd, F_SETFL, Flags | O_NONBLOCK); // Take ownership of the fd. Detach it from StdoutPipeHandles so it // doesn't get double-closed. Pipe.ReadFd = -1; m_Descriptor = std::make_unique(m_IoContext, Fd); EnqueueRead(); } void Stop() { if (m_Descriptor) { asio::error_code Ec; m_Descriptor->cancel(Ec); m_Descriptor.reset(); } } void EnqueueRead() { if (!m_Descriptor) { return; } m_Descriptor->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) { if (Ec) { if (Ec != asio::error::operation_aborted && m_EofCallback) { m_EofCallback(); } return; } if (BytesRead > 0 && m_DataCallback) { m_DataCallback(std::string_view(m_Buffer.data(), BytesRead)); } EnqueueRead(); }); } }; bool CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe) { // On POSIX, regular pipes work fine with non-blocking I/O return CreateStdoutPipe(OutPipe); } // ============================================================================ // Windows: overlapped named pipe + asio::windows::stream_handle // // Anonymous pipes (CreatePipe) do not support overlapped I/O. Instead, we // create a named pipe pair with FILE_FLAG_OVERLAPPED on the read (server) end. // The write (client) end is inheritable and used as the child's stdout/stderr. // // Callers must use CreateOverlappedStdoutPipe() instead of CreateStdoutPipe() // so the pipe is overlapped from the start. Passing a non-overlapped anonymous // pipe to Start() will fail. // ============================================================================ #else // ZEN_PLATFORM_WINDOWS static std::atomic s_PipeSerial{0}; bool CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe) { // Generate a unique pipe name uint64_t Serial = s_PipeSerial.fetch_add(1); wchar_t PipeName[128]; swprintf_s(PipeName, _countof(PipeName), L"\\\\.\\pipe\\zen_async_%u_%llu", GetCurrentProcessId(), static_cast(Serial)); // Create the server (read) end with FILE_FLAG_OVERLAPPED HANDLE ReadHandle = CreateNamedPipeW(PipeName, PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, PIPE_TYPE_BYTE | PIPE_WAIT, 1, // max instances 0, // out buffer size kReadBufferSize, 0, // default timeout nullptr); if (ReadHandle == INVALID_HANDLE_VALUE) { ZEN_WARN("CreateNamedPipeW failed: {}", GetLastError()); return false; } // The read end should not be inherited by the child SetHandleInformation(ReadHandle, HANDLE_FLAG_INHERIT, 0); // Open the client (write) end — inheritable, for the child process SECURITY_ATTRIBUTES Sa; Sa.nLength = sizeof(Sa); Sa.lpSecurityDescriptor = nullptr; Sa.bInheritHandle = TRUE; HANDLE WriteHandle = CreateFileW(PipeName, GENERIC_WRITE, 0, // no sharing &Sa, // inheritable OPEN_EXISTING, 0, // no special flags on write end nullptr); if (WriteHandle == INVALID_HANDLE_VALUE) { DWORD Err = GetLastError(); CloseHandle(ReadHandle); ZEN_WARN("CreateFileW for pipe client end failed: {}", Err); return false; } OutPipe.ReadHandle = ReadHandle; OutPipe.WriteHandle = WriteHandle; return true; } struct AsyncPipeReader::Impl { asio::io_context& m_IoContext; std::unique_ptr m_StreamHandle; std::function m_DataCallback; std::function m_EofCallback; std::array m_Buffer{}; explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} ~Impl() { Stop(); } void Start(StdoutPipeHandles&& Pipe, std::function DataCallback, std::function EofCallback) { m_DataCallback = std::move(DataCallback); m_EofCallback = std::move(EofCallback); HANDLE ReadHandle = static_cast(Pipe.ReadHandle); // Close the write end — child already has it Pipe.CloseWriteEnd(); // Take ownership of the read handle Pipe.ReadHandle = nullptr; m_StreamHandle = std::make_unique(m_IoContext, ReadHandle); EnqueueRead(); } void Stop() { if (m_StreamHandle) { asio::error_code Ec; m_StreamHandle->cancel(Ec); m_StreamHandle.reset(); } } void EnqueueRead() { if (!m_StreamHandle) { return; } m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) { if (Ec) { if (Ec != asio::error::operation_aborted && m_EofCallback) { m_EofCallback(); } return; } if (BytesRead > 0 && m_DataCallback) { m_DataCallback(std::string_view(m_Buffer.data(), BytesRead)); } EnqueueRead(); }); } }; #endif // ============================================================================ // Common wrapper // ============================================================================ AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(std::make_unique(IoContext)) { } AsyncPipeReader::~AsyncPipeReader() = default; void AsyncPipeReader::Start(StdoutPipeHandles&& Pipe, std::function DataCallback, std::function EofCallback) { m_Impl->Start(std::move(Pipe), std::move(DataCallback), std::move(EofCallback)); } void AsyncPipeReader::Stop() { m_Impl->Stop(); } } // namespace zen