aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync/mpsc/oneshot.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ctr-std/src/sync/mpsc/oneshot.rs')
-rw-r--r--ctr-std/src/sync/mpsc/oneshot.rs396
1 files changed, 396 insertions, 0 deletions
diff --git a/ctr-std/src/sync/mpsc/oneshot.rs b/ctr-std/src/sync/mpsc/oneshot.rs
new file mode 100644
index 0000000..b8e50c9
--- /dev/null
+++ b/ctr-std/src/sync/mpsc/oneshot.rs
@@ -0,0 +1,396 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// Oneshot channels/ports
+///
+/// This is the initial flavor of channels/ports used for comm module. This is
+/// an optimization for the one-use case of a channel. The major optimization of
+/// this type is to have one and exactly one allocation when the chan/port pair
+/// is created.
+///
+/// Another possible optimization would be to not use an Arc box because
+/// in theory we know when the shared packet can be deallocated (no real need
+/// for the atomic reference counting), but I was having trouble how to destroy
+/// the data early in a drop of a Port.
+///
+/// # Implementation
+///
+/// Oneshots are implemented around one atomic usize variable. This variable
+/// indicates both the state of the port/chan but also contains any threads
+/// blocked on the port. All atomic operations happen on this one word.
+///
+/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect
+/// on behalf of the channel side of things (it can be mentally thought of as
+/// consuming the port). This upgrade is then also stored in the shared packet.
+/// The one caveat to consider is that when a port sees a disconnected channel
+/// it must check for data because there is no "data plus upgrade" state.
+
+pub use self::Failure::*;
+pub use self::UpgradeResult::*;
+pub use self::SelectionResult::*;
+use self::MyUpgrade::*;
+
+use sync::mpsc::Receiver;
+use sync::mpsc::blocking::{self, SignalToken};
+use cell::UnsafeCell;
+use ptr;
+use sync::atomic::{AtomicUsize, Ordering};
+use time::Instant;
+
+// Various states you can find a port in.
+const EMPTY: usize = 0; // initial state: no data, no blocked receiver
+const DATA: usize = 1; // data ready for receiver to take
+const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
+// Any other value represents a pointer to a SignalToken value. The
+// protocol ensures that when the state moves *to* a pointer,
+// ownership of the token is given to the packet, and when the state
+// moves *from* a pointer, ownership of the token is transferred to
+// whoever changed the state.
+
+pub struct Packet<T> {
+ // Internal state of the chan/port pair (stores the blocked thread as well)
+ state: AtomicUsize,
+ // One-shot data slot location
+ data: UnsafeCell<Option<T>>,
+ // when used for the second time, a oneshot channel must be upgraded, and
+ // this contains the slot for the upgrade
+ upgrade: UnsafeCell<MyUpgrade<T>>,
+}
+
+pub enum Failure<T> {
+ Empty,
+ Disconnected,
+ Upgraded(Receiver<T>),
+}
+
+pub enum UpgradeResult {
+ UpSuccess,
+ UpDisconnected,
+ UpWoke(SignalToken),
+}
+
+pub enum SelectionResult<T> {
+ SelCanceled,
+ SelUpgraded(SignalToken, Receiver<T>),
+ SelSuccess,
+}
+
+enum MyUpgrade<T> {
+ NothingSent,
+ SendUsed,
+ GoUp(Receiver<T>),
+}
+
+impl<T> Packet<T> {
+ pub fn new() -> Packet<T> {
+ Packet {
+ data: UnsafeCell::new(None),
+ upgrade: UnsafeCell::new(NothingSent),
+ state: AtomicUsize::new(EMPTY),
+ }
+ }
+
+ pub fn send(&self, t: T) -> Result<(), T> {
+ unsafe {
+ // Sanity check
+ match *self.upgrade.get() {
+ NothingSent => {}
+ _ => panic!("sending on a oneshot that's already sent on "),
+ }
+ assert!((*self.data.get()).is_none());
+ ptr::write(self.data.get(), Some(t));
+ ptr::write(self.upgrade.get(), SendUsed);
+
+ match self.state.swap(DATA, Ordering::SeqCst) {
+ // Sent the data, no one was waiting
+ EMPTY => Ok(()),
+
+ // Couldn't send the data, the port hung up first. Return the data
+ // back up the stack.
+ DISCONNECTED => {
+ self.state.swap(DISCONNECTED, Ordering::SeqCst);
+ ptr::write(self.upgrade.get(), NothingSent);
+ Err((&mut *self.data.get()).take().unwrap())
+ }
+
+ // Not possible, these are one-use channels
+ DATA => unreachable!(),
+
+ // There is a thread waiting on the other end. We leave the 'DATA'
+ // state inside so it'll pick it up on the other end.
+ ptr => {
+ SignalToken::cast_from_usize(ptr).signal();
+ Ok(())
+ }
+ }
+ }
+ }
+
+ // Just tests whether this channel has been sent on or not, this is only
+ // safe to use from the sender.
+ pub fn sent(&self) -> bool {
+ unsafe {
+ match *self.upgrade.get() {
+ NothingSent => false,
+ _ => true,
+ }
+ }
+ }
+
+ pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
+ // Attempt to not block the thread (it's a little expensive). If it looks
+ // like we're not empty, then immediately go through to `try_recv`.
+ if self.state.load(Ordering::SeqCst) == EMPTY {
+ let (wait_token, signal_token) = blocking::tokens();
+ let ptr = unsafe { signal_token.cast_to_usize() };
+
+ // race with senders to enter the blocking state
+ if self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) == EMPTY {
+ if let Some(deadline) = deadline {
+ let timed_out = !wait_token.wait_max_until(deadline);
+ // Try to reset the state
+ if timed_out {
+ self.abort_selection().map_err(Upgraded)?;
+ }
+ } else {
+ wait_token.wait();
+ debug_assert!(self.state.load(Ordering::SeqCst) != EMPTY);
+ }
+ } else {
+ // drop the signal token, since we never blocked
+ drop(unsafe { SignalToken::cast_from_usize(ptr) });
+ }
+ }
+
+ self.try_recv()
+ }
+
+ pub fn try_recv(&self) -> Result<T, Failure<T>> {
+ unsafe {
+ match self.state.load(Ordering::SeqCst) {
+ EMPTY => Err(Empty),
+
+ // We saw some data on the channel, but the channel can be used
+ // again to send us an upgrade. As a result, we need to re-insert
+ // into the channel that there's no data available (otherwise we'll
+ // just see DATA next time). This is done as a cmpxchg because if
+ // the state changes under our feet we'd rather just see that state
+ // change.
+ DATA => {
+ self.state.compare_and_swap(DATA, EMPTY, Ordering::SeqCst);
+ match (&mut *self.data.get()).take() {
+ Some(data) => Ok(data),
+ None => unreachable!(),
+ }
+ }
+
+ // There's no guarantee that we receive before an upgrade happens,
+ // and an upgrade flags the channel as disconnected, so when we see
+ // this we first need to check if there's data available and *then*
+ // we go through and process the upgrade.
+ DISCONNECTED => {
+ match (&mut *self.data.get()).take() {
+ Some(data) => Ok(data),
+ None => {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ SendUsed | NothingSent => Err(Disconnected),
+ GoUp(upgrade) => Err(Upgraded(upgrade))
+ }
+ }
+ }
+ }
+
+ // We are the sole receiver; there cannot be a blocking
+ // receiver already.
+ _ => unreachable!()
+ }
+ }
+ }
+
+ // Returns whether the upgrade was completed. If the upgrade wasn't
+ // completed, then the port couldn't get sent to the other half (it will
+ // never receive it).
+ pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
+ unsafe {
+ let prev = match *self.upgrade.get() {
+ NothingSent => NothingSent,
+ SendUsed => SendUsed,
+ _ => panic!("upgrading again"),
+ };
+ ptr::write(self.upgrade.get(), GoUp(up));
+
+ match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+ // If the channel is empty or has data on it, then we're good to go.
+ // Senders will check the data before the upgrade (in case we
+ // plastered over the DATA state).
+ DATA | EMPTY => UpSuccess,
+
+ // If the other end is already disconnected, then we failed the
+ // upgrade. Be sure to trash the port we were given.
+ DISCONNECTED => { ptr::replace(self.upgrade.get(), prev); UpDisconnected }
+
+ // If someone's waiting, we gotta wake them up
+ ptr => UpWoke(SignalToken::cast_from_usize(ptr))
+ }
+ }
+ }
+
+ pub fn drop_chan(&self) {
+ match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+ DATA | DISCONNECTED | EMPTY => {}
+
+ // If someone's waiting, we gotta wake them up
+ ptr => unsafe {
+ SignalToken::cast_from_usize(ptr).signal();
+ }
+ }
+ }
+
+ pub fn drop_port(&self) {
+ match self.state.swap(DISCONNECTED, Ordering::SeqCst) {
+ // An empty channel has nothing to do, and a remotely disconnected
+ // channel also has nothing to do b/c we're about to run the drop
+ // glue
+ DISCONNECTED | EMPTY => {}
+
+ // There's data on the channel, so make sure we destroy it promptly.
+ // This is why not using an arc is a little difficult (need the box
+ // to stay valid while we take the data).
+ DATA => unsafe { (&mut *self.data.get()).take().unwrap(); },
+
+ // We're the only ones that can block on this port
+ _ => unreachable!()
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // select implementation
+ ////////////////////////////////////////////////////////////////////////////
+
+ // If Ok, the value is whether this port has data, if Err, then the upgraded
+ // port needs to be checked instead of this one.
+ pub fn can_recv(&self) -> Result<bool, Receiver<T>> {
+ unsafe {
+ match self.state.load(Ordering::SeqCst) {
+ EMPTY => Ok(false), // Welp, we tried
+ DATA => Ok(true), // we have some un-acquired data
+ DISCONNECTED if (*self.data.get()).is_some() => Ok(true), // we have data
+ DISCONNECTED => {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ // The other end sent us an upgrade, so we need to
+ // propagate upwards whether the upgrade can receive
+ // data
+ GoUp(upgrade) => Err(upgrade),
+
+ // If the other end disconnected without sending an
+ // upgrade, then we have data to receive (the channel is
+ // disconnected).
+ up => { ptr::write(self.upgrade.get(), up); Ok(true) }
+ }
+ }
+ _ => unreachable!(), // we're the "one blocker"
+ }
+ }
+ }
+
+ // Attempts to start selection on this port. This can either succeed, fail
+ // because there is data, or fail because there is an upgrade pending.
+ pub fn start_selection(&self, token: SignalToken) -> SelectionResult<T> {
+ unsafe {
+ let ptr = token.cast_to_usize();
+ match self.state.compare_and_swap(EMPTY, ptr, Ordering::SeqCst) {
+ EMPTY => SelSuccess,
+ DATA => {
+ drop(SignalToken::cast_from_usize(ptr));
+ SelCanceled
+ }
+ DISCONNECTED if (*self.data.get()).is_some() => {
+ drop(SignalToken::cast_from_usize(ptr));
+ SelCanceled
+ }
+ DISCONNECTED => {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ // The other end sent us an upgrade, so we need to
+ // propagate upwards whether the upgrade can receive
+ // data
+ GoUp(upgrade) => {
+ SelUpgraded(SignalToken::cast_from_usize(ptr), upgrade)
+ }
+
+ // If the other end disconnected without sending an
+ // upgrade, then we have data to receive (the channel is
+ // disconnected).
+ up => {
+ ptr::write(self.upgrade.get(), up);
+ drop(SignalToken::cast_from_usize(ptr));
+ SelCanceled
+ }
+ }
+ }
+ _ => unreachable!(), // we're the "one blocker"
+ }
+ }
+ }
+
+ // Remove a previous selecting thread from this port. This ensures that the
+ // blocked thread will no longer be visible to any other threads.
+ //
+ // The return value indicates whether there's data on this port.
+ pub fn abort_selection(&self) -> Result<bool, Receiver<T>> {
+ let state = match self.state.load(Ordering::SeqCst) {
+ // Each of these states means that no further activity will happen
+ // with regard to abortion selection
+ s @ EMPTY |
+ s @ DATA |
+ s @ DISCONNECTED => s,
+
+ // If we've got a blocked thread, then use an atomic to gain ownership
+ // of it (may fail)
+ ptr => self.state.compare_and_swap(ptr, EMPTY, Ordering::SeqCst)
+ };
+
+ // Now that we've got ownership of our state, figure out what to do
+ // about it.
+ match state {
+ EMPTY => unreachable!(),
+ // our thread used for select was stolen
+ DATA => Ok(true),
+
+ // If the other end has hung up, then we have complete ownership
+ // of the port. First, check if there was data waiting for us. This
+ // is possible if the other end sent something and then hung up.
+ //
+ // We then need to check to see if there was an upgrade requested,
+ // and if so, the upgraded port needs to have its selection aborted.
+ DISCONNECTED => unsafe {
+ if (*self.data.get()).is_some() {
+ Ok(true)
+ } else {
+ match ptr::replace(self.upgrade.get(), SendUsed) {
+ GoUp(port) => Err(port),
+ _ => Ok(true),
+ }
+ }
+ },
+
+ // We woke ourselves up from select.
+ ptr => unsafe {
+ drop(SignalToken::cast_from_usize(ptr));
+ Ok(false)
+ }
+ }
+ }
+}
+
+impl<T> Drop for Packet<T> {
+ fn drop(&mut self) {
+ assert_eq!(self.state.load(Ordering::SeqCst), DISCONNECTED);
+ }
+}