Move from a custom channel to futures::sync

Closes #143
This commit is contained in:
Alex Crichton 2017-01-10 00:02:53 -08:00
parent c47c445be6
commit d0833074d6
4 changed files with 34 additions and 287 deletions

View File

@ -104,7 +104,6 @@ extern crate log;
#[macro_use]
pub mod io;
mod mpsc_queue;
mod heap;
#[doc(hidden)]
pub mod channel;

View File

@ -1,148 +0,0 @@
/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* The views and conclusions contained in the software and documentation are
* those of the authors and should not be interpreted as representing official
* policies, either expressed or implied, of Dmitry Vyukov.
*/
//! A mostly lock-free multi-producer, single consumer queue.
//!
//! This module contains an implementation of a concurrent MPSC queue. This
//! queue can be used to share data between threads, and is also used as the
//! building block of channels in rust.
//!
//! Note that the current implementation of this queue has a caveat of the `pop`
//! method, and see the method for more information about it. Due to this
//! caveat, this queue may not be appropriate for all use-cases.
// http://www.1024cores.net/home/lock-free-algorithms
// /queues/non-intrusive-mpsc-node-based-queue
// NOTE: this implementation is lifted from the standard library and only
// slightly modified
pub use self::PopResult::*;
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
/// A result of the `pop` function.
pub enum PopResult<T> {
/// Some data has been popped
Data(T),
/// The queue is empty
Empty,
/// The queue is in an inconsistent state. Popping data should succeed, but
/// some pushers have yet to make enough progress in order allow a pop to
/// succeed. It is recommended that a pop() occur "in the near future" in
/// order to see if the sender has made progress or not
Inconsistent,
}
struct Node<T> {
next: AtomicPtr<Node<T>>,
value: Option<T>,
}
/// The multi-producer single-consumer structure. This is not cloneable, but it
/// may be safely shared so long as it is guaranteed that there is only one
/// popper at a time (many pushers are allowed).
pub struct Queue<T> {
head: AtomicPtr<Node<T>>,
tail: UnsafeCell<*mut Node<T>>,
}
unsafe impl<T: Send> Send for Queue<T> { }
unsafe impl<T: Send> Sync for Queue<T> { }
impl<T> Node<T> {
unsafe fn new(v: Option<T>) -> *mut Node<T> {
Box::into_raw(Box::new(Node {
next: AtomicPtr::new(ptr::null_mut()),
value: v,
}))
}
}
impl<T> Queue<T> {
/// Creates a new queue that is safe to share among multiple producers and
/// one consumer.
pub fn new() -> Queue<T> {
let stub = unsafe { Node::new(None) };
Queue {
head: AtomicPtr::new(stub),
tail: UnsafeCell::new(stub),
}
}
/// Pushes a new value onto this queue.
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t));
let prev = self.head.swap(n, Ordering::AcqRel);
(*prev).next.store(n, Ordering::Release);
}
}
/// Pops some data from this queue.
///
/// Note that the current implementation means that this function cannot
/// return `Option<T>`. It is possible for this queue to be in an
/// inconsistent state where many pushes have succeeded and completely
/// finished, but pops cannot return `Some(t)`. This inconsistent state
/// happens when a pusher is pre-empted at an inopportune moment.
///
/// This inconsistent state means that this queue does indeed have data, but
/// it does not currently have access to it at this time.
///
/// This function is unsafe because only one thread can call it at a time.
pub unsafe fn pop(&self) -> PopResult<T> {
let tail = *self.tail.get();
let next = (*tail).next.load(Ordering::Acquire);
if !next.is_null() {
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take().unwrap();
drop(Box::from_raw(tail));
return Data(ret);
}
if self.head.load(Ordering::Acquire) == tail {Empty} else {Inconsistent}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Ordering::Relaxed);
drop(Box::from_raw(cur));
cur = next;
}
}
}
}

View File

@ -1,118 +0,0 @@
//! A thin wrapper around a mpsc queue and mio-based channel information
//!
//! Normally the standard library's channels would suffice but we unfortunately
//! need the `Sender<T>` half to be `Sync`, so to accomplish this for now we
//! just vendor the same mpsc queue as the one in the standard library and then
//! we pair that with the `mio::channel` module's Ctl pairs to control the
//! readiness notifications on the channel.
use std::cell::Cell;
use std::io;
use std::marker;
use std::sync::Arc;
use mio;
use mio::channel::{ctl_pair, SenderCtl, ReceiverCtl};
use mpsc_queue::{Queue, PopResult};
pub struct Sender<T> {
ctl: SenderCtl,
inner: Arc<Queue<T>>,
}
pub struct Receiver<T> {
ctl: ReceiverCtl,
inner: Arc<Queue<T>>,
_marker: marker::PhantomData<Cell<()>>, // this type is not Sync
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Queue::new());
let (tx, rx) = ctl_pair();
let tx = Sender {
ctl: tx,
inner: inner.clone(),
};
let rx = Receiver {
ctl: rx,
inner: inner.clone(),
_marker: marker::PhantomData,
};
(tx, rx)
}
impl<T> Sender<T> {
pub fn send(&self, data: T) -> io::Result<()> {
self.inner.push(data);
self.ctl.inc()
}
}
impl<T> Receiver<T> {
pub fn recv(&self) -> io::Result<Option<T>> {
// Note that the underlying method is `unsafe` because it's only safe
// if one thread accesses it at a time.
//
// We, however, are the only thread with a `Receiver<T>` because this
// type is not `Sync`. and we never handed out another instance.
match unsafe { self.inner.pop() } {
PopResult::Data(t) => {
try!(self.ctl.dec());
Ok(Some(t))
}
// If the queue is either in an inconsistent or empty state, then
// we return `None` for both instances. Note that the standard
// library performs a yield loop in the event of `Inconsistent`,
// which means that there's data in the queue but a sender hasn't
// finished their operation yet.
//
// We do this because the queue will continue to be readable as
// the thread performing the push will eventually call `inc`, so
// if we return `None` and the event loop just loops aruond calling
// this method then we'll eventually get back to the same spot
// and due the retry.
//
// Basically, the inconsistent state doesn't mean we need to busy
// wait, but instead we can forge ahead and assume by the time we
// go to the kernel and come back we'll no longer be in an
// inconsistent state.
PopResult::Empty |
PopResult::Inconsistent => Ok(None),
}
}
}
// Just delegate everything to `self.ctl`
impl<T> mio::Evented for Receiver<T> {
fn register(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt) -> io::Result<()> {
self.ctl.register(poll, token, interest, opts)
}
fn reregister(&self,
poll: &mio::Poll,
token: mio::Token,
interest: mio::Ready,
opts: mio::PollOpt) -> io::Result<()> {
self.ctl.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &mio::Poll) -> io::Result<()> {
self.ctl.deregister(poll)
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender {
ctl: self.ctl.clone(),
inner: self.inner.clone(),
}
}
}

