aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/process/asyncpipereader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil/process/asyncpipereader.cpp')
-rw-r--r--src/zenutil/process/asyncpipereader.cpp48
1 files changed, 32 insertions, 16 deletions
diff --git a/src/zenutil/process/asyncpipereader.cpp b/src/zenutil/process/asyncpipereader.cpp
index 8eac350c6..7d603aa3f 100644
--- a/src/zenutil/process/asyncpipereader.cpp
+++ b/src/zenutil/process/asyncpipereader.cpp
@@ -31,7 +31,7 @@ static constexpr size_t kReadBufferSize = 4096;
#if !ZEN_PLATFORM_WINDOWS
-struct AsyncPipeReader::Impl
+struct AsyncPipeReader::Impl : public TRefCounted<Impl>
{
asio::io_context& m_IoContext;
std::unique_ptr<asio::posix::stream_descriptor> m_Descriptor;
@@ -82,22 +82,25 @@ struct AsyncPipeReader::Impl
return;
}
- m_Descriptor->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) {
+ // Capture Self so the Impl outlives any completion the io_context has
+ // already picked up but not yet dispatched, even if the owning
+ // AsyncPipeReader is destroyed in the meantime.
+ m_Descriptor->async_read_some(asio::buffer(m_Buffer), [Self = Ref<Impl>(this)](const asio::error_code& Ec, size_t BytesRead) {
if (Ec)
{
- if (Ec != asio::error::operation_aborted && m_EofCallback)
+ if (Ec != asio::error::operation_aborted && Self->m_EofCallback)
{
- m_EofCallback();
+ Self->m_EofCallback();
}
return;
}
- if (BytesRead > 0 && m_DataCallback)
+ if (BytesRead > 0 && Self->m_DataCallback)
{
- m_DataCallback(std::string_view(m_Buffer.data(), BytesRead));
+ Self->m_DataCallback(std::string_view(Self->m_Buffer.data(), BytesRead));
}
- EnqueueRead();
+ Self->EnqueueRead();
});
}
};
@@ -183,7 +186,7 @@ CreateOverlappedStdoutPipe(StdoutPipeHandles& OutPipe)
return true;
}
-struct AsyncPipeReader::Impl
+struct AsyncPipeReader::Impl : public TRefCounted<Impl>
{
asio::io_context& m_IoContext;
std::unique_ptr<asio::windows::stream_handle> m_StreamHandle;
@@ -229,22 +232,25 @@ struct AsyncPipeReader::Impl
return;
}
- m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [this](const asio::error_code& Ec, size_t BytesRead) {
+ // Capture Self so the Impl outlives any completion the io_context has
+ // already picked up but not yet dispatched, even if the owning
+ // AsyncPipeReader is destroyed in the meantime.
+ m_StreamHandle->async_read_some(asio::buffer(m_Buffer), [Self = Ref<Impl>(this)](const asio::error_code& Ec, size_t BytesRead) {
if (Ec)
{
- if (Ec != asio::error::operation_aborted && m_EofCallback)
+ if (Ec != asio::error::operation_aborted && Self->m_EofCallback)
{
- m_EofCallback();
+ Self->m_EofCallback();
}
return;
}
- if (BytesRead > 0 && m_DataCallback)
+ if (BytesRead > 0 && Self->m_DataCallback)
{
- m_DataCallback(std::string_view(m_Buffer.data(), BytesRead));
+ Self->m_DataCallback(std::string_view(Self->m_Buffer.data(), BytesRead));
}
- EnqueueRead();
+ Self->EnqueueRead();
});
}
};
@@ -255,11 +261,21 @@ struct AsyncPipeReader::Impl
// Common wrapper
// ============================================================================
-AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(std::make_unique<Impl>(IoContext))
+AsyncPipeReader::AsyncPipeReader(asio::io_context& IoContext) : m_Impl(new Impl(IoContext))
{
}
-AsyncPipeReader::~AsyncPipeReader() = default;
+AsyncPipeReader::~AsyncPipeReader()
+{
+ // Explicitly stop pending async reads. The Impl may outlive this call if a
+ // completion is still in the io_context queue (the handler holds a strong
+ // Ref back to Impl to keep it alive). Stop() here guarantees reads stop
+ // even if nobody has called Stop() on the wrapper.
+ if (m_Impl)
+ {
+ m_Impl->Stop();
+ }
+}
void
AsyncPipeReader::Start(StdoutPipeHandles&& Pipe, std::function<void(std::string_view)> DataCallback, std::function<void()> EofCallback)