From 66cff8e84b0bdbdc716fe1cd715363aa7e79aefa Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 7 Sep 2016 16:11:19 -0700 Subject: [PATCH] Swap Handle/Pinned * Handle -> Remote * Pinned -> Handle All APIs now take a `&Handle` by default and in general can return an immediate `io::Result` instead of an `IoFuture`. This reflects how most usage will likely be done through handles rather than remotes, and also all previous functionality can be recovered with a `oneshot` plus `Remote::spawn`. Closes #15 --- examples/echo.rs | 46 ++- examples/sink.rs | 17 +- src/channel.rs | 24 +- src/lib.rs | 45 +-- src/lock.rs | 106 ------ src/net/mod.rs | 2 +- src/net/tcp.rs | 58 ++- src/net/udp.rs | 26 +- src/reactor/io_token.rs | 56 +-- src/reactor/mod.rs | 433 ++++++++++------------ src/reactor/poll_evented.rs | 50 +-- src/reactor/timeout.rs | 37 +- src/reactor/timeout_token.rs | 41 +-- src/slot.rs | 691 ----------------------------------- tests/buffered.rs | 3 +- tests/chain.rs | 3 +- tests/echo.rs | 3 +- tests/limit.rs | 3 +- tests/spawn.rs | 10 +- tests/stream-buffered.rs | 3 +- tests/tcp.rs | 6 +- tests/timeout.rs | 3 +- tests/udp.rs | 5 +- 23 files changed, 327 insertions(+), 1344 deletions(-) delete mode 100644 src/lock.rs delete mode 100644 src/slot.rs diff --git a/examples/echo.rs b/examples/echo.rs index 4f46e5fc8..cf441e5d2 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -20,36 +20,34 @@ fn main() { // Create the event loop that will drive this server let mut l = Core::new().unwrap(); - let pin = l.pin(); + let handle = l.handle(); // Create a TCP listener which will listen for incoming connections - let server = TcpListener::bind(&addr, &l.handle()); + let socket = TcpListener::bind(&addr, &l.handle()).unwrap(); - let done = server.and_then(move |socket| { - // Once we've got the TCP listener, inform that we have it - println!("Listening on: {}", addr); + // Once we've got the TCP listener, inform that we have it + println!("Listening on: {}", addr); - // Pull out the stream of incoming connections and then for each new - // one spin up a new task copying data. - // - // We use the `io::copy` future to copy all data from the - // reading half onto the writing half. - socket.incoming().for_each(move |(socket, addr)| { - let pair = futures::lazy(|| futures::finished(socket.split())); - let amt = pair.and_then(|(reader, writer)| copy(reader, writer)); + // Pull out the stream of incoming connections and then for each new + // one spin up a new task copying data. + // + // We use the `io::copy` future to copy all data from the + // reading half onto the writing half. + let done = socket.incoming().for_each(move |(socket, addr)| { + let pair = futures::lazy(|| futures::finished(socket.split())); + let amt = pair.and_then(|(reader, writer)| copy(reader, writer)); - // Once all that is done we print out how much we wrote, and then - // critically we *spawn* this future which allows it to run - // concurrently with other connections. - let msg = amt.map(move |amt| { - println!("wrote {} bytes to {}", amt, addr) - }).map_err(|e| { - panic!("error: {}", e); - }); - pin.spawn(msg); + // Once all that is done we print out how much we wrote, and then + // critically we *spawn* this future which allows it to run + // concurrently with other connections. + let msg = amt.map(move |amt| { + println!("wrote {} bytes to {}", amt, addr) + }).map_err(|e| { + panic!("error: {}", e); + }); + handle.spawn(msg); - Ok(()) - }) + Ok(()) }); l.run(done).unwrap(); } diff --git a/examples/sink.rs b/examples/sink.rs index c825274c3..4e2373aa5 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -23,16 +23,15 @@ fn main() { let addr = addr.parse::().unwrap(); let mut l = Core::new().unwrap(); - let server = TcpListener::bind(&addr, &l.handle()).and_then(|socket| { - socket.incoming().and_then(|(socket, addr)| { - println!("got a socket: {}", addr); - write(socket).or_else(|_| Ok(())) - }).for_each(|()| { - println!("lost the socket"); - Ok(()) - }) - }); + let socket = TcpListener::bind(&addr, &l.handle()).unwrap(); println!("Listenering on: {}", addr); + let server = socket.incoming().and_then(|(socket, addr)| { + println!("got a socket: {}", addr); + write(socket).or_else(|_| Ok(())) + }).for_each(|()| { + println!("lost the socket"); + Ok(()) + }); l.run(server).unwrap(); } diff --git a/src/channel.rs b/src/channel.rs index 151abf956..e5aa86b23 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -6,11 +6,10 @@ use std::io; use std::sync::mpsc::TryRecvError; -use futures::{Future, Poll, Async}; +use futures::{Poll, Async}; use futures::stream::Stream; use mio::channel; -use io::IoFuture; use reactor::{Handle, PollEvented}; /// The transmission half of a channel used for sending messages to a receiver. @@ -35,12 +34,6 @@ pub struct Receiver { rx: PollEvented>, } -/// Future returned by the `channel` function which will resolve to a -/// `Receiver`. -pub struct ReceiverNew { - inner: IoFuture>, -} - /// Creates a new in-memory channel used for sending data across `Send + /// 'static` boundaries, frequently threads. /// @@ -53,12 +46,12 @@ pub struct ReceiverNew { /// The returned `Sender` can be used to send messages that are processed by /// the returned `Receiver`. The `Sender` can be cloned to send messages /// from multiple sources simultaneously. -pub fn channel(handle: &Handle) -> (Sender, ReceiverNew) +pub fn channel(handle: &Handle) -> io::Result<(Sender, Receiver)> where T: Send + 'static, { let (tx, rx) = channel::channel(); - let rx = PollEvented::new(rx, handle).map(|rx| Receiver { rx: rx }); - (Sender { tx: tx }, ReceiverNew { inner: rx.boxed() }) + let rx = try!(PollEvented::new(rx, handle)); + Ok((Sender { tx: tx }, Receiver { rx: rx })) } impl Sender { @@ -109,12 +102,3 @@ impl Stream for Receiver { } } } - -impl Future for ReceiverNew { - type Item = Receiver; - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - self.inner.poll() - } -} diff --git a/src/lib.rs b/src/lib.rs index 737765550..f162e9e8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,34 +58,32 @@ //! //! // Create the event loop that will drive this server //! let mut l = Core::new().unwrap(); -//! let pin = l.pin(); +//! let handle = l.handle(); //! //! // Create a TCP listener which will listen for incoming connections -//! let server = TcpListener::bind(&addr, pin.handle()); +//! let socket = TcpListener::bind(&addr, &handle).unwrap(); //! -//! let done = server.and_then(|socket| { -//! // Once we've got the TCP listener, inform that we have it -//! println!("Listening on: {}", addr); +//! // Once we've got the TCP listener, inform that we have it +//! println!("Listening on: {}", addr); //! -//! // Pull out the stream of incoming connections and then for each new -//! // one spin up a new task copying data. -//! // -//! // We use the `io::copy` future to copy all data from the -//! // reading half onto the writing half. -//! socket.incoming().for_each(|(socket, addr)| { -//! let pair = futures::lazy(|| Ok(socket.split())); -//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer)); -//! -//! // Once all that is done we print out how much we wrote, and then -//! // critically we *spawn* this future which allows it to run -//! // concurrently with other connections. -//! pin.spawn(amt.then(move |result| { -//! println!("wrote {:?} bytes to {}", result, addr); -//! Ok(()) -//! })); +//! // Pull out the stream of incoming connections and then for each new +//! // one spin up a new task copying data. +//! // +//! // We use the `io::copy` future to copy all data from the +//! // reading half onto the writing half. +//! let done = socket.incoming().for_each(|(socket, addr)| { +//! let pair = futures::lazy(|| Ok(socket.split())); +//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer)); //! +//! // Once all that is done we print out how much we wrote, and then +//! // critically we *spawn* this future which allows it to run +//! // concurrently with other connections. +//! handle.spawn(amt.then(move |result| { +//! println!("wrote {:?} bytes to {}", result, addr); //! Ok(()) -//! }) +//! })); +//! +//! Ok(()) //! }); //! //! // Execute our server (modeled as a future) and wait for it to @@ -107,9 +105,6 @@ extern crate scoped_tls; #[macro_use] extern crate log; -mod slot; -mod lock; - #[macro_use] pub mod io; diff --git a/src/lock.rs b/src/lock.rs deleted file mode 100644 index e5bc6b2b6..000000000 --- a/src/lock.rs +++ /dev/null @@ -1,106 +0,0 @@ -//! A "mutex" which only supports try_lock -//! -//! As a futures library the eventual call to an event loop should be the only -//! thing that ever blocks, so this is assisted with a fast user-space -//! implementation of a lock that can only have a `try_lock` operation. - -extern crate core; - -use self::core::cell::UnsafeCell; -use self::core::ops::{Deref, DerefMut}; -use self::core::sync::atomic::Ordering::{Acquire, Release}; -use self::core::sync::atomic::AtomicBool; - -/// A "mutex" around a value, similar to `std::sync::Mutex`. -/// -/// This lock only supports the `try_lock` operation, however, and does not -/// implement poisoning. -pub struct Lock { - locked: AtomicBool, - data: UnsafeCell, -} - -/// Sentinel representing an acquired lock through which the data can be -/// accessed. -pub struct TryLock<'a, T: 'a> { - __ptr: &'a Lock, -} - -// The `Lock` structure is basically just a `Mutex`, and these two impls are -// intended to mirror the standard library's corresponding impls for `Mutex`. -// -// If a `T` is sendable across threads, so is the lock, and `T` must be sendable -// across threads to be `Sync` because it allows mutable access from multiple -// threads. -unsafe impl Send for Lock {} -unsafe impl Sync for Lock {} - -impl Lock { - /// Creates a new lock around the given value. - pub fn new(t: T) -> Lock { - Lock { - locked: AtomicBool::new(false), - data: UnsafeCell::new(t), - } - } - - /// Attempts to acquire this lock, returning whether the lock was acquired or - /// not. - /// - /// If `Some` is returned then the data this lock protects can be accessed - /// through the sentinel. This sentinel allows both mutable and immutable - /// access. - /// - /// If `None` is returned then the lock is already locked, either elsewhere - /// on this thread or on another thread. - pub fn try_lock(&self) -> Option> { - if !self.locked.swap(true, Acquire) { - Some(TryLock { __ptr: self }) - } else { - None - } - } -} - -impl<'a, T> Deref for TryLock<'a, T> { - type Target = T; - fn deref(&self) -> &T { - // The existence of `TryLock` represents that we own the lock, so we - // can safely access the data here. - unsafe { &*self.__ptr.data.get() } - } -} - -impl<'a, T> DerefMut for TryLock<'a, T> { - fn deref_mut(&mut self) -> &mut T { - // The existence of `TryLock` represents that we own the lock, so we - // can safely access the data here. - // - // Additionally, we're the *only* `TryLock` in existence so mutable - // access should be ok. - unsafe { &mut *self.__ptr.data.get() } - } -} - -impl<'a, T> Drop for TryLock<'a, T> { - fn drop(&mut self) { - self.__ptr.locked.store(false, Release); - } -} - -#[cfg(test)] -mod tests { - use super::Lock; - - #[test] - fn smoke() { - let a = Lock::new(1); - let mut a1 = a.try_lock().unwrap(); - assert!(a.try_lock().is_none()); - assert_eq!(*a1, 1); - *a1 = 2; - drop(a1); - assert_eq!(*a.try_lock().unwrap(), 2); - assert_eq!(*a.try_lock().unwrap(), 2); - } -} diff --git a/src/net/mod.rs b/src/net/mod.rs index 296ff873e..56d43c4c9 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -7,5 +7,5 @@ mod tcp; mod udp; pub use self::tcp::{TcpStream, TcpStreamNew}; -pub use self::tcp::{TcpListener, TcpListenerNew, Incoming}; +pub use self::tcp::{TcpListener, Incoming}; pub use self::udp::{UdpSocket, UdpSocketNew}; diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 463a042db..b3b4b8d77 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -4,7 +4,7 @@ use std::mem; use std::net::{self, SocketAddr, Shutdown}; use futures::stream::Stream; -use futures::{Future, IntoFuture, failed, Poll, Async}; +use futures::{self, Future, failed, Poll, Async}; use mio; use io::{Io, IoFuture, IoStream}; @@ -18,11 +18,6 @@ pub struct TcpListener { io: PollEvented, } -/// Future which will resolve to a `TcpListener` -pub struct TcpListenerNew { - inner: IoFuture, -} - /// Stream returned by the `TcpListener::incoming` function representing the /// stream of sockets received from a listener. pub struct Incoming { @@ -35,12 +30,9 @@ impl TcpListener { /// The TCP listener will bind to the provided `addr` address, if available, /// and will be returned as a future. The returned future, if resolved /// successfully, can then be used to accept incoming connections. - pub fn bind(addr: &SocketAddr, handle: &Handle) -> TcpListenerNew { - let future = match mio::tcp::TcpListener::bind(addr) { - Ok(l) => TcpListener::new(l, handle), - Err(e) => failed(e).boxed(), - }; - TcpListenerNew { inner: future } + pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result { + let l = try!(mio::tcp::TcpListener::bind(addr)); + TcpListener::new(l, handle) } /// Create a new TCP listener from the standard library's TCP listener. @@ -72,19 +64,15 @@ impl TcpListener { /// well (same for IPv6). pub fn from_listener(listener: net::TcpListener, addr: &SocketAddr, - handle: &Handle) -> IoFuture { - let handle = handle.clone(); - mio::tcp::TcpListener::from_listener(listener, addr) - .into_future() - .and_then(move |l| TcpListener::new(l, &handle)) - .boxed() + handle: &Handle) -> io::Result { + let l = try!(mio::tcp::TcpListener::from_listener(listener, addr)); + TcpListener::new(l, handle) } fn new(listener: mio::tcp::TcpListener, handle: &Handle) - -> IoFuture { - PollEvented::new(listener, handle).map(|io| { - TcpListener { io: io } - }).boxed() + -> io::Result { + let io = try!(PollEvented::new(listener, handle)); + Ok(TcpListener { io: io }) } /// Test whether this socket is ready to be read or not. @@ -129,13 +117,19 @@ impl TcpListener { } } - let handle = self.io.handle().clone(); + let remote = self.io.remote().clone(); let stream = MyIncoming { inner: self }; Incoming { inner: stream.and_then(move |(tcp, addr)| { - PollEvented::new(tcp, &handle).map(move |io| { - (TcpStream { io: io }, addr) - }) + let (tx, rx) = futures::oneshot(); + remote.spawn(move |handle| { + let res = PollEvented::new(tcp, handle).map(move |io| { + (TcpStream { io: io }, addr) + }); + tx.complete(res); + Ok(()) + }); + rx.then(|r| r.expect("shouldn't be canceled")) }).boxed(), } } @@ -185,15 +179,6 @@ impl fmt::Debug for TcpListener { } } -impl Future for TcpListenerNew { - type Item = TcpListener; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll() - } -} - impl Stream for Incoming { type Item = (TcpStream, SocketAddr); type Error = io::Error; @@ -242,7 +227,8 @@ impl TcpStream { fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle) -> IoFuture { - PollEvented::new(connected_stream, handle).and_then(|io| { + let tcp = PollEvented::new(connected_stream, handle); + futures::done(tcp).and_then(|io| { TcpStreamConnect::Waiting(TcpStream { io: io }) }).boxed() } diff --git a/src/net/udp.rs b/src/net/udp.rs index e11ed8733..a3ef6f30e 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -2,7 +2,7 @@ use std::io; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; use std::fmt; -use futures::{Future, failed, Poll, Async}; +use futures::{Future, Poll, Async}; use mio; use io::IoFuture; @@ -25,18 +25,14 @@ impl UdpSocket { /// `addr` provided. The returned future will be resolved once the socket /// has successfully bound. If an error happens during the binding or during /// the socket creation, that error will be returned to the future instead. - pub fn bind(addr: &SocketAddr, handle: &Handle) -> UdpSocketNew { - let future = match mio::udp::UdpSocket::bind(addr) { - Ok(udp) => UdpSocket::new(udp, handle), - Err(e) => failed(e).boxed(), - }; - UdpSocketNew { inner: future } + pub fn bind(addr: &SocketAddr, handle: &Handle) -> io::Result { + let udp = try!(mio::udp::UdpSocket::bind(addr)); + UdpSocket::new(udp, handle) } - fn new(socket: mio::udp::UdpSocket, handle: &Handle) -> IoFuture { - PollEvented::new(socket, handle).map(|io| { - UdpSocket { io: io } - }).boxed() + fn new(socket: mio::udp::UdpSocket, handle: &Handle) -> io::Result { + let io = try!(PollEvented::new(socket, handle)); + Ok(UdpSocket { io: io }) } /// Creates a new `UdpSocket` from the previously bound socket provided. @@ -49,11 +45,9 @@ impl UdpSocket { /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. pub fn from_socket(socket: net::UdpSocket, - handle: &Handle) -> IoFuture { - match mio::udp::UdpSocket::from_socket(socket) { - Ok(udp) => UdpSocket::new(udp, handle), - Err(e) => failed(e).boxed(), - } + handle: &Handle) -> io::Result { + let udp = try!(mio::udp::UdpSocket::from_socket(socket)); + UdpSocket::new(udp, handle) } /// Returns the local address that this stream is bound to. diff --git a/src/reactor/io_token.rs b/src/reactor/io_token.rs index dbd6c70d1..94782e5f5 100644 --- a/src/reactor/io_token.rs +++ b/src/reactor/io_token.rs @@ -2,19 +2,10 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::io; -use futures::{Future, Poll}; use futures::task; use mio; -use reactor::{Message, Handle, CoreFuture, Direction, Core}; - -/// A future which will resolve a unique `tok` token for an I/O object. -/// -/// Created through the `Handle::add_source` method, this future can also -/// resolve to an error if there's an issue communicating with the event loop. -pub struct IoTokenNew { - inner: CoreFuture<(E, (Arc, usize)), E>, -} +use reactor::{Message, Remote, Handle, Direction}; /// A token that identifies an active timeout. pub struct IoToken { @@ -40,15 +31,13 @@ impl IoToken { /// The returned future will panic if the event loop this handle is /// associated with has gone away, or if there is an error communicating /// with the event loop. - pub fn new(source: E, handle: &Handle) -> IoTokenNew - where E: mio::Evented + Send + 'static, - { - IoTokenNew { - inner: CoreFuture { - handle: handle.clone(), - data: Some(source), - result: None, - }, + pub fn new(source: &mio::Evented, handle: &Handle) -> io::Result { + match handle.inner.upgrade() { + Some(inner) => { + let (ready, token) = try!(inner.borrow_mut().add_source(source)); + Ok(IoToken { token: token, readiness: ready }) + } + None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")), } } @@ -93,7 +82,7 @@ impl IoToken { /// /// This function will also panic if there is not a currently running future /// task. - pub fn schedule_read(&self, handle: &Handle) { + pub fn schedule_read(&self, handle: &Remote) { handle.send(Message::Schedule(self.token, task::park(), Direction::Read)); } @@ -120,7 +109,7 @@ impl IoToken { /// /// This function will also panic if there is not a currently running future /// task. - pub fn schedule_write(&self, handle: &Handle) { + pub fn schedule_write(&self, handle: &Remote) { handle.send(Message::Schedule(self.token, task::park(), Direction::Write)); } @@ -146,30 +135,7 @@ impl IoToken { /// This function will panic if the event loop this handle is associated /// with has gone away, or if there is an error communicating with the event /// loop. - pub fn drop_source(&self, handle: &Handle) { + pub fn drop_source(&self, handle: &Remote) { handle.send(Message::DropSource(self.token)); } } - -impl Future for IoTokenNew - where E: mio::Evented + Send + 'static, -{ - type Item = (E, IoToken); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(E, IoToken), io::Error> { - let res = try_ready!(self.inner.poll(|lp, io| { - let pair = try!(lp.add_source(&io)); - Ok((io, pair)) - }, |io, slot| { - Message::Run(Box::new(move |lp: &Core| { - let res = lp.add_source(&io).map(|p| (io, p)); - slot.try_produce(res).ok() - .expect("add source try_produce intereference"); - })) - })); - - let (io, (ready, token)) = res; - Ok((io, IoToken { token: token, readiness: ready }).into()) - } -} diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 6ebdad22f..c3e0a24ed 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -12,12 +12,11 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use std::time::{Instant, Duration}; -use futures::{Future, Poll, IntoFuture, Async}; +use futures::{Future, IntoFuture, Async}; use futures::task::{self, Unpark, Task, Spawn}; use mio; use slab::Slab; -use slot::{self, Slot}; use timer_wheel::{TimerWheel, Timeout as WheelTimeout}; mod channel; @@ -27,8 +26,8 @@ use self::channel::{Sender, Receiver, channel}; mod poll_evented; mod timeout; -pub use self::poll_evented::{PollEvented, PollEventedNew}; -pub use self::timeout::{Timeout, TimeoutNew}; +pub use self::poll_evented::PollEvented; +pub use self::timeout::Timeout; static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT; scoped_thread_local!(static CURRENT_LOOP: Core); @@ -43,23 +42,25 @@ const SLAB_CAPACITY: usize = 1024 * 64; /// various I/O objects to interact with the event loop in interesting ways. // TODO: expand this pub struct Core { - id: usize, - io: mio::Poll, events: mio::Events, tx: Sender, rx: Receiver, - io_dispatch: RefCell>, - task_dispatch: RefCell>, - - // Incoming queue of newly spawned futures - new_futures: Rc, - _new_futures_registration: mio::Registration, + inner: Rc>, // Used for determining when the future passed to `run` is ready. Once the // registration is passed to `io` above we never touch it again, just keep // it alive. _future_registration: mio::Registration, future_readiness: Arc, +} + +struct Inner { + id: usize, + io: mio::Poll, + + // Dispatch slabs for I/O and futures events + io_dispatch: Slab, + task_dispatch: Slab, // Timer wheel keeping track of all timeouts. The `usize` stored in the // timer wheel is an index into the slab below. @@ -67,8 +68,8 @@ pub struct Core { // The slab below keeps track of the timeouts themselves as well as the // state of the timeout itself. The `TimeoutToken` type is an index into the // `timeouts` slab. - timer_wheel: RefCell>, - timeouts: RefCell>, + timer_wheel: TimerWheel, + timeouts: Slab<(WheelTimeout, TimeoutState)>, } /// Handle to an event loop, used to construct I/O objects, send messages, and @@ -77,7 +78,7 @@ pub struct Core { /// Handles can be cloned, and when cloned they will still refer to the /// same underlying event loop. #[derive(Clone)] -pub struct Handle { +pub struct Remote { id: usize, tx: Sender, } @@ -85,9 +86,9 @@ pub struct Handle { /// A non-sendable handle to an event loop, useful for manufacturing instances /// of `LoopData`. #[derive(Clone)] -pub struct Pinned { - handle: Handle, - futures: Weak, +pub struct Handle { + remote: Remote, + inner: Weak>, } struct ScheduledIo { @@ -102,11 +103,6 @@ struct ScheduledTask { wake: Arc, } -struct NewFutures { - queue: RefCell>>>, - ready: mio::SetReadiness, -} - enum TimeoutState { NotFired, Fired, @@ -121,7 +117,6 @@ enum Direction { enum Message { DropSource(usize), Schedule(usize, Task, Direction), - AddTimeout(Instant, Arc>>), UpdateTimeout(usize, Task), CancelTimeout(usize), Run(Box), @@ -129,8 +124,7 @@ enum Message { const TOKEN_MESSAGES: mio::Token = mio::Token(0); const TOKEN_FUTURE: mio::Token = mio::Token(1); -const TOKEN_NEW_FUTURES: mio::Token = mio::Token(2); -const TOKEN_START: usize = 3; +const TOKEN_START: usize = 2; impl Core { /// Creates a new event loop, returning any error that happened during the @@ -146,52 +140,43 @@ impl Core { TOKEN_FUTURE, mio::Ready::readable(), mio::PollOpt::level()); - let new_future_pair = mio::Registration::new(&io, - TOKEN_NEW_FUTURES, - mio::Ready::readable(), - mio::PollOpt::level()); Ok(Core { - id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed), - io: io, events: mio::Events::with_capacity(1024), tx: tx, rx: rx, - io_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)), - task_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)), - timeouts: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)), - timer_wheel: RefCell::new(TimerWheel::new()), _future_registration: future_pair.0, future_readiness: Arc::new(MySetReadiness(future_pair.1)), - _new_futures_registration: new_future_pair.0, - new_futures: Rc::new(NewFutures { - queue: RefCell::new(Vec::new()), - ready: new_future_pair.1, - }), + + inner: Rc::new(RefCell::new(Inner { + id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed), + io: io, + io_dispatch: Slab::with_capacity(SLAB_CAPACITY), + task_dispatch: Slab::with_capacity(SLAB_CAPACITY), + timeouts: Slab::with_capacity(SLAB_CAPACITY), + timer_wheel: TimerWheel::new(), + })), }) } - /// Generates a handle to this event loop used to construct I/O objects and - /// send messages. + /// Returns a handle to this event loop which cannot be sent across threads + /// but can be used as a proxy to the event loop itself. /// - /// Handles to an event loop are cloneable as well and clones will always - /// refer to the same event loop. + /// Handles are cloneable and clones always refer to the same event loop. + /// This handle is typically passed into functions that create I/O objects + /// to bind them to this event loop. pub fn handle(&self) -> Handle { Handle { - id: self.id, - tx: self.tx.clone(), + remote: self.remote(), + inner: Rc::downgrade(&self.inner), } } - /// Returns a "pin" of this event loop which cannot be sent across threads - /// but can be used as a proxy to the event loop itself. - /// - /// Currently the primary use for this is to use as a handle to add data - /// to the event loop directly. The `Pinned::add_loop_data` method can - /// be used to immediately create instances of `LoopData` structures. - pub fn pin(&self) -> Pinned { - Pinned { - handle: self.handle(), - futures: Rc::downgrade(&self.new_futures), + /// Generates a remote handle to this event loop which can be used to spawn + /// tasks from other threads into this event loop. + pub fn remote(&self) -> Remote { + Remote { + id: self.inner.borrow().id, + tx: self.tx.clone(), } } @@ -256,14 +241,15 @@ impl Core { // attaching strace, or similar. let start = Instant::now(); loop { - let timeout = self.timer_wheel.borrow().next_timeout().map(|t| { + let inner = self.inner.borrow_mut(); + let timeout = inner.timer_wheel.next_timeout().map(|t| { if t < start { Duration::new(0, 0) } else { t - start } }); - match self.io.poll(&mut self.events, timeout) { + match inner.io.poll(&mut self.events, timeout) { Ok(a) => { amt = a; break; @@ -294,12 +280,6 @@ impl Core { if !finished && CURRENT_LOOP.set(self, || done()) { finished = true; } - } else if token == TOKEN_NEW_FUTURES { - self.new_futures.ready.set_readiness(mio::Ready::none()).unwrap(); - let mut new_futures = self.new_futures.queue.borrow_mut(); - for future in new_futures.drain(..) { - self.spawn(future); - } } else { self.dispatch(token, event.kind()); } @@ -309,7 +289,7 @@ impl Core { } } - fn dispatch(&self, token: mio::Token, ready: mio::Ready) { + fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { let token = usize::from(token) - TOKEN_START; if token % 2 == 0 { self.dispatch_io(token / 2, ready) @@ -318,10 +298,11 @@ impl Core { } } - fn dispatch_io(&self, token: usize, ready: mio::Ready) { + fn dispatch_io(&mut self, token: usize, ready: mio::Ready) { let mut reader = None; let mut writer = None; - if let Some(io) = self.io_dispatch.borrow_mut().get_mut(token) { + let mut inner = self.inner.borrow_mut(); + if let Some(io) = inner.io_dispatch.get_mut(token) { if ready.is_readable() { reader = io.reader.take(); io.readiness.fetch_or(1, Ordering::Relaxed); @@ -331,6 +312,7 @@ impl Core { io.readiness.fetch_or(2, Ordering::Relaxed); } } + drop(inner); // TODO: don't notify the same task twice if let Some(reader) = reader { self.notify_handle(reader); @@ -340,8 +322,9 @@ impl Core { } } - fn dispatch_task(&self, token: usize) { - let (task, wake) = match self.task_dispatch.borrow_mut().get_mut(token) { + fn dispatch_task(&mut self, token: usize) { + let mut inner = self.inner.borrow_mut(); + let (task, wake) = match inner.task_dispatch.get_mut(token) { Some(slot) => (slot.spawn.take(), slot.wake.clone()), None => return, }; @@ -350,24 +333,31 @@ impl Core { Some(task) => task, None => return, }; + drop(inner); let res = CURRENT_LOOP.set(self, || task.poll_future(wake)); - let mut dispatch = self.task_dispatch.borrow_mut(); + inner = self.inner.borrow_mut(); match res { Ok(Async::NotReady) => { - assert!(dispatch[token].spawn.is_none()); - dispatch[token].spawn = Some(task); + assert!(inner.task_dispatch[token].spawn.is_none()); + inner.task_dispatch[token].spawn = Some(task); } Ok(Async::Ready(())) | Err(()) => { - dispatch.remove(token).unwrap(); + inner.task_dispatch.remove(token).unwrap(); } } } fn consume_timeouts(&mut self, now: Instant) { - while let Some(idx) = self.timer_wheel.borrow_mut().poll(now) { + loop { + let mut inner = self.inner.borrow_mut(); + let idx = match inner.timer_wheel.poll(now) { + Some(idx) => idx, + None => break, + }; trace!("firing timeout: {}", idx); - let handle = self.timeouts.borrow_mut()[idx].1.fire(); + let handle = inner.timeouts[idx].1.fire(); + drop(inner); if let Some(handle) = handle { self.notify_handle(handle); } @@ -383,109 +373,6 @@ impl Core { CURRENT_LOOP.set(&self, || handle.unpark()); } - fn add_source(&self, source: &mio::Evented) - -> io::Result<(Arc, usize)> { - debug!("adding a new I/O source"); - let sched = ScheduledIo { - readiness: Arc::new(AtomicUsize::new(0)), - reader: None, - writer: None, - }; - let mut dispatch = self.io_dispatch.borrow_mut(); - if dispatch.vacant_entry().is_none() { - let amt = dispatch.len(); - dispatch.reserve_exact(amt); - } - let entry = dispatch.vacant_entry().unwrap(); - try!(self.io.register(source, - mio::Token(TOKEN_START + entry.index() * 2), - mio::Ready::readable() | mio::Ready::writable(), - mio::PollOpt::edge())); - Ok((sched.readiness.clone(), entry.insert(sched).index())) - } - - fn drop_source(&self, token: usize) { - debug!("dropping I/O source: {}", token); - self.io_dispatch.borrow_mut().remove(token).unwrap(); - } - - fn schedule(&self, token: usize, wake: Task, dir: Direction) { - debug!("scheduling direction for: {}", token); - let to_call = { - let mut dispatch = self.io_dispatch.borrow_mut(); - let sched = dispatch.get_mut(token).unwrap(); - let (slot, bit) = match dir { - Direction::Read => (&mut sched.reader, 1), - Direction::Write => (&mut sched.writer, 2), - }; - if sched.readiness.load(Ordering::SeqCst) & bit != 0 { - *slot = None; - Some(wake) - } else { - *slot = Some(wake); - None - } - }; - if let Some(to_call) = to_call { - debug!("schedule immediately done"); - self.notify_handle(to_call); - } - } - - fn add_timeout(&self, at: Instant) -> io::Result<(usize, Instant)> { - let mut timeouts = self.timeouts.borrow_mut(); - if timeouts.vacant_entry().is_none() { - let len = timeouts.len(); - timeouts.reserve_exact(len); - } - let entry = timeouts.vacant_entry().unwrap(); - let timeout = self.timer_wheel.borrow_mut().insert(at, entry.index()); - let when = *timeout.when(); - let entry = entry.insert((timeout, TimeoutState::NotFired)); - debug!("added a timeout: {}", entry.index()); - Ok((entry.index(), when)) - } - - fn update_timeout(&self, token: usize, handle: Task) { - debug!("updating a timeout: {}", token); - let to_wake = self.timeouts.borrow_mut()[token].1.block(handle); - if let Some(to_wake) = to_wake { - self.notify_handle(to_wake); - } - } - - fn cancel_timeout(&self, token: usize) { - debug!("cancel a timeout: {}", token); - let pair = self.timeouts.borrow_mut().remove(token); - if let Some((timeout, _state)) = pair { - self.timer_wheel.borrow_mut().cancel(&timeout); - } - } - - fn spawn(&self, future: Box>) { - let unpark = { - let mut dispatch = self.task_dispatch.borrow_mut(); - if dispatch.vacant_entry().is_none() { - let len = dispatch.len(); - dispatch.reserve_exact(len); - } - let entry = dispatch.vacant_entry().unwrap(); - let token = TOKEN_START + 2 * entry.index() + 1; - let pair = mio::Registration::new(&self.io, - mio::Token(token), - mio::Ready::readable(), - mio::PollOpt::level()); - let unpark = Arc::new(MySetReadiness(pair.1)); - let entry = entry.insert(ScheduledTask { - spawn: Some(task::spawn(future)), - wake: unpark, - _registration: pair.0, - }); - entry.get().wake.clone() - }; - unpark.unpark(); - } - fn consume_queue(&self) { debug!("consuming notification queue"); // TODO: can we do better than `.unwrap()` here? @@ -496,21 +383,118 @@ impl Core { fn notify(&self, msg: Message) { match msg { - Message::DropSource(tok) => self.drop_source(tok), - Message::Schedule(tok, wake, dir) => self.schedule(tok, wake, dir), - - Message::AddTimeout(at, slot) => { - slot.try_produce(self.add_timeout(at)) - .expect("interference with try_produce on timeout"); + Message::DropSource(tok) => self.inner.borrow_mut().drop_source(tok), + Message::Schedule(tok, wake, dir) => { + let task = self.inner.borrow_mut().schedule(tok, wake, dir); + if let Some(task) = task { + self.notify_handle(task); + } + } + Message::UpdateTimeout(t, handle) => { + let task = self.inner.borrow_mut().update_timeout(t, handle); + if let Some(task) = task { + self.notify_handle(task); + } + } + Message::CancelTimeout(t) => { + self.inner.borrow_mut().cancel_timeout(t) } - Message::UpdateTimeout(t, handle) => self.update_timeout(t, handle), - Message::CancelTimeout(t) => self.cancel_timeout(t), Message::Run(r) => r.call_box(self), } } } -impl Handle { +impl Inner { + fn add_source(&mut self, source: &mio::Evented) + -> io::Result<(Arc, usize)> { + debug!("adding a new I/O source"); + let sched = ScheduledIo { + readiness: Arc::new(AtomicUsize::new(0)), + reader: None, + writer: None, + }; + if self.io_dispatch.vacant_entry().is_none() { + let amt = self.io_dispatch.len(); + self.io_dispatch.reserve_exact(amt); + } + let entry = self.io_dispatch.vacant_entry().unwrap(); + try!(self.io.register(source, + mio::Token(TOKEN_START + entry.index() * 2), + mio::Ready::readable() | mio::Ready::writable(), + mio::PollOpt::edge())); + Ok((sched.readiness.clone(), entry.insert(sched).index())) + } + + fn drop_source(&mut self, token: usize) { + debug!("dropping I/O source: {}", token); + self.io_dispatch.remove(token).unwrap(); + } + + fn schedule(&mut self, token: usize, wake: Task, dir: Direction) + -> Option { + debug!("scheduling direction for: {}", token); + let sched = self.io_dispatch.get_mut(token).unwrap(); + let (slot, bit) = match dir { + Direction::Read => (&mut sched.reader, 1), + Direction::Write => (&mut sched.writer, 2), + }; + if sched.readiness.load(Ordering::SeqCst) & bit != 0 { + *slot = None; + Some(wake) + } else { + *slot = Some(wake); + None + } + } + + fn add_timeout(&mut self, at: Instant) -> io::Result<(usize, Instant)> { + if self.timeouts.vacant_entry().is_none() { + let len = self.timeouts.len(); + self.timeouts.reserve_exact(len); + } + let entry = self.timeouts.vacant_entry().unwrap(); + let timeout = self.timer_wheel.insert(at, entry.index()); + let when = *timeout.when(); + let entry = entry.insert((timeout, TimeoutState::NotFired)); + debug!("added a timeout: {}", entry.index()); + Ok((entry.index(), when)) + } + + fn update_timeout(&mut self, token: usize, handle: Task) -> Option { + debug!("updating a timeout: {}", token); + self.timeouts[token].1.block(handle) + } + + fn cancel_timeout(&mut self, token: usize) { + debug!("cancel a timeout: {}", token); + let pair = self.timeouts.remove(token); + if let Some((timeout, _state)) = pair { + self.timer_wheel.cancel(&timeout); + } + } + + fn spawn(&mut self, future: Box>) { + if self.task_dispatch.vacant_entry().is_none() { + let len = self.task_dispatch.len(); + self.task_dispatch.reserve_exact(len); + } + let entry = self.task_dispatch.vacant_entry().unwrap(); + let token = TOKEN_START + 2 * entry.index() + 1; + let pair = mio::Registration::new(&self.io, + mio::Token(token), + mio::Ready::readable(), + mio::PollOpt::level()); + let unpark = Arc::new(MySetReadiness(pair.1)); + let entry = entry.insert(ScheduledTask { + spawn: Some(task::spawn(future)), + wake: unpark, + _registration: pair.0, + }); + entry.get().wake.clone().unpark(); + } +} + +impl Remote { fn send(&self, msg: Message) { self.with_loop(|lp| { match lp { @@ -541,7 +525,8 @@ impl Handle { { if CURRENT_LOOP.is_set() { CURRENT_LOOP.with(|lp| { - if lp.id == self.id { + let same = lp.inner.borrow().id == self.id; + if same { f(Some(lp)) } else { f(None) @@ -561,84 +546,32 @@ impl Handle { /// Note that while the closure, `F`, requires the `Send` bound as it might /// cross threads, the future `R` does not. pub fn spawn(&self, f: F) - where F: FnOnce(&Pinned) -> R + Send + 'static, + where F: FnOnce(&Handle) -> R + Send + 'static, R: IntoFuture, R::Future: 'static, { self.send(Message::Run(Box::new(|lp: &Core| { - let f = f(&lp.pin()); - lp.spawn(Box::new(f.into_future())); + let f = f(&lp.handle()); + lp.inner.borrow_mut().spawn(Box::new(f.into_future())); }))); } } -impl Pinned { - /// Returns a reference to the underlying handle to the event loop. - pub fn handle(&self) -> &Handle { - &self.handle +impl Handle { + /// Returns a reference to the underlying remote handle to the event loop. + pub fn remote(&self) -> &Remote { + &self.remote } /// Spawns a new future on the event loop this pin is associated this. pub fn spawn(&self, f: F) where F: Future + 'static, { - let inner = match self.futures.upgrade() { + let inner = match self.inner.upgrade() { Some(inner) => inner, None => return, }; - inner.queue.borrow_mut().push(Box::new(f)); - inner.ready.set_readiness(mio::Ready::readable()).unwrap(); - } -} - -struct CoreFuture { - handle: Handle, - data: Option, - result: Option<(Arc>>, slot::Token)>, -} - -impl CoreFuture - where T: 'static, -{ - fn poll(&mut self, f: F, g: G) -> Poll - where F: FnOnce(&Core, U) -> io::Result, - G: FnOnce(U, Arc>>) -> Message, - { - match self.result { - Some((ref result, ref mut token)) => { - result.cancel(*token); - match result.try_consume() { - Ok(Ok(t)) => return Ok(t.into()), - Ok(Err(e)) => return Err(e), - Err(_) => {} - } - let task = task::park(); - *token = result.on_full(move |_| { - task.unpark(); - }); - Ok(Async::NotReady) - } - None => { - let data = &mut self.data; - let ret = self.handle.with_loop(|lp| { - lp.map(|lp| f(lp, data.take().unwrap())) - }); - if let Some(ret) = ret { - debug!("loop future done immediately on event loop"); - return ret.map(|e| e.into()) - } - debug!("loop future needs to send info to event loop"); - - let task = task::park(); - let result = Arc::new(Slot::new(None)); - let token = result.on_full(move |_| { - task.unpark(); - }); - self.result = Some((result.clone(), token)); - self.handle.send(g(data.take().unwrap(), result)); - Ok(Async::NotReady) - } - } + inner.borrow_mut().spawn(Box::new(f)); } } diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index 90bc0881b..1e9f9e338 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -9,12 +9,12 @@ use std::io::{self, Read, Write}; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures::{Future, Poll, Async}; +use futures::Async; use mio; use io::Io; -use reactor::Handle; -use reactor::io_token::{IoToken, IoTokenNew}; +use reactor::{Handle, Remote}; +use reactor::io_token::IoToken; /// A concrete implementation of a stream of readiness notifications for I/O /// objects that originates from an event loop. @@ -34,31 +34,24 @@ use reactor::io_token::{IoToken, IoTokenNew}; /// any scheduling necessary to get notified when the event is ready again. pub struct PollEvented { token: IoToken, - handle: Handle, + handle: Remote, readiness: AtomicUsize, io: E, } -/// Future returned from `PollEvented::new` which will resolve to a -/// `PollEvented`. -pub struct PollEventedNew { - inner: IoTokenNew, - handle: Handle, -} - -impl PollEvented - where E: mio::Evented + Send + 'static, -{ +impl PollEvented { /// Creates a new readiness stream associated with the provided /// `loop_handle` and for the given `source`. /// /// This method returns a future which will resolve to the readiness stream /// when it's ready. - pub fn new(source: E, handle: &Handle) -> PollEventedNew { - PollEventedNew { - inner: IoToken::new(source, handle), - handle: handle.clone(), - } + pub fn new(io: E, handle: &Handle) -> io::Result> { + Ok(PollEvented { + token: try!(IoToken::new(&io, handle)), + handle: handle.remote().clone(), + readiness: AtomicUsize::new(0), + io: io, + }) } } @@ -137,7 +130,7 @@ impl PollEvented { /// Returns a reference to the event loop handle that this readiness stream /// is associated with. - pub fn handle(&self) -> &Handle { + pub fn remote(&self) -> &Remote { &self.handle } @@ -266,20 +259,3 @@ impl Drop for PollEvented { self.token.drop_source(&self.handle); } } - -impl Future for PollEventedNew - where E: mio::Evented + Send + 'static, -{ - type Item = PollEvented; - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - let (io, token) = try_ready!(self.inner.poll()); - Ok(PollEvented { - token: token, - handle: self.handle.clone(), - io: io, - readiness: AtomicUsize::new(0), - }.into()) - } -} diff --git a/src/reactor/timeout.rs b/src/reactor/timeout.rs index 79d59c1ac..7efbf54b1 100644 --- a/src/reactor/timeout.rs +++ b/src/reactor/timeout.rs @@ -8,9 +8,8 @@ use std::time::{Duration, Instant}; use futures::{Future, Poll, Async}; -use reactor::Handle; +use reactor::{Remote, Handle}; use reactor::timeout_token::TimeoutToken; -use io::IoFuture; /// A future representing the notification that a timeout has occurred. /// @@ -21,13 +20,7 @@ use io::IoFuture; /// otherwise indicated to fire at. pub struct Timeout { token: TimeoutToken, - handle: Handle, -} - -/// Future returned from `Timeout::new` and `Timeout::new_at` which will resolve -/// to the actual `Timeout` itself. -pub struct TimeoutNew { - inner: IoFuture, + handle: Remote, } impl Timeout { @@ -36,7 +29,7 @@ impl Timeout { /// This function will return a future that will resolve to the actual /// timeout object. The timeout object itself is then a future which will be /// set to fire at the specified point in the future. - pub fn new(dur: Duration, handle: &Handle) -> TimeoutNew { + pub fn new(dur: Duration, handle: &Handle) -> io::Result { Timeout::new_at(Instant::now() + dur, handle) } @@ -45,16 +38,11 @@ impl Timeout { /// This function will return a future that will resolve to the actual /// timeout object. The timeout object itself is then a future which will be /// set to fire at the specified point in the future. - pub fn new_at(at: Instant, handle: &Handle) -> TimeoutNew { - let handle = handle.clone(); - TimeoutNew { - inner: TimeoutToken::new(at, &handle).map(move |token| { - Timeout { - token: token, - handle: handle, - } - }).boxed(), - } + pub fn new_at(at: Instant, handle: &Handle) -> io::Result { + Ok(Timeout { + token: try!(TimeoutToken::new(at, &handle)), + handle: handle.remote().clone(), + }) } } @@ -74,15 +62,6 @@ impl Future for Timeout { } } -impl Future for TimeoutNew { - type Item = Timeout; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll() - } -} - impl Drop for Timeout { fn drop(&mut self) { self.token.cancel_timeout(&self.handle); diff --git a/src/reactor/timeout_token.rs b/src/reactor/timeout_token.rs index f7c02d480..98b38716b 100644 --- a/src/reactor/timeout_token.rs +++ b/src/reactor/timeout_token.rs @@ -1,16 +1,9 @@ use std::io; use std::time::Instant; -use futures::{Future, Poll}; use futures::task; -use reactor::{Message, Core, Handle, CoreFuture}; - -/// Return value from the `Handle::add_timeout` method, a future that will -/// resolve to a `TimeoutToken` to configure the behavior of that timeout. -pub struct TimeoutTokenNew { - inner: CoreFuture<(usize, Instant), Instant>, -} +use reactor::{Message, Handle, Remote}; /// A token that identifies an active timeout. pub struct TimeoutToken { @@ -21,13 +14,13 @@ pub struct TimeoutToken { impl TimeoutToken { /// Adds a new timeout to get fired at the specified instant, notifying the /// specified task. - pub fn new(at: Instant, handle: &Handle) -> TimeoutTokenNew { - TimeoutTokenNew { - inner: CoreFuture { - handle: handle.clone(), - data: Some(at), - result: None, - }, + pub fn new(at: Instant, handle: &Handle) -> io::Result { + match handle.inner.upgrade() { + Some(inner) => { + let (token, when) = try!(inner.borrow_mut().add_timeout(at)); + Ok(TimeoutToken { token: token, when: when }) + } + None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")), } } @@ -47,7 +40,7 @@ impl TimeoutToken { /// /// This method will panic if the timeout specified was not created by this /// loop handle's `add_timeout` method. - pub fn update_timeout(&self, handle: &Handle) { + pub fn update_timeout(&self, handle: &Remote) { handle.send(Message::UpdateTimeout(self.token, task::park())) } @@ -57,22 +50,8 @@ impl TimeoutToken { /// /// This method will panic if the timeout specified was not created by this /// loop handle's `add_timeout` method. - pub fn cancel_timeout(&self, handle: &Handle) { + pub fn cancel_timeout(&self, handle: &Remote) { debug!("cancel timeout {}", self.token); handle.send(Message::CancelTimeout(self.token)) } } - -impl Future for TimeoutTokenNew { - type Item = TimeoutToken; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - let (t, i) = try_ready!(self.inner.poll(Core::add_timeout, - Message::AddTimeout)); - Ok(TimeoutToken { - token: t, - when: i, - }.into()) - } -} diff --git a/src/slot.rs b/src/slot.rs deleted file mode 100644 index d802c9876..000000000 --- a/src/slot.rs +++ /dev/null @@ -1,691 +0,0 @@ -//! A slot in memory for communicating between a producer and a consumer. -//! -//! This module contains an implementation detail of this library for a type -//! which is only intended to be shared between one consumer and one producer of -//! a value. It is unlikely that this module will survive stabilization of this -//! library, so it is not recommended to rely on it. - -#![allow(dead_code)] // imported in a few places - -use std::prelude::v1::*; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use lock::Lock; - -/// A slot in memory intended to represent the communication channel between one -/// producer and one consumer. -/// -/// Each slot contains space for a piece of data of type `T`, and can have -/// callbacks registered to run when the slot is either full or empty. -/// -/// Slots are only intended to be shared between exactly one producer and -/// exactly one consumer. If there are multiple concurrent producers or -/// consumers then this is still memory safe but will have unpredictable results -/// (and maybe panics). Note that this does not require that the "consumer" is -/// the same for the entire lifetime of a slot, simply that there is only one -/// consumer at a time. -/// -/// # Registering callbacks -/// -/// [`on_empty`](#method.on_empty) registers a callback to run when the slot -/// becomes empty, and [`on_full`](#method.on_full) registers one to run when it -/// becomes full. In both cases, the callback will run immediately if possible. -/// -/// At most one callback can be registered at any given time: it is an error to -/// attempt to register a callback with `on_full` if one is currently registered -/// via `on_empty`, or any other combination. -/// -/// # Cancellation -/// -/// Registering a callback returns a `Token` which can be used to -/// [`cancel`](#method.cancel) the callback. Only callbacks that have not yet -/// started running can be canceled. Canceling a callback that has already run -/// is not an error, and `cancel` does not signal whether or not the callback -/// was actually canceled to the caller. -pub struct Slot { - // The purpose of this data type is to communicate when a value becomes - // available and coordinate between a producer and consumer about that - // value. Slots end up being at the core of many futures as they handle - // values transferring between producers and consumers, which means that - // they can never block. - // - // As a result, this `Slot` is a lock-free implementation in terms of not - // actually blocking at any point in time. The `Lock` types are - // half-optional and half-not-optional. They aren't actually mutexes as they - // only support a `try_lock` operation, and all the methods below ensure - // that progress can always be made without blocking. - // - // The `state` variable keeps track of the state of this slot, while the - // other fields here are just the payloads of the slot itself. Note that the - // exact bits of `state` are typically wrapped up in a `State` for - // inspection (see below). - state: AtomicUsize, - slot: Lock>, - on_full: Lock>>>, - on_empty: Lock>, Option)>>, -} - -/// Error value returned from erroneous calls to `try_produce`, which contains -/// the value that was passed to `try_produce`. -#[derive(Debug, PartialEq)] -pub struct TryProduceError(T); - -/// Error value returned from erroneous calls to `try_consume`. -#[derive(Debug, PartialEq)] -pub struct TryConsumeError(()); - -/// Error value returned from erroneous calls to `on_full`. -#[derive(Debug, PartialEq)] -pub struct OnFullError(()); - -/// Error value returned from erroneous calls to `on_empty`. -#[derive(Debug, PartialEq)] -pub struct OnEmptyError(()); - -/// A `Token` represents a registered callback, and can be used to cancel the callback. -#[derive(Clone, Copy)] -pub struct Token(usize); - -// Slot state: the lowest 3 bits are flags; the remaining bits are used to -// store the `Token` for the currently registered callback. The special token -// value 0 means no callback is registered. -// -// The flags are: -// - `DATA`: the `Slot` contains a value -// - `ON_FULL`: the `Slot` has an `on_full` callback registered -// - `ON_EMPTY`: the `Slot` has an `on_empty` callback registered -struct State(usize); - -const DATA: usize = 1 << 0; -const ON_FULL: usize = 1 << 1; -const ON_EMPTY: usize = 1 << 2; -const STATE_BITS: usize = 3; -const STATE_MASK: usize = (1 << STATE_BITS) - 1; - -fn _is_send() {} -fn _is_sync() {} - -fn _assert() { - _is_send::>(); - _is_sync::>(); -} - -impl Slot { - /// Creates a new `Slot` containing `val`, which may be `None` to create an - /// empty `Slot`. - pub fn new(val: Option) -> Slot { - Slot { - state: AtomicUsize::new(if val.is_some() {DATA} else {0}), - slot: Lock::new(val), - on_full: Lock::new(None), - on_empty: Lock::new(None), - } - } - - /// Attempts to store `t` in the slot. - /// - /// This method can only be called by the one consumer working on this - /// `Slot`. Concurrent calls to this method or `on_empty` will result in - /// panics or possibly errors. - /// - /// # Errors - /// - /// Returns `Err` if the slot is already full. The value you attempted to - /// store is included in the error value. - /// - /// # Panics - /// - /// This method will panic if called concurrently with `try_produce` or - /// `on_empty`, or if `on_empty` has been called previously but the callback - /// hasn't fired. - pub fn try_produce(&self, t: T) -> Result<(), TryProduceError> { - // First up, let's take a look at our current state. Of our three flags, - // we check a few: - // - // * DATA - if this is set, then the production fails as a value has - // already been produced and we're not ready to receive it yet. - // * ON_EMPTY - this should never be set as it indicates a contract - // violation as the producer already registered interest in - // a value but the callback wasn't fired. - // * ON_FULL - doesn't matter in this use case, we don't check it as - // either state is valid. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_EMPTY)); - if state.flag(DATA) { - return Err(TryProduceError(t)) - } - - // Ok, so we've determined that our state is either `ON_FULL` or `0`, in - // both cases we're going to store our data into our slot. This should - // always succeed as access to `slot` is gated on the `DATA` flag being - // set on the consumer side (which isn't set) and there should only be - // one producer. - let mut slot = self.slot.try_lock().expect("interference with consumer?"); - assert!(slot.is_none()); - *slot = Some(t); - drop(slot); - - // Next, we update our state with `DATA` to say that something is - // available, and we also unset `ON_FULL` because we'll invoke the - // callback if it's available. - loop { - assert!(!state.flag(ON_EMPTY)); - let new_state = state.set_flag(DATA, true).set_flag(ON_FULL, false); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - break - } - state.0 = old; - } - - // If our previous state we transitioned from indicates that it has an - // on-full callback, we call that callback here. There's a few unwraps - // here that should never fail because the consumer shouldn't be placing - // another callback here and there shouldn't be any other producers as - // well. - if state.flag(ON_FULL) { - let cb = self.on_full.try_lock().expect("interference2") - .take().expect("ON_FULL but no callback"); - cb.call_box(self); - } - Ok(()) - } - - /// Registers `f` as a callback to run when the slot becomes empty. - /// - /// The callback will run immediately if the slot is already empty. Returns - /// a token that can be used to cancel the callback. This method is to be - /// called by the producer, and it is illegal to call this method - /// concurrently with either `on_empty` or `try_produce`. - /// - /// # Panics - /// - /// Panics if another callback was already registered via `on_empty` or - /// `on_full`, or if this value is called concurrently with other producer - /// methods. - pub fn on_empty(&self, item: Option, f: F) -> Token - where F: FnOnce(&Slot, Option) + Send + 'static - { - // First up, as usual, take a look at our state. Of the three flags we - // check two: - // - // * DATA - if set, we keep going, but if unset we're done as there's no - // data and we're already empty. - // * ON_EMPTY - this should be impossible as it's a contract violation - // to call this twice or concurrently. - // * ON_FULL - it's illegal to have both an empty and a full callback - // simultaneously, so we check this just after we ensure - // there's data available. If there's data there should not - // be a full callback as it should have been called. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_EMPTY)); - if !state.flag(DATA) { - f(self, item); - return Token(0) - } - assert!(!state.flag(ON_FULL)); - - // At this point we've precisely determined that our state is `DATA` and - // all other flags are unset. We're cleared for landing in initializing - // the `on_empty` slot so we store our callback here. - let mut slot = self.on_empty.try_lock().expect("on_empty interference"); - assert!(slot.is_none()); - *slot = Some((Box::new(f), item)); - drop(slot); - - // In this loop, we transition ourselves from the `DATA` state to a - // state which has the on empty flag state. Note that we also increase - // the token of this state as we're registering a new callback. - loop { - assert!(state.flag(DATA)); - assert!(!state.flag(ON_FULL)); - assert!(!state.flag(ON_EMPTY)); - let new_state = state.set_flag(ON_EMPTY, true) - .set_token(state.token() + 1); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - - // If we succeeded in the CAS, then we're done and our token is - // valid. - if old == state.0 { - return Token(new_state.token()) - } - state.0 = old; - - // If we failed the CAS but the data was taken in the meantime we - // abort our attempt to set on-empty and call the callback - // immediately. Note that the on-empty flag was never set, so it - // should still be there and should be available to take. - if !state.flag(DATA) { - let cb = self.on_empty.try_lock().expect("on_empty interference2") - .take().expect("on_empty not empty??"); - let (cb, item) = cb; - cb.call_box(self, item); - return Token(0) - } - } - } - - /// Attempts to consume the value stored in the slot. - /// - /// This method can only be called by the one consumer of this slot, and - /// cannot be called concurrently with `try_consume` or `on_full`. - /// - /// # Errors - /// - /// Returns `Err` if the slot is already empty. - /// - /// # Panics - /// - /// This method will panic if called concurrently with `try_consume` or - /// `on_full`, or otherwise show weird behavior. - pub fn try_consume(&self) -> Result { - // The implementation of this method is basically the same as - // `try_produce` above, it's just the opposite of all the operations. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_FULL)); - if !state.flag(DATA) { - return Err(TryConsumeError(())) - } - let mut slot = self.slot.try_lock().expect("interference with producer?"); - let val = slot.take().expect("DATA but not data"); - drop(slot); - - loop { - assert!(!state.flag(ON_FULL)); - let new_state = state.set_flag(DATA, false).set_flag(ON_EMPTY, false); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - break - } - state.0 = old; - } - assert!(!state.flag(ON_FULL)); - if state.flag(ON_EMPTY) { - let cb = self.on_empty.try_lock().expect("interference3") - .take().expect("ON_EMPTY but no callback"); - let (cb, item) = cb; - cb.call_box(self, item); - } - Ok(val) - } - - /// Registers `f` as a callback to run when the slot becomes full. - /// - /// The callback will run immediately if the slot is already full. Returns a - /// token that can be used to cancel the callback. - /// - /// This method is to be called by the consumer. - /// - /// # Panics - /// - /// Panics if another callback was already registered via `on_empty` or - /// `on_full` or if called concurrently with `on_full` or `try_consume`. - pub fn on_full(&self, f: F) -> Token - where F: FnOnce(&Slot) + Send + 'static - { - // The implementation of this method is basically the same as - // `on_empty` above, it's just the opposite of all the operations. - let mut state = State(self.state.load(Ordering::SeqCst)); - assert!(!state.flag(ON_FULL)); - if state.flag(DATA) { - f(self); - return Token(0) - } - assert!(!state.flag(ON_EMPTY)); - - let mut slot = self.on_full.try_lock().expect("on_full interference"); - assert!(slot.is_none()); - *slot = Some(Box::new(f)); - drop(slot); - - loop { - assert!(!state.flag(DATA)); - assert!(!state.flag(ON_EMPTY)); - assert!(!state.flag(ON_FULL)); - let new_state = state.set_flag(ON_FULL, true) - .set_token(state.token() + 1); - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - return Token(new_state.token()) - } - state.0 = old; - - if state.flag(DATA) { - let cb = self.on_full.try_lock().expect("on_full interference2") - .take().expect("on_full not full??"); - cb.call_box(self); - return Token(0) - } - } - } - - /// Cancels the callback associated with `token`. - /// - /// Canceling a callback that has already started running, or has already - /// run will do nothing, and is not an error. See - /// [Cancellation](#cancellation). - /// - /// # Panics - /// - /// This method may cause panics if it is called concurrently with - /// `on_empty` or `on_full`, depending on which callback is being canceled. - pub fn cancel(&self, token: Token) { - // Tokens with a value of "0" are sentinels which don't actually do - // anything. - let token = token.0; - if token == 0 { - return - } - - let mut state = State(self.state.load(Ordering::SeqCst)); - loop { - // If we've moved on to a different token, then we're guaranteed - // that our token won't show up again, so we can return immediately - // as our closure has likely already run (or been previously - // canceled). - if state.token() != token { - return - } - - // If our token matches, then let's see if we're cancelling either - // the on-full or on-empty callbacks. It's illegal to have them both - // registered, so we only need to look at one. - // - // If neither are set then the token has probably already run, so we - // just continue along our merry way and don't worry. - let new_state = if state.flag(ON_FULL) { - assert!(!state.flag(ON_EMPTY)); - state.set_flag(ON_FULL, false) - } else if state.flag(ON_EMPTY) { - assert!(!state.flag(ON_FULL)); - state.set_flag(ON_EMPTY, false) - } else { - return - }; - let old = self.state.compare_and_swap(state.0, - new_state.0, - Ordering::SeqCst); - if old == state.0 { - break - } - state.0 = old; - } - - // Figure out which callback we just canceled, and now that the flag is - // unset we should own the callback to clear it. - - if state.flag(ON_FULL) { - let cb = self.on_full.try_lock().expect("on_full interference3") - .take().expect("on_full not full??"); - drop(cb); - } else { - let cb = self.on_empty.try_lock().expect("on_empty interference3") - .take().expect("on_empty not empty??"); - drop(cb); - } - } -} - -impl TryProduceError { - /// Extracts the value that was attempted to be produced. - pub fn into_inner(self) -> T { - self.0 - } -} - -trait FnBox: Send { - fn call_box(self: Box, other: &Slot); -} - -impl FnBox for F - where F: FnOnce(&Slot) + Send, -{ - fn call_box(self: Box, other: &Slot) { - (*self)(other) - } -} - -trait FnBox2: Send { - fn call_box(self: Box, other: &Slot, Option); -} - -impl FnBox2 for F - where F: FnOnce(&Slot, Option) + Send, -{ - fn call_box(self: Box, other: &Slot, item: Option) { - (*self)(other, item) - } -} - -impl State { - fn flag(&self, f: usize) -> bool { - self.0 & f != 0 - } - - fn set_flag(&self, f: usize, val: bool) -> State { - State(if val { - self.0 | f - } else { - self.0 & !f - }) - } - - fn token(&self) -> usize { - self.0 >> STATE_BITS - } - - fn set_token(&self, gen: usize) -> State { - State((gen << STATE_BITS) | (self.0 & STATE_MASK)) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::thread; - - use super::Slot; - - #[test] - fn sequential() { - let slot = Slot::new(Some(1)); - - // We can consume once - assert_eq!(slot.try_consume(), Ok(1)); - assert!(slot.try_consume().is_err()); - - // Consume a production - assert_eq!(slot.try_produce(2), Ok(())); - assert_eq!(slot.try_consume(), Ok(2)); - - // Can't produce twice - assert_eq!(slot.try_produce(3), Ok(())); - assert!(slot.try_produce(3).is_err()); - - // on_full is run immediately if full - let hit = Arc::new(AtomicUsize::new(0)); - let hit2 = hit.clone(); - slot.on_full(move |_s| { - hit2.fetch_add(1, Ordering::SeqCst); - }); - assert_eq!(hit.load(Ordering::SeqCst), 1); - - // on_full can be run twice, and we can consume in the callback - let hit2 = hit.clone(); - slot.on_full(move |s| { - hit2.fetch_add(1, Ordering::SeqCst); - assert_eq!(s.try_consume(), Ok(3)); - }); - assert_eq!(hit.load(Ordering::SeqCst), 2); - - // Production can't run a previous callback - assert_eq!(slot.try_produce(4), Ok(())); - assert_eq!(hit.load(Ordering::SeqCst), 2); - assert_eq!(slot.try_consume(), Ok(4)); - - // Productions run new callbacks - let hit2 = hit.clone(); - slot.on_full(move |s| { - hit2.fetch_add(1, Ordering::SeqCst); - assert_eq!(s.try_consume(), Ok(5)); - }); - assert_eq!(slot.try_produce(5), Ok(())); - assert_eq!(hit.load(Ordering::SeqCst), 3); - - // on empty should fire immediately for an empty slot - let hit2 = hit.clone(); - slot.on_empty(None, move |_, _| { - hit2.fetch_add(1, Ordering::SeqCst); - }); - assert_eq!(hit.load(Ordering::SeqCst), 4); - } - - #[test] - fn channel() { - const N: usize = 10000; - - struct Sender { - slot: Arc>, - hit: Arc, - } - - struct Receiver { - slot: Arc>, - hit: Arc, - } - - impl Sender { - fn send(&self, val: usize) { - if self.slot.try_produce(val).is_ok() { - return - } - let me = thread::current(); - self.hit.store(0, Ordering::SeqCst); - let hit = self.hit.clone(); - self.slot.on_empty(None, move |_slot, _| { - hit.store(1, Ordering::SeqCst); - me.unpark(); - }); - while self.hit.load(Ordering::SeqCst) == 0 { - thread::park(); - } - self.slot.try_produce(val).expect("can't produce after on_empty") - } - } - - impl Receiver { - fn recv(&self) -> usize { - if let Ok(i) = self.slot.try_consume() { - return i - } - - let me = thread::current(); - self.hit.store(0, Ordering::SeqCst); - let hit = self.hit.clone(); - self.slot.on_full(move |_slot| { - hit.store(1, Ordering::SeqCst); - me.unpark(); - }); - while self.hit.load(Ordering::SeqCst) == 0 { - thread::park(); - } - self.slot.try_consume().expect("can't consume after on_full") - } - } - - let slot = Arc::new(Slot::new(None)); - let slot2 = slot.clone(); - - let tx = Sender { slot: slot2, hit: Arc::new(AtomicUsize::new(0)) }; - let rx = Receiver { slot: slot, hit: Arc::new(AtomicUsize::new(0)) }; - - let a = thread::spawn(move || { - for i in 0..N { - assert_eq!(rx.recv(), i); - } - }); - - for i in 0..N { - tx.send(i); - } - - a.join().unwrap(); - } - - #[test] - fn cancel() { - let slot = Slot::new(None); - let hits = Arc::new(AtomicUsize::new(0)); - - let add = || { - let hits = hits.clone(); - move |_: &Slot| { hits.fetch_add(1, Ordering::SeqCst); } - }; - let add_empty = || { - let hits = hits.clone(); - move |_: &Slot, _: Option| { - hits.fetch_add(1, Ordering::SeqCst); - } - }; - - // cancel on_full - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - let token = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n); - assert!(slot.try_consume().is_err()); - assert!(slot.try_produce(1).is_ok()); - assert!(slot.try_consume().is_ok()); - assert_eq!(hits.load(Ordering::SeqCst), n); - - // cancel on_empty - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - slot.try_produce(1).unwrap(); - let token = slot.on_empty(None, add_empty()); - assert_eq!(hits.load(Ordering::SeqCst), n); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n); - assert!(slot.try_produce(1).is_err()); - - // cancel with no effect - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - let token = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - assert!(slot.try_consume().is_ok()); - let token = slot.on_empty(None, add_empty()); - assert_eq!(hits.load(Ordering::SeqCst), n + 2); - slot.cancel(token); - assert_eq!(hits.load(Ordering::SeqCst), n + 2); - - // cancel old ones don't count - let n = hits.load(Ordering::SeqCst); - assert_eq!(hits.load(Ordering::SeqCst), n); - let token1 = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n); - assert!(slot.try_produce(1).is_ok()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - assert!(slot.try_consume().is_ok()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - let token2 = slot.on_full(add()); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - slot.cancel(token1); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - slot.cancel(token2); - assert_eq!(hits.load(Ordering::SeqCst), n + 1); - } -} diff --git a/tests/buffered.rs b/tests/buffered.rs index 63aeca8a8..819fe4ed2 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -25,8 +25,7 @@ fn echo_server() { drop(env_logger::init()); let mut l = t!(Core::new()); - let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let msg = "foo bar baz"; diff --git a/tests/chain.rs b/tests/chain.rs index a80eceedf..ae91ca274 100644 --- a/tests/chain.rs +++ b/tests/chain.rs @@ -21,8 +21,7 @@ macro_rules! t { #[test] fn chain_clients() { let mut l = t!(Core::new()); - let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { diff --git a/tests/echo.rs b/tests/echo.rs index 3d661a8b5..83c5a8390 100644 --- a/tests/echo.rs +++ b/tests/echo.rs @@ -24,8 +24,7 @@ fn echo_server() { drop(env_logger::init()); let mut l = t!(Core::new()); - let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let msg = "foo bar baz"; diff --git a/tests/limit.rs b/tests/limit.rs index d1e4b0568..dac363d1c 100644 --- a/tests/limit.rs +++ b/tests/limit.rs @@ -21,8 +21,7 @@ macro_rules! t { #[test] fn limit() { let mut l = t!(Core::new()); - let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { diff --git a/tests/spawn.rs b/tests/spawn.rs index e3e0bbb55..1d39543d5 100644 --- a/tests/spawn.rs +++ b/tests/spawn.rs @@ -12,11 +12,11 @@ fn simple() { let (tx1, rx1) = futures::oneshot(); let (tx2, rx2) = futures::oneshot(); - lp.pin().spawn(futures::lazy(|| { + lp.handle().spawn(futures::lazy(|| { tx1.complete(1); Ok(()) })); - lp.handle().spawn(|_| { + lp.remote().spawn(|_| { futures::lazy(|| { tx2.complete(2); Ok(()) @@ -33,10 +33,10 @@ fn spawn_in_poll() { let (tx1, rx1) = futures::oneshot(); let (tx2, rx2) = futures::oneshot(); - let handle = lp.handle(); - lp.pin().spawn(futures::lazy(move || { + let remote = lp.remote(); + lp.handle().spawn(futures::lazy(move || { tx1.complete(1); - handle.spawn(|_| { + remote.spawn(|_| { futures::lazy(|| { tx2.complete(2); Ok(()) diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs index 6465b9950..262b4f7ad 100644 --- a/tests/stream-buffered.rs +++ b/tests/stream-buffered.rs @@ -24,8 +24,7 @@ fn echo_server() { drop(env_logger::init()); let mut l = t!(Core::new()); - let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { diff --git a/tests/tcp.rs b/tests/tcp.rs index ed76ec14f..929c5962e 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -40,8 +40,7 @@ fn connect() { fn accept() { drop(env_logger::init()); let mut l = t!(Core::new()); - let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let (tx, rx) = channel(); @@ -66,8 +65,7 @@ fn accept() { fn accept2() { drop(env_logger::init()); let mut l = t!(Core::new()); - let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle()); - let srv = t!(l.run(srv)); + let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let addr = t!(srv.local_addr()); let t = thread::spawn(move || { diff --git a/tests/timeout.rs b/tests/timeout.rs index c2beaef48..3f241cd34 100644 --- a/tests/timeout.rs +++ b/tests/timeout.rs @@ -4,7 +4,6 @@ extern crate tokio_core; use std::time::{Instant, Duration}; -use futures::Future; use tokio_core::reactor::{Core, Timeout}; macro_rules! t { @@ -19,7 +18,7 @@ fn smoke() { drop(env_logger::init()); let mut l = t!(Core::new()); let dur = Duration::from_millis(10); - let timeout = Timeout::new(dur, &l.handle()).and_then(|t| t); + let timeout = t!(Timeout::new(dur, &l.handle())); let start = Instant::now(); t!(l.run(timeout)); assert!(start.elapsed() >= dur); diff --git a/tests/udp.rs b/tests/udp.rs index 03e46da37..c5b0ef57d 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -19,9 +19,8 @@ macro_rules! t { #[test] fn send_messages() { let mut l = t!(Core::new()); - let a = UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()); - let b = UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle()); - let (a, b) = t!(l.run(a.join(b))); + let a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); + let b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle())); let a_addr = t!(a.local_addr()); let b_addr = t!(b.local_addr());