Reorganize the entire crate:

Renamed APIs

* Loop => reactor::Core
* LoopHandle => reactor::Handle
* LoopPin => reactor::Pinned
* TcpStream => net::TcpStream
* TcpListener => net::TcpListener
* UdpSocket => net::UdpSocket
* Sender => channel::Sender
* Receiver => channel::Receiver
* Timeout => reactor::Timeout
* ReadinessStream => reactor::PollEvented
* All `LoopHandle` methods to construct objects are now free functions on the
  associated types, e.g. `LoopHandle::tcp_listen` is now `TcpListener::bind`
* All APIs taking a `Handle` now take a `Handle` as the last argument
* All future-returning APIs now return concrete types instead of trait objects

Added APIs

* io::Io trait -- Read + Write + ability to poll

Removed without replacement:

* AddSource
* AddTimeout
* IoToken
* TimeoutToken

Closes #3
Closes #6
This commit is contained in:
Alex Crichton 2016-09-02 11:07:52 -07:00
parent 93c61bb384
commit 6c045d31ac
24 changed files with 811 additions and 476 deletions

View File

@ -9,8 +9,9 @@ use std::net::SocketAddr;
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::Loop;
use tokio_core::io::{copy, TaskIo}; use tokio_core::io::{copy, TaskIo};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
fn main() { fn main() {
env_logger::init().unwrap(); env_logger::init().unwrap();
@ -18,11 +19,11 @@ fn main() {
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>().unwrap();
// Create the event loop that will drive this server // Create the event loop that will drive this server
let mut l = Loop::new().unwrap(); let mut l = Core::new().unwrap();
let pin = l.pin(); let pin = l.pin();
// Create a TCP listener which will listen for incoming connections // Create a TCP listener which will listen for incoming connections
let server = l.handle().tcp_listen(&addr); let server = TcpListener::bind(&addr, &l.handle());
let done = server.and_then(move |socket| { let done = server.and_then(move |socket| {
// Once we've got the TCP listener, inform that we have it // Once we've got the TCP listener, inform that we have it

View File

@ -14,14 +14,16 @@ use std::net::SocketAddr;
use futures::Future; use futures::Future;
use futures::stream::{self, Stream}; use futures::stream::{self, Stream};
use tokio_core::io::IoFuture; use tokio_core::io::IoFuture;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Core;
fn main() { fn main() {
env_logger::init().unwrap(); env_logger::init().unwrap();
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap(); let addr = addr.parse::<SocketAddr>().unwrap();
let mut l = tokio_core::Loop::new().unwrap(); let mut l = Core::new().unwrap();
let server = l.handle().tcp_listen(&addr).and_then(|socket| { let server = TcpListener::bind(&addr, &l.handle()).and_then(|socket| {
socket.incoming().and_then(|(socket, addr)| { socket.incoming().and_then(|(socket, addr)| {
println!("got a socket: {}", addr); println!("got a socket: {}", addr);
write(socket).or_else(|_| Ok(())) write(socket).or_else(|_| Ok(()))
@ -34,7 +36,7 @@ fn main() {
l.run(server).unwrap(); l.run(server).unwrap();
} }
fn write(socket: tokio_core::TcpStream) -> IoFuture<()> { fn write(socket: TcpStream) -> IoFuture<()> {
static BUF: &'static [u8] = &[0; 64 * 1024]; static BUF: &'static [u8] = &[0; 64 * 1024];
let iter = iter::repeat(()).map(|()| Ok(())); let iter = iter::repeat(()).map(|()| Ok(()));
stream::iter(iter).fold(socket, |socket, ()| { stream::iter(iter).fold(socket, |socket, ()| {

View File

@ -1,3 +1,8 @@
//! In-memory evented channels.
//!
//! This module contains a `Sender` and `Receiver` pair types which can be used
//! to send messages between different future tasks.
use std::io; use std::io;
use std::sync::mpsc::TryRecvError; use std::sync::mpsc::TryRecvError;
@ -5,15 +10,15 @@ use futures::{Future, Poll, Async};
use futures::stream::Stream; use futures::stream::Stream;
use mio::channel; use mio::channel;
use {ReadinessStream, LoopHandle};
use io::IoFuture; use io::IoFuture;
use reactor::{Handle, PollEvented};
/// The transmission half of a channel used for sending messages to a receiver. /// The transmission half of a channel used for sending messages to a receiver.
/// ///
/// A `Sender` can be `clone`d to have multiple threads or instances sending /// A `Sender` can be `clone`d to have multiple threads or instances sending
/// messages to one receiver. /// messages to one receiver.
/// ///
/// This type is created by the `LoopHandle::channel` method. /// This type is created by the `channel` function.
pub struct Sender<T> { pub struct Sender<T> {
tx: channel::Sender<T>, tx: channel::Sender<T>,
} }
@ -24,32 +29,36 @@ pub struct Sender<T> {
/// A `Receiver` cannot be cloned, so only one thread can receive messages at a /// A `Receiver` cannot be cloned, so only one thread can receive messages at a
/// time. /// time.
/// ///
/// This type is created by the `LoopHandle::channel` method and implements the /// This type is created by the `channel` function and implements the `Stream`
/// `Stream` trait to represent received messages. /// trait to represent received messages.
pub struct Receiver<T> { pub struct Receiver<T> {
rx: ReadinessStream<channel::Receiver<T>>, rx: PollEvented<channel::Receiver<T>>,
} }
impl LoopHandle { /// Future returned by the `channel` function which will resolve to a
/// Creates a new in-memory channel used for sending data across `Send + /// `Receiver<T>`.
/// 'static` boundaries, frequently threads. pub struct ReceiverNew<T> {
/// inner: IoFuture<Receiver<T>>,
/// This type can be used to conveniently send messages between futures. }
/// Unlike the futures crate `channel` method and types, the returned tx/rx
/// pair is a multi-producer single-consumer (mpsc) channel *with no /// Creates a new in-memory channel used for sending data across `Send +
/// backpressure*. Currently it's left up to the application to implement a /// 'static` boundaries, frequently threads.
/// mechanism, if necessary, to avoid messages piling up. ///
/// /// This type can be used to conveniently send messages between futures.
/// The returned `Sender` can be used to send messages that are processed by /// Unlike the futures crate `channel` method and types, the returned tx/rx
/// the returned `Receiver`. The `Sender` can be cloned to send messages /// pair is a multi-producer single-consumer (mpsc) channel *with no
/// from multiple sources simultaneously. /// backpressure*. Currently it's left up to the application to implement a
pub fn channel<T>(self) -> (Sender<T>, IoFuture<Receiver<T>>) /// mechanism, if necessary, to avoid messages piling up.
where T: Send + 'static, ///
{ /// The returned `Sender` can be used to send messages that are processed by
let (tx, rx) = channel::channel(); /// the returned `Receiver`. The `Sender` can be cloned to send messages
let rx = ReadinessStream::new(self, rx).map(|rx| Receiver { rx: rx }); /// from multiple sources simultaneously.
(Sender { tx: tx }, rx.boxed()) pub fn channel<T>(handle: &Handle) -> (Sender<T>, ReceiverNew<T>)
} where T: Send + 'static,
{
let (tx, rx) = channel::channel();
let rx = PollEvented::new(rx, handle).map(|rx| Receiver { rx: rx });
(Sender { tx: tx }, ReceiverNew { inner: rx.boxed() })
} }
impl<T> Sender<T> { impl<T> Sender<T> {
@ -87,7 +96,9 @@ impl<T> Stream for Receiver<T> {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<T>, io::Error> { fn poll(&mut self) -> Poll<Option<T>, io::Error> {
try_ready!(self.rx.poll_read()); if let Async::NotReady = self.rx.poll_read() {
return Ok(Async::NotReady)
}
match self.rx.get_ref().try_recv() { match self.rx.get_ref().try_recv() {
Ok(t) => Ok(Async::Ready(Some(t))), Ok(t) => Ok(Async::Ready(Some(t))),
Err(TryRecvError::Empty) => { Err(TryRecvError::Empty) => {
@ -98,3 +109,12 @@ impl<T> Stream for Receiver<T> {
} }
} }
} }
impl<T> Future for ReceiverNew<T> {
type Item = Receiver<T>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Receiver<T>, io::Error> {
self.inner.poll()
}
}

View File

@ -3,9 +3,9 @@
//! Contains various combinators to work with I/O objects and type definitions //! Contains various combinators to work with I/O objects and type definitions
//! as well. //! as well.
use std::io; use std::io::{self, Read, Write};
use futures::BoxFuture; use futures::{BoxFuture, Async};
use futures::stream::BoxStream; use futures::stream::BoxStream;
/// A convenience typedef around a `Future` whose error component is `io::Error` /// A convenience typedef around a `Future` whose error component is `io::Error`
@ -45,3 +45,74 @@ pub use self::read_to_end::{read_to_end, ReadToEnd};
pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite}; pub use self::task::{TaskIo, TaskIoRead, TaskIoWrite};
pub use self::window::Window; pub use self::window::Window;
pub use self::write_all::{write_all, WriteAll}; pub use self::write_all::{write_all, WriteAll};
/// A trait for read/write I/O objects
///
/// This trait represents I/O object which are readable and writable.
/// Additionally, they're associated with the ability to test whether they're
/// readable or writable.
///
/// Imporantly, the methods of this trait are intended to be used in conjuction
/// with the current task of a future. Namely whenever any of them return a
/// value that indicates "would block" the current future's task is arranged to
/// receive a notification when the method would otherwise not indicate that it
/// would block.
pub trait Io: Read + Write {
/// Tests to see if this I/O object may be readable.
///
/// This method returns an `Async<()>` indicating whether the object
/// **might** be readable. It is possible that even if this method returns
/// `Async::Ready` that a call to `read` would return a `WouldBlock` error.
///
/// There is a default implementation for this function which always
/// indicates that an I/O object is readable, but objects which can
/// implement a finer grained version of this are recommended to do so.
///
/// If this function returns `Async::NotReady` then the current future's
/// task is arranged to receive a notification when it might not return
/// `NotReady`.
///
/// # Panics
///
/// This method is likely to panic if called from outside the context of a
/// future's task.
fn poll_read(&mut self) -> Async<()> {
Async::Ready(())
}
/// Tests to see if this I/O object may be writable.
///
/// This method returns an `Async<()>` indicating whether the object
/// **might** be writable. It is possible that even if this method returns
/// `Async::Ready` that a call to `write` would return a `WouldBlock` error.
///
/// There is a default implementation for this function which always
/// indicates that an I/O object is writable, but objects which can
/// implement a finer grained version of this are recommended to do so.
///
/// If this function returns `Async::NotReady` then the current future's
/// task is arranged to receive a notification when it might not return
/// `NotReady`.
///
/// # Panics
///
/// This method is likely to panic if called from outside the context of a
/// future's task.
fn poll_write(&mut self) -> Async<()> {
Async::Ready(())
}
/// Helper method for splitting this read/write object into two halves.
///
/// The two halves returned implement the `Read` and `Write` traits,
/// respectively, but are only usable on the current task.
///
/// # Panics
///
/// This method will panic if there is not currently an active future task.
fn task_split(self) -> (TaskIoRead<Self>, TaskIoWrite<Self>)
where Self: Sized
{
TaskIo::new(self).split()
}
}

View File

@ -1,7 +1,98 @@
//! Mio bindings with streams and futures //! `Future`-powered I/O at the core of Tokio
//! //!
//! This crate uses the `futures_io` and `futures` crates to provide a thin //! This crate uses the `futures` crate to provide an event loop ("reactor
//! binding on top of mio of TCP and UDP sockets. //! core") which can be used to drive I/O like TCP and UDP, spawned future
//! tasks, and other events like channels/timeouts. All asynchronous I/O is
//! powered by the `mio` crate.
//!
//! The concrete types provided in this crate are relatively bare bones but are
//! intended to be the essential foundation for further projects needing an
//! event loop. In this crate you'll find:
//!
//! * TCP, both streams and listeners
//! * UDP sockets
//! * Message queues
//! * Timeouts
//!
//! More functionality is likely to be added over time, but otherwise the crate
//! is intended to be flexible with the `PollEvented` type which accepts any
//! type which implements `mio::Evented`. Using this if you'd like Unix domain
//! sockets, for example, the `tokio-uds` is built externally to offer this
//! functionality.
//!
//! Some other important tasks covered by this crate are:
//!
//! * The ability to spawn futures into an even loop. The `Handle` and `Pinned`
//! types have a `spawn` method which allows executing a future on an event
//! loop. The `Pinned::spawn` method crucially does not require the future
//! itself to be `Send`.
//!
//! * The `Io` trait serves as an abstraction for future crates to build on top
//! of. This packages up `Read` and `Write` functionality as well as the
//! ability to poll for readiness on both ends.
//!
//! * All I/O is futures-aware. If any action in this crate returns "not ready"
//! or "would block", then the current future task is scheduled to receive a
//! notification when it would otherwise make progress.
//!
//! # Examples
//!
//! A simple TCP echo server:
//!
//! ```no_run
//! extern crate futures;
//! extern crate tokio_core;
//!
//! use std::env;
//! use std::net::SocketAddr;
//!
//! use futures::Future;
//! use futures::stream::Stream;
//! use tokio_core::io::{copy, Io};
//! use tokio_core::net::TcpListener;
//! use tokio_core::reactor::Core;
//!
//! fn main() {
//! let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
//! let addr = addr.parse::<SocketAddr>().unwrap();
//!
//! // Create the event loop that will drive this server
//! let mut l = Core::new().unwrap();
//! let pin = l.pin();
//!
//! // Create a TCP listener which will listen for incoming connections
//! let server = TcpListener::bind(&addr, pin.handle());
//!
//! let done = server.and_then(|socket| {
//! // Once we've got the TCP listener, inform that we have it
//! println!("Listening on: {}", addr);
//!
//! // Pull out the stream of incoming connections and then for each new
//! // one spin up a new task copying data.
//! //
//! // We use the `io::copy` future to copy all data from the
//! // reading half onto the writing half.
//! socket.incoming().for_each(|(socket, addr)| {
//! let pair = futures::lazy(|| Ok(socket.task_split()));
//! let amt = pair.and_then(|(reader, writer)| copy(reader, writer));
//!
//! // Once all that is done we print out how much we wrote, and then
//! // critically we *spawn* this future which allows it to run
//! // concurrently with other connections.
//! pin.spawn(amt.then(move |result| {
//! println!("wrote {:?} bytes to {}", result, addr);
//! Ok(())
//! }));
//!
//! Ok(())
//! })
//! });
//!
//! // Execute our server (modeled as a future) and wait for it to
//! // complete.
//! l.run(done).unwrap();
//! }
//! ```
#![deny(missing_docs)] #![deny(missing_docs)]
@ -22,19 +113,8 @@ mod lock;
#[macro_use] #[macro_use]
pub mod io; pub mod io;
mod channel;
mod event_loop;
mod mpsc_queue; mod mpsc_queue;
mod readiness_stream;
mod tcp;
mod timeout;
mod timer_wheel; mod timer_wheel;
mod udp; pub mod channel;
pub mod net;
pub use channel::{Sender, Receiver}; pub mod reactor;
pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout};
pub use event_loop::{TimeoutToken, IoToken};
pub use readiness_stream::ReadinessStream;
pub use tcp::{TcpListener, TcpStream};
pub use timeout::Timeout;
pub use udp::UdpSocket;

11
src/net/mod.rs Normal file
View File

@ -0,0 +1,11 @@
//! TCP/UDP bindings for `tokio-core`
//!
//! This module contains the TCP/UDP networking types, similar to the standard
//! library, which can be used to implement networking protocols.
mod tcp;
mod udp;
pub use self::tcp::{TcpStream, TcpStreamNew};
pub use self::tcp::{TcpListener, TcpListenerNew, Incoming};
pub use self::udp::{UdpSocket, UdpSocketNew};

View File

@ -7,30 +7,45 @@ use futures::stream::Stream;
use futures::{Future, IntoFuture, failed, Poll, Async}; use futures::{Future, IntoFuture, failed, Poll, Async};
use mio; use mio;
use {ReadinessStream, LoopHandle}; use io::{Io, IoFuture, IoStream};
use io::{IoFuture, IoStream}; use reactor::{Handle, PollEvented};
/// An I/O object representing a TCP socket listening for incoming connections. /// An I/O object representing a TCP socket listening for incoming connections.
/// ///
/// This object can be converted into a stream of incoming connections for /// This object can be converted into a stream of incoming connections for
/// various forms of processing. /// various forms of processing.
pub struct TcpListener { pub struct TcpListener {
io: ReadinessStream<mio::tcp::TcpListener>, io: PollEvented<mio::tcp::TcpListener>,
}
/// Future which will resolve to a `TcpListener`
pub struct TcpListenerNew {
inner: IoFuture<TcpListener>,
}
/// Stream returned by the `TcpListener::incoming` function representing the
/// stream of sockets received from a listener.
pub struct Incoming {
inner: IoStream<(TcpStream, SocketAddr)>,
} }
impl TcpListener { impl TcpListener {
fn new(listener: mio::tcp::TcpListener, /// Create a new TCP listener associated with this event loop.
handle: LoopHandle) -> IoFuture<TcpListener> { ///
ReadinessStream::new(handle, listener).map(|io| { /// The TCP listener will bind to the provided `addr` address, if available,
TcpListener { /// and will be returned as a future. The returned future, if resolved
io: io, /// successfully, can then be used to accept incoming connections.
} pub fn bind(addr: &SocketAddr, handle: &Handle) -> TcpListenerNew {
}).boxed() let future = match mio::tcp::TcpListener::bind(addr) {
Ok(l) => TcpListener::new(l, handle),
Err(e) => failed(e).boxed(),
};
TcpListenerNew { inner: future }
} }
/// Create a new TCP listener from the standard library's TCP listener. /// Create a new TCP listener from the standard library's TCP listener.
/// ///
/// This method can be used when the `LoopHandle::tcp_listen` method isn't /// This method can be used when the `Handle::tcp_listen` method isn't
/// sufficient because perhaps some more configuration is needed in terms of /// sufficient because perhaps some more configuration is needed in terms of
/// before the calls to `bind` and `listen`. /// before the calls to `bind` and `listen`.
/// ///
@ -57,15 +72,23 @@ impl TcpListener {
/// well (same for IPv6). /// well (same for IPv6).
pub fn from_listener(listener: net::TcpListener, pub fn from_listener(listener: net::TcpListener,
addr: &SocketAddr, addr: &SocketAddr,
handle: LoopHandle) -> IoFuture<TcpListener> { handle: &Handle) -> IoFuture<TcpListener> {
let handle = handle.clone();
mio::tcp::TcpListener::from_listener(listener, addr) mio::tcp::TcpListener::from_listener(listener, addr)
.into_future() .into_future()
.and_then(|l| TcpListener::new(l, handle)) .and_then(move |l| TcpListener::new(l, &handle))
.boxed() .boxed()
} }
fn new(listener: mio::tcp::TcpListener, handle: &Handle)
-> IoFuture<TcpListener> {
PollEvented::new(listener, handle).map(|io| {
TcpListener { io: io }
}).boxed()
}
/// Test whether this socket is ready to be read or not. /// Test whether this socket is ready to be read or not.
pub fn poll_read(&self) -> Poll<(), io::Error> { pub fn poll_read(&self) -> Async<()> {
self.io.poll_read() self.io.poll_read()
} }
@ -82,17 +105,19 @@ impl TcpListener {
/// ///
/// This method returns an implementation of the `Stream` trait which /// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener. /// resolves to the sockets the are accepted on this listener.
pub fn incoming(self) -> IoStream<(TcpStream, SocketAddr)> { pub fn incoming(self) -> Incoming {
struct Incoming { struct MyIncoming {
inner: TcpListener, inner: TcpListener,
} }
impl Stream for Incoming { impl Stream for MyIncoming {
type Item = (mio::tcp::TcpStream, SocketAddr); type Item = (mio::tcp::TcpStream, SocketAddr);
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
try_ready!(self.inner.io.poll_read()); if let Async::NotReady = self.inner.io.poll_read() {
return Ok(Async::NotReady)
}
match self.inner.io.get_ref().accept() { match self.inner.io.get_ref().accept() {
Ok(pair) => Ok(Async::Ready(Some(pair))), Ok(pair) => Ok(Async::Ready(Some(pair))),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
@ -104,13 +129,15 @@ impl TcpListener {
} }
} }
let loop_handle = self.io.loop_handle().clone(); let handle = self.io.handle().clone();
Incoming { inner: self } let stream = MyIncoming { inner: self };
.and_then(move |(tcp, addr)| { Incoming {
ReadinessStream::new(loop_handle.clone(), tcp).map(move |io| { inner: stream.and_then(move |(tcp, addr)| {
PollEvented::new(tcp, &handle).map(move |io| {
(TcpStream { io: io }, addr) (TcpStream { io: io }, addr)
}) })
}).boxed() }).boxed(),
}
} }
/// Sets the value for the `IP_TTL` option on this socket. /// Sets the value for the `IP_TTL` option on this socket.
@ -158,6 +185,24 @@ impl fmt::Debug for TcpListener {
} }
} }
impl Future for TcpListenerNew {
type Item = TcpListener;
type Error = io::Error;
fn poll(&mut self) -> Poll<TcpListener, io::Error> {
self.inner.poll()
}
}
impl Stream for Incoming {
type Item = (TcpStream, SocketAddr);
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
self.inner.poll()
}
}
/// An I/O object representing a TCP stream connected to a remote endpoint. /// An I/O object representing a TCP stream connected to a remote endpoint.
/// ///
/// A TCP stream can either be created by connecting to an endpoint or by /// A TCP stream can either be created by connecting to an endpoint or by
@ -165,27 +210,21 @@ impl fmt::Debug for TcpListener {
/// raw underlying I/O object as well as streams for the read/write /// raw underlying I/O object as well as streams for the read/write
/// notifications on the stream itself. /// notifications on the stream itself.
pub struct TcpStream { pub struct TcpStream {
io: ReadinessStream<mio::tcp::TcpStream>, io: PollEvented<mio::tcp::TcpStream>,
} }
enum TcpStreamNew { /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
/// when the stream is connected.
pub struct TcpStreamNew {
inner: IoFuture<TcpStream>,
}
enum TcpStreamConnect {
Waiting(TcpStream), Waiting(TcpStream),
Empty, Empty,
} }
impl LoopHandle { impl TcpStream {
/// Create a new TCP listener associated with this event loop.
///
/// The TCP listener will bind to the provided `addr` address, if available,
/// and will be returned as a future. The returned future, if resolved
/// successfully, can then be used to accept incoming connections.
pub fn tcp_listen(self, addr: &SocketAddr) -> IoFuture<TcpListener> {
match mio::tcp::TcpListener::bind(addr) {
Ok(l) => TcpListener::new(l, self),
Err(e) => failed(e).boxed(),
}
}
/// Create a new TCP stream connected to the specified address. /// Create a new TCP stream connected to the specified address.
/// ///
/// This function will create a new TCP socket and attempt to connect it to /// This function will create a new TCP socket and attempt to connect it to
@ -193,20 +232,18 @@ impl LoopHandle {
/// stream has successfully connected. If an error happens during the /// stream has successfully connected. If an error happens during the
/// connection or during the socket creation, that error will be returned to /// connection or during the socket creation, that error will be returned to
/// the future instead. /// the future instead.
pub fn tcp_connect(self, addr: &SocketAddr) -> IoFuture<TcpStream> { pub fn connect(addr: &SocketAddr, handle: &Handle) -> TcpStreamNew {
match mio::tcp::TcpStream::connect(addr) { let future = match mio::tcp::TcpStream::connect(addr) {
Ok(tcp) => TcpStream::new(tcp, self), Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(), Err(e) => failed(e).boxed(),
} };
TcpStreamNew { inner: future }
} }
}
impl TcpStream { fn new(connected_stream: mio::tcp::TcpStream, handle: &Handle)
fn new(connected_stream: mio::tcp::TcpStream,
handle: LoopHandle)
-> IoFuture<TcpStream> { -> IoFuture<TcpStream> {
ReadinessStream::new(handle, connected_stream).and_then(|io| { PollEvented::new(connected_stream, handle).and_then(|io| {
TcpStreamNew::Waiting(TcpStream { io: io }) TcpStreamConnect::Waiting(TcpStream { io: io })
}).boxed() }).boxed()
} }
@ -230,7 +267,7 @@ impl TcpStream {
/// (perhaps to `INADDR_ANY`) before this method is called. /// (perhaps to `INADDR_ANY`) before this method is called.
pub fn connect_stream(stream: net::TcpStream, pub fn connect_stream(stream: net::TcpStream,
addr: &SocketAddr, addr: &SocketAddr,
handle: LoopHandle) -> IoFuture<TcpStream> { handle: &Handle) -> IoFuture<TcpStream> {
match mio::tcp::TcpStream::connect_stream(stream, addr) { match mio::tcp::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle), Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(), Err(e) => failed(e).boxed(),
@ -243,7 +280,7 @@ impl TcpStream {
/// get a notification when the socket does become readable. That is, this /// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will /// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again. /// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Poll<(), io::Error> { pub fn poll_read(&self) -> Async<()> {
self.io.poll_read() self.io.poll_read()
} }
@ -253,7 +290,7 @@ impl TcpStream {
/// get a notification when the socket does become writable. That is, this /// get a notification when the socket does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will /// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again. /// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Poll<(), io::Error> { pub fn poll_write(&self) -> Async<()> {
self.io.poll_write() self.io.poll_write()
} }
@ -340,15 +377,81 @@ impl TcpStream {
} }
} }
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
}
}
impl Write for TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
impl Io for TcpStream {
fn poll_read(&mut self) -> Async<()> {
<TcpStream>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<TcpStream>::poll_write(self)
}
}
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl<'a> Io for &'a TcpStream {
fn poll_read(&mut self) -> Async<()> {
<TcpStream>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<TcpStream>::poll_write(self)
}
}
impl fmt::Debug for TcpStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
impl Future for TcpStreamNew { impl Future for TcpStreamNew {
type Item = TcpStream; type Item = TcpStream;
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<TcpStream, io::Error> {
self.inner.poll()
}
}
impl Future for TcpStreamConnect {
type Item = TcpStream;
type Error = io::Error;
fn poll(&mut self) -> Poll<TcpStream, io::Error> { fn poll(&mut self) -> Poll<TcpStream, io::Error> {
{ {
let stream = match *self { let stream = match *self {
TcpStreamNew::Waiting(ref s) => s, TcpStreamConnect::Waiting(ref s) => s,
TcpStreamNew::Empty => panic!("can't poll TCP stream twice"), TcpStreamConnect::Empty => panic!("can't poll TCP stream twice"),
}; };
// Once we've connected, wait for the stream to be writable as // Once we've connected, wait for the stream to be writable as
@ -357,83 +460,20 @@ impl Future for TcpStreamNew {
// actually hit an error or not. // actually hit an error or not.
// //
// If all that succeeded then we ship everything on up. // If all that succeeded then we ship everything on up.
try_ready!(stream.io.poll_write()); if let Async::NotReady = stream.io.poll_write() {
return Ok(Async::NotReady)
}
if let Some(e) = try!(stream.io.get_ref().take_error()) { if let Some(e) = try!(stream.io.get_ref().take_error()) {
return Err(e) return Err(e)
} }
} }
match mem::replace(self, TcpStreamNew::Empty) { match mem::replace(self, TcpStreamConnect::Empty) {
TcpStreamNew::Waiting(stream) => Ok(Async::Ready(stream)), TcpStreamConnect::Waiting(stream) => Ok(Async::Ready(stream)),
TcpStreamNew::Empty => panic!(), TcpStreamConnect::Empty => panic!(),
} }
} }
} }
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
<&TcpStream>::read(&mut &*self, buf)
}
}
impl Write for TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
<&TcpStream>::write(&mut &*self, buf)
}
fn flush(&mut self) -> io::Result<()> {
<&TcpStream>::flush(&mut &*self)
}
}
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = try!(self.io.poll_read()) {
return Err(mio::would_block())
}
let r = self.io.get_ref().read(buf);
if is_wouldblock(&r) {
self.io.need_read();
}
r
}
}
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = try!(self.io.poll_write()) {
return Err(mio::would_block())
}
let r = self.io.get_ref().write(buf);
if is_wouldblock(&r) {
self.io.need_write();
}
r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = try!(self.io.poll_write()) {
return Err(mio::would_block())
}
let r = self.io.get_ref().flush();
if is_wouldblock(&r) {
self.io.need_write();
}
r
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl fmt::Debug for TcpStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.io.get_ref().fmt(f)
}
}
#[cfg(unix)] #[cfg(unix)]
mod sys { mod sys {
use std::os::unix::prelude::*; use std::os::unix::prelude::*;

View File

@ -5,33 +5,36 @@ use std::fmt;
use futures::{Future, failed, Poll, Async}; use futures::{Future, failed, Poll, Async};
use mio; use mio;
use {ReadinessStream, LoopHandle};
use io::IoFuture; use io::IoFuture;
use reactor::{Handle, PollEvented};
/// An I/O object representing a UDP socket. /// An I/O object representing a UDP socket.
pub struct UdpSocket { pub struct UdpSocket {
io: ReadinessStream<mio::udp::UdpSocket>, io: PollEvented<mio::udp::UdpSocket>,
} }
impl LoopHandle { /// Future returned from `UdpSocket::bind` which will resolve to a `UdpSocket`.
pub struct UdpSocketNew {
inner: IoFuture<UdpSocket>,
}
impl UdpSocket {
/// Create a new UDP socket bound to the specified address. /// Create a new UDP socket bound to the specified address.
/// ///
/// This function will create a new UDP socket and attempt to bind it to the /// This function will create a new UDP socket and attempt to bind it to the
/// `addr` provided. The returned future will be resolved once the socket /// `addr` provided. The returned future will be resolved once the socket
/// has successfully bound. If an error happens during the binding or during /// has successfully bound. If an error happens during the binding or during
/// the socket creation, that error will be returned to the future instead. /// the socket creation, that error will be returned to the future instead.
pub fn udp_bind(self, addr: &SocketAddr) -> IoFuture<UdpSocket> { pub fn bind(addr: &SocketAddr, handle: &Handle) -> UdpSocketNew {
match mio::udp::UdpSocket::bind(addr) { let future = match mio::udp::UdpSocket::bind(addr) {
Ok(udp) => UdpSocket::new(udp, self), Ok(udp) => UdpSocket::new(udp, handle),
Err(e) => failed(e).boxed(), Err(e) => failed(e).boxed(),
} };
UdpSocketNew { inner: future }
} }
}
impl UdpSocket { fn new(socket: mio::udp::UdpSocket, handle: &Handle) -> IoFuture<UdpSocket> {
fn new(socket: mio::udp::UdpSocket, handle: LoopHandle) PollEvented::new(socket, handle).map(|io| {
-> IoFuture<UdpSocket> {
ReadinessStream::new(handle, socket).map(|io| {
UdpSocket { io: io } UdpSocket { io: io }
}).boxed() }).boxed()
} }
@ -46,9 +49,9 @@ impl UdpSocket {
/// configure a socket before it's handed off, such as setting options like /// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses. /// `reuse_address` or binding to multiple addresses.
pub fn from_socket(socket: net::UdpSocket, pub fn from_socket(socket: net::UdpSocket,
handle: LoopHandle) -> IoFuture<UdpSocket> { handle: &Handle) -> IoFuture<UdpSocket> {
match mio::udp::UdpSocket::from_socket(socket) { match mio::udp::UdpSocket::from_socket(socket) {
Ok(tcp) => UdpSocket::new(tcp, handle), Ok(udp) => UdpSocket::new(udp, handle),
Err(e) => failed(e).boxed(), Err(e) => failed(e).boxed(),
} }
} }
@ -64,7 +67,7 @@ impl UdpSocket {
/// get a notification when the socket does become readable. That is, this /// get a notification when the socket does become readable. That is, this
/// is only suitable for calling in a `Future::poll` method and will /// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is readable again. /// automatically handle ensuring a retry once the socket is readable again.
pub fn poll_read(&self) -> Poll<(), io::Error> { pub fn poll_read(&self) -> Async<()> {
self.io.poll_read() self.io.poll_read()
} }
@ -74,7 +77,7 @@ impl UdpSocket {
/// get a notification when the socket does become writable. That is, this /// get a notification when the socket does become writable. That is, this
/// is only suitable for calling in a `Future::poll` method and will /// is only suitable for calling in a `Future::poll` method and will
/// automatically handle ensuring a retry once the socket is writable again. /// automatically handle ensuring a retry once the socket is writable again.
pub fn poll_write(&self) -> Poll<(), io::Error> { pub fn poll_write(&self) -> Async<()> {
self.io.poll_write() self.io.poll_write()
} }
@ -84,14 +87,14 @@ impl UdpSocket {
/// Address type can be any implementor of `ToSocketAddrs` trait. See its /// Address type can be any implementor of `ToSocketAddrs` trait. See its
/// documentation for concrete examples. /// documentation for concrete examples.
pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> { pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
if let Async::NotReady = try!(self.io.poll_write()) { if let Async::NotReady = self.io.poll_write() {
return Err(mio::would_block()) return Err(mio::would_block())
} }
match self.io.get_ref().send_to(buf, target) { match self.io.get_ref().send_to(buf, target) {
Ok(Some(n)) => Ok(n), Ok(Some(n)) => Ok(n),
Ok(None) => { Ok(None) => {
self.io.need_write(); self.io.need_write();
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) Err(mio::would_block())
} }
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -100,14 +103,14 @@ impl UdpSocket {
/// Receives data from the socket. On success, returns the number of bytes /// Receives data from the socket. On success, returns the number of bytes
/// read and the address from whence the data came. /// read and the address from whence the data came.
pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
if let Async::NotReady = try!(self.io.poll_read()) { if let Async::NotReady = self.io.poll_read() {
return Err(mio::would_block()) return Err(mio::would_block())
} }
match self.io.get_ref().recv_from(buf) { match self.io.get_ref().recv_from(buf) {
Ok(Some(n)) => Ok(n), Ok(Some(n)) => Ok(n),
Ok(None) => { Ok(None) => {
self.io.need_read(); self.io.need_read();
Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) Err(mio::would_block())
} }
Err(e) => Err(e), Err(e) => Err(e),
} }
@ -260,6 +263,15 @@ impl fmt::Debug for UdpSocket {
} }
} }
impl Future for UdpSocketNew {
type Item = UdpSocket;
type Error = io::Error;
fn poll(&mut self) -> Poll<UdpSocket, io::Error> {
self.inner.poll()
}
}
#[cfg(unix)] #[cfg(unix)]
mod sys { mod sys {
use std::os::unix::prelude::*; use std::os::unix::prelude::*;

View File

@ -6,14 +6,14 @@ use futures::{Future, Poll};
use futures::task; use futures::task;
use mio; use mio;
use event_loop::{Message, LoopHandle, LoopFuture, Direction, Loop}; use reactor::{Message, Handle, CoreFuture, Direction, Core};
/// A future which will resolve a unique `tok` token for an I/O object. /// A future which will resolve a unique `tok` token for an I/O object.
/// ///
/// Created through the `LoopHandle::add_source` method, this future can also /// Created through the `Handle::add_source` method, this future can also
/// resolve to an error if there's an issue communicating with the event loop. /// resolve to an error if there's an issue communicating with the event loop.
pub struct AddSource<E> { pub struct IoTokenNew<E> {
inner: LoopFuture<(E, (Arc<AtomicUsize>, usize)), E>, inner: CoreFuture<(E, (Arc<AtomicUsize>, usize)), E>,
} }
/// A token that identifies an active timeout. /// A token that identifies an active timeout.
@ -23,7 +23,7 @@ pub struct IoToken {
readiness: Arc<AtomicUsize>, readiness: Arc<AtomicUsize>,
} }
impl LoopHandle { impl IoToken {
/// Add a new source to an event loop, returning a future which will resolve /// Add a new source to an event loop, returning a future which will resolve
/// to the token that can be used to identify this source. /// to the token that can be used to identify this source.
/// ///
@ -40,18 +40,37 @@ impl LoopHandle {
/// The returned future will panic if the event loop this handle is /// The returned future will panic if the event loop this handle is
/// associated with has gone away, or if there is an error communicating /// associated with has gone away, or if there is an error communicating
/// with the event loop. /// with the event loop.
pub fn add_source<E>(&self, source: E) -> AddSource<E> pub fn new<E>(source: E, handle: &Handle) -> IoTokenNew<E>
where E: mio::Evented + Send + 'static, where E: mio::Evented + Send + 'static,
{ {
AddSource { IoTokenNew {
inner: LoopFuture { inner: CoreFuture {
loop_handle: self.clone(), handle: handle.clone(),
data: Some(source), data: Some(source),
result: None, result: None,
} },
} }
} }
/// Consumes the last readiness notification the token this source is for
/// registered.
///
/// Currently sources receive readiness notifications on an edge-basis. That
/// is, once you receive a notification that an object can be read, you
/// won't receive any more notifications until all of that data has been
/// read.
///
/// The event loop will fill in this information and then inform futures
/// that they're ready to go with the `schedule` method, and then the `poll`
/// method can use this to figure out what happened.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
// TODO: this should really return a proper newtype/enum, not a usize
pub fn take_readiness(&self) -> usize {
self.readiness.swap(0, Ordering::SeqCst)
}
/// Schedule the current future task to receive a notification when the /// Schedule the current future task to receive a notification when the
/// corresponding I/O object is readable. /// corresponding I/O object is readable.
/// ///
@ -74,8 +93,8 @@ impl LoopHandle {
/// ///
/// This function will also panic if there is not a currently running future /// This function will also panic if there is not a currently running future
/// task. /// task.
pub fn schedule_read(&self, tok: &IoToken) { pub fn schedule_read(&self, handle: &Handle) {
self.send(Message::Schedule(tok.token, task::park(), Direction::Read)); handle.send(Message::Schedule(self.token, task::park(), Direction::Read));
} }
/// Schedule the current future task to receive a notification when the /// Schedule the current future task to receive a notification when the
@ -101,8 +120,8 @@ impl LoopHandle {
/// ///
/// This function will also panic if there is not a currently running future /// This function will also panic if there is not a currently running future
/// task. /// task.
pub fn schedule_write(&self, tok: &IoToken) { pub fn schedule_write(&self, handle: &Handle) {
self.send(Message::Schedule(tok.token, task::park(), Direction::Write)); handle.send(Message::Schedule(self.token, task::park(), Direction::Write));
} }
/// Unregister all information associated with a token on an event loop, /// Unregister all information associated with a token on an event loop,
@ -127,33 +146,12 @@ impl LoopHandle {
/// This function will panic if the event loop this handle is associated /// This function will panic if the event loop this handle is associated
/// with has gone away, or if there is an error communicating with the event /// with has gone away, or if there is an error communicating with the event
/// loop. /// loop.
pub fn drop_source(&self, tok: &IoToken) { pub fn drop_source(&self, handle: &Handle) {
self.send(Message::DropSource(tok.token)); handle.send(Message::DropSource(self.token));
} }
} }
impl IoToken { impl<E> Future for IoTokenNew<E>
/// Consumes the last readiness notification the token this source is for
/// registered.
///
/// Currently sources receive readiness notifications on an edge-basis. That
/// is, once you receive a notification that an object can be read, you
/// won't receive any more notifications until all of that data has been
/// read.
///
/// The event loop will fill in this information and then inform futures
/// that they're ready to go with the `schedule` method, and then the `poll`
/// method can use this to figure out what happened.
///
/// > **Note**: This method should generally not be used directly, but
/// > rather the `ReadinessStream` type should be used instead.
// TODO: this should really return a proper newtype/enum, not a usize
pub fn take_readiness(&self) -> usize {
self.readiness.swap(0, Ordering::SeqCst)
}
}
impl<E> Future for AddSource<E>
where E: mio::Evented + Send + 'static, where E: mio::Evented + Send + 'static,
{ {
type Item = (E, IoToken); type Item = (E, IoToken);
@ -164,7 +162,7 @@ impl<E> Future for AddSource<E>
let pair = try!(lp.add_source(&io)); let pair = try!(lp.add_source(&io));
Ok((io, pair)) Ok((io, pair))
}, |io, slot| { }, |io, slot| {
Message::Run(Box::new(move |lp: &Loop| { Message::Run(Box::new(move |lp: &Core| {
let res = lp.add_source(&io).map(|p| (io, p)); let res = lp.add_source(&io).map(|p| (io, p));
slot.try_produce(res).ok() slot.try_produce(res).ok()
.expect("add source try_produce intereference"); .expect("add source try_produce intereference");

View File

@ -1,7 +1,13 @@
//! The core reactor driving all I/O
//!
//! This module contains the `Core` type which is the reactor for all I/O
//! happening in `tokio-core`. This reactor (or event loop) is used to run
//! futures, schedule tasks, issue I/O requests, etc.
use std::cell::RefCell; use std::cell::RefCell;
use std::io::{self, ErrorKind}; use std::io::{self, ErrorKind};
use std::mem; use std::mem;
use std::rc::Rc; use std::rc::{Rc, Weak};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
@ -12,17 +18,20 @@ use mio;
use slab::Slab; use slab::Slab;
use slot::{self, Slot}; use slot::{self, Slot};
use timer_wheel::{TimerWheel, Timeout}; use timer_wheel::{TimerWheel, Timeout as WheelTimeout};
mod channel; mod channel;
mod source; mod io_token;
mod timeout; mod timeout_token;
pub use self::source::{AddSource, IoToken};
pub use self::timeout::{AddTimeout, TimeoutToken};
use self::channel::{Sender, Receiver, channel}; use self::channel::{Sender, Receiver, channel};
mod poll_evented;
mod timeout;
pub use self::poll_evented::{PollEvented, PollEventedNew};
pub use self::timeout::{Timeout, TimeoutNew};
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT; static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Loop); scoped_thread_local!(static CURRENT_LOOP: Core);
const SLAB_CAPACITY: usize = 1024 * 64; const SLAB_CAPACITY: usize = 1024 * 64;
@ -33,11 +42,11 @@ const SLAB_CAPACITY: usize = 1024 * 64;
/// multiple handles pointing to it, each of which can then be used to create /// multiple handles pointing to it, each of which can then be used to create
/// various I/O objects to interact with the event loop in interesting ways. /// various I/O objects to interact with the event loop in interesting ways.
// TODO: expand this // TODO: expand this
pub struct Loop { pub struct Core {
id: usize, id: usize,
io: mio::Poll, io: mio::Poll,
events: mio::Events, events: mio::Events,
tx: Arc<Sender<Message>>, tx: Sender<Message>,
rx: Receiver<Message>, rx: Receiver<Message>,
io_dispatch: RefCell<Slab<ScheduledIo, usize>>, io_dispatch: RefCell<Slab<ScheduledIo, usize>>,
task_dispatch: RefCell<Slab<ScheduledTask, usize>>, task_dispatch: RefCell<Slab<ScheduledTask, usize>>,
@ -59,7 +68,7 @@ pub struct Loop {
// state of the timeout itself. The `TimeoutToken` type is an index into the // state of the timeout itself. The `TimeoutToken` type is an index into the
// `timeouts` slab. // `timeouts` slab.
timer_wheel: RefCell<TimerWheel<usize>>, timer_wheel: RefCell<TimerWheel<usize>>,
timeouts: RefCell<Slab<(Timeout, TimeoutState), usize>>, timeouts: RefCell<Slab<(WheelTimeout, TimeoutState), usize>>,
} }
/// Handle to an event loop, used to construct I/O objects, send messages, and /// Handle to an event loop, used to construct I/O objects, send messages, and
@ -68,17 +77,17 @@ pub struct Loop {
/// Handles can be cloned, and when cloned they will still refer to the /// Handles can be cloned, and when cloned they will still refer to the
/// same underlying event loop. /// same underlying event loop.
#[derive(Clone)] #[derive(Clone)]
pub struct LoopHandle { pub struct Handle {
id: usize, id: usize,
tx: Arc<Sender<Message>>, tx: Sender<Message>,
} }
/// A non-sendable handle to an event loop, useful for manufacturing instances /// A non-sendable handle to an event loop, useful for manufacturing instances
/// of `LoopData`. /// of `LoopData`.
#[derive(Clone)] #[derive(Clone)]
pub struct LoopPin { pub struct Pinned {
handle: LoopHandle, handle: Handle,
futures: Rc<NewFutures>, futures: Weak<NewFutures>,
} }
struct ScheduledIo { struct ScheduledIo {
@ -123,10 +132,10 @@ const TOKEN_FUTURE: mio::Token = mio::Token(1);
const TOKEN_NEW_FUTURES: mio::Token = mio::Token(2); const TOKEN_NEW_FUTURES: mio::Token = mio::Token(2);
const TOKEN_START: usize = 3; const TOKEN_START: usize = 3;
impl Loop { impl Core {
/// Creates a new event loop, returning any error that happened during the /// Creates a new event loop, returning any error that happened during the
/// creation. /// creation.
pub fn new() -> io::Result<Loop> { pub fn new() -> io::Result<Core> {
let (tx, rx) = channel(); let (tx, rx) = channel();
let io = try!(mio::Poll::new()); let io = try!(mio::Poll::new());
try!(io.register(&rx, try!(io.register(&rx,
@ -141,11 +150,11 @@ impl Loop {
TOKEN_NEW_FUTURES, TOKEN_NEW_FUTURES,
mio::Ready::readable(), mio::Ready::readable(),
mio::PollOpt::level()); mio::PollOpt::level());
Ok(Loop { Ok(Core {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed), id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io, io: io,
events: mio::Events::with_capacity(1024), events: mio::Events::with_capacity(1024),
tx: Arc::new(tx), tx: tx,
rx: rx, rx: rx,
io_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)), io_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
task_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)), task_dispatch: RefCell::new(Slab::with_capacity(SLAB_CAPACITY)),
@ -166,8 +175,8 @@ impl Loop {
/// ///
/// Handles to an event loop are cloneable as well and clones will always /// Handles to an event loop are cloneable as well and clones will always
/// refer to the same event loop. /// refer to the same event loop.
pub fn handle(&self) -> LoopHandle { pub fn handle(&self) -> Handle {
LoopHandle { Handle {
id: self.id, id: self.id,
tx: self.tx.clone(), tx: self.tx.clone(),
} }
@ -177,12 +186,12 @@ impl Loop {
/// but can be used as a proxy to the event loop itself. /// but can be used as a proxy to the event loop itself.
/// ///
/// Currently the primary use for this is to use as a handle to add data /// Currently the primary use for this is to use as a handle to add data
/// to the event loop directly. The `LoopPin::add_loop_data` method can /// to the event loop directly. The `Pinned::add_loop_data` method can
/// be used to immediately create instances of `LoopData` structures. /// be used to immediately create instances of `LoopData` structures.
pub fn pin(&self) -> LoopPin { pub fn pin(&self) -> Pinned {
LoopPin { Pinned {
handle: self.handle(), handle: self.handle(),
futures: self.new_futures.clone(), futures: Rc::downgrade(&self.new_futures),
} }
} }
@ -501,7 +510,7 @@ impl Loop {
} }
} }
impl LoopHandle { impl Handle {
fn send(&self, msg: Message) { fn send(&self, msg: Message) {
self.with_loop(|lp| { self.with_loop(|lp| {
match lp { match lp {
@ -528,7 +537,7 @@ impl LoopHandle {
} }
fn with_loop<F, R>(&self, f: F) -> R fn with_loop<F, R>(&self, f: F) -> R
where F: FnOnce(Option<&Loop>) -> R where F: FnOnce(Option<&Core>) -> R
{ {
if CURRENT_LOOP.is_set() { if CURRENT_LOOP.is_set() {
CURRENT_LOOP.with(|lp| { CURRENT_LOOP.with(|lp| {
@ -552,20 +561,20 @@ impl LoopHandle {
/// Note that while the closure, `F`, requires the `Send` bound as it might /// Note that while the closure, `F`, requires the `Send` bound as it might
/// cross threads, the future `R` does not. /// cross threads, the future `R` does not.
pub fn spawn<F, R>(&self, f: F) pub fn spawn<F, R>(&self, f: F)
where F: FnOnce(&LoopPin) -> R + Send + 'static, where F: FnOnce(&Pinned) -> R + Send + 'static,
R: IntoFuture<Item=(), Error=()>, R: IntoFuture<Item=(), Error=()>,
R::Future: 'static, R::Future: 'static,
{ {
self.send(Message::Run(Box::new(|lp: &Loop| { self.send(Message::Run(Box::new(|lp: &Core| {
let f = f(&lp.pin()); let f = f(&lp.pin());
lp.spawn(Box::new(f.into_future())); lp.spawn(Box::new(f.into_future()));
}))); })));
} }
} }
impl LoopPin { impl Pinned {
/// Returns a reference to the underlying handle to the event loop. /// Returns a reference to the underlying handle to the event loop.
pub fn handle(&self) -> &LoopHandle { pub fn handle(&self) -> &Handle {
&self.handle &self.handle
} }
@ -573,22 +582,26 @@ impl LoopPin {
pub fn spawn<F>(&self, f: F) pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static, where F: Future<Item=(), Error=()> + 'static,
{ {
self.futures.queue.borrow_mut().push(Box::new(f)); let inner = match self.futures.upgrade() {
self.futures.ready.set_readiness(mio::Ready::readable()).unwrap(); Some(inner) => inner,
None => return,
};
inner.queue.borrow_mut().push(Box::new(f));
inner.ready.set_readiness(mio::Ready::readable()).unwrap();
} }
} }
struct LoopFuture<T, U> { struct CoreFuture<T, U> {
loop_handle: LoopHandle, handle: Handle,
data: Option<U>, data: Option<U>,
result: Option<(Arc<Slot<io::Result<T>>>, slot::Token)>, result: Option<(Arc<Slot<io::Result<T>>>, slot::Token)>,
} }
impl<T, U> LoopFuture<T, U> impl<T, U> CoreFuture<T, U>
where T: 'static, where T: 'static,
{ {
fn poll<F, G>(&mut self, f: F, g: G) -> Poll<T, io::Error> fn poll<F, G>(&mut self, f: F, g: G) -> Poll<T, io::Error>
where F: FnOnce(&Loop, U) -> io::Result<T>, where F: FnOnce(&Core, U) -> io::Result<T>,
G: FnOnce(U, Arc<Slot<io::Result<T>>>) -> Message, G: FnOnce(U, Arc<Slot<io::Result<T>>>) -> Message,
{ {
match self.result { match self.result {
@ -607,7 +620,7 @@ impl<T, U> LoopFuture<T, U>
} }
None => { None => {
let data = &mut self.data; let data = &mut self.data;
let ret = self.loop_handle.with_loop(|lp| { let ret = self.handle.with_loop(|lp| {
lp.map(|lp| f(lp, data.take().unwrap())) lp.map(|lp| f(lp, data.take().unwrap()))
}); });
if let Some(ret) = ret { if let Some(ret) = ret {
@ -622,7 +635,7 @@ impl<T, U> LoopFuture<T, U>
task.unpark(); task.unpark();
}); });
self.result = Some((result.clone(), token)); self.result = Some((result.clone(), token));
self.loop_handle.send(g(data.take().unwrap(), result)); self.handle.send(g(data.take().unwrap(), result));
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
@ -658,11 +671,11 @@ impl Unpark for MySetReadiness {
} }
trait FnBox: Send + 'static { trait FnBox: Send + 'static {
fn call_box(self: Box<Self>, lp: &Loop); fn call_box(self: Box<Self>, lp: &Core);
} }
impl<F: FnOnce(&Loop) + Send + 'static> FnBox for F { impl<F: FnOnce(&Core) + Send + 'static> FnBox for F {
fn call_box(self: Box<Self>, lp: &Loop) { fn call_box(self: Box<Self>, lp: &Core) {
(*self)(lp) (*self)(lp)
} }
} }

View File

@ -1,15 +1,25 @@
use std::io; //! Readiness tracking streams, backing I/O objects.
//!
//! This module contains the core type which is used to back all I/O on object
//! in `tokio-core`. The `PollEvented` type is the implementation detail of
//! all I/O. Each `PollEvented` manages registration with a reactor,
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
use std::io::{self, Read, Write};
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use mio; use mio;
use event_loop::{IoToken, LoopHandle, AddSource}; use io::Io;
use reactor::Handle;
use reactor::io_token::{IoToken, IoTokenNew};
/// A concrete implementation of a stream of readiness notifications for I/O /// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop. /// objects that originates from an event loop.
/// ///
/// Created by the `ReadinessStream::new` method, each `ReadinessStream` is /// Created by the `PollEvented::new` method, each `PollEvented` is
/// associated with a specific event loop and source of events that will be /// associated with a specific event loop and source of events that will be
/// registered with an event loop. /// registered with an event loop.
/// ///
@ -22,19 +32,21 @@ use event_loop::{IoToken, LoopHandle, AddSource};
/// It's the responsibility of the wrapper to inform the readiness stream when a /// It's the responsibility of the wrapper to inform the readiness stream when a
/// "would block" I/O event is seen. The readiness stream will then take care of /// "would block" I/O event is seen. The readiness stream will then take care of
/// any scheduling necessary to get notified when the event is ready again. /// any scheduling necessary to get notified when the event is ready again.
pub struct ReadinessStream<E> { pub struct PollEvented<E> {
token: IoToken, token: IoToken,
handle: LoopHandle, handle: Handle,
readiness: AtomicUsize, readiness: AtomicUsize,
io: E, io: E,
} }
pub struct ReadinessStreamNew<E> { /// Future returned from `PollEvented::new` which will resolve to a
inner: AddSource<E>, /// `PollEvented`.
handle: LoopHandle, pub struct PollEventedNew<E> {
inner: IoTokenNew<E>,
handle: Handle,
} }
impl<E> ReadinessStream<E> impl<E> PollEvented<E>
where E: mio::Evented + Send + 'static, where E: mio::Evented + Send + 'static,
{ {
/// Creates a new readiness stream associated with the provided /// Creates a new readiness stream associated with the provided
@ -42,15 +54,15 @@ impl<E> ReadinessStream<E>
/// ///
/// This method returns a future which will resolve to the readiness stream /// This method returns a future which will resolve to the readiness stream
/// when it's ready. /// when it's ready.
pub fn new(loop_handle: LoopHandle, source: E) -> ReadinessStreamNew<E> { pub fn new(source: E, handle: &Handle) -> PollEventedNew<E> {
ReadinessStreamNew { PollEventedNew {
inner: loop_handle.add_source(source), inner: IoToken::new(source, handle),
handle: loop_handle, handle: handle.clone(),
} }
} }
} }
impl<E> ReadinessStream<E> { impl<E> PollEvented<E> {
/// Tests to see if this source is ready to be read from or not. /// Tests to see if this source is ready to be read from or not.
/// ///
/// If this stream is not ready for a read then `NotReady` will be returned /// If this stream is not ready for a read then `NotReady` will be returned
@ -58,16 +70,16 @@ impl<E> ReadinessStream<E> {
/// the stream is readable again. In other words, this method is only safe /// the stream is readable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a /// to call from within the context of a future's task, typically done in a
/// `Future::poll` method. /// `Future::poll` method.
pub fn poll_read(&self) -> Poll<(), io::Error> { pub fn poll_read(&self) -> Async<()> {
if self.readiness.load(Ordering::SeqCst) & 1 != 0 { if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
return Ok(Async::Ready(())) return Async::Ready(())
} }
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
if self.readiness.load(Ordering::SeqCst) & 1 != 0 { if self.readiness.load(Ordering::SeqCst) & 1 != 0 {
Ok(Async::Ready(())) Async::Ready(())
} else { } else {
self.handle.schedule_read(&self.token); self.token.schedule_read(&self.handle);
Ok(Async::NotReady) Async::NotReady
} }
} }
@ -78,16 +90,16 @@ impl<E> ReadinessStream<E> {
/// the stream is writable again. In other words, this method is only safe /// the stream is writable again. In other words, this method is only safe
/// to call from within the context of a future's task, typically done in a /// to call from within the context of a future's task, typically done in a
/// `Future::poll` method. /// `Future::poll` method.
pub fn poll_write(&self) -> Poll<(), io::Error> { pub fn poll_write(&self) -> Async<()> {
if self.readiness.load(Ordering::SeqCst) & 2 != 0 { if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
return Ok(Async::Ready(())) return Async::Ready(())
} }
self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst);
if self.readiness.load(Ordering::SeqCst) & 2 != 0 { if self.readiness.load(Ordering::SeqCst) & 2 != 0 {
Ok(Async::Ready(())) Async::Ready(())
} else { } else {
self.handle.schedule_write(&self.token); self.token.schedule_write(&self.handle);
Ok(Async::NotReady) Async::NotReady
} }
} }
@ -104,7 +116,7 @@ impl<E> ReadinessStream<E> {
/// then again readable. /// then again readable.
pub fn need_read(&self) { pub fn need_read(&self) {
self.readiness.fetch_and(!1, Ordering::SeqCst); self.readiness.fetch_and(!1, Ordering::SeqCst);
self.handle.schedule_read(&self.token); self.token.schedule_read(&self.handle)
} }
/// Indicates to this source of events that the corresponding I/O object is /// Indicates to this source of events that the corresponding I/O object is
@ -120,12 +132,12 @@ impl<E> ReadinessStream<E> {
/// then again writable. /// then again writable.
pub fn need_write(&self) { pub fn need_write(&self) {
self.readiness.fetch_and(!2, Ordering::SeqCst); self.readiness.fetch_and(!2, Ordering::SeqCst);
self.handle.schedule_write(&self.token); self.token.schedule_write(&self.handle)
} }
/// Returns a reference to the event loop handle that this readiness stream /// Returns a reference to the event loop handle that this readiness stream
/// is associated with. /// is associated with.
pub fn loop_handle(&self) -> &LoopHandle { pub fn handle(&self) -> &Handle {
&self.handle &self.handle
} }
@ -142,15 +154,128 @@ impl<E> ReadinessStream<E> {
} }
} }
impl<E> Future for ReadinessStreamNew<E> impl<E: Read> Read for PollEvented<E> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read() {
return Err(mio::would_block())
}
let r = self.get_mut().read(buf);
if is_wouldblock(&r) {
self.need_read();
}
return r
}
}
impl<E: Write> Write for PollEvented<E> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write() {
return Err(mio::would_block())
}
let r = self.get_mut().write(buf);
if is_wouldblock(&r) {
self.need_write();
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write() {
return Err(mio::would_block())
}
let r = self.get_mut().flush();
if is_wouldblock(&r) {
self.need_write();
}
return r
}
}
impl<E: Read + Write> Io for PollEvented<E> {
fn poll_read(&mut self) -> Async<()> {
<PollEvented<E>>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<PollEvented<E>>::poll_write(self)
}
}
impl<'a, E> Read for &'a PollEvented<E>
where &'a E: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read() {
return Err(mio::would_block())
}
let r = self.get_ref().read(buf);
if is_wouldblock(&r) {
self.need_read();
}
return r
}
}
impl<'a, E> Write for &'a PollEvented<E>
where &'a E: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write() {
return Err(mio::would_block())
}
let r = self.get_ref().write(buf);
if is_wouldblock(&r) {
self.need_write();
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write() {
return Err(mio::would_block())
}
let r = self.get_ref().flush();
if is_wouldblock(&r) {
self.need_write();
}
return r
}
}
impl<'a, E> Io for &'a PollEvented<E>
where &'a E: Read + Write,
{
fn poll_read(&mut self) -> Async<()> {
<PollEvented<E>>::poll_read(self)
}
fn poll_write(&mut self) -> Async<()> {
<PollEvented<E>>::poll_write(self)
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl<E> Drop for PollEvented<E> {
fn drop(&mut self) {
self.token.drop_source(&self.handle);
}
}
impl<E> Future for PollEventedNew<E>
where E: mio::Evented + Send + 'static, where E: mio::Evented + Send + 'static,
{ {
type Item = ReadinessStream<E>; type Item = PollEvented<E>;
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<ReadinessStream<E>, io::Error> { fn poll(&mut self) -> Poll<PollEvented<E>, io::Error> {
let (io, token) = try_ready!(self.inner.poll()); let (io, token) = try_ready!(self.inner.poll());
Ok(ReadinessStream { Ok(PollEvented {
token: token, token: token,
handle: self.handle.clone(), handle: self.handle.clone(),
io: io, io: io,
@ -158,9 +283,3 @@ impl<E> Future for ReadinessStreamNew<E>
}.into()) }.into())
} }
} }
impl<E> Drop for ReadinessStream<E> {
fn drop(&mut self) {
self.handle.drop_source(&self.token)
}
}

View File

@ -1,11 +1,16 @@
//! Support for creating futures that represent timeouts.
//!
//! This module contains the `Timeout` type which is a future that will resolve
//! at a particular point in the future.
use std::io; use std::io;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use futures::{Future, Poll, Async}; use futures::{Future, Poll, Async};
use LoopHandle; use reactor::Handle;
use reactor::timeout_token::TimeoutToken;
use io::IoFuture; use io::IoFuture;
use event_loop::TimeoutToken;
/// A future representing the notification that a timeout has occurred. /// A future representing the notification that a timeout has occurred.
/// ///
@ -16,17 +21,23 @@ use event_loop::TimeoutToken;
/// otherwise indicated to fire at. /// otherwise indicated to fire at.
pub struct Timeout { pub struct Timeout {
token: TimeoutToken, token: TimeoutToken,
handle: LoopHandle, handle: Handle,
} }
impl LoopHandle { /// Future returned from `Timeout::new` and `Timeout::new_at` which will resolve
/// to the actual `Timeout` itself.
pub struct TimeoutNew {
inner: IoFuture<Timeout>,
}
impl Timeout {
/// Creates a new timeout which will fire at `dur` time into the future. /// Creates a new timeout which will fire at `dur` time into the future.
/// ///
/// This function will return a future that will resolve to the actual /// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be /// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future. /// set to fire at the specified point in the future.
pub fn timeout(self, dur: Duration) -> IoFuture<Timeout> { pub fn new(dur: Duration, handle: &Handle) -> TimeoutNew {
self.timeout_at(Instant::now() + dur) Timeout::new_at(Instant::now() + dur, handle)
} }
/// Creates a new timeout which will fire at the time specified by `at`. /// Creates a new timeout which will fire at the time specified by `at`.
@ -34,13 +45,16 @@ impl LoopHandle {
/// This function will return a future that will resolve to the actual /// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be /// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future. /// set to fire at the specified point in the future.
pub fn timeout_at(self, at: Instant) -> IoFuture<Timeout> { pub fn new_at(at: Instant, handle: &Handle) -> TimeoutNew {
self.add_timeout(at).map(move |token| { let handle = handle.clone();
Timeout { TimeoutNew {
token: token, inner: TimeoutToken::new(at, &handle).map(move |token| {
handle: self, Timeout {
} token: token,
}).boxed() handle: handle,
}
}).boxed(),
}
} }
} }
@ -54,14 +68,23 @@ impl Future for Timeout {
if *self.token.when() <= now { if *self.token.when() <= now {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} else { } else {
self.handle.update_timeout(&self.token); self.token.update_timeout(&self.handle);
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
} }
impl Drop for Timeout { impl Future for TimeoutNew {
fn drop(&mut self) { type Item = Timeout;
self.handle.cancel_timeout(&self.token); type Error = io::Error;
fn poll(&mut self) -> Poll<Timeout, io::Error> {
self.inner.poll()
}
}
impl Drop for Timeout {
fn drop(&mut self) {
self.token.cancel_timeout(&self.handle);
} }
} }

View File

@ -4,49 +4,12 @@ use std::time::Instant;
use futures::{Future, Poll}; use futures::{Future, Poll};
use futures::task; use futures::task;
use event_loop::{Message, Loop, LoopHandle, LoopFuture}; use reactor::{Message, Core, Handle, CoreFuture};
impl LoopHandle { /// Return value from the `Handle::add_timeout` method, a future that will
/// Adds a new timeout to get fired at the specified instant, notifying the
/// specified task.
pub fn add_timeout(&self, at: Instant) -> AddTimeout {
AddTimeout {
inner: LoopFuture {
loop_handle: self.clone(),
data: Some(at),
result: None,
},
}
}
/// Updates a previously added timeout to notify a new task instead.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn update_timeout(&self, timeout: &TimeoutToken) {
self.send(Message::UpdateTimeout(timeout.token, task::park()))
}
/// Cancel a previously added timeout.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn cancel_timeout(&self, timeout: &TimeoutToken) {
debug!("cancel timeout {}", timeout.token);
self.send(Message::CancelTimeout(timeout.token))
}
}
/// Return value from the [`LoopHandle::add_timeout`] method, a future that will
/// resolve to a `TimeoutToken` to configure the behavior of that timeout. /// resolve to a `TimeoutToken` to configure the behavior of that timeout.
/// pub struct TimeoutTokenNew {
/// [`LoopHandle::add_timeout`]: struct.LoopHandle.html#method.add_timeout inner: CoreFuture<(usize, Instant), Instant>,
pub struct AddTimeout {
inner: LoopFuture<(usize, Instant), Instant>,
} }
/// A token that identifies an active timeout. /// A token that identifies an active timeout.
@ -55,21 +18,19 @@ pub struct TimeoutToken {
when: Instant, when: Instant,
} }
impl Future for AddTimeout {
type Item = TimeoutToken;
type Error = io::Error;
fn poll(&mut self) -> Poll<TimeoutToken, io::Error> {
let (t, i) = try_ready!(self.inner.poll(Loop::add_timeout,
Message::AddTimeout));
Ok(TimeoutToken {
token: t,
when: i,
}.into())
}
}
impl TimeoutToken { impl TimeoutToken {
/// Adds a new timeout to get fired at the specified instant, notifying the
/// specified task.
pub fn new(at: Instant, handle: &Handle) -> TimeoutTokenNew {
TimeoutTokenNew {
inner: CoreFuture {
handle: handle.clone(),
data: Some(at),
result: None,
},
}
}
/// Returns the instant in time when this timeout token will "fire". /// Returns the instant in time when this timeout token will "fire".
/// ///
/// Note that this instant may *not* be the instant that was passed in when /// Note that this instant may *not* be the instant that was passed in when
@ -79,5 +40,39 @@ impl TimeoutToken {
pub fn when(&self) -> &Instant { pub fn when(&self) -> &Instant {
&self.when &self.when
} }
/// Updates a previously added timeout to notify a new task instead.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn update_timeout(&self, handle: &Handle) {
handle.send(Message::UpdateTimeout(self.token, task::park()))
}
/// Cancel a previously added timeout.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn cancel_timeout(&self, handle: &Handle) {
debug!("cancel timeout {}", self.token);
handle.send(Message::CancelTimeout(self.token))
}
} }
impl Future for TimeoutTokenNew {
type Item = TimeoutToken;
type Error = io::Error;
fn poll(&mut self) -> Poll<TimeoutToken, io::Error> {
let (t, i) = try_ready!(self.inner.poll(Core::add_timeout,
Message::AddTimeout));
Ok(TimeoutToken {
token: t,
when: i,
}.into())
}
}

View File

@ -9,6 +9,8 @@ use std::io::{Read, Write, BufReader, BufWriter};
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::io::copy; use tokio_core::io::copy;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -22,8 +24,8 @@ fn echo_server() {
const N: usize = 1024; const N: usize = 1024;
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());

View File

@ -8,6 +8,8 @@ use std::io::{Write, Read};
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::io::read_to_end; use tokio_core::io::read_to_end;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -18,8 +20,8 @@ macro_rules! t {
#[test] #[test]
fn chain_clients() { fn chain_clients() {
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());

View File

@ -9,6 +9,8 @@ use std::thread;
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::io::{copy, TaskIo}; use tokio_core::io::{copy, TaskIo};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -21,8 +23,8 @@ macro_rules! t {
fn echo_server() { fn echo_server() {
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());

View File

@ -8,6 +8,8 @@ use std::io::{Write, Read};
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::io::read_to_end; use tokio_core::io::read_to_end;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -18,8 +20,8 @@ macro_rules! t {
#[test] #[test]
fn limit() { fn limit() {
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());

View File

@ -1,64 +0,0 @@
extern crate env_logger;
extern crate futures;
extern crate mio;
extern crate tokio_core;
use futures::{Future, Poll, Async};
use futures::task;
use tokio_core::{Loop, IoToken, LoopHandle};
struct Next(usize);
impl Future for Next {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.0 == 0 {
task::park().unpark();
self.0 += 1;
Ok(Async::NotReady)
} else {
Ok(().into())
}
}
}
#[test]
fn poll_after_ready() {
drop(env_logger::init());
let mut lp = Loop::new().unwrap();
let handle = lp.handle();
let (tx, rx) = mio::channel::channel::<u32>();
let (_rx, token) = lp.run(handle.add_source(rx)).unwrap();
tx.send(2).unwrap();
lp.run(Next(0)).unwrap();
lp.run(ScheduleThenPoll {
token: token,
handle: handle,
n: 0,
}).unwrap();
struct ScheduleThenPoll {
token: IoToken,
handle: LoopHandle,
n: usize,
}
impl Future for ScheduleThenPoll {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.n == 0 {
self.handle.schedule_read(&self.token);
self.n += 1;
Ok(Async::NotReady)
} else {
assert!(self.token.take_readiness() & 1 != 0);
Ok(().into())
}
}
}
}

View File

@ -3,12 +3,12 @@ extern crate env_logger;
extern crate futures; extern crate futures;
use futures::Future; use futures::Future;
use tokio_core::Loop; use tokio_core::reactor::Core;
#[test] #[test]
fn simple() { fn simple() {
drop(env_logger::init()); drop(env_logger::init());
let mut lp = Loop::new().unwrap(); let mut lp = Core::new().unwrap();
let (tx1, rx1) = futures::oneshot(); let (tx1, rx1) = futures::oneshot();
let (tx2, rx2) = futures::oneshot(); let (tx2, rx2) = futures::oneshot();
@ -29,7 +29,7 @@ fn simple() {
#[test] #[test]
fn spawn_in_poll() { fn spawn_in_poll() {
drop(env_logger::init()); drop(env_logger::init());
let mut lp = Loop::new().unwrap(); let mut lp = Core::new().unwrap();
let (tx1, rx1) = futures::oneshot(); let (tx1, rx1) = futures::oneshot();
let (tx2, rx2) = futures::oneshot(); let (tx2, rx2) = futures::oneshot();

View File

@ -9,6 +9,8 @@ use std::thread;
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::io::{copy, TaskIo}; use tokio_core::io::{copy, TaskIo};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -21,8 +23,8 @@ macro_rules! t {
fn echo_server() { fn echo_server() {
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());

View File

@ -2,12 +2,14 @@ extern crate env_logger;
extern crate futures; extern crate futures;
extern crate tokio_core; extern crate tokio_core;
use std::net::{TcpListener, TcpStream}; use std::net;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::thread; use std::thread;
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_core::reactor::Core;
use tokio_core::net::{TcpListener, TcpStream};
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -19,14 +21,14 @@ macro_rules! t {
#[test] #[test]
fn connect() { fn connect() {
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = t!(TcpListener::bind("127.0.0.1:0")); let srv = t!(net::TcpListener::bind("127.0.0.1:0"));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());
let t = thread::spawn(move || { let t = thread::spawn(move || {
t!(srv.accept()).0 t!(srv.accept()).0
}); });
let stream = l.handle().tcp_connect(&addr); let stream = TcpStream::connect(&addr, &l.handle());
let mine = t!(l.run(stream)); let mine = t!(l.run(stream));
let theirs = t.join().unwrap(); let theirs = t.join().unwrap();
@ -37,8 +39,8 @@ fn connect() {
#[test] #[test]
fn accept() { fn accept() {
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());
@ -49,7 +51,7 @@ fn accept() {
}).into_future().map_err(|e| e.0); }).into_future().map_err(|e| e.0);
assert!(rx.try_recv().is_err()); assert!(rx.try_recv().is_err());
let t = thread::spawn(move || { let t = thread::spawn(move || {
TcpStream::connect(&addr).unwrap() net::TcpStream::connect(&addr).unwrap()
}); });
let (mine, _remaining) = t!(l.run(client)); let (mine, _remaining) = t!(l.run(client));
@ -63,13 +65,13 @@ fn accept() {
#[test] #[test]
fn accept2() { fn accept2() {
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let srv = l.handle().tcp_listen(&"127.0.0.1:0".parse().unwrap()); let srv = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &l.handle());
let srv = t!(l.run(srv)); let srv = t!(l.run(srv));
let addr = t!(srv.local_addr()); let addr = t!(srv.local_addr());
let t = thread::spawn(move || { let t = thread::spawn(move || {
TcpStream::connect(&addr).unwrap() net::TcpStream::connect(&addr).unwrap()
}); });
let (tx, rx) = channel(); let (tx, rx) = channel();

View File

@ -5,6 +5,7 @@ extern crate tokio_core;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use futures::Future; use futures::Future;
use tokio_core::reactor::{Core, Timeout};
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -16,9 +17,9 @@ macro_rules! t {
#[test] #[test]
fn smoke() { fn smoke() {
drop(env_logger::init()); drop(env_logger::init());
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let dur = Duration::from_millis(10); let dur = Duration::from_millis(10);
let timeout = l.handle().timeout(dur).and_then(|t| t); let timeout = Timeout::new(dur, &l.handle()).and_then(|t| t);
let start = Instant::now(); let start = Instant::now();
t!(l.run(timeout)); t!(l.run(timeout));
assert!(start.elapsed() >= dur); assert!(start.elapsed() >= dur);

View File

@ -6,7 +6,8 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::{Future, Poll}; use futures::{Future, Poll};
use tokio_core::UdpSocket; use tokio_core::net::UdpSocket;
use tokio_core::reactor::Core;
macro_rules! t { macro_rules! t {
($e:expr) => (match $e { ($e:expr) => (match $e {
@ -17,9 +18,9 @@ macro_rules! t {
#[test] #[test]
fn send_messages() { fn send_messages() {
let mut l = t!(tokio_core::Loop::new()); let mut l = t!(Core::new());
let a = l.handle().udp_bind(&"127.0.0.1:0".parse().unwrap()); let a = UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let b = l.handle().udp_bind(&"127.0.0.1:0".parse().unwrap()); let b = UdpSocket::bind(&t!("127.0.0.1:0".parse()), &l.handle());
let (a, b) = t!(l.run(a.join(b))); let (a, b) = t!(l.run(a.join(b)));
let a_addr = t!(a.local_addr()); let a_addr = t!(a.local_addr());
let b_addr = t!(b.local_addr()); let b_addr = t!(b.local_addr());