net: Eagerly bind resources to reactors (#1666)

## Motivation

The `tokio_net` resources can be created outside of a runtime due to how tokio
has been used with futures to date. For example, this allows a `TcpStream` to be
created, and later passed into a runtime:

```
let stream = TcpStream::connect(...).and_then(|socket| {
    // do something
});
tokio::run(stream);
```

In order to support this functionality, the reactor was lazily bound to the
resource on the first call to `poll_read_ready`/`poll_write_ready`. This
required a lot of additional complexity in the binding logic to support.

With the tokio 0.2 common case, this is no longer necessary and can be removed.
All resources are expected to be created from within a runtime, and should panic
if not done so.

Closes #1168

## Solution

The `tokio_net` crate now assumes there to be a `CURRENT_REACTOR` set on the
worker thread creating a resource; this can be assumed if called within a tokio
runtime. If there is no current reactor, the application will panic with a "no
current reactor" message.

With this assumption, all the unsafe and atomics have been removed from
`tokio_net::driver::Registration` as it is no longer needed.

There is no longer any reason to pass in handles to the family of `from_std` methods on `net` resources. `Handle::current` has therefore a more restricted private use where it is only used in `driver::Registration::new`.

Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
This commit is contained in:
Kevin Leimkuhler 2019-10-21 16:20:06 -07:00 committed by GitHub
parent 978013a215
commit c9bcbe77b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 220 additions and 584 deletions

View File

@ -24,6 +24,8 @@
//! use tokio::net::TcpStream;
//!
//! # async fn process<T>(_t: T) {}
//!
//! # #[tokio::main]
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//! let stream = TcpStream::connect("93.184.216.34:9243").await?;
//!
@ -56,19 +58,15 @@
//! the task to run on one of its worker threads. This results in the `and_then`
//! closure to get executed.
//!
//! ## Lazy registration
//! ## Eager registration
//!
//! Notice how the snippet above does not explicitly reference a reactor. When
//! [`TcpStream::connect`] is called, it registers the socket with a reactor,
//! but no reactor is specified. This works because the registration process
//! mentioned above is actually lazy. It doesn't *actually* happen in the
//! [`connect`] function. Instead, the registration is established the first
//! time that the task is polled (again, see [runtime model]).
//!
//! A reactor instance is automatically made available when using the Tokio
//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor
//! sets a thread-local variable referencing the associated [`Reactor`] instance
//! and [`Handle::current`] (used by [`Registration`]) returns the reference.
//! Notice how the snippet does not explicitly reference a reactor. When
//! [`TcpStream::connect`] is called, it registers the socket with the current
//! reactor, but no reactor is specified. This works because a reactor
//! instance is automatically made available when using the Tokio [runtime],
//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a
//! thread-local variable referencing the associated [`Reactor`] instance and
//! [`Handle::current`] (used by [`Registration`]) returns the reference.
//!
//! ## Implementation
//!

View File

@ -39,17 +39,8 @@ pub struct Reactor {
/// A `Handle` is used for associating I/O objects with an event loop
/// explicitly. Typically though you won't end up using a `Handle` that often
/// and will instead use the default reactor for the execution context.
///
/// By default, most components bind lazily to reactors.
/// To get this behavior when manually passing a `Handle`, use `default()`.
#[derive(Clone)]
pub struct Handle {
inner: Option<HandlePriv>,
}
/// Like `Handle`, but never `None`.
#[derive(Clone)]
pub(crate) struct HandlePriv {
inner: Weak<Inner>,
}
@ -62,12 +53,6 @@ pub struct Turn {
_priv: (),
}
#[test]
fn test_handle_size() {
use std::mem;
assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
}
pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
@ -97,7 +82,7 @@ pub(super) enum Direction {
thread_local! {
/// Tracks the reactor for the current execution context.
static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)
static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None)
}
const TOKEN_SHIFT: usize = 22;
@ -115,7 +100,7 @@ fn _assert_kinds() {
// ===== impl Reactor =====
#[derive(Debug)]
///Guard that resets current reactor on drop.
/// Guard that resets current reactor on drop.
pub struct DefaultGuard<'a> {
_lifetime: PhantomData<&'a u8>,
}
@ -129,7 +114,7 @@ impl Drop for DefaultGuard<'_> {
}
}
///Sets handle for a default reactor, returning guard that unsets it on drop.
/// Sets handle for a default reactor, returning guard that unsets it on drop.
pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
CURRENT_REACTOR.with(|current| {
let mut current = current.borrow_mut();
@ -140,13 +125,6 @@ pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
for execution context"
);
let handle = match handle.as_priv() {
Some(handle) => handle,
None => {
panic!("`handle` does not reference a reactor");
}
};
*current = Some(handle.clone());
});
@ -189,9 +167,7 @@ impl Reactor {
/// to bind them to this event loop.
pub fn handle(&self) -> Handle {
Handle {
inner: Some(HandlePriv {
inner: Arc::downgrade(&self.inner),
}),
inner: Arc::downgrade(&self.inner),
}
}
@ -349,55 +325,15 @@ impl fmt::Debug for Reactor {
// ===== impl Handle =====
impl Handle {
#[doc(hidden)]
#[deprecated(note = "semantics were sometimes surprising, use Handle::default()")]
pub fn current() -> Handle {
// TODO: Should this panic on error?
HandlePriv::try_current()
.map(|handle| Handle {
inner: Some(handle),
})
.unwrap_or(Handle {
inner: Some(HandlePriv { inner: Weak::new() }),
})
}
pub(crate) fn as_priv(&self) -> Option<&HandlePriv> {
self.inner.as_ref()
}
}
impl Unpark for Handle {
fn unpark(&self) {
if let Some(ref h) = self.inner {
h.wakeup();
}
}
}
impl Default for Handle {
/// Returns a "default" handle, i.e., a handle that lazily binds to a reactor.
fn default() -> Handle {
Handle { inner: None }
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
}
}
// ===== impl HandlePriv =====
impl HandlePriv {
/// Try to get a handle to the current reactor.
/// Returns a handle to the current reactor
///
/// Returns `Err` if no handle is found.
pub(super) fn try_current() -> io::Result<HandlePriv> {
/// # Panics
///
/// This function panics if there is no current reactor set.
pub(super) fn current() -> Self {
CURRENT_REACTOR.with(|current| match *current.borrow() {
Some(ref handle) => Ok(handle.clone()),
None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")),
Some(ref handle) => handle.clone(),
None => panic!("no current reactor"),
})
}
@ -421,9 +357,15 @@ impl HandlePriv {
}
}
impl fmt::Debug for HandlePriv {
impl Unpark for Handle {
fn unpark(&self) {
self.wakeup();
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "HandlePriv")
write!(f, "Handle")
}
}

View File

@ -1,12 +1,10 @@
use super::platform;
use super::reactor::{Direction, Handle, HandlePriv};
use super::reactor::{Direction, Handle};
use mio::{self, Evented};
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::task::{Context, Poll, Waker};
use std::{io, ptr, usize};
use std::task::{Context, Poll};
use std::{io, usize};
/// Associates an I/O resource with the reactor instance that drives it.
///
@ -14,10 +12,9 @@ use std::{io, ptr, usize};
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
/// The association between an I/O resource is made by calling [`register`].
/// Once the association is established, it remains established until the
/// registration instance is dropped. Subsequent calls to [`register`] are
/// no-ops.
/// The association between an I/O resource is made by calling [`new`]. Once
/// the association is established, it remains established until the
/// registration instance is dropped.
///
/// A registration instance represents two separate readiness streams. One for
/// the read readiness and one for write readiness. These streams are
@ -36,86 +33,38 @@ use std::{io, ptr, usize};
/// These events are included as part of the read readiness event stream. The
/// write readiness event stream is only for `Ready::writable()` events.
///
/// [`register`]: #method.register
/// [`new`]: #method.new
/// [`poll_read_ready`]: #method.poll_read_ready`]
/// [`poll_write_ready`]: #method.poll_write_ready`]
#[derive(Debug)]
pub struct Registration {
/// Stores the handle. Once set, the value is not changed.
///
/// Setting this requires acquiring the lock from state.
inner: UnsafeCell<Option<Inner>>,
/// Tracks the state of the registration.
///
/// The least significant 2 bits are used to track the lifecycle of the
/// registration. The rest of the `state` variable is a pointer to tasks
/// that must be notified once the lock is released.
state: AtomicUsize,
}
#[derive(Debug)]
struct Inner {
handle: HandlePriv,
handle: Handle,
token: usize,
}
/// Tasks waiting on readiness notifications.
#[derive(Debug)]
struct Node {
direction: Direction,
waker: Waker,
next: *mut Node,
}
/// Initial state. The handle is not set and the registration is idle.
const INIT: usize = 0;
/// A thread locked the state and will associate a handle.
const LOCKED: usize = 1;
/// A handle has been associated with the registration.
const READY: usize = 2;
/// Masks the lifecycle state
const LIFECYCLE_MASK: usize = 0b11;
/// A fake token used to identify error situations
const ERROR: usize = usize::MAX;
// ===== impl Registration =====
impl Registration {
/// Create a new `Registration`.
///
/// This registration is not associated with a Reactor instance. Call
/// `register` to establish the association.
pub fn new() -> Registration {
Registration {
inner: UnsafeCell::new(None),
state: AtomicUsize::new(INIT),
}
}
/// Register the I/O resource with the default reactor.
///
/// This function is safe to call concurrently and repeatedly. However, only
/// the first call will establish the registration. Subsequent calls will be
/// no-ops.
///
/// # Return
///
/// If the registration happened successfully, `Ok(true)` is returned.
///
/// If an I/O resource has previously been successfully registered,
/// `Ok(false)` is returned.
///
/// If an error is encountered during registration, `Err` is returned.
pub fn register<T>(&self, io: &T) -> io::Result<bool>
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
pub fn new<T>(io: &T) -> io::Result<Self>
where
T: Evented,
{
self.register2(io, HandlePriv::try_current)
let handle = Handle::current();
let token = if let Some(inner) = handle.inner() {
inner.add_source(io)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};
Ok(Self { handle, token })
}
/// Deregister the I/O resource from the reactor it is associated with.
@ -138,114 +87,11 @@ impl Registration {
where
T: Evented,
{
// The state does not need to be checked and coordination is not
// necessary as this function takes `&mut self`. This guarantees a
// single thread is accessing the instance.
if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
inner.deregister(io)?;
}
Ok(())
}
/// Register the I/O resource with the specified reactor.
///
/// This function is safe to call concurrently and repeatedly. However, only
/// the first call will establish the registration. Subsequent calls will be
/// no-ops.
///
/// If the registration happened successfully, `Ok(true)` is returned.
///
/// If an I/O resource has previously been successfully registered,
/// `Ok(false)` is returned.
///
/// If an error is encountered during registration, `Err` is returned.
pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
where
T: Evented,
{
self.register2(io, || match handle.as_priv() {
Some(handle) => Ok(handle.clone()),
None => HandlePriv::try_current(),
})
}
pub(crate) fn register_with_priv<T>(&self, io: &T, handle: &HandlePriv) -> io::Result<bool>
where
T: Evented,
{
self.register2(io, || Ok(handle.clone()))
}
fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
where
T: Evented,
F: Fn() -> io::Result<HandlePriv>,
{
let mut state = self.state.load(SeqCst);
loop {
match state {
INIT => {
// Registration is currently not associated with a handle.
// Get a handle then attempt to lock the state.
let handle = f()?;
let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);
if actual != state {
state = actual;
continue;
}
// Create the actual registration
let (inner, res) = Inner::new(io, handle);
unsafe {
*self.inner.get() = Some(inner);
}
// Transition out of the locked state. This acquires the
// current value, potentially having a list of tasks that
// are pending readiness notifications.
let actual = self.state.swap(READY, SeqCst);
// Consume the stack of nodes
let mut read = false;
let mut write = false;
let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node;
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
while !ptr.is_null() {
let node = unsafe { Box::from_raw(ptr) };
let node = *node;
let Node {
direction,
waker,
next,
} = node;
let flag = match direction {
Direction::Read => &mut read,
Direction::Write => &mut write,
};
if !*flag {
*flag = true;
inner.register(direction, waker);
}
ptr = next;
}
return res.map(|_| true);
}
_ => return Ok(false),
}
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
}
/// Poll for events on the I/O resource's read readiness stream.
@ -350,160 +196,26 @@ impl Registration {
self.poll_ready(Direction::Write, None)
}
/// Poll for events on the I/O resource's `direction` readiness stream.
///
/// If called with a task context, notify the task when a new event is
/// received.
fn poll_ready(
&self,
direction: Direction,
cx: Option<&mut Context<'_>>,
) -> io::Result<Option<mio::Ready>> {
let mut state = self.state.load(SeqCst);
// Cache the node pointer
let mut node = None;
loop {
match state {
INIT => {
return Err(io::Error::new(
io::ErrorKind::Other,
"must call register before poll_read_ready",
));
}
READY => {
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
return inner.poll_ready(direction, cx);
}
LOCKED => {
let cx = if let Some(ref cx) = cx {
cx
} else {
// Skip the notification tracking junk.
return Ok(None);
};
let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;
// Get the node
let mut n = node.take().unwrap_or_else(|| {
Box::new(Node {
direction,
waker: cx.waker().clone(),
next: ptr::null_mut(),
})
});
n.next = next_ptr;
let node_ptr = Box::into_raw(n);
let next = node_ptr as usize | (state & LIFECYCLE_MASK);
let actual = self.state.compare_and_swap(state, next, SeqCst);
if actual != state {
// Back out of the node boxing
let n = unsafe { Box::from_raw(node_ptr) };
// Save this for next loop
node = Some(n);
state = actual;
continue;
}
return Ok(None);
}
_ => unreachable!(),
}
}
}
}
impl Default for Registration {
fn default() -> Self {
Self::new()
}
}
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
// ===== impl Inner =====
impl Inner {
fn new<T>(io: &T, handle: HandlePriv) -> (Self, io::Result<()>)
where
T: Evented,
{
let mut res = Ok(());
let token = match handle.inner() {
Some(inner) => match inner.add_source(io) {
Ok(token) => token,
Err(e) => {
res = Err(e);
ERROR
}
},
None => {
res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
ERROR
}
};
let inner = Inner { handle, token };
(inner, res)
}
fn register(&self, direction: Direction, waker: Waker) {
if self.token == ERROR {
waker.wake();
return;
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => {
waker.wake();
return;
}
};
inner.register(self.token, direction, waker);
}
fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
if self.token == ERROR {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to associate with reactor",
));
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
}
fn poll_ready(
&self,
direction: Direction,
cx: Option<&mut Context<'_>>,
) -> io::Result<Option<mio::Ready>> {
if self.token == ERROR {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to associate with reactor",
));
// If the task should be notified about new events, ensure that it has
// been registered
if let Some(ref cx) = cx {
inner.register(self.token, direction, cx.waker().clone())
}
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
let mask = direction.mask();
let mask_no_hup = (mask - platform::hup()).as_usize();
@ -544,17 +256,15 @@ impl Inner {
}
}
impl Drop for Inner {
fn drop(&mut self) {
if self.token == ERROR {
return;
}
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
impl Drop for Registration {
fn drop(&mut self) {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
inner.drop_source(self.token);
}
}

View File

@ -221,5 +221,5 @@ where
return Err(io::Error::last_os_error());
}
}
Ok(Some(PollEvented::new(Fd { inner: io })))
Ok(Some(PollEvented::new(Fd { inner: io })?))
}

