Merge pull request #20 from tokio-rs/remove-scheduler

Remove executor from reactor.
This commit is contained in:
Carl Lerche 2017-11-01 07:38:27 -07:00 committed by GitHub
commit 8c838a2709
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 117 additions and 186 deletions

View File

@ -18,18 +18,20 @@
//! messages.
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use std::collections::HashMap;
use std::rc::Rc;
use std::cell::RefCell;
use std::iter;
use std::env;
use std::io::{Error, ErrorKind, BufReader};
use std::sync::{Arc, Mutex};
use futures::Future;
use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Core;
use tokio_io::io;
@ -45,9 +47,10 @@ fn main() {
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
// This is a single-threaded server, so we can just use Rc and RefCell to
// store the map of all connections we know about.
let connections = Rc::new(RefCell::new(HashMap::new()));
// This is currently a multi threaded server.
//
// Once the same thread executor lands, transition to single threaded.
let connections = Arc::new(Mutex::new(HashMap::new()));
let srv = socket.incoming().for_each(move |(stream, addr)| {
println!("New Connection: {}", addr);
@ -57,7 +60,7 @@ fn main() {
// send us messages. Then register our address with the stream to send
// data to us.
let (tx, rx) = futures::sync::mpsc::unbounded();
connections.borrow_mut().insert(addr, tx);
connections.lock().unwrap().insert(addr, tx);
// Define here what we do for the actual I/O. That is, read a bunch of
// lines from the socket and dispatch them while we also write any lines
@ -88,7 +91,7 @@ fn main() {
let connections = connections_inner.clone();
line.map(move |(reader, message)| {
println!("{}: {:?}", addr, message);
let mut conns = connections.borrow_mut();
let mut conns = connections.lock().unwrap();
if let Ok(msg) = message {
// For each open connection except the sender, send the
// string via the channel.
@ -114,17 +117,19 @@ fn main() {
amt.map_err(|_| ())
});
let pool = CpuPool::new(1);
// Now that we've got futures representing each half of the socket, we
// use the `select` combinator to wait for either half to be done to
// tear down the other. Then we spawn off the result.
let connections = connections.clone();
let socket_reader = socket_reader.map_err(|_| ());
let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
handle.spawn(connection.then(move |_| {
connections.borrow_mut().remove(&addr);
pool.execute(connection.then(move |_| {
connections.lock().unwrap().remove(&addr);
println!("Connection {} closed.", addr);
Ok(())
}));
})).unwrap();
Ok(())
});

View File

@ -29,6 +29,7 @@ use std::env;
use std::net::SocketAddr;
use futures::{Future, Stream, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
@ -53,13 +54,13 @@ fn main() {
// The compress logic will happen in the function below, but everything's
// still a future! Each client is spawned to concurrently get processed.
let server = socket.incoming().for_each(move |(socket, addr)| {
handle.spawn(compress(socket, &pool).then(move |result| {
pool.execute(compress(socket, &pool).then(move |result| {
match result {
Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w),
Err(e) => println!("{}: failed when compressing: {}", addr, e),
}
Ok(())
}));
})).unwrap();
Ok(())
});
@ -70,7 +71,7 @@ fn main() {
/// `socket` on the `pool` provided, writing it back out to `socket` as it's
/// available.
fn compress(socket: TcpStream, pool: &CpuPool)
-> Box<Future<Item = (u64, u64), Error = io::Error>>
-> Box<Future<Item = (u64, u64), Error = io::Error> + Send>
{
use tokio_io::io;

View File

@ -15,6 +15,7 @@
//! stdin/stdout to a server" to get up and running.
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
extern crate bytes;
@ -26,6 +27,7 @@ use std::thread;
use futures::sync::mpsc;
use futures::{Sink, Future, Stream};
use futures_cpupool::CpuPool;
use tokio::reactor::Core;
fn main() {
@ -49,6 +51,8 @@ fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let pool = CpuPool::new(1);
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
@ -61,9 +65,9 @@ fn main() {
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
tcp::connect(&addr, &handle, Box::new(stdin_rx))
tcp::connect(&addr, &handle, &pool, Box::new(stdin_rx))
} else {
udp::connect(&addr, &handle, Box::new(stdin_rx))
udp::connect(&addr, &handle, &pool, Box::new(stdin_rx))
};
// And now with our stream of bytes to write to stdout, we execute that in
@ -83,6 +87,8 @@ mod tcp {
use bytes::{BufMut, BytesMut};
use futures::{Future, Stream};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpStream;
use tokio::reactor::Handle;
use tokio_io::AsyncRead;
@ -90,11 +96,12 @@ mod tcp {
pub fn connect(addr: &SocketAddr,
handle: &Handle,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>)
pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error>>
{
let tcp = TcpStream::connect(addr, handle);
let handle = handle.clone();
let pool = pool.clone();
// After the TCP connection has been established, we set up our client
// to start forwarding data.
@ -113,12 +120,12 @@ mod tcp {
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let (sink, stream) = stream.framed(Bytes).split();
handle.spawn(stdin.forward(sink).then(|result| {
pool.execute(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
}));
})).unwrap();
stream
}).flatten_stream())
}
@ -167,12 +174,15 @@ mod udp {
use bytes::BytesMut;
use futures::{Future, Stream};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{UdpCodec, UdpSocket};
use tokio::reactor::Handle;
pub fn connect(&addr: &SocketAddr,
handle: &Handle,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error>>)
pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
-> Box<Stream<Item = BytesMut, Error = io::Error>>
{
// We'll bind our UDP socket to a local IP/port, but for now we
@ -193,14 +203,14 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
handle.spawn(stdin.map(move |chunk| {
pool.execute(stdin.map(move |chunk| {
(addr, chunk)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
}));
})).unwrap();
// With UDP we could receive data from any source, so filter out
// anything coming from a different address

View File

@ -14,6 +14,7 @@
//! cargo run --example connect 127.0.0.1:8080
extern crate futures;
extern crate futures_cpupool;
extern crate num_cpus;
extern crate tokio;
extern crate tokio_io;
@ -23,8 +24,10 @@ use std::net::{self, SocketAddr};
use std::thread;
use futures::Future;
use futures::future::Executor;
use futures::stream::Stream;
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpStream;
@ -69,6 +72,8 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let pool = CpuPool::new(1);
let done = rx.for_each(move |socket| {
// First up when we receive a socket we associate it with our event loop
// using the `TcpStream::from_stream` API. After that the socket is not
@ -92,7 +97,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
Ok(())
});
handle.spawn(msg);
pool.execute(msg).unwrap();
Ok(())
});

View File

@ -18,6 +18,7 @@
//! should be able to see them all make progress simultaneously.
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
@ -25,7 +26,9 @@ use std::env;
use std::net::SocketAddr;
use futures::Future;
use futures::future::Executor;
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;
@ -46,7 +49,7 @@ fn main() {
//
// After the event loop is created we acquire a handle to it through the
// `handle` method. With this handle we'll then later be able to create I/O
// objects and spawn futures.
// objects.
let mut core = Core::new().unwrap();
let handle = core.handle();
@ -58,6 +61,9 @@ fn main() {
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
// A CpuPool allows futures to be executed concurrently.
let pool = CpuPool::new(1);
// Here we convert the `TcpListener` to a stream of incoming connections
// with the `incoming` method. We then define how to process each element in
// the stream with the `for_each` method.
@ -105,16 +111,16 @@ fn main() {
// And this is where much of the magic of this server happens. We
// crucially want all clients to make progress concurrently, rather than
// blocking one on completion of another. To achieve this we use the
// `spawn` function on `Handle` to essentially execute some work in the
// background.
// `execute` function on the `Executor` trait to essentially execute
// some work in the background.
//
// This function will transfer ownership of the future (`msg` in this
// case) to the event loop that `handle` points to. The event loop will
// then drive the future to completion.
//
// Essentially here we're spawning a new task to run concurrently, which
// will allow all of our clients to be processed concurrently.
handle.spawn(msg);
// Essentially here we're executing a new task to run concurrently,
// which will allow all of our clients to be processed concurrently.
pool.execute(msg).unwrap();
Ok(())
});

View File

@ -17,6 +17,7 @@
//! the echo server, and you'll be able to see data flowing between them.
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
@ -27,6 +28,8 @@ use std::io::{self, Read, Write};
use futures::stream::Stream;
use futures::{Future, Poll};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
use tokio_io::{AsyncRead, AsyncWrite};
@ -43,6 +46,8 @@ fn main() {
let mut l = Core::new().unwrap();
let handle = l.handle();
let pool = CpuPool::new(1);
// Create a TCP listener which will listen for incoming connections.
let socket = TcpListener::bind(&listen_addr, &l.handle()).unwrap();
println!("Listening on: {}", listen_addr);
@ -88,7 +93,7 @@ fn main() {
// Don't panic. Maybe the client just disconnected too soon.
println!("error: {}", e);
});
handle.spawn(msg);
pool.execute(msg).unwrap();
Ok(())
});

View File

@ -17,6 +17,7 @@
extern crate env_logger;
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
@ -25,7 +26,9 @@ use std::iter;
use std::net::SocketAddr;
use futures::Future;
use futures::future::Executor;
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
@ -35,13 +38,15 @@ fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let pool = CpuPool::new(1);
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&addr, &handle).unwrap();
println!("Listening on: {}", addr);
let server = socket.incoming().for_each(|(socket, addr)| {
println!("got a socket: {}", addr);
handle.spawn(write(socket).or_else(|_| Ok(())));
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
core.run(server).unwrap();

View File

@ -40,17 +40,19 @@
//! returning the previous value, if any.
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::BufReader;
use std::rc::Rc;
use std::env;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use futures::prelude::*;
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio::reactor::Core;
use tokio_io::AsyncRead;
@ -58,10 +60,10 @@ use tokio_io::io::{lines, write_all};
/// The in-memory database shared amongst all clients.
///
/// This database will be shared via `Rc`, so to mutate the internal map we're
/// This database will be shared via `Arc`, so to mutate the internal map we're
/// also going to use a `RefCell` for interior mutability.
struct Database {
map: RefCell<HashMap<String, String>>,
map: Mutex<HashMap<String, String>>,
}
/// Possible requests our clients can send us
@ -87,15 +89,18 @@ fn main() {
let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
println!("Listening on: {}", addr);
// Create a CpuPool to execute tasks
let pool = CpuPool::new(1);
// Create the shared state of this server that will be shared amongst all
// clients. We populate the initial database and then create the `Database`
// structure. Note the usage of `Rc` here which will be used to ensure that
// structure. Note the usage of `Arc` here which will be used to ensure that
// each independently spawned client will have a reference to the in-memory
// database.
let mut initial_db = HashMap::new();
initial_db.insert("foo".to_string(), "bar".to_string());
let db = Rc::new(Database {
map: RefCell::new(initial_db),
let db = Arc::new(Database {
map: Mutex::new(initial_db),
});
let done = listener.incoming().for_each(move |(socket, _addr)| {
@ -125,7 +130,7 @@ fn main() {
Err(e) => return Response::Error { msg: e },
};
let mut db = db.map.borrow_mut();
let mut db = db.map.lock().unwrap();
match request {
Request::Get { key } => {
match db.get(&key) {
@ -154,7 +159,7 @@ fn main() {
// runs concurrently with all other clients, for now ignoring any errors
// that we see.
let msg = writes.then(move |_| Ok(()));
handle.spawn(msg);
pool.execute(msg).unwrap();
Ok(())
});

View File

@ -13,6 +13,7 @@
extern crate bytes;
extern crate futures;
extern crate futures_cpupool;
extern crate http;
extern crate httparse;
extern crate num_cpus;
@ -31,8 +32,10 @@ use std::thread;
use bytes::BytesMut;
use futures::future;
use futures::future::Executor;
use futures::sync::mpsc;
use futures::{Stream, Future, Sink};
use futures_cpupool::CpuPool;
use http::{Request, Response, StatusCode};
use http::header::HeaderValue;
use tokio::net::TcpStream;
@ -69,6 +72,8 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let pool = CpuPool::new(1);
let done = rx.for_each(move |socket| {
// Associate each socket we get with our local event loop, and then use
// the codec support in the tokio-io crate to deal with discrete
@ -80,10 +85,10 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
let (tx, rx) = socket.framed(Http).split();
tx.send_all(rx.and_then(respond))
});
handle.spawn(req.then(move |result| {
pool.execute(req.then(move |result| {
drop(result);
Ok(())
}));
})).unwrap();
Ok(())
});
core.run(done).unwrap();
@ -95,7 +100,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
/// represents the various handling a server might do. Currently the contents
/// here are pretty uninteresting.
fn respond(req: Request<()>)
-> Box<Future<Item = Response<String>, Error = io::Error>>
-> Box<Future<Item = Response<String>, Error = io::Error> + Send>
{
let mut ret = Response::builder();
let body = match req.uri().path() {

View File

@ -9,11 +9,14 @@
extern crate tokio;
extern crate env_logger;
extern crate futures;
extern crate futures_cpupool;
use std::io;
use std::net::SocketAddr;
use futures::{Future, Stream, Sink};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{UdpSocket, UdpCodec};
use tokio::reactor::Core;
@ -39,6 +42,8 @@ fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let pool = CpuPool::new(1);
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
// Bind both our sockets and then figure out what ports we got.
@ -73,6 +78,6 @@ fn main() {
let b = b_sink.send_all(b_stream);
// Spawn the sender of pongs and then wait for our pinger to finish.
handle.spawn(b.then(|_| Ok(())));
pool.execute(b.then(|_| Ok(()))).unwrap();
drop(core.run(a));
}

View File

@ -32,10 +32,13 @@
//!
//! ```no_run
//! extern crate futures;
//! extern crate futures_cpupool;
//! extern crate tokio;
//! extern crate tokio_io;
//!
//! use futures::{Future, Stream};
//! use futures::future::Executor;
//! use futures_cpupool::CpuPool;
//! use tokio_io::AsyncRead;
//! use tokio_io::io::copy;
//! use tokio::net::TcpListener;
@ -46,6 +49,8 @@
//! let mut core = Core::new().unwrap();
//! let handle = core.handle();
//!
//! let pool = CpuPool::new_num_cpus();
//!
//! // Bind the server's socket
//! let addr = "127.0.0.1:12345".parse().unwrap();
//! let listener = TcpListener::bind(&addr, &handle).unwrap();
@ -68,7 +73,7 @@
//! });
//!
//! // Spawn the future as a concurrent task
//! handle.spawn(handle_conn);
//! pool.execute(handle_conn).unwrap();
//!
//! Ok(())
//! });

View File

@ -93,13 +93,12 @@ impl TcpListener {
// eventually.
let (tx, rx) = oneshot::channel();
let remote = self.io.remote().clone();
remote.spawn(move |handle| {
remote.run(move |handle| {
let res = PollEvented::new(sock, handle)
.map(move |io| {
(TcpStream { io: io }, addr)
});
drop(tx.send(res));
Ok(())
});
self.pending_accept = Some(rx);
// continue to polling the `rx` at the beginning of the loop

View File

@ -12,8 +12,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::time::{Instant, Duration};
use futures::{Future, IntoFuture, Async};
use futures::future::{self, Executor, ExecuteError};
use futures::{Future, Async};
use futures::executor::{self, Spawn, Notify};
use futures::sync::mpsc;
use futures::task::Task;
@ -58,7 +57,6 @@ struct Inner {
// Dispatch slabs for I/O and futures events
io_dispatch: Slab<ScheduledIo>,
task_dispatch: Slab<ScheduledTask>,
}
/// An unique ID for a Core
@ -95,12 +93,6 @@ struct ScheduledIo {
writer: Option<Task>,
}
struct ScheduledTask {
_registration: mio::Registration,
spawn: Option<Spawn<Box<Future<Item=(), Error=()>>>>,
wake: Option<Arc<MySetReadiness>>,
}
enum Direction {
Read,
Write,
@ -149,7 +141,6 @@ impl Core {
id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
io: io,
io_dispatch: Slab::with_capacity(1),
task_dispatch: Slab::with_capacity(1),
})),
})
}
@ -263,11 +254,7 @@ impl Core {
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let token = usize::from(token) - TOKEN_START;
if token % 2 == 0 {
self.dispatch_io(token / 2, ready)
} else {
self.dispatch_task(token / 2)
}
self.dispatch_io(token, ready)
}
fn dispatch_io(&mut self, token: usize, ready: mio::Ready) {
@ -293,37 +280,6 @@ impl Core {
}
}
fn dispatch_task(&mut self, token: usize) {
let mut inner = self.inner.borrow_mut();
let (task, wake) = match inner.task_dispatch.get_mut(token) {
Some(slot) => (slot.spawn.take(), slot.wake.take()),
None => return,
};
let (mut task, wake) = match (task, wake) {
(Some(task), Some(wake)) => (task, wake),
_ => return,
};
wake.0.set_readiness(mio::Ready::empty()).unwrap();
drop(inner);
let res = CURRENT_LOOP.set(self, || {
task.poll_future_notify(&wake, 0)
});
let _task_to_drop;
inner = self.inner.borrow_mut();
match res {
Ok(Async::NotReady) => {
assert!(inner.task_dispatch[token].spawn.is_none());
inner.task_dispatch[token].spawn = Some(task);
inner.task_dispatch[token].wake = Some(wake);
}
Ok(Async::Ready(())) |
Err(()) => {
_task_to_drop = inner.task_dispatch.remove(token);
}
}
drop(inner);
}
/// Method used to notify a task handle.
///
/// Note that this should be used instead of `handle.notify()` to ensure
@ -365,14 +321,6 @@ impl Core {
}
}
impl<F> Executor<F> for Core
where F: Future<Item = (), Error = ()> + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.handle().execute(future)
}
}
impl fmt::Debug for Core {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Core")
@ -397,7 +345,7 @@ impl Inner {
let entry = self.io_dispatch.vacant_entry();
let key = entry.key();
try!(self.io.register(source,
mio::Token(TOKEN_START + key * 2),
mio::Token(TOKEN_START + key),
mio::Ready::readable() |
mio::Ready::writable() |
platform::all(),
@ -433,28 +381,6 @@ impl Inner {
None
}
}
fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
if self.task_dispatch.len() == self.task_dispatch.capacity() {
let len = self.task_dispatch.len();
self.task_dispatch.reserve_exact(len);
}
let entry = self.task_dispatch.vacant_entry();
let token = TOKEN_START + 2 * entry.key() + 1;
let pair = mio::Registration::new2();
self.io.register(&pair.0,
mio::Token(token),
mio::Ready::readable(),
mio::PollOpt::level())
.expect("cannot fail future registration with mio");
let unpark = Arc::new(MySetReadiness(pair.1));
unpark.notify(0);
entry.insert(ScheduledTask {
spawn: Some(executor::spawn(future)),
wake: Some(unpark),
_registration: pair.0,
});
}
}
impl Remote {
@ -525,14 +451,11 @@ impl Remote {
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn<F, R>(&self, f: F)
where F: FnOnce(&Handle) -> R + Send + 'static,
R: IntoFuture<Item=(), Error=()>,
R::Future: 'static,
pub(crate) fn run<F>(&self, f: F)
where F: FnOnce(&Handle) + Send + 'static,
{
self.send(Message::Run(Box::new(|lp: &Core| {
let f = f(&lp.handle());
lp.inner.borrow_mut().spawn(Box::new(f.into_future()));
f(&lp.handle());
})));
}
@ -569,15 +492,6 @@ impl Remote {
}
}
impl<F> Executor<F> for Remote
where F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(|_| future);
Ok(())
}
}
impl fmt::Debug for Remote {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Remote")
@ -592,57 +506,12 @@ impl Handle {
&self.remote
}
/// Spawns a new future on the event loop this handle is associated with.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn<F>(&self, f: F)
where F: Future<Item=(), Error=()> + 'static,
{
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.borrow_mut().spawn(Box::new(f));
}
/// Spawns a closure on this event loop.
///
/// This function is a convenience wrapper around the `spawn` function above
/// for running a closure wrapped in `futures::lazy`. It will spawn the
/// function `f` provided onto the event loop, and continue to run the
/// future returned by `f` on the event loop as well.
///
/// # Panics
///
/// This method will **not** catch panics from polling the future `f`. If
/// the future panics then it's the responsibility of the caller to catch
/// that panic and handle it as appropriate.
pub fn spawn_fn<F, R>(&self, f: F)
where F: FnOnce() -> R + 'static,
R: IntoFuture<Item=(), Error=()> + 'static,
{
self.spawn(future::lazy(f))
}
/// Return the ID of the represented Core
pub fn id(&self) -> CoreId {
self.remote.id()
}
}
impl<F> Executor<F> for Handle
where F: Future<Item = (), Error = ()> + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.spawn(future);
Ok(())
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Handle")

View File

@ -1,5 +1,6 @@
extern crate env_logger;
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
extern crate bytes;
@ -9,6 +10,8 @@ use std::net::Shutdown;
use bytes::{BytesMut, BufMut};
use futures::{Future, Stream, Sink};
use futures::future::Executor;
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio::reactor::Core;
use tokio_io::codec::{Encoder, Decoder};
@ -55,16 +58,19 @@ fn echo() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let pool = CpuPool::new(1);
let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
let addr = listener.local_addr().unwrap();
let pool_inner = pool.clone();
let srv = listener.incoming().for_each(move |(socket, _)| {
let (sink, stream) = socket.framed(LineCodec).split();
handle.spawn(sink.send_all(stream).map(|_| ()).map_err(|_| ()));
pool_inner.execute(sink.send_all(stream).map(|_| ()).map_err(|_| ())).unwrap();
Ok(())
});
let handle = core.handle();
handle.spawn(srv.map_err(|e| panic!("srv error: {}", e)));
pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap();
let client = TcpStream::connect(&addr, &handle);
let client = core.run(client).unwrap();