diff options
| author | Fenrir <[email protected]> | 2018-01-21 14:06:28 -0700 |
|---|---|---|
| committer | FenrirWolf <[email protected]> | 2018-01-21 19:16:33 -0700 |
| commit | 23be3f4885688e5e0011005e2295c75168854c0a (patch) | |
| tree | dd0850f9c73c489e114a761d5c0757f3dbec3a65 /ctr-std/src/sync/mpsc/stream.rs | |
| parent | Update CI for Rust nightly-2017-12-01 + other fixes (diff) | |
| download | ctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.tar.xz ctru-rs-23be3f4885688e5e0011005e2295c75168854c0a.zip | |
Recreate ctr-std from latest nightly
Diffstat (limited to 'ctr-std/src/sync/mpsc/stream.rs')
| -rw-r--r-- | ctr-std/src/sync/mpsc/stream.rs | 105 |
1 files changed, 61 insertions, 44 deletions
diff --git a/ctr-std/src/sync/mpsc/stream.rs b/ctr-std/src/sync/mpsc/stream.rs index 47cd897..d1515eb 100644 --- a/ctr-std/src/sync/mpsc/stream.rs +++ b/ctr-std/src/sync/mpsc/stream.rs @@ -41,15 +41,22 @@ const MAX_STEALS: isize = 5; const MAX_STEALS: isize = 1 << 20; pub struct Packet<T> { - queue: spsc::Queue<Message<T>>, // internal queue for all message + // internal queue for all messages + queue: spsc::Queue<Message<T>, ProducerAddition, ConsumerAddition>, +} +struct ProducerAddition { cnt: AtomicIsize, // How many items are on this channel - steals: UnsafeCell<isize>, // How many times has a port received without blocking? to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up port_dropped: AtomicBool, // flag if the channel has been destroyed. } +struct ConsumerAddition { + steals: UnsafeCell<isize>, // How many times has a port received without blocking? +} + + pub enum Failure<T> { Empty, Disconnected, @@ -78,13 +85,18 @@ enum Message<T> { impl<T> Packet<T> { pub fn new() -> Packet<T> { Packet { - queue: unsafe { spsc::Queue::new(128) }, - - cnt: AtomicIsize::new(0), - steals: UnsafeCell::new(0), - to_wake: AtomicUsize::new(0), - - port_dropped: AtomicBool::new(false), + queue: unsafe { spsc::Queue::with_additions( + 128, + ProducerAddition { + cnt: AtomicIsize::new(0), + to_wake: AtomicUsize::new(0), + + port_dropped: AtomicBool::new(false), + }, + ConsumerAddition { + steals: UnsafeCell::new(0), + } + )}, } } @@ -92,7 +104,7 @@ impl<T> Packet<T> { // If the other port has deterministically gone away, then definitely // must return the data back up the stack. Otherwise, the data is // considered as being sent. - if self.port_dropped.load(Ordering::SeqCst) { return Err(t) } + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { return Err(t) } match self.do_send(Data(t)) { UpSuccess | UpDisconnected => {}, @@ -104,14 +116,16 @@ impl<T> Packet<T> { pub fn upgrade(&self, up: Receiver<T>) -> UpgradeResult { // If the port has gone away, then there's no need to proceed any // further. - if self.port_dropped.load(Ordering::SeqCst) { return UpDisconnected } + if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + return UpDisconnected + } self.do_send(GoUp(up)) } fn do_send(&self, t: Message<T>) -> UpgradeResult { self.queue.push(t); - match self.cnt.fetch_add(1, Ordering::SeqCst) { + match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) { // As described in the mod's doc comment, -1 == wakeup -1 => UpWoke(self.take_to_wake()), // As as described before, SPSC queues must be >= -2 @@ -125,7 +139,7 @@ impl<T> Packet<T> { // will never remove this data. We can only have at most one item to // drain (the port drains the rest). DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); let first = self.queue.pop(); let second = self.queue.pop(); assert!(second.is_none()); @@ -144,8 +158,8 @@ impl<T> Packet<T> { // Consumes ownership of the 'to_wake' field. fn take_to_wake(&self) -> SignalToken { - let ptr = self.to_wake.load(Ordering::SeqCst); - self.to_wake.store(0, Ordering::SeqCst); + let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); assert!(ptr != 0); unsafe { SignalToken::cast_from_usize(ptr) } } @@ -154,14 +168,16 @@ impl<T> Packet<T> { // back if it shouldn't sleep. Note that this is the location where we take // steals into account. fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); let ptr = unsafe { token.cast_to_usize() }; - self.to_wake.store(ptr, Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst); - let steals = unsafe { ptr::replace(self.steals.get(), 0) }; + let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }; - match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { - DISCONNECTED => { self.cnt.store(DISCONNECTED, Ordering::SeqCst); } + match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + DISCONNECTED => { + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + } // If we factor in our steals and notice that the channel has no // data, we successfully sleep n => { @@ -170,7 +186,7 @@ impl<T> Packet<T> { } } - self.to_wake.store(0, Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); Err(unsafe { SignalToken::cast_from_usize(ptr) }) } @@ -201,7 +217,7 @@ impl<T> Packet<T> { // "steal" factored into the channel count above). data @ Ok(..) | data @ Err(Upgraded(..)) => unsafe { - *self.steals.get() -= 1; + *self.queue.consumer_addition().steals.get() -= 1; data }, @@ -223,20 +239,21 @@ impl<T> Packet<T> { // down as much as possible (without going negative), and then // adding back in whatever we couldn't factor into steals. Some(data) => unsafe { - if *self.steals.get() > MAX_STEALS { - match self.cnt.swap(0, Ordering::SeqCst) { + if *self.queue.consumer_addition().steals.get() > MAX_STEALS { + match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store( + DISCONNECTED, Ordering::SeqCst); } n => { - let m = cmp::min(n, *self.steals.get()); - *self.steals.get() -= m; + let m = cmp::min(n, *self.queue.consumer_addition().steals.get()); + *self.queue.consumer_addition().steals.get() -= m; self.bump(n - m); } } - assert!(*self.steals.get() >= 0); + assert!(*self.queue.consumer_addition().steals.get() >= 0); } - *self.steals.get() += 1; + *self.queue.consumer_addition().steals.get() += 1; match data { Data(t) => Ok(t), GoUp(up) => Err(Upgraded(up)), @@ -244,7 +261,7 @@ impl<T> Packet<T> { }, None => { - match self.cnt.load(Ordering::SeqCst) { + match self.queue.producer_addition().cnt.load(Ordering::SeqCst) { n if n != DISCONNECTED => Err(Empty), // This is a little bit of a tricky case. We failed to pop @@ -273,7 +290,7 @@ impl<T> Packet<T> { pub fn drop_chan(&self) { // Dropping a channel is pretty simple, we just flag it as disconnected // and then wakeup a blocker if there is one. - match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { + match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) { -1 => { self.take_to_wake().signal(); } DISCONNECTED => {} n => { assert!(n >= 0); } @@ -300,7 +317,7 @@ impl<T> Packet<T> { // sends are gated on this flag, so we're immediately guaranteed that // there are a bounded number of active sends that we'll have to deal // with. - self.port_dropped.store(true, Ordering::SeqCst); + self.queue.producer_addition().port_dropped.store(true, Ordering::SeqCst); // Now that we're guaranteed to deal with a bounded number of senders, // we need to drain the queue. This draining process happens atomically @@ -310,9 +327,9 @@ impl<T> Packet<T> { // continue to fail while active senders send data while we're dropping // data, but eventually we're guaranteed to break out of this loop // (because there is a bounded number of senders). - let mut steals = unsafe { *self.steals.get() }; + let mut steals = unsafe { *self.queue.consumer_addition().steals.get() }; while { - let cnt = self.cnt.compare_and_swap( + let cnt = self.queue.producer_addition().cnt.compare_and_swap( steals, DISCONNECTED, Ordering::SeqCst); cnt != DISCONNECTED && cnt != steals } { @@ -353,9 +370,9 @@ impl<T> Packet<T> { // increment the count on the channel (used for selection) fn bump(&self, amt: isize) -> isize { - match self.cnt.fetch_add(amt, Ordering::SeqCst) { + match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); DISCONNECTED } n => n @@ -404,8 +421,8 @@ impl<T> Packet<T> { // this end. This is fine because we know it's a small bounded windows // of time until the data is actually sent. if was_upgrade { - assert_eq!(unsafe { *self.steals.get() }, 0); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); return Ok(true) } @@ -418,7 +435,7 @@ impl<T> Packet<T> { // If we were previously disconnected, then we know for sure that there // is no thread in to_wake, so just keep going let has_data = if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); true // there is data, that data is that we're disconnected } else { let cur = prev + steals + 1; @@ -441,13 +458,13 @@ impl<T> Packet<T> { if prev < 0 { drop(self.take_to_wake()); } else { - while self.to_wake.load(Ordering::SeqCst) != 0 { + while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 { thread::yield_now(); } } unsafe { - assert_eq!(*self.steals.get(), 0); - *self.steals.get() = steals; + assert_eq!(*self.queue.consumer_addition().steals.get(), 0); + *self.queue.consumer_addition().steals.get() = steals; } // if we were previously positive, then there's surely data to @@ -481,7 +498,7 @@ impl<T> Drop for Packet<T> { // disconnection, but also a proper fence before the read of // `to_wake`, so this assert cannot be removed with also removing // the `to_wake` assert. - assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); } } |