diff options
| author | Vivian Lim <[email protected]> | 2021-02-06 22:11:59 -0800 |
|---|---|---|
| committer | Vivian Lim <[email protected]> | 2021-02-06 22:11:59 -0800 |
| commit | 64423f0e34cc4a7d78c15b345b3b8f58243d8286 (patch) | |
| tree | cc20e2e7f0fc35abf470e20e61d3d48f0d954f3b /ctr-std/src/sync/mpsc/mpsc_queue.rs | |
| parent | Support libctru 2.0 (diff) | |
| download | ctru-rs-64423f0e34cc4a7d78c15b345b3b8f58243d8286.tar.xz ctru-rs-64423f0e34cc4a7d78c15b345b3b8f58243d8286.zip | |
Delete ctr-std to use my fork of the rust repo instead
Diffstat (limited to 'ctr-std/src/sync/mpsc/mpsc_queue.rs')
| -rw-r--r-- | ctr-std/src/sync/mpsc/mpsc_queue.rs | 180 |
1 files changed, 0 insertions, 180 deletions
diff --git a/ctr-std/src/sync/mpsc/mpsc_queue.rs b/ctr-std/src/sync/mpsc/mpsc_queue.rs deleted file mode 100644 index df945ac..0000000 --- a/ctr-std/src/sync/mpsc/mpsc_queue.rs +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright 2017 The Rust Project Developers. See the COPYRIGHT -// file at the top-level directory of this distribution and at -// http://rust-lang.org/COPYRIGHT. -// -// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or -// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license -// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module contains an implementation of a concurrent MPSC queue. This -//! queue can be used to share data between threads, and is also used as the -//! building block of channels in rust. -//! -//! Note that the current implementation of this queue has a caveat of the `pop` -//! method, and see the method for more information about it. Due to this -//! caveat, this queue may not be appropriate for all use-cases. - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -pub use self::PopResult::*; - -use core::ptr; -use core::cell::UnsafeCell; -use boxed::Box; -use sync::atomic::{AtomicPtr, Ordering}; - -/// A result of the `pop` function. -pub enum PopResult<T> { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node<T> { - next: AtomicPtr<Node<T>>, - value: Option<T>, -} - -/// The multi-producer single-consumer structure. This is not cloneable, but it -/// may be safely shared so long as it is guaranteed that there is only one -/// popper at a time (many pushers are allowed). -pub struct Queue<T> { - head: AtomicPtr<Node<T>>, - tail: UnsafeCell<*mut Node<T>>, -} - -unsafe impl<T: Send> Send for Queue<T> { } -unsafe impl<T: Send> Sync for Queue<T> { } - -impl<T> Node<T> { - unsafe fn new(v: Option<T>) -> *mut Node<T> { - Box::into_raw(box Node { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - }) - } -} - -impl<T> Queue<T> { - /// Creates a new queue that is safe to share among multiple producers and - /// one consumer. - pub fn new() -> Queue<T> { - let stub = unsafe { Node::new(None) }; - Queue { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } - } - - /// Pushes a new value onto this queue. - pub fn push(&self, t: T) { - unsafe { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, Ordering::AcqRel); - (*prev).next.store(n, Ordering::Release); - } - } - - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option<T>`. It is possible for this queue to be in an - /// inconsistent state where many pushes have succeeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - pub fn pop(&self) -> PopResult<T> { - unsafe { - let tail = *self.tail.get(); - let next = (*tail).next.load(Ordering::Acquire); - - if !next.is_null() { - *self.tail.get() = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take().unwrap(); - let _: Box<Node<T>> = Box::from_raw(tail); - return Data(ret); - } - - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} - } - } -} - -impl<T> Drop for Queue<T> { - fn drop(&mut self) { - unsafe { - let mut cur = *self.tail.get(); - while !cur.is_null() { - let next = (*cur).next.load(Ordering::Relaxed); - let _: Box<Node<T>> = Box::from_raw(cur); - cur = next; - } - } - } -} - -#[cfg(all(test, not(target_os = "emscripten")))] -mod tests { - use sync::mpsc::channel; - use super::{Queue, Data, Empty, Inconsistent}; - use sync::Arc; - use thread; - - #[test] - fn test_full() { - let q: Queue<Box<_>> = Queue::new(); - q.push(box 1); - q.push(box 2); - } - - #[test] - fn test() { - let nthreads = 8; - let nmsgs = 1000; - let q = Queue::new(); - match q.pop() { - Empty => {} - Inconsistent | Data(..) => panic!() - } - let (tx, rx) = channel(); - let q = Arc::new(q); - - for _ in 0..nthreads { - let tx = tx.clone(); - let q = q.clone(); - thread::spawn(move|| { - for i in 0..nmsgs { - q.push(i); - } - tx.send(()).unwrap(); - }); - } - - let mut i = 0; - while i < nthreads * nmsgs { - match q.pop() { - Empty | Inconsistent => {}, - Data(_) => { i += 1 } - } - } - drop(tx); - for _ in 0..nthreads { - rx.recv().unwrap(); - } - } -} |