aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync
diff options
context:
space:
mode:
authorFenrir <[email protected]>2018-01-21 14:06:28 -0700
committerFenrirWolf <[email protected]>2018-01-21 19:16:33 -0700
commit23be3f4885688e5e0011005e2295c75168854c0a (patch)
treedd0850f9c73c489e114a761d5c0757f3dbec3a65 /ctr-std/src/sync
parentUpdate CI for Rust nightly-2017-12-01 + other fixes (diff)
downloadctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.tar.xz
ctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.zip
Recreate ctr-std from latest nightly
Diffstat (limited to 'ctr-std/src/sync')
-rw-r--r--ctr-std/src/sync/barrier.rs11
-rw-r--r--ctr-std/src/sync/condvar.rs86
-rw-r--r--ctr-std/src/sync/mpsc/blocking.rs2
-rw-r--r--ctr-std/src/sync/mpsc/cache_aligned.rs37
-rw-r--r--ctr-std/src/sync/mpsc/mod.rs774
-rw-r--r--ctr-std/src/sync/mpsc/mpsc_queue.rs35
-rw-r--r--ctr-std/src/sync/mpsc/select.rs22
-rw-r--r--ctr-std/src/sync/mpsc/spsc_queue.rs192
-rw-r--r--ctr-std/src/sync/mpsc/stream.rs105
-rw-r--r--ctr-std/src/sync/mpsc/sync.rs2
-rw-r--r--ctr-std/src/sync/mutex.rs89
-rw-r--r--ctr-std/src/sync/once.rs117
-rw-r--r--ctr-std/src/sync/rwlock.rs202
13 files changed, 1247 insertions, 427 deletions
diff --git a/ctr-std/src/sync/barrier.rs b/ctr-std/src/sync/barrier.rs
index f15e7ff..273c7c1 100644
--- a/ctr-std/src/sync/barrier.rs
+++ b/ctr-std/src/sync/barrier.rs
@@ -50,12 +50,11 @@ struct BarrierState {
generation_id: usize,
}
-/// A result returned from wait.
+/// A `BarrierWaitResult` is returned by [`wait`] when all threads in the [`Barrier`]
+/// have rendezvoused.
///
-/// Currently this opaque structure only has one method, [`.is_leader()`]. Only
-/// one thread will receive a result that will return `true` from this function.
-///
-/// [`.is_leader()`]: #method.is_leader
+/// [`wait`]: struct.Barrier.html#method.wait
+/// [`Barrier`]: struct.Barrier.html
///
/// # Examples
///
@@ -153,7 +152,7 @@ impl Barrier {
BarrierWaitResult(false)
} else {
lock.count = 0;
- lock.generation_id += 1;
+ lock.generation_id = lock.generation_id.wrapping_add(1);
self.cvar.notify_all();
BarrierWaitResult(true)
}
diff --git a/ctr-std/src/sync/condvar.rs b/ctr-std/src/sync/condvar.rs
index 68c7e88..5640217 100644
--- a/ctr-std/src/sync/condvar.rs
+++ b/ctr-std/src/sync/condvar.rs
@@ -150,7 +150,7 @@ impl Condvar {
///
/// This function will atomically unlock the mutex specified (represented by
/// `guard`) and block the current thread. This means that any calls
- /// to [`notify_one()`] or [`notify_all()`] which happen logically after the
+ /// to [`notify_one`] or [`notify_all`] which happen logically after the
/// mutex is unlocked are candidates to wake this thread up. When this
/// function call returns, the lock specified will have been re-acquired.
///
@@ -167,16 +167,16 @@ impl Condvar {
///
/// # Panics
///
- /// This function will [`panic!()`] if it is used with more than one mutex
+ /// This function will [`panic!`] if it is used with more than one mutex
/// over time. Each condition variable is dynamically bound to exactly one
/// mutex to ensure defined behavior across platforms. If this functionality
/// is not desired, then unsafe primitives in `sys` are provided.
///
- /// [`notify_one()`]: #method.notify_one
- /// [`notify_all()`]: #method.notify_all
+ /// [`notify_one`]: #method.notify_one
+ /// [`notify_all`]: #method.notify_all
/// [poisoning]: ../sync/struct.Mutex.html#poisoning
/// [`Mutex`]: ../sync/struct.Mutex.html
- /// [`panic!()`]: ../../std/macro.panic.html
+ /// [`panic!`]: ../../std/macro.panic.html
///
/// # Examples
///
@@ -359,11 +359,11 @@ impl Condvar {
/// be woken up from its call to [`wait`] or [`wait_timeout`]. Calls to
/// `notify_one` are not buffered in any way.
///
- /// To wake up all threads, see [`notify_all()`].
+ /// To wake up all threads, see [`notify_all`].
///
/// [`wait`]: #method.wait
/// [`wait_timeout`]: #method.wait_timeout
- /// [`notify_all()`]: #method.notify_all
+ /// [`notify_all`]: #method.notify_all
///
/// # Examples
///
@@ -401,9 +401,9 @@ impl Condvar {
/// variable are awoken. Calls to `notify_all()` are not buffered in any
/// way.
///
- /// To wake up only one thread, see [`notify_one()`].
+ /// To wake up only one thread, see [`notify_one`].
///
- /// [`notify_one()`]: #method.notify_one
+ /// [`notify_one`]: #method.notify_one
///
/// # Examples
///
@@ -461,7 +461,7 @@ impl fmt::Debug for Condvar {
}
}
-#[stable(feature = "condvar_default", since = "1.9.0")]
+#[stable(feature = "condvar_default", since = "1.10.0")]
impl Default for Condvar {
/// Creates a `Condvar` which is ready to be waited on and notified.
fn default() -> Condvar {
@@ -480,9 +480,10 @@ impl Drop for Condvar {
mod tests {
use sync::mpsc::channel;
use sync::{Condvar, Mutex, Arc};
+ use sync::atomic::{AtomicBool, Ordering};
use thread;
use time::Duration;
- use u32;
+ use u64;
#[test]
fn smoke() {
@@ -547,23 +548,58 @@ mod tests {
#[test]
#[cfg_attr(target_os = "emscripten", ignore)]
- fn wait_timeout_ms() {
+ fn wait_timeout_wait() {
let m = Arc::new(Mutex::new(()));
- let m2 = m.clone();
let c = Arc::new(Condvar::new());
- let c2 = c.clone();
- let g = m.lock().unwrap();
- let (g, _no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap();
- // spurious wakeups mean this isn't necessarily true
- // assert!(!no_timeout);
- let _t = thread::spawn(move || {
- let _g = m2.lock().unwrap();
- c2.notify_one();
- });
- let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u32::MAX as u64)).unwrap();
- assert!(!timeout_res.timed_out());
- drop(g);
+ loop {
+ let g = m.lock().unwrap();
+ let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap();
+ // spurious wakeups mean this isn't necessarily true
+ // so execute test again, if not timeout
+ if !no_timeout.timed_out() {
+ continue;
+ }
+
+ break;
+ }
+ }
+
+ #[test]
+ #[cfg_attr(target_os = "emscripten", ignore)]
+ fn wait_timeout_wake() {
+ let m = Arc::new(Mutex::new(()));
+ let c = Arc::new(Condvar::new());
+
+ loop {
+ let g = m.lock().unwrap();
+
+ let c2 = c.clone();
+ let m2 = m.clone();
+
+ let notified = Arc::new(AtomicBool::new(false));
+ let notified_copy = notified.clone();
+
+ let t = thread::spawn(move || {
+ let _g = m2.lock().unwrap();
+ thread::sleep(Duration::from_millis(1));
+ notified_copy.store(true, Ordering::SeqCst);
+ c2.notify_one();
+ });
+ let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap();
+ assert!(!timeout_res.timed_out());
+ // spurious wakeups mean this isn't necessarily true
+ // so execute test again, if not notified
+ if !notified.load(Ordering::SeqCst) {
+ t.join().unwrap();
+ continue;
+ }
+ drop(g);
+
+ t.join().unwrap();
+
+ break;
+ }
}
#[test]
diff --git a/ctr-std/src/sync/mpsc/blocking.rs b/ctr-std/src/sync/mpsc/blocking.rs
index 0f9ef6f..c08bd6d 100644
--- a/ctr-std/src/sync/mpsc/blocking.rs
+++ b/ctr-std/src/sync/mpsc/blocking.rs
@@ -46,7 +46,7 @@ pub fn tokens() -> (WaitToken, SignalToken) {
inner: inner.clone(),
};
let signal_token = SignalToken {
- inner: inner
+ inner,
};
(wait_token, signal_token)
}
diff --git a/ctr-std/src/sync/mpsc/cache_aligned.rs b/ctr-std/src/sync/mpsc/cache_aligned.rs
new file mode 100644
index 0000000..5af0126
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/cache_aligned.rs
@@ -0,0 +1,37 @@
+// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use ops::{Deref, DerefMut};
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[repr(align(64))]
+pub(super) struct Aligner;
+
+#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub(super) struct CacheAligned<T>(pub T, pub Aligner);
+
+impl<T> Deref for CacheAligned<T> {
+ type Target = T;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
+}
+
+impl<T> DerefMut for CacheAligned<T> {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.0
+ }
+}
+
+impl<T> CacheAligned<T> {
+ pub(super) fn new(t: T) -> Self {
+ CacheAligned(t, Aligner)
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/mod.rs b/ctr-std/src/sync/mpsc/mod.rs
index aeeab17..2dd3aeb 100644
--- a/ctr-std/src/sync/mpsc/mod.rs
+++ b/ctr-std/src/sync/mpsc/mod.rs
@@ -13,40 +13,50 @@
//! This module provides message-based communication over channels, concretely
//! defined among three types:
//!
-//! * `Sender`
-//! * `SyncSender`
-//! * `Receiver`
+//! * [`Sender`]
+//! * [`SyncSender`]
+//! * [`Receiver`]
//!
-//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
+//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both
//! senders are clone-able (multi-producer) such that many threads can send
//! simultaneously to one receiver (single-consumer).
//!
//! These channels come in two flavors:
//!
-//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
+//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function
//! will return a `(Sender, Receiver)` tuple where all sends will be
//! **asynchronous** (they never block). The channel conceptually has an
//! infinite buffer.
//!
-//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
-//! a `(SyncSender, Receiver)` tuple where the storage for pending messages
-//! is a pre-allocated buffer of a fixed size. All sends will be
+//! 2. A synchronous, bounded channel. The [`sync_channel`] function will
+//! return a `(SyncSender, Receiver)` tuple where the storage for pending
+//! messages is a pre-allocated buffer of a fixed size. All sends will be
//! **synchronous** by blocking until there is buffer space available. Note
-//! that a bound of 0 is allowed, causing the channel to become a
-//! "rendezvous" channel where each sender atomically hands off a message to
-//! a receiver.
+//! that a bound of 0 is allowed, causing the channel to become a "rendezvous"
+//! channel where each sender atomically hands off a message to a receiver.
+//!
+//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
+//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
+//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
+//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html
+//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html
//!
//! ## Disconnection
//!
-//! The send and receive operations on channels will all return a `Result`
+//! The send and receive operations on channels will all return a [`Result`]
//! indicating whether the operation succeeded or not. An unsuccessful operation
//! is normally indicative of the other half of a channel having "hung up" by
//! being dropped in its corresponding thread.
//!
//! Once half of a channel has been deallocated, most operations can no longer
-//! continue to make progress, so `Err` will be returned. Many applications will
-//! continue to `unwrap()` the results returned from this module, instigating a
-//! propagation of failure among threads if one unexpectedly dies.
+//! continue to make progress, so [`Err`] will be returned. Many applications
+//! will continue to [`unwrap`] the results returned from this module,
+//! instigating a propagation of failure among threads if one unexpectedly dies.
+//!
+//! [`Result`]: ../../../std/result/enum.Result.html
+//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap
//!
//! # Examples
//!
@@ -287,8 +297,36 @@ mod sync;
mod mpsc_queue;
mod spsc_queue;
-/// The receiving-half of Rust's channel type. This half can only be owned by
-/// one thread
+mod cache_aligned;
+
+/// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type.
+/// This half can only be owned by one thread.
+///
+/// Messages sent to the channel can be retrieved using [`recv`].
+///
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
+/// [`recv`]: struct.Receiver.html#method.recv
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send("Hello world!").unwrap();
+/// thread::sleep(Duration::from_secs(2)); // block for two seconds
+/// send.send("Delayed for 2 seconds").unwrap();
+/// });
+///
+/// println!("{}", recv.recv().unwrap()); // Received immediately
+/// println!("Waiting...");
+/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Receiver<T> {
inner: UnsafeCell<Flavor<T>>,
@@ -302,38 +340,153 @@ unsafe impl<T: Send> Send for Receiver<T> { }
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for Receiver<T> { }
-/// An iterator over messages on a receiver, this iterator will block
-/// whenever `next` is called, waiting for a new message, and `None` will be
-/// returned when the corresponding channel has hung up.
+/// An iterator over messages on a [`Receiver`], created by [`iter`].
+///
+/// This iterator will block whenever [`next`] is called,
+/// waiting for a new message, and [`None`] will be returned
+/// when the corresponding channel has hung up.
+///
+/// [`iter`]: struct.Receiver.html#method.iter
+/// [`Receiver`]: struct.Receiver.html
+/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
+/// [`None`]: ../../../std/option/enum.Option.html#variant.None
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send(1u8).unwrap();
+/// send.send(2u8).unwrap();
+/// send.send(3u8).unwrap();
+/// });
+///
+/// for x in recv.iter() {
+/// println!("Got: {}", x);
+/// }
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(Debug)]
pub struct Iter<'a, T: 'a> {
rx: &'a Receiver<T>
}
-/// An iterator that attempts to yield all pending values for a receiver.
-/// `None` will be returned when there are no pending values remaining or
+/// An iterator that attempts to yield all pending values for a [`Receiver`],
+/// created by [`try_iter`].
+///
+/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
-/// This Iterator will never block the caller in order to wait for data to
-/// become available. Instead, it will return `None`.
+/// This iterator will never block the caller in order to wait for data to
+/// become available. Instead, it will return [`None`].
+///
+/// [`Receiver`]: struct.Receiver.html
+/// [`try_iter`]: struct.Receiver.html#method.try_iter
+/// [`None`]: ../../../std/option/enum.Option.html#variant.None
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+/// use std::time::Duration;
+///
+/// let (sender, receiver) = channel();
+///
+/// // Nothing is in the buffer yet
+/// assert!(receiver.try_iter().next().is_none());
+/// println!("Nothing in the buffer...");
+///
+/// thread::spawn(move || {
+/// sender.send(1).unwrap();
+/// sender.send(2).unwrap();
+/// sender.send(3).unwrap();
+/// });
+///
+/// println!("Going to sleep...");
+/// thread::sleep(Duration::from_secs(2)); // block for two seconds
+///
+/// for x in receiver.try_iter() {
+/// println!("Got: {}", x);
+/// }
+/// ```
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
#[derive(Debug)]
pub struct TryIter<'a, T: 'a> {
rx: &'a Receiver<T>
}
-/// An owning iterator over messages on a receiver, this iterator will block
-/// whenever `next` is called, waiting for a new message, and `None` will be
-/// returned when the corresponding channel has hung up.
+/// An owning iterator over messages on a [`Receiver`],
+/// created by **Receiver::into_iter**.
+///
+/// This iterator will block whenever [`next`]
+/// is called, waiting for a new message, and [`None`] will be
+/// returned if the corresponding channel has hung up.
+///
+/// [`Receiver`]: struct.Receiver.html
+/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next
+/// [`None`]: ../../../std/option/enum.Option.html#variant.None
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (send, recv) = channel();
+///
+/// thread::spawn(move || {
+/// send.send(1u8).unwrap();
+/// send.send(2u8).unwrap();
+/// send.send(3u8).unwrap();
+/// });
+///
+/// for x in recv.into_iter() {
+/// println!("Got: {}", x);
+/// }
+/// ```
#[stable(feature = "receiver_into_iter", since = "1.1.0")]
#[derive(Debug)]
pub struct IntoIter<T> {
rx: Receiver<T>
}
-/// The sending-half of Rust's asynchronous channel type. This half can only be
+/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be
/// owned by one thread, but it can be cloned to send to other threads.
+///
+/// Messages can be sent through this channel with [`send`].
+///
+/// [`channel`]: fn.channel.html
+/// [`send`]: struct.Sender.html#method.send
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// let (sender, receiver) = channel();
+/// let sender2 = sender.clone();
+///
+/// // First thread owns sender
+/// thread::spawn(move || {
+/// sender.send(1).unwrap();
+/// });
+///
+/// // Second thread owns sender2
+/// thread::spawn(move || {
+/// sender2.send(2).unwrap();
+/// });
+///
+/// let msg = receiver.recv().unwrap();
+/// let msg2 = receiver.recv().unwrap();
+///
+/// assert_eq!(3, msg + msg2);
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct Sender<T> {
inner: UnsafeCell<Flavor<T>>,
@@ -347,8 +500,53 @@ unsafe impl<T: Send> Send for Sender<T> { }
#[stable(feature = "rust1", since = "1.0.0")]
impl<T> !Sync for Sender<T> { }
-/// The sending-half of Rust's synchronous channel type. This half can only be
-/// owned by one thread, but it can be cloned to send to other threads.
+/// The sending-half of Rust's synchronous [`sync_channel`] type.
+///
+/// Messages can be sent through this channel with [`send`] or [`try_send`].
+///
+/// [`send`] will block if there is no space in the internal buffer.
+///
+/// [`sync_channel`]: fn.sync_channel.html
+/// [`send`]: struct.SyncSender.html#method.send
+/// [`try_send`]: struct.SyncSender.html#method.try_send
+///
+/// # Examples
+///
+/// ```rust
+/// use std::sync::mpsc::sync_channel;
+/// use std::thread;
+///
+/// // Create a sync_channel with buffer size 2
+/// let (sync_sender, receiver) = sync_channel(2);
+/// let sync_sender2 = sync_sender.clone();
+///
+/// // First thread owns sync_sender
+/// thread::spawn(move || {
+/// sync_sender.send(1).unwrap();
+/// sync_sender.send(2).unwrap();
+/// });
+///
+/// // Second thread owns sync_sender2
+/// thread::spawn(move || {
+/// sync_sender2.send(3).unwrap();
+/// // thread will now block since the buffer is full
+/// println!("Thread unblocked!");
+/// });
+///
+/// let mut msg;
+///
+/// msg = receiver.recv().unwrap();
+/// println!("message {} received", msg);
+///
+/// // "Thread unblocked!" will be printed now
+///
+/// msg = receiver.recv().unwrap();
+/// println!("message {} received", msg);
+///
+/// msg = receiver.recv().unwrap();
+///
+/// println!("message {} received", msg);
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub struct SyncSender<T> {
inner: Arc<sync::Packet<T>>,
@@ -357,73 +555,97 @@ pub struct SyncSender<T> {
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send> Send for SyncSender<T> {}
-#[stable(feature = "rust1", since = "1.0.0")]
-impl<T> !Sync for SyncSender<T> {}
-
-/// An error returned from the `send` function on channels.
+/// An error returned from the [`Sender::send`] or [`SyncSender::send`]
+/// function on **channel**s.
///
-/// A `send` operation can only fail if the receiving end of a channel is
+/// A **send** operation can only fail if the receiving end of a channel is
/// disconnected, implying that the data could never be received. The error
/// contains the data being sent as a payload so it can be recovered.
+///
+/// [`Sender::send`]: struct.Sender.html#method.send
+/// [`SyncSender::send`]: struct.SyncSender.html#method.send
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
-/// An error returned from the `recv` function on a `Receiver`.
+/// An error returned from the [`recv`] function on a [`Receiver`].
///
-/// The `recv` operation can only fail if the sending half of a channel is
-/// disconnected, implying that no further messages will ever be received.
+/// The [`recv`] operation can only fail if the sending half of a
+/// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further
+/// messages will ever be received.
+///
+/// [`recv`]: struct.Receiver.html#method.recv
+/// [`Receiver`]: struct.Receiver.html
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub struct RecvError;
-/// This enumeration is the list of the possible reasons that `try_recv` could
-/// not return data when called.
+/// This enumeration is the list of the possible reasons that [`try_recv`] could
+/// not return data when called. This can occur with both a [`channel`] and
+/// a [`sync_channel`].
+///
+/// [`try_recv`]: struct.Receiver.html#method.try_recv
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "rust1", since = "1.0.0")]
pub enum TryRecvError {
- /// This channel is currently empty, but the sender(s) have not yet
+ /// This **channel** is currently empty, but the **Sender**(s) have not yet
/// disconnected, so data may yet become available.
#[stable(feature = "rust1", since = "1.0.0")]
Empty,
- /// This channel's sending half has become disconnected, and there will
- /// never be any more data received on this channel
+ /// The **channel**'s sending half has become disconnected, and there will
+ /// never be any more data received on it.
#[stable(feature = "rust1", since = "1.0.0")]
Disconnected,
}
-/// This enumeration is the list of possible errors that `recv_timeout` could
-/// not return data when called.
+/// This enumeration is the list of possible errors that made [`recv_timeout`]
+/// unable to return data when called. This can occur with both a [`channel`] and
+/// a [`sync_channel`].
+///
+/// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout
+/// [`channel`]: fn.channel.html
+/// [`sync_channel`]: fn.sync_channel.html
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub enum RecvTimeoutError {
- /// This channel is currently empty, but the sender(s) have not yet
+ /// This **channel** is currently empty, but the **Sender**(s) have not yet
/// disconnected, so data may yet become available.
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
Timeout,
- /// This channel's sending half has become disconnected, and there will
- /// never be any more data received on this channel
+ /// The **channel**'s sending half has become disconnected, and there will
+ /// never be any more data received on it.
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
Disconnected,
}
/// This enumeration is the list of the possible error outcomes for the
-/// `SyncSender::try_send` method.
+/// [`try_send`] method.
+///
+/// [`try_send`]: struct.SyncSender.html#method.try_send
#[stable(feature = "rust1", since = "1.0.0")]
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum TrySendError<T> {
- /// The data could not be sent on the channel because it would require that
+ /// The data could not be sent on the [`sync_channel`] because it would require that
/// the callee block to send the data.
///
/// If this is a buffered channel, then the buffer is full at this time. If
- /// this is not a buffered channel, then there is no receiver available to
+ /// this is not a buffered channel, then there is no [`Receiver`] available to
/// acquire the data.
+ ///
+ /// [`sync_channel`]: fn.sync_channel.html
+ /// [`Receiver`]: struct.Receiver.html
#[stable(feature = "rust1", since = "1.0.0")]
Full(#[stable(feature = "rust1", since = "1.0.0")] T),
- /// This channel's receiving half has disconnected, so the data could not be
+ /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be
/// sent. The data is returned back to the callee in this case.
+ ///
+ /// [`sync_channel`]: fn.sync_channel.html
#[stable(feature = "rust1", since = "1.0.0")]
Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
}
@@ -457,15 +679,27 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
}
/// Creates a new asynchronous channel, returning the sender/receiver halves.
-/// All data sent on the sender will become available on the receiver, and no
-/// send will block the calling thread (this channel has an "infinite buffer").
+/// All data sent on the [`Sender`] will become available on the [`Receiver`] in
+/// the same order as it was sent, and no [`send`] will block the calling thread
+/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will
+/// block after its buffer limit is reached). [`recv`] will block until a message
+/// is available.
///
-/// If the [`Receiver`] is disconnected while trying to [`send()`] with the
-/// [`Sender`], the [`send()`] method will return an error.
+/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but
+/// only one [`Receiver`] is supported.
///
-/// [`send()`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
-/// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
-/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+/// If the [`Receiver`] is disconnected while trying to [`send`] with the
+/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the
+/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will
+/// return a [`RecvError`].
+///
+/// [`send`]: struct.Sender.html#method.send
+/// [`recv`]: struct.Receiver.html#method.recv
+/// [`Sender`]: struct.Sender.html
+/// [`Receiver`]: struct.Receiver.html
+/// [`sync_channel`]: fn.sync_channel.html
+/// [`SendError`]: struct.SendError.html
+/// [`RecvError`]: struct.RecvError.html
///
/// # Examples
///
@@ -473,20 +707,18 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
/// use std::sync::mpsc::channel;
/// use std::thread;
///
-/// // tx is the sending half (tx for transmission), and rx is the receiving
-/// // half (rx for receiving).
-/// let (tx, rx) = channel();
+/// let (sender, receiver) = channel();
///
/// // Spawn off an expensive computation
/// thread::spawn(move|| {
/// # fn expensive_computation() {}
-/// tx.send(expensive_computation()).unwrap();
+/// sender.send(expensive_computation()).unwrap();
/// });
///
/// // Do some useful work for awhile
///
/// // Let's see what that answer was
-/// println!("{:?}", rx.recv().unwrap());
+/// println!("{:?}", receiver.recv().unwrap());
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
@@ -495,24 +727,32 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
}
/// Creates a new synchronous, bounded channel.
-///
-/// Like asynchronous channels, the [`Receiver`] will block until a message
-/// becomes available. These channels differ greatly in the semantics of the
-/// sender from asynchronous channels, however.
+/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
+/// in the same order as it was sent. Like asynchronous [`channel`]s, the
+/// [`Receiver`] will block until a message becomes available. `sync_channel`
+/// differs greatly in the semantics of the sender, however.
///
/// This channel has an internal buffer on which messages will be queued.
/// `bound` specifies the buffer size. When the internal buffer becomes full,
/// future sends will *block* waiting for the buffer to open up. Note that a
/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
-/// where each [`send()`] will not return until a recv is paired with it.
+/// where each [`send`] will not return until a [`recv`] is paired with it.
///
-/// Like asynchronous channels, if the [`Receiver`] is disconnected while
-/// trying to [`send()`] with the [`SyncSender`], the [`send()`] method will
-/// return an error.
+/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
+/// times, but only one [`Receiver`] is supported.
///
-/// [`send()`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
-/// [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
-/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
+/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
+/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
+/// to [`recv`], the [`recv`] method will return a [`RecvError`].
+///
+/// [`channel`]: fn.channel.html
+/// [`send`]: struct.SyncSender.html#method.send
+/// [`recv`]: struct.Receiver.html#method.recv
+/// [`SyncSender`]: struct.SyncSender.html
+/// [`Receiver`]: struct.Receiver.html
+/// [`SendError`]: struct.SendError.html
+/// [`RecvError`]: struct.RecvError.html
///
/// # Examples
///
@@ -520,18 +760,18 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
/// use std::sync::mpsc::sync_channel;
/// use std::thread;
///
-/// let (tx, rx) = sync_channel(1);
+/// let (sender, receiver) = sync_channel(1);
///
/// // this returns immediately
-/// tx.send(1).unwrap();
+/// sender.send(1).unwrap();
///
/// thread::spawn(move|| {
/// // this will block until the previous message has been received
-/// tx.send(2).unwrap();
+/// sender.send(2).unwrap();
/// });
///
-/// assert_eq!(rx.recv().unwrap(), 1);
-/// assert_eq!(rx.recv().unwrap(), 2);
+/// assert_eq!(receiver.recv().unwrap(), 1);
+/// assert_eq!(receiver.recv().unwrap(), 2);
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
@@ -556,10 +796,13 @@ impl<T> Sender<T> {
/// A successful send occurs when it is determined that the other end of
/// the channel has not hung up already. An unsuccessful send would be one
/// where the corresponding receiver has already been deallocated. Note
- /// that a return value of `Err` means that the data will never be
- /// received, but a return value of `Ok` does *not* mean that the data
+ /// that a return value of [`Err`] means that the data will never be
+ /// received, but a return value of [`Ok`] does *not* mean that the data
/// will be received. It is possible for the corresponding receiver to
- /// hang up immediately after this function returns `Ok`.
+ /// hang up immediately after this function returns [`Ok`].
+ ///
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok
///
/// This method will never block the current thread.
///
@@ -675,10 +918,10 @@ impl<T> Drop for Sender<T> {
}
}
-#[stable(feature = "mpsc_debug", since = "1.7.0")]
+#[stable(feature = "mpsc_debug", since = "1.8.0")]
impl<T> fmt::Debug for Sender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "Sender {{ .. }}")
+ f.debug_struct("Sender").finish()
}
}
@@ -699,12 +942,37 @@ impl<T> SyncSender<T> {
/// Note that a successful send does *not* guarantee that the receiver will
/// ever see the data if there is a buffer on this channel. Items may be
/// enqueued in the internal buffer for the receiver to receive at a later
- /// time. If the buffer size is 0, however, it can be guaranteed that the
- /// receiver has indeed received the data if this function returns success.
+ /// time. If the buffer size is 0, however, the channel becomes a rendezvous
+ /// channel and it guarantees that the receiver has indeed received
+ /// the data if this function returns success.
///
- /// This function will never panic, but it may return `Err` if the
- /// `Receiver` has disconnected and is no longer able to receive
+ /// This function will never panic, but it may return [`Err`] if the
+ /// [`Receiver`] has disconnected and is no longer able to receive
/// information.
+ ///
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::sync_channel;
+ /// use std::thread;
+ ///
+ /// // Create a rendezvous sync_channel with buffer size 0
+ /// let (sync_sender, receiver) = sync_channel(0);
+ ///
+ /// thread::spawn(move || {
+ /// println!("sending message...");
+ /// sync_sender.send(1).unwrap();
+ /// // Thread is now blocked until the message is received
+ ///
+ /// println!("...message received!");
+ /// });
+ ///
+ /// let msg = receiver.recv().unwrap();
+ /// assert_eq!(1, msg);
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn send(&self, t: T) -> Result<(), SendError<T>> {
self.inner.send(t).map_err(SendError)
@@ -712,13 +980,53 @@ impl<T> SyncSender<T> {
/// Attempts to send a value on this channel without blocking.
///
- /// This method differs from `send` by returning immediately if the
+ /// This method differs from [`send`] by returning immediately if the
/// channel's buffer is full or no receiver is waiting to acquire some
- /// data. Compared with `send`, this function has two failure cases
+ /// data. Compared with [`send`], this function has two failure cases
/// instead of one (one for disconnection, one for a full buffer).
///
- /// See `SyncSender::send` for notes about guarantees of whether the
+ /// See [`send`] for notes about guarantees of whether the
/// receiver has received the data or not if this function is successful.
+ ///
+ /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::sync_channel;
+ /// use std::thread;
+ ///
+ /// // Create a sync_channel with buffer size 1
+ /// let (sync_sender, receiver) = sync_channel(1);
+ /// let sync_sender2 = sync_sender.clone();
+ ///
+ /// // First thread owns sync_sender
+ /// thread::spawn(move || {
+ /// sync_sender.send(1).unwrap();
+ /// sync_sender.send(2).unwrap();
+ /// // Thread blocked
+ /// });
+ ///
+ /// // Second thread owns sync_sender2
+ /// thread::spawn(move || {
+ /// // This will return an error and send
+ /// // no message if the buffer is full
+ /// sync_sender2.try_send(3).is_err();
+ /// });
+ ///
+ /// let mut msg;
+ /// msg = receiver.recv().unwrap();
+ /// println!("message {} received", msg);
+ ///
+ /// msg = receiver.recv().unwrap();
+ /// println!("message {} received", msg);
+ ///
+ /// // Third message may have never been sent
+ /// match receiver.try_recv() {
+ /// Ok(msg) => println!("message {} received", msg),
+ /// Err(_) => println!("the third message was never sent"),
+ /// }
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
self.inner.try_send(t)
@@ -740,10 +1048,10 @@ impl<T> Drop for SyncSender<T> {
}
}
-#[stable(feature = "mpsc_debug", since = "1.7.0")]
+#[stable(feature = "mpsc_debug", since = "1.8.0")]
impl<T> fmt::Debug for SyncSender<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "SyncSender {{ .. }}")
+ f.debug_struct("SyncSender").finish()
}
}
@@ -756,7 +1064,7 @@ impl<T> Receiver<T> {
Receiver { inner: UnsafeCell::new(inner) }
}
- /// Attempts to return a pending value on this receiver without blocking
+ /// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
/// become available. Instead, this will always return immediately with a
@@ -764,6 +1072,21 @@ impl<T> Receiver<T> {
///
/// This is useful for a flavor of "optimistic check" before deciding to
/// block on a receiver.
+ ///
+ /// Compared with [`recv`], this function has two failure cases instead of one
+ /// (one for disconnection, one for an empty buffer).
+ ///
+ /// [`recv`]: struct.Receiver.html#method.recv
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::{Receiver, channel};
+ ///
+ /// let (_, receiver): (_, Receiver<i32>) = channel();
+ ///
+ /// assert!(receiver.try_recv().is_err());
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_recv(&self) -> Result<T, TryRecvError> {
loop {
@@ -819,15 +1142,19 @@ impl<T> Receiver<T> {
///
/// This function will always block the current thread if there is no data
/// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding `Sender`, then this receiver will wake up and
- /// return that message.
+ /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
+ /// receiver will wake up and return that message.
///
- /// If the corresponding `Sender` has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return `Err` to
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
/// indicate that no more messages can ever be received on this channel.
/// However, since channels are buffered, messages sent before the disconnect
/// will still be properly received.
///
+ /// [`Sender`]: struct.Sender.html
+ /// [`SyncSender`]: struct.SyncSender.html
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ ///
/// # Examples
///
/// ```
@@ -907,25 +1234,58 @@ impl<T> Receiver<T> {
///
/// This function will always block the current thread if there is no data
/// available and it's possible for more data to be sent. Once a message is
- /// sent to the corresponding `Sender`, then this receiver will wake up and
- /// return that message.
+ /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
+ /// receiver will wake up and return that message.
///
- /// If the corresponding `Sender` has disconnected, or it disconnects while
- /// this call is blocking, this call will wake up and return `Err` to
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
/// indicate that no more messages can ever be received on this channel.
/// However, since channels are buffered, messages sent before the disconnect
/// will still be properly received.
///
+ /// [`Sender`]: struct.Sender.html
+ /// [`SyncSender`]: struct.SyncSender.html
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ ///
/// # Examples
///
+ /// Successfully receiving value before encountering timeout:
+ ///
+ /// ```no_run
+ /// use std::thread;
+ /// use std::time::Duration;
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_timeout(Duration::from_millis(400)),
+ /// Ok('a')
+ /// );
+ /// ```
+ ///
+ /// Receiving an error upon reaching timeout:
+ ///
/// ```no_run
- /// use std::sync::mpsc::{self, RecvTimeoutError};
+ /// use std::thread;
/// use std::time::Duration;
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
///
- /// let (send, recv) = mpsc::channel::<()>();
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(800));
+ /// send.send('a').unwrap();
+ /// });
///
- /// let timeout = Duration::from_millis(100);
- /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
+ /// assert_eq!(
+ /// recv.recv_timeout(Duration::from_millis(400)),
+ /// Err(mpsc::RecvTimeoutError::Timeout)
+ /// );
/// ```
#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
@@ -937,11 +1297,72 @@ impl<T> Receiver<T> {
Err(TryRecvError::Disconnected)
=> Err(RecvTimeoutError::Disconnected),
Err(TryRecvError::Empty)
- => self.recv_max_until(Instant::now() + timeout)
+ => self.recv_deadline(Instant::now() + timeout)
}
}
- fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up, or if `deadline` is reached.
+ ///
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent. Once a message is
+ /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this
+ /// receiver will wake up and return that message.
+ ///
+ /// If the corresponding [`Sender`] has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return [`Err`] to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// [`Sender`]: struct.Sender.html
+ /// [`SyncSender`]: struct.SyncSender.html
+ /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err
+ ///
+ /// # Examples
+ ///
+ /// Successfully receiving value before reaching deadline:
+ ///
+ /// ```no_run
+ /// #![feature(deadline_api)]
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
+ /// Ok('a')
+ /// );
+ /// ```
+ ///
+ /// Receiving an error upon reaching deadline:
+ ///
+ /// ```no_run
+ /// #![feature(deadline_api)]
+ /// use std::thread;
+ /// use std::time::{Duration, Instant};
+ /// use std::sync::mpsc;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_millis(800));
+ /// send.send('a').unwrap();
+ /// });
+ ///
+ /// assert_eq!(
+ /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)),
+ /// Err(mpsc::RecvTimeoutError::Timeout)
+ /// );
+ /// ```
+ #[unstable(feature = "deadline_api", issue = "46316")]
+ pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
use self::RecvTimeoutError::*;
loop {
@@ -993,7 +1414,31 @@ impl<T> Receiver<T> {
}
/// Returns an iterator that will block waiting for messages, but never
- /// `panic!`. It will return `None` when the channel has hung up.
+ /// [`panic!`]. It will return [`None`] when the channel has hung up.
+ ///
+ /// [`panic!`]: ../../../std/macro.panic.html
+ /// [`None`]: ../../../std/option/enum.Option.html#variant.None
+ ///
+ /// # Examples
+ ///
+ /// ```rust
+ /// use std::sync::mpsc::channel;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = channel();
+ ///
+ /// thread::spawn(move || {
+ /// send.send(1).unwrap();
+ /// send.send(2).unwrap();
+ /// send.send(3).unwrap();
+ /// });
+ ///
+ /// let mut iter = recv.iter();
+ /// assert_eq!(iter.next(), Some(1));
+ /// assert_eq!(iter.next(), Some(2));
+ /// assert_eq!(iter.next(), Some(3));
+ /// assert_eq!(iter.next(), None);
+ /// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn iter(&self) -> Iter<T> {
Iter { rx: self }
@@ -1001,8 +1446,42 @@ impl<T> Receiver<T> {
/// Returns an iterator that will attempt to yield all pending values.
/// It will return `None` if there are no more pending values or if the
- /// channel has hung up. The iterator will never `panic!` or block the
+ /// channel has hung up. The iterator will never [`panic!`] or block the
/// user by waiting for values.
+ ///
+ /// [`panic!`]: ../../../std/macro.panic.html
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use std::sync::mpsc::channel;
+ /// use std::thread;
+ /// use std::time::Duration;
+ ///
+ /// let (sender, receiver) = channel();
+ ///
+ /// // nothing is in the buffer yet
+ /// assert!(receiver.try_iter().next().is_none());
+ ///
+ /// thread::spawn(move || {
+ /// thread::sleep(Duration::from_secs(1));
+ /// sender.send(1).unwrap();
+ /// sender.send(2).unwrap();
+ /// sender.send(3).unwrap();
+ /// });
+ ///
+ /// // nothing is in the buffer yet
+ /// assert!(receiver.try_iter().next().is_none());
+ ///
+ /// // block for two seconds
+ /// thread::sleep(Duration::from_secs(2));
+ ///
+ /// let mut iter = receiver.try_iter();
+ /// assert_eq!(iter.next(), Some(1));
+ /// assert_eq!(iter.next(), Some(2));
+ /// assert_eq!(iter.next(), Some(3));
+ /// assert_eq!(iter.next(), None);
+ /// ```
#[stable(feature = "receiver_try_iter", since = "1.15.0")]
pub fn try_iter(&self) -> TryIter<T> {
TryIter { rx: self }
@@ -1132,10 +1611,10 @@ impl<T> Drop for Receiver<T> {
}
}
-#[stable(feature = "mpsc_debug", since = "1.7.0")]
+#[stable(feature = "mpsc_debug", since = "1.8.0")]
impl<T> fmt::Debug for Receiver<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "Receiver {{ .. }}")
+ f.debug_struct("Receiver").finish()
}
}
@@ -1207,6 +1686,15 @@ impl<T: Send> error::Error for TrySendError<T> {
}
}
+#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
+impl<T> From<SendError<T>> for TrySendError<T> {
+ fn from(err: SendError<T>) -> TrySendError<T> {
+ match err {
+ SendError(t) => TrySendError::Disconnected(t),
+ }
+ }
+}
+
#[stable(feature = "rust1", since = "1.0.0")]
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -1259,7 +1747,16 @@ impl error::Error for TryRecvError {
}
}
-#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
+#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
+impl From<RecvError> for TryRecvError {
+ fn from(err: RecvError) -> TryRecvError {
+ match err {
+ RecvError => TryRecvError::Disconnected,
+ }
+ }
+}
+
+#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
impl fmt::Display for RecvTimeoutError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
@@ -1273,7 +1770,7 @@ impl fmt::Display for RecvTimeoutError {
}
}
-#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
+#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")]
impl error::Error for RecvTimeoutError {
fn description(&self) -> &str {
match *self {
@@ -1291,6 +1788,15 @@ impl error::Error for RecvTimeoutError {
}
}
+#[stable(feature = "mpsc_error_conversions", since = "1.24.0")]
+impl From<RecvError> for RecvTimeoutError {
+ fn from(err: RecvError) -> RecvTimeoutError {
+ match err {
+ RecvError => RecvTimeoutError::Disconnected,
+ }
+ }
+}
+
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
use env;
@@ -1539,7 +2045,7 @@ mod tests {
fn oneshot_single_thread_send_then_recv() {
let (tx, rx) = channel::<Box<i32>>();
tx.send(box 10).unwrap();
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
#[test]
@@ -1596,7 +2102,7 @@ mod tests {
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = channel::<Box<i32>>();
let _t = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
});
tx.send(box 10).unwrap();
@@ -1609,7 +2115,7 @@ mod tests {
drop(tx);
});
let res = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}).join();
assert!(res.is_err());
}
@@ -1663,7 +2169,7 @@ mod tests {
let _t = thread::spawn(move|| {
tx.send(box 10).unwrap();
});
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
}
@@ -1688,7 +2194,7 @@ mod tests {
if i == 10 { return }
thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box i);
+ assert!(*rx.recv().unwrap() == i);
recv(rx, i + 1);
});
}
@@ -2225,7 +2731,7 @@ mod sync_tests {
fn oneshot_single_thread_send_then_recv() {
let (tx, rx) = sync_channel::<Box<i32>>(1);
tx.send(box 10).unwrap();
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
#[test]
@@ -2297,7 +2803,7 @@ mod sync_tests {
fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = sync_channel::<Box<i32>>(0);
let _t = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
});
tx.send(box 10).unwrap();
@@ -2310,7 +2816,7 @@ mod sync_tests {
drop(tx);
});
let res = thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}).join();
assert!(res.is_err());
}
@@ -2364,7 +2870,7 @@ mod sync_tests {
let _t = thread::spawn(move|| {
tx.send(box 10).unwrap();
});
- assert!(rx.recv().unwrap() == box 10);
+ assert!(*rx.recv().unwrap() == 10);
}
}
@@ -2389,7 +2895,7 @@ mod sync_tests {
if i == 10 { return }
thread::spawn(move|| {
- assert!(rx.recv().unwrap() == box i);
+ assert!(*rx.recv().unwrap() == i);
recv(rx, i + 1);
});
}
@@ -2593,22 +3099,4 @@ mod sync_tests {
repro()
}
}
-
- #[test]
- fn fmt_debug_sender() {
- let (tx, _) = channel::<i32>();
- assert_eq!(format!("{:?}", tx), "Sender { .. }");
- }
-
- #[test]
- fn fmt_debug_recv() {
- let (_, rx) = channel::<i32>();
- assert_eq!(format!("{:?}", rx), "Receiver { .. }");
- }
-
- #[test]
- fn fmt_debug_sync_sender() {
- let (tx, _) = sync_channel::<i32>(1);
- assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
- }
}
diff --git a/ctr-std/src/sync/mpsc/mpsc_queue.rs b/ctr-std/src/sync/mpsc/mpsc_queue.rs
index 8d80f94..296773d 100644
--- a/ctr-std/src/sync/mpsc/mpsc_queue.rs
+++ b/ctr-std/src/sync/mpsc/mpsc_queue.rs
@@ -1,29 +1,12 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
+// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
//! A mostly lock-free multi-producer, single consumer queue.
//!
diff --git a/ctr-std/src/sync/mpsc/select.rs b/ctr-std/src/sync/mpsc/select.rs
index 8b4da53..a9f3cea 100644
--- a/ctr-std/src/sync/mpsc/select.rs
+++ b/ctr-std/src/sync/mpsc/select.rs
@@ -148,12 +148,12 @@ impl Select {
let id = self.next_id.get();
self.next_id.set(id + 1);
Handle {
- id: id,
+ id,
selector: self.inner.get(),
next: ptr::null_mut(),
prev: ptr::null_mut(),
added: false,
- rx: rx,
+ rx,
packet: rx,
}
}
@@ -354,13 +354,13 @@ impl Iterator for Packets {
impl fmt::Debug for Select {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "Select {{ .. }}")
+ f.debug_struct("Select").finish()
}
}
impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "Handle {{ .. }}")
+ f.debug_struct("Handle").finish()
}
}
@@ -774,18 +774,4 @@ mod tests {
}
}
}
-
- #[test]
- fn fmt_debug_select() {
- let sel = Select::new();
- assert_eq!(format!("{:?}", sel), "Select { .. }");
- }
-
- #[test]
- fn fmt_debug_handle() {
- let (_, rx) = channel::<i32>();
- let sel = Select::new();
- let handle = sel.handle(&rx);
- assert_eq!(format!("{:?}", handle), "Handle { .. }");
- }
}
diff --git a/ctr-std/src/sync/mpsc/spsc_queue.rs b/ctr-std/src/sync/mpsc/spsc_queue.rs
index 5858e4b..cc4be92 100644
--- a/ctr-std/src/sync/mpsc/spsc_queue.rs
+++ b/ctr-std/src/sync/mpsc/spsc_queue.rs
@@ -1,31 +1,12 @@
-/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
- * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
- * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
- * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
- * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and documentation are
- * those of the authors and should not be interpreted as representing official
- * policies, either expressed or implied, of Dmitry Vyukov.
- */
-
-// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
+// Copyright 2017 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
//! A single-producer single-consumer concurrent queue
//!
@@ -33,18 +14,23 @@
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
+// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
+
use alloc::boxed::Box;
use core::ptr;
use core::cell::UnsafeCell;
use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+use super::cache_aligned::CacheAligned;
+
// Node within the linked list queue of messages to send
struct Node<T> {
// FIXME: this could be an uninitialized T if we're careful enough, and
// that would reduce memory usage (and be a bit faster).
// is it worth it?
value: Option<T>, // nullable for re-use of nodes
+ cached: bool, // This node goes into the node cache
next: AtomicPtr<Node<T>>, // next node in the queue
}
@@ -52,38 +38,55 @@ struct Node<T> {
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
// consumer fields
+ consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+ // producer fields
+ producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
tail: UnsafeCell<*mut Node<T>>, // where to pop from
tail_prev: AtomicPtr<Node<T>>, // where to pop from
+ cache_bound: usize, // maximum cache size
+ cached_nodes: AtomicUsize, // number of nodes marked as cachable
+ addition: Addition,
+}
- // producer fields
+struct Producer<T, Addition> {
head: UnsafeCell<*mut Node<T>>, // where to push to
first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
- // Cache maintenance fields. Additions and subtractions are stored
- // separately in order to allow them to use nonatomic addition/subtraction.
- cache_bound: usize,
- cache_additions: AtomicUsize,
- cache_subtractions: AtomicUsize,
+ addition: Addition,
}
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
impl<T> Node<T> {
fn new() -> *mut Node<T> {
Box::into_raw(box Node {
value: None,
+ cached: false,
next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
})
}
}
-impl<T> Queue<T> {
- /// Creates a new queue.
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+ /// Creates a new queue. With given additional elements in the producer and
+ /// consumer portions of the queue.
+ ///
+ /// Due to the performance implications of cache-contention,
+ /// we wish to keep fields used mainly by the producer on a separate cache
+ /// line than those used by the consumer.
+ /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
+ /// allocate one for small fields, so we allow users to insert additional
+ /// fields into the cache lines already allocated by this for the producer
+ /// and consumer.
///
/// This is unsafe as the type system doesn't enforce a single
/// consumer-producer relationship. It also allows the consumer to `pop`
@@ -100,19 +103,28 @@ impl<T> Queue<T> {
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger.
- pub unsafe fn new(bound: usize) -> Queue<T> {
+ pub unsafe fn with_additions(
+ bound: usize,
+ producer_addition: ProducerAddition,
+ consumer_addition: ConsumerAddition,
+ ) -> Self {
let n1 = Node::new();
let n2 = Node::new();
(*n1).next.store(n2, Ordering::Relaxed);
Queue {
- tail: UnsafeCell::new(n2),
- tail_prev: AtomicPtr::new(n1),
- head: UnsafeCell::new(n2),
- first: UnsafeCell::new(n1),
- tail_copy: UnsafeCell::new(n1),
- cache_bound: bound,
- cache_additions: AtomicUsize::new(0),
- cache_subtractions: AtomicUsize::new(0),
+ consumer: CacheAligned::new(Consumer {
+ tail: UnsafeCell::new(n2),
+ tail_prev: AtomicPtr::new(n1),
+ cache_bound: bound,
+ cached_nodes: AtomicUsize::new(0),
+ addition: consumer_addition
+ }),
+ producer: CacheAligned::new(Producer {
+ head: UnsafeCell::new(n2),
+ first: UnsafeCell::new(n1),
+ tail_copy: UnsafeCell::new(n1),
+ addition: producer_addition
+ }),
}
}
@@ -126,35 +138,25 @@ impl<T> Queue<T> {
assert!((*n).value.is_none());
(*n).value = Some(t);
(*n).next.store(ptr::null_mut(), Ordering::Relaxed);
- (**self.head.get()).next.store(n, Ordering::Release);
- *self.head.get() = n;
+ (**self.producer.head.get()).next.store(n, Ordering::Release);
+ *(&self.producer.head).get() = n;
}
}
unsafe fn alloc(&self) -> *mut Node<T> {
// First try to see if we can consume the 'first' node for our uses.
- // We try to avoid as many atomic instructions as possible here, so
- // the addition to cache_subtractions is not atomic (plus we're the
- // only one subtracting from the cache).
- if *self.first.get() != *self.tail_copy.get() {
- if self.cache_bound > 0 {
- let b = self.cache_subtractions.load(Ordering::Relaxed);
- self.cache_subtractions.store(b + 1, Ordering::Relaxed);
- }
- let ret = *self.first.get();
- *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+ if *self.producer.first.get() != *self.producer.tail_copy.get() {
+ let ret = *self.producer.first.get();
+ *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
return ret;
}
// If the above fails, then update our copy of the tail and try
// again.
- *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
- if *self.first.get() != *self.tail_copy.get() {
- if self.cache_bound > 0 {
- let b = self.cache_subtractions.load(Ordering::Relaxed);
- self.cache_subtractions.store(b + 1, Ordering::Relaxed);
- }
- let ret = *self.first.get();
- *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+ *self.producer.0.tail_copy.get() =
+ self.consumer.tail_prev.load(Ordering::Acquire);
+ if *self.producer.first.get() != *self.producer.tail_copy.get() {
+ let ret = *self.producer.first.get();
+ *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
return ret;
}
// If all of that fails, then we have to allocate a new node
@@ -170,27 +172,27 @@ impl<T> Queue<T> {
// sentinel from where we should start popping from. Hence, look at
// tail's next field and see if we can use it. If we do a pop, then
// the current tail node is a candidate for going into the cache.
- let tail = *self.tail.get();
+ let tail = *self.consumer.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if next.is_null() { return None }
assert!((*next).value.is_some());
let ret = (*next).value.take();
- *self.tail.get() = next;
- if self.cache_bound == 0 {
- self.tail_prev.store(tail, Ordering::Release);
+ *self.consumer.0.tail.get() = next;
+ if self.consumer.cache_bound == 0 {
+ self.consumer.tail_prev.store(tail, Ordering::Release);
} else {
- // FIXME: this is dubious with overflow.
- let additions = self.cache_additions.load(Ordering::Relaxed);
- let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
- let size = additions - subtractions;
-
- if size < self.cache_bound {
- self.tail_prev.store(tail, Ordering::Release);
- self.cache_additions.store(additions + 1, Ordering::Relaxed);
+ let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+ if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+ self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
+ (*tail).cached = true;
+ }
+
+ if (*tail).cached {
+ self.consumer.tail_prev.store(tail, Ordering::Release);
} else {
- (*self.tail_prev.load(Ordering::Relaxed))
- .next.store(next, Ordering::Relaxed);
+ (*self.consumer.tail_prev.load(Ordering::Relaxed))
+ .next.store(next, Ordering::Relaxed);
// We have successfully erased all references to 'tail', so
// now we can safely drop it.
let _: Box<Node<T>> = Box::from_raw(tail);
@@ -211,17 +213,25 @@ impl<T> Queue<T> {
// This is essentially the same as above with all the popping bits
// stripped out.
unsafe {
- let tail = *self.tail.get();
+ let tail = *self.consumer.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if next.is_null() { None } else { (*next).value.as_mut() }
}
}
+
+ pub fn producer_addition(&self) -> &ProducerAddition {
+ &self.producer.addition
+ }
+
+ pub fn consumer_addition(&self) -> &ConsumerAddition {
+ &self.consumer.addition
+ }
}
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
fn drop(&mut self) {
unsafe {
- let mut cur = *self.first.get();
+ let mut cur = *self.producer.first.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
let _n: Box<Node<T>> = Box::from_raw(cur);
@@ -241,7 +251,7 @@ mod tests {
#[test]
fn smoke() {
unsafe {
- let queue = Queue::new(0);
+ let queue = Queue::with_additions(0, (), ());
queue.push(1);
queue.push(2);
assert_eq!(queue.pop(), Some(1));
@@ -258,7 +268,7 @@ mod tests {
#[test]
fn peek() {
unsafe {
- let queue = Queue::new(0);
+ let queue = Queue::with_additions(0, (), ());
queue.push(vec![1]);
// Ensure the borrowchecker works
@@ -281,7 +291,7 @@ mod tests {
#[test]
fn drop_full() {
unsafe {
- let q: Queue<Box<_>> = Queue::new(0);
+ let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
q.push(box 1);
q.push(box 2);
}
@@ -290,7 +300,7 @@ mod tests {
#[test]
fn smoke_bound() {
unsafe {
- let q = Queue::new(0);
+ let q = Queue::with_additions(0, (), ());
q.push(1);
q.push(2);
assert_eq!(q.pop(), Some(1));
@@ -312,7 +322,7 @@ mod tests {
}
unsafe fn stress_bound(bound: usize) {
- let q = Arc::new(Queue::new(bound));
+ let q = Arc::new(Queue::with_additions(bound, (), ()));
let (tx, rx) = channel();
let q2 = q.clone();
diff --git a/ctr-std/src/sync/mpsc/stream.rs b/ctr-std/src/sync/mpsc/stream.rs
index 47cd897..d1515eb 100644
--- a/ctr-std/src/sync/mpsc/stream.rs
+++ b/ctr-std/src/sync/mpsc/stream.rs
@@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5;
const MAX_STEALS: isize = 1 << 20;
pub struct Packet<T> {
- queue: spsc::Queue<Message<T>>, // internal queue for all message
+ // internal queue for all messages
+ queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
+}
+struct ProducerAddition {
cnt: AtomicIsize, // How many items are on this channel
- steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
}
+struct ConsumerAddition {
+ steals: UnsafeCell<isize>, // How many times has a port received without blocking?
+}
+
+
pub enum Failure<T> {
Empty,
Disconnected,
@@ -78,13 +85,18 @@ enum Message<T> {
impl<T> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
- queue: unsafe { spsc::Queue::new(128) },
-
- cnt: AtomicIsize::new(0),
- steals: UnsafeCell::new(0),
- to_wake: AtomicUsize::new(0),
-
- port_dropped: AtomicBool::new(false),
+ queue: unsafe { spsc::Queue::with_additions(
+ 128,
+ ProducerAddition {
+ cnt: AtomicIsize::new(0),
+ to_wake: AtomicUsize::new(0),
+
+ port_dropped: AtomicBool::new(false),
+ },
+ ConsumerAddition {
+ steals: UnsafeCell::new(0),
+ }
+ )},
}
}
@@ -92,7 +104,7 @@ impl<T> Packet<T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
- if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+ if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
match self.do_send(Data(t)) {
UpSuccess | UpDisconnected => {},
@@ -104,14 +116,16 @@ impl<T> Packet<T> {
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
- if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+ if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
+ return UpDisconnected
+ }
self.do_send(GoUp(up))
}
fn do_send(&self, t: Message<T>) -> UpgradeResult {
self.queue.push(t);
- match self.cnt.fetch_add(1, Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
-1 => UpWoke(self.take_to_wake()),
// As as described before, SPSC queues must be >= -2
@@ -125,7 +139,7 @@ impl<T> Packet<T> {
// will never remove this data. We can only have at most one item to
// drain (the port drains the rest).
DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
let first = self.queue.pop();
let second = self.queue.pop();
assert!(second.is_none());
@@ -144,8 +158,8 @@ impl<T> Packet<T> {
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
- let ptr = self.to_wake.load(Ordering::SeqCst);
- self.to_wake.store(0, Ordering::SeqCst);
+ let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
+ self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
unsafe { SignalToken::cast_from_usize(ptr) }
}
@@ -154,14 +168,16 @@ impl<T> Packet<T> {
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
- self.to_wake.store(ptr, Ordering::SeqCst);
+ self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
- let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+ let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
- match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
- DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+ match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+ DISCONNECTED => {
+ self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
+ }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
@@ -170,7 +186,7 @@ impl<T> Packet<T> {
}
}
- self.to_wake.store(0, Ordering::SeqCst);
+ self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
Err(unsafe { SignalToken::cast_from_usize(ptr) })
}
@@ -201,7 +217,7 @@ impl<T> Packet<T> {
// "steal" factored into the channel count above).
data @ Ok(..) |
data @ Err(Upgraded(..)) => unsafe {
- *self.steals.get() -= 1;
+ *self.queue.consumer_addition().steals.get() -= 1;
data
},
@@ -223,20 +239,21 @@ impl<T> Packet<T> {
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
Some(data) => unsafe {
- if *self.steals.get() > MAX_STEALS {
- match self.cnt.swap(0, Ordering::SeqCst) {
+ if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
+ match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ self.queue.producer_addition().cnt.store(
+ DISCONNECTED, Ordering::SeqCst);
}
n => {
- let m = cmp::min(n, *self.steals.get());
- *self.steals.get() -= m;
+ let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
+ *self.queue.consumer_addition().steals.get() -= m;
self.bump(n - m);
}
}
- assert!(*self.steals.get() >= 0);
+ assert!(*self.queue.consumer_addition().steals.get() >= 0);
}
- *self.steals.get() += 1;
+ *self.queue.consumer_addition().steals.get() += 1;
match data {
Data(t) => Ok(t),
GoUp(up) => Err(Upgraded(up)),
@@ -244,7 +261,7 @@ impl<T> Packet<T> {
},
None => {
- match self.cnt.load(Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
// This is a little bit of a tricky case. We failed to pop
@@ -273,7 +290,7 @@ impl<T> Packet<T> {
pub fn drop_chan(&self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
- match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-1 => { self.take_to_wake().signal(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
@@ -300,7 +317,7 @@ impl<T> Packet<T> {
// sends are gated on this flag, so we're immediately guaranteed that
// there are a bounded number of active sends that we'll have to deal
// with.
- self.port_dropped.store(true, Ordering::SeqCst);
+ self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
// Now that we're guaranteed to deal with a bounded number of senders,
// we need to drain the queue. This draining process happens atomically
@@ -310,9 +327,9 @@ impl<T> Packet<T> {
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
- let mut steals = unsafe { *self.steals.get() };
+ let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
while {
- let cnt = self.cnt.compare_and_swap(
+ let cnt = self.queue.producer_addition().cnt.compare_and_swap(
steals, DISCONNECTED, Ordering::SeqCst);
cnt != DISCONNECTED && cnt != steals
} {
@@ -353,9 +370,9 @@ impl<T> Packet<T> {
// increment the count on the channel (used for selection)
fn bump(&self, amt: isize) -> isize {
- match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
DISCONNECTED
}
n => n
@@ -404,8 +421,8 @@ impl<T> Packet<T> {
// this end. This is fine because we know it's a small bounded windows
// of time until the data is actually sent.
if was_upgrade {
- assert_eq!(unsafe { *self.steals.get() }, 0);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
return Ok(true)
}
@@ -418,7 +435,7 @@ impl<T> Packet<T> {
// If we were previously disconnected, then we know for sure that there
// is no thread in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
true // there is data, that data is that we're disconnected
} else {
let cur = prev + steals + 1;
@@ -441,13 +458,13 @@ impl<T> Packet<T> {
if prev < 0 {
drop(self.take_to_wake());
} else {
- while self.to_wake.load(Ordering::SeqCst) != 0 {
+ while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
thread::yield_now();
}
}
unsafe {
- assert_eq!(*self.steals.get(), 0);
- *self.steals.get() = steals;
+ assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
+ *self.queue.consumer_addition().steals.get() = steals;
}
// if we were previously positive, then there's surely data to
@@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> {
// disconnection, but also a proper fence before the read of
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
- assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
}
}
diff --git a/ctr-std/src/sync/mpsc/sync.rs b/ctr-std/src/sync/mpsc/sync.rs
index 1d16e00..90f12c8 100644
--- a/ctr-std/src/sync/mpsc/sync.rs
+++ b/ctr-std/src/sync/mpsc/sync.rs
@@ -177,7 +177,7 @@ impl<T> Packet<T> {
lock: Mutex::new(State {
disconnected: false,
blocker: NoneBlocked,
- cap: cap,
+ cap,
canceled: None,
queue: Queue {
head: ptr::null_mut(),
diff --git a/ctr-std/src/sync/mutex.rs b/ctr-std/src/sync/mutex.rs
index 97b84d5..3b4904c 100644
--- a/ctr-std/src/sync/mutex.rs
+++ b/ctr-std/src/sync/mutex.rs
@@ -10,7 +10,6 @@
use cell::UnsafeCell;
use fmt;
-use marker;
use mem;
use ops::{Deref, DerefMut};
use ptr;
@@ -20,30 +19,38 @@ use sys_common::poison::{self, TryLockError, TryLockResult, LockResult};
/// A mutual exclusion primitive useful for protecting shared data
///
/// This mutex will block threads waiting for the lock to become available. The
-/// mutex can also be statically initialized or created via a `new`
+/// mutex can also be statically initialized or created via a [`new`]
/// constructor. Each mutex has a type parameter which represents the data that
/// it is protecting. The data can only be accessed through the RAII guards
-/// returned from `lock` and `try_lock`, which guarantees that the data is only
+/// returned from [`lock`] and [`try_lock`], which guarantees that the data is only
/// ever accessed when the mutex is locked.
///
/// # Poisoning
///
/// The mutexes in this module implement a strategy called "poisoning" where a
/// mutex is considered poisoned whenever a thread panics while holding the
-/// lock. Once a mutex is poisoned, all other threads are unable to access the
+/// mutex. Once a mutex is poisoned, all other threads are unable to access the
/// data by default as it is likely tainted (some invariant is not being
/// upheld).
///
-/// For a mutex, this means that the `lock` and `try_lock` methods return a
-/// `Result` which indicates whether a mutex has been poisoned or not. Most
-/// usage of a mutex will simply `unwrap()` these results, propagating panics
+/// For a mutex, this means that the [`lock`] and [`try_lock`] methods return a
+/// [`Result`] which indicates whether a mutex has been poisoned or not. Most
+/// usage of a mutex will simply [`unwrap()`] these results, propagating panics
/// among threads to ensure that a possibly invalid invariant is not witnessed.
///
/// A poisoned mutex, however, does not prevent all access to the underlying
-/// data. The `PoisonError` type has an `into_inner` method which will return
+/// data. The [`PoisonError`] type has an [`into_inner`] method which will return
/// the guard that would have otherwise been returned on a successful lock. This
/// allows access to the data, despite the lock being poisoned.
///
+/// [`new`]: #method.new
+/// [`lock`]: #method.lock
+/// [`try_lock`]: #method.try_lock
+/// [`Result`]: ../../std/result/enum.Result.html
+/// [`unwrap()`]: ../../std/result/enum.Result.html#method.unwrap
+/// [`PoisonError`]: ../../std/sync/struct.PoisonError.html
+/// [`into_inner`]: ../../std/sync/struct.PoisonError.html#method.into_inner
+///
/// # Examples
///
/// ```
@@ -61,7 +68,7 @@ use sys_common::poison::{self, TryLockError, TryLockResult, LockResult};
/// let data = Arc::new(Mutex::new(0));
///
/// let (tx, rx) = channel();
-/// for _ in 0..10 {
+/// for _ in 0..N {
/// let (data, tx) = (data.clone(), tx.clone());
/// thread::spawn(move || {
/// // The shared state can only be accessed once the lock is held.
@@ -115,7 +122,7 @@ pub struct Mutex<T: ?Sized> {
// Note that this mutex is in a *box*, not inlined into the struct itself.
// Once a native mutex has been used once, its address can never change (it
// can't be moved). This mutex type can be safely moved at any time, so to
- // ensure that the native mutex is used correctly we box the inner lock to
+ // ensure that the native mutex is used correctly we box the inner mutex to
// give it a constant address.
inner: Box<sys::Mutex>,
poison: poison::Flag,
@@ -132,16 +139,16 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> { }
/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
///
-/// The data protected by the mutex can be access through this guard via its
+/// The data protected by the mutex can be accessed through this guard via its
/// [`Deref`] and [`DerefMut`] implementations.
///
-/// This structure is created by the [`lock()`] and [`try_lock()`] methods on
+/// This structure is created by the [`lock`] and [`try_lock`] methods on
/// [`Mutex`].
///
/// [`Deref`]: ../../std/ops/trait.Deref.html
/// [`DerefMut`]: ../../std/ops/trait.DerefMut.html
-/// [`lock()`]: struct.Mutex.html#method.lock
-/// [`try_lock()`]: struct.Mutex.html#method.try_lock
+/// [`lock`]: struct.Mutex.html#method.lock
+/// [`try_lock`]: struct.Mutex.html#method.try_lock
/// [`Mutex`]: struct.Mutex.html
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
@@ -153,7 +160,9 @@ pub struct MutexGuard<'a, T: ?Sized + 'a> {
}
#[stable(feature = "rust1", since = "1.0.0")]
-impl<'a, T: ?Sized> !marker::Send for MutexGuard<'a, T> {}
+impl<'a, T: ?Sized> !Send for MutexGuard<'a, T> { }
+#[stable(feature = "mutexguard", since = "1.19.0")]
+unsafe impl<'a, T: ?Sized + Sync> Sync for MutexGuard<'a, T> { }
impl<T> Mutex<T> {
/// Creates a new mutex in an unlocked state ready for use.
@@ -183,7 +192,7 @@ impl<T: ?Sized> Mutex<T> {
/// Acquires a mutex, blocking the current thread until it is able to do so.
///
/// This function will block the local thread until it is available to acquire
- /// the mutex. Upon returning, the thread is the only thread with the mutex
+ /// the mutex. Upon returning, the thread is the only thread with the lock
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
///
@@ -225,7 +234,7 @@ impl<T: ?Sized> Mutex<T> {
/// Attempts to acquire this lock.
///
- /// If the lock could not be acquired at this time, then `Err` is returned.
+ /// If the lock could not be acquired at this time, then [`Err`] is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
@@ -237,6 +246,8 @@ impl<T: ?Sized> Mutex<T> {
/// this call will return failure if the mutex would otherwise be
/// acquired.
///
+ /// [`Err`]: ../../std/result/enum.Result.html#variant.Err
+ ///
/// # Examples
///
/// ```
@@ -267,9 +278,9 @@ impl<T: ?Sized> Mutex<T> {
}
}
- /// Determines whether the lock is poisoned.
+ /// Determines whether the mutex is poisoned.
///
- /// If another thread is active, the lock can still become poisoned at any
+ /// If another thread is active, the mutex can still become poisoned at any
/// time. You should not trust a `false` value for program correctness
/// without additional synchronization.
///
@@ -312,7 +323,7 @@ impl<T: ?Sized> Mutex<T> {
#[stable(feature = "mutex_into_inner", since = "1.6.0")]
pub fn into_inner(self) -> LockResult<T> where T: Sized {
// We know statically that there are no outstanding references to
- // `self` so there's no need to lock the inner lock.
+ // `self` so there's no need to lock the inner mutex.
//
// To get the inner value, we'd like to call `data.into_inner()`,
// but because `Mutex` impl-s `Drop`, we can't move out of it, so
@@ -353,7 +364,7 @@ impl<T: ?Sized> Mutex<T> {
#[stable(feature = "mutex_get_mut", since = "1.6.0")]
pub fn get_mut(&mut self) -> LockResult<&mut T> {
// We know statically that there are no other references to `self`, so
- // there's no need to lock the inner lock.
+ // there's no need to lock the inner mutex.
let data = unsafe { &mut *self.data.get() };
poison::map_result(self.poison.borrow(), |_| data )
}
@@ -371,7 +382,18 @@ unsafe impl<#[may_dangle] T: ?Sized> Drop for Mutex<T> {
}
}
-#[stable(feature = "mutex_default", since = "1.9.0")]
+#[stable(feature = "mutex_from", since = "1.24.0")]
+impl<T> From<T> for Mutex<T> {
+ /// Creates a new mutex in an unlocked state ready for use.
+ /// This is equivalent to [`Mutex::new`].
+ ///
+ /// [`Mutex::new`]: #method.new
+ fn from(t: T) -> Self {
+ Mutex::new(t)
+ }
+}
+
+#[stable(feature = "mutex_default", since = "1.10.0")]
impl<T: ?Sized + Default> Default for Mutex<T> {
/// Creates a `Mutex<T>`, with the `Default` value for T.
fn default() -> Mutex<T> {
@@ -383,11 +405,18 @@ impl<T: ?Sized + Default> Default for Mutex<T> {
impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.try_lock() {
- Ok(guard) => write!(f, "Mutex {{ data: {:?} }}", &*guard),
+ Ok(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
Err(TryLockError::Poisoned(err)) => {
- write!(f, "Mutex {{ data: Poisoned({:?}) }}", &**err.get_ref())
+ f.debug_struct("Mutex").field("data", &&**err.get_ref()).finish()
},
- Err(TryLockError::WouldBlock) => write!(f, "Mutex {{ <locked> }}")
+ Err(TryLockError::WouldBlock) => {
+ struct LockedPlaceholder;
+ impl fmt::Debug for LockedPlaceholder {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("<locked>") }
+ }
+
+ f.debug_struct("Mutex").field("data", &LockedPlaceholder).finish()
+ }
}
}
}
@@ -439,6 +468,13 @@ impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'a, T> {
}
}
+#[stable(feature = "std_guard_impls", since = "1.20.0")]
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ (**self).fmt(f)
+ }
+}
+
pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex {
&guard.__lock.inner
}
@@ -459,9 +495,6 @@ mod tests {
#[derive(Eq, PartialEq, Debug)]
struct NonCopy(i32);
- unsafe impl<T: Send> Send for Packet<T> {}
- unsafe impl<T> Sync for Packet<T> {}
-
#[test]
fn smoke() {
let m = Mutex::new(());
diff --git a/ctr-std/src/sync/once.rs b/ctr-std/src/sync/once.rs
index c449315..6fd8b6a 100644
--- a/ctr-std/src/sync/once.rs
+++ b/ctr-std/src/sync/once.rs
@@ -72,9 +72,11 @@ use thread::{self, Thread};
/// A synchronization primitive which can be used to run a one-time global
/// initialization. Useful for one-time initialization for FFI or related
-/// functionality. This type can only be constructed with the `ONCE_INIT`
+/// functionality. This type can only be constructed with the [`ONCE_INIT`]
/// value.
///
+/// [`ONCE_INIT`]: constant.ONCE_INIT.html
+///
/// # Examples
///
/// ```
@@ -101,15 +103,28 @@ unsafe impl Sync for Once {}
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl Send for Once {}
-/// State yielded to the `call_once_force` method which can be used to query
-/// whether the `Once` was previously poisoned or not.
+/// State yielded to [`call_once_force`]’s closure parameter. The state can be
+/// used to query the poison status of the [`Once`].
+///
+/// [`call_once_force`]: struct.Once.html#method.call_once_force
+/// [`Once`]: struct.Once.html
#[unstable(feature = "once_poison", issue = "33577")]
#[derive(Debug)]
pub struct OnceState {
poisoned: bool,
}
-/// Initialization value for static `Once` values.
+/// Initialization value for static [`Once`] values.
+///
+/// [`Once`]: struct.Once.html
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::{Once, ONCE_INIT};
+///
+/// static START: Once = ONCE_INIT;
+/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub const ONCE_INIT: Once = Once::new();
@@ -212,15 +227,52 @@ impl Once {
self.call_inner(false, &mut |_| f.take().unwrap()());
}
- /// Performs the same function as `call_once` except ignores poisoning.
+ /// Performs the same function as [`call_once`] except ignores poisoning.
+ ///
+ /// Unlike [`call_once`], if this `Once` has been poisoned (i.e. a previous
+ /// call to `call_once` or `call_once_force` caused a panic), calling
+ /// `call_once_force` will still invoke the closure `f` and will _not_
+ /// result in an immediate panic. If `f` panics, the `Once` will remain
+ /// in a poison state. If `f` does _not_ panic, the `Once` will no
+ /// longer be in a poison state and all future calls to `call_once` or
+ /// `call_one_force` will no-op.
+ ///
+ /// The closure `f` is yielded a [`OnceState`] structure which can be used
+ /// to query the poison status of the `Once`.
+ ///
+ /// [`call_once`]: struct.Once.html#method.call_once
+ /// [`OnceState`]: struct.OnceState.html
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(once_poison)]
+ ///
+ /// use std::sync::{Once, ONCE_INIT};
+ /// use std::thread;
+ ///
+ /// static INIT: Once = ONCE_INIT;
+ ///
+ /// // poison the once
+ /// let handle = thread::spawn(|| {
+ /// INIT.call_once(|| panic!());
+ /// });
+ /// assert!(handle.join().is_err());
///
- /// If this `Once` has been poisoned (some initialization panicked) then
- /// this function will continue to attempt to call initialization functions
- /// until one of them doesn't panic.
+ /// // poisoning propagates
+ /// let handle = thread::spawn(|| {
+ /// INIT.call_once(|| {});
+ /// });
+ /// assert!(handle.join().is_err());
///
- /// The closure `f` is yielded a structure which can be used to query the
- /// state of this `Once` (whether initialization has previously panicked or
- /// not).
+ /// // call_once_force will still run and reset the poisoned state
+ /// INIT.call_once_force(|state| {
+ /// assert!(state.poisoned());
+ /// });
+ ///
+ /// // once any success happens, we stop propagating the poison
+ /// INIT.call_once(|| {});
+ /// ```
#[unstable(feature = "once_poison", issue = "33577")]
pub fn call_once_force<F>(&'static self, f: F) where F: FnOnce(&OnceState) {
// same as above, just with a different parameter to `call_inner`.
@@ -366,10 +418,47 @@ impl Drop for Finish {
}
impl OnceState {
- /// Returns whether the associated `Once` has been poisoned.
+ /// Returns whether the associated [`Once`] was poisoned prior to the
+ /// invocation of the closure passed to [`call_once_force`].
+ ///
+ /// [`call_once_force`]: struct.Once.html#method.call_once_force
+ /// [`Once`]: struct.Once.html
+ ///
+ /// # Examples
+ ///
+ /// A poisoned `Once`:
+ ///
+ /// ```
+ /// #![feature(once_poison)]
+ ///
+ /// use std::sync::{Once, ONCE_INIT};
+ /// use std::thread;
+ ///
+ /// static INIT: Once = ONCE_INIT;
+ ///
+ /// // poison the once
+ /// let handle = thread::spawn(|| {
+ /// INIT.call_once(|| panic!());
+ /// });
+ /// assert!(handle.join().is_err());
+ ///
+ /// INIT.call_once_force(|state| {
+ /// assert!(state.poisoned());
+ /// });
+ /// ```
+ ///
+ /// An unpoisoned `Once`:
+ ///
+ /// ```
+ /// #![feature(once_poison)]
+ ///
+ /// use std::sync::{Once, ONCE_INIT};
+ ///
+ /// static INIT: Once = ONCE_INIT;
///
- /// Once an initalization routine for a `Once` has panicked it will forever
- /// indicate to future forced initialization routines that it is poisoned.
+ /// INIT.call_once_force(|state| {
+ /// assert!(!state.poisoned());
+ /// });
#[unstable(feature = "once_poison", issue = "33577")]
pub fn poisoned(&self) -> bool {
self.poisoned
diff --git a/ctr-std/src/sync/rwlock.rs b/ctr-std/src/sync/rwlock.rs
index a3db0ad..2edf02e 100644
--- a/ctr-std/src/sync/rwlock.rs
+++ b/ctr-std/src/sync/rwlock.rs
@@ -10,7 +10,6 @@
use cell::UnsafeCell;
use fmt;
-use marker;
use mem;
use ops::{Deref, DerefMut};
use ptr;
@@ -24,19 +23,24 @@ use sys_common::rwlock as sys;
/// of the underlying data (exclusive access) and the read portion of this lock
/// typically allows for read-only access (shared access).
///
+/// In comparison, a [`Mutex`] does not distinguish between readers or writers
+/// that aquire the lock, therefore blocking any threads waiting for the lock to
+/// become available. An `RwLock` will allow any number of readers to aquire the
+/// lock as long as a writer is not holding the lock.
+///
/// The priority policy of the lock is dependent on the underlying operating
/// system's implementation, and this type does not guarantee that any
/// particular policy will be used.
///
/// The type parameter `T` represents the data that this lock protects. It is
-/// required that `T` satisfies `Send` to be shared across threads and `Sync` to
-/// allow concurrent access through readers. The RAII guards returned from the
-/// locking methods implement `Deref` (and `DerefMut` for the `write` methods)
-/// to allow access to the contained of the lock.
+/// required that `T` satisfies [`Send`] to be shared across threads and
+/// [`Sync`] to allow concurrent access through readers. The RAII guards
+/// returned from the locking methods implement [`Deref`][] (and [`DerefMut`]
+/// for the `write` methods) to allow access to the content of the lock.
///
/// # Poisoning
///
-/// An `RwLock`, like `Mutex`, will become poisoned on a panic. Note, however,
+/// An `RwLock`, like [`Mutex`], will become poisoned on a panic. Note, however,
/// that an `RwLock` may only be poisoned if a panic occurs while it is locked
/// exclusively (write mode). If a panic occurs in any reader, then the lock
/// will not be poisoned.
@@ -63,6 +67,12 @@ use sys_common::rwlock as sys;
/// assert_eq!(*w, 6);
/// } // write lock is dropped here
/// ```
+///
+/// [`Deref`]: ../../std/ops/trait.Deref.html
+/// [`DerefMut`]: ../../std/ops/trait.DerefMut.html
+/// [`Send`]: ../../std/marker/trait.Send.html
+/// [`Sync`]: ../../std/marker/trait.Sync.html
+/// [`Mutex`]: struct.Mutex.html
#[stable(feature = "rust1", since = "1.0.0")]
pub struct RwLock<T: ?Sized> {
inner: Box<sys::RWLock>,
@@ -71,18 +81,18 @@ pub struct RwLock<T: ?Sized> {
}
#[stable(feature = "rust1", since = "1.0.0")]
-unsafe impl<T: ?Sized + Send + Sync> Send for RwLock<T> {}
+unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}
/// RAII structure used to release the shared read access of a lock when
/// dropped.
///
-/// This structure is created by the [`read()`] and [`try_read()`] methods on
+/// This structure is created by the [`read`] and [`try_read`] methods on
/// [`RwLock`].
///
-/// [`read()`]: struct.RwLock.html#method.read
-/// [`try_read()`]: struct.RwLock.html#method.try_read
+/// [`read`]: struct.RwLock.html#method.read
+/// [`try_read`]: struct.RwLock.html#method.try_read
/// [`RwLock`]: struct.RwLock.html
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
@@ -91,16 +101,19 @@ pub struct RwLockReadGuard<'a, T: ?Sized + 'a> {
}
#[stable(feature = "rust1", since = "1.0.0")]
-impl<'a, T: ?Sized> !marker::Send for RwLockReadGuard<'a, T> {}
+impl<'a, T: ?Sized> !Send for RwLockReadGuard<'a, T> {}
+
+#[stable(feature = "rwlock_guard_sync", since = "1.23.0")]
+unsafe impl<'a, T: ?Sized + Sync> Sync for RwLockReadGuard<'a, T> {}
/// RAII structure used to release the exclusive write access of a lock when
/// dropped.
///
-/// This structure is created by the [`write()`] and [`try_write()`] methods
+/// This structure is created by the [`write`] and [`try_write`] methods
/// on [`RwLock`].
///
-/// [`write()`]: struct.RwLock.html#method.write
-/// [`try_write()`]: struct.RwLock.html#method.try_write
+/// [`write`]: struct.RwLock.html#method.write
+/// [`try_write`]: struct.RwLock.html#method.try_write
/// [`RwLock`]: struct.RwLock.html
#[must_use]
#[stable(feature = "rust1", since = "1.0.0")]
@@ -110,7 +123,10 @@ pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> {
}
#[stable(feature = "rust1", since = "1.0.0")]
-impl<'a, T: ?Sized> !marker::Send for RwLockWriteGuard<'a, T> {}
+impl<'a, T: ?Sized> !Send for RwLockWriteGuard<'a, T> {}
+
+#[stable(feature = "rwlock_guard_sync", since = "1.23.0")]
+unsafe impl<'a, T: ?Sized + Sync> Sync for RwLockWriteGuard<'a, T> {}
impl<T> RwLock<T> {
/// Creates a new instance of an `RwLock<T>` which is unlocked.
@@ -154,6 +170,24 @@ impl<T: ?Sized> RwLock<T> {
/// # Panics
///
/// This function might panic when called if the lock is already held by the current thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::{Arc, RwLock};
+ /// use std::thread;
+ ///
+ /// let lock = Arc::new(RwLock::new(1));
+ /// let c_lock = lock.clone();
+ ///
+ /// let n = lock.read().unwrap();
+ /// assert_eq!(*n, 1);
+ ///
+ /// thread::spawn(move || {
+ /// let r = c_lock.read();
+ /// assert!(r.is_ok());
+ /// }).join().unwrap();
+ /// ```
#[inline]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn read(&self) -> LockResult<RwLockReadGuard<T>> {
@@ -180,6 +214,19 @@ impl<T: ?Sized> RwLock<T> {
/// is poisoned whenever a writer panics while holding an exclusive lock. An
/// error will only be returned if the lock would have otherwise been
/// acquired.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::RwLock;
+ ///
+ /// let lock = RwLock::new(1);
+ ///
+ /// match lock.try_read() {
+ /// Ok(n) => assert_eq!(*n, 1),
+ /// Err(_) => unreachable!(),
+ /// };
+ /// ```
#[inline]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<T>> {
@@ -210,6 +257,19 @@ impl<T: ?Sized> RwLock<T> {
/// # Panics
///
/// This function might panic when called if the lock is already held by the current thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::RwLock;
+ ///
+ /// let lock = RwLock::new(1);
+ ///
+ /// let mut n = lock.write().unwrap();
+ /// *n = 2;
+ ///
+ /// assert!(lock.try_read().is_err());
+ /// ```
#[inline]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn write(&self) -> LockResult<RwLockWriteGuard<T>> {
@@ -236,6 +296,19 @@ impl<T: ?Sized> RwLock<T> {
/// is poisoned whenever a writer panics while holding an exclusive lock. An
/// error will only be returned if the lock would have otherwise been
/// acquired.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::RwLock;
+ ///
+ /// let lock = RwLock::new(1);
+ ///
+ /// let n = lock.read().unwrap();
+ /// assert_eq!(*n, 1);
+ ///
+ /// assert!(lock.try_write().is_err());
+ /// ```
#[inline]
#[stable(feature = "rust1", since = "1.0.0")]
pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> {
@@ -253,6 +326,22 @@ impl<T: ?Sized> RwLock<T> {
/// If another thread is active, the lock can still become poisoned at any
/// time. You should not trust a `false` value for program correctness
/// without additional synchronization.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::{Arc, RwLock};
+ /// use std::thread;
+ ///
+ /// let lock = Arc::new(RwLock::new(0));
+ /// let c_lock = lock.clone();
+ ///
+ /// let _ = thread::spawn(move || {
+ /// let _lock = c_lock.write().unwrap();
+ /// panic!(); // the lock gets poisoned
+ /// }).join();
+ /// assert_eq!(lock.is_poisoned(), true);
+ /// ```
#[inline]
#[stable(feature = "sync_poison", since = "1.2.0")]
pub fn is_poisoned(&self) -> bool {
@@ -267,6 +356,19 @@ impl<T: ?Sized> RwLock<T> {
/// is poisoned whenever a writer panics while holding an exclusive lock. An
/// error will only be returned if the lock would have otherwise been
/// acquired.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::RwLock;
+ ///
+ /// let lock = RwLock::new(String::new());
+ /// {
+ /// let mut s = lock.write().unwrap();
+ /// *s = "modified".to_owned();
+ /// }
+ /// assert_eq!(lock.into_inner().unwrap(), "modified");
+ /// ```
#[stable(feature = "rwlock_into_inner", since = "1.6.0")]
pub fn into_inner(self) -> LockResult<T> where T: Sized {
// We know statically that there are no outstanding references to
@@ -282,7 +384,7 @@ impl<T: ?Sized> RwLock<T> {
(ptr::read(inner), ptr::read(poison), ptr::read(data))
};
mem::forget(self);
- inner.destroy(); // Keep in sync with the `Drop` impl.
+ inner.destroy(); // Keep in sync with the `Drop` impl.
drop(inner);
poison::map_result(poison.borrow(), |_| data.into_inner())
@@ -300,6 +402,16 @@ impl<T: ?Sized> RwLock<T> {
/// is poisoned whenever a writer panics while holding an exclusive lock. An
/// error will only be returned if the lock would have otherwise been
/// acquired.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::RwLock;
+ ///
+ /// let mut lock = RwLock::new(0);
+ /// *lock.get_mut().unwrap() = 10;
+ /// assert_eq!(*lock.read().unwrap(), 10);
+ /// ```
#[stable(feature = "rwlock_get_mut", since = "1.6.0")]
pub fn get_mut(&mut self) -> LockResult<&mut T> {
// We know statically that there are no other references to `self`, so
@@ -321,16 +433,23 @@ unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock<T> {
impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.try_read() {
- Ok(guard) => write!(f, "RwLock {{ data: {:?} }}", &*guard),
+ Ok(guard) => f.debug_struct("RwLock").field("data", &&*guard).finish(),
Err(TryLockError::Poisoned(err)) => {
- write!(f, "RwLock {{ data: Poisoned({:?}) }}", &**err.get_ref())
+ f.debug_struct("RwLock").field("data", &&**err.get_ref()).finish()
},
- Err(TryLockError::WouldBlock) => write!(f, "RwLock {{ <locked> }}")
+ Err(TryLockError::WouldBlock) => {
+ struct LockedPlaceholder;
+ impl fmt::Debug for LockedPlaceholder {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("<locked>") }
+ }
+
+ f.debug_struct("RwLock").field("data", &LockedPlaceholder).finish()
+ }
}
}
}
-#[stable(feature = "rw_lock_default", since = "1.9.0")]
+#[stable(feature = "rw_lock_default", since = "1.10.0")]
impl<T: Default> Default for RwLock<T> {
/// Creates a new `RwLock<T>`, with the `Default` value for T.
fn default() -> RwLock<T> {
@@ -338,6 +457,17 @@ impl<T: Default> Default for RwLock<T> {
}
}
+#[stable(feature = "rw_lock_from", since = "1.24.0")]
+impl<T> From<T> for RwLock<T> {
+ /// Creates a new instance of an `RwLock<T>` which is unlocked.
+ /// This is equivalent to [`RwLock::new`].
+ ///
+ /// [`RwLock::new`]: #method.new
+ fn from(t: T) -> Self {
+ RwLock::new(t)
+ }
+}
+
impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> {
unsafe fn new(lock: &'rwlock RwLock<T>)
-> LockResult<RwLockReadGuard<'rwlock, T>> {
@@ -370,6 +500,13 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockReadGuard<'a, T> {
}
}
+#[stable(feature = "std_guard_impls", since = "1.20.0")]
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ (**self).fmt(f)
+ }
+}
+
#[stable(feature = "std_debug", since = "1.16.0")]
impl<'a, T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -379,6 +516,13 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'a, T> {
}
}
+#[stable(feature = "std_guard_impls", since = "1.20.0")]
+impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'a, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ (**self).fmt(f)
+ }
+}
+
#[stable(feature = "rust1", since = "1.0.0")]
impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> {
type Target = T;
@@ -421,8 +565,6 @@ impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> {
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests {
- #![allow(deprecated)] // rand
-
use rand::{self, Rng};
use sync::mpsc::channel;
use thread;
@@ -443,7 +585,7 @@ mod tests {
#[test]
fn frob() {
- const N: usize = 10;
+ const N: u32 = 10;
const M: usize = 1000;
let r = Arc::new(RwLock::new(()));
@@ -472,7 +614,7 @@ mod tests {
fn test_rw_arc_poison_wr() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
- let _: Result<(), _> = thread::spawn(move|| {
+ let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.write().unwrap();
panic!();
}).join();
@@ -484,7 +626,7 @@ mod tests {
let arc = Arc::new(RwLock::new(1));
assert!(!arc.is_poisoned());
let arc2 = arc.clone();
- let _: Result<(), _> = thread::spawn(move|| {
+ let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.write().unwrap();
panic!();
}).join();
@@ -496,7 +638,7 @@ mod tests {
fn test_rw_arc_no_poison_rr() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
- let _: Result<(), _> = thread::spawn(move|| {
+ let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.read().unwrap();
panic!();
}).join();
@@ -507,7 +649,7 @@ mod tests {
fn test_rw_arc_no_poison_rw() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
- let _: Result<(), _> = thread::spawn(move|| {
+ let _: Result<(), _> = thread::spawn(move || {
let _lock = arc2.read().unwrap();
panic!()
}).join();
@@ -521,7 +663,7 @@ mod tests {
let arc2 = arc.clone();
let (tx, rx) = channel();
- thread::spawn(move|| {
+ thread::spawn(move || {
let mut lock = arc2.write().unwrap();
for _ in 0..10 {
let tmp = *lock;
@@ -536,7 +678,7 @@ mod tests {
let mut children = Vec::new();
for _ in 0..5 {
let arc3 = arc.clone();
- children.push(thread::spawn(move|| {
+ children.push(thread::spawn(move || {
let lock = arc3.read().unwrap();
assert!(*lock >= 0);
}));
@@ -557,7 +699,7 @@ mod tests {
fn test_rw_arc_access_in_unwind() {
let arc = Arc::new(RwLock::new(1));
let arc2 = arc.clone();
- let _ = thread::spawn(move|| -> () {
+ let _ = thread::spawn(move || -> () {
struct Unwinder {
i: Arc<RwLock<isize>>,
}