Swap Handle/Pinned

* Handle -> Remote
* Pinned -> Handle

All APIs now take a `&Handle` by default and in general can return an immediate
`io::Result` instead of an `IoFuture`. This reflects how most usage will likely
be done through handles rather than remotes, and also all previous functionality
can be recovered with a `oneshot` plus `Remote::spawn`.

Closes #15
This commit is contained in:
Alex Crichton 2016-09-07 16:11:19 -07:00
parent e60002b653
commit 66cff8e84b
23 changed files with 327 additions and 1344 deletions

View File

@ -20,36 +20,34 @@ fn main() {
// Create the event loop that will drive this server
let mut l = Core::new().unwrap();
let pin = l.pin();
let handle = l.handle();
// Create a TCP listener which will listen for incoming connections
let server = TcpListener::bind(&addr, &l.handle());
let socket = TcpListener::bind(&addr, &l.handle()).unwrap();
let done = server.and_then(move |socket| {
// Once we've got the TCP listener, inform that we have it
println!("Listening on: {}", addr);
// Once we've got the TCP listener, inform that we have it
println!("Listening on: {}", addr);
// Pull out the stream of incoming connections and then for each new
// one spin up a new task copying data.
//
// We use the `io::copy` future to copy all data from the
// reading half onto the writing half.
socket.incoming().for_each(move |(socket, addr)| {
let pair = futures::lazy(|| futures::finished(socket.split()));
let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
// Pull out the stream of incoming connections and then for each new
// one spin up a new task copying data.
//
// We use the `io::copy` future to copy all data from the
// reading half onto the writing half.
let done = socket.incoming().for_each(move |(socket, addr)| {
let pair = futures::lazy(|| futures::finished(socket.split()));
let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
// Once all that is done we print out how much we wrote, and then
// critically we *spawn* this future which allows it to run
// concurrently with other connections.
let msg = amt.map(move |amt| {
println!("wrote {} bytes to {}", amt, addr)
}).map_err(|e| {
panic!("error: {}", e);
});
pin.spawn(msg);
// Once all that is done we print out how much we wrote, and then
// critically we *spawn* this future which allows it to run
// concurrently with other connections.
let msg = amt.map(move |amt| {
println!("wrote {} bytes to {}", amt, addr)
}).map_err(|e| {
panic!("error: {}", e);
});
handle.spawn(msg);
Ok(())
})
Ok(())
});
l.run(done).unwrap();
}

View File

@ -23,16 +23,15 @@ fn main() {
let addr = addr.parse::<SocketAddr>().unwrap();
let mut l = Core::new().unwrap();
let server = TcpListener::bind(&addr, &l.handle()).and_then(|socket| {
socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr);
write(socket).or_else(|_| Ok(()))
}).for_each(|()| {
println!("lost the socket");
Ok(())
})
});
let socket = TcpListener::bind(&addr, &l.handle()).unwrap();
println!("Listenering on: {}", addr);
let server = socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr);
write(socket).or_else(|_| Ok(()))
}).for_each(|()| {
println!("lost the socket");
Ok(())
});
l.run(server).unwrap();
}

View File

@ -6,11 +6,10 @@
use std::io;
use std::sync::mpsc::TryRecvError;
use futures::{Future, Poll, Async};
use futures::{Poll, Async};
use futures::stream::Stream;
use mio::channel;
use io::IoFuture;
use reactor::{Handle, PollEvented};
/// The transmission half of a channel used for sending messages to a receiver.
@ -35,12 +34,6 @@ pub struct Receiver<T> {
rx: PollEvented<channel::Receiver<T>>,
}
/// Future returned by the `channel` function which will resolve to a
/// `Receiver<T>`.
pub struct ReceiverNew<T> {
inner: IoFuture<Receiver<T>>,
}
/// Creates a new in-memory channel used for sending data across `Send +
/// 'static` boundaries, frequently threads.
///
@ -53,12 +46,12 @@ pub struct ReceiverNew<T> {
/// The returned `Sender` can be used to send messages that are processed by
/// the returned `Receiver`. The `Sender` can be cloned to send messages
/// from multiple sources simultaneously.
pub fn channel<T>(handle: &Handle) -> (Sender<T>, ReceiverNew<T>)
pub fn channel<T>(handle: &Handle) -> io::Result<(Sender<T>, Receiver<T>)>
where T: Send + 'static,
{
let (tx, rx) = channel::channel();
let rx = PollEvented::new(rx, handle).map(|rx| Receiver { rx: rx });
(Sender { tx: tx }, ReceiverNew { inner: rx.boxed() })
let rx = try!(PollEvented::new(rx, handle));
Ok((Sender { tx: tx }, Receiver { rx: rx }))
}
impl<T> Sender<T> {
@ -109,12 +102,3 @@ impl<T> Stream for Receiver<T> {
}
}
}
impl<T> Future for ReceiverNew<T> {
type Item = Receiver<T>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Receiver<T>, io::Error> {
self.inner.poll()
}
}

View File

@ -58,34 +58,32 @@
//!
//! // Create the event loop that will drive this server
//! let mut l = Core::new().unwrap();
//! let pin = l.pin();
//! let handle = l.handle();
//!
//! // Create a TCP listener which will listen for incoming connections
//! let server = TcpListener::bind(&addr, pin.handle());
//! let socket = TcpListener::bind(&addr, &handle).unwrap();
//!
//! let done = server.and_then(|socket| {
//! // Once we've got the TCP listener, inform that we have it
//! println!("Listening on: {}", addr);
//! // Once we've got the TCP listener, inform that we have it
//! println!("Listening on: {}", addr);
//!
//! // Pull out the stream of incoming connections and then for each new
//! // one spin up a new task copying data.
//! //
//! // We use the `io::copy` future to copy all data from the
//! // reading half onto the writing half.
//! socket.incoming().for_each(|(socket, addr)| {
//! let pair = futures::lazy(|| Ok(socket.split()));
//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
//!
//! // Once all that is done we print out how much we wrote, and then
//! // critically we *spawn* this future which allows it to run
//! // concurrently with other connections.
//! pin.spawn(amt.then(move |result| {
//! println!("wrote {:?} bytes to {}", result, addr);
//! Ok(())
//! }));
//! // Pull out the stream of incoming connections and then for each new
//! // one spin up a new task copying data.
//! //
//! // We use the `io::copy` future to copy all data from the
//! // reading half onto the writing half.
//! let done = socket.incoming().for_each(|(socket, addr)| {
//! let pair = futures::lazy(|| Ok(socket.split()));
//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
//!
//! // Once all that is done we print out how much we wrote, and then
//! // critically we *spawn* this future which allows it to run
//! // concurrently with other connections.
//! handle.spawn(amt.then(move |result| {
//! println!("wrote {:?} bytes to {}", result, addr);
//! Ok(())
//! })
//! }));
//!
//! Ok(())
//! });
//!
//! // Execute our server (modeled as a future) and wait for it to
@ -107,9 +105,6 @@ extern crate scoped_tls;
#[macro_use]
extern crate log;
mod slot;
mod lock;
#[macro_use]
pub mod io;

View File

@ -1,106 +0,0 @@
//! A "mutex" which only supports try_lock
//!
//! As a futures library the eventual call to an event loop should be the only
//! thing that ever blocks, so this is assisted with a fast user-space
//! implementation of a lock that can only have a `try_lock` operation.
extern crate core;
use self::core::cell::UnsafeCell;
use self::core::ops::{Deref, DerefMut};
use self::core::sync::atomic::Ordering::{Acquire, Release};
use self::core::sync::atomic::AtomicBool;
/// A "mutex" around a value, similar to `std::sync::Mutex<T>`.
///
/// This lock only supports the `try_lock` operation, however, and does not
/// implement poisoning.
pub struct Lock<T> {
locked: AtomicBool,
data: UnsafeCell<T>,
}
/// Sentinel representing an acquired lock through which the data can be
/// accessed.
pub struct TryLock<'a, T: 'a> {
__ptr: &'a Lock<T>,
}
// The `Lock` structure is basically just a `Mutex<T>`, and these two impls are
// intended to mirror the standard library's corresponding impls for `Mutex<T>`.
//
// If a `T` is sendable across threads, so is the lock, and `T` must be sendable
// across threads to be `Sync` because it allows mutable access from multiple
// threads.
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
impl<T> Lock<T> {
/// Creates a new lock around the given value.
pub fn new(t: T) -> Lock<T> {
Lock {
locked: AtomicBool::new(false),
data: UnsafeCell::new(t),
}
}
/// Attempts to acquire this lock, returning whether the lock was acquired or
/// not.
///
/// If `Some` is returned then the data this lock protects can be accessed
/// through the sentinel. This sentinel allows both mutable and immutable
/// access.
///
/// If `None` is returned then the lock is already locked, either elsewhere
/// on this thread or on another thread.
pub fn try_lock(&self) -> Option<TryLock<T>> {
if !self.locked.swap(true, Acquire) {
Some(TryLock { __ptr: self })
} else {
None
}
}
}
impl<'a, T> Deref for TryLock<'a, T> {
type Target = T;
fn deref(&self) -> &T {
// The existence of `TryLock` represents that we own the lock, so we
// can safely access the data here.
unsafe { &*self.__ptr.data.get() }
}
}
impl<'a, T> DerefMut for TryLock<'a, T> {
fn deref_mut(&mut self) -> &mut T {
// The existence of `TryLock` represents that we own the lock, so we
// can safely access the data here.
//
// Additionally, we're the *only* `TryLock` in existence so mutable
// access should be ok.
unsafe { &mut *self.__ptr.data.get() }
}
}
impl<'a, T> Drop for TryLock<'a, T> {
fn drop(&mut self) {
self.__ptr.locked.store(false, Release);
}
}
#[cfg(test)]
mod tests {
use super::Lock;
#[test]
fn smoke() {
let a = Lock::new(1);
let mut a1 = a.try_lock().unwrap();
assert!(a.try_lock().is_none());
assert_eq!(*a1, 1);
*a1 = 2;
drop(a1);
assert_eq!(*a.try_lock().unwrap(), 2);
assert_eq!(*a.try_lock().unwrap(), 2);
}
}

