aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync
diff options
context:
space:
mode:
authorFenrir <[email protected]>2017-03-05 22:57:34 -0700
committerFenrir <[email protected]>2017-03-05 22:57:34 -0700
commit5c02db6cb953433d3837faed1451b2f804dc81a9 (patch)
treeb21b480d42a630e9f85d9fb88b6d74f7efe827f1 /ctr-std/src/sync
parentInitial thread support (diff)
downloadctru-rs-5c02db6cb953433d3837faed1451b2f804dc81a9.tar.xz
ctru-rs-5c02db6cb953433d3837faed1451b2f804dc81a9.zip
Add the rest of std::sync
Diffstat (limited to 'ctr-std/src/sync')
-rw-r--r--ctr-std/src/sync/barrier.rs233
-rw-r--r--ctr-std/src/sync/mod.rs8
-rw-r--r--ctr-std/src/sync/mpsc/blocking.rs96
-rw-r--r--ctr-std/src/sync/mpsc/mod.rs2614
-rw-r--r--ctr-std/src/sync/mpsc/mpsc_queue.rs198
-rw-r--r--ctr-std/src/sync/mpsc/oneshot.rs396
-rw-r--r--ctr-std/src/sync/mpsc/select.rs791
-rw-r--r--ctr-std/src/sync/mpsc/shared.rs506
-rw-r--r--ctr-std/src/sync/mpsc/spsc_queue.rs337
-rw-r--r--ctr-std/src/sync/mpsc/stream.rs487
-rw-r--r--ctr-std/src/sync/mpsc/sync.rs528
-rw-r--r--ctr-std/src/sync/mutex.rs4
-rw-r--r--ctr-std/src/sync/once.rs496
13 files changed, 6693 insertions, 1 deletions
diff --git a/ctr-std/src/sync/barrier.rs b/ctr-std/src/sync/barrier.rs
new file mode 100644
index 0000000..f15e7ff
--- /dev/null
+++ b/ctr-std/src/sync/barrier.rs
@@ -0,0 +1,233 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use fmt;
+use sync::{Mutex, Condvar};
+
+/// A barrier enables multiple threads to synchronize the beginning
+/// of some computation.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::{Arc, Barrier};
+/// use std::thread;
+///
+/// let mut handles = Vec::with_capacity(10);
+/// let barrier = Arc::new(Barrier::new(10));
+/// for _ in 0..10 {
+/// let c = barrier.clone();
+/// // The same messages will be printed together.
+/// // You will NOT see any interleaving.
+/// handles.push(thread::spawn(move|| {
+/// println!("before wait");
+/// c.wait();
+/// println!("after wait");
+/// }));
+/// }
+/// // Wait for other threads to finish.
+/// for handle in handles {
+/// handle.join().unwrap();
+/// }
+/// ```
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct Barrier {
+ lock: Mutex<BarrierState>,
+ cvar: Condvar,
+ num_threads: usize,
+}
+
+// The inner state of a double barrier
+struct BarrierState {
+ count: usize,
+ generation_id: usize,
+}
+
+/// A result returned from wait.
+///
+/// Currently this opaque structure only has one method, [`.is_leader()`]. Only
+/// one thread will receive a result that will return `true` from this function.
+///
+/// [`.is_leader()`]: #method.is_leader
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::Barrier;
+///
+/// let barrier = Barrier::new(1);
+/// let barrier_wait_result = barrier.wait();
+/// ```
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct BarrierWaitResult(bool);
+
+#[stable(feature = "std_debug", since = "1.16.0")]
+impl fmt::Debug for Barrier {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("Barrier { .. }")
+ }
+}
+
+impl Barrier {
+ /// Creates a new barrier that can block a given number of threads.
+ ///
+ /// A barrier will block `n`-1 threads which call [`wait`] and then wake up
+ /// all threads at once when the `n`th thread calls [`wait`].
+ ///
+ /// [`wait`]: #method.wait
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Barrier;
+ ///
+ /// let barrier = Barrier::new(10);
+ /// ```
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn new(n: usize) -> Barrier {
+ Barrier {
+ lock: Mutex::new(BarrierState {
+ count: 0,
+ generation_id: 0,
+ }),
+ cvar: Condvar::new(),
+ num_threads: n,
+ }
+ }
+
+ /// Blocks the current thread until all threads have rendezvoused here.
+ ///
+ /// Barriers are re-usable after all threads have rendezvoused once, and can
+ /// be used continuously.
+ ///
+ /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that
+ /// returns `true` from [`is_leader`] when returning from this function, and
+ /// all other threads will receive a result that will return `false` from
+ /// [`is_leader`].
+ ///
+ /// [`BarrierWaitResult`]: struct.BarrierWaitResult.html
+ /// [`is_leader`]: struct.BarrierWaitResult.html#method.is_leader
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::{Arc, Barrier};
+ /// use std::thread;
+ ///
+ /// let mut handles = Vec::with_capacity(10);
+ /// let barrier = Arc::new(Barrier::new(10));
+ /// for _ in 0..10 {
+ /// let c = barrier.clone();
+ /// // The same messages will be printed together.
+ /// // You will NOT see any interleaving.
+ /// handles.push(thread::spawn(move|| {
+ /// println!("before wait");
+ /// c.wait();
+ /// println!("after wait");
+ /// }));
+ /// }
+ /// // Wait for other threads to finish.
+ /// for handle in handles {
+ /// handle.join().unwrap();
+ /// }
+ /// ```
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn wait(&self) -> BarrierWaitResult {
+ let mut lock = self.lock.lock().unwrap();
+ let local_gen = lock.generation_id;
+ lock.count += 1;
+ if lock.count < self.num_threads {
+ // We need a while loop to guard against spurious wakeups.
+ // http://en.wikipedia.org/wiki/Spurious_wakeup
+ while local_gen == lock.generation_id &&
+ lock.count < self.num_threads {
+ lock = self.cvar.wait(lock).unwrap();
+ }
+ BarrierWaitResult(false)
+ } else {
+ lock.count = 0;
+ lock.generation_id += 1;
+ self.cvar.notify_all();
+ BarrierWaitResult(true)
+ }
+ }
+}
+
+#[stable(feature = "std_debug", since = "1.16.0")]
+impl fmt::Debug for BarrierWaitResult {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.debug_struct("BarrierWaitResult")
+ .field("is_leader", &self.is_leader())
+ .finish()
+ }
+}
+
+impl BarrierWaitResult {
+ /// Returns whether this thread from [`wait`] is the "leader thread".
+ ///
+ /// Only one thread will have `true` returned from their result, all other
+ /// threads will have `false` returned.
+ ///
+ /// [`wait`]: struct.Barrier.html#method.wait
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::Barrier;
+ ///
+ /// let barrier = Barrier::new(1);
+ /// let barrier_wait_result = barrier.wait();
+ /// println!("{:?}", barrier_wait_result.is_leader());
+ /// ```
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn is_leader(&self) -> bool { self.0 }
+}
+
+#[cfg(test)]
+mod tests {
+ use sync::{Arc, Barrier};
+ use sync::mpsc::{channel, TryRecvError};
+ use thread;
+
+ #[test]
+ #[cfg_attr(target_os = "emscripten", ignore)]
+ fn test_barrier() {
+ const N: usize = 10;
+
+ let barrier = Arc::new(Barrier::new(N));
+ let (tx, rx) = channel();
+
+ for _ in 0..N - 1 {
+ let c = barrier.clone();
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ tx.send(c.wait().is_leader()).unwrap();
+ });
+ }
+
+ // At this point, all spawned threads should be blocked,
+ // so we shouldn't get anything from the port
+ assert!(match rx.try_recv() {
+ Err(TryRecvError::Empty) => true,
+ _ => false,
+ });
+
+ let mut leader_found = barrier.wait().is_leader();
+
+ // Now, the barrier is cleared and we should get data.
+ for _ in 0..N - 1 {
+ if rx.recv().unwrap() {
+ assert!(!leader_found);
+ leader_found = true;
+ }
+ }
+ assert!(leader_found);
+ }
+}
diff --git a/ctr-std/src/sync/mod.rs b/ctr-std/src/sync/mod.rs
index 245aaab..289b47b 100644
--- a/ctr-std/src/sync/mod.rs
+++ b/ctr-std/src/sync/mod.rs
@@ -23,14 +23,22 @@ pub use alloc::arc::{Arc, Weak};
pub use core::sync::atomic;
#[stable(feature = "rust1", since = "1.0.0")]
+pub use self::barrier::{Barrier, BarrierWaitResult};
+#[stable(feature = "rust1", since = "1.0.0")]
pub use self::condvar::{Condvar, WaitTimeoutResult};
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::mutex::{Mutex, MutexGuard};
#[stable(feature = "rust1", since = "1.0.0")]
+pub use self::once::{Once, OnceState, ONCE_INIT};
+#[stable(feature = "rust1", since = "1.0.0")]
pub use sys_common::poison::{PoisonError, TryLockError, TryLockResult, LockResult};
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
+pub mod mpsc;
+
+mod barrier;
mod condvar;
mod mutex;
+mod once;
mod rwlock;
diff --git a/ctr-std/src/sync/mpsc/blocking.rs b/ctr-std/src/sync/mpsc/blocking.rs
new file mode 100644
index 0000000..0f9ef6f
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/blocking.rs
@@ -0,0 +1,96 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Generic support for building blocking abstractions.
+
+use thread::{self, Thread};
+use sync::atomic::{AtomicBool, Ordering};
+use sync::Arc;
+use mem;
+use time::Instant;
+
+struct Inner {
+ thread: Thread,
+ woken: AtomicBool,
+}
+
+unsafe impl Send for Inner {}
+unsafe impl Sync for Inner {}
+
+#[derive(Clone)]
+pub struct SignalToken {
+ inner: Arc<Inner>,
+}
+
+pub struct WaitToken {
+ inner: Arc<Inner>,
+}
+
+impl !Send for WaitToken {}
+
+impl !Sync for WaitToken {}
+
+pub fn tokens() -> (WaitToken, SignalToken) {
+ let inner = Arc::new(Inner {
+ thread: thread::current(),
+ woken: AtomicBool::new(false),
+ });
+ let wait_token = WaitToken {
+ inner: inner.clone(),
+ };
+ let signal_token = SignalToken {
+ inner: inner
+ };
+ (wait_token, signal_token)
+}
+
+impl SignalToken {
+ pub fn signal(&self) -> bool {
+ let wake = !self.inner.woken.compare_and_swap(false, true, Ordering::SeqCst);
+ if wake {
+ self.inner.thread.unpark();
+ }
+ wake
+ }
+
+ /// Convert to an unsafe usize value. Useful for storing in a pipe's state
+ /// flag.
+ #[inline]
+ pub unsafe fn cast_to_usize(self) -> usize {
+ mem::transmute(self.inner)
+ }
+
+ /// Convert from an unsafe usize value. Useful for retrieving a pipe's state
+ /// flag.
+ #[inline]
+ pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
+ SignalToken { inner: mem::transmute(signal_ptr) }
+ }
+}
+
+impl WaitToken {
+ pub fn wait(self) {
+ while !self.inner.woken.load(Ordering::SeqCst) {
+ thread::park()
+ }
+ }
+
+ /// Returns true if we wake up normally, false otherwise.
+ pub fn wait_max_until(self, end: Instant) -> bool {
+ while !self.inner.woken.load(Ordering::SeqCst) {
+ let now = Instant::now();
+ if now >= end {
+ return false;
+ }
+ thread::park_timeout(end - now)
+ }
+ true
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/mod.rs b/ctr-std/src/sync/mpsc/mod.rs
new file mode 100644
index 0000000..aeeab17
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/mod.rs
@@ -0,0 +1,2614 @@
+// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Multi-producer, single-consumer FIFO queue communication primitives.
+//!
+//! This module provides message-based communication over channels, concretely
+//! defined among three types:
+//!
+//! * `Sender`
+//! * `SyncSender`
+//! * `Receiver`
+//!
+//! A `Sender` or `SyncSender` is used to send data to a `Receiver`. Both
+//! senders are clone-able (multi-producer) such that many threads can send
+//! simultaneously to one receiver (single-consumer).
+//!
+//! These channels come in two flavors:
+//!
+//! 1. An asynchronous, infinitely buffered channel. The `channel()` function
+//! will return a `(Sender, Receiver)` tuple where all sends will be
+//! **asynchronous** (they never block). The channel conceptually has an
+//! infinite buffer.
+//!
+//! 2. A synchronous, bounded channel. The `sync_channel()` function will return
+//! a `(SyncSender, Receiver)` tuple where the storage for pending messages
+//! is a pre-allocated buffer of a fixed size. All sends will be
+//! **synchronous** by blocking until there is buffer space available. Note
+//! that a bound of 0 is allowed, causing the channel to become a
+//! "rendezvous" channel where each sender atomically hands off a message to
+//! a receiver.
+//!
+//! ## Disconnection
+//!
+//! The send and receive operations on channels will all return a `Result`
+//! indicating whether the operation succeeded or not. An unsuccessful operation
+//! is normally indicative of the other half of a channel having "hung up" by
+//! being dropped in its corresponding thread.
+//!
+//! Once half of a channel has been deallocated, most operations can no longer
+//! continue to make progress, so `Err` will be returned. Many applications will
+//! continue to `unwrap()` the results returned from this module, instigating a
+//! propagation of failure among threads if one unexpectedly dies.
+//!
+//! # Examples
+//!
+//! Simple usage:
+//!
+//! ```
+//! use std::thread;
+//! use std::sync::mpsc::channel;
+//!
+//! // Create a simple streaming channel
+//! let (tx, rx) = channel();
+//! thread::spawn(move|| {
+//! tx.send(10).unwrap();
+//! });
+//! assert_eq!(rx.recv().unwrap(), 10);
+//! ```
+//!
+//! Shared usage:
+//!
+//! ```
+//! use std::thread;
+//! use std::sync::mpsc::channel;
+//!
+//! // Create a shared channel that can be sent along from many threads
+//! // where tx is the sending half (tx for transmission), and rx is the receiving
+//! // half (rx for receiving).
+//! let (tx, rx) = channel();
+//! for i in 0..10 {
+//! let tx = tx.clone();
+//! thread::spawn(move|| {
+//! tx.send(i).unwrap();
+//! });
+//! }
+//!
+//! for _ in 0..10 {
+//! let j = rx.recv().unwrap();
+//! assert!(0 <= j && j < 10);
+//! }
+//! ```
+//!
+//! Propagating panics:
+//!
+//! ```
+//! use std::sync::mpsc::channel;
+//!
+//! // The call to recv() will return an error because the channel has already
+//! // hung up (or been deallocated)
+//! let (tx, rx) = channel::<i32>();
+//! drop(tx);
+//! assert!(rx.recv().is_err());
+//! ```
+//!
+//! Synchronous channels:
+//!
+//! ```
+//! use std::thread;
+//! use std::sync::mpsc::sync_channel;
+//!
+//! let (tx, rx) = sync_channel::<i32>(0);
+//! thread::spawn(move|| {
+//! // This will wait for the parent thread to start receiving
+//! tx.send(53).unwrap();
+//! });
+//! rx.recv().unwrap();
+//! ```
+
+#![stable(feature = "rust1", since = "1.0.0")]
+
+// A description of how Rust's channel implementation works
+//
+// Channels are supposed to be the basic building block for all other
+// concurrent primitives that are used in Rust. As a result, the channel type
+// needs to be highly optimized, flexible, and broad enough for use everywhere.
+//
+// The choice of implementation of all channels is to be built on lock-free data
+// structures. The channels themselves are then consequently also lock-free data
+// structures. As always with lock-free code, this is a very "here be dragons"
+// territory, especially because I'm unaware of any academic papers that have
+// gone into great length about channels of these flavors.
+//
+// ## Flavors of channels
+//
+// From the perspective of a consumer of this library, there is only one flavor
+// of channel. This channel can be used as a stream and cloned to allow multiple
+// senders. Under the hood, however, there are actually three flavors of
+// channels in play.
+//
+// * Flavor::Oneshots - these channels are highly optimized for the one-send use
+// case. They contain as few atomics as possible and
+// involve one and exactly one allocation.
+// * Streams - these channels are optimized for the non-shared use case. They
+// use a different concurrent queue that is more tailored for this
+// use case. The initial allocation of this flavor of channel is not
+// optimized.
+// * Shared - this is the most general form of channel that this module offers,
+// a channel with multiple senders. This type is as optimized as it
+// can be, but the previous two types mentioned are much faster for
+// their use-cases.
+//
+// ## Concurrent queues
+//
+// The basic idea of Rust's Sender/Receiver types is that send() never blocks,
+// but recv() obviously blocks. This means that under the hood there must be
+// some shared and concurrent queue holding all of the actual data.
+//
+// With two flavors of channels, two flavors of queues are also used. We have
+// chosen to use queues from a well-known author that are abbreviated as SPSC
+// and MPSC (single producer, single consumer and multiple producer, single
+// consumer). SPSC queues are used for streams while MPSC queues are used for
+// shared channels.
+//
+// ### SPSC optimizations
+//
+// The SPSC queue found online is essentially a linked list of nodes where one
+// half of the nodes are the "queue of data" and the other half of nodes are a
+// cache of unused nodes. The unused nodes are used such that an allocation is
+// not required on every push() and a free doesn't need to happen on every
+// pop().
+//
+// As found online, however, the cache of nodes is of an infinite size. This
+// means that if a channel at one point in its life had 50k items in the queue,
+// then the queue will always have the capacity for 50k items. I believed that
+// this was an unnecessary limitation of the implementation, so I have altered
+// the queue to optionally have a bound on the cache size.
+//
+// By default, streams will have an unbounded SPSC queue with a small-ish cache
+// size. The hope is that the cache is still large enough to have very fast
+// send() operations while not too large such that millions of channels can
+// coexist at once.
+//
+// ### MPSC optimizations
+//
+// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
+// a linked list under the hood to earn its unboundedness, but I have not put
+// forth much effort into having a cache of nodes similar to the SPSC queue.
+//
+// For now, I believe that this is "ok" because shared channels are not the most
+// common type, but soon we may wish to revisit this queue choice and determine
+// another candidate for backend storage of shared channels.
+//
+// ## Overview of the Implementation
+//
+// Now that there's a little background on the concurrent queues used, it's
+// worth going into much more detail about the channels themselves. The basic
+// pseudocode for a send/recv are:
+//
+//
+// send(t) recv()
+// queue.push(t) return if queue.pop()
+// if increment() == -1 deschedule {
+// wakeup() if decrement() > 0
+// cancel_deschedule()
+// }
+// queue.pop()
+//
+// As mentioned before, there are no locks in this implementation, only atomic
+// instructions are used.
+//
+// ### The internal atomic counter
+//
+// Every channel has a shared counter with each half to keep track of the size
+// of the queue. This counter is used to abort descheduling by the receiver and
+// to know when to wake up on the sending side.
+//
+// As seen in the pseudocode, senders will increment this count and receivers
+// will decrement the count. The theory behind this is that if a sender sees a
+// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
+// then it doesn't need to block.
+//
+// The recv() method has a beginning call to pop(), and if successful, it needs
+// to decrement the count. It is a crucial implementation detail that this
+// decrement does *not* happen to the shared counter. If this were the case,
+// then it would be possible for the counter to be very negative when there were
+// no receivers waiting, in which case the senders would have to determine when
+// it was actually appropriate to wake up a receiver.
+//
+// Instead, the "steal count" is kept track of separately (not atomically
+// because it's only used by receivers), and then the decrement() call when
+// descheduling will lump in all of the recent steals into one large decrement.
+//
+// The implication of this is that if a sender sees a -1 count, then there's
+// guaranteed to be a waiter waiting!
+//
+// ## Native Implementation
+//
+// A major goal of these channels is to work seamlessly on and off the runtime.
+// All of the previous race conditions have been worded in terms of
+// scheduler-isms (which is obviously not available without the runtime).
+//
+// For now, native usage of channels (off the runtime) will fall back onto
+// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
+// is still entirely lock-free, the "deschedule" blocks above are surrounded by
+// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
+// condition variable.
+//
+// ## Select
+//
+// Being able to support selection over channels has greatly influenced this
+// design, and not only does selection need to work inside the runtime, but also
+// outside the runtime.
+//
+// The implementation is fairly straightforward. The goal of select() is not to
+// return some data, but only to return which channel can receive data without
+// blocking. The implementation is essentially the entire blocking procedure
+// followed by an increment as soon as its woken up. The cancellation procedure
+// involves an increment and swapping out of to_wake to acquire ownership of the
+// thread to unblock.
+//
+// Sadly this current implementation requires multiple allocations, so I have
+// seen the throughput of select() be much worse than it should be. I do not
+// believe that there is anything fundamental that needs to change about these
+// channels, however, in order to support a more efficient select().
+//
+// # Conclusion
+//
+// And now that you've seen all the races that I found and attempted to fix,
+// here's the code for you to find some more!
+
+use sync::Arc;
+use error;
+use fmt;
+use mem;
+use cell::UnsafeCell;
+use time::{Duration, Instant};
+
+#[unstable(feature = "mpsc_select", issue = "27800")]
+pub use self::select::{Select, Handle};
+use self::select::StartResult;
+use self::select::StartResult::*;
+use self::blocking::SignalToken;
+
+mod blocking;
+mod oneshot;
+mod select;
+mod shared;
+mod stream;
+mod sync;
+mod mpsc_queue;
+mod spsc_queue;
+
+/// The receiving-half of Rust's channel type. This half can only be owned by
+/// one thread
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct Receiver<T> {
+ inner: UnsafeCell<Flavor<T>>,
+}
+
+// The receiver port can be sent from place to place, so long as it
+// is not used to receive non-sendable things.
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl<T: Send> Send for Receiver<T> { }
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> !Sync for Receiver<T> { }
+
+/// An iterator over messages on a receiver, this iterator will block
+/// whenever `next` is called, waiting for a new message, and `None` will be
+/// returned when the corresponding channel has hung up.
+#[stable(feature = "rust1", since = "1.0.0")]
+#[derive(Debug)]
+pub struct Iter<'a, T: 'a> {
+ rx: &'a Receiver<T>
+}
+
+/// An iterator that attempts to yield all pending values for a receiver.
+/// `None` will be returned when there are no pending values remaining or
+/// if the corresponding channel has hung up.
+///
+/// This Iterator will never block the caller in order to wait for data to
+/// become available. Instead, it will return `None`.
+#[stable(feature = "receiver_try_iter", since = "1.15.0")]
+#[derive(Debug)]
+pub struct TryIter<'a, T: 'a> {
+ rx: &'a Receiver<T>
+}
+
+/// An owning iterator over messages on a receiver, this iterator will block
+/// whenever `next` is called, waiting for a new message, and `None` will be
+/// returned when the corresponding channel has hung up.
+#[stable(feature = "receiver_into_iter", since = "1.1.0")]
+#[derive(Debug)]
+pub struct IntoIter<T> {
+ rx: Receiver<T>
+}
+
+/// The sending-half of Rust's asynchronous channel type. This half can only be
+/// owned by one thread, but it can be cloned to send to other threads.
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct Sender<T> {
+ inner: UnsafeCell<Flavor<T>>,
+}
+
+// The send port can be sent from place to place, so long as it
+// is not used to send non-sendable things.
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl<T: Send> Send for Sender<T> { }
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> !Sync for Sender<T> { }
+
+/// The sending-half of Rust's synchronous channel type. This half can only be
+/// owned by one thread, but it can be cloned to send to other threads.
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct SyncSender<T> {
+ inner: Arc<sync::Packet<T>>,
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl<T: Send> Send for SyncSender<T> {}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> !Sync for SyncSender<T> {}
+
+/// An error returned from the `send` function on channels.
+///
+/// A `send` operation can only fail if the receiving end of a channel is
+/// disconnected, implying that the data could never be received. The error
+/// contains the data being sent as a payload so it can be recovered.
+#[stable(feature = "rust1", since = "1.0.0")]
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub struct SendError<T>(#[stable(feature = "rust1", since = "1.0.0")] pub T);
+
+/// An error returned from the `recv` function on a `Receiver`.
+///
+/// The `recv` operation can only fail if the sending half of a channel is
+/// disconnected, implying that no further messages will ever be received.
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct RecvError;
+
+/// This enumeration is the list of the possible reasons that `try_recv` could
+/// not return data when called.
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[stable(feature = "rust1", since = "1.0.0")]
+pub enum TryRecvError {
+ /// This channel is currently empty, but the sender(s) have not yet
+ /// disconnected, so data may yet become available.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ Empty,
+
+ /// This channel's sending half has become disconnected, and there will
+ /// never be any more data received on this channel
+ #[stable(feature = "rust1", since = "1.0.0")]
+ Disconnected,
+}
+
+/// This enumeration is the list of possible errors that `recv_timeout` could
+/// not return data when called.
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+#[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
+pub enum RecvTimeoutError {
+ /// This channel is currently empty, but the sender(s) have not yet
+ /// disconnected, so data may yet become available.
+ #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
+ Timeout,
+ /// This channel's sending half has become disconnected, and there will
+ /// never be any more data received on this channel
+ #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
+ Disconnected,
+}
+
+/// This enumeration is the list of the possible error outcomes for the
+/// `SyncSender::try_send` method.
+#[stable(feature = "rust1", since = "1.0.0")]
+#[derive(PartialEq, Eq, Clone, Copy)]
+pub enum TrySendError<T> {
+ /// The data could not be sent on the channel because it would require that
+ /// the callee block to send the data.
+ ///
+ /// If this is a buffered channel, then the buffer is full at this time. If
+ /// this is not a buffered channel, then there is no receiver available to
+ /// acquire the data.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ Full(#[stable(feature = "rust1", since = "1.0.0")] T),
+
+ /// This channel's receiving half has disconnected, so the data could not be
+ /// sent. The data is returned back to the callee in this case.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ Disconnected(#[stable(feature = "rust1", since = "1.0.0")] T),
+}
+
+enum Flavor<T> {
+ Oneshot(Arc<oneshot::Packet<T>>),
+ Stream(Arc<stream::Packet<T>>),
+ Shared(Arc<shared::Packet<T>>),
+ Sync(Arc<sync::Packet<T>>),
+}
+
+#[doc(hidden)]
+trait UnsafeFlavor<T> {
+ fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>>;
+ unsafe fn inner_mut(&self) -> &mut Flavor<T> {
+ &mut *self.inner_unsafe().get()
+ }
+ unsafe fn inner(&self) -> &Flavor<T> {
+ &*self.inner_unsafe().get()
+ }
+}
+impl<T> UnsafeFlavor<T> for Sender<T> {
+ fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
+ &self.inner
+ }
+}
+impl<T> UnsafeFlavor<T> for Receiver<T> {
+ fn inner_unsafe(&self) -> &UnsafeCell<Flavor<T>> {
+ &self.inner
+ }
+}
+
+/// Creates a new asynchronous channel, returning the sender/receiver halves.
+/// All data sent on the sender will become available on the receiver, and no
+/// send will block the calling thread (this channel has an "infinite buffer").
+///
+/// If the [`Receiver`] is disconnected while trying to [`send()`] with the
+/// [`Sender`], the [`send()`] method will return an error.
+///
+/// [`send()`]: ../../../std/sync/mpsc/struct.Sender.html#method.send
+/// [`Sender`]: ../../../std/sync/mpsc/struct.Sender.html
+/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::mpsc::channel;
+/// use std::thread;
+///
+/// // tx is the sending half (tx for transmission), and rx is the receiving
+/// // half (rx for receiving).
+/// let (tx, rx) = channel();
+///
+/// // Spawn off an expensive computation
+/// thread::spawn(move|| {
+/// # fn expensive_computation() {}
+/// tx.send(expensive_computation()).unwrap();
+/// });
+///
+/// // Do some useful work for awhile
+///
+/// // Let's see what that answer was
+/// println!("{:?}", rx.recv().unwrap());
+/// ```
+#[stable(feature = "rust1", since = "1.0.0")]
+pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
+ let a = Arc::new(oneshot::Packet::new());
+ (Sender::new(Flavor::Oneshot(a.clone())), Receiver::new(Flavor::Oneshot(a)))
+}
+
+/// Creates a new synchronous, bounded channel.
+///
+/// Like asynchronous channels, the [`Receiver`] will block until a message
+/// becomes available. These channels differ greatly in the semantics of the
+/// sender from asynchronous channels, however.
+///
+/// This channel has an internal buffer on which messages will be queued.
+/// `bound` specifies the buffer size. When the internal buffer becomes full,
+/// future sends will *block* waiting for the buffer to open up. Note that a
+/// buffer size of 0 is valid, in which case this becomes "rendezvous channel"
+/// where each [`send()`] will not return until a recv is paired with it.
+///
+/// Like asynchronous channels, if the [`Receiver`] is disconnected while
+/// trying to [`send()`] with the [`SyncSender`], the [`send()`] method will
+/// return an error.
+///
+/// [`send()`]: ../../../std/sync/mpsc/struct.SyncSender.html#method.send
+/// [`SyncSender`]: ../../../std/sync/mpsc/struct.SyncSender.html
+/// [`Receiver`]: ../../../std/sync/mpsc/struct.Receiver.html
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::mpsc::sync_channel;
+/// use std::thread;
+///
+/// let (tx, rx) = sync_channel(1);
+///
+/// // this returns immediately
+/// tx.send(1).unwrap();
+///
+/// thread::spawn(move|| {
+/// // this will block until the previous message has been received
+/// tx.send(2).unwrap();
+/// });
+///
+/// assert_eq!(rx.recv().unwrap(), 1);
+/// assert_eq!(rx.recv().unwrap(), 2);
+/// ```
+#[stable(feature = "rust1", since = "1.0.0")]
+pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
+ let a = Arc::new(sync::Packet::new(bound));
+ (SyncSender::new(a.clone()), Receiver::new(Flavor::Sync(a)))
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sender
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Sender<T> {
+ fn new(inner: Flavor<T>) -> Sender<T> {
+ Sender {
+ inner: UnsafeCell::new(inner),
+ }
+ }
+
+ /// Attempts to send a value on this channel, returning it back if it could
+ /// not be sent.
+ ///
+ /// A successful send occurs when it is determined that the other end of
+ /// the channel has not hung up already. An unsuccessful send would be one
+ /// where the corresponding receiver has already been deallocated. Note
+ /// that a return value of `Err` means that the data will never be
+ /// received, but a return value of `Ok` does *not* mean that the data
+ /// will be received. It is possible for the corresponding receiver to
+ /// hang up immediately after this function returns `Ok`.
+ ///
+ /// This method will never block the current thread.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::mpsc::channel;
+ ///
+ /// let (tx, rx) = channel();
+ ///
+ /// // This send is always successful
+ /// tx.send(1).unwrap();
+ ///
+ /// // This send will fail because the receiver is gone
+ /// drop(rx);
+ /// assert_eq!(tx.send(1).unwrap_err().0, 1);
+ /// ```
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn send(&self, t: T) -> Result<(), SendError<T>> {
+ let (new_inner, ret) = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ if !p.sent() {
+ return p.send(t).map_err(SendError);
+ } else {
+ let a = Arc::new(stream::Packet::new());
+ let rx = Receiver::new(Flavor::Stream(a.clone()));
+ match p.upgrade(rx) {
+ oneshot::UpSuccess => {
+ let ret = a.send(t);
+ (a, ret)
+ }
+ oneshot::UpDisconnected => (a, Err(t)),
+ oneshot::UpWoke(token) => {
+ // This send cannot panic because the thread is
+ // asleep (we're looking at it), so the receiver
+ // can't go away.
+ a.send(t).ok().unwrap();
+ token.signal();
+ (a, Ok(()))
+ }
+ }
+ }
+ }
+ Flavor::Stream(ref p) => return p.send(t).map_err(SendError),
+ Flavor::Shared(ref p) => return p.send(t).map_err(SendError),
+ Flavor::Sync(..) => unreachable!(),
+ };
+
+ unsafe {
+ let tmp = Sender::new(Flavor::Stream(new_inner));
+ mem::swap(self.inner_mut(), tmp.inner_mut());
+ }
+ ret.map_err(SendError)
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> Clone for Sender<T> {
+ fn clone(&self) -> Sender<T> {
+ let packet = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ let a = Arc::new(shared::Packet::new());
+ {
+ let guard = a.postinit_lock();
+ let rx = Receiver::new(Flavor::Shared(a.clone()));
+ let sleeper = match p.upgrade(rx) {
+ oneshot::UpSuccess |
+ oneshot::UpDisconnected => None,
+ oneshot::UpWoke(task) => Some(task),
+ };
+ a.inherit_blocker(sleeper, guard);
+ }
+ a
+ }
+ Flavor::Stream(ref p) => {
+ let a = Arc::new(shared::Packet::new());
+ {
+ let guard = a.postinit_lock();
+ let rx = Receiver::new(Flavor::Shared(a.clone()));
+ let sleeper = match p.upgrade(rx) {
+ stream::UpSuccess |
+ stream::UpDisconnected => None,
+ stream::UpWoke(task) => Some(task),
+ };
+ a.inherit_blocker(sleeper, guard);
+ }
+ a
+ }
+ Flavor::Shared(ref p) => {
+ p.clone_chan();
+ return Sender::new(Flavor::Shared(p.clone()));
+ }
+ Flavor::Sync(..) => unreachable!(),
+ };
+
+ unsafe {
+ let tmp = Sender::new(Flavor::Shared(packet.clone()));
+ mem::swap(self.inner_mut(), tmp.inner_mut());
+ }
+ Sender::new(Flavor::Shared(packet))
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> Drop for Sender<T> {
+ fn drop(&mut self) {
+ match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => p.drop_chan(),
+ Flavor::Stream(ref p) => p.drop_chan(),
+ Flavor::Shared(ref p) => p.drop_chan(),
+ Flavor::Sync(..) => unreachable!(),
+ }
+ }
+}
+
+#[stable(feature = "mpsc_debug", since = "1.7.0")]
+impl<T> fmt::Debug for Sender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Sender {{ .. }}")
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// SyncSender
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> SyncSender<T> {
+ fn new(inner: Arc<sync::Packet<T>>) -> SyncSender<T> {
+ SyncSender { inner: inner }
+ }
+
+ /// Sends a value on this synchronous channel.
+ ///
+ /// This function will *block* until space in the internal buffer becomes
+ /// available or a receiver is available to hand off the message to.
+ ///
+ /// Note that a successful send does *not* guarantee that the receiver will
+ /// ever see the data if there is a buffer on this channel. Items may be
+ /// enqueued in the internal buffer for the receiver to receive at a later
+ /// time. If the buffer size is 0, however, it can be guaranteed that the
+ /// receiver has indeed received the data if this function returns success.
+ ///
+ /// This function will never panic, but it may return `Err` if the
+ /// `Receiver` has disconnected and is no longer able to receive
+ /// information.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn send(&self, t: T) -> Result<(), SendError<T>> {
+ self.inner.send(t).map_err(SendError)
+ }
+
+ /// Attempts to send a value on this channel without blocking.
+ ///
+ /// This method differs from `send` by returning immediately if the
+ /// channel's buffer is full or no receiver is waiting to acquire some
+ /// data. Compared with `send`, this function has two failure cases
+ /// instead of one (one for disconnection, one for a full buffer).
+ ///
+ /// See `SyncSender::send` for notes about guarantees of whether the
+ /// receiver has received the data or not if this function is successful.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
+ self.inner.try_send(t)
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> Clone for SyncSender<T> {
+ fn clone(&self) -> SyncSender<T> {
+ self.inner.clone_chan();
+ SyncSender::new(self.inner.clone())
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> Drop for SyncSender<T> {
+ fn drop(&mut self) {
+ self.inner.drop_chan();
+ }
+}
+
+#[stable(feature = "mpsc_debug", since = "1.7.0")]
+impl<T> fmt::Debug for SyncSender<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "SyncSender {{ .. }}")
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Receiver
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Receiver<T> {
+ fn new(inner: Flavor<T>) -> Receiver<T> {
+ Receiver { inner: UnsafeCell::new(inner) }
+ }
+
+ /// Attempts to return a pending value on this receiver without blocking
+ ///
+ /// This method will never block the caller in order to wait for data to
+ /// become available. Instead, this will always return immediately with a
+ /// possible option of pending data on the channel.
+ ///
+ /// This is useful for a flavor of "optimistic check" before deciding to
+ /// block on a receiver.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn try_recv(&self) -> Result<T, TryRecvError> {
+ loop {
+ let new_port = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(oneshot::Empty) => return Err(TryRecvError::Empty),
+ Err(oneshot::Disconnected) => {
+ return Err(TryRecvError::Disconnected)
+ }
+ Err(oneshot::Upgraded(rx)) => rx,
+ }
+ }
+ Flavor::Stream(ref p) => {
+ match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(stream::Empty) => return Err(TryRecvError::Empty),
+ Err(stream::Disconnected) => {
+ return Err(TryRecvError::Disconnected)
+ }
+ Err(stream::Upgraded(rx)) => rx,
+ }
+ }
+ Flavor::Shared(ref p) => {
+ match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(shared::Empty) => return Err(TryRecvError::Empty),
+ Err(shared::Disconnected) => {
+ return Err(TryRecvError::Disconnected)
+ }
+ }
+ }
+ Flavor::Sync(ref p) => {
+ match p.try_recv() {
+ Ok(t) => return Ok(t),
+ Err(sync::Empty) => return Err(TryRecvError::Empty),
+ Err(sync::Disconnected) => {
+ return Err(TryRecvError::Disconnected)
+ }
+ }
+ }
+ };
+ unsafe {
+ mem::swap(self.inner_mut(),
+ new_port.inner_mut());
+ }
+ }
+ }
+
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up.
+ ///
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent. Once a message is
+ /// sent to the corresponding `Sender`, then this receiver will wake up and
+ /// return that message.
+ ///
+ /// If the corresponding `Sender` has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return `Err` to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::mpsc;
+ /// use std::thread;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// });
+ ///
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(Ok(1), recv.recv());
+ /// ```
+ ///
+ /// Buffering behavior:
+ ///
+ /// ```
+ /// use std::sync::mpsc;
+ /// use std::thread;
+ /// use std::sync::mpsc::RecvError;
+ ///
+ /// let (send, recv) = mpsc::channel();
+ /// let handle = thread::spawn(move || {
+ /// send.send(1u8).unwrap();
+ /// send.send(2).unwrap();
+ /// send.send(3).unwrap();
+ /// drop(send);
+ /// });
+ ///
+ /// // wait for the thread to join so we ensure the sender is dropped
+ /// handle.join().unwrap();
+ ///
+ /// assert_eq!(Ok(1), recv.recv());
+ /// assert_eq!(Ok(2), recv.recv());
+ /// assert_eq!(Ok(3), recv.recv());
+ /// assert_eq!(Err(RecvError), recv.recv());
+ /// ```
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn recv(&self) -> Result<T, RecvError> {
+ loop {
+ let new_port = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ match p.recv(None) {
+ Ok(t) => return Ok(t),
+ Err(oneshot::Disconnected) => return Err(RecvError),
+ Err(oneshot::Upgraded(rx)) => rx,
+ Err(oneshot::Empty) => unreachable!(),
+ }
+ }
+ Flavor::Stream(ref p) => {
+ match p.recv(None) {
+ Ok(t) => return Ok(t),
+ Err(stream::Disconnected) => return Err(RecvError),
+ Err(stream::Upgraded(rx)) => rx,
+ Err(stream::Empty) => unreachable!(),
+ }
+ }
+ Flavor::Shared(ref p) => {
+ match p.recv(None) {
+ Ok(t) => return Ok(t),
+ Err(shared::Disconnected) => return Err(RecvError),
+ Err(shared::Empty) => unreachable!(),
+ }
+ }
+ Flavor::Sync(ref p) => return p.recv(None).map_err(|_| RecvError),
+ };
+ unsafe {
+ mem::swap(self.inner_mut(), new_port.inner_mut());
+ }
+ }
+ }
+
+ /// Attempts to wait for a value on this receiver, returning an error if the
+ /// corresponding channel has hung up, or if it waits more than `timeout`.
+ ///
+ /// This function will always block the current thread if there is no data
+ /// available and it's possible for more data to be sent. Once a message is
+ /// sent to the corresponding `Sender`, then this receiver will wake up and
+ /// return that message.
+ ///
+ /// If the corresponding `Sender` has disconnected, or it disconnects while
+ /// this call is blocking, this call will wake up and return `Err` to
+ /// indicate that no more messages can ever be received on this channel.
+ /// However, since channels are buffered, messages sent before the disconnect
+ /// will still be properly received.
+ ///
+ /// # Examples
+ ///
+ /// ```no_run
+ /// use std::sync::mpsc::{self, RecvTimeoutError};
+ /// use std::time::Duration;
+ ///
+ /// let (send, recv) = mpsc::channel::<()>();
+ ///
+ /// let timeout = Duration::from_millis(100);
+ /// assert_eq!(Err(RecvTimeoutError::Timeout), recv.recv_timeout(timeout));
+ /// ```
+ #[stable(feature = "mpsc_recv_timeout", since = "1.12.0")]
+ pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
+ // Do an optimistic try_recv to avoid the performance impact of
+ // Instant::now() in the full-channel case.
+ match self.try_recv() {
+ Ok(result)
+ => Ok(result),
+ Err(TryRecvError::Disconnected)
+ => Err(RecvTimeoutError::Disconnected),
+ Err(TryRecvError::Empty)
+ => self.recv_max_until(Instant::now() + timeout)
+ }
+ }
+
+ fn recv_max_until(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
+ use self::RecvTimeoutError::*;
+
+ loop {
+ let port_or_empty = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(oneshot::Disconnected) => return Err(Disconnected),
+ Err(oneshot::Upgraded(rx)) => Some(rx),
+ Err(oneshot::Empty) => None,
+ }
+ }
+ Flavor::Stream(ref p) => {
+ match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(stream::Disconnected) => return Err(Disconnected),
+ Err(stream::Upgraded(rx)) => Some(rx),
+ Err(stream::Empty) => None,
+ }
+ }
+ Flavor::Shared(ref p) => {
+ match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(shared::Disconnected) => return Err(Disconnected),
+ Err(shared::Empty) => None,
+ }
+ }
+ Flavor::Sync(ref p) => {
+ match p.recv(Some(deadline)) {
+ Ok(t) => return Ok(t),
+ Err(sync::Disconnected) => return Err(Disconnected),
+ Err(sync::Empty) => None,
+ }
+ }
+ };
+
+ if let Some(new_port) = port_or_empty {
+ unsafe {
+ mem::swap(self.inner_mut(), new_port.inner_mut());
+ }
+ }
+
+ // If we're already passed the deadline, and we're here without
+ // data, return a timeout, else try again.
+ if Instant::now() >= deadline {
+ return Err(Timeout);
+ }
+ }
+ }
+
+ /// Returns an iterator that will block waiting for messages, but never
+ /// `panic!`. It will return `None` when the channel has hung up.
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn iter(&self) -> Iter<T> {
+ Iter { rx: self }
+ }
+
+ /// Returns an iterator that will attempt to yield all pending values.
+ /// It will return `None` if there are no more pending values or if the
+ /// channel has hung up. The iterator will never `panic!` or block the
+ /// user by waiting for values.
+ #[stable(feature = "receiver_try_iter", since = "1.15.0")]
+ pub fn try_iter(&self) -> TryIter<T> {
+ TryIter { rx: self }
+ }
+
+}
+
+impl<T> select::Packet for Receiver<T> {
+ fn can_recv(&self) -> bool {
+ loop {
+ let new_port = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ match p.can_recv() {
+ Ok(ret) => return ret,
+ Err(upgrade) => upgrade,
+ }
+ }
+ Flavor::Stream(ref p) => {
+ match p.can_recv() {
+ Ok(ret) => return ret,
+ Err(upgrade) => upgrade,
+ }
+ }
+ Flavor::Shared(ref p) => return p.can_recv(),
+ Flavor::Sync(ref p) => return p.can_recv(),
+ };
+ unsafe {
+ mem::swap(self.inner_mut(),
+ new_port.inner_mut());
+ }
+ }
+ }
+
+ fn start_selection(&self, mut token: SignalToken) -> StartResult {
+ loop {
+ let (t, new_port) = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => {
+ match p.start_selection(token) {
+ oneshot::SelSuccess => return Installed,
+ oneshot::SelCanceled => return Abort,
+ oneshot::SelUpgraded(t, rx) => (t, rx),
+ }
+ }
+ Flavor::Stream(ref p) => {
+ match p.start_selection(token) {
+ stream::SelSuccess => return Installed,
+ stream::SelCanceled => return Abort,
+ stream::SelUpgraded(t, rx) => (t, rx),
+ }
+ }
+ Flavor::Shared(ref p) => return p.start_selection(token),
+ Flavor::Sync(ref p) => return p.start_selection(token),
+ };
+ token = t;
+ unsafe {
+ mem::swap(self.inner_mut(), new_port.inner_mut());
+ }
+ }
+ }
+
+ fn abort_selection(&self) -> bool {
+ let mut was_upgrade = false;
+ loop {
+ let result = match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => p.abort_selection(),
+ Flavor::Stream(ref p) => p.abort_selection(was_upgrade),
+ Flavor::Shared(ref p) => return p.abort_selection(was_upgrade),
+ Flavor::Sync(ref p) => return p.abort_selection(),
+ };
+ let new_port = match result { Ok(b) => return b, Err(p) => p };
+ was_upgrade = true;
+ unsafe {
+ mem::swap(self.inner_mut(),
+ new_port.inner_mut());
+ }
+ }
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<'a, T> Iterator for Iter<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
+}
+
+#[stable(feature = "receiver_try_iter", since = "1.15.0")]
+impl<'a, T> Iterator for TryIter<'a, T> {
+ type Item = T;
+
+ fn next(&mut self) -> Option<T> { self.rx.try_recv().ok() }
+}
+
+#[stable(feature = "receiver_into_iter", since = "1.1.0")]
+impl<'a, T> IntoIterator for &'a Receiver<T> {
+ type Item = T;
+ type IntoIter = Iter<'a, T>;
+
+ fn into_iter(self) -> Iter<'a, T> { self.iter() }
+}
+
+#[stable(feature = "receiver_into_iter", since = "1.1.0")]
+impl<T> Iterator for IntoIter<T> {
+ type Item = T;
+ fn next(&mut self) -> Option<T> { self.rx.recv().ok() }
+}
+
+#[stable(feature = "receiver_into_iter", since = "1.1.0")]
+impl <T> IntoIterator for Receiver<T> {
+ type Item = T;
+ type IntoIter = IntoIter<T>;
+
+ fn into_iter(self) -> IntoIter<T> {
+ IntoIter { rx: self }
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> Drop for Receiver<T> {
+ fn drop(&mut self) {
+ match *unsafe { self.inner() } {
+ Flavor::Oneshot(ref p) => p.drop_port(),
+ Flavor::Stream(ref p) => p.drop_port(),
+ Flavor::Shared(ref p) => p.drop_port(),
+ Flavor::Sync(ref p) => p.drop_port(),
+ }
+ }
+}
+
+#[stable(feature = "mpsc_debug", since = "1.7.0")]
+impl<T> fmt::Debug for Receiver<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Receiver {{ .. }}")
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> fmt::Debug for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ "SendError(..)".fmt(f)
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> fmt::Display for SendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ "sending on a closed channel".fmt(f)
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T: Send> error::Error for SendError<T> {
+ fn description(&self) -> &str {
+ "sending on a closed channel"
+ }
+
+ fn cause(&self) -> Option<&error::Error> {
+ None
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> fmt::Debug for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ TrySendError::Full(..) => "Full(..)".fmt(f),
+ TrySendError::Disconnected(..) => "Disconnected(..)".fmt(f),
+ }
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T> fmt::Display for TrySendError<T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ TrySendError::Full(..) => {
+ "sending on a full channel".fmt(f)
+ }
+ TrySendError::Disconnected(..) => {
+ "sending on a closed channel".fmt(f)
+ }
+ }
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl<T: Send> error::Error for TrySendError<T> {
+
+ fn description(&self) -> &str {
+ match *self {
+ TrySendError::Full(..) => {
+ "sending on a full channel"
+ }
+ TrySendError::Disconnected(..) => {
+ "sending on a closed channel"
+ }
+ }
+ }
+
+ fn cause(&self) -> Option<&error::Error> {
+ None
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl fmt::Display for RecvError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ "receiving on a closed channel".fmt(f)
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl error::Error for RecvError {
+
+ fn description(&self) -> &str {
+ "receiving on a closed channel"
+ }
+
+ fn cause(&self) -> Option<&error::Error> {
+ None
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl fmt::Display for TryRecvError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ TryRecvError::Empty => {
+ "receiving on an empty channel".fmt(f)
+ }
+ TryRecvError::Disconnected => {
+ "receiving on a closed channel".fmt(f)
+ }
+ }
+ }
+}
+
+#[stable(feature = "rust1", since = "1.0.0")]
+impl error::Error for TryRecvError {
+
+ fn description(&self) -> &str {
+ match *self {
+ TryRecvError::Empty => {
+ "receiving on an empty channel"
+ }
+ TryRecvError::Disconnected => {
+ "receiving on a closed channel"
+ }
+ }
+ }
+
+ fn cause(&self) -> Option<&error::Error> {
+ None
+ }
+}
+
+#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
+impl fmt::Display for RecvTimeoutError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ RecvTimeoutError::Timeout => {
+ "timed out waiting on channel".fmt(f)
+ }
+ RecvTimeoutError::Disconnected => {
+ "channel is empty and sending half is closed".fmt(f)
+ }
+ }
+ }
+}
+
+#[stable(feature = "mpsc_recv_timeout_error", since = "1.14.0")]
+impl error::Error for RecvTimeoutError {
+ fn description(&self) -> &str {
+ match *self {
+ RecvTimeoutError::Timeout => {
+ "timed out waiting on channel"
+ }
+ RecvTimeoutError::Disconnected => {
+ "channel is empty and sending half is closed"
+ }
+ }
+ }
+
+ fn cause(&self) -> Option<&error::Error> {
+ None
+ }
+}
+
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod tests {
+ use env;
+ use super::*;
+ use thread;
+ use time::{Duration, Instant};
+
+ pub fn stress_factor() -> usize {
+ match env::var("RUST_TEST_STRESS") {
+ Ok(val) => val.parse().unwrap(),
+ Err(..) => 1,
+ }
+ }
+
+ #[test]
+ fn smoke() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+
+ #[test]
+ fn drop_full() {
+ let (tx, _rx) = channel::<Box<isize>>();
+ tx.send(box 1).unwrap();
+ }
+
+ #[test]
+ fn drop_full_shared() {
+ let (tx, _rx) = channel::<Box<isize>>();
+ drop(tx.clone());
+ drop(tx.clone());
+ tx.send(box 1).unwrap();
+ }
+
+ #[test]
+ fn smoke_shared() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ let tx = tx.clone();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+
+ #[test]
+ fn smoke_threads() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move|| {
+ tx.send(1).unwrap();
+ });
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+
+ #[test]
+ fn smoke_port_gone() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ assert!(tx.send(1).is_err());
+ }
+
+ #[test]
+ fn smoke_shared_port_gone() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ assert!(tx.send(1).is_err())
+ }
+
+ #[test]
+ fn smoke_shared_port_gone2() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ let tx2 = tx.clone();
+ drop(tx);
+ assert!(tx2.send(1).is_err());
+ }
+
+ #[test]
+ fn port_gone_concurrent() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move|| {
+ rx.recv().unwrap();
+ });
+ while tx.send(1).is_ok() {}
+ }
+
+ #[test]
+ fn port_gone_concurrent_shared() {
+ let (tx, rx) = channel::<i32>();
+ let tx2 = tx.clone();
+ let _t = thread::spawn(move|| {
+ rx.recv().unwrap();
+ });
+ while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
+ }
+
+ #[test]
+ fn smoke_chan_gone() {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ assert!(rx.recv().is_err());
+ }
+
+ #[test]
+ fn smoke_chan_gone_shared() {
+ let (tx, rx) = channel::<()>();
+ let tx2 = tx.clone();
+ drop(tx);
+ drop(tx2);
+ assert!(rx.recv().is_err());
+ }
+
+ #[test]
+ fn chan_gone_concurrent() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move|| {
+ tx.send(1).unwrap();
+ tx.send(1).unwrap();
+ });
+ while rx.recv().is_ok() {}
+ }
+
+ #[test]
+ fn stress() {
+ let (tx, rx) = channel::<i32>();
+ let t = thread::spawn(move|| {
+ for _ in 0..10000 { tx.send(1).unwrap(); }
+ });
+ for _ in 0..10000 {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ t.join().ok().unwrap();
+ }
+
+ #[test]
+ fn stress_shared() {
+ const AMT: u32 = 10000;
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = channel::<i32>();
+
+ let t = thread::spawn(move|| {
+ for _ in 0..AMT * NTHREADS {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ match rx.try_recv() {
+ Ok(..) => panic!(),
+ _ => {}
+ }
+ });
+
+ for _ in 0..NTHREADS {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ for _ in 0..AMT { tx.send(1).unwrap(); }
+ });
+ }
+ drop(tx);
+ t.join().ok().unwrap();
+ }
+
+ #[test]
+ fn send_from_outside_runtime() {
+ let (tx1, rx1) = channel::<()>();
+ let (tx2, rx2) = channel::<i32>();
+ let t1 = thread::spawn(move|| {
+ tx1.send(()).unwrap();
+ for _ in 0..40 {
+ assert_eq!(rx2.recv().unwrap(), 1);
+ }
+ });
+ rx1.recv().unwrap();
+ let t2 = thread::spawn(move|| {
+ for _ in 0..40 {
+ tx2.send(1).unwrap();
+ }
+ });
+ t1.join().ok().unwrap();
+ t2.join().ok().unwrap();
+ }
+
+ #[test]
+ fn recv_from_outside_runtime() {
+ let (tx, rx) = channel::<i32>();
+ let t = thread::spawn(move|| {
+ for _ in 0..40 {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ });
+ for _ in 0..40 {
+ tx.send(1).unwrap();
+ }
+ t.join().ok().unwrap();
+ }
+
+ #[test]
+ fn no_runtime() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ let t1 = thread::spawn(move|| {
+ assert_eq!(rx1.recv().unwrap(), 1);
+ tx2.send(2).unwrap();
+ });
+ let t2 = thread::spawn(move|| {
+ tx1.send(1).unwrap();
+ assert_eq!(rx2.recv().unwrap(), 2);
+ });
+ t1.join().ok().unwrap();
+ t2.join().ok().unwrap();
+ }
+
+ #[test]
+ fn oneshot_single_thread_close_port_first() {
+ // Simple test of closing without sending
+ let (_tx, rx) = channel::<i32>();
+ drop(rx);
+ }
+
+ #[test]
+ fn oneshot_single_thread_close_chan_first() {
+ // Simple test of closing without sending
+ let (tx, _rx) = channel::<i32>();
+ drop(tx);
+ }
+
+ #[test]
+ fn oneshot_single_thread_send_port_close() {
+ // Testing that the sender cleans up the payload if receiver is closed
+ let (tx, rx) = channel::<Box<i32>>();
+ drop(rx);
+ assert!(tx.send(box 0).is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_recv_chan_close() {
+ // Receiving on a closed chan will panic
+ let res = thread::spawn(move|| {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ rx.recv().unwrap();
+ }).join();
+ // What is our res?
+ assert!(res.is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_send_then_recv() {
+ let (tx, rx) = channel::<Box<i32>>();
+ tx.send(box 10).unwrap();
+ assert!(rx.recv().unwrap() == box 10);
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_send_open() {
+ let (tx, rx) = channel::<i32>();
+ assert!(tx.send(10).is_ok());
+ assert!(rx.recv().unwrap() == 10);
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_send_closed() {
+ let (tx, rx) = channel::<i32>();
+ drop(rx);
+ assert!(tx.send(10).is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_recv_open() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(10).unwrap();
+ assert!(rx.recv() == Ok(10));
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_recv_closed() {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ assert!(rx.recv().is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_peek_data() {
+ let (tx, rx) = channel::<i32>();
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ tx.send(10).unwrap();
+ assert_eq!(rx.try_recv(), Ok(10));
+ }
+
+ #[test]
+ fn oneshot_single_thread_peek_close() {
+ let (tx, rx) = channel::<i32>();
+ drop(tx);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ }
+
+ #[test]
+ fn oneshot_single_thread_peek_open() {
+ let (_tx, rx) = channel::<i32>();
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ }
+
+ #[test]
+ fn oneshot_multi_task_recv_then_send() {
+ let (tx, rx) = channel::<Box<i32>>();
+ let _t = thread::spawn(move|| {
+ assert!(rx.recv().unwrap() == box 10);
+ });
+
+ tx.send(box 10).unwrap();
+ }
+
+ #[test]
+ fn oneshot_multi_task_recv_then_close() {
+ let (tx, rx) = channel::<Box<i32>>();
+ let _t = thread::spawn(move|| {
+ drop(tx);
+ });
+ let res = thread::spawn(move|| {
+ assert!(rx.recv().unwrap() == box 10);
+ }).join();
+ assert!(res.is_err());
+ }
+
+ #[test]
+ fn oneshot_multi_thread_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move|| {
+ drop(rx);
+ });
+ drop(tx);
+ }
+ }
+
+ #[test]
+ fn oneshot_multi_thread_send_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<i32>();
+ let _t = thread::spawn(move|| {
+ drop(rx);
+ });
+ let _ = thread::spawn(move|| {
+ tx.send(1).unwrap();
+ }).join();
+ }
+ }
+
+ #[test]
+ fn oneshot_multi_thread_recv_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<i32>();
+ thread::spawn(move|| {
+ let res = thread::spawn(move|| {
+ rx.recv().unwrap();
+ }).join();
+ assert!(res.is_err());
+ });
+ let _t = thread::spawn(move|| {
+ thread::spawn(move|| {
+ drop(tx);
+ });
+ });
+ }
+ }
+
+ #[test]
+ fn oneshot_multi_thread_send_recv_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel::<Box<isize>>();
+ let _t = thread::spawn(move|| {
+ tx.send(box 10).unwrap();
+ });
+ assert!(rx.recv().unwrap() == box 10);
+ }
+ }
+
+ #[test]
+ fn stream_send_recv_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = channel();
+
+ send(tx, 0);
+ recv(rx, 0);
+
+ fn send(tx: Sender<Box<i32>>, i: i32) {
+ if i == 10 { return }
+
+ thread::spawn(move|| {
+ tx.send(box i).unwrap();
+ send(tx, i + 1);
+ });
+ }
+
+ fn recv(rx: Receiver<Box<i32>>, i: i32) {
+ if i == 10 { return }
+
+ thread::spawn(move|| {
+ assert!(rx.recv().unwrap() == box i);
+ recv(rx, i + 1);
+ });
+ }
+ }
+ }
+
+ #[test]
+ fn oneshot_single_thread_recv_timeout() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
+ tx.send(()).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
+ }
+
+ #[test]
+ fn stress_recv_timeout_two_threads() {
+ let (tx, rx) = channel();
+ let stress = stress_factor() + 100;
+ let timeout = Duration::from_millis(100);
+
+ thread::spawn(move || {
+ for i in 0..stress {
+ if i % 2 == 0 {
+ thread::sleep(timeout * 2);
+ }
+ tx.send(1usize).unwrap();
+ }
+ });
+
+ let mut recv_count = 0;
+ loop {
+ match rx.recv_timeout(timeout) {
+ Ok(n) => {
+ assert_eq!(n, 1usize);
+ recv_count += 1;
+ }
+ Err(RecvTimeoutError::Timeout) => continue,
+ Err(RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert_eq!(recv_count, stress);
+ }
+
+ #[test]
+ fn recv_timeout_upgrade() {
+ let (tx, rx) = channel::<()>();
+ let timeout = Duration::from_millis(1);
+ let _tx_clone = tx.clone();
+
+ let start = Instant::now();
+ assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
+ assert!(Instant::now() >= start + timeout);
+ }
+
+ #[test]
+ fn stress_recv_timeout_shared() {
+ let (tx, rx) = channel();
+ let stress = stress_factor() + 100;
+
+ for i in 0..stress {
+ let tx = tx.clone();
+ thread::spawn(move || {
+ thread::sleep(Duration::from_millis(i as u64 * 10));
+ tx.send(1usize).unwrap();
+ });
+ }
+
+ drop(tx);
+
+ let mut recv_count = 0;
+ loop {
+ match rx.recv_timeout(Duration::from_millis(10)) {
+ Ok(n) => {
+ assert_eq!(n, 1usize);
+ recv_count += 1;
+ }
+ Err(RecvTimeoutError::Timeout) => continue,
+ Err(RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert_eq!(recv_count, stress);
+ }
+
+ #[test]
+ fn recv_a_lot() {
+ // Regression test that we don't run out of stack in scheduler context
+ let (tx, rx) = channel();
+ for _ in 0..10000 { tx.send(()).unwrap(); }
+ for _ in 0..10000 { rx.recv().unwrap(); }
+ }
+
+ #[test]
+ fn shared_recv_timeout() {
+ let (tx, rx) = channel();
+ let total = 5;
+ for _ in 0..total {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ tx.send(()).unwrap();
+ });
+ }
+
+ for _ in 0..total { rx.recv().unwrap(); }
+
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
+ tx.send(()).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
+ }
+
+ #[test]
+ fn shared_chan_stress() {
+ let (tx, rx) = channel();
+ let total = stress_factor() + 100;
+ for _ in 0..total {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ tx.send(()).unwrap();
+ });
+ }
+
+ for _ in 0..total {
+ rx.recv().unwrap();
+ }
+ }
+
+ #[test]
+ fn test_nested_recv_iter() {
+ let (tx, rx) = channel::<i32>();
+ let (total_tx, total_rx) = channel::<i32>();
+
+ let _t = thread::spawn(move|| {
+ let mut acc = 0;
+ for x in rx.iter() {
+ acc += x;
+ }
+ total_tx.send(acc).unwrap();
+ });
+
+ tx.send(3).unwrap();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ drop(tx);
+ assert_eq!(total_rx.recv().unwrap(), 6);
+ }
+
+ #[test]
+ fn test_recv_iter_break() {
+ let (tx, rx) = channel::<i32>();
+ let (count_tx, count_rx) = channel();
+
+ let _t = thread::spawn(move|| {
+ let mut count = 0;
+ for x in rx.iter() {
+ if count >= 3 {
+ break;
+ } else {
+ count += x;
+ }
+ }
+ count_tx.send(count).unwrap();
+ });
+
+ tx.send(2).unwrap();
+ tx.send(2).unwrap();
+ tx.send(2).unwrap();
+ let _ = tx.send(2);
+ drop(tx);
+ assert_eq!(count_rx.recv().unwrap(), 4);
+ }
+
+ #[test]
+ fn test_recv_try_iter() {
+ let (request_tx, request_rx) = channel();
+ let (response_tx, response_rx) = channel();
+
+ // Request `x`s until we have `6`.
+ let t = thread::spawn(move|| {
+ let mut count = 0;
+ loop {
+ for x in response_rx.try_iter() {
+ count += x;
+ if count == 6 {
+ return count;
+ }
+ }
+ request_tx.send(()).unwrap();
+ }
+ });
+
+ for _ in request_rx.iter() {
+ if response_tx.send(2).is_err() {
+ break;
+ }
+ }
+
+ assert_eq!(t.join().unwrap(), 6);
+ }
+
+ #[test]
+ fn test_recv_into_iter_owned() {
+ let mut iter = {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+
+ rx.into_iter()
+ };
+ assert_eq!(iter.next().unwrap(), 1);
+ assert_eq!(iter.next().unwrap(), 2);
+ assert_eq!(iter.next().is_none(), true);
+ }
+
+ #[test]
+ fn test_recv_into_iter_borrowed() {
+ let (tx, rx) = channel::<i32>();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ drop(tx);
+ let mut iter = (&rx).into_iter();
+ assert_eq!(iter.next().unwrap(), 1);
+ assert_eq!(iter.next().unwrap(), 2);
+ assert_eq!(iter.next().is_none(), true);
+ }
+
+ #[test]
+ fn try_recv_states() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<()>();
+ let (tx3, rx3) = channel::<()>();
+ let _t = thread::spawn(move|| {
+ rx2.recv().unwrap();
+ tx1.send(1).unwrap();
+ tx3.send(()).unwrap();
+ rx2.recv().unwrap();
+ drop(tx1);
+ tx3.send(()).unwrap();
+ });
+
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ assert_eq!(rx1.try_recv(), Ok(1));
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
+ }
+
+ // This bug used to end up in a livelock inside of the Receiver destructor
+ // because the internal state of the Shared packet was corrupted
+ #[test]
+ fn destroy_upgraded_shared_port_when_sender_still_active() {
+ let (tx, rx) = channel();
+ let (tx2, rx2) = channel();
+ let _t = thread::spawn(move|| {
+ rx.recv().unwrap(); // wait on a oneshot
+ drop(rx); // destroy a shared
+ tx2.send(()).unwrap();
+ });
+ // make sure the other thread has gone to sleep
+ for _ in 0..5000 { thread::yield_now(); }
+
+ // upgrade to a shared chan and send a message
+ let t = tx.clone();
+ drop(tx);
+ t.send(()).unwrap();
+
+ // wait for the child thread to exit before we exit
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn issue_32114() {
+ let (tx, _) = channel();
+ let _ = tx.send(123);
+ assert_eq!(tx.send(123), Err(SendError(123)));
+ }
+}
+
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod sync_tests {
+ use env;
+ use thread;
+ use super::*;
+ use time::Duration;
+
+ pub fn stress_factor() -> usize {
+ match env::var("RUST_TEST_STRESS") {
+ Ok(val) => val.parse().unwrap(),
+ Err(..) => 1,
+ }
+ }
+
+ #[test]
+ fn smoke() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+
+ #[test]
+ fn drop_full() {
+ let (tx, _rx) = sync_channel::<Box<isize>>(1);
+ tx.send(box 1).unwrap();
+ }
+
+ #[test]
+ fn smoke_shared() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ let tx = tx.clone();
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+
+ #[test]
+ fn recv_timeout() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Err(RecvTimeoutError::Timeout));
+ tx.send(1).unwrap();
+ assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
+ }
+
+ #[test]
+ fn smoke_threads() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ tx.send(1).unwrap();
+ });
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+
+ #[test]
+ fn smoke_port_gone() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(rx);
+ assert!(tx.send(1).is_err());
+ }
+
+ #[test]
+ fn smoke_shared_port_gone2() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(rx);
+ let tx2 = tx.clone();
+ drop(tx);
+ assert!(tx2.send(1).is_err());
+ }
+
+ #[test]
+ fn port_gone_concurrent() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ rx.recv().unwrap();
+ });
+ while tx.send(1).is_ok() {}
+ }
+
+ #[test]
+ fn port_gone_concurrent_shared() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let tx2 = tx.clone();
+ let _t = thread::spawn(move|| {
+ rx.recv().unwrap();
+ });
+ while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
+ }
+
+ #[test]
+ fn smoke_chan_gone() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(tx);
+ assert!(rx.recv().is_err());
+ }
+
+ #[test]
+ fn smoke_chan_gone_shared() {
+ let (tx, rx) = sync_channel::<()>(0);
+ let tx2 = tx.clone();
+ drop(tx);
+ drop(tx2);
+ assert!(rx.recv().is_err());
+ }
+
+ #[test]
+ fn chan_gone_concurrent() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ thread::spawn(move|| {
+ tx.send(1).unwrap();
+ tx.send(1).unwrap();
+ });
+ while rx.recv().is_ok() {}
+ }
+
+ #[test]
+ fn stress() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ thread::spawn(move|| {
+ for _ in 0..10000 { tx.send(1).unwrap(); }
+ });
+ for _ in 0..10000 {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ }
+
+ #[test]
+ fn stress_recv_timeout_two_threads() {
+ let (tx, rx) = sync_channel::<i32>(0);
+
+ thread::spawn(move|| {
+ for _ in 0..10000 { tx.send(1).unwrap(); }
+ });
+
+ let mut recv_count = 0;
+ loop {
+ match rx.recv_timeout(Duration::from_millis(1)) {
+ Ok(v) => {
+ assert_eq!(v, 1);
+ recv_count += 1;
+ },
+ Err(RecvTimeoutError::Timeout) => continue,
+ Err(RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert_eq!(recv_count, 10000);
+ }
+
+ #[test]
+ fn stress_recv_timeout_shared() {
+ const AMT: u32 = 1000;
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = sync_channel::<i32>(0);
+ let (dtx, drx) = sync_channel::<()>(0);
+
+ thread::spawn(move|| {
+ let mut recv_count = 0;
+ loop {
+ match rx.recv_timeout(Duration::from_millis(10)) {
+ Ok(v) => {
+ assert_eq!(v, 1);
+ recv_count += 1;
+ },
+ Err(RecvTimeoutError::Timeout) => continue,
+ Err(RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert_eq!(recv_count, AMT * NTHREADS);
+ assert!(rx.try_recv().is_err());
+
+ dtx.send(()).unwrap();
+ });
+
+ for _ in 0..NTHREADS {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ for _ in 0..AMT { tx.send(1).unwrap(); }
+ });
+ }
+
+ drop(tx);
+
+ drx.recv().unwrap();
+ }
+
+ #[test]
+ fn stress_shared() {
+ const AMT: u32 = 1000;
+ const NTHREADS: u32 = 8;
+ let (tx, rx) = sync_channel::<i32>(0);
+ let (dtx, drx) = sync_channel::<()>(0);
+
+ thread::spawn(move|| {
+ for _ in 0..AMT * NTHREADS {
+ assert_eq!(rx.recv().unwrap(), 1);
+ }
+ match rx.try_recv() {
+ Ok(..) => panic!(),
+ _ => {}
+ }
+ dtx.send(()).unwrap();
+ });
+
+ for _ in 0..NTHREADS {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ for _ in 0..AMT { tx.send(1).unwrap(); }
+ });
+ }
+ drop(tx);
+ drx.recv().unwrap();
+ }
+
+ #[test]
+ fn oneshot_single_thread_close_port_first() {
+ // Simple test of closing without sending
+ let (_tx, rx) = sync_channel::<i32>(0);
+ drop(rx);
+ }
+
+ #[test]
+ fn oneshot_single_thread_close_chan_first() {
+ // Simple test of closing without sending
+ let (tx, _rx) = sync_channel::<i32>(0);
+ drop(tx);
+ }
+
+ #[test]
+ fn oneshot_single_thread_send_port_close() {
+ // Testing that the sender cleans up the payload if receiver is closed
+ let (tx, rx) = sync_channel::<Box<i32>>(0);
+ drop(rx);
+ assert!(tx.send(box 0).is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_recv_chan_close() {
+ // Receiving on a closed chan will panic
+ let res = thread::spawn(move|| {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(tx);
+ rx.recv().unwrap();
+ }).join();
+ // What is our res?
+ assert!(res.is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_send_then_recv() {
+ let (tx, rx) = sync_channel::<Box<i32>>(1);
+ tx.send(box 10).unwrap();
+ assert!(rx.recv().unwrap() == box 10);
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_send_open() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ assert_eq!(tx.try_send(10), Ok(()));
+ assert!(rx.recv().unwrap() == 10);
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_send_closed() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(rx);
+ assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_send_closed2() {
+ let (tx, _rx) = sync_channel::<i32>(0);
+ assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_recv_open() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ tx.send(10).unwrap();
+ assert!(rx.recv() == Ok(10));
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_recv_closed() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(tx);
+ assert!(rx.recv().is_err());
+ }
+
+ #[test]
+ fn oneshot_single_thread_try_recv_closed_with_data() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ tx.send(10).unwrap();
+ drop(tx);
+ assert_eq!(rx.try_recv(), Ok(10));
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ }
+
+ #[test]
+ fn oneshot_single_thread_peek_data() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ tx.send(10).unwrap();
+ assert_eq!(rx.try_recv(), Ok(10));
+ }
+
+ #[test]
+ fn oneshot_single_thread_peek_close() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ drop(tx);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
+ }
+
+ #[test]
+ fn oneshot_single_thread_peek_open() {
+ let (_tx, rx) = sync_channel::<i32>(0);
+ assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
+ }
+
+ #[test]
+ fn oneshot_multi_task_recv_then_send() {
+ let (tx, rx) = sync_channel::<Box<i32>>(0);
+ let _t = thread::spawn(move|| {
+ assert!(rx.recv().unwrap() == box 10);
+ });
+
+ tx.send(box 10).unwrap();
+ }
+
+ #[test]
+ fn oneshot_multi_task_recv_then_close() {
+ let (tx, rx) = sync_channel::<Box<i32>>(0);
+ let _t = thread::spawn(move|| {
+ drop(tx);
+ });
+ let res = thread::spawn(move|| {
+ assert!(rx.recv().unwrap() == box 10);
+ }).join();
+ assert!(res.is_err());
+ }
+
+ #[test]
+ fn oneshot_multi_thread_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ drop(rx);
+ });
+ drop(tx);
+ }
+ }
+
+ #[test]
+ fn oneshot_multi_thread_send_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ drop(rx);
+ });
+ let _ = thread::spawn(move || {
+ tx.send(1).unwrap();
+ }).join();
+ }
+ }
+
+ #[test]
+ fn oneshot_multi_thread_recv_close_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ let res = thread::spawn(move|| {
+ rx.recv().unwrap();
+ }).join();
+ assert!(res.is_err());
+ });
+ let _t = thread::spawn(move|| {
+ thread::spawn(move|| {
+ drop(tx);
+ });
+ });
+ }
+ }
+
+ #[test]
+ fn oneshot_multi_thread_send_recv_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = sync_channel::<Box<i32>>(0);
+ let _t = thread::spawn(move|| {
+ tx.send(box 10).unwrap();
+ });
+ assert!(rx.recv().unwrap() == box 10);
+ }
+ }
+
+ #[test]
+ fn stream_send_recv_stress() {
+ for _ in 0..stress_factor() {
+ let (tx, rx) = sync_channel::<Box<i32>>(0);
+
+ send(tx, 0);
+ recv(rx, 0);
+
+ fn send(tx: SyncSender<Box<i32>>, i: i32) {
+ if i == 10 { return }
+
+ thread::spawn(move|| {
+ tx.send(box i).unwrap();
+ send(tx, i + 1);
+ });
+ }
+
+ fn recv(rx: Receiver<Box<i32>>, i: i32) {
+ if i == 10 { return }
+
+ thread::spawn(move|| {
+ assert!(rx.recv().unwrap() == box i);
+ recv(rx, i + 1);
+ });
+ }
+ }
+ }
+
+ #[test]
+ fn recv_a_lot() {
+ // Regression test that we don't run out of stack in scheduler context
+ let (tx, rx) = sync_channel(10000);
+ for _ in 0..10000 { tx.send(()).unwrap(); }
+ for _ in 0..10000 { rx.recv().unwrap(); }
+ }
+
+ #[test]
+ fn shared_chan_stress() {
+ let (tx, rx) = sync_channel(0);
+ let total = stress_factor() + 100;
+ for _ in 0..total {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ tx.send(()).unwrap();
+ });
+ }
+
+ for _ in 0..total {
+ rx.recv().unwrap();
+ }
+ }
+
+ #[test]
+ fn test_nested_recv_iter() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let (total_tx, total_rx) = sync_channel::<i32>(0);
+
+ let _t = thread::spawn(move|| {
+ let mut acc = 0;
+ for x in rx.iter() {
+ acc += x;
+ }
+ total_tx.send(acc).unwrap();
+ });
+
+ tx.send(3).unwrap();
+ tx.send(1).unwrap();
+ tx.send(2).unwrap();
+ drop(tx);
+ assert_eq!(total_rx.recv().unwrap(), 6);
+ }
+
+ #[test]
+ fn test_recv_iter_break() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let (count_tx, count_rx) = sync_channel(0);
+
+ let _t = thread::spawn(move|| {
+ let mut count = 0;
+ for x in rx.iter() {
+ if count >= 3 {
+ break;
+ } else {
+ count += x;
+ }
+ }
+ count_tx.send(count).unwrap();
+ });
+
+ tx.send(2).unwrap();
+ tx.send(2).unwrap();
+ tx.send(2).unwrap();
+ let _ = tx.try_send(2);
+ drop(tx);
+ assert_eq!(count_rx.recv().unwrap(), 4);
+ }
+
+ #[test]
+ fn try_recv_states() {
+ let (tx1, rx1) = sync_channel::<i32>(1);
+ let (tx2, rx2) = sync_channel::<()>(1);
+ let (tx3, rx3) = sync_channel::<()>(1);
+ let _t = thread::spawn(move|| {
+ rx2.recv().unwrap();
+ tx1.send(1).unwrap();
+ tx3.send(()).unwrap();
+ rx2.recv().unwrap();
+ drop(tx1);
+ tx3.send(()).unwrap();
+ });
+
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ assert_eq!(rx1.try_recv(), Ok(1));
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
+ }
+
+ // This bug used to end up in a livelock inside of the Receiver destructor
+ // because the internal state of the Shared packet was corrupted
+ #[test]
+ fn destroy_upgraded_shared_port_when_sender_still_active() {
+ let (tx, rx) = sync_channel::<()>(0);
+ let (tx2, rx2) = sync_channel::<()>(0);
+ let _t = thread::spawn(move|| {
+ rx.recv().unwrap(); // wait on a oneshot
+ drop(rx); // destroy a shared
+ tx2.send(()).unwrap();
+ });
+ // make sure the other thread has gone to sleep
+ for _ in 0..5000 { thread::yield_now(); }
+
+ // upgrade to a shared chan and send a message
+ let t = tx.clone();
+ drop(tx);
+ t.send(()).unwrap();
+
+ // wait for the child thread to exit before we exit
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn send1() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| { rx.recv().unwrap(); });
+ assert_eq!(tx.send(1), Ok(()));
+ }
+
+ #[test]
+ fn send2() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| { drop(rx); });
+ assert!(tx.send(1).is_err());
+ }
+
+ #[test]
+ fn send3() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ assert_eq!(tx.send(1), Ok(()));
+ let _t =thread::spawn(move|| { drop(rx); });
+ assert!(tx.send(1).is_err());
+ }
+
+ #[test]
+ fn send4() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let tx2 = tx.clone();
+ let (done, donerx) = channel();
+ let done2 = done.clone();
+ let _t = thread::spawn(move|| {
+ assert!(tx.send(1).is_err());
+ done.send(()).unwrap();
+ });
+ let _t = thread::spawn(move|| {
+ assert!(tx2.send(2).is_err());
+ done2.send(()).unwrap();
+ });
+ drop(rx);
+ donerx.recv().unwrap();
+ donerx.recv().unwrap();
+ }
+
+ #[test]
+ fn try_send1() {
+ let (tx, _rx) = sync_channel::<i32>(0);
+ assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
+ }
+
+ #[test]
+ fn try_send2() {
+ let (tx, _rx) = sync_channel::<i32>(1);
+ assert_eq!(tx.try_send(1), Ok(()));
+ assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
+ }
+
+ #[test]
+ fn try_send3() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ assert_eq!(tx.try_send(1), Ok(()));
+ drop(rx);
+ assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
+ }
+
+ #[test]
+ fn issue_15761() {
+ fn repro() {
+ let (tx1, rx1) = sync_channel::<()>(3);
+ let (tx2, rx2) = sync_channel::<()>(3);
+
+ let _t = thread::spawn(move|| {
+ rx1.recv().unwrap();
+ tx2.try_send(()).unwrap();
+ });
+
+ tx1.try_send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ for _ in 0..100 {
+ repro()
+ }
+ }
+
+ #[test]
+ fn fmt_debug_sender() {
+ let (tx, _) = channel::<i32>();
+ assert_eq!(format!("{:?}", tx), "Sender { .. }");
+ }
+
+ #[test]
+ fn fmt_debug_recv() {
+ let (_, rx) = channel::<i32>();
+ assert_eq!(format!("{:?}", rx), "Receiver { .. }");
+ }
+
+ #[test]
+ fn fmt_debug_sync_sender() {
+ let (tx, _) = sync_channel::<i32>(1);
+ assert_eq!(format!("{:?}", tx), "SyncSender { .. }");
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/mpsc_queue.rs b/ctr-std/src/sync/mpsc/mpsc_queue.rs
new file mode 100644
index 0000000..8d80f94
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/mpsc_queue.rs
@@ -0,0 +1,198 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue.
+//!
+//! This module contains an implementation of a concurrent MPSC queue. This
+//! queue can be used to share data between threads, and is also used as the
+//! building block of channels in rust.
+//!
+//! Note that the current implementation of this queue has a caveat of the `pop`
+//! method, and see the method for more information about it. Due to this
+//! caveat, this queue may not be appropriate for all use-cases.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+// /queues/non-intrusive-mpsc-node-based-queue
+
+pub use self::PopResult::*;
+
+use alloc::boxed::Box;
+use core::ptr;
+use core::cell::UnsafeCell;
+
+use sync::atomic::{AtomicPtr, Ordering};
+
+/// A result of the `pop` function.
+pub enum PopResult<T> {
+ /// Some data has been popped
+ Data(T),
+ /// The queue is empty
+ Empty,
+ /// The queue is in an inconsistent state. Popping data should succeed, but
+ /// some pushers have yet to make enough progress in order allow a pop to
+ /// succeed. It is recommended that a pop() occur "in the near future" in
+ /// order to see if the sender has made progress or not
+ Inconsistent,
+}
+
+struct Node<T> {
+ next: AtomicPtr<Node<T>>,
+ value: Option<T>,
+}
+
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+pub struct Queue<T> {
+ head: AtomicPtr<Node<T>>,
+ tail: UnsafeCell<*mut Node<T>>,
+}
+
+unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send> Sync for Queue<T> { }
+
+impl<T> Node<T> {
+ unsafe fn new(v: Option<T>) -> *mut Node<T> {
+ Box::into_raw(box Node {
+ next: AtomicPtr::new(ptr::null_mut()),
+ value: v,
+ })
+ }
+}
+
+impl<T> Queue<T> {
+ /// Creates a new queue that is safe to share among multiple producers and
+ /// one consumer.
+ pub fn new() -> Queue<T> {
+ let stub = unsafe { Node::new(None) };
+ Queue {
+ head: AtomicPtr::new(stub),
+ tail: UnsafeCell::new(stub),
+ }
+ }
+
+ /// Pushes a new value onto this queue.
+ pub fn push(&self, t: T) {
+ unsafe {
+ let n = Node::new(Some(t));
+ let prev = self.head.swap(n, Ordering::AcqRel);
+ (*prev).next.store(n, Ordering::Release);
+ }
+ }
+
+ /// Pops some data from this queue.
+ ///
+ /// Note that the current implementation means that this function cannot
+ /// return `Option<T>`. It is possible for this queue to be in an
+ /// inconsistent state where many pushes have succeeded and completely
+ /// finished, but pops cannot return `Some(t)`. This inconsistent state
+ /// happens when a pusher is pre-empted at an inopportune moment.
+ ///
+ /// This inconsistent state means that this queue does indeed have data, but
+ /// it does not currently have access to it at this time.
+ pub fn pop(&self) -> PopResult<T> {
+ unsafe {
+ let tail = *self.tail.get();
+ let next = (*tail).next.load(Ordering::Acquire);
+
+ if !next.is_null() {
+ *self.tail.get() = next;
+ assert!((*tail).value.is_none());
+ assert!((*next).value.is_some());
+ let ret = (*next).value.take().unwrap();
+ let _: Box<Node<T>> = Box::from_raw(tail);
+ return Data(ret);
+ }
+
+ if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
+ }
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let mut cur = *self.tail.get();
+ while !cur.is_null() {
+ let next = (*cur).next.load(Ordering::Relaxed);
+ let _: Box<Node<T>> = Box::from_raw(cur);
+ cur = next;
+ }
+ }
+ }
+}
+
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod tests {
+ use sync::mpsc::channel;
+ use super::{Queue, Data, Empty, Inconsistent};
+ use sync::Arc;
+ use thread;
+
+ #[test]
+ fn test_full() {
+ let q: Queue<Box<_>> = Queue::new();
+ q.push(box 1);
+ q.push(box 2);
+ }
+
+ #[test]
+ fn test() {
+ let nthreads = 8;
+ let nmsgs = 1000;
+ let q = Queue::new();
+ match q.pop() {
+ Empty => {}
+ Inconsistent | Data(..) => panic!()
+ }
+ let (tx, rx) = channel();
+ let q = Arc::new(q);
+
+ for _ in 0..nthreads {
+ let tx = tx.clone();
+ let q = q.clone();
+ thread::spawn(move|| {
+ for i in 0..nmsgs {
+ q.push(i);
+ }
+ tx.send(()).unwrap();
+ });
+ }
+
+ let mut i = 0;
+ while i < nthreads * nmsgs {
+ match q.pop() {
+ Empty | Inconsistent => {},
+ Data(_) => { i += 1 }
+ }
+ }
+ drop(tx);
+ for _ in 0..nthreads {
+ rx.recv().unwrap();
+ }
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/oneshot.rs b/ctr-std/src/sync/mpsc/oneshot.rs
new file mode 100644
index 0000000..b8e50c9
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/oneshot.rs
@@ -0,0 +1,396 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Oneshot channels/ports
+///
+/// This is the initial flavor of channels/ports used for comm module. This is
+/// an optimization for the one-use case of a channel. The major optimization of
+/// this type is to have one and exactly one allocation when the chan/port pair
+/// is created.
+///
+/// Another possible optimization would be to not use an Arc box because
+/// in theory we know when the shared packet can be deallocated (no real need
+/// for the atomic reference counting), but I was having trouble how to destroy
+/// the data early in a drop of a Port.
+///
+/// # Implementation
+///
+/// Oneshots are implemented around one atomic usize variable. This variable
+/// indicates both the state of the port/chan but also contains any threads
+/// blocked on the port. All atomic operations happen on this one word.
+///
+/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
+/// on behalf of the channel side of things (it can be mentally thought of as
+/// consuming the port). This upgrade is then also stored in the shared packet.
+/// The one caveat to consider is that when a port sees a disconnected channel
+/// it must check for data because there is no "data plus upgrade" state.
+
+pub use self::Failure::*;
+pub use self::UpgradeResult::*;
+pub use self::SelectionResult::*;
+use self::MyUpgrade::*;
+
+use sync::mpsc::Receiver;
+use sync::mpsc::blocking::{self, SignalToken};
+use cell::UnsafeCell;
+use ptr;
+use sync::atomic::{AtomicUsize, Ordering};
+use time::Instant;
+
+// Various states you can find a port in.
+const EMPTY: usize = 0; // initial state: no data, no blocked receiver
+const DATA: usize = 1; // data ready for receiver to take
+const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
+// Any other value represents a pointer to a SignalToken value. The
+// protocol ensures that when the state moves *to* a pointer,
+// ownership of the token is given to the packet, and when the state
+// moves *from* a pointer, ownership of the token is transferred to
+// whoever changed the state.
+
+pub struct Packet<T> {
+ // Internal state of the chan/port pair (stores the blocked thread as well)
+ state: AtomicUsize,
+ // One-shot data slot location
+ data: UnsafeCell<Option<T>>,
+ // when used for the second time, a oneshot channel must be upgraded, and
+ // this contains the slot for the upgrade
+ upgrade: UnsafeCell<MyUpgrade<T>>,
+}
+
+pub enum Failure<T> {
+ Empty,
+ Disconnected,
+ Upgraded(Receiver<T>),
+}
+
+pub enum UpgradeResult {
+ UpSuccess,
+ UpDisconnected,
+ UpWoke(SignalToken),
+}
+
+pub enum SelectionResult<T> {
+ SelCanceled,
+ SelUpgraded(SignalToken, Receiver<T>),
+ SelSuccess,
+}
+
+enum MyUpgrade<T> {
+ NothingSent,
+ SendUsed,
+ GoUp(Receiver<T>),
+}
+
+impl<T> Packet<T> {
+ pub fn new() -> Packet<T> {
+ Packet {
+ data: UnsafeCell::new(None),
+ upgrade: UnsafeCell::new(NothingSent),
+ state: AtomicUsize::new(EMPTY),
+ }
+ }
+
+ pub fn send(&self, t: T) -> Result<(), T> {
+ unsafe {
+ // Sanity check
+ match *self.upgrade.get() {
+ NothingSent => {}
+ _ => panic!("sending on a oneshot that's already sent on "),
+ }
+ assert!((*self.data.get()).is_none());
+ ptr::write(self.data.get(), Some(t));
+ ptr::write(self.upgrade.get(), SendUsed);
+
+ match self.state.swap(DATA, Ordering::SeqCst) {
+ // Sent the data, no one was waiting
+ EMPTY => Ok(()),
+
+ // Couldn't send the data, the port hung up first. Return the data
+ // back up the stack.
+ DISCONNECTED => {
+ self.state.swap(DISCONNECTED, Ordering::SeqCst);
+ ptr::write(self.upgrade.get(), NothingSent);
+ Err((&mut *self.data.get()).take().unwrap())
+ }
+
+ // Not possible, these are one-use channels
+ DATA => unreachable!(),
+
+ // There is a thread waiting on the other end. We leave the 'DATA'
+ // state inside so it'll pick it up on the other end.
+ ptr => {
+ SignalToken::cast_from_usize(ptr).signal();
+ Ok(())
+ }
+ }
+ }
+ }
+
+ // Just tests whether this channel has been sent on or not, this is only
+ // safe to use from the sender.
+ pub fn sent(&self) -> bool {
+ unsafe {
+ match *self.upgrade.get() {
+ NothingSent => false,
+ _ => true,
+ }
+ }
+ }
+
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
+ // Attempt to not block the thread (it's a little expensive). If it looks
+ // like we're not empty, then immediately go through to `try_recv`.
+ if self.state.load(Ordering::SeqCst) == EMPTY {
+ let (wait_token, signal_token) = blocking::tokens();
+ let ptr = unsafe { signal_token.cast_to_usize() };
+
+ // race with senders to enter the blocking state
+ if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
+ if let Some(deadline) = deadline {
+ let timed_out = !wait_token.wait_max_until(deadline);
+ // Try to reset the state
+ if timed_out {
+ self.abort_selection().map_err(Upgraded)?;
+ }
+ } else {
+ wait_token.wait();
+ debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
+ }
+ } else {
+ // drop the signal token, since we never blocked
+ drop(unsafe { SignalToken::cast_from_usize(ptr) });
+ }
+ }
+
+ self.try_recv()
+ }
+
+ pub fn try_recv(&self) -> Result<T, Failure<T>> {
+ unsafe {
+ match self.state.load(Ordering::SeqCst) {
+ EMPTY => Err(Empty),
+
+ // We saw some data on the channel, but the channel can be used
+ // again to send us an upgrade. As a result, we need to re-insert
+ // into the channel that there's no data available (otherwise we'll
+ // just see DATA next time). This is done as a cmpxchg because if
+ // the state changes under our feet we'd rather just see that state
+ // change.
+ DATA => {
+ self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
+ match (&mut *self.data.get()).take() {
+ Some(data) => Ok(data),
+ None => unreachable!(),
+ }
+ }
+
+ // There's no guarantee that we receive before an upgrade happens,
+ // and an upgrade flags the channel as disconnected, so when we see
+ // this we first need to check if there's data available and *then*
+ // we go through and process the upgrade.
+ DISCONNECTED => {
+ match (&mut *self.data.get()).take() {
+ Some(data) => Ok(data),
+ None => {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ SendUsed | NothingSent => Err(Disconnected),
+ GoUp(upgrade) => Err(Upgraded(upgrade))
+ }
+ }
+ }
+ }
+
+ // We are the sole receiver; there cannot be a blocking
+ // receiver already.
+ _ => unreachable!()
+ }
+ }
+ }
+
+ // Returns whether the upgrade was completed. If the upgrade wasn't
+ // completed, then the port couldn't get sent to the other half (it will
+ // never receive it).
+ pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
+ unsafe {
+ let prev = match *self.upgrade.get() {
+ NothingSent => NothingSent,
+ SendUsed => SendUsed,
+ _ => panic!("upgrading again"),
+ };
+ ptr::write(self.upgrade.get(), GoUp(up));
+
+ match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+ // If the channel is empty or has data on it, then we're good to go.
+ // Senders will check the data before the upgrade (in case we
+ // plastered over the DATA state).
+ DATA | EMPTY => UpSuccess,
+
+ // If the other end is already disconnected, then we failed the
+ // upgrade. Be sure to trash the port we were given.
+ DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
+
+ // If someone's waiting, we gotta wake them up
+ ptr => UpWoke(SignalToken::cast_from_usize(ptr))
+ }
+ }
+ }
+
+ pub fn drop_chan(&self) {
+ match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+ DATA | DISCONNECTED | EMPTY => {}
+
+ // If someone's waiting, we gotta wake them up
+ ptr => unsafe {
+ SignalToken::cast_from_usize(ptr).signal();
+ }
+ }
+ }
+
+ pub fn drop_port(&self) {
+ match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+ // An empty channel has nothing to do, and a remotely disconnected
+ // channel also has nothing to do b/c we're about to run the drop
+ // glue
+ DISCONNECTED | EMPTY => {}
+
+ // There's data on the channel, so make sure we destroy it promptly.
+ // This is why not using an arc is a little difficult (need the box
+ // to stay valid while we take the data).
+ DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
+
+ // We're the only ones that can block on this port
+ _ => unreachable!()
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // If Ok, the value is whether this port has data, if Err, then the upgraded
+ // port needs to be checked instead of this one.
+ pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
+ unsafe {
+ match self.state.load(Ordering::SeqCst) {
+ EMPTY => Ok(false), // Welp, we tried
+ DATA => Ok(true), // we have some un-acquired data
+ DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
+ DISCONNECTED => {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ // The other end sent us an upgrade, so we need to
+ // propagate upwards whether the upgrade can receive
+ // data
+ GoUp(upgrade) => Err(upgrade),
+
+ // If the other end disconnected without sending an
+ // upgrade, then we have data to receive (the channel is
+ // disconnected).
+ up => { ptr::write(self.upgrade.get(), up); Ok(true) }
+ }
+ }
+ _ => unreachable!(), // we're the "one blocker"
+ }
+ }
+ }
+
+ // Attempts to start selection on this port. This can either succeed, fail
+ // because there is data, or fail because there is an upgrade pending.
+ pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
+ unsafe {
+ let ptr = token.cast_to_usize();
+ match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
+ EMPTY => SelSuccess,
+ DATA => {
+ drop(SignalToken::cast_from_usize(ptr));
+ SelCanceled
+ }
+ DISCONNECTED if (*self.data.get()).is_some() => {
+ drop(SignalToken::cast_from_usize(ptr));
+ SelCanceled
+ }
+ DISCONNECTED => {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ // The other end sent us an upgrade, so we need to
+ // propagate upwards whether the upgrade can receive
+ // data
+ GoUp(upgrade) => {
+ SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
+ }
+
+ // If the other end disconnected without sending an
+ // upgrade, then we have data to receive (the channel is
+ // disconnected).
+ up => {
+ ptr::write(self.upgrade.get(), up);
+ drop(SignalToken::cast_from_usize(ptr));
+ SelCanceled
+ }
+ }
+ }
+ _ => unreachable!(), // we're the "one blocker"
+ }
+ }
+ }
+
+ // Remove a previous selecting thread from this port. This ensures that the
+ // blocked thread will no longer be visible to any other threads.
+ //
+ // The return value indicates whether there's data on this port.
+ pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
+ let state = match self.state.load(Ordering::SeqCst) {
+ // Each of these states means that no further activity will happen
+ // with regard to abortion selection
+ s @ EMPTY |
+ s @ DATA |
+ s @ DISCONNECTED => s,
+
+ // If we've got a blocked thread, then use an atomic to gain ownership
+ // of it (may fail)
+ ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
+ };
+
+ // Now that we've got ownership of our state, figure out what to do
+ // about it.
+ match state {
+ EMPTY => unreachable!(),
+ // our thread used for select was stolen
+ DATA => Ok(true),
+
+ // If the other end has hung up, then we have complete ownership
+ // of the port. First, check if there was data waiting for us. This
+ // is possible if the other end sent something and then hung up.
+ //
+ // We then need to check to see if there was an upgrade requested,
+ // and if so, the upgraded port needs to have its selection aborted.
+ DISCONNECTED => unsafe {
+ if (*self.data.get()).is_some() {
+ Ok(true)
+ } else {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ GoUp(port) => Err(port),
+ _ => Ok(true),
+ }
+ }
+ },
+
+ // We woke ourselves up from select.
+ ptr => unsafe {
+ drop(SignalToken::cast_from_usize(ptr));
+ Ok(false)
+ }
+ }
+ }
+}
+
+impl<T> Drop for Packet<T> {
+ fn drop(&mut self) {
+ assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/select.rs b/ctr-std/src/sync/mpsc/select.rs
new file mode 100644
index 0000000..8b4da53
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/select.rs
@@ -0,0 +1,791 @@
+// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Selection over an array of receivers
+//!
+//! This module contains the implementation machinery necessary for selecting
+//! over a number of receivers. One large goal of this module is to provide an
+//! efficient interface to selecting over any receiver of any type.
+//!
+//! This is achieved through an architecture of a "receiver set" in which
+//! receivers are added to a set and then the entire set is waited on at once.
+//! The set can be waited on multiple times to prevent re-adding each receiver
+//! to the set.
+//!
+//! Usage of this module is currently encouraged to go through the use of the
+//! `select!` macro. This macro allows naturally binding of variables to the
+//! received values of receivers in a much more natural syntax then usage of the
+//! `Select` structure directly.
+//!
+//! # Examples
+//!
+//! ```rust
+//! #![feature(mpsc_select)]
+//!
+//! use std::sync::mpsc::channel;
+//!
+//! let (tx1, rx1) = channel();
+//! let (tx2, rx2) = channel();
+//!
+//! tx1.send(1).unwrap();
+//! tx2.send(2).unwrap();
+//!
+//! select! {
+//! val = rx1.recv() => {
+//! assert_eq!(val.unwrap(), 1);
+//! },
+//! val = rx2.recv() => {
+//! assert_eq!(val.unwrap(), 2);
+//! }
+//! }
+//! ```
+
+#![allow(dead_code)]
+#![unstable(feature = "mpsc_select",
+ reason = "This implementation, while likely sufficient, is unsafe and \
+ likely to be error prone. At some point in the future this \
+ module will likely be replaced, and it is currently \
+ unknown how much API breakage that will cause. The ability \
+ to select over a number of channels will remain forever, \
+ but no guarantees beyond this are being made",
+ issue = "27800")]
+
+
+use fmt;
+
+use core::cell::{Cell, UnsafeCell};
+use core::marker;
+use core::ptr;
+use core::usize;
+
+use sync::mpsc::{Receiver, RecvError};
+use sync::mpsc::blocking::{self, SignalToken};
+
+/// The "receiver set" of the select interface. This structure is used to manage
+/// a set of receivers which are being selected over.
+pub struct Select {
+ inner: UnsafeCell<SelectInner>,
+ next_id: Cell<usize>,
+}
+
+struct SelectInner {
+ head: *mut Handle<'static, ()>,
+ tail: *mut Handle<'static, ()>,
+}
+
+impl !marker::Send for Select {}
+
+/// A handle to a receiver which is currently a member of a `Select` set of
+/// receivers. This handle is used to keep the receiver in the set as well as
+/// interact with the underlying receiver.
+pub struct Handle<'rx, T:Send+'rx> {
+ /// The ID of this handle, used to compare against the return value of
+ /// `Select::wait()`
+ id: usize,
+ selector: *mut SelectInner,
+ next: *mut Handle<'static, ()>,
+ prev: *mut Handle<'static, ()>,
+ added: bool,
+ packet: &'rx (Packet+'rx),
+
+ // due to our fun transmutes, we be sure to place this at the end. (nothing
+ // previous relies on T)
+ rx: &'rx Receiver<T>,
+}
+
+struct Packets { cur: *mut Handle<'static, ()> }
+
+#[doc(hidden)]
+#[derive(PartialEq, Eq)]
+pub enum StartResult {
+ Installed,
+ Abort,
+}
+
+#[doc(hidden)]
+pub trait Packet {
+ fn can_recv(&self) -> bool;
+ fn start_selection(&self, token: SignalToken) -> StartResult;
+ fn abort_selection(&self) -> bool;
+}
+
+impl Select {
+ /// Creates a new selection structure. This set is initially empty.
+ ///
+ /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
+ /// the `select!` macro.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpsc_select)]
+ ///
+ /// use std::sync::mpsc::Select;
+ ///
+ /// let select = Select::new();
+ /// ```
+ pub fn new() -> Select {
+ Select {
+ inner: UnsafeCell::new(SelectInner {
+ head: ptr::null_mut(),
+ tail: ptr::null_mut(),
+ }),
+ next_id: Cell::new(1),
+ }
+ }
+
+ /// Creates a new handle into this receiver set for a new receiver. Note
+ /// that this does *not* add the receiver to the receiver set, for that you
+ /// must call the `add` method on the handle itself.
+ pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
+ let id = self.next_id.get();
+ self.next_id.set(id + 1);
+ Handle {
+ id: id,
+ selector: self.inner.get(),
+ next: ptr::null_mut(),
+ prev: ptr::null_mut(),
+ added: false,
+ rx: rx,
+ packet: rx,
+ }
+ }
+
+ /// Waits for an event on this receiver set. The returned value is *not* an
+ /// index, but rather an id. This id can be queried against any active
+ /// `Handle` structures (each one has an `id` method). The handle with
+ /// the matching `id` will have some sort of event available on it. The
+ /// event could either be that data is available or the corresponding
+ /// channel has been closed.
+ pub fn wait(&self) -> usize {
+ self.wait2(true)
+ }
+
+ /// Helper method for skipping the preflight checks during testing
+ fn wait2(&self, do_preflight_checks: bool) -> usize {
+ // Note that this is currently an inefficient implementation. We in
+ // theory have knowledge about all receivers in the set ahead of time,
+ // so this method shouldn't really have to iterate over all of them yet
+ // again. The idea with this "receiver set" interface is to get the
+ // interface right this time around, and later this implementation can
+ // be optimized.
+ //
+ // This implementation can be summarized by:
+ //
+ // fn select(receivers) {
+ // if any receiver ready { return ready index }
+ // deschedule {
+ // block on all receivers
+ // }
+ // unblock on all receivers
+ // return ready index
+ // }
+ //
+ // Most notably, the iterations over all of the receivers shouldn't be
+ // necessary.
+ unsafe {
+ // Stage 1: preflight checks. Look for any packets ready to receive
+ if do_preflight_checks {
+ for handle in self.iter() {
+ if (*handle).packet.can_recv() {
+ return (*handle).id();
+ }
+ }
+ }
+
+ // Stage 2: begin the blocking process
+ //
+ // Create a number of signal tokens, and install each one
+ // sequentially until one fails. If one fails, then abort the
+ // selection on the already-installed tokens.
+ let (wait_token, signal_token) = blocking::tokens();
+ for (i, handle) in self.iter().enumerate() {
+ match (*handle).packet.start_selection(signal_token.clone()) {
+ StartResult::Installed => {}
+ StartResult::Abort => {
+ // Go back and abort the already-begun selections
+ for handle in self.iter().take(i) {
+ (*handle).packet.abort_selection();
+ }
+ return (*handle).id;
+ }
+ }
+ }
+
+ // Stage 3: no messages available, actually block
+ wait_token.wait();
+
+ // Stage 4: there *must* be message available; find it.
+ //
+ // Abort the selection process on each receiver. If the abort
+ // process returns `true`, then that means that the receiver is
+ // ready to receive some data. Note that this also means that the
+ // receiver may have yet to have fully read the `to_wake` field and
+ // woken us up (although the wakeup is guaranteed to fail).
+ //
+ // This situation happens in the window of where a sender invokes
+ // increment(), sees -1, and then decides to wake up the thread. After
+ // all this is done, the sending thread will set `selecting` to
+ // `false`. Until this is done, we cannot return. If we were to
+ // return, then a sender could wake up a receiver which has gone
+ // back to sleep after this call to `select`.
+ //
+ // Note that it is a "fairly small window" in which an increment()
+ // views that it should wake a thread up until the `selecting` bit
+ // is set to false. For now, the implementation currently just spins
+ // in a yield loop. This is very distasteful, but this
+ // implementation is already nowhere near what it should ideally be.
+ // A rewrite should focus on avoiding a yield loop, and for now this
+ // implementation is tying us over to a more efficient "don't
+ // iterate over everything every time" implementation.
+ let mut ready_id = usize::MAX;
+ for handle in self.iter() {
+ if (*handle).packet.abort_selection() {
+ ready_id = (*handle).id;
+ }
+ }
+
+ // We must have found a ready receiver
+ assert!(ready_id != usize::MAX);
+ return ready_id;
+ }
+ }
+
+ fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
+}
+
+impl<'rx, T: Send> Handle<'rx, T> {
+ /// Retrieves the id of this handle.
+ #[inline]
+ pub fn id(&self) -> usize { self.id }
+
+ /// Blocks to receive a value on the underlying receiver, returning `Some` on
+ /// success or `None` if the channel disconnects. This function has the same
+ /// semantics as `Receiver.recv`
+ pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
+
+ /// Adds this handle to the receiver set that the handle was created from. This
+ /// method can be called multiple times, but it has no effect if `add` was
+ /// called previously.
+ ///
+ /// This method is unsafe because it requires that the `Handle` is not moved
+ /// while it is added to the `Select` set.
+ pub unsafe fn add(&mut self) {
+ if self.added { return }
+ let selector = &mut *self.selector;
+ let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
+
+ if selector.head.is_null() {
+ selector.head = me;
+ selector.tail = me;
+ } else {
+ (*me).prev = selector.tail;
+ assert!((*me).next.is_null());
+ (*selector.tail).next = me;
+ selector.tail = me;
+ }
+ self.added = true;
+ }
+
+ /// Removes this handle from the `Select` set. This method is unsafe because
+ /// it has no guarantee that the `Handle` was not moved since `add` was
+ /// called.
+ pub unsafe fn remove(&mut self) {
+ if !self.added { return }
+
+ let selector = &mut *self.selector;
+ let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
+
+ if self.prev.is_null() {
+ assert_eq!(selector.head, me);
+ selector.head = self.next;
+ } else {
+ (*self.prev).next = self.next;
+ }
+ if self.next.is_null() {
+ assert_eq!(selector.tail, me);
+ selector.tail = self.prev;
+ } else {
+ (*self.next).prev = self.prev;
+ }
+
+ self.next = ptr::null_mut();
+ self.prev = ptr::null_mut();
+
+ self.added = false;
+ }
+}
+
+impl Drop for Select {
+ fn drop(&mut self) {
+ unsafe {
+ assert!((&*self.inner.get()).head.is_null());
+ assert!((&*self.inner.get()).tail.is_null());
+ }
+ }
+}
+
+impl<'rx, T: Send> Drop for Handle<'rx, T> {
+ fn drop(&mut self) {
+ unsafe { self.remove() }
+ }
+}
+
+impl Iterator for Packets {
+ type Item = *mut Handle<'static, ()>;
+
+ fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
+ if self.cur.is_null() {
+ None
+ } else {
+ let ret = Some(self.cur);
+ unsafe { self.cur = (*self.cur).next; }
+ ret
+ }
+ }
+}
+
+impl fmt::Debug for Select {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Select {{ .. }}")
+ }
+}
+
+impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Handle {{ .. }}")
+ }
+}
+
+#[allow(unused_imports)]
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod tests {
+ use thread;
+ use sync::mpsc::*;
+
+ // Don't use the libstd version so we can pull in the right Select structure
+ // (std::comm points at the wrong one)
+ macro_rules! select {
+ (
+ $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
+ ) => ({
+ let sel = Select::new();
+ $( let mut $rx = sel.handle(&$rx); )+
+ unsafe {
+ $( $rx.add(); )+
+ }
+ let ret = sel.wait();
+ $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
+ { unreachable!() }
+ })
+ }
+
+ #[test]
+ fn smoke() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ tx1.send(1).unwrap();
+ select! {
+ foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
+ _bar = rx2.recv() => { panic!() }
+ }
+ tx2.send(2).unwrap();
+ select! {
+ _foo = rx1.recv() => { panic!() },
+ bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
+ }
+ drop(tx1);
+ select! {
+ foo = rx1.recv() => { assert!(foo.is_err()); },
+ _bar = rx2.recv() => { panic!() }
+ }
+ drop(tx2);
+ select! {
+ bar = rx2.recv() => { assert!(bar.is_err()); }
+ }
+ }
+
+ #[test]
+ fn smoke2() {
+ let (_tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (_tx3, rx3) = channel::<i32>();
+ let (_tx4, rx4) = channel::<i32>();
+ let (tx5, rx5) = channel::<i32>();
+ tx5.send(4).unwrap();
+ select! {
+ _foo = rx1.recv() => { panic!("1") },
+ _foo = rx2.recv() => { panic!("2") },
+ _foo = rx3.recv() => { panic!("3") },
+ _foo = rx4.recv() => { panic!("4") },
+ foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
+ }
+ }
+
+ #[test]
+ fn closed() {
+ let (_tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ drop(tx2);
+
+ select! {
+ _a1 = rx1.recv() => { panic!() },
+ a2 = rx2.recv() => { assert!(a2.is_err()); }
+ }
+ }
+
+ #[test]
+ fn unblocks() {
+ let (tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<i32>();
+
+ let _t = thread::spawn(move|| {
+ for _ in 0..20 { thread::yield_now(); }
+ tx1.send(1).unwrap();
+ rx3.recv().unwrap();
+ for _ in 0..20 { thread::yield_now(); }
+ });
+
+ select! {
+ a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
+ _b = rx2.recv() => { panic!() }
+ }
+ tx3.send(1).unwrap();
+ select! {
+ a = rx1.recv() => { assert!(a.is_err()) },
+ _b = rx2.recv() => { panic!() }
+ }
+ }
+
+ #[test]
+ fn both_ready() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ for _ in 0..20 { thread::yield_now(); }
+ tx1.send(1).unwrap();
+ tx2.send(2).unwrap();
+ rx3.recv().unwrap();
+ });
+
+ select! {
+ a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
+ a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
+ }
+ select! {
+ a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
+ a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
+ }
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
+ tx3.send(()).unwrap();
+ }
+
+ #[test]
+ fn stress() {
+ const AMT: i32 = 10000;
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ for i in 0..AMT {
+ if i % 2 == 0 {
+ tx1.send(i).unwrap();
+ } else {
+ tx2.send(i).unwrap();
+ }
+ rx3.recv().unwrap();
+ }
+ });
+
+ for i in 0..AMT {
+ select! {
+ i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
+ i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
+ }
+ tx3.send(()).unwrap();
+ }
+ }
+
+ #[test]
+ fn cloning() {
+ let (tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ rx3.recv().unwrap();
+ tx1.clone();
+ assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
+ tx1.send(2).unwrap();
+ rx3.recv().unwrap();
+ });
+
+ tx3.send(()).unwrap();
+ select! {
+ _i1 = rx1.recv() => {},
+ _i2 = rx2.recv() => panic!()
+ }
+ tx3.send(()).unwrap();
+ }
+
+ #[test]
+ fn cloning2() {
+ let (tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ rx3.recv().unwrap();
+ tx1.clone();
+ assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
+ tx1.send(2).unwrap();
+ rx3.recv().unwrap();
+ });
+
+ tx3.send(()).unwrap();
+ select! {
+ _i1 = rx1.recv() => {},
+ _i2 = rx2.recv() => panic!()
+ }
+ tx3.send(()).unwrap();
+ }
+
+ #[test]
+ fn cloning3() {
+ let (tx1, rx1) = channel::<()>();
+ let (tx2, rx2) = channel::<()>();
+ let (tx3, rx3) = channel::<()>();
+ let _t = thread::spawn(move|| {
+ let s = Select::new();
+ let mut h1 = s.handle(&rx1);
+ let mut h2 = s.handle(&rx2);
+ unsafe { h2.add(); }
+ unsafe { h1.add(); }
+ assert_eq!(s.wait(), h2.id);
+ tx3.send(()).unwrap();
+ });
+
+ for _ in 0..1000 { thread::yield_now(); }
+ drop(tx1.clone());
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ }
+
+ #[test]
+ fn preflight1() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ select! {
+ _n = rx.recv() => {}
+ }
+ }
+
+ #[test]
+ fn preflight2() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ tx.send(()).unwrap();
+ select! {
+ _n = rx.recv() => {}
+ }
+ }
+
+ #[test]
+ fn preflight3() {
+ let (tx, rx) = channel();
+ drop(tx.clone());
+ tx.send(()).unwrap();
+ select! {
+ _n = rx.recv() => {}
+ }
+ }
+
+ #[test]
+ fn preflight4() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight5() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ tx.send(()).unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight6() {
+ let (tx, rx) = channel();
+ drop(tx.clone());
+ tx.send(()).unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight7() {
+ let (tx, rx) = channel::<()>();
+ drop(tx);
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight8() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ drop(tx);
+ rx.recv().unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight9() {
+ let (tx, rx) = channel();
+ drop(tx.clone());
+ tx.send(()).unwrap();
+ drop(tx);
+ rx.recv().unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn oneshot_data_waiting() {
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ let _t = thread::spawn(move|| {
+ select! {
+ _n = rx1.recv() => {}
+ }
+ tx2.send(()).unwrap();
+ });
+
+ for _ in 0..100 { thread::yield_now() }
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn stream_data_waiting() {
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ tx1.send(()).unwrap();
+ tx1.send(()).unwrap();
+ rx1.recv().unwrap();
+ rx1.recv().unwrap();
+ let _t = thread::spawn(move|| {
+ select! {
+ _n = rx1.recv() => {}
+ }
+ tx2.send(()).unwrap();
+ });
+
+ for _ in 0..100 { thread::yield_now() }
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn shared_data_waiting() {
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ drop(tx1.clone());
+ tx1.send(()).unwrap();
+ rx1.recv().unwrap();
+ let _t = thread::spawn(move|| {
+ select! {
+ _n = rx1.recv() => {}
+ }
+ tx2.send(()).unwrap();
+ });
+
+ for _ in 0..100 { thread::yield_now() }
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn sync1() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ tx.send(1).unwrap();
+ select! {
+ n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
+ }
+ }
+
+ #[test]
+ fn sync2() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ for _ in 0..100 { thread::yield_now() }
+ tx.send(1).unwrap();
+ });
+ select! {
+ n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
+ }
+ }
+
+ #[test]
+ fn sync3() {
+ let (tx1, rx1) = sync_channel::<i32>(0);
+ let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
+ let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
+ let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
+ select! {
+ n = rx1.recv() => {
+ let n = n.unwrap();
+ assert_eq!(n, 1);
+ assert_eq!(rx2.recv().unwrap(), 2);
+ },
+ n = rx2.recv() => {
+ let n = n.unwrap();
+ assert_eq!(n, 2);
+ assert_eq!(rx1.recv().unwrap(), 1);
+ }
+ }
+ }
+
+ #[test]
+ fn fmt_debug_select() {
+ let sel = Select::new();
+ assert_eq!(format!("{:?}", sel), "Select { .. }");
+ }
+
+ #[test]
+ fn fmt_debug_handle() {
+ let (_, rx) = channel::<i32>();
+ let sel = Select::new();
+ let handle = sel.handle(&rx);
+ assert_eq!(format!("{:?}", handle), "Handle { .. }");
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/shared.rs b/ctr-std/src/sync/mpsc/shared.rs
new file mode 100644
index 0000000..f9e0290
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/shared.rs
@@ -0,0 +1,506 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Shared channels
+///
+/// This is the flavor of channels which are not necessarily optimized for any
+/// particular use case, but are the most general in how they are used. Shared
+/// channels are cloneable allowing for multiple senders.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module. You'll also note that the implementation of the shared and stream
+/// channels are quite similar, and this is no coincidence!
+
+pub use self::Failure::*;
+
+use core::cmp;
+use core::intrinsics::abort;
+use core::isize;
+
+use cell::UnsafeCell;
+use ptr;
+use sync::atomic::{AtomicUsize, AtomicIsize, AtomicBool, Ordering};
+use sync::mpsc::blocking::{self, SignalToken};
+use sync::mpsc::mpsc_queue as mpsc;
+use sync::mpsc::select::StartResult::*;
+use sync::mpsc::select::StartResult;
+use sync::{Mutex, MutexGuard};
+use thread;
+use time::Instant;
+
+const DISCONNECTED: isize = isize::MIN;
+const FUDGE: isize = 1024;
+const MAX_REFCOUNT: usize = (isize::MAX) as usize;
+#[cfg(test)]
+const MAX_STEALS: isize = 5;
+#[cfg(not(test))]
+const MAX_STEALS: isize = 1 << 20;
+
+pub struct Packet<T> {
+ queue: mpsc::Queue<T>,
+ cnt: AtomicIsize, // How many items are on this channel
+ steals: UnsafeCell<isize>, // How many times has a port received without blocking?
+ to_wake: AtomicUsize, // SignalToken for wake up
+
+ // The number of channels which are currently using this packet.
+ channels: AtomicUsize,
+
+ // See the discussion in Port::drop and the channel send methods for what
+ // these are used for
+ port_dropped: AtomicBool,
+ sender_drain: AtomicIsize,
+
+ // this lock protects various portions of this implementation during
+ // select()
+ select_lock: Mutex<()>,
+}
+
+pub enum Failure {
+ Empty,
+ Disconnected,
+}
+
+impl<T> Packet<T> {
+ // Creation of a packet *must* be followed by a call to postinit_lock
+ // and later by inherit_blocker
+ pub fn new() -> Packet<T> {
+ Packet {
+ queue: mpsc::Queue::new(),
+ cnt: AtomicIsize::new(0),
+ steals: UnsafeCell::new(0),
+ to_wake: AtomicUsize::new(0),
+ channels: AtomicUsize::new(2),
+ port_dropped: AtomicBool::new(false),
+ sender_drain: AtomicIsize::new(0),
+ select_lock: Mutex::new(()),
+ }
+ }
+
+ // This function should be used after newly created Packet
+ // was wrapped with an Arc
+ // In other case mutex data will be duplicated while cloning
+ // and that could cause problems on platforms where it is
+ // represented by opaque data structure
+ pub fn postinit_lock(&self) -> MutexGuard<()> {
+ self.select_lock.lock().unwrap()
+ }
+
+ // This function is used at the creation of a shared packet to inherit a
+ // previously blocked thread. This is done to prevent spurious wakeups of
+ // threads in select().
+ //
+ // This can only be called at channel-creation time
+ pub fn inherit_blocker(&self,
+ token: Option<SignalToken>,
+ guard: MutexGuard<()>) {
+ token.map(|token| {
+ assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
+ self.cnt.store(-1, Ordering::SeqCst);
+
+ // This store is a little sketchy. What's happening here is that
+ // we're transferring a blocker from a oneshot or stream channel to
+ // this shared channel. In doing so, we never spuriously wake them
+ // up and rather only wake them up at the appropriate time. This
+ // implementation of shared channels assumes that any blocking
+ // recv() will undo the increment of steals performed in try_recv()
+ // once the recv is complete. This thread that we're inheriting,
+ // however, is not in the middle of recv. Hence, the first time we
+ // wake them up, they're going to wake up from their old port, move
+ // on to the upgraded port, and then call the block recv() function.
+ //
+ // When calling this function, they'll find there's data immediately
+ // available, counting it as a steal. This in fact wasn't a steal
+ // because we appropriately blocked them waiting for data.
+ //
+ // To offset this bad increment, we initially set the steal count to
+ // -1. You'll find some special code in abort_selection() as well to
+ // ensure that this -1 steal count doesn't escape too far.
+ unsafe { *self.steals.get() = -1; }
+ });
+
+ // When the shared packet is constructed, we grabbed this lock. The
+ // purpose of this lock is to ensure that abort_selection() doesn't
+ // interfere with this method. After we unlock this lock, we're
+ // signifying that we're done modifying self.cnt and self.to_wake and
+ // the port is ready for the world to continue using it.
+ drop(guard);
+ }
+
+ pub fn send(&self, t: T) -> Result<(), T> {
+ // See Port::drop for what's going on
+ if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+
+ // Note that the multiple sender case is a little trickier
+ // semantically than the single sender case. The logic for
+ // incrementing is "add and if disconnected store disconnected".
+ // This could end up leading some senders to believe that there
+ // wasn't a disconnect if in fact there was a disconnect. This means
+ // that while one thread is attempting to re-store the disconnected
+ // states, other threads could walk through merrily incrementing
+ // this very-negative disconnected count. To prevent senders from
+ // spuriously attempting to send when the channels is actually
+ // disconnected, the count has a ranged check here.
+ //
+ // This is also done for another reason. Remember that the return
+ // value of this function is:
+ //
+ // `true` == the data *may* be received, this essentially has no
+ // meaning
+ // `false` == the data will *never* be received, this has a lot of
+ // meaning
+ //
+ // In the SPSC case, we have a check of 'queue.is_empty()' to see
+ // whether the data was actually received, but this same condition
+ // means nothing in a multi-producer context. As a result, this
+ // preflight check serves as the definitive "this will never be
+ // received". Once we get beyond this check, we have permanently
+ // entered the realm of "this may be received"
+ if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE {
+ return Err(t)
+ }
+
+ self.queue.push(t);
+ match self.cnt.fetch_add(1, Ordering::SeqCst) {
+ -1 => {
+ self.take_to_wake().signal();
+ }
+
+ // In this case, we have possibly failed to send our data, and
+ // we need to consider re-popping the data in order to fully
+ // destroy it. We must arbitrate among the multiple senders,
+ // however, because the queues that we're using are
+ // single-consumer queues. In order to do this, all exiting
+ // pushers will use an atomic count in order to count those
+ // flowing through. Pushers who see 0 are required to drain as
+ // much as possible, and then can only exit when they are the
+ // only pusher (otherwise they must try again).
+ n if n < DISCONNECTED + FUDGE => {
+ // see the comment in 'try' for a shared channel for why this
+ // window of "not disconnected" is ok.
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+
+ if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 {
+ loop {
+ // drain the queue, for info on the thread yield see the
+ // discussion in try_recv
+ loop {
+ match self.queue.pop() {
+ mpsc::Data(..) => {}
+ mpsc::Empty => break,
+ mpsc::Inconsistent => thread::yield_now(),
+ }
+ }
+ // maybe we're done, if we're not the last ones
+ // here, then we need to go try again.
+ if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 {
+ break
+ }
+ }
+
+ // At this point, there may still be data on the queue,
+ // but only if the count hasn't been incremented and
+ // some other sender hasn't finished pushing data just
+ // yet. That sender in question will drain its own data.
+ }
+ }
+
+ // Can't make any assumptions about this case like in the SPSC case.
+ _ => {}
+ }
+
+ Ok(())
+ }
+
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
+ // This code is essentially the exact same as that found in the stream
+ // case (see stream.rs)
+ match self.try_recv() {
+ Err(Empty) => {}
+ data => return data,
+ }
+
+ let (wait_token, signal_token) = blocking::tokens();
+ if self.decrement(signal_token) == Installed {
+ if let Some(deadline) = deadline {
+ let timed_out = !wait_token.wait_max_until(deadline);
+ if timed_out {
+ self.abort_selection(false);
+ }
+ } else {
+ wait_token.wait();
+ }
+ }
+
+ match self.try_recv() {
+ data @ Ok(..) => unsafe { *self.steals.get() -= 1; data },
+ data => data,
+ }
+ }
+
+ // Essentially the exact same thing as the stream decrement function.
+ // Returns true if blocking should proceed.
+ fn decrement(&self, token: SignalToken) -> StartResult {
+ unsafe {
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ let ptr = token.cast_to_usize();
+ self.to_wake.store(ptr, Ordering::SeqCst);
+
+ let steals = ptr::replace(self.steals.get(), 0);
+
+ match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+ DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+ // If we factor in our steals and notice that the channel has no
+ // data, we successfully sleep
+ n => {
+ assert!(n >= 0);
+ if n - steals <= 0 { return Installed }
+ }
+ }
+
+ self.to_wake.store(0, Ordering::SeqCst);
+ drop(SignalToken::cast_from_usize(ptr));
+ Abort
+ }
+ }
+
+ pub fn try_recv(&self) -> Result<T, Failure> {
+ let ret = match self.queue.pop() {
+ mpsc::Data(t) => Some(t),
+ mpsc::Empty => None,
+
+ // This is a bit of an interesting case. The channel is reported as
+ // having data available, but our pop() has failed due to the queue
+ // being in an inconsistent state. This means that there is some
+ // pusher somewhere which has yet to complete, but we are guaranteed
+ // that a pop will eventually succeed. In this case, we spin in a
+ // yield loop because the remote sender should finish their enqueue
+ // operation "very quickly".
+ //
+ // Avoiding this yield loop would require a different queue
+ // abstraction which provides the guarantee that after M pushes have
+ // succeeded, at least M pops will succeed. The current queues
+ // guarantee that if there are N active pushes, you can pop N times
+ // once all N have finished.
+ mpsc::Inconsistent => {
+ let data;
+ loop {
+ thread::yield_now();
+ match self.queue.pop() {
+ mpsc::Data(t) => { data = t; break }
+ mpsc::Empty => panic!("inconsistent => empty"),
+ mpsc::Inconsistent => {}
+ }
+ }
+ Some(data)
+ }
+ };
+ match ret {
+ // See the discussion in the stream implementation for why we
+ // might decrement steals.
+ Some(data) => unsafe {
+ if *self.steals.get() > MAX_STEALS {
+ match self.cnt.swap(0, Ordering::SeqCst) {
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ }
+ n => {
+ let m = cmp::min(n, *self.steals.get());
+ *self.steals.get() -= m;
+ self.bump(n - m);
+ }
+ }
+ assert!(*self.steals.get() >= 0);
+ }
+ *self.steals.get() += 1;
+ Ok(data)
+ },
+
+ // See the discussion in the stream implementation for why we try
+ // again.
+ None => {
+ match self.cnt.load(Ordering::SeqCst) {
+ n if n != DISCONNECTED => Err(Empty),
+ _ => {
+ match self.queue.pop() {
+ mpsc::Data(t) => Ok(t),
+ mpsc::Empty => Err(Disconnected),
+ // with no senders, an inconsistency is impossible.
+ mpsc::Inconsistent => unreachable!(),
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Prepares this shared packet for a channel clone, essentially just bumping
+ // a refcount.
+ pub fn clone_chan(&self) {
+ let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
+
+ // See comments on Arc::clone() on why we do this (for `mem::forget`).
+ if old_count > MAX_REFCOUNT {
+ unsafe {
+ abort();
+ }
+ }
+ }
+
+ // Decrement the reference count on a channel. This is called whenever a
+ // Chan is dropped and may end up waking up a receiver. It's the receiver's
+ // responsibility on the other end to figure out that we've disconnected.
+ pub fn drop_chan(&self) {
+ match self.channels.fetch_sub(1, Ordering::SeqCst) {
+ 1 => {}
+ n if n > 1 => return,
+ n => panic!("bad number of channels left {}", n),
+ }
+
+ match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+ -1 => { self.take_to_wake().signal(); }
+ DISCONNECTED => {}
+ n => { assert!(n >= 0); }
+ }
+ }
+
+ // See the long discussion inside of stream.rs for why the queue is drained,
+ // and why it is done in this fashion.
+ pub fn drop_port(&self) {
+ self.port_dropped.store(true, Ordering::SeqCst);
+ let mut steals = unsafe { *self.steals.get() };
+ while {
+ let cnt = self.cnt.compare_and_swap(steals, DISCONNECTED, Ordering::SeqCst);
+ cnt != DISCONNECTED && cnt != steals
+ } {
+ // See the discussion in 'try_recv' for why we yield
+ // control of this thread.
+ loop {
+ match self.queue.pop() {
+ mpsc::Data(..) => { steals += 1; }
+ mpsc::Empty | mpsc::Inconsistent => break,
+ }
+ }
+ }
+ }
+
+ // Consumes ownership of the 'to_wake' field.
+ fn take_to_wake(&self) -> SignalToken {
+ let ptr = self.to_wake.load(Ordering::SeqCst);
+ self.to_wake.store(0, Ordering::SeqCst);
+ assert!(ptr != 0);
+ unsafe { SignalToken::cast_from_usize(ptr) }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // Helper function for select, tests whether this port can receive without
+ // blocking (obviously not an atomic decision).
+ //
+ // This is different than the stream version because there's no need to peek
+ // at the queue, we can just look at the local count.
+ pub fn can_recv(&self) -> bool {
+ let cnt = self.cnt.load(Ordering::SeqCst);
+ cnt == DISCONNECTED || cnt - unsafe { *self.steals.get() } > 0
+ }
+
+ // increment the count on the channel (used for selection)
+ fn bump(&self, amt: isize) -> isize {
+ match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ DISCONNECTED
+ }
+ n => n
+ }
+ }
+
+ // Inserts the signal token for selection on this port, returning true if
+ // blocking should proceed.
+ //
+ // The code here is the same as in stream.rs, except that it doesn't need to
+ // peek at the channel to see if an upgrade is pending.
+ pub fn start_selection(&self, token: SignalToken) -> StartResult {
+ match self.decrement(token) {
+ Installed => Installed,
+ Abort => {
+ let prev = self.bump(1);
+ assert!(prev == DISCONNECTED || prev >= 0);
+ Abort
+ }
+ }
+ }
+
+ // Cancels a previous thread waiting on this port, returning whether there's
+ // data on the port.
+ //
+ // This is similar to the stream implementation (hence fewer comments), but
+ // uses a different value for the "steals" variable.
+ pub fn abort_selection(&self, _was_upgrade: bool) -> bool {
+ // Before we do anything else, we bounce on this lock. The reason for
+ // doing this is to ensure that any upgrade-in-progress is gone and
+ // done with. Without this bounce, we can race with inherit_blocker
+ // about looking at and dealing with to_wake. Once we have acquired the
+ // lock, we are guaranteed that inherit_blocker is done.
+ {
+ let _guard = self.select_lock.lock().unwrap();
+ }
+
+ // Like the stream implementation, we want to make sure that the count
+ // on the channel goes non-negative. We don't know how negative the
+ // stream currently is, so instead of using a steal value of 1, we load
+ // the channel count and figure out what we should do to make it
+ // positive.
+ let steals = {
+ let cnt = self.cnt.load(Ordering::SeqCst);
+ if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0}
+ };
+ let prev = self.bump(steals + 1);
+
+ if prev == DISCONNECTED {
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ true
+ } else {
+ let cur = prev + steals + 1;
+ assert!(cur >= 0);
+ if prev < 0 {
+ drop(self.take_to_wake());
+ } else {
+ while self.to_wake.load(Ordering::SeqCst) != 0 {
+ thread::yield_now();
+ }
+ }
+ unsafe {
+ // if the number of steals is -1, it was the pre-emptive -1 steal
+ // count from when we inherited a blocker. This is fine because
+ // we're just going to overwrite it with a real value.
+ let old = self.steals.get();
+ assert!(*old == 0 || *old == -1);
+ *old = steals;
+ prev >= 0
+ }
+ }
+ }
+}
+
+impl<T> Drop for Packet<T> {
+ fn drop(&mut self) {
+ // Note that this load is not only an assert for correctness about
+ // disconnection, but also a proper fence before the read of
+ // `to_wake`, so this assert cannot be removed with also removing
+ // the `to_wake` assert.
+ assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.channels.load(Ordering::SeqCst), 0);
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/spsc_queue.rs b/ctr-std/src/sync/mpsc/spsc_queue.rs
new file mode 100644
index 0000000..5858e4b
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/spsc_queue.rs
@@ -0,0 +1,337 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
+ * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
+
+//! A single-producer single-consumer concurrent queue
+//!
+//! This module contains the implementation of an SPSC queue which can be used
+//! concurrently between two threads. This data structure is safe to use and
+//! enforces the semantics that there is one pusher and one popper.
+
+use alloc::boxed::Box;
+use core::ptr;
+use core::cell::UnsafeCell;
+
+use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
+
+// Node within the linked list queue of messages to send
+struct Node<T> {
+ // FIXME: this could be an uninitialized T if we're careful enough, and
+ // that would reduce memory usage (and be a bit faster).
+ // is it worth it?
+ value: Option<T>, // nullable for re-use of nodes
+ next: AtomicPtr<Node<T>>, // next node in the queue
+}
+
+/// The single-producer single-consumer queue. This structure is not cloneable,
+/// but it can be safely shared in an Arc if it is guaranteed that there
+/// is only one popper and one pusher touching the queue at any one point in
+/// time.
+pub struct Queue<T> {
+ // consumer fields
+ tail: UnsafeCell<*mut Node<T>>, // where to pop from
+ tail_prev: AtomicPtr<Node<T>>, // where to pop from
+
+ // producer fields
+ head: UnsafeCell<*mut Node<T>>, // where to push to
+ first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
+ tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
+
+ // Cache maintenance fields. Additions and subtractions are stored
+ // separately in order to allow them to use nonatomic addition/subtraction.
+ cache_bound: usize,
+ cache_additions: AtomicUsize,
+ cache_subtractions: AtomicUsize,
+}
+
+unsafe impl<T: Send> Send for Queue<T> { }
+
+unsafe impl<T: Send> Sync for Queue<T> { }
+
+impl<T> Node<T> {
+ fn new() -> *mut Node<T> {
+ Box::into_raw(box Node {
+ value: None,
+ next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
+ })
+ }
+}
+
+impl<T> Queue<T> {
+ /// Creates a new queue.
+ ///
+ /// This is unsafe as the type system doesn't enforce a single
+ /// consumer-producer relationship. It also allows the consumer to `pop`
+ /// items while there is a `peek` active due to all methods having a
+ /// non-mutable receiver.
+ ///
+ /// # Arguments
+ ///
+ /// * `bound` - This queue implementation is implemented with a linked
+ /// list, and this means that a push is always a malloc. In
+ /// order to amortize this cost, an internal cache of nodes is
+ /// maintained to prevent a malloc from always being
+ /// necessary. This bound is the limit on the size of the
+ /// cache (if desired). If the value is 0, then the cache has
+ /// no bound. Otherwise, the cache will never grow larger than
+ /// `bound` (although the queue itself could be much larger.
+ pub unsafe fn new(bound: usize) -> Queue<T> {
+ let n1 = Node::new();
+ let n2 = Node::new();
+ (*n1).next.store(n2, Ordering::Relaxed);
+ Queue {
+ tail: UnsafeCell::new(n2),
+ tail_prev: AtomicPtr::new(n1),
+ head: UnsafeCell::new(n2),
+ first: UnsafeCell::new(n1),
+ tail_copy: UnsafeCell::new(n1),
+ cache_bound: bound,
+ cache_additions: AtomicUsize::new(0),
+ cache_subtractions: AtomicUsize::new(0),
+ }
+ }
+
+ /// Pushes a new value onto this queue. Note that to use this function
+ /// safely, it must be externally guaranteed that there is only one pusher.
+ pub fn push(&self, t: T) {
+ unsafe {
+ // Acquire a node (which either uses a cached one or allocates a new
+ // one), and then append this to the 'head' node.
+ let n = self.alloc();
+ assert!((*n).value.is_none());
+ (*n).value = Some(t);
+ (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
+ (**self.head.get()).next.store(n, Ordering::Release);
+ *self.head.get() = n;
+ }
+ }
+
+ unsafe fn alloc(&self) -> *mut Node<T> {
+ // First try to see if we can consume the 'first' node for our uses.
+ // We try to avoid as many atomic instructions as possible here, so
+ // the addition to cache_subtractions is not atomic (plus we're the
+ // only one subtracting from the cache).
+ if *self.first.get() != *self.tail_copy.get() {
+ if self.cache_bound > 0 {
+ let b = self.cache_subtractions.load(Ordering::Relaxed);
+ self.cache_subtractions.store(b + 1, Ordering::Relaxed);
+ }
+ let ret = *self.first.get();
+ *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+ return ret;
+ }
+ // If the above fails, then update our copy of the tail and try
+ // again.
+ *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
+ if *self.first.get() != *self.tail_copy.get() {
+ if self.cache_bound > 0 {
+ let b = self.cache_subtractions.load(Ordering::Relaxed);
+ self.cache_subtractions.store(b + 1, Ordering::Relaxed);
+ }
+ let ret = *self.first.get();
+ *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+ return ret;
+ }
+ // If all of that fails, then we have to allocate a new node
+ // (there's nothing in the node cache).
+ Node::new()
+ }
+
+ /// Attempts to pop a value from this queue. Remember that to use this type
+ /// safely you must ensure that there is only one popper at a time.
+ pub fn pop(&self) -> Option<T> {
+ unsafe {
+ // The `tail` node is not actually a used node, but rather a
+ // sentinel from where we should start popping from. Hence, look at
+ // tail's next field and see if we can use it. If we do a pop, then
+ // the current tail node is a candidate for going into the cache.
+ let tail = *self.tail.get();
+ let next = (*tail).next.load(Ordering::Acquire);
+ if next.is_null() { return None }
+ assert!((*next).value.is_some());
+ let ret = (*next).value.take();
+
+ *self.tail.get() = next;
+ if self.cache_bound == 0 {
+ self.tail_prev.store(tail, Ordering::Release);
+ } else {
+ // FIXME: this is dubious with overflow.
+ let additions = self.cache_additions.load(Ordering::Relaxed);
+ let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
+ let size = additions - subtractions;
+
+ if size < self.cache_bound {
+ self.tail_prev.store(tail, Ordering::Release);
+ self.cache_additions.store(additions + 1, Ordering::Relaxed);
+ } else {
+ (*self.tail_prev.load(Ordering::Relaxed))
+ .next.store(next, Ordering::Relaxed);
+ // We have successfully erased all references to 'tail', so
+ // now we can safely drop it.
+ let _: Box<Node<T>> = Box::from_raw(tail);
+ }
+ }
+ ret
+ }
+ }
+
+ /// Attempts to peek at the head of the queue, returning `None` if the queue
+ /// has no data currently
+ ///
+ /// # Warning
+ /// The reference returned is invalid if it is not used before the consumer
+ /// pops the value off the queue. If the producer then pushes another value
+ /// onto the queue, it will overwrite the value pointed to by the reference.
+ pub fn peek(&self) -> Option<&mut T> {
+ // This is essentially the same as above with all the popping bits
+ // stripped out.
+ unsafe {
+ let tail = *self.tail.get();
+ let next = (*tail).next.load(Ordering::Acquire);
+ if next.is_null() { None } else { (*next).value.as_mut() }
+ }
+ }
+}
+
+impl<T> Drop for Queue<T> {
+ fn drop(&mut self) {
+ unsafe {
+ let mut cur = *self.first.get();
+ while !cur.is_null() {
+ let next = (*cur).next.load(Ordering::Relaxed);
+ let _n: Box<Node<T>> = Box::from_raw(cur);
+ cur = next;
+ }
+ }
+ }
+}
+
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod tests {
+ use sync::Arc;
+ use super::Queue;
+ use thread;
+ use sync::mpsc::channel;
+
+ #[test]
+ fn smoke() {
+ unsafe {
+ let queue = Queue::new(0);
+ queue.push(1);
+ queue.push(2);
+ assert_eq!(queue.pop(), Some(1));
+ assert_eq!(queue.pop(), Some(2));
+ assert_eq!(queue.pop(), None);
+ queue.push(3);
+ queue.push(4);
+ assert_eq!(queue.pop(), Some(3));
+ assert_eq!(queue.pop(), Some(4));
+ assert_eq!(queue.pop(), None);
+ }
+ }
+
+ #[test]
+ fn peek() {
+ unsafe {
+ let queue = Queue::new(0);
+ queue.push(vec![1]);
+
+ // Ensure the borrowchecker works
+ match queue.peek() {
+ Some(vec) => {
+ assert_eq!(&*vec, &[1]);
+ },
+ None => unreachable!()
+ }
+
+ match queue.pop() {
+ Some(vec) => {
+ assert_eq!(&*vec, &[1]);
+ },
+ None => unreachable!()
+ }
+ }
+ }
+
+ #[test]
+ fn drop_full() {
+ unsafe {
+ let q: Queue<Box<_>> = Queue::new(0);
+ q.push(box 1);
+ q.push(box 2);
+ }
+ }
+
+ #[test]
+ fn smoke_bound() {
+ unsafe {
+ let q = Queue::new(0);
+ q.push(1);
+ q.push(2);
+ assert_eq!(q.pop(), Some(1));
+ assert_eq!(q.pop(), Some(2));
+ assert_eq!(q.pop(), None);
+ q.push(3);
+ q.push(4);
+ assert_eq!(q.pop(), Some(3));
+ assert_eq!(q.pop(), Some(4));
+ assert_eq!(q.pop(), None);
+ }
+ }
+
+ #[test]
+ fn stress() {
+ unsafe {
+ stress_bound(0);
+ stress_bound(1);
+ }
+
+ unsafe fn stress_bound(bound: usize) {
+ let q = Arc::new(Queue::new(bound));
+
+ let (tx, rx) = channel();
+ let q2 = q.clone();
+ let _t = thread::spawn(move|| {
+ for _ in 0..100000 {
+ loop {
+ match q2.pop() {
+ Some(1) => break,
+ Some(_) => panic!(),
+ None => {}
+ }
+ }
+ }
+ tx.send(()).unwrap();
+ });
+ for _ in 0..100000 {
+ q.push(1);
+ }
+ rx.recv().unwrap();
+ }
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/stream.rs b/ctr-std/src/sync/mpsc/stream.rs
new file mode 100644
index 0000000..47cd897
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/stream.rs
@@ -0,0 +1,487 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Stream channels
+///
+/// This is the flavor of channels which are optimized for one sender and one
+/// receiver. The sender will be upgraded to a shared channel if the channel is
+/// cloned.
+///
+/// High level implementation details can be found in the comment of the parent
+/// module.
+
+pub use self::Failure::*;
+pub use self::UpgradeResult::*;
+pub use self::SelectionResult::*;
+use self::Message::*;
+
+use cell::UnsafeCell;
+use core::cmp;
+use core::isize;
+use ptr;
+use thread;
+use time::Instant;
+
+use sync::atomic::{AtomicIsize, AtomicUsize, Ordering, AtomicBool};
+use sync::mpsc::Receiver;
+use sync::mpsc::blocking::{self, SignalToken};
+use sync::mpsc::spsc_queue as spsc;
+
+const DISCONNECTED: isize = isize::MIN;
+#[cfg(test)]
+const MAX_STEALS: isize = 5;
+#[cfg(not(test))]
+const MAX_STEALS: isize = 1 << 20;
+
+pub struct Packet<T> {
+ queue: spsc::Queue<Message<T>>, // internal queue for all message
+
+ cnt: AtomicIsize, // How many items are on this channel
+ steals: UnsafeCell<isize>, // How many times has a port received without blocking?
+ to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
+
+ port_dropped: AtomicBool, // flag if the channel has been destroyed.
+}
+
+pub enum Failure<T> {
+ Empty,
+ Disconnected,
+ Upgraded(Receiver<T>),
+}
+
+pub enum UpgradeResult {
+ UpSuccess,
+ UpDisconnected,
+ UpWoke(SignalToken),
+}
+
+pub enum SelectionResult<T> {
+ SelSuccess,
+ SelCanceled,
+ SelUpgraded(SignalToken, Receiver<T>),
+}
+
+// Any message could contain an "upgrade request" to a new shared port, so the
+// internal queue it's a queue of T, but rather Message<T>
+enum Message<T> {
+ Data(T),
+ GoUp(Receiver<T>),
+}
+
+impl<T> Packet<T> {
+ pub fn new() -> Packet<T> {
+ Packet {
+ queue: unsafe { spsc::Queue::new(128) },
+
+ cnt: AtomicIsize::new(0),
+ steals: UnsafeCell::new(0),
+ to_wake: AtomicUsize::new(0),
+
+ port_dropped: AtomicBool::new(false),
+ }
+ }
+
+ pub fn send(&self, t: T) -> Result<(), T> {
+ // If the other port has deterministically gone away, then definitely
+ // must return the data back up the stack. Otherwise, the data is
+ // considered as being sent.
+ if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+
+ match self.do_send(Data(t)) {
+ UpSuccess | UpDisconnected => {},
+ UpWoke(token) => { token.signal(); }
+ }
+ Ok(())
+ }
+
+ pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
+ // If the port has gone away, then there's no need to proceed any
+ // further.
+ if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+
+ self.do_send(GoUp(up))
+ }
+
+ fn do_send(&self, t: Message<T>) -> UpgradeResult {
+ self.queue.push(t);
+ match self.cnt.fetch_add(1, Ordering::SeqCst) {
+ // As described in the mod's doc comment, -1 == wakeup
+ -1 => UpWoke(self.take_to_wake()),
+ // As as described before, SPSC queues must be >= -2
+ -2 => UpSuccess,
+
+ // Be sure to preserve the disconnected state, and the return value
+ // in this case is going to be whether our data was received or not.
+ // This manifests itself on whether we have an empty queue or not.
+ //
+ // Primarily, are required to drain the queue here because the port
+ // will never remove this data. We can only have at most one item to
+ // drain (the port drains the rest).
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ let first = self.queue.pop();
+ let second = self.queue.pop();
+ assert!(second.is_none());
+
+ match first {
+ Some(..) => UpSuccess, // we failed to send the data
+ None => UpDisconnected, // we successfully sent data
+ }
+ }
+
+ // Otherwise we just sent some data on a non-waiting queue, so just
+ // make sure the world is sane and carry on!
+ n => { assert!(n >= 0); UpSuccess }
+ }
+ }
+
+ // Consumes ownership of the 'to_wake' field.
+ fn take_to_wake(&self) -> SignalToken {
+ let ptr = self.to_wake.load(Ordering::SeqCst);
+ self.to_wake.store(0, Ordering::SeqCst);
+ assert!(ptr != 0);
+ unsafe { SignalToken::cast_from_usize(ptr) }
+ }
+
+ // Decrements the count on the channel for a sleeper, returning the sleeper
+ // back if it shouldn't sleep. Note that this is the location where we take
+ // steals into account.
+ fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ let ptr = unsafe { token.cast_to_usize() };
+ self.to_wake.store(ptr, Ordering::SeqCst);
+
+ let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+
+ match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+ DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+ // If we factor in our steals and notice that the channel has no
+ // data, we successfully sleep
+ n => {
+ assert!(n >= 0);
+ if n - steals <= 0 { return Ok(()) }
+ }
+ }
+
+ self.to_wake.store(0, Ordering::SeqCst);
+ Err(unsafe { SignalToken::cast_from_usize(ptr) })
+ }
+
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
+ // Optimistic preflight check (scheduling is expensive).
+ match self.try_recv() {
+ Err(Empty) => {}
+ data => return data,
+ }
+
+ // Welp, our channel has no data. Deschedule the current thread and
+ // initiate the blocking protocol.
+ let (wait_token, signal_token) = blocking::tokens();
+ if self.decrement(signal_token).is_ok() {
+ if let Some(deadline) = deadline {
+ let timed_out = !wait_token.wait_max_until(deadline);
+ if timed_out {
+ self.abort_selection(/* was_upgrade = */ false).map_err(Upgraded)?;
+ }
+ } else {
+ wait_token.wait();
+ }
+ }
+
+ match self.try_recv() {
+ // Messages which actually popped from the queue shouldn't count as
+ // a steal, so offset the decrement here (we already have our
+ // "steal" factored into the channel count above).
+ data @ Ok(..) |
+ data @ Err(Upgraded(..)) => unsafe {
+ *self.steals.get() -= 1;
+ data
+ },
+
+ data => data,
+ }
+ }
+
+ pub fn try_recv(&self) -> Result<T, Failure<T>> {
+ match self.queue.pop() {
+ // If we stole some data, record to that effect (this will be
+ // factored into cnt later on).
+ //
+ // Note that we don't allow steals to grow without bound in order to
+ // prevent eventual overflow of either steals or cnt as an overflow
+ // would have catastrophic results. Sometimes, steals > cnt, but
+ // other times cnt > steals, so we don't know the relation between
+ // steals and cnt. This code path is executed only rarely, so we do
+ // a pretty slow operation, of swapping 0 into cnt, taking steals
+ // down as much as possible (without going negative), and then
+ // adding back in whatever we couldn't factor into steals.
+ Some(data) => unsafe {
+ if *self.steals.get() > MAX_STEALS {
+ match self.cnt.swap(0, Ordering::SeqCst) {
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ }
+ n => {
+ let m = cmp::min(n, *self.steals.get());
+ *self.steals.get() -= m;
+ self.bump(n - m);
+ }
+ }
+ assert!(*self.steals.get() >= 0);
+ }
+ *self.steals.get() += 1;
+ match data {
+ Data(t) => Ok(t),
+ GoUp(up) => Err(Upgraded(up)),
+ }
+ },
+
+ None => {
+ match self.cnt.load(Ordering::SeqCst) {
+ n if n != DISCONNECTED => Err(Empty),
+
+ // This is a little bit of a tricky case. We failed to pop
+ // data above, and then we have viewed that the channel is
+ // disconnected. In this window more data could have been
+ // sent on the channel. It doesn't really make sense to
+ // return that the channel is disconnected when there's
+ // actually data on it, so be extra sure there's no data by
+ // popping one more time.
+ //
+ // We can ignore steals because the other end is
+ // disconnected and we'll never need to really factor in our
+ // steals again.
+ _ => {
+ match self.queue.pop() {
+ Some(Data(t)) => Ok(t),
+ Some(GoUp(up)) => Err(Upgraded(up)),
+ None => Err(Disconnected),
+ }
+ }
+ }
+ }
+ }
+ }
+
+ pub fn drop_chan(&self) {
+ // Dropping a channel is pretty simple, we just flag it as disconnected
+ // and then wakeup a blocker if there is one.
+ match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+ -1 => { self.take_to_wake().signal(); }
+ DISCONNECTED => {}
+ n => { assert!(n >= 0); }
+ }
+ }
+
+ pub fn drop_port(&self) {
+ // Dropping a port seems like a fairly trivial thing. In theory all we
+ // need to do is flag that we're disconnected and then everything else
+ // can take over (we don't have anyone to wake up).
+ //
+ // The catch for Ports is that we want to drop the entire contents of
+ // the queue. There are multiple reasons for having this property, the
+ // largest of which is that if another chan is waiting in this channel
+ // (but not received yet), then waiting on that port will cause a
+ // deadlock.
+ //
+ // So if we accept that we must now destroy the entire contents of the
+ // queue, this code may make a bit more sense. The tricky part is that
+ // we can't let any in-flight sends go un-dropped, we have to make sure
+ // *everything* is dropped and nothing new will come onto the channel.
+
+ // The first thing we do is set a flag saying that we're done for. All
+ // sends are gated on this flag, so we're immediately guaranteed that
+ // there are a bounded number of active sends that we'll have to deal
+ // with.
+ self.port_dropped.store(true, Ordering::SeqCst);
+
+ // Now that we're guaranteed to deal with a bounded number of senders,
+ // we need to drain the queue. This draining process happens atomically
+ // with respect to the "count" of the channel. If the count is nonzero
+ // (with steals taken into account), then there must be data on the
+ // channel. In this case we drain everything and then try again. We will
+ // continue to fail while active senders send data while we're dropping
+ // data, but eventually we're guaranteed to break out of this loop
+ // (because there is a bounded number of senders).
+ let mut steals = unsafe { *self.steals.get() };
+ while {
+ let cnt = self.cnt.compare_and_swap(
+ steals, DISCONNECTED, Ordering::SeqCst);
+ cnt != DISCONNECTED && cnt != steals
+ } {
+ while let Some(_) = self.queue.pop() { steals += 1; }
+ }
+
+ // At this point in time, we have gated all future senders from sending,
+ // and we have flagged the channel as being disconnected. The senders
+ // still have some responsibility, however, because some sends may not
+ // complete until after we flag the disconnection. There are more
+ // details in the sending methods that see DISCONNECTED
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // Tests to see whether this port can receive without blocking. If Ok is
+ // returned, then that's the answer. If Err is returned, then the returned
+ // port needs to be queried instead (an upgrade happened)
+ pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
+ // We peek at the queue to see if there's anything on it, and we use
+ // this return value to determine if we should pop from the queue and
+ // upgrade this channel immediately. If it looks like we've got an
+ // upgrade pending, then go through the whole recv rigamarole to update
+ // the internal state.
+ match self.queue.peek() {
+ Some(&mut GoUp(..)) => {
+ match self.recv(None) {
+ Err(Upgraded(port)) => Err(port),
+ _ => unreachable!(),
+ }
+ }
+ Some(..) => Ok(true),
+ None => Ok(false)
+ }
+ }
+
+ // increment the count on the channel (used for selection)
+ fn bump(&self, amt: isize) -> isize {
+ match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+ DISCONNECTED => {
+ self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ DISCONNECTED
+ }
+ n => n
+ }
+ }
+
+ // Attempts to start selecting on this port. Like a oneshot, this can fail
+ // immediately because of an upgrade.
+ pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
+ match self.decrement(token) {
+ Ok(()) => SelSuccess,
+ Err(token) => {
+ let ret = match self.queue.peek() {
+ Some(&mut GoUp(..)) => {
+ match self.queue.pop() {
+ Some(GoUp(port)) => SelUpgraded(token, port),
+ _ => unreachable!(),
+ }
+ }
+ Some(..) => SelCanceled,
+ None => SelCanceled,
+ };
+ // Undo our decrement above, and we should be guaranteed that the
+ // previous value is positive because we're not going to sleep
+ let prev = self.bump(1);
+ assert!(prev == DISCONNECTED || prev >= 0);
+ ret
+ }
+ }
+ }
+
+ // Removes a previous thread from being blocked in this port
+ pub fn abort_selection(&self,
+ was_upgrade: bool) -> Result<bool, Receiver<T>> {
+ // If we're aborting selection after upgrading from a oneshot, then
+ // we're guarantee that no one is waiting. The only way that we could
+ // have seen the upgrade is if data was actually sent on the channel
+ // half again. For us, this means that there is guaranteed to be data on
+ // this channel. Furthermore, we're guaranteed that there was no
+ // start_selection previously, so there's no need to modify `self.cnt`
+ // at all.
+ //
+ // Hence, because of these invariants, we immediately return `Ok(true)`.
+ // Note that the data may not actually be sent on the channel just yet.
+ // The other end could have flagged the upgrade but not sent data to
+ // this end. This is fine because we know it's a small bounded windows
+ // of time until the data is actually sent.
+ if was_upgrade {
+ assert_eq!(unsafe { *self.steals.get() }, 0);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ return Ok(true)
+ }
+
+ // We want to make sure that the count on the channel goes non-negative,
+ // and in the stream case we can have at most one steal, so just assume
+ // that we had one steal.
+ let steals = 1;
+ let prev = self.bump(steals + 1);
+
+ // If we were previously disconnected, then we know for sure that there
+ // is no thread in to_wake, so just keep going
+ let has_data = if prev == DISCONNECTED {
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ true // there is data, that data is that we're disconnected
+ } else {
+ let cur = prev + steals + 1;
+ assert!(cur >= 0);
+
+ // If the previous count was negative, then we just made things go
+ // positive, hence we passed the -1 boundary and we're responsible
+ // for removing the to_wake() field and trashing it.
+ //
+ // If the previous count was positive then we're in a tougher
+ // situation. A possible race is that a sender just incremented
+ // through -1 (meaning it's going to try to wake a thread up), but it
+ // hasn't yet read the to_wake. In order to prevent a future recv()
+ // from waking up too early (this sender picking up the plastered
+ // over to_wake), we spin loop here waiting for to_wake to be 0.
+ // Note that this entire select() implementation needs an overhaul,
+ // and this is *not* the worst part of it, so this is not done as a
+ // final solution but rather out of necessity for now to get
+ // something working.
+ if prev < 0 {
+ drop(self.take_to_wake());
+ } else {
+ while self.to_wake.load(Ordering::SeqCst) != 0 {
+ thread::yield_now();
+ }
+ }
+ unsafe {
+ assert_eq!(*self.steals.get(), 0);
+ *self.steals.get() = steals;
+ }
+
+ // if we were previously positive, then there's surely data to
+ // receive
+ prev >= 0
+ };
+
+ // Now that we've determined that this queue "has data", we peek at the
+ // queue to see if the data is an upgrade or not. If it's an upgrade,
+ // then we need to destroy this port and abort selection on the
+ // upgraded port.
+ if has_data {
+ match self.queue.peek() {
+ Some(&mut GoUp(..)) => {
+ match self.queue.pop() {
+ Some(GoUp(port)) => Err(port),
+ _ => unreachable!(),
+ }
+ }
+ _ => Ok(true),
+ }
+ } else {
+ Ok(false)
+ }
+ }
+}
+
+impl<T> Drop for Packet<T> {
+ fn drop(&mut self) {
+ // Note that this load is not only an assert for correctness about
+ // disconnection, but also a proper fence before the read of
+ // `to_wake`, so this assert cannot be removed with also removing
+ // the `to_wake` assert.
+ assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
+ assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ }
+}
diff --git a/ctr-std/src/sync/mpsc/sync.rs b/ctr-std/src/sync/mpsc/sync.rs
new file mode 100644
index 0000000..1d16e00
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/sync.rs
@@ -0,0 +1,528 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Synchronous channels/ports
+///
+/// This channel implementation differs significantly from the asynchronous
+/// implementations found next to it (oneshot/stream/share). This is an
+/// implementation of a synchronous, bounded buffer channel.
+///
+/// Each channel is created with some amount of backing buffer, and sends will
+/// *block* until buffer space becomes available. A buffer size of 0 is valid,
+/// which means that every successful send is paired with a successful recv.
+///
+/// This flavor of channels defines a new `send_opt` method for channels which
+/// is the method by which a message is sent but the thread does not panic if it
+/// cannot be delivered.
+///
+/// Another major difference is that send() will *always* return back the data
+/// if it couldn't be sent. This is because it is deterministically known when
+/// the data is received and when it is not received.
+///
+/// Implementation-wise, it can all be summed up with "use a mutex plus some
+/// logic". The mutex used here is an OS native mutex, meaning that no user code
+/// is run inside of the mutex (to prevent context switching). This
+/// implementation shares almost all code for the buffered and unbuffered cases
+/// of a synchronous channel. There are a few branches for the unbuffered case,
+/// but they're mostly just relevant to blocking senders.
+
+pub use self::Failure::*;
+use self::Blocker::*;
+
+use core::intrinsics::abort;
+use core::isize;
+use core::mem;
+use core::ptr;
+
+use sync::atomic::{Ordering, AtomicUsize};
+use sync::mpsc::blocking::{self, WaitToken, SignalToken};
+use sync::mpsc::select::StartResult::{self, Installed, Abort};
+use sync::{Mutex, MutexGuard};
+use time::Instant;
+
+const MAX_REFCOUNT: usize = (isize::MAX) as usize;
+
+pub struct Packet<T> {
+ /// Only field outside of the mutex. Just done for kicks, but mainly because
+ /// the other shared channel already had the code implemented
+ channels: AtomicUsize,
+
+ lock: Mutex<State<T>>,
+}
+
+unsafe impl<T: Send> Send for Packet<T> { }
+
+unsafe impl<T: Send> Sync for Packet<T> { }
+
+struct State<T> {
+ disconnected: bool, // Is the channel disconnected yet?
+ queue: Queue, // queue of senders waiting to send data
+ blocker: Blocker, // currently blocked thread on this channel
+ buf: Buffer<T>, // storage for buffered messages
+ cap: usize, // capacity of this channel
+
+ /// A curious flag used to indicate whether a sender failed or succeeded in
+ /// blocking. This is used to transmit information back to the thread that it
+ /// must dequeue its message from the buffer because it was not received.
+ /// This is only relevant in the 0-buffer case. This obviously cannot be
+ /// safely constructed, but it's guaranteed to always have a valid pointer
+ /// value.
+ canceled: Option<&'static mut bool>,
+}
+
+unsafe impl<T: Send> Send for State<T> {}
+
+/// Possible flavors of threads who can be blocked on this channel.
+enum Blocker {
+ BlockedSender(SignalToken),
+ BlockedReceiver(SignalToken),
+ NoneBlocked
+}
+
+/// Simple queue for threading threads together. Nodes are stack-allocated, so
+/// this structure is not safe at all
+struct Queue {
+ head: *mut Node,
+ tail: *mut Node,
+}
+
+struct Node {
+ token: Option<SignalToken>,
+ next: *mut Node,
+}
+
+unsafe impl Send for Node {}
+
+/// A simple ring-buffer
+struct Buffer<T> {
+ buf: Vec<Option<T>>,
+ start: usize,
+ size: usize,
+}
+
+#[derive(Debug)]
+pub enum Failure {
+ Empty,
+ Disconnected,
+}
+
+/// Atomically blocks the current thread, placing it into `slot`, unlocking `lock`
+/// in the meantime. This re-locks the mutex upon returning.
+fn wait<'a, 'b, T>(lock: &'a Mutex<State<T>>,
+ mut guard: MutexGuard<'b, State<T>>,
+ f: fn(SignalToken) -> Blocker)
+ -> MutexGuard<'a, State<T>>
+{
+ let (wait_token, signal_token) = blocking::tokens();
+ match mem::replace(&mut guard.blocker, f(signal_token)) {
+ NoneBlocked => {}
+ _ => unreachable!(),
+ }
+ drop(guard); // unlock
+ wait_token.wait(); // block
+ lock.lock().unwrap() // relock
+}
+
+/// Same as wait, but waiting at most until `deadline`.
+fn wait_timeout_receiver<'a, 'b, T>(lock: &'a Mutex<State<T>>,
+ deadline: Instant,
+ mut guard: MutexGuard<'b, State<T>>,
+ success: &mut bool)
+ -> MutexGuard<'a, State<T>>
+{
+ let (wait_token, signal_token) = blocking::tokens();
+ match mem::replace(&mut guard.blocker, BlockedReceiver(signal_token)) {
+ NoneBlocked => {}
+ _ => unreachable!(),
+ }
+ drop(guard); // unlock
+ *success = wait_token.wait_max_until(deadline); // block
+ let mut new_guard = lock.lock().unwrap(); // relock
+ if !*success {
+ abort_selection(&mut new_guard);
+ }
+ new_guard
+}
+
+fn abort_selection<'a, T>(guard: &mut MutexGuard<'a , State<T>>) -> bool {
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ NoneBlocked => true,
+ BlockedSender(token) => {
+ guard.blocker = BlockedSender(token);
+ true
+ }
+ BlockedReceiver(token) => { drop(token); false }
+ }
+}
+
+/// Wakes up a thread, dropping the lock at the correct time
+fn wakeup<T>(token: SignalToken, guard: MutexGuard<State<T>>) {
+ // We need to be careful to wake up the waiting thread *outside* of the mutex
+ // in case it incurs a context switch.
+ drop(guard);
+ token.signal();
+}
+
+impl<T> Packet<T> {
+ pub fn new(cap: usize) -> Packet<T> {
+ Packet {
+ channels: AtomicUsize::new(1),
+ lock: Mutex::new(State {
+ disconnected: false,
+ blocker: NoneBlocked,
+ cap: cap,
+ canceled: None,
+ queue: Queue {
+ head: ptr::null_mut(),
+ tail: ptr::null_mut(),
+ },
+ buf: Buffer {
+ buf: (0..cap + if cap == 0 {1} else {0}).map(|_| None).collect(),
+ start: 0,
+ size: 0,
+ },
+ }),
+ }
+ }
+
+ // wait until a send slot is available, returning locked access to
+ // the channel state.
+ fn acquire_send_slot(&self) -> MutexGuard<State<T>> {
+ let mut node = Node { token: None, next: ptr::null_mut() };
+ loop {
+ let mut guard = self.lock.lock().unwrap();
+ // are we ready to go?
+ if guard.disconnected || guard.buf.size() < guard.buf.cap() {
+ return guard;
+ }
+ // no room; actually block
+ let wait_token = guard.queue.enqueue(&mut node);
+ drop(guard);
+ wait_token.wait();
+ }
+ }
+
+ pub fn send(&self, t: T) -> Result<(), T> {
+ let mut guard = self.acquire_send_slot();
+ if guard.disconnected { return Err(t) }
+ guard.buf.enqueue(t);
+
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ // if our capacity is 0, then we need to wait for a receiver to be
+ // available to take our data. After waiting, we check again to make
+ // sure the port didn't go away in the meantime. If it did, we need
+ // to hand back our data.
+ NoneBlocked if guard.cap == 0 => {
+ let mut canceled = false;
+ assert!(guard.canceled.is_none());
+ guard.canceled = Some(unsafe { mem::transmute(&mut canceled) });
+ let mut guard = wait(&self.lock, guard, BlockedSender);
+ if canceled {Err(guard.buf.dequeue())} else {Ok(())}
+ }
+
+ // success, we buffered some data
+ NoneBlocked => Ok(()),
+
+ // success, someone's about to receive our buffered data.
+ BlockedReceiver(token) => { wakeup(token, guard); Ok(()) }
+
+ BlockedSender(..) => panic!("lolwut"),
+ }
+ }
+
+ pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
+ let mut guard = self.lock.lock().unwrap();
+ if guard.disconnected {
+ Err(super::TrySendError::Disconnected(t))
+ } else if guard.buf.size() == guard.buf.cap() {
+ Err(super::TrySendError::Full(t))
+ } else if guard.cap == 0 {
+ // With capacity 0, even though we have buffer space we can't
+ // transfer the data unless there's a receiver waiting.
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ NoneBlocked => Err(super::TrySendError::Full(t)),
+ BlockedSender(..) => unreachable!(),
+ BlockedReceiver(token) => {
+ guard.buf.enqueue(t);
+ wakeup(token, guard);
+ Ok(())
+ }
+ }
+ } else {
+ // If the buffer has some space and the capacity isn't 0, then we
+ // just enqueue the data for later retrieval, ensuring to wake up
+ // any blocked receiver if there is one.
+ assert!(guard.buf.size() < guard.buf.cap());
+ guard.buf.enqueue(t);
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ BlockedReceiver(token) => wakeup(token, guard),
+ NoneBlocked => {}
+ BlockedSender(..) => unreachable!(),
+ }
+ Ok(())
+ }
+ }
+
+ // Receives a message from this channel
+ //
+ // When reading this, remember that there can only ever be one receiver at
+ // time.
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure> {
+ let mut guard = self.lock.lock().unwrap();
+
+ let mut woke_up_after_waiting = false;
+ // Wait for the buffer to have something in it. No need for a
+ // while loop because we're the only receiver.
+ if !guard.disconnected && guard.buf.size() == 0 {
+ if let Some(deadline) = deadline {
+ guard = wait_timeout_receiver(&self.lock,
+ deadline,
+ guard,
+ &mut woke_up_after_waiting);
+ } else {
+ guard = wait(&self.lock, guard, BlockedReceiver);
+ woke_up_after_waiting = true;
+ }
+ }
+
+ // NB: Channel could be disconnected while waiting, so the order of
+ // these conditionals is important.
+ if guard.disconnected && guard.buf.size() == 0 {
+ return Err(Disconnected);
+ }
+
+ // Pick up the data, wake up our neighbors, and carry on
+ assert!(guard.buf.size() > 0 || (deadline.is_some() && !woke_up_after_waiting));
+
+ if guard.buf.size() == 0 { return Err(Empty); }
+
+ let ret = guard.buf.dequeue();
+ self.wakeup_senders(woke_up_after_waiting, guard);
+ Ok(ret)
+ }
+
+ pub fn try_recv(&self) -> Result<T, Failure> {
+ let mut guard = self.lock.lock().unwrap();
+
+ // Easy cases first
+ if guard.disconnected && guard.buf.size() == 0 { return Err(Disconnected) }
+ if guard.buf.size() == 0 { return Err(Empty) }
+
+ // Be sure to wake up neighbors
+ let ret = Ok(guard.buf.dequeue());
+ self.wakeup_senders(false, guard);
+ ret
+ }
+
+ // Wake up pending senders after some data has been received
+ //
+ // * `waited` - flag if the receiver blocked to receive some data, or if it
+ // just picked up some data on the way out
+ // * `guard` - the lock guard that is held over this channel's lock
+ fn wakeup_senders(&self, waited: bool, mut guard: MutexGuard<State<T>>) {
+ let pending_sender1: Option<SignalToken> = guard.queue.dequeue();
+
+ // If this is a no-buffer channel (cap == 0), then if we didn't wait we
+ // need to ACK the sender. If we waited, then the sender waking us up
+ // was already the ACK.
+ let pending_sender2 = if guard.cap == 0 && !waited {
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ NoneBlocked => None,
+ BlockedReceiver(..) => unreachable!(),
+ BlockedSender(token) => {
+ guard.canceled.take();
+ Some(token)
+ }
+ }
+ } else {
+ None
+ };
+ mem::drop(guard);
+
+ // only outside of the lock do we wake up the pending threads
+ pending_sender1.map(|t| t.signal());
+ pending_sender2.map(|t| t.signal());
+ }
+
+ // Prepares this shared packet for a channel clone, essentially just bumping
+ // a refcount.
+ pub fn clone_chan(&self) {
+ let old_count = self.channels.fetch_add(1, Ordering::SeqCst);
+
+ // See comments on Arc::clone() on why we do this (for `mem::forget`).
+ if old_count > MAX_REFCOUNT {
+ unsafe {
+ abort();
+ }
+ }
+ }
+
+ pub fn drop_chan(&self) {
+ // Only flag the channel as disconnected if we're the last channel
+ match self.channels.fetch_sub(1, Ordering::SeqCst) {
+ 1 => {}
+ _ => return
+ }
+
+ // Not much to do other than wake up a receiver if one's there
+ let mut guard = self.lock.lock().unwrap();
+ if guard.disconnected { return }
+ guard.disconnected = true;
+ match mem::replace(&mut guard.blocker, NoneBlocked) {
+ NoneBlocked => {}
+ BlockedSender(..) => unreachable!(),
+ BlockedReceiver(token) => wakeup(token, guard),
+ }
+ }
+
+ pub fn drop_port(&self) {
+ let mut guard = self.lock.lock().unwrap();
+
+ if guard.disconnected { return }
+ guard.disconnected = true;
+
+ // If the capacity is 0, then the sender may want its data back after
+ // we're disconnected. Otherwise it's now our responsibility to destroy
+ // the buffered data. As with many other portions of this code, this
+ // needs to be careful to destroy the data *outside* of the lock to
+ // prevent deadlock.
+ let _data = if guard.cap != 0 {
+ mem::replace(&mut guard.buf.buf, Vec::new())
+ } else {
+ Vec::new()
+ };
+ let mut queue = mem::replace(&mut guard.queue, Queue {
+ head: ptr::null_mut(),
+ tail: ptr::null_mut(),
+ });
+
+ let waiter = match mem::replace(&mut guard.blocker, NoneBlocked) {
+ NoneBlocked => None,
+ BlockedSender(token) => {
+ *guard.canceled.take().unwrap() = true;
+ Some(token)
+ }
+ BlockedReceiver(..) => unreachable!(),
+ };
+ mem::drop(guard);
+
+ while let Some(token) = queue.dequeue() { token.signal(); }
+ waiter.map(|t| t.signal());
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // If Ok, the value is whether this port has data, if Err, then the upgraded
+ // port needs to be checked instead of this one.
+ pub fn can_recv(&self) -> bool {
+ let guard = self.lock.lock().unwrap();
+ guard.disconnected || guard.buf.size() > 0
+ }
+
+ // Attempts to start selection on this port. This can either succeed or fail
+ // because there is data waiting.
+ pub fn start_selection(&self, token: SignalToken) -> StartResult {
+ let mut guard = self.lock.lock().unwrap();
+ if guard.disconnected || guard.buf.size() > 0 {
+ Abort
+ } else {
+ match mem::replace(&mut guard.blocker, BlockedReceiver(token)) {
+ NoneBlocked => {}
+ BlockedSender(..) => unreachable!(),
+ BlockedReceiver(..) => unreachable!(),
+ }
+ Installed
+ }
+ }
+
+ // Remove a previous selecting thread from this port. This ensures that the
+ // blocked thread will no longer be visible to any other threads.
+ //
+ // The return value indicates whether there's data on this port.
+ pub fn abort_selection(&self) -> bool {
+ let mut guard = self.lock.lock().unwrap();
+ abort_selection(&mut guard)
+ }
+}
+
+impl<T> Drop for Packet<T> {
+ fn drop(&mut self) {
+ assert_eq!(self.channels.load(Ordering::SeqCst), 0);
+ let mut guard = self.lock.lock().unwrap();
+ assert!(guard.queue.dequeue().is_none());
+ assert!(guard.canceled.is_none());
+ }
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// Buffer, a simple ring buffer backed by Vec<T>
+////////////////////////////////////////////////////////////////////////////////
+
+impl<T> Buffer<T> {
+ fn enqueue(&mut self, t: T) {
+ let pos = (self.start + self.size) % self.buf.len();
+ self.size += 1;
+ let prev = mem::replace(&mut self.buf[pos], Some(t));
+ assert!(prev.is_none());
+ }
+
+ fn dequeue(&mut self) -> T {
+ let start = self.start;
+ self.size -= 1;
+ self.start = (self.start + 1) % self.buf.len();
+ let result = &mut self.buf[start];
+ result.take().unwrap()
+ }
+
+ fn size(&self) -> usize { self.size }
+ fn cap(&self) -> usize { self.buf.len() }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Queue, a simple queue to enqueue threads with (stack-allocated nodes)
+////////////////////////////////////////////////////////////////////////////////
+
+impl Queue {
+ fn enqueue(&mut self, node: &mut Node) -> WaitToken {
+ let (wait_token, signal_token) = blocking::tokens();
+ node.token = Some(signal_token);
+ node.next = ptr::null_mut();
+
+ if self.tail.is_null() {
+ self.head = node as *mut Node;
+ self.tail = node as *mut Node;
+ } else {
+ unsafe {
+ (*self.tail).next = node as *mut Node;
+ self.tail = node as *mut Node;
+ }
+ }
+
+ wait_token
+ }
+
+ fn dequeue(&mut self) -> Option<SignalToken> {
+ if self.head.is_null() {
+ return None
+ }
+ let node = self.head;
+ self.head = unsafe { (*node).next };
+ if self.head.is_null() {
+ self.tail = ptr::null_mut();
+ }
+ unsafe {
+ (*node).next = ptr::null_mut();
+ Some((*node).token.take().unwrap())
+ }
+ }
+}
diff --git a/ctr-std/src/sync/mutex.rs b/ctr-std/src/sync/mutex.rs
index 0d6ad5e..97b84d5 100644
--- a/ctr-std/src/sync/mutex.rs
+++ b/ctr-std/src/sync/mutex.rs
@@ -133,11 +133,13 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> { }
/// dropped (falls out of scope), the lock will be unlocked.
///
/// The data protected by the mutex can be access through this guard via its
-/// `Deref` and `DerefMut` implementations.
+/// [`Deref`] and [`DerefMut`] implementations.
///
/// This structure is created by the [`lock()`] and [`try_lock()`] methods on
/// [`Mutex`].
///
+/// [`Deref`]: ../../std/ops/trait.Deref.html
+/// [`DerefMut`]: ../../std/ops/trait.DerefMut.html
/// [`lock()`]: struct.Mutex.html#method.lock
/// [`try_lock()`]: struct.Mutex.html#method.try_lock
/// [`Mutex`]: struct.Mutex.html
diff --git a/ctr-std/src/sync/once.rs b/ctr-std/src/sync/once.rs
new file mode 100644
index 0000000..1e7394c
--- /dev/null
+++ b/ctr-std/src/sync/once.rs
@@ -0,0 +1,496 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! A "once initialization" primitive
+//!
+//! This primitive is meant to be used to run one-time initialization. An
+//! example use case would be for initializing an FFI library.
+
+// A "once" is a relatively simple primitive, and it's also typically provided
+// by the OS as well (see `pthread_once` or `InitOnceExecuteOnce`). The OS
+// primitives, however, tend to have surprising restrictions, such as the Unix
+// one doesn't allow an argument to be passed to the function.
+//
+// As a result, we end up implementing it ourselves in the standard library.
+// This also gives us the opportunity to optimize the implementation a bit which
+// should help the fast path on call sites. Consequently, let's explain how this
+// primitive works now!
+//
+// So to recap, the guarantees of a Once are that it will call the
+// initialization closure at most once, and it will never return until the one
+// that's running has finished running. This means that we need some form of
+// blocking here while the custom callback is running at the very least.
+// Additionally, we add on the restriction of **poisoning**. Whenever an
+// initialization closure panics, the Once enters a "poisoned" state which means
+// that all future calls will immediately panic as well.
+//
+// So to implement this, one might first reach for a `StaticMutex`, but those
+// unfortunately need to be deallocated (e.g. call `destroy()`) to free memory
+// on all OSes (some of the BSDs allocate memory for mutexes). It also gets a
+// lot harder with poisoning to figure out when the mutex needs to be
+// deallocated because it's not after the closure finishes, but after the first
+// successful closure finishes.
+//
+// All in all, this is instead implemented with atomics and lock-free
+// operations! Whee! Each `Once` has one word of atomic state, and this state is
+// CAS'd on to determine what to do. There are four possible state of a `Once`:
+//
+// * Incomplete - no initialization has run yet, and no thread is currently
+// using the Once.
+// * Poisoned - some thread has previously attempted to initialize the Once, but
+// it panicked, so the Once is now poisoned. There are no other
+// threads currently accessing this Once.
+// * Running - some thread is currently attempting to run initialization. It may
+// succeed, so all future threads need to wait for it to finish.
+// Note that this state is accompanied with a payload, described
+// below.
+// * Complete - initialization has completed and all future calls should finish
+// immediately.
+//
+// With 4 states we need 2 bits to encode this, and we use the remaining bits
+// in the word we have allocated as a queue of threads waiting for the thread
+// responsible for entering the RUNNING state. This queue is just a linked list
+// of Waiter nodes which is monotonically increasing in size. Each node is
+// allocated on the stack, and whenever the running closure finishes it will
+// consume the entire queue and notify all waiters they should try again.
+//
+// You'll find a few more details in the implementation, but that's the gist of
+// it!
+
+use fmt;
+use marker;
+use ptr;
+use sync::atomic::{AtomicUsize, AtomicBool, Ordering};
+use thread::{self, Thread};
+
+/// A synchronization primitive which can be used to run a one-time global
+/// initialization. Useful for one-time initialization for FFI or related
+/// functionality. This type can only be constructed with the `ONCE_INIT`
+/// value.
+///
+/// # Examples
+///
+/// ```
+/// use std::sync::{Once, ONCE_INIT};
+///
+/// static START: Once = ONCE_INIT;
+///
+/// START.call_once(|| {
+/// // run initialization here
+/// });
+/// ```
+#[stable(feature = "rust1", since = "1.0.0")]
+pub struct Once {
+ // This `state` word is actually an encoded version of just a pointer to a
+ // `Waiter`, so we add the `PhantomData` appropriately.
+ state: AtomicUsize,
+ _marker: marker::PhantomData<*mut Waiter>,
+}
+
+// The `PhantomData` of a raw pointer removes these two auto traits, but we
+// enforce both below in the implementation so this should be safe to add.
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl Sync for Once {}
+#[stable(feature = "rust1", since = "1.0.0")]
+unsafe impl Send for Once {}
+
+/// State yielded to the `call_once_force` method which can be used to query
+/// whether the `Once` was previously poisoned or not.
+#[unstable(feature = "once_poison", issue = "33577")]
+#[derive(Debug)]
+pub struct OnceState {
+ poisoned: bool,
+}
+
+/// Initialization value for static `Once` values.
+#[stable(feature = "rust1", since = "1.0.0")]
+pub const ONCE_INIT: Once = Once::new();
+
+// Four states that a Once can be in, encoded into the lower bits of `state` in
+// the Once structure.
+const INCOMPLETE: usize = 0x0;
+const POISONED: usize = 0x1;
+const RUNNING: usize = 0x2;
+const COMPLETE: usize = 0x3;
+
+// Mask to learn about the state. All other bits are the queue of waiters if
+// this is in the RUNNING state.
+const STATE_MASK: usize = 0x3;
+
+// Representation of a node in the linked list of waiters in the RUNNING state.
+struct Waiter {
+ thread: Option<Thread>,
+ signaled: AtomicBool,
+ next: *mut Waiter,
+}
+
+// Helper struct used to clean up after a closure call with a `Drop`
+// implementation to also run on panic.
+struct Finish {
+ panicked: bool,
+ me: &'static Once,
+}
+
+impl Once {
+ /// Creates a new `Once` value.
+ #[stable(feature = "once_new", since = "1.2.0")]
+ pub const fn new() -> Once {
+ Once {
+ state: AtomicUsize::new(INCOMPLETE),
+ _marker: marker::PhantomData,
+ }
+ }
+
+ /// Performs an initialization routine once and only once. The given closure
+ /// will be executed if this is the first time `call_once` has been called,
+ /// and otherwise the routine will *not* be invoked.
+ ///
+ /// This method will block the calling thread if another initialization
+ /// routine is currently running.
+ ///
+ /// When this function returns, it is guaranteed that some initialization
+ /// has run and completed (it may not be the closure specified). It is also
+ /// guaranteed that any memory writes performed by the executed closure can
+ /// be reliably observed by other threads at this point (there is a
+ /// happens-before relation between the closure and code executing after the
+ /// return).
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use std::sync::{Once, ONCE_INIT};
+ ///
+ /// static mut VAL: usize = 0;
+ /// static INIT: Once = ONCE_INIT;
+ ///
+ /// // Accessing a `static mut` is unsafe much of the time, but if we do so
+ /// // in a synchronized fashion (e.g. write once or read all) then we're
+ /// // good to go!
+ /// //
+ /// // This function will only call `expensive_computation` once, and will
+ /// // otherwise always return the value returned from the first invocation.
+ /// fn get_cached_val() -> usize {
+ /// unsafe {
+ /// INIT.call_once(|| {
+ /// VAL = expensive_computation();
+ /// });
+ /// VAL
+ /// }
+ /// }
+ ///
+ /// fn expensive_computation() -> usize {
+ /// // ...
+ /// # 2
+ /// }
+ /// ```
+ ///
+ /// # Panics
+ ///
+ /// The closure `f` will only be executed once if this is called
+ /// concurrently amongst many threads. If that closure panics, however, then
+ /// it will *poison* this `Once` instance, causing all future invocations of
+ /// `call_once` to also panic.
+ ///
+ /// This is similar to [poisoning with mutexes][poison].
+ ///
+ /// [poison]: struct.Mutex.html#poisoning
+ #[stable(feature = "rust1", since = "1.0.0")]
+ pub fn call_once<F>(&'static self, f: F) where F: FnOnce() {
+ // Fast path, just see if we've completed initialization.
+ if self.state.load(Ordering::SeqCst) == COMPLETE {
+ return
+ }
+
+ let mut f = Some(f);
+ self.call_inner(false, &mut |_| f.take().unwrap()());
+ }
+
+ /// Performs the same function as `call_once` except ignores poisoning.
+ ///
+ /// If this `Once` has been poisoned (some initialization panicked) then
+ /// this function will continue to attempt to call initialization functions
+ /// until one of them doesn't panic.
+ ///
+ /// The closure `f` is yielded a structure which can be used to query the
+ /// state of this `Once` (whether initialization has previously panicked or
+ /// not).
+ #[unstable(feature = "once_poison", issue = "33577")]
+ pub fn call_once_force<F>(&'static self, f: F) where F: FnOnce(&OnceState) {
+ // same as above, just with a different parameter to `call_inner`.
+ if self.state.load(Ordering::SeqCst) == COMPLETE {
+ return
+ }
+
+ let mut f = Some(f);
+ self.call_inner(true, &mut |p| {
+ f.take().unwrap()(&OnceState { poisoned: p })
+ });
+ }
+
+ // This is a non-generic function to reduce the monomorphization cost of
+ // using `call_once` (this isn't exactly a trivial or small implementation).
+ //
+ // Additionally, this is tagged with `#[cold]` as it should indeed be cold
+ // and it helps let LLVM know that calls to this function should be off the
+ // fast path. Essentially, this should help generate more straight line code
+ // in LLVM.
+ //
+ // Finally, this takes an `FnMut` instead of a `FnOnce` because there's
+ // currently no way to take an `FnOnce` and call it via virtual dispatch
+ // without some allocation overhead.
+ #[cold]
+ fn call_inner(&'static self,
+ ignore_poisoning: bool,
+ mut init: &mut FnMut(bool)) {
+ let mut state = self.state.load(Ordering::SeqCst);
+
+ 'outer: loop {
+ match state {
+ // If we're complete, then there's nothing to do, we just
+ // jettison out as we shouldn't run the closure.
+ COMPLETE => return,
+
+ // If we're poisoned and we're not in a mode to ignore
+ // poisoning, then we panic here to propagate the poison.
+ POISONED if !ignore_poisoning => {
+ panic!("Once instance has previously been poisoned");
+ }
+
+ // Otherwise if we see a poisoned or otherwise incomplete state
+ // we will attempt to move ourselves into the RUNNING state. If
+ // we succeed, then the queue of waiters starts at null (all 0
+ // bits).
+ POISONED |
+ INCOMPLETE => {
+ let old = self.state.compare_and_swap(state, RUNNING,
+ Ordering::SeqCst);
+ if old != state {
+ state = old;
+ continue
+ }
+
+ // Run the initialization routine, letting it know if we're
+ // poisoned or not. The `Finish` struct is then dropped, and
+ // the `Drop` implementation here is responsible for waking
+ // up other waiters both in the normal return and panicking
+ // case.
+ let mut complete = Finish {
+ panicked: true,
+ me: self,
+ };
+ init(state == POISONED);
+ complete.panicked = false;
+ return
+ }
+
+ // All other values we find should correspond to the RUNNING
+ // state with an encoded waiter list in the more significant
+ // bits. We attempt to enqueue ourselves by moving us to the
+ // head of the list and bail out if we ever see a state that's
+ // not RUNNING.
+ _ => {
+ assert!(state & STATE_MASK == RUNNING);
+ let mut node = Waiter {
+ thread: Some(thread::current()),
+ signaled: AtomicBool::new(false),
+ next: ptr::null_mut(),
+ };
+ let me = &mut node as *mut Waiter as usize;
+ assert!(me & STATE_MASK == 0);
+
+ while state & STATE_MASK == RUNNING {
+ node.next = (state & !STATE_MASK) as *mut Waiter;
+ let old = self.state.compare_and_swap(state,
+ me | RUNNING,
+ Ordering::SeqCst);
+ if old != state {
+ state = old;
+ continue
+ }
+
+ // Once we've enqueued ourselves, wait in a loop.
+ // Afterwards reload the state and continue with what we
+ // were doing from before.
+ while !node.signaled.load(Ordering::SeqCst) {
+ thread::park();
+ }
+ state = self.state.load(Ordering::SeqCst);
+ continue 'outer
+ }
+ }
+ }
+ }
+ }
+}
+
+#[stable(feature = "std_debug", since = "1.16.0")]
+impl fmt::Debug for Once {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.pad("Once { .. }")
+ }
+}
+
+impl Drop for Finish {
+ fn drop(&mut self) {
+ // Swap out our state with however we finished. We should only ever see
+ // an old state which was RUNNING.
+ let queue = if self.panicked {
+ self.me.state.swap(POISONED, Ordering::SeqCst)
+ } else {
+ self.me.state.swap(COMPLETE, Ordering::SeqCst)
+ };
+ assert_eq!(queue & STATE_MASK, RUNNING);
+
+ // Decode the RUNNING to a list of waiters, then walk that entire list
+ // and wake them up. Note that it is crucial that after we store `true`
+ // in the node it can be free'd! As a result we load the `thread` to
+ // signal ahead of time and then unpark it after the store.
+ unsafe {
+ let mut queue = (queue & !STATE_MASK) as *mut Waiter;
+ while !queue.is_null() {
+ let next = (*queue).next;
+ let thread = (*queue).thread.take().unwrap();
+ (*queue).signaled.store(true, Ordering::SeqCst);
+ thread.unpark();
+ queue = next;
+ }
+ }
+ }
+}
+
+impl OnceState {
+ /// Returns whether the associated `Once` has been poisoned.
+ ///
+ /// Once an initalization routine for a `Once` has panicked it will forever
+ /// indicate to future forced initialization routines that it is poisoned.
+ #[unstable(feature = "once_poison", issue = "33577")]
+ pub fn poisoned(&self) -> bool {
+ self.poisoned
+ }
+}
+
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod tests {
+ use panic;
+ use sync::mpsc::channel;
+ use thread;
+ use super::Once;
+
+ #[test]
+ fn smoke_once() {
+ static O: Once = Once::new();
+ let mut a = 0;
+ O.call_once(|| a += 1);
+ assert_eq!(a, 1);
+ O.call_once(|| a += 1);
+ assert_eq!(a, 1);
+ }
+
+ #[test]
+ fn stampede_once() {
+ static O: Once = Once::new();
+ static mut RUN: bool = false;
+
+ let (tx, rx) = channel();
+ for _ in 0..10 {
+ let tx = tx.clone();
+ thread::spawn(move|| {
+ for _ in 0..4 { thread::yield_now() }
+ unsafe {
+ O.call_once(|| {
+ assert!(!RUN);
+ RUN = true;
+ });
+ assert!(RUN);
+ }
+ tx.send(()).unwrap();
+ });
+ }
+
+ unsafe {
+ O.call_once(|| {
+ assert!(!RUN);
+ RUN = true;
+ });
+ assert!(RUN);
+ }
+
+ for _ in 0..10 {
+ rx.recv().unwrap();
+ }
+ }
+
+ #[test]
+ fn poison_bad() {
+ static O: Once = Once::new();
+
+ // poison the once
+ let t = panic::catch_unwind(|| {
+ O.call_once(|| panic!());
+ });
+ assert!(t.is_err());
+
+ // poisoning propagates
+ let t = panic::catch_unwind(|| {
+ O.call_once(|| {});
+ });
+ assert!(t.is_err());
+
+ // we can subvert poisoning, however
+ let mut called = false;
+ O.call_once_force(|p| {
+ called = true;
+ assert!(p.poisoned())
+ });
+ assert!(called);
+
+ // once any success happens, we stop propagating the poison
+ O.call_once(|| {});
+ }
+
+ #[test]
+ fn wait_for_force_to_finish() {
+ static O: Once = Once::new();
+
+ // poison the once
+ let t = panic::catch_unwind(|| {
+ O.call_once(|| panic!());
+ });
+ assert!(t.is_err());
+
+ // make sure someone's waiting inside the once via a force
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ let t1 = thread::spawn(move || {
+ O.call_once_force(|p| {
+ assert!(p.poisoned());
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ });
+ });
+
+ rx1.recv().unwrap();
+
+ // put another waiter on the once
+ let t2 = thread::spawn(|| {
+ let mut called = false;
+ O.call_once(|| {
+ called = true;
+ });
+ assert!(!called);
+ });
+
+ tx2.send(()).unwrap();
+
+ assert!(t1.join().is_ok());
+ assert!(t2.join().is_ok());
+
+ }
+}