| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! A mostly lock-free multi-producer, single consumer queue. |
| //! |
| //! This module contains an implementation of a concurrent MPSC queue. This |
| //! queue can be used to share data between threads, and is also used as the |
| //! building block of channels in rust. |
| //! |
| //! Note that the current implementation of this queue has a caveat regarding |
| //! the `pop` method; see that method's documentation for more information. |
| //! Due to this caveat, this queue might not be appropriate for all use-cases. |
| |
| // https://www.1024cores.net/home/lock-free-algorithms |
| // /queues/non-intrusive-mpsc-node-based-queue |
| |
| |
| pub use self::PopResult::*; |
| |
| use core::cell::UnsafeCell; |
| use core::ptr; |
| |
| use crate::boxed::Box; |
| use crate::sync::atomic::{AtomicPtr, Ordering}; |
| |
/// The outcome of a call to `pop`.
///
/// `Debug`, `PartialEq` and `Eq` are derived (each requiring the same trait
/// on `T`) so results can be logged and compared directly.
#[derive(Debug, PartialEq, Eq)]
pub enum PopResult<T> {
    /// Some data has been popped.
    Data(T),
    /// The queue is empty.
    Empty,
    /// The queue is in an inconsistent state. Popping data should succeed, but
    /// some pushers have yet to make enough progress in order to allow a pop
    /// to succeed. It is recommended that a `pop()` occur "in the near future"
    /// in order to see if the sender has made progress or not.
    Inconsistent,
}
| |
/// One heap-allocated link of the queue's singly linked list.
struct Node<T> {
    /// Pointer to the node pushed after this one; null until a pusher links
    /// its successor in.
    next: AtomicPtr<Node<T>>,
    /// The payload carried by this node. The stub node created with the queue
    /// carries `None`, and `pop` takes the value out of a node when returning
    /// it.
    value: Option<T>,
}
| |
| /// The multi-producer single-consumer structure. This is not cloneable, but it |
| /// may be safely shared so long as it is guaranteed that there is only one |
| /// popper at a time (many pushers are allowed). |
| pub struct Queue<T> { |
| head: AtomicPtr<Node<T>>, |
| tail: UnsafeCell<*mut Node<T>>, |
| } |
| |
| unsafe impl<T: Send> Send for Queue<T> {} |
| unsafe impl<T: Send> Sync for Queue<T> {} |
| |
| impl<T> Node<T> { |
| unsafe fn new(v: Option<T>) -> *mut Node<T> { |
| Box::into_raw(box Node { next: AtomicPtr::new(ptr::null_mut()), value: v }) |
| } |
| } |
| |
| impl<T> Queue<T> { |
| /// Creates a new queue that is safe to share among multiple producers and |
| /// one consumer. |
| pub fn new() -> Queue<T> { |
| let stub = unsafe { Node::new(None) }; |
| Queue { head: AtomicPtr::new(stub), tail: UnsafeCell::new(stub) } |
| } |
| |
| /// Pushes a new value onto this queue. |
| pub fn push(&self, t: T) { |
| unsafe { |
| let n = Node::new(Some(t)); |
| let prev = self.head.swap(n, Ordering::AcqRel); |
| (*prev).next.store(n, Ordering::Release); |
| } |
| } |
| |
| /// Pops some data from this queue. |
| /// |
| /// Note that the current implementation means that this function cannot |
| /// return `Option<T>`. It is possible for this queue to be in an |
| /// inconsistent state where many pushes have succeeded and completely |
| /// finished, but pops cannot return `Some(t)`. This inconsistent state |
| /// happens when a pusher is pre-empted at an inopportune moment. |
| /// |
| /// This inconsistent state means that this queue does indeed have data, but |
| /// it does not currently have access to it at this time. |
| pub fn pop(&self) -> PopResult<T> { |
| unsafe { |
| let tail = *self.tail.get(); |
| let next = (*tail).next.load(Ordering::Acquire); |
| |
| if !next.is_null() { |
| *self.tail.get() = next; |
| assert!((*tail).value.is_none()); |
| assert!((*next).value.is_some()); |
| let ret = (*next).value.take().unwrap(); |
| let _: Box<Node<T>> = Box::from_raw(tail); |
| return Data(ret); |
| } |
| |
| if self.head.load(Ordering::Acquire) == tail { Empty } else { Inconsistent } |
| } |
| } |
| } |
| |
| impl<T> Drop for Queue<T> { |
| fn drop(&mut self) { |
| unsafe { |
| let mut cur = *self.tail.get(); |
| while !cur.is_null() { |
| let next = (*cur).next.load(Ordering::Relaxed); |
| let _: Box<Node<T>> = Box::from_raw(cur); |
| cur = next; |
| } |
| } |
| } |
| } |