View File

@ -7,5 +7,5 @@ mod tcp;
mod udp;
pub use self::tcp::{TcpStream, TcpStreamNew};
pub use self::tcp::{TcpListener, TcpListenerNew, Incoming};
pub use self::tcp::{TcpListener, Incoming};
pub use self::udp::{UdpSocket, UdpSocketNew};

View File

@ -4,7 +4,7 @@ use std::mem;
use std::net::{self, SocketAddr, Shutdown};
use futures::stream::Stream;
use futures::{Future, IntoFuture, failed, Poll, Async};
use futures::{self, Future, failed, Poll, Async};
use mio;
use io::{Io, IoFuture, IoStream};
@ -18,11 +18,6 @@ pub struct TcpListener {
io: PollEvented<mio::tcp::TcpListener>,
}
/// Future which will resolve to a `TcpListener`
pub struct TcpListenerNew {
inner: IoFuture<TcpListener>,
}
/// Stream returned by the `TcpListener::incoming` function representing the
/// stream of sockets received from a listener.
pub struct Incoming {
@ -35,12 +30,9 @@ impl TcpListener {
/// The TCP listener will bind to the provided `addr` address, if available,
/// and will be returned as a future. The returned future, if resolved
/// successfully, can then be used to accept incoming connections.
pub fn bind(addr: &SocketAddr, handle: &Handle) -> TcpListenerNew {
let future = match mio::tcp::TcpListener::bind(addr) {
Ok(l) => TcpListener::new(l, handle),
Err(e) => failed(e).boxed(),
};
TcpListenerNew { inner: future }
pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<TcpListener> {
let l = try!(mio::tcp::TcpListener::bind(addr));
TcpListener::new(l, handle)
}
/// Create a new TCP listener from the standard library's TCP listener.
@ -72,19 +64,15 @@ impl TcpListener {
/// well (same for IPv6).
pub fn from_listener(listener: net::TcpListener,
addr: &SocketAddr,
handle: &Handle) -> IoFuture<TcpListener> {
let handle = handle.clone();
mio::tcp::TcpListener::from_listener(listener, addr)
.into_future()
.and_then(move |l| TcpListener::new(l, &handle))
.boxed()
handle: &Handle) -> io::Result<TcpListener> {
let l = try!(mio::tcp::TcpListener::from_listener(listener, addr));
TcpListener::new(l, handle)
}
fn new(listener: mio::tcp::TcpListener, handle: &Handle)
-> IoFuture<TcpListener> {
PollEvented::new(listener, handle).map(|io| {
TcpListener { io: io }
}).boxed()
-> io::Result<TcpListener> {
let io = try!(PollEvented::new(listener, handle));
Ok(TcpListener { io: io })
}
/// Test whether this socket is ready to be read or not.
@ -129,13 +117,19 @@ impl TcpListener {
}
}
let handle = self.io.handle().clone();
let remote = self.io.remote().clone();
let stream = MyIncoming { inner: self };
Incoming {
inner: stream.and_then(move |(tcp, addr)| {
PollEvented::new(tcp, &handle).map(move |io| {
(TcpStream { io: io }, addr)
})
let (tx, rx) = futures::oneshot();
remote.spawn(move |handle| {
let res = PollEvented::new(tcp, handle).map(move |io| {
(TcpStream { io: io }, addr)
});
tx.complete(res);
Ok(())
});
rx.then(|r| r.expect("shouldn't be canceled"))
}).boxed(),
}
}
@ -185,15 +179,6 @@ impl fmt::Debug for TcpListener {
}
}
impl Future for TcpListenerNew {
type Item = TcpListener;
type Error = io::Error;
fn poll(&mut self) -> Poll<TcpListener, io::Error> {
self.inner.poll()
}
}
impl Stream for Incoming {
type Item = (TcpStream, SocketAddr);
type Error = io::Error;
@ -242,7 +227,8 @@ impl TcpStream {
fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle)
-> IoFuture<TcpStream> {
PollEvented::new(connected_stream, handle).and_then(|io| {
let tcp = PollEvented::new(connected_stream, handle);
futures::done(tcp).and_then(|io| {
TcpStreamConnect::Waiting(TcpStream { io: io })
}).boxed()
}

View File

@ -2,7 +2,7 @@ use std::io;
use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr};
use std::fmt;
use futures::{Future, failed, Poll, Async};
use futures::{Future, Poll, Async};
use mio;
use io::IoFuture;
@ -25,18 +25,14 @@ impl UdpSocket {
/// `addr` provided. The returned future will be resolved once the socket
/// has successfully bound. If an error happens during the binding or during
/// the socket creation, that error will be returned to the future instead.
pub fn bind(addr: &SocketAddr, handle: &Handle) -> UdpSocketNew {
let future = match mio::udp::UdpSocket::bind(addr) {
Ok(udp) => UdpSocket::new(udp, handle),
Err(e) => failed(e).boxed(),
};
UdpSocketNew { inner: future }
pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result<UdpSocket> {
let udp = try!(mio::udp::UdpSocket::bind(addr));
UdpSocket::new(udp, handle)
}
fn new(socket: mio::udp::UdpSocket, handle: &Handle) -> IoFuture<UdpSocket> {
PollEvented::new(socket, handle).map(|io| {
UdpSocket { io: io }
}).boxed()
fn new(socket: mio::udp::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> {
let io = try!(PollEvented::new(socket, handle));
Ok(UdpSocket { io: io })
}
/// Creates a new `UdpSocket` from the previously bound socket provided.
@ -49,11 +45,9 @@ impl UdpSocket {
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
pub fn from_socket(socket: net::UdpSocket,
handle: &Handle) -> IoFuture<UdpSocket> {
match mio::udp::UdpSocket::from_socket(socket) {
Ok(udp) => UdpSocket::new(udp, handle),
Err(e) => failed(e).boxed(),
}
handle: &Handle) -> io::Result<UdpSocket> {
let udp = try!(mio::udp::UdpSocket::from_socket(socket));
UdpSocket::new(udp, handle)
}
/// Returns the local address that this stream is bound to.

View File

@ -2,19 +2,10 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::io;
use futures::{Future, Poll};
use futures::task;
use mio;
use reactor::{Message, Handle, CoreFuture, Direction, Core};
/// A future which will resolve a unique `tok` token for an I/O object.
///
/// Created through the `Handle::add_source` method, this future can also
/// resolve to an error if there's an issue communicating with the event loop.
pub struct IoTokenNew<E> {
inner: CoreFuture<(E, (Arc<AtomicUsize>, usize)), E>,
}
use reactor::{Message, Remote, Handle, Direction};
/// A token that identifies an active timeout.
pub struct IoToken {
@ -40,15 +31,13 @@ impl IoToken {
/// The returned future will panic if the event loop this handle is
/// associated with has gone away, or if there is an error communicating
/// with the event loop.
pub fn new<E>(source: E, handle: &Handle) -> IoTokenNew<E>
where E: mio::Evented + Send + 'static,
{
IoTokenNew {
inner: CoreFuture {
handle: handle.clone(),
data: Some(source),
result: None,
},
pub fn new(source: &mio::Evented, handle: &Handle) -> io::Result<IoToken> {
match handle.inner.upgrade() {
Some(inner) => {
let (ready, token) = try!(inner.borrow_mut().add_source(source));
Ok(IoToken { token: token, readiness: ready })
}
None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")),
}
}
@ -93,7 +82,7 @@ impl IoToken {
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self, handle: &Handle) {
pub fn schedule_read(&self, handle: &Remote) {
handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
}
@ -120,7 +109,7 @@ impl IoToken {
///
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self, handle: &Handle) {
pub fn schedule_write(&self, handle: &Remote) {
handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
}
@ -146,30 +135,7 @@ impl IoToken {
/// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event
/// loop.
pub fn drop_source(&self, handle: &Handle) {
pub fn drop_source(&self, handle: &Remote) {
handle.send(Message::DropSource(self.token));
}
}
impl<E> Future for IoTokenNew<E>
where E: mio::Evented + Send + 'static,
{
type Item = (E, IoToken);
type Error = io::Error;
fn poll(&mut self) -> Poll<(E, IoToken), io::Error> {
let res = try_ready!(self.inner.poll(|lp, io| {
let pair = try!(lp.add_source(&io));
Ok((io, pair))
}, |io, slot| {
Message::Run(Box::new(move |lp: &Core| {
let res = lp.add_source(&io).map(|p| (io, p));
slot.try_produce(res).ok()
.expect("add source try_produce intereference");
}))
}));
let (io, (ready, token)) = res;
Ok((io, IoToken { token: token, readiness: ready }).into())
}
}

View File

@ -12,12 +12,11 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, Poll, IntoFuture, Async};
use futures::{Future, IntoFuture, Async};
use futures::task::{self, Unpark, Task, Spawn};
use mio;
use slab::Slab;
use slot::{self, Slot};
use timer_wheel::{TimerWheel, Timeout as WheelTimeout};
mod channel;
@ -27,8 +26,8 @@ use self::channel::{Sender, Receiver, channel};
mod poll_evented;
mod timeout;
pub use self::poll_evented::{PollEvented, PollEventedNew};
pub use self::timeout::{Timeout, TimeoutNew};
pub use self::poll_evented::PollEvented;
pub use self::timeout::Timeout;
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Core);
@ -43,23 +42,25 @@ const SLAB_CAPACITY: usize = 1024 * 64;
/// various I/O objects to interact with the event loop in interesting ways.
// TODO: expand this
pub struct Core {
id: usize,
io: mio::Poll,
events: mio::Events,
tx: Sender<Message>,
rx: Receiver<Message>,
io_dispatch: RefCell<Slab<ScheduledIo, usize>>,
task_dispatch: RefCell<Slab<ScheduledTask, usize>>,
// Incoming queue of newly spawned futures
new_futures: Rc<NewFutures>,
_new_futures_registration: mio::Registration,
inner: Rc<RefCell<Inner>>,
// Used for determining when the future passed to `run` is ready. Once the
// registration is passed to `io` above we never touch it again, just keep
// it alive.
_future_registration: mio::Registration,
future_readiness: Arc<MySetReadiness>,
}
struct Inner {
id: usize,
io: mio::Poll,
// Dispatch slabs for I/O and futures events
io_dispatch: Slab<ScheduledIo>,
task_dispatch: Slab<ScheduledTask>,
// Timer wheel keeping track of all timeouts. The `usize` stored in the
// timer wheel is an index into the slab below.
@ -67,8 +68,8 @@ pub struct Core {
// The slab below keeps track of the timeouts themselves as well as the
// state of the timeout itself. The `TimeoutToken` type is an index into the
// `timeouts` slab.
timer_wheel: RefCell<TimerWheel<usize>>,
timeouts: RefCell<Slab<(WheelTimeout, TimeoutState), usize>>,
timer_wheel: TimerWheel<usize>,
timeouts: Slab<(WheelTimeout, TimeoutState)>,
}
/// Handle to an event loop, used to construct I/O objects, send messages, and
@ -77,7 +78,7 @@ pub struct Core {
/// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop.
#[derive(Clone)]
pub struct Handle {
pub struct Remote {
id: usize,
tx: Sender<Message>,
}
@ -85,9 +86,9 @@ pub struct Handle {
/// A non-sendable handle to an event loop, useful for manufacturing instances
/// of `LoopData`.
#[derive(Clone)]
pub struct Pinned {
handle: Handle,
futures: Weak<NewFutures>,
pub struct Handle {
remote: Remote,
inner: Weak<RefCell<Inner>>,
}
struct ScheduledIo {
@ -102,11 +103,6 @@ struct ScheduledTask {
wake: Arc<MySetReadiness>,
}
struct NewFutures {
queue: RefCell<Vec<Box<Future<Item=(), Error=()>>>>,
ready: mio::SetReadiness,
}
enum TimeoutState {
NotFired,
Fired,
@ -121,7 +117,6 @@ enum Direction {
enum Message {
DropSource(usize),
Schedule(usize, Task, Direction),
AddTimeout(Instant, Arc<Slot<io::Result<(usize, Instant)>>>),
UpdateTimeout(usize, Task),
CancelTimeout(usize),
Run(Box<FnBox>),
@ -129,8 +124,7 @@ enum Message {
const TOKEN_MESSAGES: mio::Token = mio::Token(0);
const TOKEN_FUTURE: mio::Token = mio::Token(1);
const TOKEN_NEW_FUTURES: mio::Token = mio::Token(2);
const TOKEN_START: usize = 3;
const TOKEN_START: usize = 2;
impl Core {
/// Creates a new event loop, returning any error that happened during the
@ -146,52 +140,43 @@ impl Core {
TOKEN_FUTURE,
mio::Ready::readable(),
mio::PollOpt::level());
let new_future_pair = mio::Registration::new(&io,
TOKEN_NEW_FUTURES,
mio::Ready::readable(),
mio::PollOpt::level());
Ok(Core {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io,
events: mio::Events::with_capacity(1024),
tx: tx,
rx: rx,
io_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
task_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
timeouts: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
timer_wheel: RefCell::new(TimerWheel::new()),
_future_registration: future_pair.0,
future_readiness: Arc::new(MySetReadiness(future_pair.1)),
_new_futures_registration: new_future_pair.0,
new_futures: Rc::new(NewFutures {
queue: RefCell::new(Vec::new()),
ready: new_future_pair.1,
}),
inner: Rc::new(RefCell::new(Inner {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io,
io_dispatch: Slab::with_capacity(SLAB_CAPACITY),
task_dispatch: Slab::with_capacity(SLAB_CAPACITY),
timeouts: Slab::with_capacity(SLAB_CAPACITY),
timer_wheel: TimerWheel::new(),
})),
})
}
/// Generates a handle to this event loop used to construct I/O objects and
/// send messages.
/// Returns a handle to this event loop which cannot be sent across threads
/// but can be used as a proxy to the event loop itself.
///
/// Handles to an event loop are cloneable as well and clones will always
/// refer to the same event loop.
/// Handles are cloneable and clones always refer to the same event loop.
/// This handle is typically passed into functions that create I/O objects
/// to bind them to this event loop.
pub fn handle(&self) -> Handle {
Handle {
id: self.id,
tx: self.tx.clone(),
remote: self.remote(),
inner: Rc::downgrade(&self.inner),
}
}
/// Returns a "pin" of this event loop which cannot be sent across threads
/// but can be used as a proxy to the event loop itself.
///
/// Currently the primary use for this is to use as a handle to add data
/// to the event loop directly. The `Pinned::add_loop_data` method can
/// be used to immediately create instances of `LoopData` structures.
pub fn pin(&self) -> Pinned {
Pinned {
handle: self.handle(),
futures: Rc::downgrade(&self.new_futures),
/// Generates a remote handle to this event loop which can be used to spawn
/// tasks from other threads into this event loop.
pub fn remote(&self) -> Remote {
Remote {
id: self.inner.borrow().id,
tx: self.tx.clone(),
}
}
@ -256,14 +241,15 @@ impl Core {
// attaching strace, or similar.
let start = Instant::now();
loop {
let timeout = self.timer_wheel.borrow().next_timeout().map(|t| {
let inner = self.inner.borrow_mut();
let timeout = inner.timer_wheel.next_timeout().map(|t| {
if t < start {
Duration::new(0, 0)
} else {
t - start
}
});
match self.io.poll(&mut self.events, timeout) {
match inner.io.poll(&mut self.events, timeout) {
Ok(a) => {
amt = a;
break;
@ -294,12 +280,6 @@ impl Core {
if !finished && CURRENT_LOOP.set(self, || done()) {
finished = true;
}
} else if token == TOKEN_NEW_FUTURES {
self.new_futures.ready.set_readiness(mio::Ready::none()).unwrap();
let mut new_futures = self.new_futures.queue.borrow_mut();
for future in new_futures.drain(..) {
self.spawn(future);
}
} else {
self.dispatch(token, event.kind());
}
@ -309,7 +289,7 @@ impl Core {
}
}
fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let token = usize::from(token) - TOKEN_START;
if token % 2 == 0 {
self.dispatch_io(token / 2, ready)
@ -318,10 +298,11 @@ impl Core {
}
}
fn dispatch_io(&self, token: usize, ready: mio::Ready) {
fn dispatch_io(&mut self, token: usize, ready: mio::Ready) {
let mut reader = None;
let mut writer = None;
if let Some(io) = self.io_dispatch.borrow_mut().get_mut(token) {
let mut inner = self.inner.borrow_mut();
if let Some(io) = inner.io_dispatch.get_mut(token) {
if ready.is_readable() {
reader = io.reader.take();
io.readiness.fetch_or(1, Ordering::Relaxed);
@ -331,6 +312,7 @@ impl Core {
io.readiness.fetch_or(2, Ordering::Relaxed);
}
}
drop(inner);
// TODO: don't notify the same task twice
if let Some(reader) = reader {
self.notify_handle(reader);
@ -340,8 +322,9 @@ impl Core {
}
}
fn dispatch_task(&self, token: usize) {
let (task, wake) = match self.task_dispatch.borrow_mut().get_mut(token) {
fn dispatch_task(&mut self, token: usize) {
let mut inner = self.inner.borrow_mut();
let (task, wake) = match inner.task_dispatch.get_mut(token) {
Some(slot) => (slot.spawn.take(), slot.wake.clone()),
None => return,
};
@ -350,24 +333,31 @@ impl Core {
Some(task) => task,
None => return,
};
drop(inner);
let res = CURRENT_LOOP.set(self, || task.poll_future(wake));
let mut dispatch = self.task_dispatch.borrow_mut();
inner = self.inner.borrow_mut();
match res {
Ok(Async::NotReady) => {
assert!(dispatch[token].spawn.is_none());
dispatch[token].spawn = Some(task);
assert!(inner.task_dispatch[token].spawn.is_none());
inner.task_dispatch[token].spawn = Some(task);
}
Ok(Async::Ready(())) |
Err(()) => {
dispatch.remove(token).unwrap();
inner.task_dispatch.remove(token).unwrap();
}
}
}
fn consume_timeouts(&mut self, now: Instant) {
while let Some(idx) = self.timer_wheel.borrow_mut().poll(now) {
loop {
let mut inner = self.inner.borrow_mut();
let idx = match inner.timer_wheel.poll(now) {
Some(idx) => idx,
None => break,
};
trace!("firing timeout: {}", idx);
let handle = self.timeouts.borrow_mut()[idx].1.fire();
let handle = inner.timeouts[idx].1.fire();
drop(inner);
if let Some(handle) = handle {
self.notify_handle(handle);
}
@ -383,109 +373,6 @@ impl Core {
CURRENT_LOOP.set(&self, || handle.unpark());
}
fn add_source(&self, source: &mio::Evented)
-> io::Result<(Arc<AtomicUsize>, usize)> {
debug!("adding a new I/O source");
let sched = ScheduledIo {
readiness: Arc::new(AtomicUsize::new(0)),
reader: None,
writer: None,
};
let mut dispatch = self.io_dispatch.borrow_mut();
if dispatch.vacant_entry().is_none() {
let amt = dispatch.len();
dispatch.reserve_exact(amt);
}
let entry = dispatch.vacant_entry().unwrap();
try!(self.io.register(source,
mio::Token(TOKEN_START + entry.index() * 2),
mio::Ready::readable() | mio::Ready::writable(),
mio::PollOpt::edge()));
Ok((sched.readiness.clone(), entry.insert(sched).index()))
}
fn drop_source(&self, token: usize) {
debug!("dropping I/O source: {}", token);
self.io_dispatch.borrow_mut().remove(token).unwrap();
}
fn schedule(&self, token: usize, wake: Task, dir: Direction) {
debug!("scheduling direction for: {}", token);
let to_call = {
let mut dispatch = self.io_dispatch.borrow_mut();
let sched = dispatch.get_mut(token).unwrap();
let (slot, bit) = match dir {
Direction::Read => (&mut sched.reader, 1),
Direction::Write => (&mut sched.writer, 2),
};
if sched.readiness.load(Ordering::SeqCst) & bit != 0 {
*slot = None;
Some(wake)
} else {
*slot = Some(wake);
None
}
};
if let Some(to_call) = to_call {
debug!("schedule immediately done");
self.notify_handle(to_call);
}
}
fn add_timeout(&self, at: Instant) -> io::Result<(usize, Instant)> {
let mut timeouts = self.timeouts.borrow_mut();
if timeouts.vacant_entry().is_none() {
let len = timeouts.len();
timeouts.reserve_exact(len);
}
let entry = timeouts.vacant_entry().unwrap();
let timeout = self.timer_wheel.borrow_mut().insert(at, entry.index());
let when = *timeout.when();
let entry = entry.insert((timeout, TimeoutState::NotFired));
debug!("added a timeout: {}", entry.index());
Ok((entry.index(), when))
}
fn update_timeout(&self, token: usize, handle: Task) {
debug!("updating a timeout: {}", token);
let to_wake = self.timeouts.borrow_mut()[token].1.block(handle);
if let Some(to_wake) = to_wake {
self.notify_handle(to_wake);
}
}
fn cancel_timeout(&self, token: usize) {
debug!("cancel a timeout: {}", token);
let pair = self.timeouts.borrow_mut().remove(token);
if let Some((timeout, _state)) = pair {
self.timer_wheel.borrow_mut().cancel(&timeout);
}
}
fn spawn(&self, future: Box<Future<Item=(), Error=()>>) {
let unpark = {
let mut dispatch = self.task_dispatch.borrow_mut();
if dispatch.vacant_entry().is_none() {
let len = dispatch.len();
dispatch.reserve_exact(len);
}
let entry = dispatch.vacant_entry().unwrap();
let token = TOKEN_START + 2 * entry.index() + 1;
let pair = mio::Registration::new(&self.io,
mio::Token(token),
mio::Ready::readable(),
mio::PollOpt::level());
let unpark = Arc::new(MySetReadiness(pair.1));
let entry = entry.insert(ScheduledTask {
spawn: Some(task::spawn(future)),
wake: unpark,
_registration: pair.0,
});
entry.get().wake.clone()
};
unpark.unpark();
}
fn consume_queue(&self) {
debug!("consuming notification queue");
// TODO: can we do better than `.unwrap()` here?
@ -496,21 +383,118 @@ impl Core {
fn notify(&self, msg: Message) {
match msg {
Message::DropSource(tok) => self.drop_source(tok),
Message::Schedule(tok, wake, dir) => self.schedule(tok, wake, dir),
Message::AddTimeout(at, slot) => {
slot.try_produce(self.add_timeout(at))
.expect("interference with try_produce on timeout");
Message::DropSource(tok) => self.inner.borrow_mut().drop_source(tok),
Message::Schedule(tok, wake, dir) => {
let task = self.inner.borrow_mut().schedule(tok, wake, dir);
if let Some(task) = task {
self.notify_handle(task);
}
}
Message::UpdateTimeout(t, handle) => {
let task = self.inner.borrow_mut().update_timeout(t, handle);
if let Some(task) = task {
self.notify_handle(task);
}
}
Message::CancelTimeout(t) => {
self.inner.borrow_mut().cancel_timeout(t)
}
Message::UpdateTimeout(t, handle) => self.update_timeout(t, handle),
Message::CancelTimeout(t) => self.cancel_timeout(t),
Message::Run(r) => r.call_box(self),
}
}
}
impl Handle {
impl Inner {
fn add_source(&mut self, source: &mio::Evented)
-> io::Result<(Arc<AtomicUsize>, usize)> {
debug!("adding a new I/O source");
let sched = ScheduledIo {
readiness: Arc::new(AtomicUsize::new(0)),
reader: None,
writer: None,
};
if self.io_dispatch.vacant_entry().is_none() {
let amt = self.io_dispatch.len();
self.io_dispatch.reserve_exact(amt);
}
let entry = self.io_dispatch.vacant_entry().unwrap();
try!(self.io.register(source,
mio::Token(TOKEN_START + entry.index() * 2),
mio::Ready::readable() | mio::Ready::writable(),
mio::PollOpt::edge()));
Ok((sched.readiness.clone(), entry.insert(sched).index()))
}
fn drop_source(&mut self, token: usize) {
debug!("dropping I/O source: {}", token);
self.io_dispatch.remove(token).unwrap();
}
fn schedule(&mut self, token: usize, wake: Task, dir: Direction)
-> Option<Task> {
debug!("scheduling direction for: {}", token);
let sched = self.io_dispatch.get_mut(token).unwrap();
let (slot, bit) = match dir {
Direction::Read => (&mut sched.reader, 1),
Direction::Write => (&mut sched.writer, 2),
};
if sched.readiness.load(Ordering::SeqCst) & bit != 0 {
*slot = None;
Some(wake)
} else {
*slot = Some(wake);
None
}
}
fn add_timeout(&mut self, at: Instant) -> io::Result<(usize, Instant)> {
if self.timeouts.vacant_entry().is_none() {
let len = self.timeouts.len();
self.timeouts.reserve_exact(len);
}
let entry = self.timeouts.vacant_entry().unwrap();
let timeout = self.timer_wheel.insert(at, entry.index());
let when = *timeout.when();
let entry = entry.insert((timeout, TimeoutState::NotFired));
debug!("added a timeout: {}", entry.index());
Ok((entry.index(), when))
}
fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
debug!("updating a timeout: {}", token);
self.timeouts[token].1.block(handle)
}
fn cancel_timeout(&mut self, token: usize) {
debug!("cancel a timeout: {}", token);
let pair = self.timeouts.remove(token);
if let Some((timeout, _state)) = pair {
self.timer_wheel.cancel(&timeout);
}
}
fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
if self.task_dispatch.vacant_entry().is_none() {
let len = self.task_dispatch.len();
self.task_dispatch.reserve_exact(len);
}
let entry = self.task_dispatch.vacant_entry().unwrap();
let token = TOKEN_START + 2 * entry.index() + 1;
let pair = mio::Registration::new(&self.io,
mio::Token(token),
mio::Ready::readable(),
mio::PollOpt::level());
let unpark = Arc::new(MySetReadiness(pair.1));
let entry = entry.insert(ScheduledTask {
spawn: Some(task::spawn(future)),
wake: unpark,
_registration: pair.0,
});
entry.get().wake.clone().unpark();
}
}
impl Remote {
fn send(&self, msg: Message) {
self.with_loop(|lp| {
match lp {
@ -541,7 +525,8 @@ impl Handle {
{
if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| {
if lp.id == self.id {
let same = lp.inner.borrow().id == self.id;
if same {
f(Some(lp))
} else {
f(None)
@ -561,84 +546,32 @@ impl Handle {
/// Note that while the closure, `F`, requires the `Send` bound as it might
/// cross threads, the future `R` does not.
pub fn spawn<F, R>(&self, f: F)
where F: FnOnce(&Pinned) -> R + Send + 'static,
where F: FnOnce(&Handle) -> R + Send + 'static,
R: IntoFuture<Item=(), Error=()>,
R::Future: 'static,
{
self.send(Message::Run(Box::new(|lp: &Core| {
let f = f(&lp.pin());
lp.spawn(Box::new(f.into_future()));
let f = f(&lp.handle());
lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
})));
}
}
impl Pinned {
/// Returns a reference to the underlying handle to the event loop.
pub fn handle(&self) -> &Handle {
&self.handle
impl Handle {
/// Returns a reference to the underlying remote handle to the event loop.
pub fn remote(&self) -> &Remote {
&self.remote
}
/// Spawns a new future on the event loop this pin is associated this.
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static,
{
let inner = match self.futures.upgrade() {
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.queue.borrow_mut().push(Box::new(f));
inner.ready.set_readiness(mio::Ready::readable()).unwrap();
}
}
struct CoreFuture<T, U> {
handle: Handle,
data: Option<U>,
result: Option<(Arc<Slot<io::Result<T>>>, slot::Token)>,
}
impl<T, U> CoreFuture<T, U>
where T: 'static,
{
fn poll<F, G>(&mut self, f: F, g: G) -> Poll<T, io::Error>
where F: FnOnce(&Core, U) -> io::Result<T>,
G: FnOnce(U, Arc<Slot<io::Result<T>>>) -> Message,
{
match self.result {
Some((ref result, ref mut token)) => {
result.cancel(*token);
match result.try_consume() {
Ok(Ok(t)) => return Ok(t.into()),
Ok(Err(e)) => return Err(e),
Err(_) => {}
}
let task = task::park();
*token = result.on_full(move |_| {
task.unpark();
});
Ok(Async::NotReady)
}
None => {
let data = &mut self.data;
let ret = self.handle.with_loop(|lp| {
lp.map(|lp| f(lp, data.take().unwrap()))
});
if let Some(ret) = ret {
debug!("loop future done immediately on event loop");
return ret.map(|e| e.into())
}
debug!("loop future needs to send info to event loop");
let task = task::park();
let result = Arc::new(Slot::new(None));
let token = result.on_full(move |_| {
task.unpark();
});
self.result = Some((result.clone(), token));
self.handle.send(g(data.take().unwrap(), result));
Ok(Async::NotReady)
}
}
inner.borrow_mut().spawn(Box::new(f));
}
}

View File

@ -9,12 +9,12 @@
use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Future, Poll, Async};
use futures::Async;
use mio;
use io::Io;
use reactor::Handle;
use reactor::io_token::{IoToken, IoTokenNew};
use reactor::{Handle, Remote};
use reactor::io_token::IoToken;
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
@ -34,31 +34,24 @@ use reactor::io_token::{IoToken, IoTokenNew};
/// any scheduling necessary to get notified when the event is ready again.
pub struct PollEvented<E> {
token: IoToken,
handle: Handle,
handle: Remote,
readiness: AtomicUsize,
io: E,
}
/// Future returned from `PollEvented::new` which will resolve to a
/// `PollEvented`.
pub struct PollEventedNew<E> {
inner: IoTokenNew<E>,
handle: Handle,
}
impl<E> PollEvented<E>
where E: mio::Evented + Send + 'static,
{
impl<E: mio::Evented> PollEvented<E> {
/// Creates a new readiness stream associated with the provided
/// `loop_handle` and for the given `source`.
///
/// This method returns a future which will resolve to the readiness stream
/// when it's ready.
pub fn new(source: E, handle: &Handle) -> PollEventedNew<E> {
PollEventedNew {
inner: IoToken::new(source, handle),
handle: handle.clone(),
}
pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> {
Ok(PollEvented {
token: try!(IoToken::new(&io, handle)),
handle: handle.remote().clone(),
readiness: AtomicUsize::new(0),
io: io,
})
}
}
@ -137,7 +130,7 @@ impl<E> PollEvented<E> {
/// Returns a reference to the event loop handle that this readiness stream
/// is associated with.
pub fn handle(&self) -> &Handle {
pub fn remote(&self) -> &Remote {
&self.handle
}
@ -266,20 +259,3 @@ impl<E> Drop for PollEvented<E> {
self.token.drop_source(&self.handle);
}
}
impl<E> Future for PollEventedNew<E>
where E: mio::Evented + Send + 'static,
{
type Item = PollEvented<E>;
type Error = io::Error;
fn poll(&mut self) -> Poll<PollEvented<E>, io::Error> {
let (io, token) = try_ready!(self.inner.poll());
Ok(PollEvented {
token: token,
handle: self.handle.clone(),
io: io,
readiness: AtomicUsize::new(0),
}.into())
}
}

View File

@ -8,9 +8,8 @@ use std::time::{Duration, Instant};
use futures::{Future, Poll, Async};
use reactor::Handle;
use reactor::{Remote, Handle};
use reactor::timeout_token::TimeoutToken;
use io::IoFuture;
/// A future representing the notification that a timeout has occurred.
///
@ -21,13 +20,7 @@ use io::IoFuture;
/// otherwise indicated to fire at.
pub struct Timeout {
token: TimeoutToken,
handle: Handle,
}
/// Future returned from `Timeout::new` and `Timeout::new_at` which will resolve
/// to the actual `Timeout` itself.
pub struct TimeoutNew {
inner: IoFuture<Timeout>,
handle: Remote,
}
impl Timeout {
@ -36,7 +29,7 @@ impl Timeout {
/// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn new(dur: Duration, handle: &Handle) -> TimeoutNew {
pub fn new(dur: Duration, handle: &Handle) -> io::Result<Timeout> {
Timeout::new_at(Instant::now() + dur, handle)
}
@ -45,16 +38,11 @@ impl Timeout {
/// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn new_at(at: Instant, handle: &Handle) -> TimeoutNew {
let handle = handle.clone();
TimeoutNew {
inner: TimeoutToken::new(at, &handle).map(move |token| {
Timeout {
token: token,
handle: handle,
}
}).boxed(),
}
pub fn new_at(at: Instant, handle: &Handle) -> io::Result<Timeout> {
Ok(Timeout {
token: try!(TimeoutToken::new(at, &handle)),
handle: handle.remote().clone(),
})
}
}
@ -74,15 +62,6 @@ impl Future for Timeout {
}
}
impl Future for TimeoutNew {
type Item = Timeout;
type Error = io::Error;
fn poll(&mut self) -> Poll<Timeout, io::Error> {
self.inner.poll()
}
}
impl Drop for Timeout {
fn drop(&mut self) {
self.token.cancel_timeout(&self.handle);

View File

@ -1,16 +1,9 @@
use std::io;
use std::time::Instant;
use futures::{Future, Poll};
use futures::task;
use reactor::{Message, Core, Handle, CoreFuture};
/// Return value from the `Handle::add_timeout` method, a future that will
/// resolve to a `TimeoutToken` to configure the behavior of that timeout.
pub struct TimeoutTokenNew {
inner: CoreFuture<(usize, Instant), Instant>,
}
use reactor::{Message, Handle, Remote};
/// A token that identifies an active timeout.
pub struct TimeoutToken {
@ -21,13 +14,13 @@ pub struct TimeoutToken {
impl TimeoutToken {
/// Adds a new timeout to get fired at the specified instant, notifying the
/// specified task.
pub fn new(at: Instant, handle: &Handle) -> TimeoutTokenNew {
TimeoutTokenNew {
inner: CoreFuture {
handle: handle.clone(),
data: Some(at),
result: None,
},
pub fn new(at: Instant, handle: &Handle) -> io::Result<TimeoutToken> {
match handle.inner.upgrade() {
Some(inner) => {
let (token, when) = try!(inner.borrow_mut().add_timeout(at));
Ok(TimeoutToken { token: token, when: when })
}
None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")),
}
}
@ -47,7 +40,7 @@ impl TimeoutToken {
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn update_timeout(&self, handle: &Handle) {
pub fn update_timeout(&self, handle: &Remote) {
handle.send(Message::UpdateTimeout(self.token, task::park()))
}
@ -57,22 +50,8 @@ impl TimeoutToken {
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn cancel_timeout(&self, handle: &Handle) {
pub fn cancel_timeout(&self, handle: &Remote) {
debug!("cancel timeout {}", self.token);
handle.send(Message::CancelTimeout(self.token))
}
}
impl Future for TimeoutTokenNew {
type Item = TimeoutToken;
type Error = io::Error;
fn poll(&mut self) -> Poll<TimeoutToken, io::Error> {
let (t, i) = try_ready!(self.inner.poll(Core::add_timeout,
Message::AddTimeout));
Ok(TimeoutToken {
token: t,
when: i,
}.into())
}
}

View File

@ -1,691 +0,0 @@
//! A slot in memory for communicating between a producer and a consumer.
//!
//! This module contains an implementation detail of this library for a type
//! which is only intended to be shared between one consumer and one producer of
//! a value. It is unlikely that this module will survive stabilization of this
//! library, so it is not recommended to rely on it.
#![allow(dead_code)] // imported in a few places
use std::prelude::v1::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use lock::Lock;
/// A slot in memory intended to represent the communication channel between one
/// producer and one consumer.
///
/// Each slot contains space for a piece of data of type `T`, and can have
/// callbacks registered to run when the slot is either full or empty.
///
/// Slots are only intended to be shared between exactly one producer and
/// exactly one consumer. If there are multiple concurrent producers or
/// consumers then this is still memory safe but will have unpredictable results
/// (and maybe panics). Note that this does not require that the "consumer" is
/// the same for the entire lifetime of a slot, simply that there is only one
/// consumer at a time.
///
/// # Registering callbacks
///
/// [`on_empty`](#method.on_empty) registers a callback to run when the slot
/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it
/// becomes full. In both cases, the callback will run immediately if possible.
///
/// At most one callback can be registered at any given time: it is an error to
/// attempt to register a callback with `on_full` if one is currently registered
/// via `on_empty`, or any other combination.
///
/// # Cancellation
///
/// Registering a callback returns a `Token` which can be used to
/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet
/// started running can be canceled. Canceling a callback that has already run
/// is not an error, and `cancel` does not signal whether or not the callback
/// was actually canceled to the caller.
pub struct Slot<T> {
// The purpose of this data type is to communicate when a value becomes
// available and coordinate between a producer and consumer about that
// value. Slots end up being at the core of many futures as they handle
// values transferring between producers and consumers, which means that
// they can never block.
//
// As a result, this `Slot` is a lock-free implementation in terms of not
// actually blocking at any point in time. The `Lock` types are
// half-optional and half-not-optional. They aren't actually mutexes as they
// only support a `try_lock` operation, and all the methods below ensure
// that progress can always be made without blocking.
//
// The `state` variable keeps track of the state of this slot, while the
// other fields here are just the payloads of the slot itself. Note that the
// exact bits of `state` are typically wrapped up in a `State` for
// inspection (see below).
state: AtomicUsize,
slot: Lock<Option<T>>,
on_full: Lock<Option<Box<FnBox<T>>>>,
on_empty: Lock<Option<(Box<FnBox2<T>>, Option<T>)>>,
}
/// Error value returned from erroneous calls to `try_produce`, which contains
/// the value that was passed to `try_produce`.
#[derive(Debug, PartialEq)]
pub struct TryProduceError<T>(T);
/// Error value returned from erroneous calls to `try_consume`.
#[derive(Debug, PartialEq)]
pub struct TryConsumeError(());
/// Error value returned from erroneous calls to `on_full`.
#[derive(Debug, PartialEq)]
pub struct OnFullError(());
/// Error value returned from erroneous calls to `on_empty`.
#[derive(Debug, PartialEq)]
pub struct OnEmptyError(());
/// A `Token` represents a registered callback, and can be used to cancel the callback.
#[derive(Clone, Copy)]
pub struct Token(usize);
// Slot state: the lowest 3 bits are flags; the remaining bits are used to
// store the `Token` for the currently registered callback. The special token
// value 0 means no callback is registered.
//
// The flags are:
// - `DATA`: the `Slot` contains a value
// - `ON_FULL`: the `Slot` has an `on_full` callback registered
// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered
struct State(usize);
const DATA: usize = 1 << 0;
const ON_FULL: usize = 1 << 1;
const ON_EMPTY: usize = 1 << 2;
const STATE_BITS: usize = 3;
const STATE_MASK: usize = (1 << STATE_BITS) - 1;
fn _is_send<T: Send>() {}
fn _is_sync<T: Send>() {}
fn _assert() {
_is_send::<Slot<i32>>();
_is_sync::<Slot<u32>>();
}
impl<T> Slot<T> {
/// Creates a new `Slot` containing `val`, which may be `None` to create an
/// empty `Slot`.
pub fn new(val: Option<T>) -> Slot<T> {
Slot {
state: AtomicUsize::new(if val.is_some() {DATA} else {0}),
slot: Lock::new(val),
on_full: Lock::new(None),
on_empty: Lock::new(None),
}
}
/// Attempts to store `t` in the slot.
///
/// This method can only be called by the one consumer working on this
/// `Slot`. Concurrent calls to this method or `on_empty` will result in
/// panics or possibly errors.
///
/// # Errors
///
/// Returns `Err` if the slot is already full. The value you attempted to
/// store is included in the error value.
///
/// # Panics
///
/// This method will panic if called concurrently with `try_produce` or
/// `on_empty`, or if `on_empty` has been called previously but the callback
/// hasn't fired.
pub fn try_produce(&self, t: T) -> Result<(), TryProduceError<T>> {
// First up, let's take a look at our current state. Of our three flags,
// we check a few:
//
// * DATA - if this is set, then the production fails as a value has
// already been produced and we're not ready to receive it yet.
// * ON_EMPTY - this should never be set as it indicates a contract
// violation as the producer already registered interest in
// a value but the callback wasn't fired.
// * ON_FULL - doesn't matter in this use case, we don't check it as
// either state is valid.
let mut state = State(self.state.load(Ordering::SeqCst));
assert!(!state.flag(ON_EMPTY));
if state.flag(DATA) {
return Err(TryProduceError(t))
}
// Ok, so we've determined that our state is either `ON_FULL` or `0`, in
// both cases we're going to store our data into our slot. This should
// always succeed as access to `slot` is gated on the `DATA` flag being
// set on the consumer side (which isn't set) and there should only be
// one producer.
let mut slot = self.slot.try_lock().expect("interference with consumer?");
assert!(slot.is_none());
*slot = Some(t);
drop(slot);
// Next, we update our state with `DATA` to say that something is
// available, and we also unset `ON_FULL` because we'll invoke the
// callback if it's available.
loop {
assert!(!state.flag(ON_EMPTY));
let new_state = state.set_flag(DATA, true).set_flag(ON_FULL, false);
let old = self.state.compare_and_swap(state.0,
new_state.0,
Ordering::SeqCst);
if old == state.0 {
break
}
state.0 = old;
}
// If our previous state we transitioned from indicates that it has an
// on-full callback, we call that callback here. There's a few unwraps
// here that should never fail because the consumer shouldn't be placing
// another callback here and there shouldn't be any other producers as
// well.
if state.flag(ON_FULL) {
let cb = self.on_full.try_lock().expect("interference2")
.take().expect("ON_FULL but no callback");
cb.call_box(self);
}
Ok(())
}
/// Registers `f` as a callback to run when the slot becomes empty.
///
/// The callback will run immediately if the slot is already empty. Returns
/// a token that can be used to cancel the callback. This method is to be
/// called by the producer, and it is illegal to call this method
/// concurrently with either `on_empty` or `try_produce`.
///
/// # Panics
///
/// Panics if another callback was already registered via `on_empty` or
/// `on_full`, or if this value is called concurrently with other producer
/// methods.
pub fn on_empty<F>(&self, item: Option<T>, f: F) -> Token
where F: FnOnce(&Slot<T>, Option<T>) + Send + 'static
{
// First up, as usual, take a look at our state. Of the three flags we
// check two:
//
// * DATA - if set, we keep going, but if unset we're done as there's no
// data and we're already empty.
// * ON_EMPTY - this should be impossible as it's a contract violation
// to call this twice or concurrently.
// * ON_FULL - it's illegal to have both an empty and a full callback
// simultaneously, so we check this just after we ensure
// there's data available. If there's data there should not
// be a full callback as it should have been called.
let mut state = State(self.state.load(Ordering::SeqCst));
assert!(!state.flag(ON_EMPTY));
if !state.flag(DATA) {
f(self, item);
return Token(0)
}
assert!(!state.flag(ON_FULL));
// At this point we've precisely determined that our state is `DATA` and
// all other flags are unset. We're cleared for landing in initializing
// the `on_empty` slot so we store our callback here.
let mut slot = self.on_empty.try_lock().expect("on_empty interference");
assert!(slot.is_none());
*slot = Some((Box::new(f), item));
drop(slot);
// In this loop, we transition ourselves from the `DATA` state to a
// state which has the on empty flag state. Note that we also increase
// the token of this state as we're registering a new callback.
loop {
assert!(state.flag(DATA));
assert!(!state.flag(ON_FULL));
assert!(!state.flag(ON_EMPTY));
let new_state = state.set_flag(ON_EMPTY, true)
.set_token(state.token() + 1);
let old = self.state.compare_and_swap(state.0,
new_state.0,
Ordering::SeqCst);
// If we succeeded in the CAS, then we're done and our token is
// valid.
if old == state.0 {
return Token(new_state.token())
}
state.0 = old;
// If we failed the CAS but the data was taken in the meantime we
// abort our attempt to set on-empty and call the callback
// immediately. Note that the on-empty flag was never set, so it
// should still be there and should be available to take.
if !state.flag(DATA) {
let cb = self.on_empty.try_lock().expect("on_empty interference2")
.take().expect("on_empty not empty??");
let (cb, item) = cb;
cb.call_box(self, item);
return Token(0)
}
}
}
/// Attempts to consume the value stored in the slot.
///
/// This method can only be called by the one consumer of this slot, and
/// cannot be called concurrently with `try_consume` or `on_full`.
///
/// # Errors
///
/// Returns `Err` if the slot is already empty.
///
/// # Panics
///
/// This method will panic if called concurrently with `try_consume` or
/// `on_full`, or otherwise show weird behavior.
pub fn try_consume(&self) -> Result<T, TryConsumeError> {
// The implementation of this method is basically the same as
// `try_produce` above, it's just the opposite of all the operations.
let mut state = State(self.state.load(Ordering::SeqCst));
assert!(!state.flag(ON_FULL));
if !state.flag(DATA) {
return Err(TryConsumeError(()))
}
let mut slot = self.slot.try_lock().expect("interference with producer?");
let val = slot.take().expect("DATA but not data");
drop(slot);
loop {
assert!(!state.flag(ON_FULL));
let new_state = state.set_flag(DATA, false).set_flag(ON_EMPTY, false);
let old = self.state.compare_and_swap(state.0,
new_state.0,
Ordering::SeqCst);
if old == state.0 {
break
}
state.0 = old;
}
assert!(!state.flag(ON_FULL));
if state.flag(ON_EMPTY) {
let cb = self.on_empty.try_lock().expect("interference3")
.take().expect("ON_EMPTY but no callback");
let (cb, item) = cb;
cb.call_box(self, item);
}
Ok(val)
}
/// Registers `f` as a callback to run when the slot becomes full.
///
/// The callback will run immediately if the slot is already full. Returns a
/// token that can be used to cancel the callback.
///
/// This method is to be called by the consumer.
///
/// # Panics
///
/// Panics if another callback was already registered via `on_empty` or
/// `on_full` or if called concurrently with `on_full` or `try_consume`.
pub fn on_full<F>(&self, f: F) -> Token
where F: FnOnce(&Slot<T>) + Send + 'static
{
// The implementation of this method is basically the same as
// `on_empty` above, it's just the opposite of all the operations.
let mut state = State(self.state.load(Ordering::SeqCst));
assert!(!state.flag(ON_FULL));
if state.flag(DATA) {
f(self);
return Token(0)
}
assert!(!state.flag(ON_EMPTY));
let mut slot = self.on_full.try_lock().expect("on_full interference");
assert!(slot.is_none());
*slot = Some(Box::new(f));
drop(slot);
loop {
assert!(!state.flag(DATA));
assert!(!state.flag(ON_EMPTY));
assert!(!state.flag(ON_FULL));
let new_state = state.set_flag(ON_FULL, true)
.set_token(state.token() + 1);
let old = self.state.compare_and_swap(state.0,
new_state.0,
Ordering::SeqCst);
if old == state.0 {
return Token(new_state.token())
}
state.0 = old;
if state.flag(DATA) {
let cb = self.on_full.try_lock().expect("on_full interference2")
.take().expect("on_full not full??");
cb.call_box(self);
return Token(0)
}
}
}
/// Cancels the callback associated with `token`.
///
/// Canceling a callback that has already started running, or has already
/// run will do nothing, and is not an error. See
/// [Cancellation](#cancellation).
///
/// # Panics
///
/// This method may cause panics if it is called concurrently with
/// `on_empty` or `on_full`, depending on which callback is being canceled.
pub fn cancel(&self, token: Token) {
// Tokens with a value of "0" are sentinels which don't actually do
// anything.
let token = token.0;
if token == 0 {
return
}
let mut state = State(self.state.load(Ordering::SeqCst));
loop {
// If we've moved on to a different token, then we're guaranteed
// that our token won't show up again, so we can return immediately
// as our closure has likely already run (or been previously
// canceled).
if state.token() != token {
return
}
// If our token matches, then let's see if we're cancelling either
// the on-full or on-empty callbacks. It's illegal to have them both
// registered, so we only need to look at one.
//
// If neither are set then the token has probably already run, so we
// just continue along our merry way and don't worry.
let new_state = if state.flag(ON_FULL) {
assert!(!state.flag(ON_EMPTY));
state.set_flag(ON_FULL, false)
} else if state.flag(ON_EMPTY) {
assert!(!state.flag(ON_FULL));
state.set_flag(ON_EMPTY, false)
} else {
return
};
let old = self.state.compare_and_swap(state.0,
new_state.0,
Ordering::SeqCst);
if old == state.0 {
break
}
state.0 = old;
}
// Figure out which callback we just canceled, and now that the flag is
// unset we should own the callback to clear it.
if state.flag(ON_FULL) {
let cb = self.on_full.try_lock().expect("on_full interference3")
.take().expect("on_full not full??");
drop(cb);
} else {
let cb = self.on_empty.try_lock().expect("on_empty interference3")
.take().expect("on_empty not empty??");
drop(cb);
}
}
}
impl<T> TryProduceError<T> {
/// Extracts the value that was attempted to be produced.
pub fn into_inner(self) -> T {
self.0
}
}
trait FnBox<T>: Send {
fn call_box(self: Box<Self>, other: &Slot<T>);
}
impl<T, F> FnBox<T> for F
where F: FnOnce(&Slot<T>) + Send,
{
fn call_box(self: Box<F>, other: &Slot<T>) {
(*self)(other)
}
}
trait FnBox2<T>: Send {
fn call_box(self: Box<Self>, other: &Slot<T>, Option<T>);
}
impl<T, F> FnBox2<T> for F
where F: FnOnce(&Slot<T>, Option<T>) + Send,
{
fn call_box(self: Box<F>, other: &Slot<T>, item: Option<T>) {
(*self)(other, item)
}
}
impl State {
fn flag(&self, f: usize) -> bool {
self.0 & f != 0
}
fn set_flag(&self, f: usize, val: bool) -> State {
State(if val {
self.0 | f
} else {
self.0 & !f
})
}
fn token(&self) -> usize {
self.0 >> STATE_BITS
}
fn set_token(&self, gen: usize) -> State {
State((gen << STATE_BITS) | (self.0 & STATE_MASK))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use super::Slot;
#[test]
fn sequential() {
let slot = Slot::new(Some(1));
// We can consume once
assert_eq!(slot.try_consume(), Ok(1));
assert!(slot.try_consume().is_err());
// Consume a production
assert_eq!(slot.try_produce(2), Ok(()));
assert_eq!(slot.try_consume(), Ok(2));
// Can't produce twice
assert_eq!(slot.try_produce(3), Ok(()));
assert!(slot.try_produce(3).is_err());
// on_full is run immediately if full
let hit = Arc::new(AtomicUsize::new(0));
let hit2 = hit.clone();
slot.on_full(move |_s| {
hit2.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(hit.load(Ordering::SeqCst), 1);
// on_full can be run twice, and we can consume in the callback
let hit2 = hit.clone();
slot.on_full(move |s| {
hit2.fetch_add(1, Ordering::SeqCst);
assert_eq!(s.try_consume(), Ok(3));
});
assert_eq!(hit.load(Ordering::SeqCst), 2);
// Production can't run a previous callback
assert_eq!(slot.try_produce(4), Ok(()));
assert_eq!(hit.load(Ordering::SeqCst), 2);
assert_eq!(slot.try_consume(), Ok(4));
// Productions run new callbacks
let hit2 = hit.clone();
slot.on_full(move |s| {
hit2.fetch_add(1, Ordering::SeqCst);
assert_eq!(s.try_consume(), Ok(5));
});
assert_eq!(slot.try_produce(5), Ok(()));
assert_eq!(hit.load(Ordering::SeqCst), 3);
// on empty should fire immediately for an empty slot
let hit2 = hit.clone();
slot.on_empty(None, move |_, _| {
hit2.fetch_add(1, Ordering::SeqCst);
});
assert_eq!(hit.load(Ordering::SeqCst), 4);
}
#[test]
fn channel() {
const N: usize = 10000;
struct Sender {
slot: Arc<Slot<usize>>,
hit: Arc<AtomicUsize>,
}
struct Receiver {
slot: Arc<Slot<usize>>,
hit: Arc<AtomicUsize>,
}
impl Sender {
fn send(&self, val: usize) {
if self.slot.try_produce(val).is_ok() {
return
}
let me = thread::current();
self.hit.store(0, Ordering::SeqCst);
let hit = self.hit.clone();
self.slot.on_empty(None, move |_slot, _| {
hit.store(1, Ordering::SeqCst);
me.unpark();
});
while self.hit.load(Ordering::SeqCst) == 0 {
thread::park();
}
self.slot.try_produce(val).expect("can't produce after on_empty")
}
}
impl Receiver {
fn recv(&self) -> usize {
if let Ok(i) = self.slot.try_consume() {
return i
}
let me = thread::current();
self.hit.store(0, Ordering::SeqCst);
let hit = self.hit.clone();
self.slot.on_full(move |_slot| {
hit.store(1, Ordering::SeqCst);
me.unpark();
});
while self.hit.load(Ordering::SeqCst) == 0 {
thread::park();
}
self.slot.try_consume().expect("can't consume after on_full")
}
}
let slot = Arc::new(Slot::new(None));
let slot2 = slot.clone();
let tx = Sender { slot: slot2, hit: Arc::new(AtomicUsize::new(0)) };
let rx = Receiver { slot: slot, hit: Arc::new(AtomicUsize::new(0)) };
let a = thread::spawn(move || {
for i in 0..N {
assert_eq!(rx.recv(), i);
}
});
for i in 0..N {
tx.send(i);
}
a.join().unwrap();
}
#[test]
fn cancel() {
let slot = Slot::new(None);
let hits = Arc::new(AtomicUsize::new(0));
let add = || {
let hits = hits.clone();
move |_: &Slot<u32>| { hits.fetch_add(1, Ordering::SeqCst); }
};
let add_empty = || {
let hits = hits.clone();
move |_: &Slot<u32>, _: Option<u32>| {
hits.fetch_add(1, Ordering::SeqCst);
}
};
// cancel on_full
let n = hits.load(Ordering::SeqCst);
assert_eq!(hits.load(Ordering::SeqCst), n);
let token = slot.on_full(add());
assert_eq!(hits.load(Ordering::SeqCst), n);
slot.cancel(token);
assert_eq!(hits.load(Ordering::SeqCst), n);
assert!(slot.try_consume().is_err());
assert!(slot.try_produce(1).is_ok());
assert!(slot.try_consume().is_ok());
assert_eq!(hits.load(Ordering::SeqCst), n);
// cancel on_empty
let n = hits.load(Ordering::SeqCst);
assert_eq!(hits.load(Ordering::SeqCst), n);
slot.try_produce(1).unwrap();
let token = slot.on_empty(None, add_empty());
assert_eq!(hits.load(Ordering::SeqCst), n);
slot.cancel(token);
assert_eq!(hits.load(Ordering::SeqCst), n);
assert!(slot.try_produce(1).is_err());
// cancel with no effect
let n = hits.load(Ordering::SeqCst);
assert_eq!(hits.load(Ordering::SeqCst), n);
let token = slot.on_full(add());
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
slot.cancel(token);
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
assert!(slot.try_consume().is_ok());
let token = slot.on_empty(None, add_empty());
assert_eq!(hits.load(Ordering::SeqCst), n + 2);
slot.cancel(token);
assert_eq!(hits.load(Ordering::SeqCst), n + 2);
// cancel old ones don't count
let n = hits.load(Ordering::SeqCst);
assert_eq!(hits.load(Ordering::SeqCst), n);
let token1 = slot.on_full(add());
assert_eq!(hits.load(Ordering::SeqCst), n);
assert!(slot.try_produce(1).is_ok());
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
assert!(slot.try_consume().is_ok());
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
let token2 = slot.on_full(add());
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
slot.cancel(token1);
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
slot.cancel(token2);
assert_eq!(hits.load(Ordering::SeqCst), n + 1);
}
}

View File

@ -25,8 +25,7 @@ fn echo_server() {
drop(env_logger::init());
let mut l = t!(Core::new());
let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let msg = "foo bar baz";

View File

@ -21,8 +21,7 @@ macro_rules! t {
#[test]
fn chain_clients() {
let mut l = t!(Core::new());
let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let t = thread::spawn(move || {

View File

@ -24,8 +24,7 @@ fn echo_server() {
drop(env_logger::init());
let mut l = t!(Core::new());
let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let msg = "foo bar baz";

View File

@ -21,8 +21,7 @@ macro_rules! t {
#[test]
fn limit() {
let mut l = t!(Core::new());
let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let t = thread::spawn(move || {

View File

@ -12,11 +12,11 @@ fn simple() {
let (tx1, rx1) = futures::oneshot();
let (tx2, rx2) = futures::oneshot();
lp.pin().spawn(futures::lazy(|| {
lp.handle().spawn(futures::lazy(|| {
tx1.complete(1);
Ok(())
}));
lp.handle().spawn(|_| {
lp.remote().spawn(|_| {
futures::lazy(|| {
tx2.complete(2);
Ok(())
@ -33,10 +33,10 @@ fn spawn_in_poll() {
let (tx1, rx1) = futures::oneshot();
let (tx2, rx2) = futures::oneshot();
let handle = lp.handle();
lp.pin().spawn(futures::lazy(move || {
let remote = lp.remote();
lp.handle().spawn(futures::lazy(move || {
tx1.complete(1);
handle.spawn(|_| {
remote.spawn(|_| {
futures::lazy(|| {
tx2.complete(2);
Ok(())

View File

@ -24,8 +24,7 @@ fn echo_server() {
drop(env_logger::init());
let mut l = t!(Core::new());
let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let t = thread::spawn(move || {

View File

@ -40,8 +40,7 @@ fn connect() {
fn accept() {
drop(env_logger::init());
let mut l = t!(Core::new());
let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let (tx, rx) = channel();
@ -66,8 +65,7 @@ fn accept() {
fn accept2() {
drop(env_logger::init());
let mut l = t!(Core::new());
let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle());
let srv = t!(l.run(srv));
let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let addr = t!(srv.local_addr());
let t = thread::spawn(move || {

View File

@ -4,7 +4,6 @@ extern crate tokio_core;
use std::time::{Instant, Duration};
use futures::Future;
use tokio_core::reactor::{Core, Timeout};
macro_rules! t {
@ -19,7 +18,7 @@ fn smoke() {
drop(env_logger::init());
let mut l = t!(Core::new());
let dur = Duration::from_millis(10);
let timeout = Timeout::new(dur, &l.handle()).and_then(|t| t);
let timeout = t!(Timeout::new(dur, &l.handle()));
let start = Instant::now();
t!(l.run(timeout));
assert!(start.elapsed() >= dur);

View File

@ -19,9 +19,8 @@ macro_rules! t {
#[test]
fn send_messages() {
let mut l = t!(Core::new());
let a = UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let b = UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let (a, b) = t!(l.run(a.join(b)));
let a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()));
let a_addr = t!(a.local_addr());
let b_addr = t!(b.local_addr());