aboutsummaryrefslogtreecommitdiff
path: root/ctr-std/src/sync/mpsc/stream.rs
diff options
context:
space:
mode:
authorFenrir <[email protected]>2018-01-21 14:06:28 -0700
committerFenrirWolf <[email protected]>2018-01-21 19:16:33 -0700
commit23be3f4885688e5e0011005e2295c75168854c0a (patch)
treedd0850f9c73c489e114a761d5c0757f3dbec3a65 /ctr-std/src/sync/mpsc/stream.rs
parentUpdate CI for Rust nightly-2017-12-01 + other fixes (diff)
downloadctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.tar.xz
ctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.zip
Recreate ctr-std from latest nightly
Diffstat (limited to 'ctr-std/src/sync/mpsc/stream.rs')
-rw-r--r--ctr-std/src/sync/mpsc/stream.rs105
1 files changed, 61 insertions, 44 deletions
diff --git a/ctr-std/src/sync/mpsc/stream.rs b/ctr-std/src/sync/mpsc/stream.rs
index 47cd897..d1515eb 100644
--- a/ctr-std/src/sync/mpsc/stream.rs
+++ b/ctr-std/src/sync/mpsc/stream.rs
@@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5;
const MAX_STEALS: isize = 1 << 20;
pub struct Packet<T> {
- queue: spsc::Queue<Message<T>>, // internal queue for all message
+ // internal queue for all messages
+ queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>,
+}
+struct ProducerAddition {
cnt: AtomicIsize, // How many items are on this channel
- steals: UnsafeCell<isize>, // How many times has a port received without blocking?
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
port_dropped: AtomicBool, // flag if the channel has been destroyed.
}
+struct ConsumerAddition {
+ steals: UnsafeCell<isize>, // How many times has a port received without blocking?
+}
+
+
pub enum Failure<T> {
Empty,
Disconnected,
@@ -78,13 +85,18 @@ enum Message<T> {
impl<T> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
- queue: unsafe { spsc::Queue::new(128) },
-
- cnt: AtomicIsize::new(0),
- steals: UnsafeCell::new(0),
- to_wake: AtomicUsize::new(0),
-
- port_dropped: AtomicBool::new(false),
+ queue: unsafe { spsc::Queue::with_additions(
+ 128,
+ ProducerAddition {
+ cnt: AtomicIsize::new(0),
+ to_wake: AtomicUsize::new(0),
+
+ port_dropped: AtomicBool::new(false),
+ },
+ ConsumerAddition {
+ steals: UnsafeCell::new(0),
+ }
+ )},
}
}
@@ -92,7 +104,7 @@ impl<T> Packet<T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
- if self.port_dropped.load(Ordering::SeqCst) { return Err(t) }
+ if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) }
match self.do_send(Data(t)) {
UpSuccess | UpDisconnected => {},
@@ -104,14 +116,16 @@ impl<T> Packet<T> {
pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
- if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected }
+ if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) {
+ return UpDisconnected
+ }
self.do_send(GoUp(up))
}
fn do_send(&self, t: Message<T>) -> UpgradeResult {
self.queue.push(t);
- match self.cnt.fetch_add(1, Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup
-1 => UpWoke(self.take_to_wake()),
// As as described before, SPSC queues must be >= -2
@@ -125,7 +139,7 @@ impl<T> Packet<T> {
// will never remove this data. We can only have at most one item to
// drain (the port drains the rest).
DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
let first = self.queue.pop();
let second = self.queue.pop();
assert!(second.is_none());
@@ -144,8 +158,8 @@ impl<T> Packet<T> {
// Consumes ownership of the 'to_wake' field.
fn take_to_wake(&self) -> SignalToken {
- let ptr = self.to_wake.load(Ordering::SeqCst);
- self.to_wake.store(0, Ordering::SeqCst);
+ let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
+ self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
assert!(ptr != 0);
unsafe { SignalToken::cast_from_usize(ptr) }
}
@@ -154,14 +168,16 @@ impl<T> Packet<T> {
// back if it shouldn't sleep. Note that this is the location where we take
// steals into account.
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
let ptr = unsafe { token.cast_to_usize() };
- self.to_wake.store(ptr, Ordering::SeqCst);
+ self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
- let steals = unsafe { ptr::replace(self.steals.get(), 0) };
+ let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
- match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
- DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); }
+ match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) {
+ DISCONNECTED => {
+ self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
+ }
// If we factor in our steals and notice that the channel has no
// data, we successfully sleep
n => {
@@ -170,7 +186,7 @@ impl<T> Packet<T> {
}
}
- self.to_wake.store(0, Ordering::SeqCst);
+ self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
Err(unsafe { SignalToken::cast_from_usize(ptr) })
}
@@ -201,7 +217,7 @@ impl<T> Packet<T> {
// "steal" factored into the channel count above).
data @ Ok(..) |
data @ Err(Upgraded(..)) => unsafe {
- *self.steals.get() -= 1;
+ *self.queue.consumer_addition().steals.get() -= 1;
data
},
@@ -223,20 +239,21 @@ impl<T> Packet<T> {
// down as much as possible (without going negative), and then
// adding back in whatever we couldn't factor into steals.
Some(data) => unsafe {
- if *self.steals.get() > MAX_STEALS {
- match self.cnt.swap(0, Ordering::SeqCst) {
+ if *self.queue.consumer_addition().steals.get() > MAX_STEALS {
+ match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) {
DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ self.queue.producer_addition().cnt.store(
+ DISCONNECTED, Ordering::SeqCst);
}
n => {
- let m = cmp::min(n, *self.steals.get());
- *self.steals.get() -= m;
+ let m = cmp::min(n, *self.queue.consumer_addition().steals.get());
+ *self.queue.consumer_addition().steals.get() -= m;
self.bump(n - m);
}
}
- assert!(*self.steals.get() >= 0);
+ assert!(*self.queue.consumer_addition().steals.get() >= 0);
}
- *self.steals.get() += 1;
+ *self.queue.consumer_addition().steals.get() += 1;
match data {
Data(t) => Ok(t),
GoUp(up) => Err(Upgraded(up)),
@@ -244,7 +261,7 @@ impl<T> Packet<T> {
},
None => {
- match self.cnt.load(Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.load(Ordering::SeqCst) {
n if n != DISCONNECTED => Err(Empty),
// This is a little bit of a tricky case. We failed to pop
@@ -273,7 +290,7 @@ impl<T> Packet<T> {
pub fn drop_chan(&self) {
// Dropping a channel is pretty simple, we just flag it as disconnected
// and then wakeup a blocker if there is one.
- match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) {
-1 => { self.take_to_wake().signal(); }
DISCONNECTED => {}
n => { assert!(n >= 0); }
@@ -300,7 +317,7 @@ impl<T> Packet<T> {
// sends are gated on this flag, so we're immediately guaranteed that
// there are a bounded number of active sends that we'll have to deal
// with.
- self.port_dropped.store(true, Ordering::SeqCst);
+ self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst);
// Now that we're guaranteed to deal with a bounded number of senders,
// we need to drain the queue. This draining process happens atomically
@@ -310,9 +327,9 @@ impl<T> Packet<T> {
// continue to fail while active senders send data while we're dropping
// data, but eventually we're guaranteed to break out of this loop
// (because there is a bounded number of senders).
- let mut steals = unsafe { *self.steals.get() };
+ let mut steals = unsafe { *self.queue.consumer_addition().steals.get() };
while {
- let cnt = self.cnt.compare_and_swap(
+ let cnt = self.queue.producer_addition().cnt.compare_and_swap(
steals, DISCONNECTED, Ordering::SeqCst);
cnt != DISCONNECTED && cnt != steals
} {
@@ -353,9 +370,9 @@ impl<T> Packet<T> {
// increment the count on the channel (used for selection)
fn bump(&self, amt: isize) -> isize {
- match self.cnt.fetch_add(amt, Ordering::SeqCst) {
+ match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) {
DISCONNECTED => {
- self.cnt.store(DISCONNECTED, Ordering::SeqCst);
+ self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst);
DISCONNECTED
}
n => n
@@ -404,8 +421,8 @@ impl<T> Packet<T> {
// this end. This is fine because we know it's a small bounded windows
// of time until the data is actually sent.
if was_upgrade {
- assert_eq!(unsafe { *self.steals.get() }, 0);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
return Ok(true)
}
@@ -418,7 +435,7 @@ impl<T> Packet<T> {
// If we were previously disconnected, then we know for sure that there
// is no thread in to_wake, so just keep going
let has_data = if prev == DISCONNECTED {
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
true // there is data, that data is that we're disconnected
} else {
let cur = prev + steals + 1;
@@ -441,13 +458,13 @@ impl<T> Packet<T> {
if prev < 0 {
drop(self.take_to_wake());
} else {
- while self.to_wake.load(Ordering::SeqCst) != 0 {
+ while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
thread::yield_now();
}
}
unsafe {
- assert_eq!(*self.steals.get(), 0);
- *self.steals.get() = steals;
+ assert_eq!(*self.queue.consumer_addition().steals.get(), 0);
+ *self.queue.consumer_addition().steals.get() = steals;
}
// if we were previously positive, then there's surely data to
@@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> {
// disconnection, but also a proper fence before the read of
// `to_wake`, so this assert cannot be removed with also removing
// the `to_wake` assert.
- assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
- assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
+ assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
+ assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
}
}