diff options
| author | Fenrir <[email protected]> | 2018-01-21 14:06:28 -0700 |
|---|---|---|
| committer | FenrirWolf <[email protected]> | 2018-01-21 19:16:33 -0700 |
| commit | 23be3f4885688e5e0011005e2295c75168854c0a (patch) | |
| tree | dd0850f9c73c489e114a761d5c0757f3dbec3a65 /ctr-std/src/sync | |
| parent | Update CI for Rust nightly-2017-12-01 + other fixes (diff) | |
| download | ctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.tar.xz ctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.zip | |
Recreate ctr-std from latest nightly
Diffstat (limited to 'ctr-std/src/sync')
| -rw-r--r-- | ctr-std/src/sync/barrier.rs | 11 | ||||
| -rw-r--r-- | ctr-std/src/sync/condvar.rs | 86 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/blocking.rs | 2 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/cache_aligned.rs | 37 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/mod.rs | 774 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/mpsc_queue.rs | 35 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/select.rs | 22 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/spsc_queue.rs | 192 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/stream.rs | 105 | ||||
| -rw-r--r-- | ctr-std/src/sync/mpsc/sync.rs | 2 | ||||
| -rw-r--r-- | ctr-std/src/sync/mutex.rs | 89 | ||||
| -rw-r--r-- | ctr-std/src/sync/once.rs | 117 | ||||
| -rw-r--r-- | ctr-std/src/sync/rwlock.rs | 202 |
13 files changed, 1247 insertions, 427 deletions
diff --git a/ctr-std/src/sync/barrier.rs b/ctr-std/src/sync/barrier.rs index f15e7ff..273c7c1 100644 --- a/ctr-std/src/sync/barrier.rs +++ b/ctr-std/src/sync/barrier.rs @@ -50,12 +50,11 @@ struct BarrierState { generation_id: usize, } -/// A result returned from wait. +/// A `BarrierWaitResult` is returned by [`wait`] when all threads in the [`Barrier`] +/// have rendezvoused. /// -/// Currently this opaque structure only has one method, [`.is_leader()`]. Only -/// one thread will receive a result that will return `true` from this function. -/// -/// [`.is_leader()`]: #method.is_leader +/// [`wait`]: struct.Barrier.html#method.wait +/// [`Barrier`]: struct.Barrier.html /// /// # Examples /// @@ -153,7 +152,7 @@ impl Barrier { BarrierWaitResult(false) } else { lock.count = 0; - lock.generation_id += 1; + lock.generation_id = lock.generation_id.wrapping_add(1); self.cvar.notify_all(); BarrierWaitResult(true) } diff --git a/ctr-std/src/sync/condvar.rs b/ctr-std/src/sync/condvar.rs index 68c7e88..5640217 100644 --- a/ctr-std/src/sync/condvar.rs +++ b/ctr-std/src/sync/condvar.rs @@ -150,7 +150,7 @@ impl Condvar { /// /// This function will atomically unlock the mutex specified (represented by /// `guard`) and block the current thread. This means that any calls - /// to [`notify_one()`] or [`notify_all()`] which happen logically after the + /// to [`notify_one`] or [`notify_all`] which happen logically after the /// mutex is unlocked are candidates to wake this thread up. When this /// function call returns, the lock specified will have been re-acquired. /// @@ -167,16 +167,16 @@ impl Condvar { /// /// # Panics /// - /// This function will [`panic!()`] if it is used with more than one mutex + /// This function will [`panic!`] if it is used with more than one mutex /// over time. Each condition variable is dynamically bound to exactly one /// mutex to ensure defined behavior across platforms. If this functionality /// is not desired, then unsafe primitives in `sys` are provided. /// - /// [`notify_one()`]: #method.notify_one - /// [`notify_all()`]: #method.notify_all + /// [`notify_one`]: #method.notify_one + /// [`notify_all`]: #method.notify_all /// [poisoning]: ../sync/struct.Mutex.html#poisoning /// [`Mutex`]: ../sync/struct.Mutex.html - /// [`panic!()`]: ../../std/macro.panic.html + /// [`panic!`]: ../../std/macro.panic.html /// /// # Examples /// @@ -359,11 +359,11 @@ impl Condvar { /// be woken up from its call to [`wait`] or [`wait_timeout`]. Calls to /// `notify_one` are not buffered in any way. /// - /// To wake up all threads, see [`notify_all()`]. + /// To wake up all threads, see [`notify_all`]. /// /// [`wait`]: #method.wait /// [`wait_timeout`]: #method.wait_timeout - /// [`notify_all()`]: #method.notify_all + /// [`notify_all`]: #method.notify_all /// /// # Examples /// @@ -401,9 +401,9 @@ impl Condvar { /// variable are awoken. Calls to `notify_all()` are not buffered in any /// way. /// - /// To wake up only one thread, see [`notify_one()`]. + /// To wake up only one thread, see [`notify_one`]. /// - /// [`notify_one()`]: #method.notify_one + /// [`notify_one`]: #method.notify_one /// /// # Examples /// @@ -461,7 +461,7 @@ impl fmt::Debug for Condvar { } } -#[stable(feature = "condvar_default", since = "1.9.0")] +#[stable(feature = "condvar_default", since = "1.10.0")] impl Default for Condvar { /// Creates a `Condvar` which is ready to be waited on and notified. fn default() -> Condvar { @@ -480,9 +480,10 @@ impl Drop for Condvar { mod tests { use sync::mpsc::channel; use sync::{Condvar, Mutex, Arc}; + use sync::atomic::{AtomicBool, Ordering}; use thread; use time::Duration; - use u32; + use u64; #[test] fn smoke() { @@ -547,23 +548,58 @@ mod tests { #[test] #[cfg_attr(target_os = "emscripten", ignore)] - fn wait_timeout_ms() { + fn wait_timeout_wait() { let m = Arc::new(Mutex::new(())); - let m2 = m.clone(); let c = Arc::new(Condvar::new()); - let c2 = c.clone(); - let g = m.lock().unwrap(); - let (g, _no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); - // spurious wakeups mean this isn't necessarily true - // assert!(!no_timeout); - let _t = thread::spawn(move || { - let _g = m2.lock().unwrap(); - c2.notify_one(); - }); - let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u32::MAX as u64)).unwrap(); - assert!(!timeout_res.timed_out()); - drop(g); + loop { + let g = m.lock().unwrap(); + let (_g, no_timeout) = c.wait_timeout(g, Duration::from_millis(1)).unwrap(); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not timeout + if !no_timeout.timed_out() { + continue; + } + + break; + } + } + + #[test] + #[cfg_attr(target_os = "emscripten", ignore)] + fn wait_timeout_wake() { + let m = Arc::new(Mutex::new(())); + let c = Arc::new(Condvar::new()); + + loop { + let g = m.lock().unwrap(); + + let c2 = c.clone(); + let m2 = m.clone(); + + let notified = Arc::new(AtomicBool::new(false)); + let notified_copy = notified.clone(); + + let t = thread::spawn(move || { + let _g = m2.lock().unwrap(); + thread::sleep(Duration::from_millis(1)); + notified_copy.store(true, Ordering::SeqCst); + c2.notify_one(); + }); + let (g, timeout_res) = c.wait_timeout(g, Duration::from_millis(u64::MAX)).unwrap(); + assert!(!timeout_res.timed_out()); + // spurious wakeups mean this isn't necessarily true + // so execute test again, if not notified + if !notified.load(Ordering::SeqCst) { + t.join().unwrap(); + continue; + } + drop(g); + + t.join().unwrap(); + + break; + } } #[test] diff --git a/ctr-std/src/sync/mpsc/blocking.rs b/ctr-std/src/sync/mpsc/blocking.rs index 0f9ef6f..c08bd6d 100644 --- a/ctr-std/src/sync/mpsc/blocking.rs +++ b/ctr-std/src/sync/mpsc/blocking.rs @@ -46,7 +46,7 @@ pub fn tokens() -> (WaitToken, SignalToken) { inner: inner.clone(), }; let signal_token = SignalToken { - inner: inner + inner, }; (wait_token, signal_token) } diff --git a/ctr-std/src/sync/mpsc/cache_aligned.rs b/ctr-std/src/sync/mpsc/cache_aligned.rs new file mode 100644 index 0000000..5af0126 --- /dev/null +++ b/ctr-std/src/sync/mpsc/cache_aligned.rs @@ -0,0 +1,37 @@ +// Copyright 2017 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use ops::{Deref, DerefMut}; + +#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[repr(align(64))] +pub(super) struct Aligner; + +#[derive(Copy, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub(super) struct CacheAligned<T>(pub T, pub Aligner); + +impl<T> Deref for CacheAligned<T> { + type Target = T; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<T> DerefMut for CacheAligned<T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<T> CacheAligned<T> { + pub(super) fn new(t: T) -> Self { + CacheAligned(t, Aligner) + } +} diff --git a/ctr-std/src/sync/mpsc/mod.rs b/ctr-std/src/sync/mpsc/mod.rs index aeeab17..2dd3aeb 100644 --- a/ctr-std/src/sync/mpsc/mod.rs +++ b/ctr-std/src/sync/mpsc/mod.rs @@ -13,40 +13,50 @@ //! This module provides message-based communication over channels, concretely //! defined among three types: //! -//! * `Sender` -//! * `SyncSender` -//! * `Receiver` +//! * [`Sender`] +//! * [`SyncSender`] +//! * [`Receiver`] //! -//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both +//! A [`Sender`] or [`SyncSender`] is used to send data to a [`Receiver`]. Both //! senders are clone-able (multi-producer) such that many threads can send //! simultaneously to one receiver (single-consumer). //! //! These channels come in two flavors: //! -//! 1. An asynchronous, infinitely buffered channel. The `channel()` function +//! 1. An asynchronous, infinitely buffered channel. The [`channel`] function //! will return a `(Sender, Receiver)` tuple where all sends will be //! **asynchronous** (they never block). The channel conceptually has an //! infinite buffer. //! -//! 2. A synchronous, bounded channel. The `sync_channel()` function will return -//! a `(SyncSender, Receiver)` tuple where the storage for pending messages -//! is a pre-allocated buffer of a fixed size. All sends will be +//! 2. A synchronous, bounded channel. The [`sync_channel`] function will +//! return a `(SyncSender, Receiver)` tuple where the storage for pending +//! messages is a pre-allocated buffer of a fixed size. All sends will be //! **synchronous** by blocking until there is buffer space available. Note -//! that a bound of 0 is allowed, causing the channel to become a -//! "rendezvous" channel where each sender atomically hands off a message to -//! a receiver. +//! that a bound of 0 is allowed, causing the channel to become a "rendezvous" +//! channel where each sender atomically hands off a message to a receiver. +//! +//! [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html +//! [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html +//! [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html +//! [`send`]: ../../../std/sync/mpsc/struct.Sender.html#method.send +//! [`channel`]: ../../../std/sync/mpsc/fn.channel.html +//! [`sync_channel`]: ../../../std/sync/mpsc/fn.sync_channel.html //! //! ## Disconnection //! -//! The send and receive operations on channels will all return a `Result` +//! The send and receive operations on channels will all return a [`Result`] //! indicating whether the operation succeeded or not. An unsuccessful operation //! is normally indicative of the other half of a channel having "hung up" by //! being dropped in its corresponding thread. //! //! Once half of a channel has been deallocated, most operations can no longer -//! continue to make progress, so `Err` will be returned. Many applications will -//! continue to `unwrap()` the results returned from this module, instigating a -//! propagation of failure among threads if one unexpectedly dies. +//! continue to make progress, so [`Err`] will be returned. Many applications +//! will continue to [`unwrap`] the results returned from this module, +//! instigating a propagation of failure among threads if one unexpectedly dies. +//! +//! [`Result`]: ../../../std/result/enum.Result.html +//! [`Err`]: ../../../std/result/enum.Result.html#variant.Err +//! [`unwrap`]: ../../../std/result/enum.Result.html#method.unwrap //! //! # Examples //! @@ -287,8 +297,36 @@ mod sync; mod mpsc_queue; mod spsc_queue; -/// The receiving-half of Rust's channel type. This half can only be owned by -/// one thread +mod cache_aligned; + +/// The receiving half of Rust's [`channel`][] (or [`sync_channel`]) type. +/// This half can only be owned by one thread. +/// +/// Messages sent to the channel can be retrieved using [`recv`]. +/// +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html +/// [`recv`]: struct.Receiver.html#method.recv +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send("Hello world!").unwrap(); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// send.send("Delayed for 2 seconds").unwrap(); +/// }); +/// +/// println!("{}", recv.recv().unwrap()); // Received immediately +/// println!("Waiting..."); +/// println!("{}", recv.recv().unwrap()); // Received after 2 seconds +/// ``` #[stable(feature = "rust1", since = "1.0.0")] pub struct Receiver<T> { inner: UnsafeCell<Flavor<T>>, @@ -302,38 +340,153 @@ unsafe impl<T: Send> Send for Receiver<T> { } #[stable(feature = "rust1", since = "1.0.0")] impl<T> !Sync for Receiver<T> { } -/// An iterator over messages on a receiver, this iterator will block -/// whenever `next` is called, waiting for a new message, and `None` will be -/// returned when the corresponding channel has hung up. +/// An iterator over messages on a [`Receiver`], created by [`iter`]. +/// +/// This iterator will block whenever [`next`] is called, +/// waiting for a new message, and [`None`] will be returned +/// when the corresponding channel has hung up. +/// +/// [`iter`]: struct.Receiver.html#method.iter +/// [`Receiver`]: struct.Receiver.html +/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next +/// [`None`]: ../../../std/option/enum.Option.html#variant.None +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.iter() { +/// println!("Got: {}", x); +/// } +/// ``` #[stable(feature = "rust1", since = "1.0.0")] #[derive(Debug)] pub struct Iter<'a, T: 'a> { rx: &'a Receiver<T> } -/// An iterator that attempts to yield all pending values for a receiver. -/// `None` will be returned when there are no pending values remaining or +/// An iterator that attempts to yield all pending values for a [`Receiver`], +/// created by [`try_iter`]. +/// +/// [`None`] will be returned when there are no pending values remaining or /// if the corresponding channel has hung up. /// -/// This Iterator will never block the caller in order to wait for data to -/// become available. Instead, it will return `None`. +/// This iterator will never block the caller in order to wait for data to +/// become available. Instead, it will return [`None`]. +/// +/// [`Receiver`]: struct.Receiver.html +/// [`try_iter`]: struct.Receiver.html#method.try_iter +/// [`None`]: ../../../std/option/enum.Option.html#variant.None +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// use std::time::Duration; +/// +/// let (sender, receiver) = channel(); +/// +/// // Nothing is in the buffer yet +/// assert!(receiver.try_iter().next().is_none()); +/// println!("Nothing in the buffer..."); +/// +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// sender.send(2).unwrap(); +/// sender.send(3).unwrap(); +/// }); +/// +/// println!("Going to sleep..."); +/// thread::sleep(Duration::from_secs(2)); // block for two seconds +/// +/// for x in receiver.try_iter() { +/// println!("Got: {}", x); +/// } +/// ``` #[stable(feature = "receiver_try_iter", since = "1.15.0")] #[derive(Debug)] pub struct TryIter<'a, T: 'a> { rx: &'a Receiver<T> } -/// An owning iterator over messages on a receiver, this iterator will block -/// whenever `next` is called, waiting for a new message, and `None` will be -/// returned when the corresponding channel has hung up. +/// An owning iterator over messages on a [`Receiver`], +/// created by **Receiver::into_iter**. +/// +/// This iterator will block whenever [`next`] +/// is called, waiting for a new message, and [`None`] will be +/// returned if the corresponding channel has hung up. +/// +/// [`Receiver`]: struct.Receiver.html +/// [`next`]: ../../../std/iter/trait.Iterator.html#tymethod.next +/// [`None`]: ../../../std/option/enum.Option.html#variant.None +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (send, recv) = channel(); +/// +/// thread::spawn(move || { +/// send.send(1u8).unwrap(); +/// send.send(2u8).unwrap(); +/// send.send(3u8).unwrap(); +/// }); +/// +/// for x in recv.into_iter() { +/// println!("Got: {}", x); +/// } +/// ``` #[stable(feature = "receiver_into_iter", since = "1.1.0")] #[derive(Debug)] pub struct IntoIter<T> { rx: Receiver<T> } -/// The sending-half of Rust's asynchronous channel type. This half can only be +/// The sending-half of Rust's asynchronous [`channel`] type. This half can only be /// owned by one thread, but it can be cloned to send to other threads. +/// +/// Messages can be sent through this channel with [`send`]. +/// +/// [`channel`]: fn.channel.html +/// [`send`]: struct.Sender.html#method.send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::channel; +/// use std::thread; +/// +/// let (sender, receiver) = channel(); +/// let sender2 = sender.clone(); +/// +/// // First thread owns sender +/// thread::spawn(move || { +/// sender.send(1).unwrap(); +/// }); +/// +/// // Second thread owns sender2 +/// thread::spawn(move || { +/// sender2.send(2).unwrap(); +/// }); +/// +/// let msg = receiver.recv().unwrap(); +/// let msg2 = receiver.recv().unwrap(); +/// +/// assert_eq!(3, msg + msg2); +/// ``` #[stable(feature = "rust1", since = "1.0.0")] pub struct Sender<T> { inner: UnsafeCell<Flavor<T>>, @@ -347,8 +500,53 @@ unsafe impl<T: Send> Send for Sender<T> { } #[stable(feature = "rust1", since = "1.0.0")] impl<T> !Sync for Sender<T> { } -/// The sending-half of Rust's synchronous channel type. This half can only be -/// owned by one thread, but it can be cloned to send to other threads. +/// The sending-half of Rust's synchronous [`sync_channel`] type. +/// +/// Messages can be sent through this channel with [`send`] or [`try_send`]. +/// +/// [`send`] will block if there is no space in the internal buffer. +/// +/// [`sync_channel`]: fn.sync_channel.html +/// [`send`]: struct.SyncSender.html#method.send +/// [`try_send`]: struct.SyncSender.html#method.try_send +/// +/// # Examples +/// +/// ```rust +/// use std::sync::mpsc::sync_channel; +/// use std::thread; +/// +/// // Create a sync_channel with buffer size 2 +/// let (sync_sender, receiver) = sync_channel(2); +/// let sync_sender2 = sync_sender.clone(); +/// +/// // First thread owns sync_sender +/// thread::spawn(move || { +/// sync_sender.send(1).unwrap(); +/// sync_sender.send(2).unwrap(); +/// }); +/// +/// // Second thread owns sync_sender2 +/// thread::spawn(move || { +/// sync_sender2.send(3).unwrap(); +/// // thread will now block since the buffer is full +/// println!("Thread unblocked!"); +/// }); +/// +/// let mut msg; +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {} received", msg); +/// +/// // "Thread unblocked!" will be printed now +/// +/// msg = receiver.recv().unwrap(); +/// println!("message {} received", msg); +/// +/// msg = receiver.recv().unwrap(); +/// +/// println!("message {} received", msg); +/// ``` #[stable(feature = "rust1", since = "1.0.0")] pub struct SyncSender<T> { inner: Arc<sync::Packet<T>>, @@ -357,73 +555,97 @@ pub struct SyncSender<T> { #[stable(feature = "rust1", since = "1.0.0")] unsafe impl<T: Send> Send for SyncSender<T> {} -#[stable(feature = "rust1", since = "1.0.0")] -impl<T> !Sync for SyncSender<T> {} - -/// An error returned from the `send` function on channels. +/// An error returned from the [`Sender::send`] or [`SyncSender::send`] +/// function on **channel**s. /// -/// A `send` operation can only fail if the receiving end of a channel is +/// A **send** operation can only fail if the receiving end of a channel is /// disconnected, implying that the data could never be received. The error /// contains the data being sent as a payload so it can be recovered. +/// +/// [`Sender::send`]: struct.Sender.html#method.send +/// [`SyncSender::send`]: struct.SyncSender.html#method.send #[stable(feature = "rust1", since = "1.0.0")] #[derive(PartialEq, Eq, Clone, Copy)] pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T); -/// An error returned from the `recv` function on a `Receiver`. +/// An error returned from the [`recv`] function on a [`Receiver`]. /// -/// The `recv` operation can only fail if the sending half of a channel is -/// disconnected, implying that no further messages will ever be received. +/// The [`recv`] operation can only fail if the sending half of a +/// [`channel`][`channel`] (or [`sync_channel`]) is disconnected, implying that no further +/// messages will ever be received. +/// +/// [`recv`]: struct.Receiver.html#method.recv +/// [`Receiver`]: struct.Receiver.html +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[stable(feature = "rust1", since = "1.0.0")] pub struct RecvError; -/// This enumeration is the list of the possible reasons that `try_recv` could -/// not return data when called. +/// This enumeration is the list of the possible reasons that [`try_recv`] could +/// not return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`try_recv`]: struct.Receiver.html#method.try_recv +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[stable(feature = "rust1", since = "1.0.0")] pub enum TryRecvError { - /// This channel is currently empty, but the sender(s) have not yet + /// This **channel** is currently empty, but the **Sender**(s) have not yet /// disconnected, so data may yet become available. #[stable(feature = "rust1", since = "1.0.0")] Empty, - /// This channel's sending half has become disconnected, and there will - /// never be any more data received on this channel + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. #[stable(feature = "rust1", since = "1.0.0")] Disconnected, } -/// This enumeration is the list of possible errors that `recv_timeout` could -/// not return data when called. +/// This enumeration is the list of possible errors that made [`recv_timeout`] +/// unable to return data when called. This can occur with both a [`channel`] and +/// a [`sync_channel`]. +/// +/// [`recv_timeout`]: struct.Receiver.html#method.recv_timeout +/// [`channel`]: fn.channel.html +/// [`sync_channel`]: fn.sync_channel.html #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] pub enum RecvTimeoutError { - /// This channel is currently empty, but the sender(s) have not yet + /// This **channel** is currently empty, but the **Sender**(s) have not yet /// disconnected, so data may yet become available. #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] Timeout, - /// This channel's sending half has become disconnected, and there will - /// never be any more data received on this channel + /// The **channel**'s sending half has become disconnected, and there will + /// never be any more data received on it. #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] Disconnected, } /// This enumeration is the list of the possible error outcomes for the -/// `SyncSender::try_send` method. +/// [`try_send`] method. +/// +/// [`try_send`]: struct.SyncSender.html#method.try_send #[stable(feature = "rust1", since = "1.0.0")] #[derive(PartialEq, Eq, Clone, Copy)] pub enum TrySendError<T> { - /// The data could not be sent on the channel because it would require that + /// The data could not be sent on the [`sync_channel`] because it would require that /// the callee block to send the data. /// /// If this is a buffered channel, then the buffer is full at this time. If - /// this is not a buffered channel, then there is no receiver available to + /// this is not a buffered channel, then there is no [`Receiver`] available to /// acquire the data. + /// + /// [`sync_channel`]: fn.sync_channel.html + /// [`Receiver`]: struct.Receiver.html #[stable(feature = "rust1", since = "1.0.0")] Full(#[stable(feature = "rust1", since = "1.0.0")] T), - /// This channel's receiving half has disconnected, so the data could not be + /// This [`sync_channel`]'s receiving half has disconnected, so the data could not be /// sent. The data is returned back to the callee in this case. + /// + /// [`sync_channel`]: fn.sync_channel.html #[stable(feature = "rust1", since = "1.0.0")] Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T), } @@ -457,15 +679,27 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { } /// Creates a new asynchronous channel, returning the sender/receiver halves. -/// All data sent on the sender will become available on the receiver, and no -/// send will block the calling thread (this channel has an "infinite buffer"). +/// All data sent on the [`Sender`] will become available on the [`Receiver`] in +/// the same order as it was sent, and no [`send`] will block the calling thread +/// (this channel has an "infinite buffer", unlike [`sync_channel`], which will +/// block after its buffer limit is reached). [`recv`] will block until a message +/// is available. /// -/// If the [`Receiver`] is disconnected while trying to [`send()`] with the -/// [`Sender`], the [`send()`] method will return an error. +/// The [`Sender`] can be cloned to [`send`] to the same channel multiple times, but +/// only one [`Receiver`] is supported. /// -/// [`send()`]: ../../../std/sync/mpsc/struct.Sender.html#method.send -/// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html -/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html +/// If the [`Receiver`] is disconnected while trying to [`send`] with the +/// [`Sender`], the [`send`] method will return a [`SendError`]. Similarly, If the +/// [`Sender`] is disconnected while trying to [`recv`], the [`recv`] method will +/// return a [`RecvError`]. +/// +/// [`send`]: struct.Sender.html#method.send +/// [`recv`]: struct.Receiver.html#method.recv +/// [`Sender`]: struct.Sender.html +/// [`Receiver`]: struct.Receiver.html +/// [`sync_channel`]: fn.sync_channel.html +/// [`SendError`]: struct.SendError.html +/// [`RecvError`]: struct.RecvError.html /// /// # Examples /// @@ -473,20 +707,18 @@ impl<T> UnsafeFlavor<T> for Receiver<T> { /// use std::sync::mpsc::channel; /// use std::thread; /// -/// // tx is the sending half (tx for transmission), and rx is the receiving -/// // half (rx for receiving). -/// let (tx, rx) = channel(); +/// let (sender, receiver) = channel(); /// /// // Spawn off an expensive computation /// thread::spawn(move|| { /// # fn expensive_computation() {} -/// tx.send(expensive_computation()).unwrap(); +/// sender.send(expensive_computation()).unwrap(); /// }); /// /// // Do some useful work for awhile /// /// // Let's see what that answer was -/// println!("{:?}", rx.recv().unwrap()); +/// println!("{:?}", receiver.recv().unwrap()); /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn channel<T>() -> (Sender<T>, Receiver<T>) { @@ -495,24 +727,32 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) { } /// Creates a new synchronous, bounded channel. -/// -/// Like asynchronous channels, the [`Receiver`] will block until a message -/// becomes available. These channels differ greatly in the semantics of the -/// sender from asynchronous channels, however. +/// All data sent on the [`SyncSender`] will become available on the [`Receiver`] +/// in the same order as it was sent. Like asynchronous [`channel`]s, the +/// [`Receiver`] will block until a message becomes available. `sync_channel` +/// differs greatly in the semantics of the sender, however. /// /// This channel has an internal buffer on which messages will be queued. /// `bound` specifies the buffer size. When the internal buffer becomes full, /// future sends will *block* waiting for the buffer to open up. Note that a /// buffer size of 0 is valid, in which case this becomes "rendezvous channel" -/// where each [`send()`] will not return until a recv is paired with it. +/// where each [`send`] will not return until a [`recv`] is paired with it. /// -/// Like asynchronous channels, if the [`Receiver`] is disconnected while -/// trying to [`send()`] with the [`SyncSender`], the [`send()`] method will -/// return an error. +/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple +/// times, but only one [`Receiver`] is supported. /// -/// [`send()`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send -/// [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html -/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html +/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying +/// to [`send`] with the [`SyncSender`], the [`send`] method will return a +/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying +/// to [`recv`], the [`recv`] method will return a [`RecvError`]. +/// +/// [`channel`]: fn.channel.html +/// [`send`]: struct.SyncSender.html#method.send +/// [`recv`]: struct.Receiver.html#method.recv +/// [`SyncSender`]: struct.SyncSender.html +/// [`Receiver`]: struct.Receiver.html +/// [`SendError`]: struct.SendError.html +/// [`RecvError`]: struct.RecvError.html /// /// # Examples /// @@ -520,18 +760,18 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) { /// use std::sync::mpsc::sync_channel; /// use std::thread; /// -/// let (tx, rx) = sync_channel(1); +/// let (sender, receiver) = sync_channel(1); /// /// // this returns immediately -/// tx.send(1).unwrap(); +/// sender.send(1).unwrap(); /// /// thread::spawn(move|| { /// // this will block until the previous message has been received -/// tx.send(2).unwrap(); +/// sender.send(2).unwrap(); /// }); /// -/// assert_eq!(rx.recv().unwrap(), 1); -/// assert_eq!(rx.recv().unwrap(), 2); +/// assert_eq!(receiver.recv().unwrap(), 1); +/// assert_eq!(receiver.recv().unwrap(), 2); /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) { @@ -556,10 +796,13 @@ impl<T> Sender<T> { /// A successful send occurs when it is determined that the other end of /// the channel has not hung up already. An unsuccessful send would be one /// where the corresponding receiver has already been deallocated. Note - /// that a return value of `Err` means that the data will never be - /// received, but a return value of `Ok` does *not* mean that the data + /// that a return value of [`Err`] means that the data will never be + /// received, but a return value of [`Ok`] does *not* mean that the data /// will be received. It is possible for the corresponding receiver to - /// hang up immediately after this function returns `Ok`. + /// hang up immediately after this function returns [`Ok`]. + /// + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// [`Ok`]: ../../../std/result/enum.Result.html#variant.Ok /// /// This method will never block the current thread. /// @@ -675,10 +918,10 @@ impl<T> Drop for Sender<T> { } } -#[stable(feature = "mpsc_debug", since = "1.7.0")] +#[stable(feature = "mpsc_debug", since = "1.8.0")] impl<T> fmt::Debug for Sender<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Sender {{ .. }}") + f.debug_struct("Sender").finish() } } @@ -699,12 +942,37 @@ impl<T> SyncSender<T> { /// Note that a successful send does *not* guarantee that the receiver will /// ever see the data if there is a buffer on this channel. Items may be /// enqueued in the internal buffer for the receiver to receive at a later - /// time. If the buffer size is 0, however, it can be guaranteed that the - /// receiver has indeed received the data if this function returns success. + /// time. If the buffer size is 0, however, the channel becomes a rendezvous + /// channel and it guarantees that the receiver has indeed received + /// the data if this function returns success. /// - /// This function will never panic, but it may return `Err` if the - /// `Receiver` has disconnected and is no longer able to receive + /// This function will never panic, but it may return [`Err`] if the + /// [`Receiver`] has disconnected and is no longer able to receive /// information. + /// + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a rendezvous sync_channel with buffer size 0 + /// let (sync_sender, receiver) = sync_channel(0); + /// + /// thread::spawn(move || { + /// println!("sending message..."); + /// sync_sender.send(1).unwrap(); + /// // Thread is now blocked until the message is received + /// + /// println!("...message received!"); + /// }); + /// + /// let msg = receiver.recv().unwrap(); + /// assert_eq!(1, msg); + /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn send(&self, t: T) -> Result<(), SendError<T>> { self.inner.send(t).map_err(SendError) @@ -712,13 +980,53 @@ impl<T> SyncSender<T> { /// Attempts to send a value on this channel without blocking. /// - /// This method differs from `send` by returning immediately if the + /// This method differs from [`send`] by returning immediately if the /// channel's buffer is full or no receiver is waiting to acquire some - /// data. Compared with `send`, this function has two failure cases + /// data. Compared with [`send`], this function has two failure cases /// instead of one (one for disconnection, one for a full buffer). /// - /// See `SyncSender::send` for notes about guarantees of whether the + /// See [`send`] for notes about guarantees of whether the /// receiver has received the data or not if this function is successful. + /// + /// [`send`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::sync_channel; + /// use std::thread; + /// + /// // Create a sync_channel with buffer size 1 + /// let (sync_sender, receiver) = sync_channel(1); + /// let sync_sender2 = sync_sender.clone(); + /// + /// // First thread owns sync_sender + /// thread::spawn(move || { + /// sync_sender.send(1).unwrap(); + /// sync_sender.send(2).unwrap(); + /// // Thread blocked + /// }); + /// + /// // Second thread owns sync_sender2 + /// thread::spawn(move || { + /// // This will return an error and send + /// // no message if the buffer is full + /// sync_sender2.try_send(3).is_err(); + /// }); + /// + /// let mut msg; + /// msg = receiver.recv().unwrap(); + /// println!("message {} received", msg); + /// + /// msg = receiver.recv().unwrap(); + /// println!("message {} received", msg); + /// + /// // Third message may have never been sent + /// match receiver.try_recv() { + /// Ok(msg) => println!("message {} received", msg), + /// Err(_) => println!("the third message was never sent"), + /// } + /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> { self.inner.try_send(t) @@ -740,10 +1048,10 @@ impl<T> Drop for SyncSender<T> { } } -#[stable(feature = "mpsc_debug", since = "1.7.0")] +#[stable(feature = "mpsc_debug", since = "1.8.0")] impl<T> fmt::Debug for SyncSender<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "SyncSender {{ .. }}") + f.debug_struct("SyncSender").finish() } } @@ -756,7 +1064,7 @@ impl<T> Receiver<T> { Receiver { inner: UnsafeCell::new(inner) } } - /// Attempts to return a pending value on this receiver without blocking + /// Attempts to return a pending value on this receiver without blocking. /// /// This method will never block the caller in order to wait for data to /// become available. Instead, this will always return immediately with a @@ -764,6 +1072,21 @@ impl<T> Receiver<T> { /// /// This is useful for a flavor of "optimistic check" before deciding to /// block on a receiver. + /// + /// Compared with [`recv`], this function has two failure cases instead of one + /// (one for disconnection, one for an empty buffer). + /// + /// [`recv`]: struct.Receiver.html#method.recv + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::{Receiver, channel}; + /// + /// let (_, receiver): (_, Receiver<i32>) = channel(); + /// + /// assert!(receiver.try_recv().is_err()); + /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn try_recv(&self) -> Result<T, TryRecvError> { loop { @@ -819,15 +1142,19 @@ impl<T> Receiver<T> { /// /// This function will always block the current thread if there is no data /// available and it's possible for more data to be sent. Once a message is - /// sent to the corresponding `Sender`, then this receiver will wake up and - /// return that message. + /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. /// - /// If the corresponding `Sender` has disconnected, or it disconnects while - /// this call is blocking, this call will wake up and return `Err` to + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to /// indicate that no more messages can ever be received on this channel. /// However, since channels are buffered, messages sent before the disconnect /// will still be properly received. /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// /// # Examples /// /// ``` @@ -907,25 +1234,58 @@ impl<T> Receiver<T> { /// /// This function will always block the current thread if there is no data /// available and it's possible for more data to be sent. Once a message is - /// sent to the corresponding `Sender`, then this receiver will wake up and - /// return that message. + /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. /// - /// If the corresponding `Sender` has disconnected, or it disconnects while - /// this call is blocking, this call will wake up and return `Err` to + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to /// indicate that no more messages can ever be received on this channel. /// However, since channels are buffered, messages sent before the disconnect /// will still be properly received. /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// /// # Examples /// + /// Successfully receiving value before encountering timeout: + /// + /// ```no_run + /// use std::thread; + /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching timeout: + /// /// ```no_run - /// use std::sync::mpsc::{self, RecvTimeoutError}; + /// use std::thread; /// use std::time::Duration; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); /// - /// let (send, recv) = mpsc::channel::<()>(); + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); /// - /// let timeout = Duration::from_millis(100); - /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout)); + /// assert_eq!( + /// recv.recv_timeout(Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); /// ``` #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")] pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> { @@ -937,11 +1297,72 @@ impl<T> Receiver<T> { Err(TryRecvError::Disconnected) => Err(RecvTimeoutError::Disconnected), Err(TryRecvError::Empty) - => self.recv_max_until(Instant::now() + timeout) + => self.recv_deadline(Instant::now() + timeout) } } - fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { + /// Attempts to wait for a value on this receiver, returning an error if the + /// corresponding channel has hung up, or if `deadline` is reached. + /// + /// This function will always block the current thread if there is no data + /// available and it's possible for more data to be sent. Once a message is + /// sent to the corresponding [`Sender`][] (or [`SyncSender`]), then this + /// receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected, or it disconnects while + /// this call is blocking, this call will wake up and return [`Err`] to + /// indicate that no more messages can ever be received on this channel. + /// However, since channels are buffered, messages sent before the disconnect + /// will still be properly received. + /// + /// [`Sender`]: struct.Sender.html + /// [`SyncSender`]: struct.SyncSender.html + /// [`Err`]: ../../../std/result/enum.Result.html#variant.Err + /// + /// # Examples + /// + /// Successfully receiving value before reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Ok('a') + /// ); + /// ``` + /// + /// Receiving an error upon reaching deadline: + /// + /// ```no_run + /// #![feature(deadline_api)] + /// use std::thread; + /// use std::time::{Duration, Instant}; + /// use std::sync::mpsc; + /// + /// let (send, recv) = mpsc::channel(); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_millis(800)); + /// send.send('a').unwrap(); + /// }); + /// + /// assert_eq!( + /// recv.recv_deadline(Instant::now() + Duration::from_millis(400)), + /// Err(mpsc::RecvTimeoutError::Timeout) + /// ); + /// ``` + #[unstable(feature = "deadline_api", issue = "46316")] + pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { use self::RecvTimeoutError::*; loop { @@ -993,7 +1414,31 @@ impl<T> Receiver<T> { } /// Returns an iterator that will block waiting for messages, but never - /// `panic!`. It will return `None` when the channel has hung up. + /// [`panic!`]. It will return [`None`] when the channel has hung up. + /// + /// [`panic!`]: ../../../std/macro.panic.html + /// [`None`]: ../../../std/option/enum.Option.html#variant.None + /// + /// # Examples + /// + /// ```rust + /// use std::sync::mpsc::channel; + /// use std::thread; + /// + /// let (send, recv) = channel(); + /// + /// thread::spawn(move || { + /// send.send(1).unwrap(); + /// send.send(2).unwrap(); + /// send.send(3).unwrap(); + /// }); + /// + /// let mut iter = recv.iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` #[stable(feature = "rust1", since = "1.0.0")] pub fn iter(&self) -> Iter<T> { Iter { rx: self } @@ -1001,8 +1446,42 @@ impl<T> Receiver<T> { /// Returns an iterator that will attempt to yield all pending values. /// It will return `None` if there are no more pending values or if the - /// channel has hung up. The iterator will never `panic!` or block the + /// channel has hung up. The iterator will never [`panic!`] or block the /// user by waiting for values. + /// + /// [`panic!`]: ../../../std/macro.panic.html + /// + /// # Examples + /// + /// ```no_run + /// use std::sync::mpsc::channel; + /// use std::thread; + /// use std::time::Duration; + /// + /// let (sender, receiver) = channel(); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// thread::spawn(move || { + /// thread::sleep(Duration::from_secs(1)); + /// sender.send(1).unwrap(); + /// sender.send(2).unwrap(); + /// sender.send(3).unwrap(); + /// }); + /// + /// // nothing is in the buffer yet + /// assert!(receiver.try_iter().next().is_none()); + /// + /// // block for two seconds + /// thread::sleep(Duration::from_secs(2)); + /// + /// let mut iter = receiver.try_iter(); + /// assert_eq!(iter.next(), Some(1)); + /// assert_eq!(iter.next(), Some(2)); + /// assert_eq!(iter.next(), Some(3)); + /// assert_eq!(iter.next(), None); + /// ``` #[stable(feature = "receiver_try_iter", since = "1.15.0")] pub fn try_iter(&self) -> TryIter<T> { TryIter { rx: self } @@ -1132,10 +1611,10 @@ impl<T> Drop for Receiver<T> { } } -#[stable(feature = "mpsc_debug", since = "1.7.0")] +#[stable(feature = "mpsc_debug", since = "1.8.0")] impl<T> fmt::Debug for Receiver<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Receiver {{ .. }}") + f.debug_struct("Receiver").finish() } } @@ -1207,6 +1686,15 @@ impl<T: Send> error::Error for TrySendError<T> { } } +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl<T> From<SendError<T>> for TrySendError<T> { + fn from(err: SendError<T>) -> TrySendError<T> { + match err { + SendError(t) => TrySendError::Disconnected(t), + } + } +} + #[stable(feature = "rust1", since = "1.0.0")] impl fmt::Display for RecvError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -1259,7 +1747,16 @@ impl error::Error for TryRecvError { } } -#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")] +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From<RecvError> for TryRecvError { + fn from(err: RecvError) -> TryRecvError { + match err { + RecvError => TryRecvError::Disconnected, + } + } +} + +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] impl fmt::Display for RecvTimeoutError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { @@ -1273,7 +1770,7 @@ impl fmt::Display for RecvTimeoutError { } } -#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")] +#[stable(feature = "mpsc_recv_timeout_error", since = "1.15.0")] impl error::Error for RecvTimeoutError { fn description(&self) -> &str { match *self { @@ -1291,6 +1788,15 @@ impl error::Error for RecvTimeoutError { } } +#[stable(feature = "mpsc_error_conversions", since = "1.24.0")] +impl From<RecvError> for RecvTimeoutError { + fn from(err: RecvError) -> RecvTimeoutError { + match err { + RecvError => RecvTimeoutError::Disconnected, + } + } +} + #[cfg(all(test, not(target_os = "emscripten")))] mod tests { use env; @@ -1539,7 +2045,7 @@ mod tests { fn oneshot_single_thread_send_then_recv() { let (tx, rx) = channel::<Box<i32>>(); tx.send(box 10).unwrap(); - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); } #[test] @@ -1596,7 +2102,7 @@ mod tests { fn oneshot_multi_task_recv_then_send() { let (tx, rx) = channel::<Box<i32>>(); let _t = thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); }); tx.send(box 10).unwrap(); @@ -1609,7 +2115,7 @@ mod tests { drop(tx); }); let res = thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); }).join(); assert!(res.is_err()); } @@ -1663,7 +2169,7 @@ mod tests { let _t = thread::spawn(move|| { tx.send(box 10).unwrap(); }); - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); } } @@ -1688,7 +2194,7 @@ mod tests { if i == 10 { return } thread::spawn(move|| { - assert!(rx.recv().unwrap() == box i); + assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); }); } @@ -2225,7 +2731,7 @@ mod sync_tests { fn oneshot_single_thread_send_then_recv() { let (tx, rx) = sync_channel::<Box<i32>>(1); tx.send(box 10).unwrap(); - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); } #[test] @@ -2297,7 +2803,7 @@ mod sync_tests { fn oneshot_multi_task_recv_then_send() { let (tx, rx) = sync_channel::<Box<i32>>(0); let _t = thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); }); tx.send(box 10).unwrap(); @@ -2310,7 +2816,7 @@ mod sync_tests { drop(tx); }); let res = thread::spawn(move|| { - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); }).join(); assert!(res.is_err()); } @@ -2364,7 +2870,7 @@ mod sync_tests { let _t = thread::spawn(move|| { tx.send(box 10).unwrap(); }); - assert!(rx.recv().unwrap() == box 10); + assert!(*rx.recv().unwrap() == 10); } } @@ -2389,7 +2895,7 @@ mod sync_tests { if i == 10 { return } thread::spawn(move|| { - assert!(rx.recv().unwrap() == box i); + assert!(*rx.recv().unwrap() == i); recv(rx, i + 1); }); } @@ -2593,22 +3099,4 @@ mod sync_tests { repro() } } - - #[test] - fn fmt_debug_sender() { - let (tx, _) = channel::<i32>(); - assert_eq!(format!("{:?}", tx), "Sender { .. }"); - } - - #[test] - fn fmt_debug_recv() { - let (_, rx) = channel::<i32>(); - assert_eq!(format!("{:?}", rx), "Receiver { .. }"); - } - - #[test] - fn fmt_debug_sync_sender() { - let (tx, _) = sync_channel::<i32>(1); - assert_eq!(format!("{:?}", tx), "SyncSender { .. }"); - } } diff --git a/ctr-std/src/sync/mpsc/mpsc_queue.rs b/ctr-std/src/sync/mpsc/mpsc_queue.rs index 8d80f94..296773d 100644 --- a/ctr-std/src/sync/mpsc/mpsc_queue.rs +++ b/ctr-std/src/sync/mpsc/mpsc_queue.rs @@ -1,29 +1,12 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ +// Copyright 2017 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. //! A mostly lock-free multi-producer, single consumer queue. //! diff --git a/ctr-std/src/sync/mpsc/select.rs b/ctr-std/src/sync/mpsc/select.rs index 8b4da53..a9f3cea 100644 --- a/ctr-std/src/sync/mpsc/select.rs +++ b/ctr-std/src/sync/mpsc/select.rs @@ -148,12 +148,12 @@ impl Select { let id = self.next_id.get(); self.next_id.set(id + 1); Handle { - id: id, + id, selector: self.inner.get(), next: ptr::null_mut(), prev: ptr::null_mut(), added: false, - rx: rx, + rx, packet: rx, } } @@ -354,13 +354,13 @@ impl Iterator for Packets { impl fmt::Debug for Select { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Select {{ .. }}") + f.debug_struct("Select").finish() } } impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Handle {{ .. }}") + f.debug_struct("Handle").finish() } } @@ -774,18 +774,4 @@ mod tests { } } } - - #[test] - fn fmt_debug_select() { - let sel = Select::new(); - assert_eq!(format!("{:?}", sel), "Select { .. }"); - } - - #[test] - fn fmt_debug_handle() { - let (_, rx) = channel::<i32>(); - let sel = Select::new(); - let handle = sel.handle(&rx); - assert_eq!(format!("{:?}", handle), "Handle { .. }"); - } } diff --git a/ctr-std/src/sync/mpsc/spsc_queue.rs b/ctr-std/src/sync/mpsc/spsc_queue.rs index 5858e4b..cc4be92 100644 --- a/ctr-std/src/sync/mpsc/spsc_queue.rs +++ b/ctr-std/src/sync/mpsc/spsc_queue.rs @@ -1,31 +1,12 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue +// Copyright 2017 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or +// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license +// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. //! A single-producer single-consumer concurrent queue //! @@ -33,18 +14,23 @@ //! concurrently between two threads. This data structure is safe to use and //! enforces the semantics that there is one pusher and one popper. +// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + use alloc::boxed::Box; use core::ptr; use core::cell::UnsafeCell; use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use super::cache_aligned::CacheAligned; + // Node within the linked list queue of messages to send struct Node<T> { // FIXME: this could be an uninitialized T if we're careful enough, and // that would reduce memory usage (and be a bit faster). // is it worth it? value: Option<T>, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache next: AtomicPtr<Node<T>>, // next node in the queue } @@ -52,38 +38,55 @@ struct Node<T> { /// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. -pub struct Queue<T> { +pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> { // consumer fields + consumer: CacheAligned<Consumer<T, ConsumerAddition>>, + + // producer fields + producer: CacheAligned<Producer<T, ProducerAddition>>, +} + +struct Consumer<T, Addition> { tail: UnsafeCell<*mut Node<T>>, // where to pop from tail_prev: AtomicPtr<Node<T>>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cachable + addition: Addition, +} - // producer fields +struct Producer<T, Addition> { head: UnsafeCell<*mut Node<T>>, // where to push to first: UnsafeCell<*mut Node<T>>, // where to get new nodes from tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: usize, - cache_additions: AtomicUsize, - cache_subtractions: AtomicUsize, + addition: Addition, } -unsafe impl<T: Send> Send for Queue<T> { } +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { } -unsafe impl<T: Send> Sync for Queue<T> { } +unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { } impl<T> Node<T> { fn new() -> *mut Node<T> { Box::into_raw(box Node { value: None, + cached: false, next: AtomicPtr::new(ptr::null_mut::<Node<T>>()), }) } } -impl<T> Queue<T> { - /// Creates a new queue. +impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> { + + /// Creates a new queue. With given additional elements in the producer and + /// consumer portions of the queue. + /// + /// Due to the performance implications of cache-contention, + /// we wish to keep fields used mainly by the producer on a separate cache + /// line than those used by the consumer. + /// Since cache lines are usually 64 bytes, it is unreasonably expensive to + /// allocate one for small fields, so we allow users to insert additional + /// fields into the cache lines already allocated by this for the producer + /// and consumer. /// /// This is unsafe as the type system doesn't enforce a single /// consumer-producer relationship. It also allows the consumer to `pop` @@ -100,19 +103,28 @@ impl<T> Queue<T> { /// cache (if desired). If the value is 0, then the cache has /// no bound. Otherwise, the cache will never grow larger than /// `bound` (although the queue itself could be much larger. - pub unsafe fn new(bound: usize) -> Queue<T> { + pub unsafe fn with_additions( + bound: usize, + producer_addition: ProducerAddition, + consumer_addition: ConsumerAddition, + ) -> Self { let n1 = Node::new(); let n2 = Node::new(); (*n1).next.store(n2, Ordering::Relaxed); Queue { - tail: UnsafeCell::new(n2), - tail_prev: AtomicPtr::new(n1), - head: UnsafeCell::new(n2), - first: UnsafeCell::new(n1), - tail_copy: UnsafeCell::new(n1), - cache_bound: bound, - cache_additions: AtomicUsize::new(0), - cache_subtractions: AtomicUsize::new(0), + consumer: CacheAligned::new(Consumer { + tail: UnsafeCell::new(n2), + tail_prev: AtomicPtr::new(n1), + cache_bound: bound, + cached_nodes: AtomicUsize::new(0), + addition: consumer_addition + }), + producer: CacheAligned::new(Producer { + head: UnsafeCell::new(n2), + first: UnsafeCell::new(n1), + tail_copy: UnsafeCell::new(n1), + addition: producer_addition + }), } } @@ -126,35 +138,25 @@ impl<T> Queue<T> { assert!((*n).value.is_none()); (*n).value = Some(t); (*n).next.store(ptr::null_mut(), Ordering::Relaxed); - (**self.head.get()).next.store(n, Ordering::Release); - *self.head.get() = n; + (**self.producer.head.get()).next.store(n, Ordering::Release); + *(&self.producer.head).get() = n; } } unsafe fn alloc(&self) -> *mut Node<T> { // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Ordering::Relaxed); - self.cache_subtractions.store(b + 1, Ordering::Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Ordering::Relaxed); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); return ret; } // If the above fails, then update our copy of the tail and try // again. - *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire); - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Ordering::Relaxed); - self.cache_subtractions.store(b + 1, Ordering::Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Ordering::Relaxed); + *self.producer.0.tail_copy.get() = + self.consumer.tail_prev.load(Ordering::Acquire); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); return ret; } // If all of that fails, then we have to allocate a new node @@ -170,27 +172,27 @@ impl<T> Queue<T> { // sentinel from where we should start popping from. Hence, look at // tail's next field and see if we can use it. If we do a pop, then // the current tail node is a candidate for going into the cache. - let tail = *self.tail.get(); + let tail = *self.consumer.tail.get(); let next = (*tail).next.load(Ordering::Acquire); if next.is_null() { return None } assert!((*next).value.is_some()); let ret = (*next).value.take(); - *self.tail.get() = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Ordering::Release); + *self.consumer.0.tail.get() = next; + if self.consumer.cache_bound == 0 { + self.consumer.tail_prev.store(tail, Ordering::Release); } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Ordering::Relaxed); - let subtractions = self.cache_subtractions.load(Ordering::Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Ordering::Release); - self.cache_additions.store(additions + 1, Ordering::Relaxed); + let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); + if cached_nodes < self.consumer.cache_bound && !(*tail).cached { + self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); + (*tail).cached = true; + } + + if (*tail).cached { + self.consumer.tail_prev.store(tail, Ordering::Release); } else { - (*self.tail_prev.load(Ordering::Relaxed)) - .next.store(next, Ordering::Relaxed); + (*self.consumer.tail_prev.load(Ordering::Relaxed)) + .next.store(next, Ordering::Relaxed); // We have successfully erased all references to 'tail', so // now we can safely drop it. let _: Box<Node<T>> = Box::from_raw(tail); @@ -211,17 +213,25 @@ impl<T> Queue<T> { // This is essentially the same as above with all the popping bits // stripped out. unsafe { - let tail = *self.tail.get(); + let tail = *self.consumer.tail.get(); let next = (*tail).next.load(Ordering::Acquire); if next.is_null() { None } else { (*next).value.as_mut() } } } + + pub fn producer_addition(&self) -> &ProducerAddition { + &self.producer.addition + } + + pub fn consumer_addition(&self) -> &ConsumerAddition { + &self.consumer.addition + } } -impl<T> Drop for Queue<T> { +impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> { fn drop(&mut self) { unsafe { - let mut cur = *self.first.get(); + let mut cur = *self.producer.first.get(); while !cur.is_null() { let next = (*cur).next.load(Ordering::Relaxed); let _n: Box<Node<T>> = Box::from_raw(cur); @@ -241,7 +251,7 @@ mod tests { #[test] fn smoke() { unsafe { - let queue = Queue::new(0); + let queue = Queue::with_additions(0, (), ()); queue.push(1); queue.push(2); assert_eq!(queue.pop(), Some(1)); @@ -258,7 +268,7 @@ mod tests { #[test] fn peek() { unsafe { - let queue = Queue::new(0); + let queue = Queue::with_additions(0, (), ()); queue.push(vec![1]); // Ensure the borrowchecker works @@ -281,7 +291,7 @@ mod tests { #[test] fn drop_full() { unsafe { - let q: Queue<Box<_>> = Queue::new(0); + let q: Queue<Box<_>> = Queue::with_additions(0, (), ()); q.push(box 1); q.push(box 2); } @@ -290,7 +300,7 @@ mod tests { #[test] fn smoke_bound() { unsafe { - let q = Queue::new(0); + let q = Queue::with_additions(0, (), ()); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -312,7 +322,7 @@ mod tests { } unsafe fn stress_bound(bound: usize) { - let q = Arc::new(Queue::new(bound)); + let q = Arc::new(Queue::with_additions(bound, (), ())); let (tx, rx) = channel(); let q2 = q.clone(); diff --git a/ctr-std/src/sync/mpsc/stream.rs b/ctr-std/src/sync/mpsc/stream.rs index 47cd897..d1515eb 100644 --- a/ctr-std/src/sync/mpsc/stream.rs +++ b/ctr-std/src/sync/mpsc/stream.rs @@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5; const MAX_STEALS: isize = 1 << 20; pub struct Packet<T> { - queue: spsc::Queue<Message<T>>, // internal queue for all message + // internal queue for all messages + queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>, +} +struct ProducerAddition { cnt: AtomicIsize, // How many items are on this channel - steals: UnsafeCell<isize>, // How many times has a port received without blocking? to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up port_dropped: AtomicBool, // flag if the channel has been destroyed. } +struct ConsumerAddition { + steals: UnsafeCell<isize>, // How many times has a port received without blocking? +} + + pub enum Failure<T> { Empty, Disconnected, @@ -78,13 +85,18 @@ enum Message<T> { impl<T> Packet<T> { pub fn new() -> Packet<T> { Packet { - queue: unsafe { spsc::Queue::new(128) }, - - cnt: AtomicIsize::new(0), - steals: UnsafeCell::new(0), - to_wake: AtomicUsize::new(0), - - port_dropped: AtomicBool::new(false), + queue: unsafe { spsc::Queue::with_additions( + 128, + ProducerAddition { + cnt: AtomicIsize::new(0), + to_wake: AtomicUsize::new(0), + + port_dropped: AtomicBool::new(false), + }, + ConsumerAddition { + steals: UnsafeCell::new(0), + } + )}, } } @@ -92,7 +104,7 @@ impl<T> Packet<T> { // If the other port has deterministically gone away, then definitely // must return the data back up the stack. Otherwise, the data is // considered as being sent. - if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) } match self.do_send(Data(t)) { UpSuccess | UpDisconnected => {}, @@ -104,14 +116,16 @@ impl<T> Packet<T> { pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult { // If the port has gone away, then there's no need to proceed any // further. - if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected } + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return UpDisconnected + } self.do_send(GoUp(up)) } fn do_send(&self, t: Message<T>) -> UpgradeResult { self.queue.push(t); - match self.cnt.fetch_add(1, Ordering::SeqCst) { + match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) { // As described in the mod's doc comment, -1 == wakeup -1 => UpWoke(self.take_to_wake()), // As as described before, SPSC queues must be >= -2 @@ -125,7 +139,7 @@ impl<T> Packet<T> { // will never remove this data. We can only have at most one item to // drain (the port drains the rest). DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); let first = self.queue.pop(); let second = self.queue.pop(); assert!(second.is_none()); @@ -144,8 +158,8 @@ impl<T> Packet<T> { // Consumes ownership of the 'to_wake' field. fn take_to_wake(&self) -> SignalToken { - let ptr = self.to_wake.load(Ordering::SeqCst); - self.to_wake.store(0, Ordering::SeqCst); + let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); assert!(ptr != 0); unsafe { SignalToken::cast_from_usize(ptr) } } @@ -154,14 +168,16 @@ impl<T> Packet<T> { // back if it shouldn't sleep. Note that this is the location where we take // steals into account. fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); let ptr = unsafe { token.cast_to_usize() }; - self.to_wake.store(ptr, Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst); - let steals = unsafe { ptr::replace(self.steals.get(), 0) }; + let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }; - match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } + match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + } // If we factor in our steals and notice that the channel has no // data, we successfully sleep n => { @@ -170,7 +186,7 @@ impl<T> Packet<T> { } } - self.to_wake.store(0, Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); Err(unsafe { SignalToken::cast_from_usize(ptr) }) } @@ -201,7 +217,7 @@ impl<T> Packet<T> { // "steal" factored into the channel count above). data @ Ok(..) | data @ Err(Upgraded(..)) => unsafe { - *self.steals.get() -= 1; + *self.queue.consumer_addition().steals.get() -= 1; data }, @@ -223,20 +239,21 @@ impl<T> Packet<T> { // down as much as possible (without going negative), and then // adding back in whatever we couldn't factor into steals. Some(data) => unsafe { - if *self.steals.get() > MAX_STEALS { - match self.cnt.swap(0, Ordering::SeqCst) { + if *self.queue.consumer_addition().steals.get() > MAX_STEALS { + match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store( + DISCONNECTED, Ordering::SeqCst); } n => { - let m = cmp::min(n, *self.steals.get()); - *self.steals.get() -= m; + let m = cmp::min(n, *self.queue.consumer_addition().steals.get()); + *self.queue.consumer_addition().steals.get() -= m; self.bump(n - m); } } - assert!(*self.steals.get() >= 0); + assert!(*self.queue.consumer_addition().steals.get() >= 0); } - *self.steals.get() += 1; + *self.queue.consumer_addition().steals.get() += 1; match data { Data(t) => Ok(t), GoUp(up) => Err(Upgraded(up)), @@ -244,7 +261,7 @@ impl<T> Packet<T> { }, None => { - match self.cnt.load(Ordering::SeqCst) { + match self.queue.producer_addition().cnt.load(Ordering::SeqCst) { n if n != DISCONNECTED => Err(Empty), // This is a little bit of a tricky case. We failed to pop @@ -273,7 +290,7 @@ impl<T> Packet<T> { pub fn drop_chan(&self) { // Dropping a channel is pretty simple, we just flag it as disconnected // and then wakeup a blocker if there is one. - match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { + match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) { -1 => { self.take_to_wake().signal(); } DISCONNECTED => {} n => { assert!(n >= 0); } @@ -300,7 +317,7 @@ impl<T> Packet<T> { // sends are gated on this flag, so we're immediately guaranteed that // there are a bounded number of active sends that we'll have to deal // with. - self.port_dropped.store(true, Ordering::SeqCst); + self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst); // Now that we're guaranteed to deal with a bounded number of senders, // we need to drain the queue. This draining process happens atomically @@ -310,9 +327,9 @@ impl<T> Packet<T> { // continue to fail while active senders send data while we're dropping // data, but eventually we're guaranteed to break out of this loop // (because there is a bounded number of senders). - let mut steals = unsafe { *self.steals.get() }; + let mut steals = unsafe { *self.queue.consumer_addition().steals.get() }; while { - let cnt = self.cnt.compare_and_swap( + let cnt = self.queue.producer_addition().cnt.compare_and_swap( steals, DISCONNECTED, Ordering::SeqCst); cnt != DISCONNECTED && cnt != steals } { @@ -353,9 +370,9 @@ impl<T> Packet<T> { // increment the count on the channel (used for selection) fn bump(&self, amt: isize) -> isize { - match self.cnt.fetch_add(amt, Ordering::SeqCst) { + match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); DISCONNECTED } n => n @@ -404,8 +421,8 @@ impl<T> Packet<T> { // this end. This is fine because we know it's a small bounded windows // of time until the data is actually sent. if was_upgrade { - assert_eq!(unsafe { *self.steals.get() }, 0); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); return Ok(true) } @@ -418,7 +435,7 @@ impl<T> Packet<T> { // If we were previously disconnected, then we know for sure that there // is no thread in to_wake, so just keep going let has_data = if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); true // there is data, that data is that we're disconnected } else { let cur = prev + steals + 1; @@ -441,13 +458,13 @@ impl<T> Packet<T> { if prev < 0 { drop(self.take_to_wake()); } else { - while self.to_wake.load(Ordering::SeqCst) != 0 { + while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 { thread::yield_now(); } } unsafe { - assert_eq!(*self.steals.get(), 0); - *self.steals.get() = steals; + assert_eq!(*self.queue.consumer_addition().steals.get(), 0); + *self.queue.consumer_addition().steals.get() = steals; } // if we were previously positive, then there's surely data to @@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> { // disconnection, but also a proper fence before the read of // `to_wake`, so this assert cannot be removed with also removing // the `to_wake` assert. - assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); } } diff --git a/ctr-std/src/sync/mpsc/sync.rs b/ctr-std/src/sync/mpsc/sync.rs index 1d16e00..90f12c8 100644 --- a/ctr-std/src/sync/mpsc/sync.rs +++ b/ctr-std/src/sync/mpsc/sync.rs @@ -177,7 +177,7 @@ impl<T> Packet<T> { lock: Mutex::new(State { disconnected: false, blocker: NoneBlocked, - cap: cap, + cap, canceled: None, queue: Queue { head: ptr::null_mut(), diff --git a/ctr-std/src/sync/mutex.rs b/ctr-std/src/sync/mutex.rs index 97b84d5..3b4904c 100644 --- a/ctr-std/src/sync/mutex.rs +++ b/ctr-std/src/sync/mutex.rs @@ -10,7 +10,6 @@ use cell::UnsafeCell; use fmt; -use marker; use mem; use ops::{Deref, DerefMut}; use ptr; @@ -20,30 +19,38 @@ use sys_common::poison::{self, TryLockError, TryLockResult, LockResult}; /// A mutual exclusion primitive useful for protecting shared data /// /// This mutex will block threads waiting for the lock to become available. The -/// mutex can also be statically initialized or created via a `new` +/// mutex can also be statically initialized or created via a [`new`] /// constructor. Each mutex has a type parameter which represents the data that /// it is protecting. The data can only be accessed through the RAII guards -/// returned from `lock` and `try_lock`, which guarantees that the data is only +/// returned from [`lock`] and [`try_lock`], which guarantees that the data is only /// ever accessed when the mutex is locked. /// /// # Poisoning /// /// The mutexes in this module implement a strategy called "poisoning" where a /// mutex is considered poisoned whenever a thread panics while holding the -/// lock. Once a mutex is poisoned, all other threads are unable to access the +/// mutex. Once a mutex is poisoned, all other threads are unable to access the /// data by default as it is likely tainted (some invariant is not being /// upheld). /// -/// For a mutex, this means that the `lock` and `try_lock` methods return a -/// `Result` which indicates whether a mutex has been poisoned or not. Most -/// usage of a mutex will simply `unwrap()` these results, propagating panics +/// For a mutex, this means that the [`lock`] and [`try_lock`] methods return a +/// [`Result`] which indicates whether a mutex has been poisoned or not. Most +/// usage of a mutex will simply [`unwrap()`] these results, propagating panics /// among threads to ensure that a possibly invalid invariant is not witnessed. /// /// A poisoned mutex, however, does not prevent all access to the underlying -/// data. The `PoisonError` type has an `into_inner` method which will return +/// data. The [`PoisonError`] type has an [`into_inner`] method which will return /// the guard that would have otherwise been returned on a successful lock. This /// allows access to the data, despite the lock being poisoned. /// +/// [`new`]: #method.new +/// [`lock`]: #method.lock +/// [`try_lock`]: #method.try_lock +/// [`Result`]: ../../std/result/enum.Result.html +/// [`unwrap()`]: ../../std/result/enum.Result.html#method.unwrap +/// [`PoisonError`]: ../../std/sync/struct.PoisonError.html +/// [`into_inner`]: ../../std/sync/struct.PoisonError.html#method.into_inner +/// /// # Examples /// /// ``` @@ -61,7 +68,7 @@ use sys_common::poison::{self, TryLockError, TryLockResult, LockResult}; /// let data = Arc::new(Mutex::new(0)); /// /// let (tx, rx) = channel(); -/// for _ in 0..10 { +/// for _ in 0..N { /// let (data, tx) = (data.clone(), tx.clone()); /// thread::spawn(move || { /// // The shared state can only be accessed once the lock is held. @@ -115,7 +122,7 @@ pub struct Mutex<T: ?Sized> { // Note that this mutex is in a *box*, not inlined into the struct itself. // Once a native mutex has been used once, its address can never change (it // can't be moved). This mutex type can be safely moved at any time, so to - // ensure that the native mutex is used correctly we box the inner lock to + // ensure that the native mutex is used correctly we box the inner mutex to // give it a constant address. inner: Box<sys::Mutex>, poison: poison::Flag, @@ -132,16 +139,16 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> { } /// An RAII implementation of a "scoped lock" of a mutex. When this structure is /// dropped (falls out of scope), the lock will be unlocked. /// -/// The data protected by the mutex can be access through this guard via its +/// The data protected by the mutex can be accessed through this guard via its /// [`Deref`] and [`DerefMut`] implementations. /// -/// This structure is created by the [`lock()`] and [`try_lock()`] methods on +/// This structure is created by the [`lock`] and [`try_lock`] methods on /// [`Mutex`]. /// /// [`Deref`]: ../../std/ops/trait.Deref.html /// [`DerefMut`]: ../../std/ops/trait.DerefMut.html -/// [`lock()`]: struct.Mutex.html#method.lock -/// [`try_lock()`]: struct.Mutex.html#method.try_lock +/// [`lock`]: struct.Mutex.html#method.lock +/// [`try_lock`]: struct.Mutex.html#method.try_lock /// [`Mutex`]: struct.Mutex.html #[must_use] #[stable(feature = "rust1", since = "1.0.0")] @@ -153,7 +160,9 @@ pub struct MutexGuard<'a, T: ?Sized + 'a> { } #[stable(feature = "rust1", since = "1.0.0")] -impl<'a, T: ?Sized> !marker::Send for MutexGuard<'a, T> {} +impl<'a, T: ?Sized> !Send for MutexGuard<'a, T> { } +#[stable(feature = "mutexguard", since = "1.19.0")] +unsafe impl<'a, T: ?Sized + Sync> Sync for MutexGuard<'a, T> { } impl<T> Mutex<T> { /// Creates a new mutex in an unlocked state ready for use. @@ -183,7 +192,7 @@ impl<T: ?Sized> Mutex<T> { /// Acquires a mutex, blocking the current thread until it is able to do so. /// /// This function will block the local thread until it is available to acquire - /// the mutex. Upon returning, the thread is the only thread with the mutex + /// the mutex. Upon returning, the thread is the only thread with the lock /// held. An RAII guard is returned to allow scoped unlock of the lock. When /// the guard goes out of scope, the mutex will be unlocked. /// @@ -225,7 +234,7 @@ impl<T: ?Sized> Mutex<T> { /// Attempts to acquire this lock. /// - /// If the lock could not be acquired at this time, then `Err` is returned. + /// If the lock could not be acquired at this time, then [`Err`] is returned. /// Otherwise, an RAII guard is returned. The lock will be unlocked when the /// guard is dropped. /// @@ -237,6 +246,8 @@ impl<T: ?Sized> Mutex<T> { /// this call will return failure if the mutex would otherwise be /// acquired. /// + /// [`Err`]: ../../std/result/enum.Result.html#variant.Err + /// /// # Examples /// /// ``` @@ -267,9 +278,9 @@ impl<T: ?Sized> Mutex<T> { } } - /// Determines whether the lock is poisoned. + /// Determines whether the mutex is poisoned. /// - /// If another thread is active, the lock can still become poisoned at any + /// If another thread is active, the mutex can still become poisoned at any /// time. You should not trust a `false` value for program correctness /// without additional synchronization. /// @@ -312,7 +323,7 @@ impl<T: ?Sized> Mutex<T> { #[stable(feature = "mutex_into_inner", since = "1.6.0")] pub fn into_inner(self) -> LockResult<T> where T: Sized { // We know statically that there are no outstanding references to - // `self` so there's no need to lock the inner lock. + // `self` so there's no need to lock the inner mutex. // // To get the inner value, we'd like to call `data.into_inner()`, // but because `Mutex` impl-s `Drop`, we can't move out of it, so @@ -353,7 +364,7 @@ impl<T: ?Sized> Mutex<T> { #[stable(feature = "mutex_get_mut", since = "1.6.0")] pub fn get_mut(&mut self) -> LockResult<&mut T> { // We know statically that there are no other references to `self`, so - // there's no need to lock the inner lock. + // there's no need to lock the inner mutex. let data = unsafe { &mut *self.data.get() }; poison::map_result(self.poison.borrow(), |_| data ) } @@ -371,7 +382,18 @@ unsafe impl<#[may_dangle] T: ?Sized> Drop for Mutex<T> { } } -#[stable(feature = "mutex_default", since = "1.9.0")] +#[stable(feature = "mutex_from", since = "1.24.0")] +impl<T> From<T> for Mutex<T> { + /// Creates a new mutex in an unlocked state ready for use. + /// This is equivalent to [`Mutex::new`]. + /// + /// [`Mutex::new`]: #method.new + fn from(t: T) -> Self { + Mutex::new(t) + } +} + +#[stable(feature = "mutex_default", since = "1.10.0")] impl<T: ?Sized + Default> Default for Mutex<T> { /// Creates a `Mutex<T>`, with the `Default` value for T. fn default() -> Mutex<T> { @@ -383,11 +405,18 @@ impl<T: ?Sized + Default> Default for Mutex<T> { impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.try_lock() { - Ok(guard) => write!(f, "Mutex {{ data: {:?} }}", &*guard), + Ok(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), Err(TryLockError::Poisoned(err)) => { - write!(f, "Mutex {{ data: Poisoned({:?}) }}", &**err.get_ref()) + f.debug_struct("Mutex").field("data", &&**err.get_ref()).finish() }, - Err(TryLockError::WouldBlock) => write!(f, "Mutex {{ <locked> }}") + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("<locked>") } + } + + f.debug_struct("Mutex").field("data", &LockedPlaceholder).finish() + } } } } @@ -439,6 +468,13 @@ impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for MutexGuard<'a, T> { } } +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl<'a, T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(f) + } +} + pub fn guard_lock<'a, T: ?Sized>(guard: &MutexGuard<'a, T>) -> &'a sys::Mutex { &guard.__lock.inner } @@ -459,9 +495,6 @@ mod tests { #[derive(Eq, PartialEq, Debug)] struct NonCopy(i32); - unsafe impl<T: Send> Send for Packet<T> {} - unsafe impl<T> Sync for Packet<T> {} - #[test] fn smoke() { let m = Mutex::new(()); diff --git a/ctr-std/src/sync/once.rs b/ctr-std/src/sync/once.rs index c449315..6fd8b6a 100644 --- a/ctr-std/src/sync/once.rs +++ b/ctr-std/src/sync/once.rs @@ -72,9 +72,11 @@ use thread::{self, Thread}; /// A synchronization primitive which can be used to run a one-time global /// initialization. Useful for one-time initialization for FFI or related -/// functionality. This type can only be constructed with the `ONCE_INIT` +/// functionality. This type can only be constructed with the [`ONCE_INIT`] /// value. /// +/// [`ONCE_INIT`]: constant.ONCE_INIT.html +/// /// # Examples /// /// ``` @@ -101,15 +103,28 @@ unsafe impl Sync for Once {} #[stable(feature = "rust1", since = "1.0.0")] unsafe impl Send for Once {} -/// State yielded to the `call_once_force` method which can be used to query -/// whether the `Once` was previously poisoned or not. +/// State yielded to [`call_once_force`]’s closure parameter. The state can be +/// used to query the poison status of the [`Once`]. +/// +/// [`call_once_force`]: struct.Once.html#method.call_once_force +/// [`Once`]: struct.Once.html #[unstable(feature = "once_poison", issue = "33577")] #[derive(Debug)] pub struct OnceState { poisoned: bool, } -/// Initialization value for static `Once` values. +/// Initialization value for static [`Once`] values. +/// +/// [`Once`]: struct.Once.html +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Once, ONCE_INIT}; +/// +/// static START: Once = ONCE_INIT; +/// ``` #[stable(feature = "rust1", since = "1.0.0")] pub const ONCE_INIT: Once = Once::new(); @@ -212,15 +227,52 @@ impl Once { self.call_inner(false, &mut |_| f.take().unwrap()()); } - /// Performs the same function as `call_once` except ignores poisoning. + /// Performs the same function as [`call_once`] except ignores poisoning. + /// + /// Unlike [`call_once`], if this `Once` has been poisoned (i.e. a previous + /// call to `call_once` or `call_once_force` caused a panic), calling + /// `call_once_force` will still invoke the closure `f` and will _not_ + /// result in an immediate panic. If `f` panics, the `Once` will remain + /// in a poison state. If `f` does _not_ panic, the `Once` will no + /// longer be in a poison state and all future calls to `call_once` or + /// `call_one_force` will no-op. + /// + /// The closure `f` is yielded a [`OnceState`] structure which can be used + /// to query the poison status of the `Once`. + /// + /// [`call_once`]: struct.Once.html#method.call_once + /// [`OnceState`]: struct.OnceState.html + /// + /// # Examples + /// + /// ``` + /// #![feature(once_poison)] + /// + /// use std::sync::{Once, ONCE_INIT}; + /// use std::thread; + /// + /// static INIT: Once = ONCE_INIT; + /// + /// // poison the once + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); /// - /// If this `Once` has been poisoned (some initialization panicked) then - /// this function will continue to attempt to call initialization functions - /// until one of them doesn't panic. + /// // poisoning propagates + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| {}); + /// }); + /// assert!(handle.join().is_err()); /// - /// The closure `f` is yielded a structure which can be used to query the - /// state of this `Once` (whether initialization has previously panicked or - /// not). + /// // call_once_force will still run and reset the poisoned state + /// INIT.call_once_force(|state| { + /// assert!(state.poisoned()); + /// }); + /// + /// // once any success happens, we stop propagating the poison + /// INIT.call_once(|| {}); + /// ``` #[unstable(feature = "once_poison", issue = "33577")] pub fn call_once_force<F>(&'static self, f: F) where F: FnOnce(&OnceState) { // same as above, just with a different parameter to `call_inner`. @@ -366,10 +418,47 @@ impl Drop for Finish { } impl OnceState { - /// Returns whether the associated `Once` has been poisoned. + /// Returns whether the associated [`Once`] was poisoned prior to the + /// invocation of the closure passed to [`call_once_force`]. + /// + /// [`call_once_force`]: struct.Once.html#method.call_once_force + /// [`Once`]: struct.Once.html + /// + /// # Examples + /// + /// A poisoned `Once`: + /// + /// ``` + /// #![feature(once_poison)] + /// + /// use std::sync::{Once, ONCE_INIT}; + /// use std::thread; + /// + /// static INIT: Once = ONCE_INIT; + /// + /// // poison the once + /// let handle = thread::spawn(|| { + /// INIT.call_once(|| panic!()); + /// }); + /// assert!(handle.join().is_err()); + /// + /// INIT.call_once_force(|state| { + /// assert!(state.poisoned()); + /// }); + /// ``` + /// + /// An unpoisoned `Once`: + /// + /// ``` + /// #![feature(once_poison)] + /// + /// use std::sync::{Once, ONCE_INIT}; + /// + /// static INIT: Once = ONCE_INIT; /// - /// Once an initalization routine for a `Once` has panicked it will forever - /// indicate to future forced initialization routines that it is poisoned. + /// INIT.call_once_force(|state| { + /// assert!(!state.poisoned()); + /// }); #[unstable(feature = "once_poison", issue = "33577")] pub fn poisoned(&self) -> bool { self.poisoned diff --git a/ctr-std/src/sync/rwlock.rs b/ctr-std/src/sync/rwlock.rs index a3db0ad..2edf02e 100644 --- a/ctr-std/src/sync/rwlock.rs +++ b/ctr-std/src/sync/rwlock.rs @@ -10,7 +10,6 @@ use cell::UnsafeCell; use fmt; -use marker; use mem; use ops::{Deref, DerefMut}; use ptr; @@ -24,19 +23,24 @@ use sys_common::rwlock as sys; /// of the underlying data (exclusive access) and the read portion of this lock /// typically allows for read-only access (shared access). /// +/// In comparison, a [`Mutex`] does not distinguish between readers or writers +/// that aquire the lock, therefore blocking any threads waiting for the lock to +/// become available. An `RwLock` will allow any number of readers to aquire the +/// lock as long as a writer is not holding the lock. +/// /// The priority policy of the lock is dependent on the underlying operating /// system's implementation, and this type does not guarantee that any /// particular policy will be used. /// /// The type parameter `T` represents the data that this lock protects. It is -/// required that `T` satisfies `Send` to be shared across threads and `Sync` to -/// allow concurrent access through readers. The RAII guards returned from the -/// locking methods implement `Deref` (and `DerefMut` for the `write` methods) -/// to allow access to the contained of the lock. +/// required that `T` satisfies [`Send`] to be shared across threads and +/// [`Sync`] to allow concurrent access through readers. The RAII guards +/// returned from the locking methods implement [`Deref`][] (and [`DerefMut`] +/// for the `write` methods) to allow access to the content of the lock. /// /// # Poisoning /// -/// An `RwLock`, like `Mutex`, will become poisoned on a panic. Note, however, +/// An `RwLock`, like [`Mutex`], will become poisoned on a panic. Note, however, /// that an `RwLock` may only be poisoned if a panic occurs while it is locked /// exclusively (write mode). If a panic occurs in any reader, then the lock /// will not be poisoned. @@ -63,6 +67,12 @@ use sys_common::rwlock as sys; /// assert_eq!(*w, 6); /// } // write lock is dropped here /// ``` +/// +/// [`Deref`]: ../../std/ops/trait.Deref.html +/// [`DerefMut`]: ../../std/ops/trait.DerefMut.html +/// [`Send`]: ../../std/marker/trait.Send.html +/// [`Sync`]: ../../std/marker/trait.Sync.html +/// [`Mutex`]: struct.Mutex.html #[stable(feature = "rust1", since = "1.0.0")] pub struct RwLock<T: ?Sized> { inner: Box<sys::RWLock>, @@ -71,18 +81,18 @@ pub struct RwLock<T: ?Sized> { } #[stable(feature = "rust1", since = "1.0.0")] -unsafe impl<T: ?Sized + Send + Sync> Send for RwLock<T> {} +unsafe impl<T: ?Sized + Send> Send for RwLock<T> {} #[stable(feature = "rust1", since = "1.0.0")] unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {} /// RAII structure used to release the shared read access of a lock when /// dropped. /// -/// This structure is created by the [`read()`] and [`try_read()`] methods on +/// This structure is created by the [`read`] and [`try_read`] methods on /// [`RwLock`]. /// -/// [`read()`]: struct.RwLock.html#method.read -/// [`try_read()`]: struct.RwLock.html#method.try_read +/// [`read`]: struct.RwLock.html#method.read +/// [`try_read`]: struct.RwLock.html#method.try_read /// [`RwLock`]: struct.RwLock.html #[must_use] #[stable(feature = "rust1", since = "1.0.0")] @@ -91,16 +101,19 @@ pub struct RwLockReadGuard<'a, T: ?Sized + 'a> { } #[stable(feature = "rust1", since = "1.0.0")] -impl<'a, T: ?Sized> !marker::Send for RwLockReadGuard<'a, T> {} +impl<'a, T: ?Sized> !Send for RwLockReadGuard<'a, T> {} + +#[stable(feature = "rwlock_guard_sync", since = "1.23.0")] +unsafe impl<'a, T: ?Sized + Sync> Sync for RwLockReadGuard<'a, T> {} /// RAII structure used to release the exclusive write access of a lock when /// dropped. /// -/// This structure is created by the [`write()`] and [`try_write()`] methods +/// This structure is created by the [`write`] and [`try_write`] methods /// on [`RwLock`]. /// -/// [`write()`]: struct.RwLock.html#method.write -/// [`try_write()`]: struct.RwLock.html#method.try_write +/// [`write`]: struct.RwLock.html#method.write +/// [`try_write`]: struct.RwLock.html#method.try_write /// [`RwLock`]: struct.RwLock.html #[must_use] #[stable(feature = "rust1", since = "1.0.0")] @@ -110,7 +123,10 @@ pub struct RwLockWriteGuard<'a, T: ?Sized + 'a> { } #[stable(feature = "rust1", since = "1.0.0")] -impl<'a, T: ?Sized> !marker::Send for RwLockWriteGuard<'a, T> {} +impl<'a, T: ?Sized> !Send for RwLockWriteGuard<'a, T> {} + +#[stable(feature = "rwlock_guard_sync", since = "1.23.0")] +unsafe impl<'a, T: ?Sized + Sync> Sync for RwLockWriteGuard<'a, T> {} impl<T> RwLock<T> { /// Creates a new instance of an `RwLock<T>` which is unlocked. @@ -154,6 +170,24 @@ impl<T: ?Sized> RwLock<T> { /// # Panics /// /// This function might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(1)); + /// let c_lock = lock.clone(); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// thread::spawn(move || { + /// let r = c_lock.read(); + /// assert!(r.is_ok()); + /// }).join().unwrap(); + /// ``` #[inline] #[stable(feature = "rust1", since = "1.0.0")] pub fn read(&self) -> LockResult<RwLockReadGuard<T>> { @@ -180,6 +214,19 @@ impl<T: ?Sized> RwLock<T> { /// is poisoned whenever a writer panics while holding an exclusive lock. An /// error will only be returned if the lock would have otherwise been /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// match lock.try_read() { + /// Ok(n) => assert_eq!(*n, 1), + /// Err(_) => unreachable!(), + /// }; + /// ``` #[inline] #[stable(feature = "rust1", since = "1.0.0")] pub fn try_read(&self) -> TryLockResult<RwLockReadGuard<T>> { @@ -210,6 +257,19 @@ impl<T: ?Sized> RwLock<T> { /// # Panics /// /// This function might panic when called if the lock is already held by the current thread. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let mut n = lock.write().unwrap(); + /// *n = 2; + /// + /// assert!(lock.try_read().is_err()); + /// ``` #[inline] #[stable(feature = "rust1", since = "1.0.0")] pub fn write(&self) -> LockResult<RwLockWriteGuard<T>> { @@ -236,6 +296,19 @@ impl<T: ?Sized> RwLock<T> { /// is poisoned whenever a writer panics while holding an exclusive lock. An /// error will only be returned if the lock would have otherwise been /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(1); + /// + /// let n = lock.read().unwrap(); + /// assert_eq!(*n, 1); + /// + /// assert!(lock.try_write().is_err()); + /// ``` #[inline] #[stable(feature = "rust1", since = "1.0.0")] pub fn try_write(&self) -> TryLockResult<RwLockWriteGuard<T>> { @@ -253,6 +326,22 @@ impl<T: ?Sized> RwLock<T> { /// If another thread is active, the lock can still become poisoned at any /// time. You should not trust a `false` value for program correctness /// without additional synchronization. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, RwLock}; + /// use std::thread; + /// + /// let lock = Arc::new(RwLock::new(0)); + /// let c_lock = lock.clone(); + /// + /// let _ = thread::spawn(move || { + /// let _lock = c_lock.write().unwrap(); + /// panic!(); // the lock gets poisoned + /// }).join(); + /// assert_eq!(lock.is_poisoned(), true); + /// ``` #[inline] #[stable(feature = "sync_poison", since = "1.2.0")] pub fn is_poisoned(&self) -> bool { @@ -267,6 +356,19 @@ impl<T: ?Sized> RwLock<T> { /// is poisoned whenever a writer panics while holding an exclusive lock. An /// error will only be returned if the lock would have otherwise been /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let lock = RwLock::new(String::new()); + /// { + /// let mut s = lock.write().unwrap(); + /// *s = "modified".to_owned(); + /// } + /// assert_eq!(lock.into_inner().unwrap(), "modified"); + /// ``` #[stable(feature = "rwlock_into_inner", since = "1.6.0")] pub fn into_inner(self) -> LockResult<T> where T: Sized { // We know statically that there are no outstanding references to @@ -282,7 +384,7 @@ impl<T: ?Sized> RwLock<T> { (ptr::read(inner), ptr::read(poison), ptr::read(data)) }; mem::forget(self); - inner.destroy(); // Keep in sync with the `Drop` impl. + inner.destroy(); // Keep in sync with the `Drop` impl. drop(inner); poison::map_result(poison.borrow(), |_| data.into_inner()) @@ -300,6 +402,16 @@ impl<T: ?Sized> RwLock<T> { /// is poisoned whenever a writer panics while holding an exclusive lock. An /// error will only be returned if the lock would have otherwise been /// acquired. + /// + /// # Examples + /// + /// ``` + /// use std::sync::RwLock; + /// + /// let mut lock = RwLock::new(0); + /// *lock.get_mut().unwrap() = 10; + /// assert_eq!(*lock.read().unwrap(), 10); + /// ``` #[stable(feature = "rwlock_get_mut", since = "1.6.0")] pub fn get_mut(&mut self) -> LockResult<&mut T> { // We know statically that there are no other references to `self`, so @@ -321,16 +433,23 @@ unsafe impl<#[may_dangle] T: ?Sized> Drop for RwLock<T> { impl<T: ?Sized + fmt::Debug> fmt::Debug for RwLock<T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.try_read() { - Ok(guard) => write!(f, "RwLock {{ data: {:?} }}", &*guard), + Ok(guard) => f.debug_struct("RwLock").field("data", &&*guard).finish(), Err(TryLockError::Poisoned(err)) => { - write!(f, "RwLock {{ data: Poisoned({:?}) }}", &**err.get_ref()) + f.debug_struct("RwLock").field("data", &&**err.get_ref()).finish() }, - Err(TryLockError::WouldBlock) => write!(f, "RwLock {{ <locked> }}") + Err(TryLockError::WouldBlock) => { + struct LockedPlaceholder; + impl fmt::Debug for LockedPlaceholder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("<locked>") } + } + + f.debug_struct("RwLock").field("data", &LockedPlaceholder).finish() + } } } } -#[stable(feature = "rw_lock_default", since = "1.9.0")] +#[stable(feature = "rw_lock_default", since = "1.10.0")] impl<T: Default> Default for RwLock<T> { /// Creates a new `RwLock<T>`, with the `Default` value for T. fn default() -> RwLock<T> { @@ -338,6 +457,17 @@ impl<T: Default> Default for RwLock<T> { } } +#[stable(feature = "rw_lock_from", since = "1.24.0")] +impl<T> From<T> for RwLock<T> { + /// Creates a new instance of an `RwLock<T>` which is unlocked. + /// This is equivalent to [`RwLock::new`]. + /// + /// [`RwLock::new`]: #method.new + fn from(t: T) -> Self { + RwLock::new(t) + } +} + impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockReadGuard<'rwlock, T>> { @@ -370,6 +500,13 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockReadGuard<'a, T> { } } +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockReadGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(f) + } +} + #[stable(feature = "std_debug", since = "1.16.0")] impl<'a, T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'a, T> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { @@ -379,6 +516,13 @@ impl<'a, T: fmt::Debug> fmt::Debug for RwLockWriteGuard<'a, T> { } } +#[stable(feature = "std_guard_impls", since = "1.20.0")] +impl<'a, T: ?Sized + fmt::Display> fmt::Display for RwLockWriteGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + (**self).fmt(f) + } +} + #[stable(feature = "rust1", since = "1.0.0")] impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> { type Target = T; @@ -421,8 +565,6 @@ impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> { #[cfg(all(test, not(target_os = "emscripten")))] mod tests { - #![allow(deprecated)] // rand - use rand::{self, Rng}; use sync::mpsc::channel; use thread; @@ -443,7 +585,7 @@ mod tests { #[test] fn frob() { - const N: usize = 10; + const N: u32 = 10; const M: usize = 1000; let r = Arc::new(RwLock::new(())); @@ -472,7 +614,7 @@ mod tests { fn test_rw_arc_poison_wr() { let arc = Arc::new(RwLock::new(1)); let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move|| { + let _: Result<(), _> = thread::spawn(move || { let _lock = arc2.write().unwrap(); panic!(); }).join(); @@ -484,7 +626,7 @@ mod tests { let arc = Arc::new(RwLock::new(1)); assert!(!arc.is_poisoned()); let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move|| { + let _: Result<(), _> = thread::spawn(move || { let _lock = arc2.write().unwrap(); panic!(); }).join(); @@ -496,7 +638,7 @@ mod tests { fn test_rw_arc_no_poison_rr() { let arc = Arc::new(RwLock::new(1)); let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move|| { + let _: Result<(), _> = thread::spawn(move || { let _lock = arc2.read().unwrap(); panic!(); }).join(); @@ -507,7 +649,7 @@ mod tests { fn test_rw_arc_no_poison_rw() { let arc = Arc::new(RwLock::new(1)); let arc2 = arc.clone(); - let _: Result<(), _> = thread::spawn(move|| { + let _: Result<(), _> = thread::spawn(move || { let _lock = arc2.read().unwrap(); panic!() }).join(); @@ -521,7 +663,7 @@ mod tests { let arc2 = arc.clone(); let (tx, rx) = channel(); - thread::spawn(move|| { + thread::spawn(move || { let mut lock = arc2.write().unwrap(); for _ in 0..10 { let tmp = *lock; @@ -536,7 +678,7 @@ mod tests { let mut children = Vec::new(); for _ in 0..5 { let arc3 = arc.clone(); - children.push(thread::spawn(move|| { + children.push(thread::spawn(move || { let lock = arc3.read().unwrap(); assert!(*lock >= 0); })); @@ -557,7 +699,7 @@ mod tests { fn test_rw_arc_access_in_unwind() { let arc = Arc::new(RwLock::new(1)); let arc2 = arc.clone(); - let _ = thread::spawn(move|| -> () { + let _ = thread::spawn(move || -> () { struct Unwinder { i: Arc<RwLock<isize>>, } |