diff --git a/src/lib.rs b/src/lib.rs index 542219190..c588f4c01 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,7 +104,6 @@ extern crate log; #[macro_use] pub mod io; -mod mpsc_queue; mod heap; #[doc(hidden)] pub mod channel; diff --git a/src/mpsc_queue.rs b/src/mpsc_queue.rs deleted file mode 100644 index 7893bbb7e..000000000 --- a/src/mpsc_queue.rs +++ /dev/null @@ -1,148 +0,0 @@ -/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved. - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED - * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT - * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR - * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE - * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF - * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * The views and conclusions contained in the software and documentation are - * those of the authors and should not be interpreted as representing official - * policies, either expressed or implied, of Dmitry Vyukov. - */ - -//! A mostly lock-free multi-producer, single consumer queue. -//! -//! This module contains an implementation of a concurrent MPSC queue. This -//! queue can be used to share data between threads, and is also used as the -//! building block of channels in rust. -//! -//! Note that the current implementation of this queue has a caveat of the `pop` -//! method, and see the method for more information about it. Due to this -//! caveat, this queue may not be appropriate for all use-cases. - -// http://www.1024cores.net/home/lock-free-algorithms -// /queues/non-intrusive-mpsc-node-based-queue - -// NOTE: this implementation is lifted from the standard library and only -// slightly modified - -pub use self::PopResult::*; - -use std::cell::UnsafeCell; -use std::ptr; -use std::sync::atomic::{AtomicPtr, Ordering}; - -/// A result of the `pop` function. -pub enum PopResult { - /// Some data has been popped - Data(T), - /// The queue is empty - Empty, - /// The queue is in an inconsistent state. Popping data should succeed, but - /// some pushers have yet to make enough progress in order allow a pop to - /// succeed. It is recommended that a pop() occur "in the near future" in - /// order to see if the sender has made progress or not - Inconsistent, -} - -struct Node { - next: AtomicPtr>, - value: Option, -} - -/// The multi-producer single-consumer structure. This is not cloneable, but it -/// may be safely shared so long as it is guaranteed that there is only one -/// popper at a time (many pushers are allowed). -pub struct Queue { - head: AtomicPtr>, - tail: UnsafeCell<*mut Node>, -} - -unsafe impl Send for Queue { } -unsafe impl Sync for Queue { } - -impl Node { - unsafe fn new(v: Option) -> *mut Node { - Box::into_raw(Box::new(Node { - next: AtomicPtr::new(ptr::null_mut()), - value: v, - })) - } -} - -impl Queue { - /// Creates a new queue that is safe to share among multiple producers and - /// one consumer. - pub fn new() -> Queue { - let stub = unsafe { Node::new(None) }; - Queue { - head: AtomicPtr::new(stub), - tail: UnsafeCell::new(stub), - } - } - - /// Pushes a new value onto this queue. - pub fn push(&self, t: T) { - unsafe { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, Ordering::AcqRel); - (*prev).next.store(n, Ordering::Release); - } - } - - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option`. It is possible for this queue to be in an - /// inconsistent state where many pushes have succeeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - /// - /// This function is unsafe because only one thread can call it at a time. - pub unsafe fn pop(&self) -> PopResult { - let tail = *self.tail.get(); - let next = (*tail).next.load(Ordering::Acquire); - - if !next.is_null() { - *self.tail.get() = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take().unwrap(); - drop(Box::from_raw(tail)); - return Data(ret); - } - - if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent} - } -} - -impl Drop for Queue { - fn drop(&mut self) { - unsafe { - let mut cur = *self.tail.get(); - while !cur.is_null() { - let next = (*cur).next.load(Ordering::Relaxed); - drop(Box::from_raw(cur)); - cur = next; - } - } - } -} diff --git a/src/reactor/channel.rs b/src/reactor/channel.rs deleted file mode 100644 index e3ddd3935..000000000 --- a/src/reactor/channel.rs +++ /dev/null @@ -1,118 +0,0 @@ -//! A thin wrapper around a mpsc queue and mio-based channel information -//! -//! Normally the standard library's channels would suffice but we unfortunately -//! need the `Sender` half to be `Sync`, so to accomplish this for now we -//! just vendor the same mpsc queue as the one in the standard library and then -//! we pair that with the `mio::channel` module's Ctl pairs to control the -//! readiness notifications on the channel. - -use std::cell::Cell; -use std::io; -use std::marker; -use std::sync::Arc; - -use mio; -use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl}; - -use mpsc_queue::{Queue, PopResult}; - -pub struct Sender { - ctl: SenderCtl, - inner: Arc>, -} - -pub struct Receiver { - ctl: ReceiverCtl, - inner: Arc>, - _marker: marker::PhantomData>, // this type is not Sync -} - -pub fn channel() -> (Sender, Receiver) { - let inner = Arc::new(Queue::new()); - let (tx, rx) = ctl_pair(); - - let tx = Sender { - ctl: tx, - inner: inner.clone(), - }; - let rx = Receiver { - ctl: rx, - inner: inner.clone(), - _marker: marker::PhantomData, - }; - (tx, rx) -} - -impl Sender { - pub fn send(&self, data: T) -> io::Result<()> { - self.inner.push(data); - self.ctl.inc() - } -} - -impl Receiver { - pub fn recv(&self) -> io::Result> { - // Note that the underlying method is `unsafe` because it's only safe - // if one thread accesses it at a time. - // - // We, however, are the only thread with a `Receiver` because this - // type is not `Sync`. and we never handed out another instance. - match unsafe { self.inner.pop() } { - PopResult::Data(t) => { - try!(self.ctl.dec()); - Ok(Some(t)) - } - - // If the queue is either in an inconsistent or empty state, then - // we return `None` for both instances. Note that the standard - // library performs a yield loop in the event of `Inconsistent`, - // which means that there's data in the queue but a sender hasn't - // finished their operation yet. - // - // We do this because the queue will continue to be readable as - // the thread performing the push will eventually call `inc`, so - // if we return `None` and the event loop just loops aruond calling - // this method then we'll eventually get back to the same spot - // and due the retry. - // - // Basically, the inconsistent state doesn't mean we need to busy - // wait, but instead we can forge ahead and assume by the time we - // go to the kernel and come back we'll no longer be in an - // inconsistent state. - PopResult::Empty | - PopResult::Inconsistent => Ok(None), - } - } -} - -// Just delegate everything to `self.ctl` -impl mio::Evented for Receiver { - fn register(&self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt) -> io::Result<()> { - self.ctl.register(poll, token, interest, opts) - } - - fn reregister(&self, - poll: &mio::Poll, - token: mio::Token, - interest: mio::Ready, - opts: mio::PollOpt) -> io::Result<()> { - self.ctl.reregister(poll, token, interest, opts) - } - - fn deregister(&self, poll: &mio::Poll) -> io::Result<()> { - self.ctl.deregister(poll) - } -} - -impl Clone for Sender { - fn clone(&self) -> Sender { - Sender { - ctl: self.ctl.clone(), - inner: self.inner.clone(), - } - } -} diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 3787602fe..117207526 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -15,16 +15,15 @@ use std::time::{Instant, Duration}; use futures::{self, Future, IntoFuture, Async}; use futures::executor::{self, Spawn, Unpark}; +use futures::sync::mpsc; use futures::task::Task; use mio; use slab::Slab; use heap::{Heap, Slot}; -mod channel; mod io_token; mod timeout_token; -use self::channel::{Sender, Receiver, channel}; mod poll_evented; mod timeout; @@ -45,8 +44,11 @@ scoped_thread_local!(static CURRENT_LOOP: Core); // TODO: expand this pub struct Core { events: mio::Events, - tx: Sender, - rx: Receiver, + tx: mpsc::UnboundedSender, + rx: RefCell>>, + _rx_registration: mio::Registration, + rx_readiness: Arc, + inner: Rc>, // Used for determining when the future passed to `run` is ready. Once the @@ -82,7 +84,7 @@ struct Inner { #[derive(Clone)] pub struct Remote { id: usize, - tx: Sender, + tx: mpsc::UnboundedSender, } /// A non-sendable handle to an event loop, useful for manufacturing instances @@ -133,20 +135,26 @@ impl Core { /// Creates a new event loop, returning any error that happened during the /// creation. pub fn new() -> io::Result { - let (tx, rx) = channel(); let io = try!(mio::Poll::new()); - try!(io.register(&rx, - TOKEN_MESSAGES, - mio::Ready::readable(), - mio::PollOpt::edge())); let future_pair = mio::Registration::new(&io, TOKEN_FUTURE, mio::Ready::readable(), mio::PollOpt::level()); + let (tx, rx) = mpsc::unbounded(); + let channel_pair = mio::Registration::new(&io, + TOKEN_MESSAGES, + mio::Ready::readable(), + mio::PollOpt::level()); + let rx_readiness = Arc::new(MySetReadiness(channel_pair.1)); + rx_readiness.unpark(); + Ok(Core { events: mio::Events::with_capacity(1024), tx: tx, - rx: rx, + rx: RefCell::new(executor::spawn(rx)), + _rx_registration: channel_pair.0, + rx_readiness: rx_readiness, + _future_registration: future_pair.0, future_readiness: Arc::new(MySetReadiness(future_pair.1)), @@ -274,6 +282,7 @@ impl Core { trace!("event {:?} {:?}", event.kind(), event.token()); if token == TOKEN_MESSAGES { + self.rx_readiness.0.set_readiness(mio::Ready::none()).unwrap(); CURRENT_LOOP.set(&self, || self.consume_queue()); } else if token == TOKEN_FUTURE { self.future_readiness.0.set_readiness(mio::Ready::none()).unwrap(); @@ -377,8 +386,13 @@ impl Core { fn consume_queue(&self) { debug!("consuming notification queue"); // TODO: can we do better than `.unwrap()` here? - while let Some(msg) = self.rx.recv().unwrap() { - self.notify(msg); + let unpark = self.rx_readiness.clone(); + loop { + match self.rx.borrow_mut().poll_stream(unpark.clone()).unwrap() { + Async::Ready(Some(msg)) => self.notify(msg), + Async::NotReady | + Async::Ready(None) => break, + } } } @@ -525,15 +539,15 @@ impl Remote { lp.notify(msg); } None => { - match self.tx.send(msg) { + // TODO: shouldn't have to `clone` here, can we upstream + // that &self works with `UnboundedSender`? + match mpsc::UnboundedSender::send(&mut self.tx.clone(), msg) { Ok(()) => {} - // This should only happen when there was an error - // writing to the pipe to wake up the event loop, - // hopefully that never happens - Err(e) => { - panic!("error sending message to event loop: {}", e) - } + // TODO: this error should punt upwards and we should + // notify the caller that the message wasn't + // received. This is tokio-core#17 + Err(e) => drop(e), } } }