aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync/mpsc/select.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ctr-std/src/sync/mpsc/select.rs')
-rw-r--r--ctr-std/src/sync/mpsc/select.rs791
1 files changed, 791 insertions, 0 deletions
diff --git a/ctr-std/src/sync/mpsc/select.rs b/ctr-std/src/sync/mpsc/select.rs
new file mode 100644
index 0000000..8b4da53
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/select.rs
@@ -0,0 +1,791 @@
+// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+//! Selection over an array of receivers
+//!
+//! This module contains the implementation machinery necessary for selecting
+//! over a number of receivers. One large goal of this module is to provide an
+//! efficient interface to selecting over any receiver of any type.
+//!
+//! This is achieved through an architecture of a "receiver set" in which
+//! receivers are added to a set and then the entire set is waited on at once.
+//! The set can be waited on multiple times to prevent re-adding each receiver
+//! to the set.
+//!
+//! Usage of this module is currently encouraged to go through the use of the
+//! `select!` macro. This macro allows naturally binding of variables to the
+//! received values of receivers in a much more natural syntax then usage of the
+//! `Select` structure directly.
+//!
+//! # Examples
+//!
+//! ```rust
+//! #![feature(mpsc_select)]
+//!
+//! use std::sync::mpsc::channel;
+//!
+//! let (tx1, rx1) = channel();
+//! let (tx2, rx2) = channel();
+//!
+//! tx1.send(1).unwrap();
+//! tx2.send(2).unwrap();
+//!
+//! select! {
+//! val = rx1.recv() => {
+//! assert_eq!(val.unwrap(), 1);
+//! },
+//! val = rx2.recv() => {
+//! assert_eq!(val.unwrap(), 2);
+//! }
+//! }
+//! ```
+
+#![allow(dead_code)]
+#![unstable(feature = "mpsc_select",
+ reason = "This implementation, while likely sufficient, is unsafe and \
+ likely to be error prone. At some point in the future this \
+ module will likely be replaced, and it is currently \
+ unknown how much API breakage that will cause. The ability \
+ to select over a number of channels will remain forever, \
+ but no guarantees beyond this are being made",
+ issue = "27800")]
+
+
+use fmt;
+
+use core::cell::{Cell, UnsafeCell};
+use core::marker;
+use core::ptr;
+use core::usize;
+
+use sync::mpsc::{Receiver, RecvError};
+use sync::mpsc::blocking::{self, SignalToken};
+
+/// The "receiver set" of the select interface. This structure is used to manage
+/// a set of receivers which are being selected over.
+pub struct Select {
+ inner: UnsafeCell<SelectInner>,
+ next_id: Cell<usize>,
+}
+
+struct SelectInner {
+ head: *mut Handle<'static, ()>,
+ tail: *mut Handle<'static, ()>,
+}
+
+impl !marker::Send for Select {}
+
+/// A handle to a receiver which is currently a member of a `Select` set of
+/// receivers. This handle is used to keep the receiver in the set as well as
+/// interact with the underlying receiver.
+pub struct Handle<'rx, T:Send+'rx> {
+ /// The ID of this handle, used to compare against the return value of
+ /// `Select::wait()`
+ id: usize,
+ selector: *mut SelectInner,
+ next: *mut Handle<'static, ()>,
+ prev: *mut Handle<'static, ()>,
+ added: bool,
+ packet: &'rx (Packet+'rx),
+
+ // due to our fun transmutes, we be sure to place this at the end. (nothing
+ // previous relies on T)
+ rx: &'rx Receiver<T>,
+}
+
+struct Packets { cur: *mut Handle<'static, ()> }
+
+#[doc(hidden)]
+#[derive(PartialEq, Eq)]
+pub enum StartResult {
+ Installed,
+ Abort,
+}
+
+#[doc(hidden)]
+pub trait Packet {
+ fn can_recv(&self) -> bool;
+ fn start_selection(&self, token: SignalToken) -> StartResult;
+ fn abort_selection(&self) -> bool;
+}
+
+impl Select {
+ /// Creates a new selection structure. This set is initially empty.
+ ///
+ /// Usage of this struct directly can sometimes be burdensome, and usage is much easier through
+ /// the `select!` macro.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// #![feature(mpsc_select)]
+ ///
+ /// use std::sync::mpsc::Select;
+ ///
+ /// let select = Select::new();
+ /// ```
+ pub fn new() -> Select {
+ Select {
+ inner: UnsafeCell::new(SelectInner {
+ head: ptr::null_mut(),
+ tail: ptr::null_mut(),
+ }),
+ next_id: Cell::new(1),
+ }
+ }
+
+ /// Creates a new handle into this receiver set for a new receiver. Note
+ /// that this does *not* add the receiver to the receiver set, for that you
+ /// must call the `add` method on the handle itself.
+ pub fn handle<'a, T: Send>(&'a self, rx: &'a Receiver<T>) -> Handle<'a, T> {
+ let id = self.next_id.get();
+ self.next_id.set(id + 1);
+ Handle {
+ id: id,
+ selector: self.inner.get(),
+ next: ptr::null_mut(),
+ prev: ptr::null_mut(),
+ added: false,
+ rx: rx,
+ packet: rx,
+ }
+ }
+
+ /// Waits for an event on this receiver set. The returned value is *not* an
+ /// index, but rather an id. This id can be queried against any active
+ /// `Handle` structures (each one has an `id` method). The handle with
+ /// the matching `id` will have some sort of event available on it. The
+ /// event could either be that data is available or the corresponding
+ /// channel has been closed.
+ pub fn wait(&self) -> usize {
+ self.wait2(true)
+ }
+
+ /// Helper method for skipping the preflight checks during testing
+ fn wait2(&self, do_preflight_checks: bool) -> usize {
+ // Note that this is currently an inefficient implementation. We in
+ // theory have knowledge about all receivers in the set ahead of time,
+ // so this method shouldn't really have to iterate over all of them yet
+ // again. The idea with this "receiver set" interface is to get the
+ // interface right this time around, and later this implementation can
+ // be optimized.
+ //
+ // This implementation can be summarized by:
+ //
+ // fn select(receivers) {
+ // if any receiver ready { return ready index }
+ // deschedule {
+ // block on all receivers
+ // }
+ // unblock on all receivers
+ // return ready index
+ // }
+ //
+ // Most notably, the iterations over all of the receivers shouldn't be
+ // necessary.
+ unsafe {
+ // Stage 1: preflight checks. Look for any packets ready to receive
+ if do_preflight_checks {
+ for handle in self.iter() {
+ if (*handle).packet.can_recv() {
+ return (*handle).id();
+ }
+ }
+ }
+
+ // Stage 2: begin the blocking process
+ //
+ // Create a number of signal tokens, and install each one
+ // sequentially until one fails. If one fails, then abort the
+ // selection on the already-installed tokens.
+ let (wait_token, signal_token) = blocking::tokens();
+ for (i, handle) in self.iter().enumerate() {
+ match (*handle).packet.start_selection(signal_token.clone()) {
+ StartResult::Installed => {}
+ StartResult::Abort => {
+ // Go back and abort the already-begun selections
+ for handle in self.iter().take(i) {
+ (*handle).packet.abort_selection();
+ }
+ return (*handle).id;
+ }
+ }
+ }
+
+ // Stage 3: no messages available, actually block
+ wait_token.wait();
+
+ // Stage 4: there *must* be message available; find it.
+ //
+ // Abort the selection process on each receiver. If the abort
+ // process returns `true`, then that means that the receiver is
+ // ready to receive some data. Note that this also means that the
+ // receiver may have yet to have fully read the `to_wake` field and
+ // woken us up (although the wakeup is guaranteed to fail).
+ //
+ // This situation happens in the window of where a sender invokes
+ // increment(), sees -1, and then decides to wake up the thread. After
+ // all this is done, the sending thread will set `selecting` to
+ // `false`. Until this is done, we cannot return. If we were to
+ // return, then a sender could wake up a receiver which has gone
+ // back to sleep after this call to `select`.
+ //
+ // Note that it is a "fairly small window" in which an increment()
+ // views that it should wake a thread up until the `selecting` bit
+ // is set to false. For now, the implementation currently just spins
+ // in a yield loop. This is very distasteful, but this
+ // implementation is already nowhere near what it should ideally be.
+ // A rewrite should focus on avoiding a yield loop, and for now this
+ // implementation is tying us over to a more efficient "don't
+ // iterate over everything every time" implementation.
+ let mut ready_id = usize::MAX;
+ for handle in self.iter() {
+ if (*handle).packet.abort_selection() {
+ ready_id = (*handle).id;
+ }
+ }
+
+ // We must have found a ready receiver
+ assert!(ready_id != usize::MAX);
+ return ready_id;
+ }
+ }
+
+ fn iter(&self) -> Packets { Packets { cur: unsafe { &*self.inner.get() }.head } }
+}
+
+impl<'rx, T: Send> Handle<'rx, T> {
+ /// Retrieves the id of this handle.
+ #[inline]
+ pub fn id(&self) -> usize { self.id }
+
+ /// Blocks to receive a value on the underlying receiver, returning `Some` on
+ /// success or `None` if the channel disconnects. This function has the same
+ /// semantics as `Receiver.recv`
+ pub fn recv(&mut self) -> Result<T, RecvError> { self.rx.recv() }
+
+ /// Adds this handle to the receiver set that the handle was created from. This
+ /// method can be called multiple times, but it has no effect if `add` was
+ /// called previously.
+ ///
+ /// This method is unsafe because it requires that the `Handle` is not moved
+ /// while it is added to the `Select` set.
+ pub unsafe fn add(&mut self) {
+ if self.added { return }
+ let selector = &mut *self.selector;
+ let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
+
+ if selector.head.is_null() {
+ selector.head = me;
+ selector.tail = me;
+ } else {
+ (*me).prev = selector.tail;
+ assert!((*me).next.is_null());
+ (*selector.tail).next = me;
+ selector.tail = me;
+ }
+ self.added = true;
+ }
+
+ /// Removes this handle from the `Select` set. This method is unsafe because
+ /// it has no guarantee that the `Handle` was not moved since `add` was
+ /// called.
+ pub unsafe fn remove(&mut self) {
+ if !self.added { return }
+
+ let selector = &mut *self.selector;
+ let me = self as *mut Handle<'rx, T> as *mut Handle<'static, ()>;
+
+ if self.prev.is_null() {
+ assert_eq!(selector.head, me);
+ selector.head = self.next;
+ } else {
+ (*self.prev).next = self.next;
+ }
+ if self.next.is_null() {
+ assert_eq!(selector.tail, me);
+ selector.tail = self.prev;
+ } else {
+ (*self.next).prev = self.prev;
+ }
+
+ self.next = ptr::null_mut();
+ self.prev = ptr::null_mut();
+
+ self.added = false;
+ }
+}
+
+impl Drop for Select {
+ fn drop(&mut self) {
+ unsafe {
+ assert!((&*self.inner.get()).head.is_null());
+ assert!((&*self.inner.get()).tail.is_null());
+ }
+ }
+}
+
+impl<'rx, T: Send> Drop for Handle<'rx, T> {
+ fn drop(&mut self) {
+ unsafe { self.remove() }
+ }
+}
+
+impl Iterator for Packets {
+ type Item = *mut Handle<'static, ()>;
+
+ fn next(&mut self) -> Option<*mut Handle<'static, ()>> {
+ if self.cur.is_null() {
+ None
+ } else {
+ let ret = Some(self.cur);
+ unsafe { self.cur = (*self.cur).next; }
+ ret
+ }
+ }
+}
+
+impl fmt::Debug for Select {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Select {{ .. }}")
+ }
+}
+
+impl<'rx, T:Send+'rx> fmt::Debug for Handle<'rx, T> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "Handle {{ .. }}")
+ }
+}
+
+#[allow(unused_imports)]
+#[cfg(all(test, not(target_os = "emscripten")))]
+mod tests {
+ use thread;
+ use sync::mpsc::*;
+
+ // Don't use the libstd version so we can pull in the right Select structure
+ // (std::comm points at the wrong one)
+ macro_rules! select {
+ (
+ $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
+ ) => ({
+ let sel = Select::new();
+ $( let mut $rx = sel.handle(&$rx); )+
+ unsafe {
+ $( $rx.add(); )+
+ }
+ let ret = sel.wait();
+ $( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+
+ { unreachable!() }
+ })
+ }
+
+ #[test]
+ fn smoke() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ tx1.send(1).unwrap();
+ select! {
+ foo = rx1.recv() => { assert_eq!(foo.unwrap(), 1); },
+ _bar = rx2.recv() => { panic!() }
+ }
+ tx2.send(2).unwrap();
+ select! {
+ _foo = rx1.recv() => { panic!() },
+ bar = rx2.recv() => { assert_eq!(bar.unwrap(), 2) }
+ }
+ drop(tx1);
+ select! {
+ foo = rx1.recv() => { assert!(foo.is_err()); },
+ _bar = rx2.recv() => { panic!() }
+ }
+ drop(tx2);
+ select! {
+ bar = rx2.recv() => { assert!(bar.is_err()); }
+ }
+ }
+
+ #[test]
+ fn smoke2() {
+ let (_tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (_tx3, rx3) = channel::<i32>();
+ let (_tx4, rx4) = channel::<i32>();
+ let (tx5, rx5) = channel::<i32>();
+ tx5.send(4).unwrap();
+ select! {
+ _foo = rx1.recv() => { panic!("1") },
+ _foo = rx2.recv() => { panic!("2") },
+ _foo = rx3.recv() => { panic!("3") },
+ _foo = rx4.recv() => { panic!("4") },
+ foo = rx5.recv() => { assert_eq!(foo.unwrap(), 4); }
+ }
+ }
+
+ #[test]
+ fn closed() {
+ let (_tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ drop(tx2);
+
+ select! {
+ _a1 = rx1.recv() => { panic!() },
+ a2 = rx2.recv() => { assert!(a2.is_err()); }
+ }
+ }
+
+ #[test]
+ fn unblocks() {
+ let (tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<i32>();
+
+ let _t = thread::spawn(move|| {
+ for _ in 0..20 { thread::yield_now(); }
+ tx1.send(1).unwrap();
+ rx3.recv().unwrap();
+ for _ in 0..20 { thread::yield_now(); }
+ });
+
+ select! {
+ a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
+ _b = rx2.recv() => { panic!() }
+ }
+ tx3.send(1).unwrap();
+ select! {
+ a = rx1.recv() => { assert!(a.is_err()) },
+ _b = rx2.recv() => { panic!() }
+ }
+ }
+
+ #[test]
+ fn both_ready() {
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ for _ in 0..20 { thread::yield_now(); }
+ tx1.send(1).unwrap();
+ tx2.send(2).unwrap();
+ rx3.recv().unwrap();
+ });
+
+ select! {
+ a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
+ a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
+ }
+ select! {
+ a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
+ a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
+ }
+ assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
+ assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
+ tx3.send(()).unwrap();
+ }
+
+ #[test]
+ fn stress() {
+ const AMT: i32 = 10000;
+ let (tx1, rx1) = channel::<i32>();
+ let (tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ for i in 0..AMT {
+ if i % 2 == 0 {
+ tx1.send(i).unwrap();
+ } else {
+ tx2.send(i).unwrap();
+ }
+ rx3.recv().unwrap();
+ }
+ });
+
+ for i in 0..AMT {
+ select! {
+ i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
+ i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
+ }
+ tx3.send(()).unwrap();
+ }
+ }
+
+ #[test]
+ fn cloning() {
+ let (tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ rx3.recv().unwrap();
+ tx1.clone();
+ assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
+ tx1.send(2).unwrap();
+ rx3.recv().unwrap();
+ });
+
+ tx3.send(()).unwrap();
+ select! {
+ _i1 = rx1.recv() => {},
+ _i2 = rx2.recv() => panic!()
+ }
+ tx3.send(()).unwrap();
+ }
+
+ #[test]
+ fn cloning2() {
+ let (tx1, rx1) = channel::<i32>();
+ let (_tx2, rx2) = channel::<i32>();
+ let (tx3, rx3) = channel::<()>();
+
+ let _t = thread::spawn(move|| {
+ rx3.recv().unwrap();
+ tx1.clone();
+ assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
+ tx1.send(2).unwrap();
+ rx3.recv().unwrap();
+ });
+
+ tx3.send(()).unwrap();
+ select! {
+ _i1 = rx1.recv() => {},
+ _i2 = rx2.recv() => panic!()
+ }
+ tx3.send(()).unwrap();
+ }
+
+ #[test]
+ fn cloning3() {
+ let (tx1, rx1) = channel::<()>();
+ let (tx2, rx2) = channel::<()>();
+ let (tx3, rx3) = channel::<()>();
+ let _t = thread::spawn(move|| {
+ let s = Select::new();
+ let mut h1 = s.handle(&rx1);
+ let mut h2 = s.handle(&rx2);
+ unsafe { h2.add(); }
+ unsafe { h1.add(); }
+ assert_eq!(s.wait(), h2.id);
+ tx3.send(()).unwrap();
+ });
+
+ for _ in 0..1000 { thread::yield_now(); }
+ drop(tx1.clone());
+ tx2.send(()).unwrap();
+ rx3.recv().unwrap();
+ }
+
+ #[test]
+ fn preflight1() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ select! {
+ _n = rx.recv() => {}
+ }
+ }
+
+ #[test]
+ fn preflight2() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ tx.send(()).unwrap();
+ select! {
+ _n = rx.recv() => {}
+ }
+ }
+
+ #[test]
+ fn preflight3() {
+ let (tx, rx) = channel();
+ drop(tx.clone());
+ tx.send(()).unwrap();
+ select! {
+ _n = rx.recv() => {}
+ }
+ }
+
+ #[test]
+ fn preflight4() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight5() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ tx.send(()).unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight6() {
+ let (tx, rx) = channel();
+ drop(tx.clone());
+ tx.send(()).unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight7() {
+ let (tx, rx) = channel::<()>();
+ drop(tx);
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight8() {
+ let (tx, rx) = channel();
+ tx.send(()).unwrap();
+ drop(tx);
+ rx.recv().unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn preflight9() {
+ let (tx, rx) = channel();
+ drop(tx.clone());
+ tx.send(()).unwrap();
+ drop(tx);
+ rx.recv().unwrap();
+ let s = Select::new();
+ let mut h = s.handle(&rx);
+ unsafe { h.add(); }
+ assert_eq!(s.wait2(false), h.id);
+ }
+
+ #[test]
+ fn oneshot_data_waiting() {
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ let _t = thread::spawn(move|| {
+ select! {
+ _n = rx1.recv() => {}
+ }
+ tx2.send(()).unwrap();
+ });
+
+ for _ in 0..100 { thread::yield_now() }
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn stream_data_waiting() {
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ tx1.send(()).unwrap();
+ tx1.send(()).unwrap();
+ rx1.recv().unwrap();
+ rx1.recv().unwrap();
+ let _t = thread::spawn(move|| {
+ select! {
+ _n = rx1.recv() => {}
+ }
+ tx2.send(()).unwrap();
+ });
+
+ for _ in 0..100 { thread::yield_now() }
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn shared_data_waiting() {
+ let (tx1, rx1) = channel();
+ let (tx2, rx2) = channel();
+ drop(tx1.clone());
+ tx1.send(()).unwrap();
+ rx1.recv().unwrap();
+ let _t = thread::spawn(move|| {
+ select! {
+ _n = rx1.recv() => {}
+ }
+ tx2.send(()).unwrap();
+ });
+
+ for _ in 0..100 { thread::yield_now() }
+ tx1.send(()).unwrap();
+ rx2.recv().unwrap();
+ }
+
+ #[test]
+ fn sync1() {
+ let (tx, rx) = sync_channel::<i32>(1);
+ tx.send(1).unwrap();
+ select! {
+ n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
+ }
+ }
+
+ #[test]
+ fn sync2() {
+ let (tx, rx) = sync_channel::<i32>(0);
+ let _t = thread::spawn(move|| {
+ for _ in 0..100 { thread::yield_now() }
+ tx.send(1).unwrap();
+ });
+ select! {
+ n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
+ }
+ }
+
+ #[test]
+ fn sync3() {
+ let (tx1, rx1) = sync_channel::<i32>(0);
+ let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
+ let _t = thread::spawn(move|| { tx1.send(1).unwrap(); });
+ let _t = thread::spawn(move|| { tx2.send(2).unwrap(); });
+ select! {
+ n = rx1.recv() => {
+ let n = n.unwrap();
+ assert_eq!(n, 1);
+ assert_eq!(rx2.recv().unwrap(), 2);
+ },
+ n = rx2.recv() => {
+ let n = n.unwrap();
+ assert_eq!(n, 2);
+ assert_eq!(rx1.recv().unwrap(), 1);
+ }
+ }
+ }
+
+ #[test]
+ fn fmt_debug_select() {
+ let sel = Select::new();
+ assert_eq!(format!("{:?}", sel), "Select { .. }");
+ }
+
+ #[test]
+ fn fmt_debug_handle() {
+ let (_, rx) = channel::<i32>();
+ let sel = Select::new();
+ let handle = sel.handle(&rx);
+ assert_eq!(format!("{:?}", handle), "Handle { .. }");
+ }
+}