From 23be3f4885688e5e0011005e2295c75168854c0a Mon Sep 17 00:00:00 2001 From: Fenrir Date: Sun, 21 Jan 2018 14:06:28 -0700 Subject: Recreate ctr-std from latest nightly --- ctr-std/src/sync/mpsc/spsc_queue.rs | 192 +++++++++++++++++++----------------- 1 file changed, 101 insertions(+), 91 deletions(-) (limited to 'ctr-std/src/sync/mpsc/spsc_queue.rs') diff --git a/ctr-std/src/sync/mpsc/spsc_queue.rs b/ctr-std/src/sync/mpsc/spsc_queue.rs index 5858e4b..cc4be92 100644 --- a/ctr-std/src/sync/mpsc/spsc_queue.rs +++ b/ctr-std/src/sync/mpsc/spsc_queue.rs @@ -1,31 +1,12 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue +// Copyright 2017 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. //! A single-producer single-consumer concurrent queue //! @@ -33,18 +14,23 @@ //! concurrently between two threads. This data structure is safe to use and //! enforces the semantics that there is one pusher and one popper. +// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue + use alloc::boxed::Box; use core::ptr; use core::cell::UnsafeCell; use sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use super::cache_aligned::CacheAligned; + // Node within the linked list queue of messages to send struct Node { // FIXME: this could be an uninitialized T if we're careful enough, and // that would reduce memory usage (and be a bit faster). // is it worth it? value: Option, // nullable for re-use of nodes + cached: bool, // This node goes into the node cache next: AtomicPtr>, // next node in the queue } @@ -52,38 +38,55 @@ struct Node { /// but it can be safely shared in an Arc if it is guaranteed that there /// is only one popper and one pusher touching the queue at any one point in /// time. -pub struct Queue { +pub struct Queue { // consumer fields + consumer: CacheAligned>, + + // producer fields + producer: CacheAligned>, +} + +struct Consumer { tail: UnsafeCell<*mut Node>, // where to pop from tail_prev: AtomicPtr>, // where to pop from + cache_bound: usize, // maximum cache size + cached_nodes: AtomicUsize, // number of nodes marked as cachable + addition: Addition, +} - // producer fields +struct Producer { head: UnsafeCell<*mut Node>, // where to push to first: UnsafeCell<*mut Node>, // where to get new nodes from tail_copy: UnsafeCell<*mut Node>, // between first/tail - - // Cache maintenance fields. Additions and subtractions are stored - // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: usize, - cache_additions: AtomicUsize, - cache_subtractions: AtomicUsize, + addition: Addition, } -unsafe impl Send for Queue { } +unsafe impl Send for Queue { } -unsafe impl Sync for Queue { } +unsafe impl Sync for Queue { } impl Node { fn new() -> *mut Node { Box::into_raw(box Node { value: None, + cached: false, next: AtomicPtr::new(ptr::null_mut::>()), }) } } -impl Queue { - /// Creates a new queue. +impl Queue { + + /// Creates a new queue. With given additional elements in the producer and + /// consumer portions of the queue. + /// + /// Due to the performance implications of cache-contention, + /// we wish to keep fields used mainly by the producer on a separate cache + /// line than those used by the consumer. + /// Since cache lines are usually 64 bytes, it is unreasonably expensive to + /// allocate one for small fields, so we allow users to insert additional + /// fields into the cache lines already allocated by this for the producer + /// and consumer. /// /// This is unsafe as the type system doesn't enforce a single /// consumer-producer relationship. It also allows the consumer to `pop` @@ -100,19 +103,28 @@ impl Queue { /// cache (if desired). If the value is 0, then the cache has /// no bound. Otherwise, the cache will never grow larger than /// `bound` (although the queue itself could be much larger. - pub unsafe fn new(bound: usize) -> Queue { + pub unsafe fn with_additions( + bound: usize, + producer_addition: ProducerAddition, + consumer_addition: ConsumerAddition, + ) -> Self { let n1 = Node::new(); let n2 = Node::new(); (*n1).next.store(n2, Ordering::Relaxed); Queue { - tail: UnsafeCell::new(n2), - tail_prev: AtomicPtr::new(n1), - head: UnsafeCell::new(n2), - first: UnsafeCell::new(n1), - tail_copy: UnsafeCell::new(n1), - cache_bound: bound, - cache_additions: AtomicUsize::new(0), - cache_subtractions: AtomicUsize::new(0), + consumer: CacheAligned::new(Consumer { + tail: UnsafeCell::new(n2), + tail_prev: AtomicPtr::new(n1), + cache_bound: bound, + cached_nodes: AtomicUsize::new(0), + addition: consumer_addition + }), + producer: CacheAligned::new(Producer { + head: UnsafeCell::new(n2), + first: UnsafeCell::new(n1), + tail_copy: UnsafeCell::new(n1), + addition: producer_addition + }), } } @@ -126,35 +138,25 @@ impl Queue { assert!((*n).value.is_none()); (*n).value = Some(t); (*n).next.store(ptr::null_mut(), Ordering::Relaxed); - (**self.head.get()).next.store(n, Ordering::Release); - *self.head.get() = n; + (**self.producer.head.get()).next.store(n, Ordering::Release); + *(&self.producer.head).get() = n; } } unsafe fn alloc(&self) -> *mut Node { // First try to see if we can consume the 'first' node for our uses. - // We try to avoid as many atomic instructions as possible here, so - // the addition to cache_subtractions is not atomic (plus we're the - // only one subtracting from the cache). - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Ordering::Relaxed); - self.cache_subtractions.store(b + 1, Ordering::Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Ordering::Relaxed); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); return ret; } // If the above fails, then update our copy of the tail and try // again. - *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire); - if *self.first.get() != *self.tail_copy.get() { - if self.cache_bound > 0 { - let b = self.cache_subtractions.load(Ordering::Relaxed); - self.cache_subtractions.store(b + 1, Ordering::Relaxed); - } - let ret = *self.first.get(); - *self.first.get() = (*ret).next.load(Ordering::Relaxed); + *self.producer.0.tail_copy.get() = + self.consumer.tail_prev.load(Ordering::Acquire); + if *self.producer.first.get() != *self.producer.tail_copy.get() { + let ret = *self.producer.first.get(); + *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed); return ret; } // If all of that fails, then we have to allocate a new node @@ -170,27 +172,27 @@ impl Queue { // sentinel from where we should start popping from. Hence, look at // tail's next field and see if we can use it. If we do a pop, then // the current tail node is a candidate for going into the cache. - let tail = *self.tail.get(); + let tail = *self.consumer.tail.get(); let next = (*tail).next.load(Ordering::Acquire); if next.is_null() { return None } assert!((*next).value.is_some()); let ret = (*next).value.take(); - *self.tail.get() = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Ordering::Release); + *self.consumer.0.tail.get() = next; + if self.consumer.cache_bound == 0 { + self.consumer.tail_prev.store(tail, Ordering::Release); } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Ordering::Relaxed); - let subtractions = self.cache_subtractions.load(Ordering::Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { - self.tail_prev.store(tail, Ordering::Release); - self.cache_additions.store(additions + 1, Ordering::Relaxed); + let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed); + if cached_nodes < self.consumer.cache_bound && !(*tail).cached { + self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed); + (*tail).cached = true; + } + + if (*tail).cached { + self.consumer.tail_prev.store(tail, Ordering::Release); } else { - (*self.tail_prev.load(Ordering::Relaxed)) - .next.store(next, Ordering::Relaxed); + (*self.consumer.tail_prev.load(Ordering::Relaxed)) + .next.store(next, Ordering::Relaxed); // We have successfully erased all references to 'tail', so // now we can safely drop it. let _: Box> = Box::from_raw(tail); @@ -211,17 +213,25 @@ impl Queue { // This is essentially the same as above with all the popping bits // stripped out. unsafe { - let tail = *self.tail.get(); + let tail = *self.consumer.tail.get(); let next = (*tail).next.load(Ordering::Acquire); if next.is_null() { None } else { (*next).value.as_mut() } } } + + pub fn producer_addition(&self) -> &ProducerAddition { + &self.producer.addition + } + + pub fn consumer_addition(&self) -> &ConsumerAddition { + &self.consumer.addition + } } -impl Drop for Queue { +impl Drop for Queue { fn drop(&mut self) { unsafe { - let mut cur = *self.first.get(); + let mut cur = *self.producer.first.get(); while !cur.is_null() { let next = (*cur).next.load(Ordering::Relaxed); let _n: Box> = Box::from_raw(cur); @@ -241,7 +251,7 @@ mod tests { #[test] fn smoke() { unsafe { - let queue = Queue::new(0); + let queue = Queue::with_additions(0, (), ()); queue.push(1); queue.push(2); assert_eq!(queue.pop(), Some(1)); @@ -258,7 +268,7 @@ mod tests { #[test] fn peek() { unsafe { - let queue = Queue::new(0); + let queue = Queue::with_additions(0, (), ()); queue.push(vec![1]); // Ensure the borrowchecker works @@ -281,7 +291,7 @@ mod tests { #[test] fn drop_full() { unsafe { - let q: Queue> = Queue::new(0); + let q: Queue> = Queue::with_additions(0, (), ()); q.push(box 1); q.push(box 2); } @@ -290,7 +300,7 @@ mod tests { #[test] fn smoke_bound() { unsafe { - let q = Queue::new(0); + let q = Queue::with_additions(0, (), ()); q.push(1); q.push(2); assert_eq!(q.pop(), Some(1)); @@ -312,7 +322,7 @@ mod tests { } unsafe fn stress_bound(bound: usize) { - let q = Arc::new(Queue::new(bound)); + let q = Arc::new(Queue::with_additions(bound, (), ())); let (tx, rx) = channel(); let q2 = q.clone(); -- cgit v1.2.3