diff --git a/src/atomic_task.rs b/src/atomic_task.rs new file mode 100644 index 000000000..1510eee26 --- /dev/null +++ b/src/atomic_task.rs @@ -0,0 +1,191 @@ +use futures::task::{self, Task}; + +use std::fmt; +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, Release}; + +/// A synchronization primitive for task notification. +/// +/// `AtomicTask` will coordinate concurrent notifications with the consumer +/// potentially "updating" the underlying task to notify. This is useful in +/// scenarios where a computation completes in another thread and wants to +/// notify the consumer, but the consumer is in the process of being migrated to +/// a new logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `notify` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `notify` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicTask` may be reused for any number of calls to `register` or +/// `notify`. +/// +/// `AtomicTask` does not provide any memory ordering guarantees, as such the +/// user should use caution and use other synchronization primitives to guard +/// the result of the underlying computation. +pub struct AtomicTask { + state: AtomicUsize, + task: UnsafeCell>, +} + +/// Initial state, the `AtomicTask` is currently not being used. +/// +/// The value `2` is picked specifically because it between the write lock & +/// read lock values. Since the read lock is represented by an incrementing +/// counter, this enables an atomic fetch_sub operation to be used for releasing +/// a lock. +const WAITING: usize = 2; + +/// The `register` function has determined that the task is no longer current. +/// This implies that `AtomicTask::register` is being called from a different +/// task than is represented by the currently stored task. The write lock is +/// obtained to update the task cell. +const LOCKED_WRITE: usize = 0; + +/// At least one call to `notify` happened concurrently to `register` updating +/// the task cell. This state is detected when `register` exits the mutation +/// code and signals to `register` that it is responsible for notifying its own +/// task. +const LOCKED_WRITE_NOTIFIED: usize = 1; + + +/// The `notify` function has locked access to the task cell for notification. +/// +/// The constant is left here mostly for documentation reasons. +#[allow(dead_code)] +const LOCKED_READ: usize = 3; + +impl AtomicTask { + /// Create an `AtomicTask` initialized with the given `Task` + pub fn new() -> AtomicTask { + // Make sure that task is Sync + trait AssertSync: Sync {} + impl AssertSync for Task {} + + AtomicTask { + state: AtomicUsize::new(WAITING), + task: UnsafeCell::new(None), + } + } + + /// Registers the **current** task to be notified on calls to `notify`. + pub fn register(&self) { + self.register_task(task::current()); + } + + /// Registers the task to be notified on calls to `notify`. + /// + /// The new task will take place of any previous tasks that were registered + /// by previous calls to `register`. Any calls to `notify` that happen after + /// a call to `register` (as defined by the memory ordering rules), will + /// notify the `register` caller's task. + /// + /// It is safe to call `register` with multiple other threads concurrently + /// calling `notify`. This will result in the `register` caller's current + /// task being notified once. + /// + /// This function is safe to call concurrently, but this is generally a bad + /// idea. Concurrent calls to `register` will attempt to register different + /// tasks to be notified. One of the callers will win and have its task set, + /// but there is no guarantee as to which caller will succeed. + pub fn register_task(&self, task: Task) { + match self.state.compare_and_swap(WAITING, LOCKED_WRITE, Acquire) { + WAITING => { + unsafe { + // Locked acquired, update the task cell + *self.task.get() = Some(task); + + // Release the lock. If the state transitioned to + // `LOCKED_NOTIFIED`, this means that an notify has been + // signaled, so notify the task. + if LOCKED_WRITE_NOTIFIED == self.state.swap(WAITING, Release) { + (*self.task.get()).as_ref().unwrap().notify(); + } + } + } + LOCKED_WRITE | LOCKED_WRITE_NOTIFIED => { + // A thread is concurrently calling `register`. This shouldn't + // happen as it doesn't really make much sense, but it isn't + // unsafe per se. Since two threads are concurrently trying to + // update the task, it's undefined which one "wins" (no ordering + // guarantees), so we can just do nothing. + } + state => { + debug_assert!(state != LOCKED_WRITE, "unexpected state LOCKED_WRITE"); + debug_assert!(state != LOCKED_WRITE_NOTIFIED, "unexpected state LOCKED_WRITE_NOTIFIED"); + + // Currently in a read locked state, this implies that `notify` + // is currently being called on the old task handle. So, we call + // notify on the new task handle + task.notify(); + } + } + } + + /// Notifies the task that last called `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn notify(&self) { + let mut curr = WAITING; + + loop { + if curr == LOCKED_WRITE { + // Transition the state to LOCKED_NOTIFIED + let actual = self.state.compare_and_swap(LOCKED_WRITE, LOCKED_WRITE_NOTIFIED, Release); + + if curr == actual { + // Success, return + return; + } + + // update current state variable and try again + curr = actual; + + } else if curr == LOCKED_WRITE_NOTIFIED { + // Currently in `LOCKED_WRITE_NOTIFIED` state, nothing else to do. + return; + + } else { + // Currently in a LOCKED_READ state, so attempt to increment the + // lock count. + let actual = self.state.compare_and_swap(curr, curr + 1, Acquire); + + // Locked acquired + if actual == curr { + // Notify the task + unsafe { + if let Some(ref task) = *self.task.get() { + task.notify(); + } + } + + // Release the lock + self.state.fetch_sub(1, Release); + + // Done + return; + } + + // update current state variable and try again + curr = actual; + + } + } + } +} + +impl Default for AtomicTask { + fn default() -> Self { + AtomicTask::new() + } +} + +impl fmt::Debug for AtomicTask { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "AtomicTask") + } +} + +unsafe impl Send for AtomicTask {} +unsafe impl Sync for AtomicTask {} diff --git a/src/lib.rs b/src/lib.rs index 11dfc007c..f36ce326d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,3 +107,5 @@ pub mod runtime; pub use executor::spawn; pub use runtime::run; + +mod atomic_task; diff --git a/src/net/tcp.rs b/src/net/tcp.rs index bebba9674..35cd4f003 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -11,14 +11,14 @@ use iovec::IoVec; use mio; use tokio_io::{AsyncRead, AsyncWrite}; -use reactor::{Handle, PollEvented}; +use reactor::{Handle, PollEvented2}; /// An I/O object representing a TCP socket listening for incoming connections. /// /// This object can be converted into a stream of incoming connections for /// various forms of processing. pub struct TcpListener { - io: PollEvented, + io: PollEvented2, } /// Stream returned by the `TcpListener::incoming` function representing the @@ -35,8 +35,8 @@ impl TcpListener { /// The TCP listener will bind to the provided `addr` address, if available. /// If the result is `Ok`, the socket has successfully bound. pub fn bind(addr: &SocketAddr) -> io::Result { - let l = try!(mio::net::TcpListener::bind(addr)); - TcpListener::new(l, &Handle::default()) + let l = mio::net::TcpListener::bind(addr)?; + Ok(TcpListener::new(l)) } /// Attempt to accept a connection and create a new connected `TcpStream` if @@ -58,9 +58,13 @@ impl TcpListener { /// future's task. It's recommended to only call this from the /// implementation of a `Future::poll`, if necessary. pub fn accept(&mut self) -> io::Result<(TcpStream, SocketAddr)> { - let (stream, addr) = self.accept_std()?; - let stream = TcpStream::from_std(stream, self.io.handle())?; - Ok((stream, addr)) + let (io, addr) = self.accept_std()?; + + let io = mio::net::TcpStream::from_stream(io)?; + let io = PollEvented2::new(io); + let io = TcpStream { io }; + + Ok((io, addr)) } /// Attempt to accept a connection and create a new connected `TcpStream` if @@ -76,7 +80,7 @@ impl TcpListener { /// This function will panic for the same reasons as `accept`, notably if /// called outside the context of a future. pub fn accept_std(&mut self) -> io::Result<(net::TcpStream, SocketAddr)> { - if let Async::NotReady = self.io.poll_read() { + if let Async::NotReady = self.io.poll_read_ready()? { return Err(io::ErrorKind::WouldBlock.into()) } @@ -118,16 +122,17 @@ impl TcpListener { /// will only be for the same IP version as `addr` specified. That is, if /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as /// well (same for IPv6). - pub fn from_std(listener: net::TcpListener, - handle: &Handle) -> io::Result { - let l = mio::net::TcpListener::from_std(listener)?; - TcpListener::new(l, handle) + pub fn from_std(listener: net::TcpListener, handle: &Handle) + -> io::Result + { + let io = mio::net::TcpListener::from_std(listener)?; + let io = PollEvented2::new_with_handle(io, handle)?; + Ok(TcpListener { io }) } - fn new(listener: mio::net::TcpListener, handle: &Handle) - -> io::Result { - let io = try!(PollEvented::new(listener, handle)); - Ok(TcpListener { io: io }) + fn new(listener: mio::net::TcpListener) -> TcpListener { + let io = PollEvented2::new(listener); + TcpListener { io } } /// Returns the local address that this listener is bound to. @@ -190,7 +195,7 @@ impl Stream for Incoming { /// [accepting]: struct.TcpListener.html#method.accept /// [listener]: struct.TcpListener.html pub struct TcpStream { - io: PollEvented, + io: PollEvented2, } /// Future returned by `TcpStream::connect` which will resolve to a `TcpStream` @@ -217,19 +222,19 @@ impl TcpStream { /// stream has successfully connected, or it wil return an error if one /// occurs. pub fn connect(addr: &SocketAddr) -> ConnectFuture { + use self::ConnectFutureState::*; + let inner = match mio::net::TcpStream::connect(addr) { - Ok(tcp) => TcpStream::new(tcp, &Handle::default()), - Err(e) => ConnectFutureState::Error(e), + Ok(tcp) => Waiting(TcpStream::new(tcp)), + Err(e) => Error(e), }; - ConnectFuture { inner: inner } + + ConnectFuture { inner } } - fn new(connected_stream: mio::net::TcpStream, handle: &Handle) - -> ConnectFutureState { - match PollEvented::new(connected_stream, handle) { - Ok(io) => ConnectFutureState::Waiting(TcpStream { io: io }), - Err(e) => ConnectFutureState::Error(e), - } + fn new(connected: mio::net::TcpStream) -> TcpStream { + let io = PollEvented2::new(connected); + TcpStream { io } } /// Create a new `TcpStream` from a `net::TcpStream`. @@ -241,10 +246,10 @@ impl TcpStream { pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result { - let inner = mio::net::TcpStream::from_stream(stream)?; - Ok(TcpStream { - io: try!(PollEvented::new(inner, handle)), - }) + let io = mio::net::TcpStream::from_stream(stream)?; + let io = PollEvented2::new_with_handle(io, handle)?; + + Ok(TcpStream { io }) } /// Creates a new `TcpStream` from the pending socket inside the given @@ -270,10 +275,16 @@ impl TcpStream { handle: &Handle) -> ConnectFuture { - let inner = match mio::net::TcpStream::connect_stream(stream, addr) { - Ok(tcp) => TcpStream::new(tcp, handle), - Err(e) => ConnectFutureState::Error(e), + use self::ConnectFutureState::*; + + let io = mio::net::TcpStream::connect_stream(stream, addr) + .and_then(|io| PollEvented2::new_with_handle(io, handle)); + + let inner = match io { + Ok(io) => Waiting(TcpStream { io }), + Err(e) => Error(e), }; + ConnectFuture { inner: inner } } @@ -294,7 +305,7 @@ impl TcpStream { /// Successive calls return the same data. This is accomplished by passing /// `MSG_PEEK` as a flag to the underlying recv system call. pub fn peek(&mut self, buf: &mut [u8]) -> io::Result { - if let Async::NotReady = self.io.poll_read() { + if let Async::NotReady = self.io.poll_read_ready()? { return Err(io::ErrorKind::WouldBlock.into()) } @@ -440,6 +451,8 @@ impl TcpStream { } } +// ===== impl Read / Write ===== + impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.io.read(buf) @@ -461,7 +474,45 @@ impl AsyncRead for TcpStream { } fn read_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = self.io.poll_read() { + <&TcpStream>::read_buf(&mut &*self, buf) + } +} + +impl AsyncWrite for TcpStream { + fn shutdown(&mut self) -> Poll<(), io::Error> { + <&TcpStream>::shutdown(&mut &*self) + } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + <&TcpStream>::write_buf(&mut &*self, buf) + } +} + +// ===== impl Read / Write for &'a ===== + +impl<'a> Read for &'a TcpStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + (&self.io).read(buf) + } +} + +impl<'a> Write for &'a TcpStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + (&self.io).write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + (&self.io).flush() + } +} + +impl<'a> AsyncRead for &'a TcpStream { + unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { + false + } + + fn read_buf(&mut self, buf: &mut B) -> Poll { + if let Async::NotReady = self.io.poll_read_ready()? { return Ok(Async::NotReady) } @@ -509,13 +560,13 @@ impl AsyncRead for TcpStream { } } -impl AsyncWrite for TcpStream { +impl<'a> AsyncWrite for &'a TcpStream { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(().into()) } fn write_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = self.io.poll_write() { + if let Async::NotReady = self.io.poll_write_ready()? { return Ok(Async::NotReady) } @@ -582,9 +633,10 @@ impl Future for ConnectFutureState { // actually hit an error or not. // // If all that succeeded then we ship everything on up. - if let Async::NotReady = stream.io.poll_write() { + if let Async::NotReady = stream.io.poll_write_ready()? { return Ok(Async::NotReady) } + if let Some(e) = try!(stream.io.get_ref().take_error()) { return Err(e) } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 4d1deb4b9..bfac94041 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -5,11 +5,11 @@ use std::fmt; use futures::{Async, Future, Poll}; use mio; -use reactor::{Handle, PollEvented}; +use reactor::{Handle, PollEvented2}; /// An I/O object representing a UDP socket. pub struct UdpSocket { - io: PollEvented, + io: PollEvented2, } mod frame; @@ -19,13 +19,13 @@ impl UdpSocket { /// This function will create a new UDP socket and attempt to bind it to /// the `addr` provided. pub fn bind(addr: &SocketAddr) -> io::Result { - let udp = try!(mio::net::UdpSocket::bind(addr)); - UdpSocket::new(udp, &Handle::default()) + mio::net::UdpSocket::bind(addr) + .map(UdpSocket::new) } - fn new(socket: mio::net::UdpSocket, handle: &Handle) -> io::Result { - let io = try!(PollEvented::new(socket, handle)); - Ok(UdpSocket { io: io }) + fn new(socket: mio::net::UdpSocket) -> UdpSocket { + let io = PollEvented2::new(socket); + UdpSocket { io: io } } /// Creates a new `UdpSocket` from the previously bound socket provided. @@ -39,8 +39,9 @@ impl UdpSocket { /// `reuse_address` or binding to multiple addresses. pub fn from_std(socket: net::UdpSocket, handle: &Handle) -> io::Result { - let udp = try!(mio::net::UdpSocket::from_socket(socket)); - UdpSocket::new(udp, handle) + let io = mio::net::UdpSocket::from_socket(socket)?; + let io = PollEvented2::new_with_handle(io, handle)?; + Ok(UdpSocket { io }) } /// Returns the local address that this socket is bound to. @@ -63,7 +64,7 @@ impl UdpSocket { /// This function will panic if called outside the context of a future's /// task. pub fn send(&mut self, buf: &[u8]) -> io::Result { - if let Async::NotReady = self.io.poll_write() { + if let Async::NotReady = self.io.poll_write_ready()? { return Err(io::ErrorKind::WouldBlock.into()) } @@ -86,7 +87,7 @@ impl UdpSocket { /// This function will panic if called outside the context of a future's /// task. pub fn recv(&mut self, buf: &mut [u8]) -> io::Result { - if let Async::NotReady = self.io.poll_read() { + if let Async::NotReady = self.io.poll_read_ready()? { return Err(io::ErrorKind::WouldBlock.into()) } @@ -112,7 +113,7 @@ impl UdpSocket { /// This function will panic if called outside the context of a future's /// task. pub fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result { - if let Async::NotReady = self.io.poll_write() { + if let Async::NotReady = self.io.poll_write_ready()? { return Err(io::ErrorKind::WouldBlock.into()) } @@ -155,7 +156,7 @@ impl UdpSocket { /// This function will panic if called outside the context of a future's /// task. pub fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - if let Async::NotReady = self.io.poll_read() { + if let Async::NotReady = self.io.poll_read_ready()? { return Err(io::ErrorKind::WouldBlock.into()) } diff --git a/src/reactor/background.rs b/src/reactor/background.rs index 6d4e83115..f0836ceb8 100644 --- a/src/reactor/background.rs +++ b/src/reactor/background.rs @@ -4,9 +4,10 @@ use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; +use atomic_task::AtomicTask; + use reactor::{Reactor, Handle}; use futures::{Future, Async, Poll}; -use futures::task::AtomicTask; /// Handle to the reactor running on a background thread. #[derive(Debug)] @@ -117,6 +118,8 @@ impl Drop for Background { None => return, }; + inner.shutdown_now(); + let shutdown = Shutdown { inner }; let _ = shutdown.wait(); } diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 9ae52b273..4e62eb1d2 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -19,6 +19,8 @@ use tokio_executor::Enter; use tokio_executor::park::{Park, Unpark}; +use atomic_task::AtomicTask; + use std::{fmt, usize}; use std::io::{self, ErrorKind}; use std::mem; @@ -29,17 +31,24 @@ use std::sync::{Arc, Weak, RwLock}; use std::time::{Duration, Instant}; use log::Level; -use futures::task::AtomicTask; use mio; use mio::event::Evented; use slab::Slab; +use futures::task::Task; pub(crate) mod background; use self::background::Background; mod poll_evented; +#[allow(deprecated)] pub use self::poll_evented::PollEvented; +mod registration; +pub use self::registration::Registration; + +mod poll_evented2; +pub use self::poll_evented2::PollEvented as PollEvented2; + /// The core reactor, or event loop. /// /// The event loop is the main source of blocking in an application which drives @@ -100,7 +109,8 @@ struct ScheduledIo { writer: AtomicTask, } -enum Direction { +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(crate) enum Direction { Read, Write, } @@ -358,11 +368,24 @@ impl fmt::Debug for Reactor { impl Handle { /// Returns a handle to the current reactor. pub fn current() -> Handle { - Handle::default() + Handle::try_current() + .unwrap_or(Handle { inner: Weak::new() }) + } + + /// Try to get a handle to the current reactor. + /// + /// Returns `Err` if no handle is found. + pub(crate) fn try_current() -> io::Result { + CURRENT_REACTOR.with(|current| { + match *current.borrow() { + Some(ref handle) => Ok(handle.clone()), + None => Handle::fallback(), + } + }) } /// Returns a handle to the fallback reactor. - fn fallback() -> Handle { + fn fallback() -> io::Result { let mut fallback = HANDLE_FALLBACK.load(SeqCst); // If the fallback hasn't been previously initialized then let's spin @@ -373,7 +396,8 @@ impl Handle { if fallback == 0 { let reactor = match Reactor::new() { Ok(reactor) => reactor, - Err(_) => return Handle { inner: Weak::new() }, + Err(_) => return Err(io::Error::new(io::ErrorKind::Other, + "failed to create reactor")), }; // If we successfully set ourselves as the actual fallback then we @@ -392,7 +416,7 @@ impl Handle { Err(_) => {} } - return ret + return Ok(ret); } fallback = HANDLE_FALLBACK.load(SeqCst); @@ -403,12 +427,14 @@ impl Handle { // handle as we don't actually have an owning reference to it. assert!(fallback != 0); - unsafe { + let ret = unsafe { let handle = Handle::from_usize(fallback); let ret = handle.clone(); drop(handle.into_usize()); - return ret - } + ret + }; + + Ok(ret) } /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise @@ -450,12 +476,7 @@ impl Unpark for Handle { impl Default for Handle { fn default() -> Handle { - CURRENT_REACTOR.with(|current| { - match *current.borrow() { - Some(ref handle) => handle.clone(), - None => Handle::fallback(), - } - }) + Handle::current() } } @@ -490,7 +511,8 @@ impl Inner { let mut io_dispatch = self.io_dispatch.write().unwrap(); if io_dispatch.len() == MAX_SOURCES { - return Err(io::Error::new(io::ErrorKind::Other, "reactor at max registered I/O resources")); + return Err(io::Error::new(io::ErrorKind::Other, "reactor at max \ + registered I/O resources")); } // Acquire a write lock @@ -520,7 +542,7 @@ impl Inner { } /// Registers interest in the I/O resource associated with `token`. - fn schedule(&self, token: usize, dir: Direction) { + fn register(&self, token: usize, dir: Direction, t: Task) { debug!("scheduling direction for: {}", token); let io_dispatch = self.io_dispatch.read().unwrap(); let sched = io_dispatch.get(token).unwrap(); @@ -530,7 +552,7 @@ impl Inner { Direction::Write => (&sched.writer, mio::Ready::writable()), }; - task.register(); + task.register_task(t); if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 { task.notify(); @@ -551,14 +573,33 @@ impl Drop for Inner { } } +impl Direction { + fn ready(&self) -> mio::Ready { + match *self { + Direction::Read => read_ready(), + Direction::Write => write_ready(), + } + } + + fn mask(&self) -> usize { + ready2usize(self.ready()) + } +} + // ===== misc ===== +const READ: usize = 1 << 0; +const WRITE: usize = 1 << 1; + fn read_ready() -> mio::Ready { mio::Ready::readable() | platform::hup() } -const READ: usize = 1 << 0; -const WRITE: usize = 1 << 1; +fn write_ready() -> mio::Ready { + mio::Ready::writable() +} + +// === legacy fn ready2usize(ready: mio::Ready) -> usize { let mut bits = 0; diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index 00b71e06a..16f20d491 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -6,11 +6,13 @@ //! acquisition of a token, and tracking of the readiness state on the //! underlying I/O primitive. +#![allow(deprecated)] + use std::fmt; use std::io::{self, Read, Write}; use std::sync::atomic::Ordering; -use futures::{Async, Poll}; +use futures::{task, Async, Poll}; use mio::event::Evented; use mio::Ready; use tokio_io::{AsyncRead, AsyncWrite}; @@ -68,6 +70,8 @@ struct Registration { /// Essentially a good rule of thumb is that if you're using the `poll_ready` /// method you want to also use `need_read` to signal blocking and you should /// otherwise probably avoid using two tasks on the same `PollEvented`. +#[deprecated(since = "0.1.2", note = "PollEvented2 instead")] +#[doc(hidden)] pub struct PollEvented { registration: Registration, io: E, @@ -233,12 +237,7 @@ impl PollEvented { let bits = super::ready2usize(super::read_ready()); self.registration.readiness &= !bits; - let inner = match self.registration.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - inner.schedule(self.registration.token, Direction::Read); - Ok(()) + self.register(Direction::Read) } /// Indicates to this source of events that the corresponding I/O object is @@ -273,12 +272,7 @@ impl PollEvented { let bits = super::ready2usize(Ready::writable()); self.registration.readiness &= !bits; - let inner = match self.registration.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - inner.schedule(self.registration.token, Direction::Write); - Ok(()) + self.register(Direction::Write) } /// Returns a reference to the event loop handle that this readiness stream @@ -326,6 +320,16 @@ impl PollEvented { inner.deregister_source(&self.io) } + + fn register(&self, dir: Direction) -> io::Result<()> { + let inner = match self.registration.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + + inner.register(self.registration.token, dir, task::current()); + Ok(()) + } } impl Read for PollEvented { diff --git a/src/reactor/poll_evented2.rs b/src/reactor/poll_evented2.rs new file mode 100644 index 000000000..9affad0e5 --- /dev/null +++ b/src/reactor/poll_evented2.rs @@ -0,0 +1,419 @@ +//! +//! Readiness tracking streams, backing I/O objects. +//! +//! This module contains the core type which is used to back all I/O on object +//! in `tokio-core`. The `PollEvented` type is the implementation detail of +//! all I/O. Each `PollEvented` manages registration with a reactor, +//! acquisition of a token, and tracking of the readiness state on the +//! underlying I/O primitive. + +#![allow(warnings)] + +use reactor::Handle; +use reactor::registration::Registration; + +use futures::{task, Async, Poll}; +use mio; +use mio::event::Evented; +use tokio_io::{AsyncRead, AsyncWrite}; + +use std::fmt; +use std::io::{self, Read, Write}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; + +/// A concrete implementation of a stream of readiness notifications for I/O +/// objects that originates from an event loop. +/// +/// Created by the `PollEvented::new` method, each `PollEvented` is +/// associated with a specific event loop and source of events that will be +/// registered with an event loop. +/// +/// An instance of `PollEvented` is essentially the bridge between the `mio` +/// world and the `tokio-core` world, providing abstractions to receive +/// notifications about changes to an object's `mio::Ready` state. +/// +/// Each readiness stream has a number of methods to test whether the underlying +/// object is readable or writable. Once the methods return that an object is +/// readable/writable, then it will continue to do so until the `need_read` or +/// `need_write` methods are called. +/// +/// That is, this object is typically wrapped in another form of I/O object. +/// It's the responsibility of the wrapper to inform the readiness stream when a +/// "would block" I/O event is seen. The readiness stream will then take care of +/// any scheduling necessary to get notified when the event is ready again. +/// +/// You can find more information about creating a custom I/O object [online]. +/// +/// [online]: https://tokio.rs/docs/going-deeper-tokio/core-low-level/#custom-io +/// +/// ## Readiness to read/write +/// +/// A `PollEvented` allows listening and waiting for an arbitrary `mio::Ready` +/// instance, including the platform-specific contents of `mio::Ready`. At most +/// two future tasks, however, can be waiting on a `PollEvented`. The +/// `need_read` and `need_write` methods can block two separate tasks, one on +/// reading and one on writing. Not all I/O events correspond to read/write, +/// however! +/// +/// To account for this a `PollEvented` gets a little interesting when working +/// with an arbitrary instance of `mio::Ready` that may not map precisely to +/// "write" and "read" tasks. Currently it is defined that instances of +/// `mio::Ready` that do *not* return true from `is_writable` are all notified +/// through `need_read`, or the read task. +/// +/// In other words, `poll_ready` with the `mio::UnixReady::hup` event will block +/// the read task of this `PollEvented` if the `hup` event isn't available. +/// Essentially a good rule of thumb is that if you're using the `poll_ready` +/// method you want to also use `need_read` to signal blocking and you should +/// otherwise probably avoid using two tasks on the same `PollEvented`. +pub struct PollEvented { + io: E, + inner: Inner, +} + +struct Inner { + registration: Registration, + + /// Currently visible read readiness + read_readiness: AtomicUsize, + + /// Currently visible write readiness + write_readiness: AtomicUsize, +} + +// ===== impl PollEvented ===== + +impl PollEvented +where E: Evented +{ + /// Creates a new `PollEvented` associated with the default reactor. + pub fn new(io: E) -> PollEvented { + PollEvented { + io: io, + inner: Inner { + registration: Registration::new(), + read_readiness: AtomicUsize::new(0), + write_readiness: AtomicUsize::new(0), + } + } + } + + /// Creates a new `PollEvented` associated with the specified reactor. + pub fn new_with_handle(io: E, handle: &Handle) -> io::Result { + let ret = PollEvented::new(io); + ret.inner.registration.register_with(&ret.io, handle)?; + Ok(ret) + } + + /// Tests to see if this source is ready to be read from or not. + /// + /// If this stream is not ready for a read then `Async::NotReady` will be + /// returned and the current task will be scheduled to receive a + /// notification when the stream is readable again. In other words, this + /// method is only safe to call from within the context of a future's task, + /// typically done in a `Future::poll` method. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_read_ready(&self) -> Poll { + self.register()?; + + // Load the cached readiness + match self.inner.read_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = self.inner.registration.take_read_ready()? { + n |= super::ready2usize(ready); + self.inner.read_readiness.store(n, Relaxed); + } + + return Ok(super::usize2ready(n).into()); + } + } + + let ready = try_ready!(self.inner.registration.poll_read_ready()); + + // Cache the value + self.inner.read_readiness.store(super::ready2usize(ready), Relaxed); + + Ok(ready.into()) + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer readable, but it needs to be. + /// + /// This function, like `poll_read`, is only safe to call from the context + /// of a future's task (typically in a `Future::poll` implementation). It + /// informs this readiness stream that the underlying object is no longer + /// readable, typically because a "would block" error was seen. + /// + /// *All* readiness bits associated with this stream except the writable bit + /// will be reset when this method is called. The current task is then + /// scheduled to receive a notification whenever anything changes other than + /// the writable bit. Note that this typically just means the readable bit + /// is used here, but if you're using a custom I/O object for events like + /// hup/error this may also be relevant. + /// + /// Note that it is also only valid to call this method if `poll_read` + /// previously indicated that the object is readable. That is, this function + /// must always be paired with calls to `poll_read` previously. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_read(&self) -> io::Result<()> { + self.inner.read_readiness.store(0, Relaxed); + + if self.poll_read_ready()?.is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Tests to see if this source is ready to be written to or not. + /// + /// If this stream is not ready for a write then `Async::NotReady` will be + /// returned and the current task will be scheduled to receive a + /// notification when the stream is writable again. In other words, this + /// method is only safe to call from within the context of a future's task, + /// typically done in a `Future::poll` method. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn poll_write_ready(&self) -> Poll { + self.register()?; + + match self.inner.write_readiness.load(Relaxed) { + 0 => {} + mut n => { + // Check what's new with the reactor. + if let Some(ready) = self.inner.registration.take_write_ready()? { + n |= super::ready2usize(ready); + self.inner.write_readiness.store(n, Relaxed); + } + + return Ok(super::usize2ready(n).into()); + } + } + + let ready = try_ready!(self.inner.registration.poll_write_ready()); + + // Cache the value + self.inner.write_readiness.store(super::ready2usize(ready), Relaxed); + + Ok(ready.into()) + } + + /// Indicates to this source of events that the corresponding I/O object is + /// no longer writable, but it needs to be. + /// + /// This function, like `poll_write_ready`, is only safe to call from the + /// context of a future's task (typically in a `Future::poll` + /// implementation). It informs this readiness stream that the underlying + /// object is no longer writable, typically because a "would block" error + /// was seen. + /// + /// The flag indicating that this stream is writable is unset and the + /// current task is scheduled to receive a notification when the stream is + /// then again writable. + /// + /// Note that it is also only valid to call this method if + /// `poll_write_ready` previously indicated that the object is writable. + /// That is, this function must always be paired with calls to `poll_write` + /// previously. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's + /// task. + pub fn need_write(&self) -> io::Result<()> { + self.inner.write_readiness.store(0, Relaxed); + + if self.poll_write_ready()?.is_ready() { + // Notify the current task + task::current().notify(); + } + + Ok(()) + } + + /// Ensure that the I/O resource is registered with the reactor. + fn register(&self) -> io::Result<()> { + self.inner.registration.register(&self.io)?; + Ok(()) + } +} + +impl PollEvented { + /// Returns a shared reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_ref(&self) -> &E { + &self.io + } + + /// Returns a mutable reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_mut(&mut self) -> &mut E { + &mut self.io + } + + /// Consumes self, returning the inner I/O object + pub fn into_inner(self) -> E { + self.io + } +} + +// ===== Read / Write impls ===== + +impl Read for PollEvented +where E: Evented + Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if let Async::NotReady = self.poll_read_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().read(buf); + + if is_wouldblock(&r) { + self.need_read()?; + } + + return r + } +} + +impl Write for PollEvented +where E: Evented + Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().write(buf); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_mut().flush(); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } +} + +impl AsyncRead for PollEvented +where E: Evented + Read, +{ +} + +impl AsyncWrite for PollEvented +where E: Evented + Write, +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +// ===== &'a Read / &'a Write impls ===== + +impl<'a, E> Read for &'a PollEvented +where E: Evented, &'a E: Read, +{ + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if let Async::NotReady = self.poll_read_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_ref().read(buf); + + if is_wouldblock(&r) { + self.need_read()?; + } + + return r + } +} + +impl<'a, E> Write for &'a PollEvented +where E: Evented, &'a E: Write, +{ + fn write(&mut self, buf: &[u8]) -> io::Result { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_ref().write(buf); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } + + fn flush(&mut self) -> io::Result<()> { + if let Async::NotReady = self.poll_write_ready()? { + return Err(io::ErrorKind::WouldBlock.into()) + } + + let r = self.get_ref().flush(); + + if is_wouldblock(&r) { + self.need_write()?; + } + + return r + } +} + +impl<'a, E> AsyncRead for &'a PollEvented +where E: Evented, &'a E: Read, +{ +} + +impl<'a, E> AsyncWrite for &'a PollEvented +where E: Evented, &'a E: Write, +{ + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +fn is_wouldblock(r: &io::Result) -> bool { + match *r { + Ok(_) => false, + Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, + } +} + + +impl fmt::Debug for PollEvented { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("PollEvented") + .field("io", &self.io) + .finish() + } +} diff --git a/src/reactor/registration.rs b/src/reactor/registration.rs new file mode 100644 index 000000000..69d4d00e8 --- /dev/null +++ b/src/reactor/registration.rs @@ -0,0 +1,394 @@ +use reactor::{Handle, Direction}; + +use futures::{Async, Poll}; +use futures::task::{self, Task}; +use mio::{self, Evented}; + +use std::{io, mem, usize}; +use std::cell::UnsafeCell; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::SeqCst; + +/// Handle to a reactor registration. +/// +/// A registration represents an I/O resource registered with a Reactor such +/// that it will receive task notifications on readiness. +/// +/// The registration is lazily made and supports concurrent operations. This +/// allows a `Registration` instance to be created without the reactor handle +/// that will eventually be used to drive the resource. +/// +/// The difficulty is due to the fact that a single registration drives two +/// separate tasks -- A read half and a write half. +#[derive(Debug)] +pub struct Registration { + /// Stores the handle. Once set, the value is not changed. + /// + /// Setting this requires acquiring the lock from state. + inner: UnsafeCell>, + + /// Tracks the state of the registration. + /// + /// The least significant 2 bits are used to track the lifecycle of the + /// registration. The rest of the `state` variable is a pointer to tasks + /// that must be notified once the lock is released. + state: AtomicUsize, +} + +#[derive(Debug)] +struct Inner { + handle: Handle, + token: usize, +} + +/// Tasks waiting on readiness notifications. +#[derive(Debug)] +struct Node { + direction: Direction, + task: Task, + next: Option>, +} + +/// Initial state. The handle is not set and the registration is idle. +const INIT: usize = 0; + +/// A thread locked the state and will associate a handle. +const LOCKED: usize = 1; + +/// A handle has been associated with the registration. +const READY: usize = 2; + +/// Masks the lifecycle state +const LIFECYCLE_MASK: usize = 0b11; + +/// A fake token used to identify error situations +const ERROR: usize = usize::MAX; + +// ===== impl Registration ===== + +impl Registration { + /// Create a new `Registration`. + /// + /// This registration is not associated with a Reactor instance. Call + /// `register` to establish the association. + pub fn new() -> Registration { + Registration { + inner: UnsafeCell::new(None), + state: AtomicUsize::new(INIT), + } + } + + /// Register the I/O resource with the default reactor. + /// + /// This function is safe to call concurrently and repeatedly. However, only + /// the first call will establish the registration. Subsequent calls will be + /// no-ops. + /// + /// If the registration happened successfully, `Ok(true)` is returned. + /// + /// If an I/O resource has previously been successfully registered, + /// `Ok(false)` is returned. + /// + /// If an error is encountered during registration, `Err` is returned. + pub fn register(&self, io: &T) -> io::Result + where T: Evented, + { + self.register2(io, || Handle::try_current()) + } + + /// Register the I/O resource with the specified reactor. + /// + /// This function is safe to call concurrently and repeatedly. However, only + /// the first call will establish the registration. Subsequent calls will be + /// no-ops. + /// + /// If the registration happened successfully, `Ok(true)` is returned. + /// + /// If an I/O resource has previously been successfully registered, + /// `Ok(false)` is returned. + /// + /// If an error is encountered during registration, `Err` is returned. + pub fn register_with(&self, io: &T, handle: &Handle) -> io::Result + where T: Evented, + { + self.register2(io, || Ok(handle.clone())) + } + + fn register2(&self, io: &T, f: F) -> io::Result + where T: Evented, + F: Fn() -> io::Result, + { + let mut state = self.state.load(SeqCst); + + loop { + match state { + INIT => { + // Registration is currently not associated with a handle. + // Get a handle then attempt to lock the state. + let handle = f()?; + + let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst); + + if actual != state { + state = actual; + continue; + } + + // Create the actual registration + let (inner, res) = Inner::new(io, handle); + + unsafe { *self.inner.get() = Some(inner); } + + // Transition out of the locked state. This acquires the + // current value, potentially having a list of tasks that + // are pending readiness notifications. + let actual = self.state.swap(READY, SeqCst); + + // Consume the stack of nodes. + let ptr = actual & !LIFECYCLE_MASK; + + if ptr != 0 { + let mut read = false; + let mut write = false; + let mut curr = unsafe { Box::from_raw(ptr as *mut Node) }; + + let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; + + loop { + let node = *curr; + let Node { + direction, + task, + next, + } = node; + + let flag = match direction { + Direction::Read => &mut read, + Direction::Write => &mut write, + }; + + if !*flag { + *flag = true; + + inner.register(direction, task); + } + + match next { + Some(next) => curr = next, + None => break, + } + } + } + + return res.map(|_| true); + } + _ => return Ok(false), + } + } + } + + /// Poll for changes in the I/O resource's read readiness. + pub fn poll_read_ready(&self) -> Poll { + self.poll_ready(Direction::Read, true) + .map(|v| match v { + Some(v) => Async::Ready(v), + _ => Async::NotReady, + }) + } + + /// Try taking the I/O resource's read readiness. + /// + /// Unlike `poll_read_ready`, this does not register the current task for + /// notification. + pub fn take_read_ready(&self) -> io::Result> { + self.poll_ready(Direction::Read, false) + + } + + /// Poll for changes in the I/O resource's write readiness. + pub fn poll_write_ready(&self) -> Poll { + self.poll_ready(Direction::Write, true) + .map(|v| match v { + Some(v) => Async::Ready(v), + _ => Async::NotReady, + }) + } + + /// Try taking the I/O resource's write readiness. + /// + /// Unlike `poll_write_ready`, this does not register the current task for + /// notification. + pub fn take_write_ready(&self) -> io::Result> { + self.poll_ready(Direction::Write, false) + } + + fn poll_ready(&self, direction: Direction, notify: bool) + -> io::Result> + { + let mut state = self.state.load(SeqCst); + + // Cache the node pointer + let mut node = None; + + loop { + match state { + INIT => { + return Err(io::Error::new(io::ErrorKind::Other, "must call `register` + before poll_read_ready")); + } + READY => { + let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; + return inner.poll_ready(direction, notify); + } + _ => { + if !notify { + // Skip the notification tracking junk. + return Ok(None); + } + + let ptr = state & !LIFECYCLE_MASK; + + // Get the node + let mut n = node.take().unwrap_or_else(|| { + Box::new(Node { + direction, + task: task::current(), + next: None, + }) + }); + + n.next = if ptr == 0 { + None + } else { + // Great care must be taken of the CAS fails + Some(unsafe { Box::from_raw(ptr as *mut Node) }) + }; + + let ptr = Box::into_raw(n); + let next = ptr as usize | (state & LIFECYCLE_MASK); + + let actual = self.state.compare_and_swap(state, next, SeqCst); + + if actual != state { + // Back out of the node boxing + let mut n = unsafe { Box::from_raw(ptr) }; + + // We don't really own this + mem::forget(n.next.take()); + + // Save this for next loop + node = Some(n); + + state = actual; + continue; + } + + return Ok(None); + } + } + } + } +} + +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} + +// ===== impl Inner ===== + +impl Inner { + fn new(io: &T, handle: Handle) -> (Self, io::Result<()>) + where T: Evented, + { + let mut res = Ok(()); + + let token = match handle.inner() { + Some(inner) => match inner.add_source(io) { + Ok(token) => token, + Err(e) => { + res = Err(e); + ERROR + } + }, + None => { + res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone")); + ERROR + } + }; + + let inner = Inner { + handle, + token, + }; + + (inner, res) + } + + fn register(&self, direction: Direction, task: Task) { + if self.token == ERROR { + task.notify(); + return; + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => { + task.notify(); + return; + } + }; + + inner.register(self.token, direction, task); + } + + fn poll_ready(&self, direction: Direction, notify: bool) + -> io::Result> + { + if self.token == ERROR { + return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor")); + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + + let mask = direction.mask(); + + let io_dispatch = inner.io_dispatch.read().unwrap(); + let sched = &io_dispatch[self.token]; + + let mut ready = mask & sched.readiness.fetch_and(!mask, SeqCst); + + if ready == 0 && notify { + // Update the task info + match direction { + Direction::Read => sched.reader.register(), + Direction::Write => sched.writer.register(), + } + + // Try again + ready = mask & sched.readiness.fetch_and(!mask, SeqCst); + } + + if ready == 0 { + Ok(None) + } else { + Ok(Some(super::usize2ready(ready))) + } + } +} + +impl Drop for Inner { + fn drop(&mut self) { + if self.token == ERROR { + return; + } + + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return, + }; + + inner.drop_source(self.token); + } +} diff --git a/tests/global.rs b/tests/global.rs index a863176b1..9f38302fa 100644 --- a/tests/global.rs +++ b/tests/global.rs @@ -1,11 +1,14 @@ extern crate futures; extern crate tokio; +extern crate tokio_io; extern crate env_logger; -use std::thread; +use std::{io, thread}; +use std::sync::Arc; use futures::prelude::*; use tokio::net::{TcpStream, TcpListener}; +use tokio::runtime::Runtime; macro_rules! t { ($e:expr) => (match $e { @@ -36,3 +39,82 @@ fn hammer() { thread.join().unwrap(); } } + +struct Rd(Arc); +struct Wr(Arc); + +impl io::Read for Rd { + fn read(&mut self, dst: &mut [u8]) -> io::Result { + <&TcpStream>::read(&mut &*self.0, dst) + } +} + +impl tokio_io::AsyncRead for Rd { +} + +impl io::Write for Wr { + fn write(&mut self, src: &[u8]) -> io::Result { + <&TcpStream>::write(&mut &*self.0, src) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl tokio_io::AsyncWrite for Wr { + fn shutdown(&mut self) -> Poll<(), io::Error> { + Ok(().into()) + } +} + +#[test] +fn hammer_split() { + use tokio_io::io; + + const N: usize = 100; + + let _ = env_logger::init(); + + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap())); + let addr = t!(srv.local_addr()); + + let mut rt = Runtime::new().unwrap(); + + fn split(socket: TcpStream) -> Box + Send> { + let socket = Arc::new(socket); + let rd = Rd(socket.clone()); + let wr = Wr(socket); + + let rd = io::read(rd, vec![0; 1]) + .map(|_| ()) + .map_err(|e| panic!("read error = {:?}", e)); + + let wr = io::write_all(wr, b"1") + .map(|_| ()) + .map_err(|e| panic!("write error = {:?}", e)); + + Box::new({ + tokio::spawn(rd) + .join(tokio::spawn(wr)) + .map(|_| ()) + }) + } + + rt.spawn({ + srv.incoming() + .map_err(|e| panic!("accept error = {:?}", e)) + .take(N as u64) + .for_each(|socket| split(socket)) + }); + + for _ in 0..N { + rt.spawn({ + TcpStream::connect(&addr) + .map_err(|e| panic!("connect error = {:?}", e)) + .and_then(|socket| split(socket)) + }); + } + + rt.shutdown_on_idle().wait().unwrap(); +}