View File

@ -188,5 +188,5 @@ where
None => return None,
};
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
Some(PollEvented::new(pipe))
PollEvented::new(pipe).ok()
}

View File

@ -294,7 +294,7 @@ impl Driver {
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
let stream = globals().receiver.try_clone()?;
let wakeup = PollEvented::new(stream);
let wakeup = PollEvented::new(stream)?;
Ok(Driver { wakeup })
}

View File

@ -1,7 +1,6 @@
#[cfg(feature = "async-traits")]
use super::incoming::Incoming;
use super::TcpStream;
use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
@ -96,7 +95,7 @@ impl TcpListener {
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(&addr)?;
Ok(TcpListener::new(listener))
TcpListener::new(listener)
}
/// Accept a new incoming connection from this listener.
@ -137,7 +136,7 @@ impl TcpListener {
let (io, addr) = ready!(self.poll_accept_std(cx))?;
let io = mio::net::TcpStream::from_stream(io)?;
let io = TcpStream::new(io);
let io = TcpStream::new(io)?;
Poll::Ready(Ok((io, addr)))
}
@ -173,10 +172,6 @@ impl TcpListener {
/// bound to and the listener will only be guaranteed to accept connections
/// of the same address type currently.
///
/// Finally, the `handle` argument is the event loop that this listener will
/// be bound to.
/// Use [`Handle::default()`] to lazily bind to an event loop, just like `bind` does.
///
/// The platform specific behavior of this function looks like:
///
/// * On Unix, the socket is placed into nonblocking mode and connections
@ -187,29 +182,28 @@ impl TcpListener {
/// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
/// well (same for IPv6).
///
/// [`Handle::default()`]: ../reactor/struct.Handle.html
/// # Examples
///
/// ```no_run
/// ```rust,no_run
/// use std::error::Error;
/// use tokio::net::TcpListener;
/// use tokio_net::driver::Handle;
///
/// use std::net::TcpListener as StdTcpListener;
///
/// let std_listener = StdTcpListener::bind("127.0.0.1:0")?;
/// let listener = TcpListener::from_std(std_listener, &Handle::default())?;
/// # let _ = listener;
/// # Ok::<_, Box<dyn std::error::Error>>(())
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
/// let listener = TcpListener::from_std(std_listener)?;
/// Ok(())
/// }
/// ```
pub fn from_std(listener: net::TcpListener, handle: &Handle) -> io::Result<TcpListener> {
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new_with_handle(io, handle)?;
let io = PollEvented::new(io)?;
Ok(TcpListener { io })
}
fn new(listener: mio::net::TcpListener) -> TcpListener {
let io = PollEvented::new(listener);
TcpListener { io }
fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
let io = PollEvented::new(listener)?;
Ok(TcpListener { io })
}
/// Returns the local address that this listener is bound to.
@ -219,7 +213,7 @@ impl TcpListener {
///
/// # Examples
///
/// ```
/// ```rust,no_run
/// use tokio::net::TcpListener;
///
/// use std::io;
@ -329,9 +323,9 @@ impl TryFrom<net::TcpListener> for TcpListener {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`TcpListener::from_std(stream, &Handle::default())`](TcpListener::from_std).
/// [`TcpListener::from_std(stream)`](TcpListener::from_std).
fn try_from(stream: net::TcpListener) -> Result<Self, Self::Error> {
Self::from_std(stream, &Handle::default())
Self::from_std(stream)
}
}

View File

@ -1,5 +1,4 @@
use super::split::{split, ReadHalf, WriteHalf};
use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
@ -101,7 +100,7 @@ impl TcpStream {
/// Establish a connection to the specified `addr`.
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(&addr)?;
let stream = TcpStream::new(sys);
let stream = TcpStream::new(sys)?;
// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
@ -118,33 +117,32 @@ impl TcpStream {
Ok(stream)
}
pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream {
let io = PollEvented::new(connected);
TcpStream { io }
pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
let io = PollEvented::new(connected)?;
Ok(TcpStream { io })
}
/// Create a new `TcpStream` from a `std::net::TcpStream`.
///
/// This function will convert a TCP stream created by the standard library
/// to a TCP stream ready to be used with the provided event loop handle.
/// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does.
///
/// # Examples
///
/// ```no_run
/// ```rust,no_run
/// use std::error::Error;
/// use tokio::net::TcpStream;
/// use tokio_net::driver::Handle;
///
/// # fn dox() -> std::io::Result<()> {
/// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
/// let stream = TcpStream::from_std(std_stream, &Handle::default())?;
/// # Ok(())
/// # }
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
/// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
/// let stream = TcpStream::from_std(std_stream)?;
/// Ok(())
/// }
/// ```
pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream> {
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new_with_handle(io, handle)?;
let io = PollEvented::new(io)?;
Ok(TcpStream { io })
}
@ -152,13 +150,9 @@ impl TcpStream {
//
// This should be removed in favor of some in-crate TcpSocket builder API.
#[doc(hidden)]
pub async fn connect_std(
stream: net::TcpStream,
addr: &SocketAddr,
handle: &Handle,
) -> io::Result<TcpStream> {
pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::connect_stream(stream, addr)?;
let io = PollEvented::new_with_handle(io, handle)?;
let io = PollEvented::new(io)?;
let stream = TcpStream { io };
// Once we've connected, wait for the stream to be writable as
@ -743,9 +737,9 @@ impl TryFrom<net::TcpStream> for TcpStream {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`TcpStream::from_std(stream, &Handle::default())`](TcpStream::from_std).
/// [`TcpStream::from_std(stream)`](TcpStream::from_std).
fn try_from(stream: net::TcpStream) -> Result<Self, Self::Error> {
Self::from_std(stream, &Handle::default())
Self::from_std(stream)
}
}

View File

@ -1,5 +1,4 @@
use super::split::{split, UdpSocketRecvHalf, UdpSocketSendHalf};
use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
@ -40,12 +39,13 @@ impl UdpSocket {
}
fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> {
mio::net::UdpSocket::bind(&addr).map(UdpSocket::new)
let sys = mio::net::UdpSocket::bind(&addr)?;
UdpSocket::new(sys)
}
fn new(socket: mio::net::UdpSocket) -> UdpSocket {
let io = PollEvented::new(socket);
UdpSocket { io }
fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> {
let io = PollEvented::new(socket)?;
Ok(UdpSocket { io })
}
/// Creates a new `UdpSocket` from the previously bound socket provided.
@ -57,11 +57,9 @@ impl UdpSocket {
/// This can be used in conjunction with net2's `UdpBuilder` interface to
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
///
/// Use `Handle::default()` to lazily bind to an event loop, just like `bind` does.
pub fn from_std(socket: net::UdpSocket, handle: &Handle) -> io::Result<UdpSocket> {
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new_with_handle(io, handle)?;
let io = PollEvented::new(io)?;
Ok(UdpSocket { io })
}
@ -386,9 +384,9 @@ impl TryFrom<net::UdpSocket> for UdpSocket {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UdpSocket::from_std(stream, &Handle::default())`](UdpSocket::from_std).
/// [`UdpSocket::from_std(stream)`](UdpSocket::from_std).
fn try_from(stream: net::UdpSocket) -> Result<Self, Self::Error> {
Self::from_std(stream, &Handle::default())
Self::from_std(stream)
}
}

View File

@ -1,4 +1,3 @@
use crate::driver::Handle;
use crate::util::PollEvented;
use futures_core::ready;
@ -25,7 +24,7 @@ impl UnixDatagram {
P: AsRef<Path>,
{
let socket = mio_uds::UnixDatagram::bind(path)?;
Ok(UnixDatagram::new(socket))
UnixDatagram::new(socket)
}
/// Creates an unnamed pair of connected sockets.
@ -35,8 +34,8 @@ impl UnixDatagram {
/// be associated with the default event loop's handle.
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
let (a, b) = mio_uds::UnixDatagram::pair()?;
let a = UnixDatagram::new(a);
let b = UnixDatagram::new(b);
let a = UnixDatagram::new(a)?;
let b = UnixDatagram::new(b)?;
Ok((a, b))
}
@ -46,21 +45,21 @@ impl UnixDatagram {
///
/// The returned datagram will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result<UnixDatagram> {
pub fn from_std(datagram: net::UnixDatagram) -> io::Result<UnixDatagram> {
let socket = mio_uds::UnixDatagram::from_datagram(datagram)?;
let io = PollEvented::new_with_handle(socket, handle)?;
let io = PollEvented::new(socket)?;
Ok(UnixDatagram { io })
}
fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram {
let io = PollEvented::new(socket);
UnixDatagram { io }
fn new(socket: mio_uds::UnixDatagram) -> io::Result<UnixDatagram> {
let io = PollEvented::new(socket)?;
Ok(UnixDatagram { io })
}
/// Creates a new `UnixDatagram` which is not bound to any address.
pub fn unbound() -> io::Result<UnixDatagram> {
let socket = mio_uds::UnixDatagram::unbound()?;
Ok(UnixDatagram::new(socket))
UnixDatagram::new(socket)
}
/// Connects the socket to the specified address.
@ -216,9 +215,9 @@ impl TryFrom<net::UnixDatagram> for UnixDatagram {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UnixDatagram::from_std(stream, &Handle::default())`](UnixDatagram::from_std).
/// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std).
fn try_from(stream: net::UnixDatagram) -> Result<Self, Self::Error> {
Self::from_std(stream, &Handle::default())
Self::from_std(stream)
}
}

View File

@ -1,5 +1,4 @@
use super::UnixStream;
use crate::driver::Handle;
use crate::util::PollEvented;
use futures_core::ready;
@ -26,7 +25,7 @@ impl UnixListener {
P: AsRef<Path>,
{
let listener = mio_uds::UnixListener::bind(path)?;
let io = PollEvented::new(listener);
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
@ -35,9 +34,9 @@ impl UnixListener {
///
/// The returned listener will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result<UnixListener> {
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio_uds::UnixListener::from_listener(listener)?;
let io = PollEvented::new_with_handle(listener, handle)?;
let io = PollEvented::new(listener)?;
Ok(UnixListener { io })
}
@ -63,7 +62,7 @@ impl UnixListener {
let (io, addr) = ready!(self.poll_accept_std(cx))?;
let io = mio_uds::UnixStream::from_stream(io)?;
Ok((UnixStream::new(io), addr)).into()
Ok((UnixStream::new(io)?, addr)).into()
}
fn poll_accept_std(
@ -117,9 +116,9 @@ impl TryFrom<net::UnixListener> for UnixListener {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UnixListener::from_std(stream, &Handle::default())`](UnixListener::from_std).
/// [`UnixListener::from_std(stream)`](UnixListener::from_std).
fn try_from(stream: net::UnixListener) -> io::Result<Self> {
Self::from_std(stream, &Handle::default())
Self::from_std(stream)
}
}

View File

@ -1,6 +1,5 @@
use super::split::{split, ReadHalf, WriteHalf};
use super::ucred::{self, UCred};
use crate::driver::Handle;
use crate::util::PollEvented;
use tokio_io::{AsyncRead, AsyncWrite};
@ -39,7 +38,7 @@ impl UnixStream {
P: AsRef<Path>,
{
let stream = mio_uds::UnixStream::connect(path)?;
let stream = UnixStream::new(stream);
let stream = UnixStream::new(stream)?;
poll_fn(|cx| stream.io.poll_write_ready(cx)).await?;
Ok(stream)
@ -50,9 +49,9 @@ impl UnixStream {
///
/// The returned stream will be associated with the given event loop
/// specified by `handle` and is ready to perform I/O.
pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result<UnixStream> {
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new_with_handle(stream, handle)?;
let io = PollEvented::new(stream)?;
Ok(UnixStream { io })
}
@ -64,15 +63,15 @@ impl UnixStream {
/// be associated with the default event loop's handle.
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
let (a, b) = mio_uds::UnixStream::pair()?;
let a = UnixStream::new(a);
let b = UnixStream::new(b);
let a = UnixStream::new(a)?;
let b = UnixStream::new(b)?;
Ok((a, b))
}
pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream {
let io = PollEvented::new(stream);
UnixStream { io }
pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result<UnixStream> {
let io = PollEvented::new(stream)?;
Ok(UnixStream { io })
}
/// Returns the socket address of the local half of this connection.
@ -134,9 +133,9 @@ impl TryFrom<net::UnixStream> for UnixStream {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
/// [`UnixStream::from_std(stream, &Handle::default())`](UnixStream::from_std).
/// [`UnixStream::from_std(stream)`](UnixStream::from_std).
fn try_from(stream: net::UnixStream) -> io::Result<Self> {
Self::from_std(stream, &Handle::default())
Self::from_std(stream)
}
}

View File

@ -157,12 +157,12 @@ pub(crate) mod impl_solaris {
#[cfg(not(target_os = "dragonfly"))]
#[cfg(test)]
mod test {
use crate::uds::UnixStream;
use tokio::net::UnixStream;
use libc::getegid;
use libc::geteuid;
#[test]
#[tokio::test]
#[cfg_attr(
target_os = "freebsd",
ignore = "Requires FreeBSD 12.0 or later. https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=176419"
@ -171,7 +171,7 @@ mod test {
target_os = "netbsd",
ignore = "NetBSD does not support getpeereid() for sockets created by socketpair()"
)]
fn test_socket_pair() {
async fn test_socket_pair() {
let (a, b) = UnixStream::pair().unwrap();
let cred_a = a.peer_cred().unwrap();
let cred_b = b.peer_cred().unwrap();

View File

@ -1,4 +1,4 @@
use crate::driver::{platform, Handle, Registration};
use crate::driver::{platform, Registration};
use tokio_io::{AsyncRead, AsyncWrite};
@ -121,8 +121,6 @@ struct Inner {
macro_rules! poll_ready {
($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
$me.register()?;
// Load cached & encoded readiness.
let mut cached = $me.inner.$cache.load(Relaxed);
let mask = $mask | platform::hup();
@ -168,28 +166,16 @@ where
E: Evented,
{
/// Creates a new `PollEvented` associated with the default reactor.
pub fn new(io: E) -> PollEvented<E> {
PollEvented {
pub fn new(io: E) -> io::Result<Self> {
let registration = Registration::new(&io)?;
Ok(Self {
io: Some(io),
inner: Inner {
registration: Registration::new(),
registration,
read_readiness: AtomicUsize::new(0),
write_readiness: AtomicUsize::new(0),
},
}
}
/// Creates a new `PollEvented` associated with the specified reactor.
pub fn new_with_handle(io: E, handle: &Handle) -> io::Result<Self> {
let ret = PollEvented::new(io);
if let Some(handle) = handle.as_priv() {
ret.inner
.registration
.register_with_priv(ret.io.as_ref().unwrap(), handle)?;
}
Ok(ret)
})
}
/// Returns a shared reference to the underlying I/O object this readiness
@ -341,14 +327,6 @@ where
Ok(())
}
/// Ensure that the I/O resource is registered with the reactor.
fn register(&self) -> io::Result<()> {
self.inner
.registration
.register(self.io.as_ref().unwrap())?;
Ok(())
}
}
// ===== Read / Write impls =====

View File

@ -0,0 +1,22 @@
#![cfg(unix)]
#![cfg(feature = "signal")]
mod support;
use std::convert::TryFrom;
use std::net;
use support::*;
use tokio::net::TcpListener;
#[test]
#[should_panic]
fn no_runtime_panics_binding_net_tcp_listener() {
let listener = net::TcpListener::bind("127.0.0.1:0").expect("failed to bind listener");
let _ = TcpListener::try_from(listener);
}
#[test]
#[should_panic]
fn no_runtime_panics_creating_signals() {
let _ = signal(SignalKind::hangup());
}

View File

@ -4,7 +4,6 @@
use futures_util::future::FutureExt;
use futures_util::stream::FuturesOrdered;
use futures_util::stream::StreamExt;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@ -21,24 +20,23 @@ fn run_test() {
let finished_clone = finished.clone();
thread::spawn(move || {
let mut futures = FuturesOrdered::new();
for i in 0..2 {
futures.push(
Command::new("echo")
.arg(format!("I am spawned process #{}", i))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap()
.boxed(),
)
}
let mut rt = current_thread::Runtime::new().expect("failed to get runtime");
rt.block_on(with_timeout(futures.collect::<Vec<_>>()));
let mut futures = FuturesOrdered::new();
run_with_timeout(&mut rt, async {
for i in 0..2 {
futures.push(
Command::new("echo")
.arg(format!("I am spawned process #{}", i))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
.unwrap()
.boxed(),
)
}
});
drop(rt);
finished_clone.store(true, Ordering::SeqCst);
});

View File

@ -7,30 +7,25 @@ use support::*;
#[test]
fn dropping_loops_does_not_cause_starvation() {
let (mut rt, signal) = {
let kind = SignalKind::user_defined1();
let kind = SignalKind::user_defined1();
let mut first_rt = CurrentThreadRuntime::new().expect("failed to init first runtime");
let mut first_signal = signal(kind).expect("failed to register first signal");
let mut first_rt = CurrentThreadRuntime::new().expect("failed to init first runtime");
let mut first_signal =
first_rt.block_on(async { signal(kind).expect("failed to register first signal") });
let mut second_rt = CurrentThreadRuntime::new().expect("failed to init second runtime");
let mut second_signal = signal(kind).expect("failed to register second signal");
send_signal(libc::SIGUSR1);
let _ = run_with_timeout(&mut first_rt, first_signal.next())
.expect("failed to await first signal");
let _ = run_with_timeout(&mut second_rt, second_signal.next())
.expect("failed to await second signal");
drop(first_rt);
drop(first_signal);
(second_rt, second_signal)
};
let mut second_rt = CurrentThreadRuntime::new().expect("failed to init second runtime");
let mut second_signal =
second_rt.block_on(async { signal(kind).expect("failed to register second signal") });
send_signal(libc::SIGUSR1);
let _ = run_with_timeout(&mut rt, signal.into_future());
let _ =
run_with_timeout(&mut first_rt, first_signal.next()).expect("failed to await first signal");
drop(first_rt);
drop(first_signal);
send_signal(libc::SIGUSR1);
let _ = run_with_timeout(&mut second_rt, second_signal.next());
}

View File

@ -20,9 +20,11 @@ fn multi_loop() {
let sender = sender.clone();
thread::spawn(move || {
let mut rt = CurrentThreadRuntime::new().unwrap();
let signal = signal(SignalKind::hangup()).unwrap();
sender.send(()).unwrap();
let _ = run_with_timeout(&mut rt, signal.into_future());
let _ = run_with_timeout(&mut rt, async {
let signal = signal(SignalKind::hangup()).unwrap();
sender.send(()).unwrap();
signal.into_future().await
});
})
})
.collect();

View File

@ -2,15 +2,19 @@
#![cfg(feature = "default")]
use tokio::net::TcpListener;
use tokio_net::driver::Reactor;
use tokio_net::driver::{self, Reactor};
use tokio_test::{assert_err, assert_pending, assert_ready, task};
#[test]
fn tcp_doesnt_block() {
let reactor = Reactor::new().unwrap();
let handle = reactor.handle();
// Set the current reactor for this thread
let _reactor_guard = driver::set_default(&handle);
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let mut listener = TcpListener::from_std(listener, &handle).unwrap();
let mut listener = TcpListener::from_std(listener).unwrap();
drop(reactor);
let mut task = task::spawn(async move {
@ -24,8 +28,12 @@ fn tcp_doesnt_block() {
fn drop_wakes() {
let reactor = Reactor::new().unwrap();
let handle = reactor.handle();
// Set the current reactor for this thread
let _reactor_guard = driver::set_default(&handle);
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let mut listener = TcpListener::from_std(listener, &handle).unwrap();
let mut listener = TcpListener::from_std(listener).unwrap();
let mut task = task::spawn(async move {
assert_err!(listener.accept().await);