diff options
Diffstat (limited to 'src/zenutil/process/asyncpipereader.cpp')
| -rw-r--r-- | src/zenutil/process/asyncpipereader.cpp | 276 |
1 files changed, 276 insertions, 0 deletions
diff --git a/src/zenutil/process/asyncpipereader.cpp b/src/zenutil/process/asyncpipereader.cpp new file mode 100644 index 000000000..2fdcda30d --- /dev/null +++ b/src/zenutil/process/asyncpipereader.cpp @@ -0,0 +1,276 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "asyncpipereader.h" + +#include <zencore/logging.h> + +#include <array> + +ZEN_THIRD_PARTY_INCLUDES_START + +#if ZEN_PLATFORM_WINDOWS +# include <zencore/windows.h> +# include <asio/io_context.hpp> +# include <asio/windows/stream_handle.hpp> +#else +# include <fcntl.h> +# include <unistd.h> +# include <asio/io_context.hpp> +# include <asio/posix/stream_descriptor.hpp> +#endif + +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +static constexpr size_t kReadBufferSize = 4096; + +// ============================================================================ +// POSIX: non-blocking pipe + stream_descriptor +// ============================================================================ + +#if !ZEN_PLATFORM_WINDOWS + +struct AsyncPipeReader::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor; + std::function<void(std::string_view)> m_DataCallback; + std::function<void()> m_EofCallback; + std::array<char, kReadBufferSize> m_Buffer{}; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Stop(); } + + void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback) + { + m_DataCallback = std::move(DataCallback); + m_EofCallback = std::move(EofCallback); + + int Fd = Pipe.ReadFd; + + // Close the write end — child already has it + Pipe.CloseWriteEnd(); + + // Set non-blocking + int Flags = fcntl(Fd, F_GETFL, 0); + fcntl(Fd, F_SETFL, Flags | O_NONBLOCK); + + // Take ownership of the fd. Detach it from StdoutPipeHandles so it + // doesn't get double-closed. + Pipe.ReadFd = -1; + + m_Descriptor = std::make_unique<asio::posix::stream_descriptor>(m_IoContext, Fd); + EnqueueRead(); + } + + void Stop() + { + if (m_Descriptor) + { + asio::error_code Ec; + m_Descriptor->cancel(Ec); + m_Descriptor.reset(); + } + } + + void EnqueueRead() + { + if (!m_Descriptor) + { + return; + } + + m_Descriptor->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) { + if (Ec) + { + if (Ec != asio::error::operation_aborted && m_EofCallback) + { + m_EofCallback(); + } + return; + } + + if (BytesRead > 0 && m_DataCallback) + { + m_DataCallback(std::string_view(m_Buffer.data(), BytesRead)); + } + + EnqueueRead(); + }); + } +}; + +bool +CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe) +{ + // On POSIX, regular pipes work fine with non-blocking I/O + return CreateStdoutPipe(OutPipe); +} + +// ============================================================================ +// Windows: overlapped named pipe + asio::windows::stream_handle +// +// Anonymous pipes (CreatePipe) do not support overlapped I/O. Instead, we +// create a named pipe pair with FILE_FLAG_OVERLAPPED on the read (server) end. +// The write (client) end is inheritable and used as the child's stdout/stderr. +// +// Callers must use CreateOverlappedStdoutPipe() instead of CreateStdoutPipe() +// so the pipe is overlapped from the start. Passing a non-overlapped anonymous +// pipe to Start() will fail. +// ============================================================================ + +#else // ZEN_PLATFORM_WINDOWS + +static std::atomic<uint64_t> s_PipeSerial{0}; + +bool +CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe) +{ + // Generate a unique pipe name + uint64_t Serial = s_PipeSerial.fetch_add(1); + wchar_t PipeName[128]; + swprintf_s(PipeName, + _countof(PipeName), + L"\\\\.\\pipe\\zen_async_%u_%llu", + GetCurrentProcessId(), + static_cast<unsigned long long>(Serial)); + + // Create the server (read) end with FILE_FLAG_OVERLAPPED + HANDLE ReadHandle = CreateNamedPipeW(PipeName, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, // max instances + 0, // out buffer size + kReadBufferSize, + 0, // default timeout + nullptr); + + if (ReadHandle == INVALID_HANDLE_VALUE) + { + ZEN_WARN("CreateNamedPipeW failed: {}", GetLastError()); + return false; + } + + // The read end should not be inherited by the child + SetHandleInformation(ReadHandle, HANDLE_FLAG_INHERIT, 0); + + // Open the client (write) end — inheritable, for the child process + SECURITY_ATTRIBUTES Sa; + Sa.nLength = sizeof(Sa); + Sa.lpSecurityDescriptor = nullptr; + Sa.bInheritHandle = TRUE; + + HANDLE WriteHandle = CreateFileW(PipeName, + GENERIC_WRITE, + 0, // no sharing + &Sa, // inheritable + OPEN_EXISTING, + 0, // no special flags on write end + nullptr); + + if (WriteHandle == INVALID_HANDLE_VALUE) + { + DWORD Err = GetLastError(); + CloseHandle(ReadHandle); + ZEN_WARN("CreateFileW for pipe client end failed: {}", Err); + return false; + } + + OutPipe.ReadHandle = ReadHandle; + OutPipe.WriteHandle = WriteHandle; + return true; +} + +struct AsyncPipeReader::Impl +{ + asio::io_context& m_IoContext; + std::unique_ptr<asio::windows::stream_handle> m_StreamHandle; + std::function<void(std::string_view)> m_DataCallback; + std::function<void()> m_EofCallback; + std::array<char, kReadBufferSize> m_Buffer{}; + + explicit Impl(asio::io_context& IoContext) : m_IoContext(IoContext) {} + + ~Impl() { Stop(); } + + void Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback) + { + m_DataCallback = std::move(DataCallback); + m_EofCallback = std::move(EofCallback); + + HANDLE ReadHandle = static_cast<HANDLE>(Pipe.ReadHandle); + + // Close the write end — child already has it + Pipe.CloseWriteEnd(); + + // Take ownership of the read handle + Pipe.ReadHandle = nullptr; + + m_StreamHandle = std::make_unique<asio::windows::stream_handle>(m_IoContext, ReadHandle); + EnqueueRead(); + } + + void Stop() + { + if (m_StreamHandle) + { + asio::error_code Ec; + m_StreamHandle->cancel(Ec); + m_StreamHandle.reset(); + } + } + + void EnqueueRead() + { + if (!m_StreamHandle) + { + return; + } + + m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) { + if (Ec) + { + if (Ec != asio::error::operation_aborted && m_EofCallback) + { + m_EofCallback(); + } + return; + } + + if (BytesRead > 0 && m_DataCallback) + { + m_DataCallback(std::string_view(m_Buffer.data(), BytesRead)); + } + + EnqueueRead(); + }); + } +}; + +#endif + +// ============================================================================ +// Common wrapper +// ============================================================================ + +AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext)) +{ +} + +AsyncPipeReader::~AsyncPipeReader() = default; + +void +AsyncPipeReader::Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback) +{ + m_Impl->Start(std::move(Pipe), std::move(DataCallback), std::move(EofCallback)); +} + +void +AsyncPipeReader::Stop() +{ + m_Impl->Stop(); +} + +} // namespace zen |