aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync/mpsc/select.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ctr-std/src/sync/mpsc/select.rs')
-rw-r--r--ctr-std/src/sync/mpsc/select.rs779
1 files changed, 0 insertions, 779 deletions
diff --git a/ctr-std/src/sync/mpsc/select.rs b/ctr-std/src/sync/mpsc/select.rs
deleted file mode 100644
index a7a284c..0000000
--- a/ctr-std/src/sync/mpsc/select.rs
+++ /dev/null
@@ -1,779 +0,0 @@
-// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
-// file at the top-level directory of this distribution and at
-// http://rust-lang.org/COPYRIGHT.
-//
-// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
-// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
-// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
-// option. This file may not be copied, modified, or distributed
-// except according to those terms.
-
-//! Selection over an array of receivers
-//!
-//! This module contains the implementation machinery necessary for selecting
-//! over a number of receivers. One large goal of this module is to provide an
-//! efficient interface to selecting over any receiver of any type.
-//!
-//! This is achieved through an architecture of a "receiver set" in which
-//! receivers are added to a set and then the entire set is waited on at once.
-//! The set can be waited on multiple times to prevent re-adding each receiver
-//! to the set.
-//!
-//! Usage of this module is currently encouraged to go through the use of the
-//! `select!` macro. This macro allows naturally binding of variables to the
-//! received values of receivers in a much more natural syntax then usage of the
-//! `Select` structure directly.
-//!
-//! # Examples
-//!
-//! ```rust
-//! #![feature(mpsc_select)]
-//!
-//! use std::sync::mpsc::channel;
-//!
-//! let (tx1, rx1) = channel();
-//! let (tx2, rx2) = channel();
-//!
-//! tx1.send(1).unwrap();
-//! tx2.send(2).unwrap();
-//!
-//! select! {
-//! val = rx1.recv() => {
-//! assert_eq!(val.unwrap(), 1);
-//! },
-//! val = rx2.recv() => {
-//! assert_eq!(val.unwrap(), 2);
-//! }
-//! }
-//! ```
-
-#![allow(dead_code)]
-#![unstable(feature = "mpsc_select",
- reason = "This implementation, while likely sufficient, is unsafe and \
- likely to be error prone. At some point in the future this \
- module will likely be replaced, and it is currently \
- unknown how much API breakage that will cause. The ability \
- to select over a number of channels will remain forever, \
- but no guarantees beyond this are being made",
- issue = "27800")]
-
-
-use fmt;
-
-use core::cell::{Cell, UnsafeCell};
-use core::marker;
-use core::ptr;
-use core::usize;
-
-use sync::mpsc::{Receiver, RecvError};
-use sync::mpsc::blocking::{self, SignalToken};
-
-/// The "receiver set" of the select interface. This structure is used to manage
-/// a set of receivers which are being selected over.
-pub struct Select {
- inner: UnsafeCell<SelectInner>,
- next_id: Cell<usize>,
-}
-
-struct SelectInner {
- head: *mut Handle<'static, ()>,
- tail: *mut Handle<'static, ()>,
-}
-
-impl !marker::Send for Select {}
-
-/// A handle to a receiver which is currently a member of a `Select` set of
-/// receivers. This handle is used to keep the receiver in the set as well as
-/// interact with the underlying receiver.
-pub struct Handle<'rx, T:Send+'rx> {
- /// The ID of this handle, used to compare against the return value of
- /// `Select::wait()`
- id: usize,
- selector: *mut SelectInner,
- next: *mut Handle<'static, ()>,
- prev: *mut Handle<'static, ()>,
- added: bool,
- packet: &'rx (dyn Packet+'rx),
-
- // due to our fun transmutes, we be sure to place this at the end. (nothing
- // previous relies on T)
- rx: &'rx Receiver<T>,
-}
-
-struct Packets { cur: *mut Handle<'static, ()> }
-
-#[doc(hidden)]
-#[derive(PartialEq, Eq)]
-pub enum StartResult {
- Installed,
- Abort,
-}
-
-#[doc(hidden)]
-pub trait Packet {
- fn can_recv(&self) -> bool;
- fn start_selection(&self, token: SignalToken) -> StartResult;
- fn abort_selection(&self) -> bool;
-}
-
-impl Select {
- /// Creates a new selection structure. This set is initially empty.
- ///
- /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
- /// the `select!` macro.
- ///
- /// # Examples
- ///
- /// ```
- /// #![feature(mpsc_select)]
- ///
- /// use std::sync::mpsc::Select;
- ///
- /// let select = Select::new();
- /// ```
- pub fn new() -> Select {
- Select {
- inner: UnsafeCell::new(SelectInner {
- head: ptr::null_mut(),
- tail: ptr::null_mut(),
- }),
- next_id: Cell::new(1),
- }
- }
-
- /// Creates a new handle into this receiver set for a new receiver. Note
- /// that this does *not* add the receiver to the receiver set, for that you
- /// must call the `add` method on the handle itself.
- pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
- let id = self.next_id.get();
- self.next_id.set(id + 1);
- Handle {
- id,
- selector: self.inner.get(),
- next: ptr::null_mut(),
- prev: ptr::null_mut(),
- added: false,
- rx,
- packet: rx,
- }
- }
-
- /// Waits for an event on this receiver set. The returned value is *not* an
- /// index, but rather an id. This id can be queried against any active
- /// `Handle` structures (each one has an `id` method). The handle with
- /// the matching `id` will have some sort of event available on it. The
- /// event could either be that data is available or the corresponding
- /// channel has been closed.
- pub fn wait(&self) -> usize {
- self.wait2(true)
- }
-
- /// Helper method for skipping the preflight checks during testing
- fn wait2(&self, do_preflight_checks: bool) -> usize {
- // Note that this is currently an inefficient implementation. We in
- // theory have knowledge about all receivers in the set ahead of time,
- // so this method shouldn't really have to iterate over all of them yet
- // again. The idea with this "receiver set" interface is to get the
- // interface right this time around, and later this implementation can
- // be optimized.
- //
- // This implementation can be summarized by:
- //
- // fn select(receivers) {
- // if any receiver ready { return ready index }
- // deschedule {
- // block on all receivers
- // }
- // unblock on all receivers
- // return ready index
- // }
- //
- // Most notably, the iterations over all of the receivers shouldn't be
- // necessary.
- unsafe {
- // Stage 1: preflight checks. Look for any packets ready to receive
- if do_preflight_checks {
- for handle in self.iter() {
- if (*handle).packet.can_recv() {
- return (*handle).id();
- }
- }
- }
-
- // Stage 2: begin the blocking process
- //
- // Create a number of signal tokens, and install each one
- // sequentially until one fails. If one fails, then abort the
- // selection on the already-installed tokens.
- let (wait_token, signal_token) = blocking::tokens();
- for (i, handle) in self.iter().enumerate() {
- match (*handle).packet.start_selection(signal_token.clone()) {
- StartResult::Installed => {}
- StartResult::Abort => {
- // Go back and abort the already-begun selections
- for handle in self.iter().take(i) {
- (*handle).packet.abort_selection();
- }
- return (*handle).id;
- }
- }
- }
-
- // Stage 3: no messages available, actually block
- wait_token.wait();
-
- // Stage 4: there *must* be message available; find it.
- //
- // Abort the selection process on each receiver. If the abort
- // process returns `true`, then that means that the receiver is
- // ready to receive some data. Note that this also means that the
- // receiver may have yet to have fully read the `to_wake` field and
- // woken us up (although the wakeup is guaranteed to fail).
- //
- // This situation happens in the window of where a sender invokes
- // increment(), sees -1, and then decides to wake up the thread. After
- // all this is done, the sending thread will set `selecting` to
- // `false`. Until this is done, we cannot return. If we were to
- // return, then a sender could wake up a receiver which has gone
- // back to sleep after this call to `select`.
- //
- // Note that it is a "fairly small window" in which an increment()
- // views that it should wake a thread up until the `selecting` bit
- // is set to false. For now, the implementation currently just spins
- // in a yield loop. This is very distasteful, but this
- // implementation is already nowhere near what it should ideally be.
- // A rewrite should focus on avoiding a yield loop, and for now this
- // implementation is tying us over to a more efficient "don't
- // iterate over everything every time" implementation.
- let mut ready_id = usize::MAX;
- for handle in self.iter() {
- if (*handle).packet.abort_selection() {
- ready_id = (*handle).id;
- }
- }
-
- // We must have found a ready receiver
- assert!(ready_id != usize::MAX);
- return ready_id;
- }
- }
-
- fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
-}
-
-impl<'rx, T: Send> Handle<'rx, T> {
- /// Retrieves the id of this handle.
- #[inline]
- pub fn id(&self) -> usize { self.id }
-
- /// Blocks to receive a value on the underlying receiver, returning `Some` on
- /// success or `None` if the channel disconnects. This function has the same
- /// semantics as `Receiver.recv`
- pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
-
- /// Adds this handle to the receiver set that the handle was created from. This
- /// method can be called multiple times, but it has no effect if `add` was
- /// called previously.
- ///
- /// This method is unsafe because it requires that the `Handle` is not moved
- /// while it is added to the `Select` set.
- pub unsafe fn add(&mut self) {
- if self.added { return }
- let selector = &mut *self.selector;
- let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
-
- if selector.head.is_null() {
- selector.head = me;
- selector.tail = me;
- } else {
- (*me).prev = selector.tail;
- assert!((*me).next.is_null());
- (*selector.tail).next = me;
- selector.tail = me;
- }
- self.added = true;
- }
-
- /// Removes this handle from the `Select` set. This method is unsafe because
- /// it has no guarantee that the `Handle` was not moved since `add` was
- /// called.
- pub unsafe fn remove(&mut self) {
- if !self.added { return }
-
- let selector = &mut *self.selector;
- let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
-
- if self.prev.is_null() {
- assert_eq!(selector.head, me);
- selector.head = self.next;
- } else {
- (*self.prev).next = self.next;
- }
- if self.next.is_null() {
- assert_eq!(selector.tail, me);
- selector.tail = self.prev;
- } else {
- (*self.next).prev = self.prev;
- }
-
- self.next = ptr::null_mut();
- self.prev = ptr::null_mut();
-
- self.added = false;
- }
-}
-
-impl Drop for Select {
- fn drop(&mut self) {
- unsafe {
- assert!((&*self.inner.get()).head.is_null());
- assert!((&*self.inner.get()).tail.is_null());
- }
- }
-}
-
-impl<'rx, T: Send> Drop for Handle<'rx, T> {
- fn drop(&mut self) {
- unsafe { self.remove() }
- }
-}
-
-impl Iterator for Packets {
- type Item = *mut Handle<'static, ()>;
-
- fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
- if self.cur.is_null() {
- None
- } else {
- let ret = Some(self.cur);
- unsafe { self.cur = (*self.cur).next; }
- ret
- }
- }
-}
-
-impl fmt::Debug for Select {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Select").finish()
- }
-}
-
-impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.debug_struct("Handle").finish()
- }
-}
-
-#[allow(unused_imports)]
-#[cfg(all(test, not(target_os = "emscripten")))]
-mod tests {
- use thread;
- use sync::mpsc::*;
-
- // Don't use the libstd version so we can pull in the right Select structure
- // (std::comm points at the wrong one)
- macro_rules! select {
- (
- $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
- ) => ({
- let sel = Select::new();
- $( let mut $rx = sel.handle(&$rx); )+
- unsafe {
- $( $rx.add(); )+
- }
- let ret = sel.wait();
- $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
- { unreachable!() }
- })
- }
-
- #[test]
- fn smoke() {
- let (tx1, rx1) = channel::<i32>();
- let (tx2, rx2) = channel::<i32>();
- tx1.send(1).unwrap();
- select! {
- foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
- _bar = rx2.recv() => { panic!() }
- }
- tx2.send(2).unwrap();
- select! {
- _foo = rx1.recv() => { panic!() },
- bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
- }
- drop(tx1);
- select! {
- foo = rx1.recv() => { assert!(foo.is_err()); },
- _bar = rx2.recv() => { panic!() }
- }
- drop(tx2);
- select! {
- bar = rx2.recv() => { assert!(bar.is_err()); }
- }
- }
-
- #[test]
- fn smoke2() {
- let (_tx1, rx1) = channel::<i32>();
- let (_tx2, rx2) = channel::<i32>();
- let (_tx3, rx3) = channel::<i32>();
- let (_tx4, rx4) = channel::<i32>();
- let (tx5, rx5) = channel::<i32>();
- tx5.send(4).unwrap();
- select! {
- _foo = rx1.recv() => { panic!("1") },
- _foo = rx2.recv() => { panic!("2") },
- _foo = rx3.recv() => { panic!("3") },
- _foo = rx4.recv() => { panic!("4") },
- foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
- }
- }
-
- #[test]
- fn closed() {
- let (_tx1, rx1) = channel::<i32>();
- let (tx2, rx2) = channel::<i32>();
- drop(tx2);
-
- select! {
- _a1 = rx1.recv() => { panic!() },
- a2 = rx2.recv() => { assert!(a2.is_err()); }
- }
- }
-
- #[test]
- fn unblocks() {
- let (tx1, rx1) = channel::<i32>();
- let (_tx2, rx2) = channel::<i32>();
- let (tx3, rx3) = channel::<i32>();
-
- let _t = thread::spawn(move|| {
- for _ in 0..20 { thread::yield_now(); }
- tx1.send(1).unwrap();
- rx3.recv().unwrap();
- for _ in 0..20 { thread::yield_now(); }
- });
-
- select! {
- a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
- _b = rx2.recv() => { panic!() }
- }
- tx3.send(1).unwrap();
- select! {
- a = rx1.recv() => { assert!(a.is_err()) },
- _b = rx2.recv() => { panic!() }
- }
- }
-
- #[test]
- fn both_ready() {
- let (tx1, rx1) = channel::<i32>();
- let (tx2, rx2) = channel::<i32>();
- let (tx3, rx3) = channel::<()>();
-
- let _t = thread::spawn(move|| {
- for _ in 0..20 { thread::yield_now(); }
- tx1.send(1).unwrap();
- tx2.send(2).unwrap();
- rx3.recv().unwrap();
- });
-
- select! {
- a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
- a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
- }
- select! {
- a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
- a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
- }
- assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
- assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
- tx3.send(()).unwrap();
- }
-
- #[test]
- fn stress() {
- const AMT: i32 = 10000;
- let (tx1, rx1) = channel::<i32>();
- let (tx2, rx2) = channel::<i32>();
- let (tx3, rx3) = channel::<()>();
-
- let _t = thread::spawn(move|| {
- for i in 0..AMT {
- if i % 2 == 0 {
- tx1.send(i).unwrap();
- } else {
- tx2.send(i).unwrap();
- }
- rx3.recv().unwrap();
- }
- });
-
- for i in 0..AMT {
- select! {
- i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
- i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
- }
- tx3.send(()).unwrap();
- }
- }
-
- #[allow(unused_must_use)]
- #[test]
- fn cloning() {
- let (tx1, rx1) = channel::<i32>();
- let (_tx2, rx2) = channel::<i32>();
- let (tx3, rx3) = channel::<()>();
-
- let _t = thread::spawn(move|| {
- rx3.recv().unwrap();
- tx1.clone();
- assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
- tx1.send(2).unwrap();
- rx3.recv().unwrap();
- });
-
- tx3.send(()).unwrap();
- select! {
- _i1 = rx1.recv() => {},
- _i2 = rx2.recv() => panic!()
- }
- tx3.send(()).unwrap();
- }
-
- #[allow(unused_must_use)]
- #[test]
- fn cloning2() {
- let (tx1, rx1) = channel::<i32>();
- let (_tx2, rx2) = channel::<i32>();
- let (tx3, rx3) = channel::<()>();
-
- let _t = thread::spawn(move|| {
- rx3.recv().unwrap();
- tx1.clone();
- assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
- tx1.send(2).unwrap();
- rx3.recv().unwrap();
- });
-
- tx3.send(()).unwrap();
- select! {
- _i1 = rx1.recv() => {},
- _i2 = rx2.recv() => panic!()
- }
- tx3.send(()).unwrap();
- }
-
- #[test]
- fn cloning3() {
- let (tx1, rx1) = channel::<()>();
- let (tx2, rx2) = channel::<()>();
- let (tx3, rx3) = channel::<()>();
- let _t = thread::spawn(move|| {
- let s = Select::new();
- let mut h1 = s.handle(&rx1);
- let mut h2 = s.handle(&rx2);
- unsafe { h2.add(); }
- unsafe { h1.add(); }
- assert_eq!(s.wait(), h2.id);
- tx3.send(()).unwrap();
- });
-
- for _ in 0..1000 { thread::yield_now(); }
- drop(tx1.clone());
- tx2.send(()).unwrap();
- rx3.recv().unwrap();
- }
-
- #[test]
- fn preflight1() {
- let (tx, rx) = channel();
- tx.send(()).unwrap();
- select! {
- _n = rx.recv() => {}
- }
- }
-
- #[test]
- fn preflight2() {
- let (tx, rx) = channel();
- tx.send(()).unwrap();
- tx.send(()).unwrap();
- select! {
- _n = rx.recv() => {}
- }
- }
-
- #[test]
- fn preflight3() {
- let (tx, rx) = channel();
- drop(tx.clone());
- tx.send(()).unwrap();
- select! {
- _n = rx.recv() => {}
- }
- }
-
- #[test]
- fn preflight4() {
- let (tx, rx) = channel();
- tx.send(()).unwrap();
- let s = Select::new();
- let mut h = s.handle(&rx);
- unsafe { h.add(); }
- assert_eq!(s.wait2(false), h.id);
- }
-
- #[test]
- fn preflight5() {
- let (tx, rx) = channel();
- tx.send(()).unwrap();
- tx.send(()).unwrap();
- let s = Select::new();
- let mut h = s.handle(&rx);
- unsafe { h.add(); }
- assert_eq!(s.wait2(false), h.id);
- }
-
- #[test]
- fn preflight6() {
- let (tx, rx) = channel();
- drop(tx.clone());
- tx.send(()).unwrap();
- let s = Select::new();
- let mut h = s.handle(&rx);
- unsafe { h.add(); }
- assert_eq!(s.wait2(false), h.id);
- }
-
- #[test]
- fn preflight7() {
- let (tx, rx) = channel::<()>();
- drop(tx);
- let s = Select::new();
- let mut h = s.handle(&rx);
- unsafe { h.add(); }
- assert_eq!(s.wait2(false), h.id);
- }
-
- #[test]
- fn preflight8() {
- let (tx, rx) = channel();
- tx.send(()).unwrap();
- drop(tx);
- rx.recv().unwrap();
- let s = Select::new();
- let mut h = s.handle(&rx);
- unsafe { h.add(); }
- assert_eq!(s.wait2(false), h.id);
- }
-
- #[test]
- fn preflight9() {
- let (tx, rx) = channel();
- drop(tx.clone());
- tx.send(()).unwrap();
- drop(tx);
- rx.recv().unwrap();
- let s = Select::new();
- let mut h = s.handle(&rx);
- unsafe { h.add(); }
- assert_eq!(s.wait2(false), h.id);
- }
-
- #[test]
- fn oneshot_data_waiting() {
- let (tx1, rx1) = channel();
- let (tx2, rx2) = channel();
- let _t = thread::spawn(move|| {
- select! {
- _n = rx1.recv() => {}
- }
- tx2.send(()).unwrap();
- });
-
- for _ in 0..100 { thread::yield_now() }
- tx1.send(()).unwrap();
- rx2.recv().unwrap();
- }
-
- #[test]
- fn stream_data_waiting() {
- let (tx1, rx1) = channel();
- let (tx2, rx2) = channel();
- tx1.send(()).unwrap();
- tx1.send(()).unwrap();
- rx1.recv().unwrap();
- rx1.recv().unwrap();
- let _t = thread::spawn(move|| {
- select! {
- _n = rx1.recv() => {}
- }
- tx2.send(()).unwrap();
- });
-
- for _ in 0..100 { thread::yield_now() }
- tx1.send(()).unwrap();
- rx2.recv().unwrap();
- }
-
- #[test]
- fn shared_data_waiting() {
- let (tx1, rx1) = channel();
- let (tx2, rx2) = channel();
- drop(tx1.clone());
- tx1.send(()).unwrap();
- rx1.recv().unwrap();
- let _t = thread::spawn(move|| {
- select! {
- _n = rx1.recv() => {}
- }
- tx2.send(()).unwrap();
- });
-
- for _ in 0..100 { thread::yield_now() }
- tx1.send(()).unwrap();
- rx2.recv().unwrap();
- }
-
- #[test]
- fn sync1() {
- let (tx, rx) = sync_channel::<i32>(1);
- tx.send(1).unwrap();
- select! {
- n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
- }
- }
-
- #[test]
- fn sync2() {
- let (tx, rx) = sync_channel::<i32>(0);
- let _t = thread::spawn(move|| {
- for _ in 0..100 { thread::yield_now() }
- tx.send(1).unwrap();
- });
- select! {
- n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
- }
- }
-
- #[test]
- fn sync3() {
- let (tx1, rx1) = sync_channel::<i32>(0);
- let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
- let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
- let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
- select! {
- n = rx1.recv() => {
- let n = n.unwrap();
- assert_eq!(n, 1);
- assert_eq!(rx2.recv().unwrap(), 2);
- },
- n = rx2.recv() => {
- let n = n.unwrap();
- assert_eq!(n, 2);
- assert_eq!(rx1.recv().unwrap(), 1);
- }
- }
- }
-}