View File

@ -15,16 +15,15 @@ use std::time::{Instant, Duration};
use futures::{self, Future, IntoFuture, Async};
use futures::executor::{self, Spawn, Unpark};
use futures::sync::mpsc;
use futures::task::Task;
use mio;
use slab::Slab;
use heap::{Heap, Slot};
mod channel;
mod io_token;
mod timeout_token;
use self::channel::{Sender, Receiver, channel};
mod poll_evented;
mod timeout;
@ -45,8 +44,11 @@ scoped_thread_local!(static CURRENT_LOOP: Core);
// TODO: expand this
pub struct Core {
events: mio::Events,
tx: Sender<Message>,
rx: Receiver<Message>,
tx: mpsc::UnboundedSender<Message>,
rx: RefCell<Spawn<mpsc::UnboundedReceiver<Message>>>,
_rx_registration: mio::Registration,
rx_readiness: Arc<MySetReadiness>,
inner: Rc<RefCell<Inner>>,
// Used for determining when the future passed to `run` is ready. Once the
@ -82,7 +84,7 @@ struct Inner {
#[derive(Clone)]
pub struct Remote {
id: usize,
tx: Sender<Message>,
tx: mpsc::UnboundedSender<Message>,
}
/// A non-sendable handle to an event loop, useful for manufacturing instances
@ -133,20 +135,26 @@ impl Core {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub fn new() -> io::Result<Core> {
let (tx, rx) = channel();
let io = try!(mio::Poll::new());
try!(io.register(&rx,
TOKEN_MESSAGES,
mio::Ready::readable(),
mio::PollOpt::edge()));
let future_pair = mio::Registration::new(&io,
TOKEN_FUTURE,
mio::Ready::readable(),
mio::PollOpt::level());
let (tx, rx) = mpsc::unbounded();
let channel_pair = mio::Registration::new(&io,
TOKEN_MESSAGES,
mio::Ready::readable(),
mio::PollOpt::level());
let rx_readiness = Arc::new(MySetReadiness(channel_pair.1));
rx_readiness.unpark();
Ok(Core {
events: mio::Events::with_capacity(1024),
tx: tx,
rx: rx,
rx: RefCell::new(executor::spawn(rx)),
_rx_registration: channel_pair.0,
rx_readiness: rx_readiness,
_future_registration: future_pair.0,
future_readiness: Arc::new(MySetReadiness(future_pair.1)),
@ -274,6 +282,7 @@ impl Core {
trace!("event {:?} {:?}", event.kind(), event.token());
if token == TOKEN_MESSAGES {
self.rx_readiness.0.set_readiness(mio::Ready::none()).unwrap();
CURRENT_LOOP.set(&self, || self.consume_queue());
} else if token == TOKEN_FUTURE {
self.future_readiness.0.set_readiness(mio::Ready::none()).unwrap();
@ -377,8 +386,13 @@ impl Core {
fn consume_queue(&self) {
debug!("consuming notification queue");
// TODO: can we do better than `.unwrap()` here?
while let Some(msg) = self.rx.recv().unwrap() {
self.notify(msg);
let unpark = self.rx_readiness.clone();
loop {
match self.rx.borrow_mut().poll_stream(unpark.clone()).unwrap() {
Async::Ready(Some(msg)) => self.notify(msg),
Async::NotReady |
Async::Ready(None) => break,
}
}
}
@ -525,15 +539,15 @@ impl Remote {
lp.notify(msg);
}
None => {
match self.tx.send(msg) {
// TODO: shouldn't have to `clone` here, can we upstream
// that &self works with `UnboundedSender`?
match mpsc::UnboundedSender::send(&mut self.tx.clone(), msg) {
Ok(()) => {}
// This should only happen when there was an error
// writing to the pipe to wake up the event loop,
// hopefully that never happens
Err(e) => {
panic!("error sending message to event loop: {}", e)
}
// TODO: this error should punt upwards and we should
// notify the caller that the message wasn't
// received. This is tokio-core#17
Err(e) => drop(e),
}
}
}