I/O resources lazily bind to reactor. (#160)

This patch makes a significant change to how I/O resources bind to a
reactor. Currently, an I/O resource (TCP, UDP, PollEvented) binds
itself to a reactor upon creation.

First, some history.

Originally, tokio-core required that I/O resources be explicitly
associated with a reactor upon creation by passing in a `&Handle`. Tokio
reform introduced a default reactor. If I/O resources do not specify a
reactor upon creation, then the default reactor is used.

However, futures tend to favor being lazy. Creating a future should do
no work; instead, it defines a computation to be performed once the
future is executed. Binding an I/O resource to a reactor on creation
goes against this pattern.

This patch addresses that by allowing I/O resources to lazily bind to
a reactor. An explicit `&Handle` can still be passed on creation, but
if no reactor is specified, the default reactor is used. Crucially,
this binding now happens at execution time (the first read / write)
rather than at creation.
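
To make the difference concrete, here is a minimal sketch against the
post-patch API. The address, variable names, and the use of
`Handle::default()` for the eager case are illustrative only.

extern crate tokio;

use std::net;
use tokio::net::TcpListener;
use tokio::reactor::Handle;

fn main() {
    let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();

    // Lazy: no `&Handle` is required. The listener is created immediately,
    // but it is not associated with a reactor until it is first polled
    // (read / write) from within a task.
    let lazy = TcpListener::bind(&addr).unwrap();

    // Eager: an explicit `&Handle` still binds to a specific reactor at
    // creation time, exactly as before.
    let std_listener = net::TcpListener::bind(&addr).unwrap();
    let eager = TcpListener::from_std(std_listener, &Handle::default()).unwrap();

    drop((lazy, eager));
}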
Carl Lerche, 2018-02-28 09:03:13 -08:00, committed via GitHub
parent 1190176be7
commit 2eabc37599
10 changed files with 1275 additions and 86 deletions

src/atomic_task.rs (new file, 191 lines)

@ -0,0 +1,191 @@
use futures::task::{self, Task};
use std::fmt;
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Release};
/// A synchronization primitive for task notification.
///
/// `AtomicTask` will coordinate concurrent notifications with the consumer
/// potentially "updating" the underlying task to notify. This is useful in
/// scenarios where a computation completes in another thread and wants to
/// notify the consumer, but the consumer is in the process of being migrated to
/// a new logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `notify` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `notify` to be called **before** `register`. This results in a no-op.
///
/// A single `AtomicTask` may be reused for any number of calls to `register` or
/// `notify`.
///
/// `AtomicTask` does not provide any memory ordering guarantees; as such, the
/// user should use caution and use other synchronization primitives to guard
/// the result of the underlying computation.
pub struct AtomicTask {
state: AtomicUsize,
task: UnsafeCell<Option<Task>>,
}
/// Initial state, the `AtomicTask` is currently not being used.
///
/// The value `2` is picked specifically because it sits between the write lock &
/// read lock values. Since the read lock is represented by an incrementing
/// counter, this enables an atomic fetch_sub operation to be used for releasing
/// a lock.
const WAITING: usize = 2;
/// The `register` function has determined that the task is no longer current.
/// This implies that `AtomicTask::register` is being called from a different
/// task than is represented by the currently stored task. The write lock is
/// obtained to update the task cell.
const LOCKED_WRITE: usize = 0;
/// At least one call to `notify` happened concurrently to `register` updating
/// the task cell. This state is detected when `register` exits the mutation
/// code and signals to `register` that it is responsible for notifying its own
/// task.
const LOCKED_WRITE_NOTIFIED: usize = 1;
/// The `notify` function has locked access to the task cell for notification.
///
/// The constant is left here mostly for documentation reasons.
#[allow(dead_code)]
const LOCKED_READ: usize = 3;
impl AtomicTask {
/// Create an `AtomicTask` initialized with the given `Task`
pub fn new() -> AtomicTask {
// Make sure that task is Sync
trait AssertSync: Sync {}
impl AssertSync for Task {}
AtomicTask {
state: AtomicUsize::new(WAITING),
task: UnsafeCell::new(None),
}
}
/// Registers the **current** task to be notified on calls to `notify`.
pub fn register(&self) {
self.register_task(task::current());
}
/// Registers the task to be notified on calls to `notify`.
///
/// The new task will take the place of any previous tasks that were registered
/// by previous calls to `register`. Any calls to `notify` that happen after
/// a call to `register` (as defined by the memory ordering rules), will
/// notify the `register` caller's task.
///
/// It is safe to call `register` with multiple other threads concurrently
/// calling `notify`. This will result in the `register` caller's current
/// task being notified once.
///
/// This function is safe to call concurrently, but this is generally a bad
/// idea. Concurrent calls to `register` will attempt to register different
/// tasks to be notified. One of the callers will win and have its task set,
/// but there is no guarantee as to which caller will succeed.
pub fn register_task(&self, task: Task) {
match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) {
WAITING => {
unsafe {
// Lock acquired, update the task cell
*self.task.get() = Some(task);
// Release the lock. If the state transitioned to
// `LOCKED_WRITE_NOTIFIED`, this means that a notify has been
// signaled, so notify the task.
if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) {
(*self.task.get()).as_ref().unwrap().notify();
}
}
}
LOCKED_WRITE | LOCKED_WRITE_NOTIFIED => {
// A thread is concurrently calling `register`. This shouldn't
// happen as it doesn't really make much sense, but it isn't
// unsafe per se. Since two threads are concurrently trying to
// update the task, it's undefined which one "wins" (no ordering
// guarantees), so we can just do nothing.
}
state => {
debug_assert!(state != LOCKED_WRITE, "unexpected state LOCKED_WRITE");
debug_assert!(state != LOCKED_WRITE_NOTIFIED, "unexpected state LOCKED_WRITE_NOTIFIED");
// Currently in a read locked state, this implies that `notify`
// is currently being called on the old task handle. So, we call
// notify on the new task handle
task.notify();
}
}
}
/// Notifies the task that last called `register`.
///
/// If `register` has not been called yet, then this does nothing.
pub fn notify(&self) {
let mut curr = WAITING;
loop {
if curr == LOCKED_WRITE {
// Transition the state to LOCKED_WRITE_NOTIFIED
let actual = self.state.compare_and_swap(LOCKED_WRITE, LOCKED_WRITE_NOTIFIED, Release);
if curr == actual {
// Success, return
return;
}
// update current state variable and try again
curr = actual;
} else if curr == LOCKED_WRITE_NOTIFIED {
// Currently in `LOCKED_WRITE_NOTIFIED` state, nothing else to do.
return;
} else {
// Currently in a LOCKED_READ state, so attempt to increment the
// lock count.
let actual = self.state.compare_and_swap(curr, curr + 1, Acquire);
// Lock acquired
if actual == curr {
// Notify the task
unsafe {
if let Some(ref task) = *self.task.get() {
task.notify();
}
}
// Release the lock
self.state.fetch_sub(1, Release);
// Done
return;
}
// update current state variable and try again
curr = actual;
}
}
}
}
impl Default for AtomicTask {
fn default() -> Self {
AtomicTask::new()
}
}
impl fmt::Debug for AtomicTask {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "AtomicTask")
}
}
unsafe impl Send for AtomicTask {}
unsafe impl Sync for AtomicTask {}
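
The register / notify protocol documented above closely mirrors
`futures::task::AtomicTask`, which the reactor previously imported and
which exposes the same `register` / `notify` pair. A minimal, hedged
sketch of a producer / consumer pair built on that API; the `Flag` and
`WaitForFlag` types are illustrative and not part of this patch.

extern crate futures;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use futures::{Async, Future, Poll};
use futures::task::AtomicTask;

// Shared state between a producer thread and a consuming future.
struct Flag {
    ready: AtomicBool,
    task: AtomicTask,
}

impl Flag {
    // Producer side: publish the result first, then notify the registered task.
    fn set(&self) {
        self.ready.store(true, Ordering::SeqCst);
        self.task.notify();
    }
}

struct WaitForFlag(Arc<Flag>);

impl Future for WaitForFlag {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        // Consumer side: register *before* checking the result, so a
        // concurrent `set()` cannot be lost between the check and returning
        // `NotReady`.
        self.0.task.register();

        if self.0.ready.load(Ordering::SeqCst) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}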


@ -107,3 +107,5 @@ pub mod runtime;
pub use executor::spawn;
pub use runtime::run;
mod atomic_task;


@ -11,14 +11,14 @@ use iovec::IoVec;
use mio;
use tokio_io::{AsyncRead, AsyncWrite};
use reactor::{Handle, PollEvented};
use reactor::{Handle, PollEvented2};
/// An I/O object representing a TCP socket listening for incoming connections.
///
/// This object can be converted into a stream of incoming connections for
/// various forms of processing.
pub struct TcpListener {
io: PollEvented<mio::net::TcpListener>,
io: PollEvented2<mio::net::TcpListener>,
}
/// Stream returned by the `TcpListener::incoming` function representing the
@ -35,8 +35,8 @@ impl TcpListener {
/// The TCP listener will bind to the provided `addr` address, if available.
/// If the result is `Ok`, the socket has successfully bound.
pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> {
let l = try!(mio::net::TcpListener::bind(addr));
TcpListener::new(l, &Handle::default())
let l = mio::net::TcpListener::bind(addr)?;
Ok(TcpListener::new(l))
}
/// Attempt to accept a connection and create a new connected `TcpStream` if
@ -58,9 +58,13 @@ impl TcpListener {
/// future's task. It's recommended to only call this from the
/// implementation of a `Future::poll`, if necessary.
pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> {
let (stream, addr) = self.accept_std()?;
let stream = TcpStream::from_std(stream, self.io.handle())?;
Ok((stream, addr))
let (io, addr) = self.accept_std()?;
let io = mio::net::TcpStream::from_stream(io)?;
let io = PollEvented2::new(io);
let io = TcpStream { io };
Ok((io, addr))
}
/// Attempt to accept a connection and create a new connected `TcpStream` if
@ -76,7 +80,7 @@ impl TcpListener {
/// This function will panic for the same reasons as `accept`, notably if
/// called outside the context of a future.
pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> {
if let Async::NotReady = self.io.poll_read() {
if let Async::NotReady = self.io.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
@ -118,16 +122,17 @@ impl TcpListener {
/// will only be for the same IP version as `addr` specified. That is, if
/// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
/// well (same for IPv6).
pub fn from_std(listener: net::TcpListener,
handle: &Handle) -> io::Result<TcpListener> {
let l = mio::net::TcpListener::from_std(listener)?;
TcpListener::new(l, handle)
pub fn from_std(listener: net::TcpListener, handle: &Handle)
-> io::Result<TcpListener>
{
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented2::new_with_handle(io, handle)?;
Ok(TcpListener { io })
}
fn new(listener: mio::net::TcpListener, handle: &Handle)
-> io::Result<TcpListener> {
let io = try!(PollEvented::new(listener, handle));
Ok(TcpListener { io: io })
fn new(listener: mio::net::TcpListener) -> TcpListener {
let io = PollEvented2::new(listener);
TcpListener { io }
}
/// Returns the local address that this listener is bound to.
@ -190,7 +195,7 @@ impl Stream for Incoming {
/// [accepting]: struct.TcpListener.html#method.accept
/// [listener]: struct.TcpListener.html
pub struct TcpStream {
io: PollEvented<mio::net::TcpStream>,
io: PollEvented2<mio::net::TcpStream>,
}
/// Future returned by `TcpStream::connect` which will resolve to a `TcpStream`
@ -217,19 +222,19 @@ impl TcpStream {
/// stream has successfully connected, or it will return an error if one
/// occurs.
pub fn connect(addr: &SocketAddr) -> ConnectFuture {
use self::ConnectFutureState::*;
let inner = match mio::net::TcpStream::connect(addr) {
Ok(tcp) => TcpStream::new(tcp, &Handle::default()),
Err(e) => ConnectFutureState::Error(e),
Ok(tcp) => Waiting(TcpStream::new(tcp)),
Err(e) => Error(e),
};
ConnectFuture { inner: inner }
ConnectFuture { inner }
}
fn new(connected_stream: mio::net::TcpStream, handle: &Handle)
-> ConnectFutureState {
match PollEvented::new(connected_stream, handle) {
Ok(io) => ConnectFutureState::Waiting(TcpStream { io: io }),
Err(e) => ConnectFutureState::Error(e),
}
fn new(connected: mio::net::TcpStream) -> TcpStream {
let io = PollEvented2::new(connected);
TcpStream { io }
}
/// Create a new `TcpStream` from a `net::TcpStream`.
@ -241,10 +246,10 @@ impl TcpStream {
pub fn from_std(stream: net::TcpStream, handle: &Handle)
-> io::Result<TcpStream>
{
let inner = mio::net::TcpStream::from_stream(stream)?;
Ok(TcpStream {
io: try!(PollEvented::new(inner, handle)),
})
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented2::new_with_handle(io, handle)?;
Ok(TcpStream { io })
}
/// Creates a new `TcpStream` from the pending socket inside the given
@ -270,10 +275,16 @@ impl TcpStream {
handle: &Handle)
-> ConnectFuture
{
let inner = match mio::net::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => ConnectFutureState::Error(e),
use self::ConnectFutureState::*;
let io = mio::net::TcpStream::connect_stream(stream, addr)
.and_then(|io| PollEvented2::new_with_handle(io, handle));
let inner = match io {
Ok(io) => Waiting(TcpStream { io }),
Err(e) => Error(e),
};
ConnectFuture { inner: inner }
}
@ -294,7 +305,7 @@ impl TcpStream {
/// Successive calls return the same data. This is accomplished by passing
/// `MSG_PEEK` as a flag to the underlying recv system call.
pub fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.io.poll_read() {
if let Async::NotReady = self.io.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
@ -440,6 +451,8 @@ impl TcpStream {
}
}
// ===== impl Read / Write =====
impl Read for TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.io.read(buf)
@ -461,7 +474,45 @@ impl AsyncRead for TcpStream {
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = self.io.poll_read() {
<&TcpStream>::read_buf(&mut &*self, buf)
}
}
impl AsyncWrite for TcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
<&TcpStream>::shutdown(&mut &*self)
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
<&TcpStream>::write_buf(&mut &*self, buf)
}
}
// ===== impl Read / Write for &'a =====
impl<'a> Read for &'a TcpStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
(&self.io).read(buf)
}
}
impl<'a> Write for &'a TcpStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
(&self.io).write(buf)
}
fn flush(&mut self) -> io::Result<()> {
(&self.io).flush()
}
}
impl<'a> AsyncRead for &'a TcpStream {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = self.io.poll_read_ready()? {
return Ok(Async::NotReady)
}
@ -509,13 +560,13 @@ impl AsyncRead for TcpStream {
}
}
impl AsyncWrite for TcpStream {
impl<'a> AsyncWrite for &'a TcpStream {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
if let Async::NotReady = self.io.poll_write() {
if let Async::NotReady = self.io.poll_write_ready()? {
return Ok(Async::NotReady)
}
@ -582,9 +633,10 @@ impl Future for ConnectFutureState {
// actually hit an error or not.
//
// If all that succeeded then we ship everything on up.
if let Async::NotReady = stream.io.poll_write() {
if let Async::NotReady = stream.io.poll_write_ready()? {
return Ok(Async::NotReady)
}
if let Some(e) = try!(stream.io.get_ref().take_error()) {
return Err(e)
}


@ -5,11 +5,11 @@ use std::fmt;
use futures::{Async, Future, Poll};
use mio;
use reactor::{Handle, PollEvented};
use reactor::{Handle, PollEvented2};
/// An I/O object representing a UDP socket.
pub struct UdpSocket {
io: PollEvented<mio::net::UdpSocket>,
io: PollEvented2<mio::net::UdpSocket>,
}
mod frame;
@ -19,13 +19,13 @@ impl UdpSocket {
/// This function will create a new UDP socket and attempt to bind it to
/// the `addr` provided.
pub fn bind(addr: &SocketAddr) -> io::Result<UdpSocket> {
let udp = try!(mio::net::UdpSocket::bind(addr));
UdpSocket::new(udp, &Handle::default())
mio::net::UdpSocket::bind(addr)
.map(UdpSocket::new)
}
fn new(socket: mio::net::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> {
let io = try!(PollEvented::new(socket, handle));
Ok(UdpSocket { io: io })
fn new(socket: mio::net::UdpSocket) -> UdpSocket {
let io = PollEvented2::new(socket);
UdpSocket { io: io }
}
/// Creates a new `UdpSocket` from the previously bound socket provided.
@ -39,8 +39,9 @@ impl UdpSocket {
/// `reuse_address` or binding to multiple addresses.
pub fn from_std(socket: net::UdpSocket,
handle: &Handle) -> io::Result<UdpSocket> {
let udp = try!(mio::net::UdpSocket::from_socket(socket));
UdpSocket::new(udp, handle)
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented2::new_with_handle(io, handle)?;
Ok(UdpSocket { io })
}
/// Returns the local address that this socket is bound to.
@ -63,7 +64,7 @@ impl UdpSocket {
/// This function will panic if called outside the context of a future's
/// task.
pub fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.io.poll_write() {
if let Async::NotReady = self.io.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
@ -86,7 +87,7 @@ impl UdpSocket {
/// This function will panic if called outside the context of a future's
/// task.
pub fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.io.poll_read() {
if let Async::NotReady = self.io.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
@ -112,7 +113,7 @@ impl UdpSocket {
/// This function will panic if called outside the context of a future's
/// task.
pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
if let Async::NotReady = self.io.poll_write() {
if let Async::NotReady = self.io.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
@ -155,7 +156,7 @@ impl UdpSocket {
/// This function will panic if called outside the context of a future's
/// task.
pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
if let Async::NotReady = self.io.poll_read() {
if let Async::NotReady = self.io.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}


@ -4,9 +4,10 @@ use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use atomic_task::AtomicTask;
use reactor::{Reactor, Handle};
use futures::{Future, Async, Poll};
use futures::task::AtomicTask;
/// Handle to the reactor running on a background thread.
#[derive(Debug)]
@ -117,6 +118,8 @@ impl Drop for Background {
None => return,
};
inner.shutdown_now();
let shutdown = Shutdown { inner };
let _ = shutdown.wait();
}


@ -19,6 +19,8 @@
use tokio_executor::Enter;
use tokio_executor::park::{Park, Unpark};
use atomic_task::AtomicTask;
use std::{fmt, usize};
use std::io::{self, ErrorKind};
use std::mem;
@ -29,17 +31,24 @@ use std::sync::{Arc, Weak, RwLock};
use std::time::{Duration, Instant};
use log::Level;
use futures::task::AtomicTask;
use mio;
use mio::event::Evented;
use slab::Slab;
use futures::task::Task;
pub(crate) mod background;
use self::background::Background;
mod poll_evented;
#[allow(deprecated)]
pub use self::poll_evented::PollEvented;
mod registration;
pub use self::registration::Registration;
mod poll_evented2;
pub use self::poll_evented2::PollEvented as PollEvented2;
/// The core reactor, or event loop.
///
/// The event loop is the main source of blocking in an application which drives
@ -100,7 +109,8 @@ struct ScheduledIo {
writer: AtomicTask,
}
enum Direction {
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(crate) enum Direction {
Read,
Write,
}
@ -358,11 +368,24 @@ impl fmt::Debug for Reactor {
impl Handle {
/// Returns a handle to the current reactor.
pub fn current() -> Handle {
Handle::default()
Handle::try_current()
.unwrap_or(Handle { inner: Weak::new() })
}
/// Try to get a handle to the current reactor.
///
/// Returns `Err` if no handle is found.
pub(crate) fn try_current() -> io::Result<Handle> {
CURRENT_REACTOR.with(|current| {
match *current.borrow() {
Some(ref handle) => Ok(handle.clone()),
None => Handle::fallback(),
}
})
}
/// Returns a handle to the fallback reactor.
fn fallback() -> Handle {
fn fallback() -> io::Result<Handle> {
let mut fallback = HANDLE_FALLBACK.load(SeqCst);
// If the fallback hasn't been previously initialized then let's spin
@ -373,7 +396,8 @@ impl Handle {
if fallback == 0 {
let reactor = match Reactor::new() {
Ok(reactor) => reactor,
Err(_) => return Handle { inner: Weak::new() },
Err(_) => return Err(io::Error::new(io::ErrorKind::Other,
"failed to create reactor")),
};
// If we successfully set ourselves as the actual fallback then we
@ -392,7 +416,7 @@ impl Handle {
Err(_) => {}
}
return ret
return Ok(ret);
}
fallback = HANDLE_FALLBACK.load(SeqCst);
@ -403,12 +427,14 @@ impl Handle {
// handle as we don't actually have an owning reference to it.
assert!(fallback != 0);
unsafe {
let ret = unsafe {
let handle = Handle::from_usize(fallback);
let ret = handle.clone();
drop(handle.into_usize());
return ret
}
ret
};
Ok(ret)
}
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
@ -450,12 +476,7 @@ impl Unpark for Handle {
impl Default for Handle {
fn default() -> Handle {
CURRENT_REACTOR.with(|current| {
match *current.borrow() {
Some(ref handle) => handle.clone(),
None => Handle::fallback(),
}
})
Handle::current()
}
}
@ -490,7 +511,8 @@ impl Inner {
let mut io_dispatch = self.io_dispatch.write().unwrap();
if io_dispatch.len() == MAX_SOURCES {
return Err(io::Error::new(io::ErrorKind::Other, "reactor at max registered I/O resources"));
return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \
registered I/O resources"));
}
// Acquire a write lock
@ -520,7 +542,7 @@ impl Inner {
}
/// Registers interest in the I/O resource associated with `token`.
fn schedule(&self, token: usize, dir: Direction) {
fn register(&self, token: usize, dir: Direction, t: Task) {
debug!("scheduling direction for: {}", token);
let io_dispatch = self.io_dispatch.read().unwrap();
let sched = io_dispatch.get(token).unwrap();
@ -530,7 +552,7 @@ impl Inner {
Direction::Write => (&sched.writer, mio::Ready::writable()),
};
task.register();
task.register_task(t);
if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 {
task.notify();
@ -551,14 +573,33 @@ impl Drop for Inner {
}
}
impl Direction {
fn ready(&self) -> mio::Ready {
match *self {
Direction::Read => read_ready(),
Direction::Write => write_ready(),
}
}
fn mask(&self) -> usize {
ready2usize(self.ready())
}
}
// ===== misc =====
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
fn read_ready() -> mio::Ready {
mio::Ready::readable() | platform::hup()
}
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
fn write_ready() -> mio::Ready {
mio::Ready::writable()
}
// === legacy
fn ready2usize(ready: mio::Ready) -> usize {
let mut bits = 0;


@ -6,11 +6,13 @@
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
#![allow(deprecated)]
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::Ordering;
use futures::{Async, Poll};
use futures::{task, Async, Poll};
use mio::event::Evented;
use mio::Ready;
use tokio_io::{AsyncRead, AsyncWrite};
@ -68,6 +70,8 @@ struct Registration {
/// Essentially a good rule of thumb is that if you're using the `poll_ready`
/// method you want to also use `need_read` to signal blocking and you should
/// otherwise probably avoid using two tasks on the same `PollEvented`.
#[deprecated(since = "0.1.2", note = "PollEvented2 instead")]
#[doc(hidden)]
pub struct PollEvented<E> {
registration: Registration,
io: E,
@ -233,12 +237,7 @@ impl<E> PollEvented<E> {
let bits = super::ready2usize(super::read_ready());
self.registration.readiness &= !bits;
let inner = match self.registration.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.registration.token, Direction::Read);
Ok(())
self.register(Direction::Read)
}
/// Indicates to this source of events that the corresponding I/O object is
@ -273,12 +272,7 @@ impl<E> PollEvented<E> {
let bits = super::ready2usize(Ready::writable());
self.registration.readiness &= !bits;
let inner = match self.registration.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.schedule(self.registration.token, Direction::Write);
Ok(())
self.register(Direction::Write)
}
/// Returns a reference to the event loop handle that this readiness stream
@ -326,6 +320,16 @@ impl<E> PollEvented<E> {
inner.deregister_source(&self.io)
}
fn register(&self, dir: Direction) -> io::Result<()> {
let inner = match self.registration.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.register(self.registration.token, dir, task::current());
Ok(())
}
}
impl<E: Read> Read for PollEvented<E> {


@ -0,0 +1,419 @@
//!
//! Readiness tracking streams, backing I/O objects.
//!
//! This module contains the core type which is used to back all I/O on object
//! in `tokio-core`. The `PollEvented` type is the implementation detail of
//! all I/O. Each `PollEvented` manages registration with a reactor,
//! acquisition of a token, and tracking of the readiness state on the
//! underlying I/O primitive.
#![allow(warnings)]
use reactor::Handle;
use reactor::registration::Registration;
use futures::{task, Async, Poll};
use mio;
use mio::event::Evented;
use tokio_io::{AsyncRead, AsyncWrite};
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
///
/// Created by the `PollEvented::new` method, each `PollEvented` is
/// associated with a specific event loop and source of events that will be
/// registered with an event loop.
///
/// An instance of `PollEvented` is essentially the bridge between the `mio`
/// world and the `tokio-core` world, providing abstractions to receive
/// notifications about changes to an object's `mio::Ready` state.
///
/// Each readiness stream has a number of methods to test whether the underlying
/// object is readable or writable. Once the methods return that an object is
/// readable/writable, then it will continue to do so until the `need_read` or
/// `need_write` methods are called.
///
/// That is, this object is typically wrapped in another form of I/O object.
/// It's the responsibility of the wrapper to inform the readiness stream when a
/// "would block" I/O event is seen. The readiness stream will then take care of
/// any scheduling necessary to get notified when the event is ready again.
///
/// You can find more information about creating a custom I/O object [online].
///
/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io
///
/// ## Readiness to read/write
///
/// A `PollEvented` allows listening and waiting for an arbitrary `mio::Ready`
/// instance, including the platform-specific contents of `mio::Ready`. At most
/// two future tasks, however, can be waiting on a `PollEvented`. The
/// `need_read` and `need_write` methods can block two separate tasks, one on
/// reading and one on writing. Not all I/O events correspond to read/write,
/// however!
///
/// To account for this a `PollEvented` gets a little interesting when working
/// with an arbitrary instance of `mio::Ready` that may not map precisely to
/// "write" and "read" tasks. Currently it is defined that instances of
/// `mio::Ready` that do *not* return true from `is_writable` are all notified
/// through `need_read`, or the read task.
///
/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block
/// the read task of this `PollEvented` if the `hup` event isn't available.
/// Essentially a good rule of thumb is that if you're using the `poll_ready`
/// method you want to also use `need_read` to signal blocking and you should
/// otherwise probably avoid using two tasks on the same `PollEvented`.
pub struct PollEvented<E> {
io: E,
inner: Inner,
}
struct Inner {
registration: Registration,
/// Currently visible read readiness
read_readiness: AtomicUsize,
/// Currently visible write readiness
write_readiness: AtomicUsize,
}
// ===== impl PollEvented =====
impl<E> PollEvented<E>
where E: Evented
{
/// Creates a new `PollEvented` associated with the default reactor.
pub fn new(io: E) -> PollEvented<E> {
PollEvented {
io: io,
inner: Inner {
registration: Registration::new(),
read_readiness: AtomicUsize::new(0),
write_readiness: AtomicUsize::new(0),
}
}
}
/// Creates a new `PollEvented` associated with the specified reactor.
pub fn new_with_handle(io: E, handle: &Handle) -> io::Result<Self> {
let ret = PollEvented::new(io);
ret.inner.registration.register_with(&ret.io, handle)?;
Ok(ret)
}
/// Tests to see if this source is ready to be read from or not.
///
/// If this stream is not ready for a read then `Async::NotReady` will be
/// returned and the current task will be scheduled to receive a
/// notification when the stream is readable again. In other words, this
/// method is only safe to call from within the context of a future's task,
/// typically done in a `Future::poll` method.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
self.register()?;
// Load the cached readiness
match self.inner.read_readiness.load(Relaxed) {
0 => {}
mut n => {
// Check what's new with the reactor.
if let Some(ready) = self.inner.registration.take_read_ready()? {
n |= super::ready2usize(ready);
self.inner.read_readiness.store(n, Relaxed);
}
return Ok(super::usize2ready(n).into());
}
}
let ready = try_ready!(self.inner.registration.poll_read_ready());
// Cache the value
self.inner.read_readiness.store(super::ready2usize(ready), Relaxed);
Ok(ready.into())
}
/// Indicates to this source of events that the corresponding I/O object is
/// no longer readable, but it needs to be.
///
/// This function, like `poll_read_ready`, is only safe to call from the context
/// of a future's task (typically in a `Future::poll` implementation). It
/// informs this readiness stream that the underlying object is no longer
/// readable, typically because a "would block" error was seen.
///
/// *All* readiness bits associated with this stream except the writable bit
/// will be reset when this method is called. The current task is then
/// scheduled to receive a notification whenever anything changes other than
/// the writable bit. Note that this typically just means the readable bit
/// is used here, but if you're using a custom I/O object for events like
/// hup/error this may also be relevant.
///
/// Note that it is also only valid to call this method if `poll_read_ready`
/// previously indicated that the object is readable. That is, this function
/// must always be paired with calls to `poll_read_ready` previously.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn need_read(&self) -> io::Result<()> {
self.inner.read_readiness.store(0, Relaxed);
if self.poll_read_ready()?.is_ready() {
// Notify the current task
task::current().notify();
}
Ok(())
}
/// Tests to see if this source is ready to be written to or not.
///
/// If this stream is not ready for a write then `Async::NotReady` will be
/// returned and the current task will be scheduled to receive a
/// notification when the stream is writable again. In other words, this
/// method is only safe to call from within the context of a future's task,
/// typically done in a `Future::poll` method.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.register()?;
match self.inner.write_readiness.load(Relaxed) {
0 => {}
mut n => {
// Check what's new with the reactor.
if let Some(ready) = self.inner.registration.take_write_ready()? {
n |= super::ready2usize(ready);
self.inner.write_readiness.store(n, Relaxed);
}
return Ok(super::usize2ready(n).into());
}
}
let ready = try_ready!(self.inner.registration.poll_write_ready());
// Cache the value
self.inner.write_readiness.store(super::ready2usize(ready), Relaxed);
Ok(ready.into())
}
/// Indicates to this source of events that the corresponding I/O object is
/// no longer writable, but it needs to be.
///
/// This function, like `poll_write_ready`, is only safe to call from the
/// context of a future's task (typically in a `Future::poll`
/// implementation). It informs this readiness stream that the underlying
/// object is no longer writable, typically because a "would block" error
/// was seen.
///
/// The flag indicating that this stream is writable is unset and the
/// current task is scheduled to receive a notification when the stream is
/// then again writable.
///
/// Note that it is also only valid to call this method if
/// `poll_write_ready` previously indicated that the object is writable.
/// That is, this function must always be paired with calls to `poll_write_ready`
/// previously.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's
/// task.
pub fn need_write(&self) -> io::Result<()> {
self.inner.write_readiness.store(0, Relaxed);
if self.poll_write_ready()?.is_ready() {
// Notify the current task
task::current().notify();
}
Ok(())
}
/// Ensure that the I/O resource is registered with the reactor.
fn register(&self) -> io::Result<()> {
self.inner.registration.register(&self.io)?;
Ok(())
}
}
impl<E> PollEvented<E> {
/// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_ref(&self) -> &E {
&self.io
}
/// Returns a mutable reference to the underlying I/O object this readiness
/// stream is wrapping.
pub fn get_mut(&mut self) -> &mut E {
&mut self.io
}
/// Consumes self, returning the inner I/O object
pub fn into_inner(self) -> E {
self.io
}
}
// ===== Read / Write impls =====
impl<E> Read for PollEvented<E>
where E: Evented + Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().read(buf);
if is_wouldblock(&r) {
self.need_read()?;
}
return r
}
}
impl<E> Write for PollEvented<E>
where E: Evented + Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().write(buf);
if is_wouldblock(&r) {
self.need_write()?;
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_mut().flush();
if is_wouldblock(&r) {
self.need_write()?;
}
return r
}
}
impl<E> AsyncRead for PollEvented<E>
where E: Evented + Read,
{
}
impl<E> AsyncWrite for PollEvented<E>
where E: Evented + Write,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
// ===== &'a Read / &'a Write impls =====
impl<'a, E> Read for &'a PollEvented<E>
where E: Evented, &'a E: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_read_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().read(buf);
if is_wouldblock(&r) {
self.need_read()?;
}
return r
}
}
impl<'a, E> Write for &'a PollEvented<E>
where E: Evented, &'a E: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().write(buf);
if is_wouldblock(&r) {
self.need_write()?;
}
return r
}
fn flush(&mut self) -> io::Result<()> {
if let Async::NotReady = self.poll_write_ready()? {
return Err(io::ErrorKind::WouldBlock.into())
}
let r = self.get_ref().flush();
if is_wouldblock(&r) {
self.need_write()?;
}
return r
}
}
impl<'a, E> AsyncRead for &'a PollEvented<E>
where E: Evented, &'a E: Read,
{
}
impl<'a, E> AsyncWrite for &'a PollEvented<E>
where E: Evented, &'a E: Write,
{
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
match *r {
Ok(_) => false,
Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
}
}
impl<E: fmt::Debug> fmt::Debug for PollEvented<E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("PollEvented")
.field("io", &self.io)
.finish()
}
}
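
Tying the doc comment above together, here is a hedged sketch of the
intended `poll_read_ready` / `need_read` usage from inside a
`Future::poll`. The `ReadOnce` wrapper and its buffer size are
illustrative, and it assumes the transitional re-export is reachable as
`tokio::reactor::PollEvented2`.

extern crate futures;
extern crate mio;
extern crate tokio;

use std::io::{self, Read};
use futures::{Async, Future, Poll};
use mio::event::Evented;
use tokio::reactor::PollEvented2;

// A future that performs a single non-blocking read through the readiness API.
struct ReadOnce<E> {
    io: PollEvented2<E>,
    buf: [u8; 1024],
}

impl<E: Evented + Read> Future for ReadOnce<E> {
    type Item = usize;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<usize, io::Error> {
        // The first poll lazily associates `io` with the current (default)
        // reactor; afterwards this mostly checks cached readiness.
        if let Async::NotReady = self.io.poll_read_ready()? {
            return Ok(Async::NotReady);
        }

        match self.io.get_mut().read(&mut self.buf) {
            Ok(n) => Ok(Async::Ready(n)),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // We hit "would block": tell the readiness stream so the
                // task is woken once the resource is readable again.
                self.io.need_read()?;
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        }
    }
}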

src/reactor/registration.rs (new file, 394 lines)

@ -0,0 +1,394 @@
use reactor::{Handle, Direction};
use futures::{Async, Poll};
use futures::task::{self, Task};
use mio::{self, Evented};
use std::{io, mem, usize};
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
/// Handle to a reactor registration.
///
/// A registration represents an I/O resource registered with a Reactor such
/// that it will receive task notifications on readiness.
///
/// The registration is lazily made and supports concurrent operations. This
/// allows a `Registration` instance to be created without the reactor handle
/// that will eventually be used to drive the resource.
///
/// The difficulty is due to the fact that a single registration drives two
/// separate tasks -- A read half and a write half.
#[derive(Debug)]
pub struct Registration {
/// Stores the handle. Once set, the value is not changed.
///
/// Setting this requires acquiring the lock from state.
inner: UnsafeCell<Option<Inner>>,
/// Tracks the state of the registration.
///
/// The least significant 2 bits are used to track the lifecycle of the
/// registration. The rest of the `state` variable is a pointer to tasks
/// that must be notified once the lock is released.
state: AtomicUsize,
}
#[derive(Debug)]
struct Inner {
handle: Handle,
token: usize,
}
/// Tasks waiting on readiness notifications.
#[derive(Debug)]
struct Node {
direction: Direction,
task: Task,
next: Option<Box<Node>>,
}
/// Initial state. The handle is not set and the registration is idle.
const INIT: usize = 0;
/// A thread locked the state and will associate a handle.
const LOCKED: usize = 1;
/// A handle has been associated with the registration.
const READY: usize = 2;
/// Masks the lifecycle state
const LIFECYCLE_MASK: usize = 0b11;
/// A fake token used to identify error situations
const ERROR: usize = usize::MAX;
// ===== impl Registration =====
impl Registration {
/// Create a new `Registration`.
///
/// This registration is not associated with a Reactor instance. Call
/// `register` to establish the association.
pub fn new() -> Registration {
Registration {
inner: UnsafeCell::new(None),
state: AtomicUsize::new(INIT),
}
}
/// Register the I/O resource with the default reactor.
///
/// This function is safe to call concurrently and repeatedly. However, only
/// the first call will establish the registration. Subsequent calls will be
/// no-ops.
///
/// If the registration happened successfully, `Ok(true)` is returned.
///
/// If an I/O resource has previously been successfully registered,
/// `Ok(false)` is returned.
///
/// If an error is encountered during registration, `Err` is returned.
pub fn register<T>(&self, io: &T) -> io::Result<bool>
where T: Evented,
{
self.register2(io, || Handle::try_current())
}
/// Register the I/O resource with the specified reactor.
///
/// This function is safe to call concurrently and repeatedly. However, only
/// the first call will establish the registration. Subsequent calls will be
/// no-ops.
///
/// If the registration happened successfully, `Ok(true)` is returned.
///
/// If an I/O resource has previously been successfully registered,
/// `Ok(false)` is returned.
///
/// If an error is encountered during registration, `Err` is returned.
pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
where T: Evented,
{
self.register2(io, || Ok(handle.clone()))
}
fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
where T: Evented,
F: Fn() -> io::Result<Handle>,
{
let mut state = self.state.load(SeqCst);
loop {
match state {
INIT => {
// Registration is currently not associated with a handle.
// Get a handle then attempt to lock the state.
let handle = f()?;
let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);
if actual != state {
state = actual;
continue;
}
// Create the actual registration
let (inner, res) = Inner::new(io, handle);
unsafe { *self.inner.get() = Some(inner); }
// Transition out of the locked state. This acquires the
// current value, potentially having a list of tasks that
// are pending readiness notifications.
let actual = self.state.swap(READY, SeqCst);
// Consume the stack of nodes.
let ptr = actual & !LIFECYCLE_MASK;
if ptr != 0 {
let mut read = false;
let mut write = false;
let mut curr = unsafe { Box::from_raw(ptr as *mut Node) };
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
loop {
let node = *curr;
let Node {
direction,
task,
next,
} = node;
let flag = match direction {
Direction::Read => &mut read,
Direction::Write => &mut write,
};
if !*flag {
*flag = true;
inner.register(direction, task);
}
match next {
Some(next) => curr = next,
None => break,
}
}
}
return res.map(|_| true);
}
_ => return Ok(false),
}
}
}
/// Poll for changes in the I/O resource's read readiness.
pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Read, true)
.map(|v| match v {
Some(v) => Async::Ready(v),
_ => Async::NotReady,
})
}
/// Try taking the I/O resource's read readiness.
///
/// Unlike `poll_read_ready`, this does not register the current task for
/// notification.
pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Read, false)
}
/// Poll for changes in the I/O resource's write readiness.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Write, true)
.map(|v| match v {
Some(v) => Async::Ready(v),
_ => Async::NotReady,
})
}
/// Try taking the I/O resource's write readiness.
///
/// Unlike `poll_write_ready`, this does not register the current task for
/// notification.
pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Write, false)
}
fn poll_ready(&self, direction: Direction, notify: bool)
-> io::Result<Option<mio::Ready>>
{
let mut state = self.state.load(SeqCst);
// Cache the node pointer
let mut node = None;
loop {
match state {
INIT => {
return Err(io::Error::new(io::ErrorKind::Other, "must call `register` before poll_read_ready"));
}
READY => {
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
return inner.poll_ready(direction, notify);
}
_ => {
if !notify {
// Skip the notification tracking junk.
return Ok(None);
}
let ptr = state & !LIFECYCLE_MASK;
// Get the node
let mut n = node.take().unwrap_or_else(|| {
Box::new(Node {
direction,
task: task::current(),
next: None,
})
});
n.next = if ptr == 0 {
None
} else {
// Great care must be taken if the CAS fails
Some(unsafe { Box::from_raw(ptr as *mut Node) })
};
let ptr = Box::into_raw(n);
let next = ptr as usize | (state & LIFECYCLE_MASK);
let actual = self.state.compare_and_swap(state, next, SeqCst);
if actual != state {
// Back out of the node boxing
let mut n = unsafe { Box::from_raw(ptr) };
// We don't really own this
mem::forget(n.next.take());
// Save this for next loop
node = Some(n);
state = actual;
continue;
}
return Ok(None);
}
}
}
}
}
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
// ===== impl Inner =====
impl Inner {
fn new<T>(io: &T, handle: Handle) -> (Self, io::Result<()>)
where T: Evented,
{
let mut res = Ok(());
let token = match handle.inner() {
Some(inner) => match inner.add_source(io) {
Ok(token) => token,
Err(e) => {
res = Err(e);
ERROR
}
},
None => {
res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
ERROR
}
};
let inner = Inner {
handle,
token,
};
(inner, res)
}
fn register(&self, direction: Direction, task: Task) {
if self.token == ERROR {
task.notify();
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => {
task.notify();
return;
}
};
inner.register(self.token, direction, task);
}
fn poll_ready(&self, direction: Direction, notify: bool)
-> io::Result<Option<mio::Ready>>
{
if self.token == ERROR {
return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
let mask = direction.mask();
let io_dispatch = inner.io_dispatch.read().unwrap();
let sched = &io_dispatch[self.token];
let mut ready = mask & sched.readiness.fetch_and(!mask, SeqCst);
if ready == 0 && notify {
// Update the task info
match direction {
Direction::Read => sched.reader.register(),
Direction::Write => sched.writer.register(),
}
// Try again
ready = mask & sched.readiness.fetch_and(!mask, SeqCst);
}
if ready == 0 {
Ok(None)
} else {
Ok(Some(super::usize2ready(ready)))
}
}
}
impl Drop for Inner {
fn drop(&mut self) {
if self.token == ERROR {
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
inner.drop_source(self.token);
}
}
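
For reference, a hedged sketch of driving a raw `mio` type through this
`Registration` API from a future; the `Readable` future and the choice
of `mio::net::UdpSocket` are illustrative, and it assumes the
`Registration` re-export is reachable as `tokio::reactor::Registration`.

extern crate futures;
extern crate mio;
extern crate tokio;

use std::io;
use futures::{Async, Future, Poll};
use tokio::reactor::Registration;

// A future that resolves once the wrapped mio socket reports read readiness.
struct Readable {
    socket: mio::net::UdpSocket,
    registration: Registration,
}

impl Future for Readable {
    type Item = mio::Ready;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<mio::Ready, io::Error> {
        // Lazy: the first call associates the socket with the current
        // (default) reactor; subsequent calls are no-ops.
        self.registration.register(&self.socket)?;

        // Ask the reactor for read readiness; if the socket is not readable
        // yet, the current task is registered for wakeup.
        match self.registration.poll_read_ready()? {
            Async::Ready(ready) => Ok(Async::Ready(ready)),
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}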


@ -1,11 +1,14 @@
extern crate futures;
extern crate tokio;
extern crate tokio_io;
extern crate env_logger;
use std::thread;
use std::{io, thread};
use std::sync::Arc;
use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
use tokio::runtime::Runtime;
macro_rules! t {
($e:expr) => (match $e {
@ -36,3 +39,82 @@ fn hammer() {
thread.join().unwrap();
}
}
struct Rd(Arc<TcpStream>);
struct Wr(Arc<TcpStream>);
impl io::Read for Rd {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
<&TcpStream>::read(&mut &*self.0, dst)
}
}
impl tokio_io::AsyncRead for Rd {
}
impl io::Write for Wr {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
<&TcpStream>::write(&mut &*self.0, src)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl tokio_io::AsyncWrite for Wr {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
#[test]
fn hammer_split() {
use tokio_io::io;
const N: usize = 100;
let _ = env_logger::init();
let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
let addr = t!(srv.local_addr());
let mut rt = Runtime::new().unwrap();
fn split(socket: TcpStream) -> Box<Future<Item = (), Error = ()> + Send> {
let socket = Arc::new(socket);
let rd = Rd(socket.clone());
let wr = Wr(socket);
let rd = io::read(rd, vec![0; 1])
.map(|_| ())
.map_err(|e| panic!("read error = {:?}", e));
let wr = io::write_all(wr, b"1")
.map(|_| ())
.map_err(|e| panic!("write error = {:?}", e));
Box::new({
tokio::spawn(rd)
.join(tokio::spawn(wr))
.map(|_| ())
})
}
rt.spawn({
srv.incoming()
.map_err(|e| panic!("accept error = {:?}", e))
.take(N as u64)
.for_each(|socket| split(socket))
});
for _ in 0..N {
rt.spawn({
TcpStream::connect(&addr)
.map_err(|e| panic!("connect error = {:?}", e))
.and_then(|socket| split(socket))
});
}
rt.shutdown_on_idle().wait().unwrap();
}