//! A fixed capacity Multiple-Producer Multiple-Consumer (MPMC) lock-free queue //! //! NOTE: This module is not available on targets that do *not* support CAS operations and are not //! emulated by the [`atomic_polyfill`] crate (e.g., MSP430). //! //! # Example //! //! This queue can be constructed in "const context". Placing it in a `static` variable lets *all* //! contexts (interrupts / threads / `main`) safely enqueue and dequeue items from it. //! //! ``` ignore //! #![no_main] //! #![no_std] //! //! use panic_semihosting as _; //! //! use cortex_m::{asm, peripheral::syst::SystClkSource}; //! use cortex_m_rt::{entry, exception}; //! use cortex_m_semihosting::hprintln; //! use heapless::mpmc::Q2; //! //! static Q: Q2 = Q2::new(); //! //! #[entry] //! fn main() -> ! { //! if let Some(p) = cortex_m::Peripherals::take() { //! let mut syst = p.SYST; //! //! // configures the system timer to trigger a SysTick exception every second //! syst.set_clock_source(SystClkSource::Core); //! syst.set_reload(12_000_000); //! syst.enable_counter(); //! syst.enable_interrupt(); //! } //! //! loop { //! if let Some(x) = Q.dequeue() { //! hprintln!("{}", x).ok(); //! } else { //! asm::wfi(); //! } //! } //! } //! //! #[exception] //! fn SysTick() { //! static mut COUNT: u8 = 0; //! //! Q.enqueue(*COUNT).ok(); //! *COUNT += 1; //! } //! ``` //! //! # Benchmark //! //! Measured on a ARM Cortex-M3 core running at 8 MHz and with zero Flash wait cycles //! //! N| `Q8::::enqueue().ok()` (`z`) | `Q8::::dequeue()` (`z`) | //! -|----------------------------------|-----------------------------| //! 0|34 |35 | //! 1|52 |53 | //! 2|69 |71 | //! //! - `N` denotes the number of *interruptions*. On Cortex-M, an interruption consists of an //! interrupt handler preempting the would-be atomic section of the `enqueue` / `dequeue` //! operation. Note that it does *not* matter if the higher priority handler uses the queue or //! not. //! - All execution times are in clock cycles. 1 clock cycle = 125 ns. //! - Execution time is *dependent* of `mem::size_of::()`. Both operations include one //! `memcpy(T)` in their successful path. //! - The optimization level is indicated in parentheses. //! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue` //! and `Ok` is returned by `enqueue`). //! //! # Portability //! //! This module requires CAS atomic instructions which are not available on all architectures //! (e.g. ARMv6-M (`thumbv6m-none-eabi`) and MSP430 (`msp430-none-elf`)). These atomics can be emulated //! however with [`atomic_polyfill`], which is enabled with the `cas` feature and is enabled by default //! for `thumbv6m-none-eabi` and `riscv32` targets. MSP430 is currently not supported by [`atomic_polyfill`]. //! //! # References //! //! This is an implementation of Dmitry Vyukov's ["Bounded MPMC queue"][0] minus the cache padding. //! //! [0]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue use core::{cell::UnsafeCell, mem::MaybeUninit}; #[cfg(all(feature = "mpmc_large", not(cas_atomic_polyfill)))] type AtomicTargetSize = core::sync::atomic::AtomicUsize; #[cfg(all(feature = "mpmc_large", cas_atomic_polyfill))] type AtomicTargetSize = atomic_polyfill::AtomicUsize; #[cfg(all(not(feature = "mpmc_large"), not(cas_atomic_polyfill)))] type AtomicTargetSize = core::sync::atomic::AtomicU8; #[cfg(all(not(feature = "mpmc_large"), cas_atomic_polyfill))] type AtomicTargetSize = atomic_polyfill::AtomicU8; #[cfg(not(cas_atomic_polyfill))] type Ordering = core::sync::atomic::Ordering; #[cfg(cas_atomic_polyfill)] type Ordering = atomic_polyfill::Ordering; #[cfg(feature = "mpmc_large")] type IntSize = usize; #[cfg(not(feature = "mpmc_large"))] type IntSize = u8; /// MPMC queue with a capability for 2 elements. pub type Q2 = MpMcQueue; /// MPMC queue with a capability for 4 elements. pub type Q4 = MpMcQueue; /// MPMC queue with a capability for 8 elements. pub type Q8 = MpMcQueue; /// MPMC queue with a capability for 16 elements. pub type Q16 = MpMcQueue; /// MPMC queue with a capability for 32 elements. pub type Q32 = MpMcQueue; /// MPMC queue with a capability for 64 elements. pub type Q64 = MpMcQueue; /// MPMC queue with a capacity for N elements /// N must be a power of 2 /// The max value of N is u8::MAX - 1 if `mpmc_large` feature is not enabled. pub struct MpMcQueue { buffer: UnsafeCell<[Cell; N]>, dequeue_pos: AtomicTargetSize, enqueue_pos: AtomicTargetSize, } impl MpMcQueue { const MASK: IntSize = (N - 1) as IntSize; const EMPTY_CELL: Cell = Cell::new(0); const ASSERT: [(); 1] = [()]; /// Creates an empty queue pub const fn new() -> Self { // Const assert crate::sealed::greater_than_1::(); crate::sealed::power_of_two::(); // Const assert on size. Self::ASSERT[!(N < (IntSize::MAX as usize)) as usize]; let mut cell_count = 0; let mut result_cells: [Cell; N] = [Self::EMPTY_CELL; N]; while cell_count != N { result_cells[cell_count] = Cell::new(cell_count); cell_count += 1; } Self { buffer: UnsafeCell::new(result_cells), dequeue_pos: AtomicTargetSize::new(0), enqueue_pos: AtomicTargetSize::new(0), } } /// Returns the item in the front of the queue, or `None` if the queue is empty pub fn dequeue(&self) -> Option { unsafe { dequeue(self.buffer.get() as *mut _, &self.dequeue_pos, Self::MASK) } } /// Adds an `item` to the end of the queue /// /// Returns back the `item` if the queue is full pub fn enqueue(&self, item: T) -> Result<(), T> { unsafe { enqueue( self.buffer.get() as *mut _, &self.enqueue_pos, Self::MASK, item, ) } } } impl Default for MpMcQueue { fn default() -> Self { Self::new() } } unsafe impl Sync for MpMcQueue where T: Send {} struct Cell { data: MaybeUninit, sequence: AtomicTargetSize, } impl Cell { const fn new(seq: usize) -> Self { Self { data: MaybeUninit::uninit(), sequence: AtomicTargetSize::new(seq as IntSize), } } } unsafe fn dequeue( buffer: *mut Cell, dequeue_pos: &AtomicTargetSize, mask: IntSize, ) -> Option { let mut pos = dequeue_pos.load(Ordering::Relaxed); let mut cell; loop { cell = buffer.add(usize::from(pos & mask)); let seq = (*cell).sequence.load(Ordering::Acquire); let dif = (seq as i8).wrapping_sub((pos.wrapping_add(1)) as i8); if dif == 0 { if dequeue_pos .compare_exchange_weak( pos, pos.wrapping_add(1), Ordering::Relaxed, Ordering::Relaxed, ) .is_ok() { break; } } else if dif < 0 { return None; } else { pos = dequeue_pos.load(Ordering::Relaxed); } } let data = (*cell).data.as_ptr().read(); (*cell) .sequence .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release); Some(data) } unsafe fn enqueue( buffer: *mut Cell, enqueue_pos: &AtomicTargetSize, mask: IntSize, item: T, ) -> Result<(), T> { let mut pos = enqueue_pos.load(Ordering::Relaxed); let mut cell; loop { cell = buffer.add(usize::from(pos & mask)); let seq = (*cell).sequence.load(Ordering::Acquire); let dif = (seq as i8).wrapping_sub(pos as i8); if dif == 0 { if enqueue_pos .compare_exchange_weak( pos, pos.wrapping_add(1), Ordering::Relaxed, Ordering::Relaxed, ) .is_ok() { break; } } else if dif < 0 { return Err(item); } else { pos = enqueue_pos.load(Ordering::Relaxed); } } (*cell).data.as_mut_ptr().write(item); (*cell) .sequence .store(pos.wrapping_add(1), Ordering::Release); Ok(()) } #[cfg(test)] mod tests { use super::Q2; #[test] fn sanity() { let q = Q2::new(); q.enqueue(0).unwrap(); q.enqueue(1).unwrap(); assert!(q.enqueue(2).is_err()); assert_eq!(q.dequeue(), Some(0)); assert_eq!(q.dequeue(), Some(1)); assert_eq!(q.dequeue(), None); } #[test] fn drain_at_pos255() { let q = Q2::new(); for _ in 0..255 { assert!(q.enqueue(0).is_ok()); assert_eq!(q.dequeue(), Some(0)); } // this should not block forever assert_eq!(q.dequeue(), None); } #[test] fn full_at_wrapped_pos0() { let q = Q2::new(); for _ in 0..254 { assert!(q.enqueue(0).is_ok()); assert_eq!(q.dequeue(), Some(0)); } assert!(q.enqueue(0).is_ok()); assert!(q.enqueue(0).is_ok()); // this should not block forever assert!(q.enqueue(0).is_err()); } }