aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync/mpsc/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ctr-std/src/sync/mpsc/mod.rs')
-rw-r--r--ctr-std/src/sync/mpsc/mod.rs774
1 files changed, 631 insertions, 143 deletions
diff --git a/ctr-std/src/sync/mpsc/mod.rs b/ctr-std/src/sync/mpsc/mod.rs
index aeeab17..2dd3aeb 100644
--- a/ctr-std/src/sync/mpsc/mod.rs
+++ b/ctr-std/src/sync/mpsc/mod.rs
@@ -13,40 +13,50 @@
//! This module provides message-based communication over channels, concretely
//! defined among three types:
//!
-//! * `Sender`
-//! * `SyncSender`
-//! * `Receiver`
+//! * [`Sender`]
+//! * [`SyncSender`]
+//! * [`Receiver`]
//!
-//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
+//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
//! senders are clone-able (multi-producer) such that many threads can send
//! simultaneously to one receiver (single-consumer).
//!
//! These channels come in two flavors:
//!
-//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
+//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
//! will return a `(Sender, Receiver)` tuple where all sends will be
//! **asynchronous** (they never block). The channel conceptually has an
//! infinite buffer.
//!
-//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
-//! a `(SyncSender, Receiver)` tuple where the storage for pending messages
-//! is a pre-allocated buffer of a fixed size. All sends will be
+//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
+//! return a `(SyncSender, Receiver)` tuple where the storage for pending
+//! messages is a pre-allocated buffer of a fixed size. All sends will be
//! **synchronous** by blocking until there is buffer space available. Note
-//! that a bound of 0 is allowed, causing the channel to become a
-//! "rendezvous" channel where each sender atomically hands off a message to
-//! a receiver.
+//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
+//! channel where each sender atomically hands off a message to a receiver.
+//!
+//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
+//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
+//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
+//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
+//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
//!
//! ## Disconnection
//!
-//! The send and receive operations on channels will all return a `Result`
+//! The send and receive operations on channels will all return a [`Result`]
//! indicating whether the operation succeeded or not. An unsuccessful operation
//! is normally indicative of the other half of a channel having "hung up" by
//! being dropped in its corresponding thread.
//!
//! Once half of a channel has been deallocated, most operations can no longer
-//! continue to make progress, so `Err` will be returned. Many applications will
-//! continue to `unwrap()` the results returned from this module, instigating a
-//! propagation of failure among threads if one unexpectedly dies.
+//! continue to make progress, so [`Err`] will be returned. Many applications
+//! will continue to [`unwrap`] the results returned from this module,
+//! instigating a propagation of failure among threads if one unexpectedly dies.
+//!
+//! [`Result`]: ../../../std/result/enum.Result.html
+//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
//!
//! # Examples
//!
@@ -287,8 +297,36 @@ mod sync;
mod mpsc_queue;
mod spsc_queue;
-/// The receiving-half of Rust's channel type. This half can only be owned by
-/// one thread
+mod cache_aligned;
+
+/// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
+/// This half can only be owned by one thread.
+///
+/// Messages sent to the channel can be retrieved using [`recv`].
+///
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
+/// [`recv`]: struct.Receiver.html#method.recv
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send("Hello world!").unwrap();
+/// thread::sleep(Duration::from_secs(2)); // block for two seconds
+/// send.send("Delayed for 2 seconds").unwrap();
+/// });
+///
+/// println!("{}", recv.recv().unwrap()); // Received immediately
+/// println!("Waiting...");
+/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Receiver<T> {
inner: UnsafeCell<Flavor<T>>,
@@ -302,38 +340,153 @@ unsafe impl<T: Send> Send for Receiver<T> { }
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for Receiver<T> { }
-/// An iterator over messages on a receiver, this iterator will block
-/// whenever `next` is called, waiting for a new message, and `None` will be
-/// returned when the corresponding channel has hung up.
+/// An iterator over messages on a [`Receiver`], created by [`iter`].
+///
+/// This iterator will block whenever [`next`] is called,
+/// waiting for a new message, and [`None`] will be returned
+/// when the corresponding channel has hung up.
+///
+/// [`iter`]: struct.Receiver.html#method.iter
+/// [`Receiver`]: struct.Receiver.html
+/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
+/// [`None`]: ../../../std/option/enum.Option.html#variant.None
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send(1u8).unwrap();
+/// send.send(2u8).unwrap();
+/// send.send(3u8).unwrap();
+/// });
+///
+/// for x in recv.iter() {
+/// println!("Got: {}", x);
+/// }
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(Debug)]
pub struct Iter<'a, T: 'a> {
rx: &'a Receiver<T>
}
-/// An iterator that attempts to yield all pending values for a receiver.
-/// `None` will be returned when there are no pending values remaining or
+/// An iterator that attempts to yield all pending values for a [`Receiver`],
+/// created by [`try_iter`].
+///
+/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
-/// This Iterator will never block the caller in order to wait for data to
-/// become available. Instead, it will return `None`.
+/// This iterator will never block the caller in order to wait for data to
+/// become available. Instead, it will return [`None`].
+///
+/// [`Receiver`]: struct.Receiver.html
+/// [`try_iter`]: struct.Receiver.html#method.try_iter
+/// [`None`]: ../../../std/option/enum.Option.html#variant.None
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// let (sender, receiver) = channel();
+///
+/// // Nothing is in the buffer yet
+/// assert!(receiver.try_iter().next().is_none());
+/// println!("Nothing in the buffer...");
+///
+/// thread::spawn(move || {
+/// sender.send(1).unwrap();
+/// sender.send(2).unwrap();
+/// sender.send(3).unwrap();
+/// });
+///
+/// println!("Going to sleep...");
+/// thread::sleep(Duration::from_secs(2)); // block for two seconds
+///
+/// for x in receiver.try_iter() {
+/// println!("Got: {}", x);
+/// }
+/// ```
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
#[derive(Debug)]
pub struct TryIter<'a, T: 'a> {
rx: &'a Receiver<T>
}
-/// An owning iterator over messages on a receiver, this iterator will block
-/// whenever `next` is called, waiting for a new message, and `None` will be
-/// returned when the corresponding channel has hung up.
+/// An owning iterator over messages on a [`Receiver`],
+/// created by **Receiver::into_iter**.
+///
+/// This iterator will block whenever [`next`]
+/// is called, waiting for a new message, and [`None`] will be
+/// returned if the corresponding channel has hung up.
+///
+/// [`Receiver`]: struct.Receiver.html
+/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
+/// [`None`]: ../../../std/option/enum.Option.html#variant.None
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send(1u8).unwrap();
+/// send.send(2u8).unwrap();
+/// send.send(3u8).unwrap();
+/// });
+///
+/// for x in recv.into_iter() {
+/// println!("Got: {}", x);
+/// }
+/// ```
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
#[derive(Debug)]
pub struct IntoIter<T> {
rx: Receiver<T>
}
-/// The sending-half of Rust's asynchronous channel type. This half can only be
+/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
/// owned by one thread, but it can be cloned to send to other threads.
+///
+/// Messages can be sent through this channel with [`send`].
+///
+/// [`channel`]: fn.channel.html
+/// [`send`]: struct.Sender.html#method.send
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (sender, receiver) = channel();
+/// let sender2 = sender.clone();
+///
+/// // First thread owns sender
+/// thread::spawn(move || {
+/// sender.send(1).unwrap();
+/// });
+///
+/// // Second thread owns sender2
+/// thread::spawn(move || {
+/// sender2.send(2).unwrap();
+/// });
+///
+/// let msg = receiver.recv().unwrap();
+/// let msg2 = receiver.recv().unwrap();
+///
+/// assert_eq!(3, msg + msg2);
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
inner: UnsafeCell<Flavor<T>>,
@@ -347,8 +500,53 @@ unsafe impl<T: Send> Send for Sender<T> { }
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for Sender<T> { }
-/// The sending-half of Rust's synchronous channel type. This half can only be
-/// owned by one thread, but it can be cloned to send to other threads.
+/// The sending-half of Rust's synchronous [`sync_channel`] type.
+///
+/// Messages can be sent through this channel with [`send`] or [`try_send`].
+///
+/// [`send`] will block if there is no space in the internal buffer.
+///
+/// [`sync_channel`]: fn.sync_channel.html
+/// [`send`]: struct.SyncSender.html#method.send
+/// [`try_send`]: struct.SyncSender.html#method.try_send
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::sync_channel;
+/// use std::thread;
+///
+/// // Create a sync_channel with buffer size 2
+/// let (sync_sender, receiver) = sync_channel(2);
+/// let sync_sender2 = sync_sender.clone();
+///
+/// // First thread owns sync_sender
+/// thread::spawn(move || {
+/// sync_sender.send(1).unwrap();
+/// sync_sender.send(2).unwrap();
+/// });
+///
+/// // Second thread owns sync_sender2
+/// thread::spawn(move || {
+/// sync_sender2.send(3).unwrap();
+/// // thread will now block since the buffer is full
+/// println!("Thread unblocked!");
+/// });
+///
+/// let mut msg;
+///
+/// msg = receiver.recv().unwrap();
+/// println!("message {} received", msg);
+///
+/// // "Thread unblocked!" will be printed now
+///
+/// msg = receiver.recv().unwrap();
+/// println!("message {} received", msg);
+///
+/// msg = receiver.recv().unwrap();
+///
+/// println!("message {} received", msg);
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
inner: Arc<sync::Packet<T>>,
@@ -357,73 +555,97 @@ pub struct SyncSender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for SyncSender<T> {}
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> !Sync for SyncSender<T> {}
-
-/// An error returned from the `send` function on channels.
+/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
+/// function on **channel**s.
///
-/// A `send` operation can only fail if the receiving end of a channel is
+/// A **send** operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered.
+///
+/// [`Sender::send`]: struct.Sender.html#method.send
+/// [`SyncSender::send`]: struct.SyncSender.html#method.send
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
-/// An error returned from the `recv` function on a `Receiver`.
+/// An error returned from the [`recv`] function on a [`Receiver`].
///
-/// The `recv` operation can only fail if the sending half of a channel is
-/// disconnected, implying that no further messages will ever be received.
+/// The [`recv`] operation can only fail if the sending half of a
+/// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
+/// messages will ever be received.
+///
+/// [`recv`]: struct.Receiver.html#method.recv
+/// [`Receiver`]: struct.Receiver.html
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub struct RecvError;
-/// This enumeration is the list of the possible reasons that `try_recv` could
-/// not return data when called.
+/// This enumeration is the list of the possible reasons that [`try_recv`] could
+/// not return data when called. This can occur with both a [`channel`] and
+/// a [`sync_channel`].
+///
+/// [`try_recv`]: struct.Receiver.html#method.try_recv
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub enum TryRecvError {
- /// This channel is currently empty, but the sender(s) have not yet
+ /// This **channel** is currently empty, but the **Sender**(s) have not yet
/// disconnected, so data may yet become available.
#[stable(feature = "rust1", since = "1.0.0")]
Empty,
- /// This channel's sending half has become disconnected, and there will
- /// never be any more data received on this channel
+ /// The **channel**'s sending half has become disconnected, and there will
+ /// never be any more data received on it.
#[stable(feature = "rust1", since = "1.0.0")]
Disconnected,
}
-/// This enumeration is the list of possible errors that `recv_timeout` could
-/// not return data when called.
+/// This enumeration is the list of possible errors that made [`recv_timeout`]
+/// unable to return data when called. This can occur with both a [`channel`] and
+/// a [`sync_channel`].
+///
+/// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub enum RecvTimeoutError {
- /// This channel is currently empty, but the sender(s) have not yet
+ /// This **channel** is currently empty, but the **Sender**(s) have not yet
/// disconnected, so data may yet become available.
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
Timeout,
- /// This channel's sending half has become disconnected, and there will
- /// never be any more data received on this channel
+ /// The **channel**'s sending half has become disconnected, and there will
+ /// never be any more data received on it.
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
Disconnected,
}
/// This enumeration is the list of the possible error outcomes for the
-/// `SyncSender::try_send` method.
+/// [`try_send`] method.
+///
+/// [`try_send`]: struct.SyncSender.html#method.try_send
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
- /// The data could not be sent on the channel because it would require that
+ /// The data could not be sent on the [`sync_channel`] because it would require that
/// the callee block to send the data.
///
/// If this is a buffered channel, then the buffer is full at this time. If
- /// this is not a buffered channel, then there is no receiver available to
+ /// this is not a buffered channel, then there is no [`Receiver`] available to
/// acquire the data.
+ ///
+ /// [`sync_channel`]: fn.sync_channel.html
+ /// [`Receiver`]: struct.Receiver.html
#[stable(feature = "rust1", since = "1.0.0")]
Full(#[stable(feature = "rust1", since = "1.0.0")] T),
- /// This channel's receiving half has disconnected, so the data could not be
+ /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
/// sent. The data is returned back to the callee in this case.
+ ///
+ /// [`sync_channel`]: fn.sync_channel.html
#[stable(feature = "rust1", since = "1.0.0")]
Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
}
@@ -457,15 +679,27 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
}
/// Creates a new asynchronous channel, returning the sender/receiver halves.
-/// All data sent on the sender will become available on the receiver, and no
-/// send will block the calling thread (this channel has an "infinite buffer").
+/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
+/// the same order as it was sent, and no [`send`] will block the calling thread
+/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
+/// block after its buffer limit is reached). [`recv`] will block until a message
+/// is available.
///
-/// If the [`Receiver`] is disconnected while trying to [`send()`] with the
-/// [`Sender`], the [`send()`] method will return an error.
+/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
+/// only one [`Receiver`] is supported.
///
-/// [`send()`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
-/// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
-/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+/// If the [`Receiver`] is disconnected while trying to [`send`] with the
+/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
+/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
+/// return a [`RecvError`].
+///
+/// [`send`]: struct.Sender.html#method.send
+/// [`recv`]: struct.Receiver.html#method.recv
+/// [`Sender`]: struct.Sender.html
+/// [`Receiver`]: struct.Receiver.html
+/// [`sync_channel`]: fn.sync_channel.html
+/// [`SendError`]: struct.SendError.html
+/// [`RecvError`]: struct.RecvError.html
///
/// # Examples
///
@@ -473,20 +707,18 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
/// use std::sync::mpsc::channel;
/// use std::thread;
///
-/// // tx is the sending half (tx for transmission), and rx is the receiving
-/// // half (rx for receiving).
-/// let (tx, rx) = channel();
+/// let (sender, receiver) = channel();
///
/// // Spawn off an expensive computation
/// thread::spawn(move|| {
/// # fn expensive_computation() {}
-/// tx.send(expensive_computation()).unwrap();
+/// sender.send(expensive_computation()).unwrap();
/// });
///
/// // Do some useful work for awhile
///
/// // Let's see what that answer was
-/// println!("{:?}", rx.recv().unwrap());
+/// println!("{:?}", receiver.recv().unwrap());
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
@@ -495,24 +727,32 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
}
/// Creates a new synchronous, bounded channel.
-///
-/// Like asynchronous channels, the [`Receiver`] will block until a message
-/// becomes available. These channels differ greatly in the semantics of the
-/// sender from asynchronous channels, however.
+/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
+/// in the same order as it was sent. Like asynchronous [`channel`]s, the
+/// [`Receiver`] will block until a message becomes available. `sync_channel`
+/// differs greatly in the semantics of the sender, however.
///
/// This channel has an internal buffer on which messages will be queued.
/// `bound` specifies the buffer size. When the internal buffer becomes full,
/// future sends will *block* waiting for the buffer to open up. Note that a
/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
-/// where each [`send()`] will not return until a recv is paired with it.
+/// where each [`send`] will not return until a [`recv`] is paired with it.
///
-/// Like asynchronous channels, if the [`Receiver`] is disconnected while
-/// trying to [`send()`] with the [`SyncSender`], the [`send()`] method will
-/// return an error.
+/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
+/// times, but only one [`Receiver`] is supported.
///
-/// [`send()`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
-/// [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
-/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
+/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
+/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
+/// to [`recv`], the [`recv`] method will return a [`RecvError`].
+///
+/// [`channel`]: fn.channel.html
+/// [`send`]: struct.SyncSender.html#method.send
+/// [`recv`]: struct.Receiver.html#method.recv
+/// [`SyncSender`]: struct.SyncSender.html
+/// [`Receiver`]: struct.Receiver.html
+/// [`SendError`]: struct.SendError.html
+/// [`RecvError`]: struct.RecvError.html
///
/// # Examples
///
@@ -520,18 +760,18 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
-/// let (tx, rx) = sync_channel(1);
+/// let (sender, receiver) = sync_channel(1);
///
/// // this returns immediately
-/// tx.send(1).unwrap();
+/// sender.send(1).unwrap();
///
/// thread::spawn(move|| {
/// // this will block until the previous message has been received
-/// tx.send(2).unwrap();
+/// sender.send(2).unwrap();
/// });
///
-/// assert_eq!(rx.recv().unwrap(), 1);
-/// assert_eq!(rx.recv().unwrap(), 2);
+/// assert_eq!(receiver.recv().unwrap(), 1);
+/// assert_eq!(receiver.recv().unwrap(), 2);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
@@ -556,10 +796,13 @@ impl<T> Sender<T> {
/// A successful send occurs when it is determined that the other end of
/// the channel has not hung up already. An unsuccessful send would be one
/// where the corresponding receiver has already been deallocated. Note
- /// that a return value of `Err` means that the data will never be
- /// received, but a return value of `Ok` does *not* mean that the data
+ /// that a return value of [`Err`] means that the data will never be
+ /// received, but a return value of [`Ok`] does *not* mean that the data
/// will be received. It is possible for the corresponding receiver to
- /// hang up immediately after this function returns `Ok`.
+ /// hang up immediately after this function returns [`Ok`].
+ ///
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
///
/// This method will never block the current thread.
///
@@ -675,10 +918,10 @@ impl<T> Drop for Sender<T> {
}
}
-#[stable(feature = "mpsc_debug", since = "1.7.0")]
+#[stable(feature = "mpsc_debug", since = "1.8.0")]
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "Sender {{ .. }}")
+ f.debug_struct("Sender").finish()
}
}
@@ -699,12 +942,37 @@ impl<T> SyncSender<T> {
/// Note that a successful send does *not* guarantee that the receiver will
/// ever see the data if there is a buffer on this channel. Items may be
/// enqueued in the internal buffer for the receiver to receive at a later
- /// time. If the buffer size is 0, however, it can be guaranteed that the
- /// receiver has indeed received the data if this function returns success.
+ /// time. If the buffer size is 0, however, the channel becomes a rendezvous
+ /// channel and it guarantees that the receiver has indeed received
+ /// the data if this function returns success.
///
- /// This function will never panic, but it may return `Err` if the
- /// `Receiver` has disconnected and is no longer able to receive
+ /// This function will never panic, but it may return [`Err`] if the
+ /// [`Receiver`] has disconnected and is no longer able to receive
/// information.
+ ///
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::sync_channel;
+ /// use std::thread;
+ ///
+ /// // Create a rendezvous sync_channel with buffer size 0
+ /// let (sync_sender, receiver) = sync_channel(0);
+ ///
+ /// thread::spawn(move || {
+ /// println!("sending message...");
+ /// sync_sender.send(1).unwrap();
+ /// // Thread is now blocked until the message is received
+ ///
+ /// println!("...message received!");
+ /// });
+ ///
+ /// let msg = receiver.recv().unwrap();
+ /// assert_eq!(1, msg);
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.inner.send(t).map_err(SendError)
@@ -712,13 +980,53 @@ impl<T> SyncSender<T> {
/// Attempts to send a value on this channel without blocking.
///
- /// This method differs from `send` by returning immediately if the
+ /// This method differs from [`send`] by returning immediately if the
/// channel's buffer is full or no receiver is waiting to acquire some
- /// data. Compared with `send`, this function has two failure cases
+ /// data. Compared with [`send`], this function has two failure cases
/// instead of one (one for disconnection, one for a full buffer).
///
- /// See `SyncSender::send` for notes about guarantees of whether the
+ /// See [`send`] for notes about guarantees of whether the
/// receiver has received the data or not if this function is successful.
+ ///
+ /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::sync_channel;
+ /// use std::thread;
+ ///
+ /// // Create a sync_channel with buffer size 1
+ /// let (sync_sender, receiver) = sync_channel(1);
+ /// let sync_sender2 = sync_sender.clone();
+ ///
+ /// // First thread owns sync_sender
+ /// thread::spawn(move || {
+ /// sync_sender.send(1).unwrap();
+ /// sync_sender.send(2).unwrap();
+ /// // Thread blocked
+ /// });
+ ///
+ /// // Second thread owns sync_sender2
+ /// thread::spawn(move || {
+ /// // This will return an error and send
+ /// // no message if the buffer is full
+ /// sync_sender2.try_send(3).is_err();
+ /// });
+ ///
+ /// let mut msg;
+ /// msg = receiver.recv().unwrap();
+ /// println!("message {} received", msg);
+ ///
+ /// msg = receiver.recv().unwrap();
+ /// println!("message {} received", msg);
+ ///
+ /// // Third message may have never been sent
+ /// match receiver.try_recv() {
+ /// Ok(msg) => println!("message {} received", msg),
+ /// Err(_) => println!("the third message was never sent"),
+ /// }
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(t)
@@ -740,10 +1048,10 @@ impl<T> Drop for SyncSender<T> {
}
}
-#[stable(feature = "mpsc_debug", since = "1.7.0")]
+#[stable(feature = "mpsc_debug", since = "1.8.0")]
impl<T> fmt::Debug for SyncSender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "SyncSender {{ .. }}")
+ f.debug_struct("SyncSender").finish()
}
}
@@ -756,7 +1064,7 @@ impl<T> Receiver<T> {
Receiver { inner: UnsafeCell::new(inner) }
}
- /// Attempts to return a pending value on this receiver without blocking
+ /// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
/// become available. Instead, this will always return immediately with a
@@ -764,6 +1072,21 @@ impl<T> Receiver<T> {
///
/// This is useful for a flavor of "optimistic check" before deciding to
/// block on a receiver.
+ ///
+ /// Compared with [`recv`], this function has two failure cases instead of one
+ /// (one for disconnection, one for an empty buffer).
+ ///
+ /// [`recv`]: struct.Receiver.html#method.recv
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::{Receiver, channel};
+ ///
+ /// let (_, receiver): (_, Receiver<i32>) = channel();
+ ///
+ /// assert!(receiver.try_recv().is_err());
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
loop {
@@ -819,15 +1142,19 @@ impl<T> Receiver<T> {
///
/// This function will always block the current thread if there is no data
/// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding `Sender`, then this receiver will wake up and
- /// return that message.
+ /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
+ /// receiver will wake up and return that message.
///
- /// If the corresponding `Sender` has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return `Err` to
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
/// indicate that no more messages can ever be received on this channel.
/// However, since channels are buffered, messages sent before the disconnect
/// will still be properly received.
///
+ /// [`Sender`]: struct.Sender.html
+ /// [`SyncSender`]: struct.SyncSender.html
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ ///
/// # Examples
///
/// ```
@@ -907,25 +1234,58 @@ impl<T> Receiver<T> {
///
/// This function will always block the current thread if there is no data
/// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding `Sender`, then this receiver will wake up and
- /// return that message.
+ /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
+ /// receiver will wake up and return that message.
///
- /// If the corresponding `Sender` has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return `Err` to
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
/// indicate that no more messages can ever be received on this channel.
/// However, since channels are buffered, messages sent before the disconnect
/// will still be properly received.
///
+ /// [`Sender`]: struct.Sender.html
+ /// [`SyncSender`]: struct.SyncSender.html
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ ///
/// # Examples
///
+ /// Successfully receiving value before encountering timeout:
+ ///
+ /// ```no_run
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_timeout(Duration::from_millis(400)),
+ /// Ok('a')
+ /// );
+ /// ```
+ ///
+ /// Receiving an error upon reaching timeout:
+ ///
/// ```no_run
- /// use std::sync::mpsc::{self, RecvTimeoutError};
+ /// use std::thread;
/// use std::time::Duration;
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
///
- /// let (send, recv) = mpsc::channel::<()>();
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(800));
+ /// send.send('a').unwrap();
+ /// });
///
- /// let timeout = Duration::from_millis(100);
- /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
+ /// assert_eq!(
+ /// recv.recv_timeout(Duration::from_millis(400)),
+ /// Err(mpsc::RecvTimeoutError::Timeout)
+ /// );
/// ```
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
@@ -937,11 +1297,72 @@ impl<T> Receiver<T> {
Err(TryRecvError::Disconnected)
=> Err(RecvTimeoutError::Disconnected),
Err(TryRecvError::Empty)
- => self.recv_max_until(Instant::now() + timeout)
+ => self.recv_deadline(Instant::now() + timeout)
}
}
- fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up, or if `deadline` is reached.
+ ///
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent. Once a message is
+ /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
+ /// receiver will wake up and return that message.
+ ///
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// [`Sender`]: struct.Sender.html
+ /// [`SyncSender`]: struct.SyncSender.html
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ ///
+ /// # Examples
+ ///
+ /// Successfully receiving value before reaching deadline:
+ ///
+ /// ```no_run
+ /// #![feature(deadline_api)]
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
+ /// Ok('a')
+ /// );
+ /// ```
+ ///
+ /// Receiving an error upon reaching deadline:
+ ///
+ /// ```no_run
+ /// #![feature(deadline_api)]
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(800));
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
+ /// Err(mpsc::RecvTimeoutError::Timeout)
+ /// );
+ /// ```
+ #[unstable(feature = "deadline_api", issue = "46316")]
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
use self::RecvTimeoutError::*;
loop {
@@ -993,7 +1414,31 @@ impl<T> Receiver<T> {
}
/// Returns an iterator that will block waiting for messages, but never
- /// `panic!`. It will return `None` when the channel has hung up.
+ /// [`panic!`]. It will return [`None`] when the channel has hung up.
+ ///
+ /// [`panic!`]: ../../../std/macro.panic.html
+ /// [`None`]: ../../../std/option/enum.Option.html#variant.None
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::channel;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send(1).unwrap();
+ /// send.send(2).unwrap();
+ /// send.send(3).unwrap();
+ /// });
+ ///
+ /// let mut iter = recv.iter();
+ /// assert_eq!(iter.next(), Some(1));
+ /// assert_eq!(iter.next(), Some(2));
+ /// assert_eq!(iter.next(), Some(3));
+ /// assert_eq!(iter.next(), None);
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn iter(&self) -> Iter<T> {
Iter { rx: self }
@@ -1001,8 +1446,42 @@ impl<T> Receiver<T> {
/// Returns an iterator that will attempt to yield all pending values.
/// It will return `None` if there are no more pending values or if the
- /// channel has hung up. The iterator will never `panic!` or block the
+ /// channel has hung up. The iterator will never [`panic!`] or block the
/// user by waiting for values.
+ ///
+ /// [`panic!`]: ../../../std/macro.panic.html
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use std::sync::mpsc::channel;
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// let (sender, receiver) = channel();
+ ///
+ /// // nothing is in the buffer yet
+ /// assert!(receiver.try_iter().next().is_none());
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// sender.send(1).unwrap();
+ /// sender.send(2).unwrap();
+ /// sender.send(3).unwrap();
+ /// });
+ ///
+ /// // nothing is in the buffer yet
+ /// assert!(receiver.try_iter().next().is_none());
+ ///
+ /// // block for two seconds
+ /// thread::sleep(Duration::from_secs(2));
+ ///
+ /// let mut iter = receiver.try_iter();
+ /// assert_eq!(iter.next(), Some(1));
+ /// assert_eq!(iter.next(), Some(2));
+ /// assert_eq!(iter.next(), Some(3));
+ /// assert_eq!(iter.next(), None);
+ /// ```
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
pub fn try_iter(&self) -> TryIter<T> {
TryIter { rx: self }
@@ -1132,10 +1611,10 @@ impl<T> Drop for Receiver<T> {
}
}
-#[stable(feature = "mpsc_debug", since = "1.7.0")]
+#[stable(feature = "mpsc_debug", since = "1.8.0")]
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "Receiver {{ .. }}")
+ f.debug_struct("Receiver").finish()
}
}
@@ -1207,6 +1686,15 @@ impl<T: Send> error::Error for TrySendError<T> {
}
}
+#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
+impl<T> From<SendError<T>> for TrySendError<T> {
+ fn from(err: SendError<T>) -> TrySendError<T> {
+ match err {
+ SendError(t) => TrySendError::Disconnected(t),
+ }
+ }
+}
+
#[stable(feature = "rust1", since = "1.0.0")]
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -1259,7 +1747,16 @@ impl error::Error for TryRecvError {
}
}
-#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
+#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
+impl From<RecvError> for TryRecvError {
+ fn from(err: RecvError) -> TryRecvError {
+ match err {
+ RecvError => TryRecvError::Disconnected,
+ }
+ }
+}
+
+#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
impl fmt::Display for RecvTimeoutError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
@@ -1273,7 +1770,7 @@ impl fmt::Display for RecvTimeoutError {
}
}
-#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
+#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
impl error::Error for RecvTimeoutError {
fn description(&self) -> &str {
match *self {
@@ -1291,6 +1788,15 @@ impl error::Error for RecvTimeoutError {
}
}
+#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
+impl From<RecvError> for RecvTimeoutError {
+ fn from(err: RecvError) -> RecvTimeoutError {
+ match err {
+ RecvError => RecvTimeoutError::Disconnected,
+ }
+ }
+}
+
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
use env;
@@ -1539,7 +2045,7 @@ mod tests {
fn oneshot_single_thread_send_then_recv() {
let (tx, rx) = channel::<Box<i32>>();
tx.send(box 10).unwrap();
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
#[test]
@@ -1596,7 +2102,7 @@ mod tests {
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = channel::<Box<i32>>();
let _t = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
});
tx.send(box 10).unwrap();
@@ -1609,7 +2115,7 @@ mod tests {
drop(tx);
});
let res = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}).join();
assert!(res.is_err());
}
@@ -1663,7 +2169,7 @@ mod tests {
let _t = thread::spawn(move|| {
tx.send(box 10).unwrap();
});
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
}
@@ -1688,7 +2194,7 @@ mod tests {
if i == 10 { return }
thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box i);
+ assert!(*rx.recv().unwrap() == i);
recv(rx, i + 1);
});
}
@@ -2225,7 +2731,7 @@ mod sync_tests {
fn oneshot_single_thread_send_then_recv() {
let (tx, rx) = sync_channel::<Box<i32>>(1);
tx.send(box 10).unwrap();
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
#[test]
@@ -2297,7 +2803,7 @@ mod sync_tests {
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = sync_channel::<Box<i32>>(0);
let _t = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
});
tx.send(box 10).unwrap();
@@ -2310,7 +2816,7 @@ mod sync_tests {
drop(tx);
});
let res = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}).join();
assert!(res.is_err());
}
@@ -2364,7 +2870,7 @@ mod sync_tests {
let _t = thread::spawn(move|| {
tx.send(box 10).unwrap();
});
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
}
@@ -2389,7 +2895,7 @@ mod sync_tests {
if i == 10 { return }
thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box i);
+ assert!(*rx.recv().unwrap() == i);
recv(rx, i + 1);
});
}
@@ -2593,22 +3099,4 @@ mod sync_tests {
repro()
}
}
-
- #[test]
- fn fmt_debug_sender() {
- let (tx, _) = channel::<i32>();
- assert_eq!(format!("{:?}", tx), "Sender { .. }");
- }
-
- #[test]
- fn fmt_debug_recv() {
- let (_, rx) = channel::<i32>();
- assert_eq!(format!("{:?}", rx), "Receiver { .. }");
- }
-
- #[test]
- fn fmt_debug_sync_sender() {
- let (tx, _) = sync_channel::<i32>(1);
- assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
- }
}