diff --git a/tokio-net/src/driver/mod.rs b/tokio-net/src/driver/mod.rs index a05a2f303..116bfc9af 100644 --- a/tokio-net/src/driver/mod.rs +++ b/tokio-net/src/driver/mod.rs @@ -24,6 +24,8 @@ //! use tokio::net::TcpStream; //! //! # async fn process(_t: T) {} +//! +//! # #[tokio::main] //! # async fn dox() -> Result<(), Box> { //! let stream = TcpStream::connect("93.184.216.34:9243").await?; //! @@ -56,19 +58,15 @@ //! the task to run on one of its worker threads. This results in the `and_then` //! closure to get executed. //! -//! ## Lazy registration +//! ## Eager registration //! -//! Notice how the snippet above does not explicitly reference a reactor. When -//! [`TcpStream::connect`] is called, it registers the socket with a reactor, -//! but no reactor is specified. This works because the registration process -//! mentioned above is actually lazy. It doesn't *actually* happen in the -//! [`connect`] function. Instead, the registration is established the first -//! time that the task is polled (again, see [runtime model]). -//! -//! A reactor instance is automatically made available when using the Tokio -//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor -//! sets a thread-local variable referencing the associated [`Reactor`] instance -//! and [`Handle::current`] (used by [`Registration`]) returns the reference. +//! Notice how the snippet does not explicitly reference a reactor. When +//! [`TcpStream::connect`] is called, it registers the socket with the current +//! reactor, but no reactor is specified. This works because a reactor +//! instance is automatically made available when using the Tokio [runtime], +//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a +//! thread-local variable referencing the associated [`Reactor`] instance and +//! [`Handle::current`] (used by [`Registration`]) returns the reference. //! //! ## Implementation //! diff --git a/tokio-net/src/driver/reactor.rs b/tokio-net/src/driver/reactor.rs index 800ab509c..c83ce8d5e 100644 --- a/tokio-net/src/driver/reactor.rs +++ b/tokio-net/src/driver/reactor.rs @@ -39,17 +39,8 @@ pub struct Reactor { /// A `Handle` is used for associating I/O objects with an event loop /// explicitly. Typically though you won't end up using a `Handle` that often /// and will instead use the default reactor for the execution context. -/// -/// By default, most components bind lazily to reactors. -/// To get this behavior when manually passing a `Handle`, use `default()`. #[derive(Clone)] pub struct Handle { - inner: Option, -} - -/// Like `Handle`, but never `None`. -#[derive(Clone)] -pub(crate) struct HandlePriv { inner: Weak, } @@ -62,12 +53,6 @@ pub struct Turn { _priv: (), } -#[test] -fn test_handle_size() { - use std::mem; - assert_eq!(mem::size_of::(), mem::size_of::()); -} - pub(super) struct Inner { /// The underlying system event queue. io: mio::Poll, @@ -97,7 +82,7 @@ pub(super) enum Direction { thread_local! { /// Tracks the reactor for the current execution context. - static CURRENT_REACTOR: RefCell> = RefCell::new(None) + static CURRENT_REACTOR: RefCell> = RefCell::new(None) } const TOKEN_SHIFT: usize = 22; @@ -115,7 +100,7 @@ fn _assert_kinds() { // ===== impl Reactor ===== #[derive(Debug)] -///Guard that resets current reactor on drop. +/// Guard that resets current reactor on drop. pub struct DefaultGuard<'a> { _lifetime: PhantomData<&'a u8>, } @@ -129,7 +114,7 @@ impl Drop for DefaultGuard<'_> { } } -///Sets handle for a default reactor, returning guard that unsets it on drop. +/// Sets handle for a default reactor, returning guard that unsets it on drop. pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { CURRENT_REACTOR.with(|current| { let mut current = current.borrow_mut(); @@ -140,13 +125,6 @@ pub fn set_default(handle: &Handle) -> DefaultGuard<'_> { for execution context" ); - let handle = match handle.as_priv() { - Some(handle) => handle, - None => { - panic!("`handle` does not reference a reactor"); - } - }; - *current = Some(handle.clone()); }); @@ -189,9 +167,7 @@ impl Reactor { /// to bind them to this event loop. pub fn handle(&self) -> Handle { Handle { - inner: Some(HandlePriv { - inner: Arc::downgrade(&self.inner), - }), + inner: Arc::downgrade(&self.inner), } } @@ -349,55 +325,15 @@ impl fmt::Debug for Reactor { // ===== impl Handle ===== impl Handle { - #[doc(hidden)] - #[deprecated(note = "semantics were sometimes surprising, use Handle::default()")] - pub fn current() -> Handle { - // TODO: Should this panic on error? - HandlePriv::try_current() - .map(|handle| Handle { - inner: Some(handle), - }) - .unwrap_or(Handle { - inner: Some(HandlePriv { inner: Weak::new() }), - }) - } - - pub(crate) fn as_priv(&self) -> Option<&HandlePriv> { - self.inner.as_ref() - } -} - -impl Unpark for Handle { - fn unpark(&self) { - if let Some(ref h) = self.inner { - h.wakeup(); - } - } -} - -impl Default for Handle { - /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor. - fn default() -> Handle { - Handle { inner: None } - } -} - -impl fmt::Debug for Handle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Handle") - } -} - -// ===== impl HandlePriv ===== - -impl HandlePriv { - /// Try to get a handle to the current reactor. + /// Returns a handle to the current reactor /// - /// Returns `Err` if no handle is found. - pub(super) fn try_current() -> io::Result { + /// # Panics + /// + /// This function panics if there is no current reactor set. + pub(super) fn current() -> Self { CURRENT_REACTOR.with(|current| match *current.borrow() { - Some(ref handle) => Ok(handle.clone()), - None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")), + Some(ref handle) => handle.clone(), + None => panic!("no current reactor"), }) } @@ -421,9 +357,15 @@ impl HandlePriv { } } -impl fmt::Debug for HandlePriv { +impl Unpark for Handle { + fn unpark(&self) { + self.wakeup(); + } +} + +impl fmt::Debug for Handle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "HandlePriv") + write!(f, "Handle") } } diff --git a/tokio-net/src/driver/registration.rs b/tokio-net/src/driver/registration.rs index 4d2cd9df0..ed53b84eb 100644 --- a/tokio-net/src/driver/registration.rs +++ b/tokio-net/src/driver/registration.rs @@ -1,12 +1,10 @@ use super::platform; -use super::reactor::{Direction, Handle, HandlePriv}; +use super::reactor::{Direction, Handle}; use mio::{self, Evented}; -use std::cell::UnsafeCell; -use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; -use std::task::{Context, Poll, Waker}; -use std::{io, ptr, usize}; +use std::task::{Context, Poll}; +use std::{io, usize}; /// Associates an I/O resource with the reactor instance that drives it. /// @@ -14,10 +12,9 @@ use std::{io, ptr, usize}; /// that it will receive task notifications on readiness. This is the lowest /// level API for integrating with a reactor. /// -/// The association between an I/O resource is made by calling [`register`]. -/// Once the association is established, it remains established until the -/// registration instance is dropped. Subsequent calls to [`register`] are -/// no-ops. +/// The association between an I/O resource is made by calling [`new`]. Once +/// the association is established, it remains established until the +/// registration instance is dropped. /// /// A registration instance represents two separate readiness streams. One for /// the read readiness and one for write readiness. These streams are @@ -36,86 +33,38 @@ use std::{io, ptr, usize}; /// These events are included as part of the read readiness event stream. The /// write readiness event stream is only for `Ready::writable()` events. /// -/// [`register`]: #method.register +/// [`new`]: #method.new /// [`poll_read_ready`]: #method.poll_read_ready`] /// [`poll_write_ready`]: #method.poll_write_ready`] #[derive(Debug)] pub struct Registration { - /// Stores the handle. Once set, the value is not changed. - /// - /// Setting this requires acquiring the lock from state. - inner: UnsafeCell>, - - /// Tracks the state of the registration. - /// - /// The least significant 2 bits are used to track the lifecycle of the - /// registration. The rest of the `state` variable is a pointer to tasks - /// that must be notified once the lock is released. - state: AtomicUsize, -} - -#[derive(Debug)] -struct Inner { - handle: HandlePriv, + handle: Handle, token: usize, } -/// Tasks waiting on readiness notifications. -#[derive(Debug)] -struct Node { - direction: Direction, - waker: Waker, - next: *mut Node, -} - -/// Initial state. The handle is not set and the registration is idle. -const INIT: usize = 0; - -/// A thread locked the state and will associate a handle. -const LOCKED: usize = 1; - -/// A handle has been associated with the registration. -const READY: usize = 2; - -/// Masks the lifecycle state -const LIFECYCLE_MASK: usize = 0b11; - -/// A fake token used to identify error situations -const ERROR: usize = usize::MAX; - // ===== impl Registration ===== impl Registration { - /// Create a new `Registration`. - /// - /// This registration is not associated with a Reactor instance. Call - /// `register` to establish the association. - pub fn new() -> Registration { - Registration { - inner: UnsafeCell::new(None), - state: AtomicUsize::new(INIT), - } - } - /// Register the I/O resource with the default reactor. /// - /// This function is safe to call concurrently and repeatedly. However, only - /// the first call will establish the registration. Subsequent calls will be - /// no-ops. - /// /// # Return /// - /// If the registration happened successfully, `Ok(true)` is returned. - /// - /// If an I/O resource has previously been successfully registered, - /// `Ok(false)` is returned. - /// - /// If an error is encountered during registration, `Err` is returned. - pub fn register(&self, io: &T) -> io::Result + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + pub fn new(io: &T) -> io::Result where T: Evented, { - self.register2(io, HandlePriv::try_current) + let handle = Handle::current(); + let token = if let Some(inner) = handle.inner() { + inner.add_source(io)? + } else { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to find event loop", + )); + }; + Ok(Self { handle, token }) } /// Deregister the I/O resource from the reactor it is associated with. @@ -138,114 +87,11 @@ impl Registration { where T: Evented, { - // The state does not need to be checked and coordination is not - // necessary as this function takes `&mut self`. This guarantees a - // single thread is accessing the instance. - if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } { - inner.deregister(io)?; - } - - Ok(()) - } - - /// Register the I/O resource with the specified reactor. - /// - /// This function is safe to call concurrently and repeatedly. However, only - /// the first call will establish the registration. Subsequent calls will be - /// no-ops. - /// - /// If the registration happened successfully, `Ok(true)` is returned. - /// - /// If an I/O resource has previously been successfully registered, - /// `Ok(false)` is returned. - /// - /// If an error is encountered during registration, `Err` is returned. - pub fn register_with(&self, io: &T, handle: &Handle) -> io::Result - where - T: Evented, - { - self.register2(io, || match handle.as_priv() { - Some(handle) => Ok(handle.clone()), - None => HandlePriv::try_current(), - }) - } - - pub(crate) fn register_with_priv(&self, io: &T, handle: &HandlePriv) -> io::Result - where - T: Evented, - { - self.register2(io, || Ok(handle.clone())) - } - - fn register2(&self, io: &T, f: F) -> io::Result - where - T: Evented, - F: Fn() -> io::Result, - { - let mut state = self.state.load(SeqCst); - - loop { - match state { - INIT => { - // Registration is currently not associated with a handle. - // Get a handle then attempt to lock the state. - let handle = f()?; - - let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst); - - if actual != state { - state = actual; - continue; - } - - // Create the actual registration - let (inner, res) = Inner::new(io, handle); - - unsafe { - *self.inner.get() = Some(inner); - } - - // Transition out of the locked state. This acquires the - // current value, potentially having a list of tasks that - // are pending readiness notifications. - let actual = self.state.swap(READY, SeqCst); - - // Consume the stack of nodes - - let mut read = false; - let mut write = false; - let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node; - - let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; - - while !ptr.is_null() { - let node = unsafe { Box::from_raw(ptr) }; - let node = *node; - let Node { - direction, - waker, - next, - } = node; - - let flag = match direction { - Direction::Read => &mut read, - Direction::Write => &mut write, - }; - - if !*flag { - *flag = true; - - inner.register(direction, waker); - } - - ptr = next; - } - - return res.map(|_| true); - } - _ => return Ok(false), - } - } + let inner = match self.handle.inner() { + Some(inner) => inner, + None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), + }; + inner.deregister_source(io) } /// Poll for events on the I/O resource's read readiness stream. @@ -350,160 +196,26 @@ impl Registration { self.poll_ready(Direction::Write, None) } + /// Poll for events on the I/O resource's `direction` readiness stream. + /// + /// If called with a task context, notify the task when a new event is + /// received. fn poll_ready( &self, direction: Direction, cx: Option<&mut Context<'_>>, ) -> io::Result> { - let mut state = self.state.load(SeqCst); - - // Cache the node pointer - let mut node = None; - - loop { - match state { - INIT => { - return Err(io::Error::new( - io::ErrorKind::Other, - "must call register before poll_read_ready", - )); - } - READY => { - let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; - return inner.poll_ready(direction, cx); - } - LOCKED => { - let cx = if let Some(ref cx) = cx { - cx - } else { - // Skip the notification tracking junk. - return Ok(None); - }; - - let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node; - - // Get the node - let mut n = node.take().unwrap_or_else(|| { - Box::new(Node { - direction, - waker: cx.waker().clone(), - next: ptr::null_mut(), - }) - }); - - n.next = next_ptr; - - let node_ptr = Box::into_raw(n); - let next = node_ptr as usize | (state & LIFECYCLE_MASK); - - let actual = self.state.compare_and_swap(state, next, SeqCst); - - if actual != state { - // Back out of the node boxing - let n = unsafe { Box::from_raw(node_ptr) }; - - // Save this for next loop - node = Some(n); - - state = actual; - continue; - } - - return Ok(None); - } - _ => unreachable!(), - } - } - } -} - -impl Default for Registration { - fn default() -> Self { - Self::new() - } -} - -unsafe impl Send for Registration {} -unsafe impl Sync for Registration {} - -// ===== impl Inner ===== - -impl Inner { - fn new(io: &T, handle: HandlePriv) -> (Self, io::Result<()>) - where - T: Evented, - { - let mut res = Ok(()); - - let token = match handle.inner() { - Some(inner) => match inner.add_source(io) { - Ok(token) => token, - Err(e) => { - res = Err(e); - ERROR - } - }, - None => { - res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone")); - ERROR - } - }; - - let inner = Inner { handle, token }; - - (inner, res) - } - - fn register(&self, direction: Direction, waker: Waker) { - if self.token == ERROR { - waker.wake(); - return; - } - - let inner = match self.handle.inner() { - Some(inner) => inner, - None => { - waker.wake(); - return; - } - }; - - inner.register(self.token, direction, waker); - } - - fn deregister(&self, io: &E) -> io::Result<()> { - if self.token == ERROR { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to associate with reactor", - )); - } - let inner = match self.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), }; - inner.deregister_source(io) - } - - fn poll_ready( - &self, - direction: Direction, - cx: Option<&mut Context<'_>>, - ) -> io::Result> { - if self.token == ERROR { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to associate with reactor", - )); + // If the task should be notified about new events, ensure that it has + // been registered + if let Some(ref cx) = cx { + inner.register(self.token, direction, cx.waker().clone()) } - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - let mask = direction.mask(); let mask_no_hup = (mask - platform::hup()).as_usize(); @@ -544,17 +256,15 @@ impl Inner { } } -impl Drop for Inner { - fn drop(&mut self) { - if self.token == ERROR { - return; - } +unsafe impl Send for Registration {} +unsafe impl Sync for Registration {} +impl Drop for Registration { + fn drop(&mut self) { let inner = match self.handle.inner() { Some(inner) => inner, None => return, }; - inner.drop_source(self.token); } } diff --git a/tokio-net/src/process/unix/mod.rs b/tokio-net/src/process/unix/mod.rs index 822dfc2fe..5328c3775 100644 --- a/tokio-net/src/process/unix/mod.rs +++ b/tokio-net/src/process/unix/mod.rs @@ -221,5 +221,5 @@ where return Err(io::Error::last_os_error()); } } - Ok(Some(PollEvented::new(Fd { inner: io }))) + Ok(Some(PollEvented::new(Fd { inner: io })?)) } diff --git a/tokio-net/src/process/windows.rs b/tokio-net/src/process/windows.rs index f245340ad..a01153565 100644 --- a/tokio-net/src/process/windows.rs +++ b/tokio-net/src/process/windows.rs @@ -188,5 +188,5 @@ where None => return None, }; let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) }; - Some(PollEvented::new(pipe)) + PollEvented::new(pipe).ok() } diff --git a/tokio-net/src/signal/unix.rs b/tokio-net/src/signal/unix.rs index f485b4e94..db032b101 100644 --- a/tokio-net/src/signal/unix.rs +++ b/tokio-net/src/signal/unix.rs @@ -294,7 +294,7 @@ impl Driver { // either, since we can't compare Handles or assume they will always // point to the exact same reactor. let stream = globals().receiver.try_clone()?; - let wakeup = PollEvented::new(stream); + let wakeup = PollEvented::new(stream)?; Ok(Driver { wakeup }) } diff --git a/tokio-net/src/tcp/listener.rs b/tokio-net/src/tcp/listener.rs index b25f25607..0b940f00a 100644 --- a/tokio-net/src/tcp/listener.rs +++ b/tokio-net/src/tcp/listener.rs @@ -1,7 +1,6 @@ #[cfg(feature = "async-traits")] use super::incoming::Incoming; use super::TcpStream; -use crate::driver::Handle; use crate::util::PollEvented; use crate::ToSocketAddrs; @@ -96,7 +95,7 @@ impl TcpListener { fn bind_addr(addr: SocketAddr) -> io::Result { let listener = mio::net::TcpListener::bind(&addr)?; - Ok(TcpListener::new(listener)) + TcpListener::new(listener) } /// Accept a new incoming connection from this listener. @@ -137,7 +136,7 @@ impl TcpListener { let (io, addr) = ready!(self.poll_accept_std(cx))?; let io = mio::net::TcpStream::from_stream(io)?; - let io = TcpStream::new(io); + let io = TcpStream::new(io)?; Poll::Ready(Ok((io, addr))) } @@ -173,10 +172,6 @@ impl TcpListener { /// bound to and the listener will only be guaranteed to accept connections /// of the same address type currently. /// - /// Finally, the `handle` argument is the event loop that this listener will - /// be bound to. - /// Use [`Handle::default()`] to lazily bind to an event loop, just like `bind` does. - /// /// The platform specific behavior of this function looks like: /// /// * On Unix, the socket is placed into nonblocking mode and connections @@ -187,29 +182,28 @@ impl TcpListener { /// `addr` is an IPv4 address then all sockets accepted will be IPv4 as /// well (same for IPv6). /// - /// [`Handle::default()`]: ../reactor/struct.Handle.html /// # Examples /// - /// ```no_run + /// ```rust,no_run + /// use std::error::Error; /// use tokio::net::TcpListener; - /// use tokio_net::driver::Handle; /// - /// use std::net::TcpListener as StdTcpListener; - /// - /// let std_listener = StdTcpListener::bind("127.0.0.1:0")?; - /// let listener = TcpListener::from_std(std_listener, &Handle::default())?; - /// # let _ = listener; - /// # Ok::<_, Box>(()) + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?; + /// let listener = TcpListener::from_std(std_listener)?; + /// Ok(()) + /// } /// ``` - pub fn from_std(listener: net::TcpListener, handle: &Handle) -> io::Result { + pub fn from_std(listener: net::TcpListener) -> io::Result { let io = mio::net::TcpListener::from_std(listener)?; - let io = PollEvented::new_with_handle(io, handle)?; + let io = PollEvented::new(io)?; Ok(TcpListener { io }) } - fn new(listener: mio::net::TcpListener) -> TcpListener { - let io = PollEvented::new(listener); - TcpListener { io } + fn new(listener: mio::net::TcpListener) -> io::Result { + let io = PollEvented::new(listener)?; + Ok(TcpListener { io }) } /// Returns the local address that this listener is bound to. @@ -219,7 +213,7 @@ impl TcpListener { /// /// # Examples /// - /// ``` + /// ```rust,no_run /// use tokio::net::TcpListener; /// /// use std::io; @@ -329,9 +323,9 @@ impl TryFrom for TcpListener { /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to - /// [`TcpListener::from_std(stream, &Handle::default())`](TcpListener::from_std). + /// [`TcpListener::from_std(stream)`](TcpListener::from_std). fn try_from(stream: net::TcpListener) -> Result { - Self::from_std(stream, &Handle::default()) + Self::from_std(stream) } } diff --git a/tokio-net/src/tcp/stream.rs b/tokio-net/src/tcp/stream.rs index ea841c264..2634e6905 100644 --- a/tokio-net/src/tcp/stream.rs +++ b/tokio-net/src/tcp/stream.rs @@ -1,5 +1,4 @@ use super::split::{split, ReadHalf, WriteHalf}; -use crate::driver::Handle; use crate::util::PollEvented; use crate::ToSocketAddrs; @@ -101,7 +100,7 @@ impl TcpStream { /// Establish a connection to the specified `addr`. async fn connect_addr(addr: SocketAddr) -> io::Result { let sys = mio::net::TcpStream::connect(&addr)?; - let stream = TcpStream::new(sys); + let stream = TcpStream::new(sys)?; // Once we've connected, wait for the stream to be writable as // that's when the actual connection has been initiated. Once we're @@ -118,33 +117,32 @@ impl TcpStream { Ok(stream) } - pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream { - let io = PollEvented::new(connected); - TcpStream { io } + pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result { + let io = PollEvented::new(connected)?; + Ok(TcpStream { io }) } /// Create a new `TcpStream` from a `std::net::TcpStream`. /// /// This function will convert a TCP stream created by the standard library /// to a TCP stream ready to be used with the provided event loop handle. - /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does. /// /// # Examples /// - /// ```no_run + /// ```rust,no_run + /// use std::error::Error; /// use tokio::net::TcpStream; - /// use tokio_net::driver::Handle; /// - /// # fn dox() -> std::io::Result<()> { - /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; - /// let stream = TcpStream::from_std(std_stream, &Handle::default())?; - /// # Ok(()) - /// # } + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; + /// let stream = TcpStream::from_std(std_stream)?; + /// Ok(()) + /// } /// ``` - pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result { + pub fn from_std(stream: net::TcpStream) -> io::Result { let io = mio::net::TcpStream::from_stream(stream)?; - let io = PollEvented::new_with_handle(io, handle)?; - + let io = PollEvented::new(io)?; Ok(TcpStream { io }) } @@ -152,13 +150,9 @@ impl TcpStream { // // This should be removed in favor of some in-crate TcpSocket builder API. #[doc(hidden)] - pub async fn connect_std( - stream: net::TcpStream, - addr: &SocketAddr, - handle: &Handle, - ) -> io::Result { + pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result { let io = mio::net::TcpStream::connect_stream(stream, addr)?; - let io = PollEvented::new_with_handle(io, handle)?; + let io = PollEvented::new(io)?; let stream = TcpStream { io }; // Once we've connected, wait for the stream to be writable as @@ -743,9 +737,9 @@ impl TryFrom for TcpStream { /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to - /// [`TcpStream::from_std(stream, &Handle::default())`](TcpStream::from_std). + /// [`TcpStream::from_std(stream)`](TcpStream::from_std). fn try_from(stream: net::TcpStream) -> Result { - Self::from_std(stream, &Handle::default()) + Self::from_std(stream) } } diff --git a/tokio-net/src/udp/socket.rs b/tokio-net/src/udp/socket.rs index 00a42469a..c59b564b9 100644 --- a/tokio-net/src/udp/socket.rs +++ b/tokio-net/src/udp/socket.rs @@ -1,5 +1,4 @@ use super::split::{split, UdpSocketRecvHalf, UdpSocketSendHalf}; -use crate::driver::Handle; use crate::util::PollEvented; use crate::ToSocketAddrs; @@ -40,12 +39,13 @@ impl UdpSocket { } fn bind_addr(addr: SocketAddr) -> io::Result { - mio::net::UdpSocket::bind(&addr).map(UdpSocket::new) + let sys = mio::net::UdpSocket::bind(&addr)?; + UdpSocket::new(sys) } - fn new(socket: mio::net::UdpSocket) -> UdpSocket { - let io = PollEvented::new(socket); - UdpSocket { io } + fn new(socket: mio::net::UdpSocket) -> io::Result { + let io = PollEvented::new(socket)?; + Ok(UdpSocket { io }) } /// Creates a new `UdpSocket` from the previously bound socket provided. @@ -57,11 +57,9 @@ impl UdpSocket { /// This can be used in conjunction with net2's `UdpBuilder` interface to /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. - /// - /// Use `Handle::default()` to lazily bind to an event loop, just like `bind` does. - pub fn from_std(socket: net::UdpSocket, handle: &Handle) -> io::Result { + pub fn from_std(socket: net::UdpSocket) -> io::Result { let io = mio::net::UdpSocket::from_socket(socket)?; - let io = PollEvented::new_with_handle(io, handle)?; + let io = PollEvented::new(io)?; Ok(UdpSocket { io }) } @@ -386,9 +384,9 @@ impl TryFrom for UdpSocket { /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to - /// [`UdpSocket::from_std(stream, &Handle::default())`](UdpSocket::from_std). + /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). fn try_from(stream: net::UdpSocket) -> Result { - Self::from_std(stream, &Handle::default()) + Self::from_std(stream) } } diff --git a/tokio-net/src/uds/datagram.rs b/tokio-net/src/uds/datagram.rs index 445c713fb..9073a2383 100644 --- a/tokio-net/src/uds/datagram.rs +++ b/tokio-net/src/uds/datagram.rs @@ -1,4 +1,3 @@ -use crate::driver::Handle; use crate::util::PollEvented; use futures_core::ready; @@ -25,7 +24,7 @@ impl UnixDatagram { P: AsRef, { let socket = mio_uds::UnixDatagram::bind(path)?; - Ok(UnixDatagram::new(socket)) + UnixDatagram::new(socket) } /// Creates an unnamed pair of connected sockets. @@ -35,8 +34,8 @@ impl UnixDatagram { /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { let (a, b) = mio_uds::UnixDatagram::pair()?; - let a = UnixDatagram::new(a); - let b = UnixDatagram::new(b); + let a = UnixDatagram::new(a)?; + let b = UnixDatagram::new(b)?; Ok((a, b)) } @@ -46,21 +45,21 @@ impl UnixDatagram { /// /// The returned datagram will be associated with the given event loop /// specified by `handle` and is ready to perform I/O. - pub fn from_std(datagram: net::UnixDatagram, handle: &Handle) -> io::Result { + pub fn from_std(datagram: net::UnixDatagram) -> io::Result { let socket = mio_uds::UnixDatagram::from_datagram(datagram)?; - let io = PollEvented::new_with_handle(socket, handle)?; + let io = PollEvented::new(socket)?; Ok(UnixDatagram { io }) } - fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { - let io = PollEvented::new(socket); - UnixDatagram { io } + fn new(socket: mio_uds::UnixDatagram) -> io::Result { + let io = PollEvented::new(socket)?; + Ok(UnixDatagram { io }) } /// Creates a new `UnixDatagram` which is not bound to any address. pub fn unbound() -> io::Result { let socket = mio_uds::UnixDatagram::unbound()?; - Ok(UnixDatagram::new(socket)) + UnixDatagram::new(socket) } /// Connects the socket to the specified address. @@ -216,9 +215,9 @@ impl TryFrom for UnixDatagram { /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to - /// [`UnixDatagram::from_std(stream, &Handle::default())`](UnixDatagram::from_std). + /// [`UnixDatagram::from_std(stream)`](UnixDatagram::from_std). fn try_from(stream: net::UnixDatagram) -> Result { - Self::from_std(stream, &Handle::default()) + Self::from_std(stream) } } diff --git a/tokio-net/src/uds/listener.rs b/tokio-net/src/uds/listener.rs index 9c1802745..b07ff9f35 100644 --- a/tokio-net/src/uds/listener.rs +++ b/tokio-net/src/uds/listener.rs @@ -1,5 +1,4 @@ use super::UnixStream; -use crate::driver::Handle; use crate::util::PollEvented; use futures_core::ready; @@ -26,7 +25,7 @@ impl UnixListener { P: AsRef, { let listener = mio_uds::UnixListener::bind(path)?; - let io = PollEvented::new(listener); + let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } @@ -35,9 +34,9 @@ impl UnixListener { /// /// The returned listener will be associated with the given event loop /// specified by `handle` and is ready to perform I/O. - pub fn from_std(listener: net::UnixListener, handle: &Handle) -> io::Result { + pub fn from_std(listener: net::UnixListener) -> io::Result { let listener = mio_uds::UnixListener::from_listener(listener)?; - let io = PollEvented::new_with_handle(listener, handle)?; + let io = PollEvented::new(listener)?; Ok(UnixListener { io }) } @@ -63,7 +62,7 @@ impl UnixListener { let (io, addr) = ready!(self.poll_accept_std(cx))?; let io = mio_uds::UnixStream::from_stream(io)?; - Ok((UnixStream::new(io), addr)).into() + Ok((UnixStream::new(io)?, addr)).into() } fn poll_accept_std( @@ -117,9 +116,9 @@ impl TryFrom for UnixListener { /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to - /// [`UnixListener::from_std(stream, &Handle::default())`](UnixListener::from_std). + /// [`UnixListener::from_std(stream)`](UnixListener::from_std). fn try_from(stream: net::UnixListener) -> io::Result { - Self::from_std(stream, &Handle::default()) + Self::from_std(stream) } } diff --git a/tokio-net/src/uds/stream.rs b/tokio-net/src/uds/stream.rs index 1164390d0..3e8e8e828 100644 --- a/tokio-net/src/uds/stream.rs +++ b/tokio-net/src/uds/stream.rs @@ -1,6 +1,5 @@ use super::split::{split, ReadHalf, WriteHalf}; use super::ucred::{self, UCred}; -use crate::driver::Handle; use crate::util::PollEvented; use tokio_io::{AsyncRead, AsyncWrite}; @@ -39,7 +38,7 @@ impl UnixStream { P: AsRef, { let stream = mio_uds::UnixStream::connect(path)?; - let stream = UnixStream::new(stream); + let stream = UnixStream::new(stream)?; poll_fn(|cx| stream.io.poll_write_ready(cx)).await?; Ok(stream) @@ -50,9 +49,9 @@ impl UnixStream { /// /// The returned stream will be associated with the given event loop /// specified by `handle` and is ready to perform I/O. - pub fn from_std(stream: net::UnixStream, handle: &Handle) -> io::Result { + pub fn from_std(stream: net::UnixStream) -> io::Result { let stream = mio_uds::UnixStream::from_stream(stream)?; - let io = PollEvented::new_with_handle(stream, handle)?; + let io = PollEvented::new(stream)?; Ok(UnixStream { io }) } @@ -64,15 +63,15 @@ impl UnixStream { /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixStream, UnixStream)> { let (a, b) = mio_uds::UnixStream::pair()?; - let a = UnixStream::new(a); - let b = UnixStream::new(b); + let a = UnixStream::new(a)?; + let b = UnixStream::new(b)?; Ok((a, b)) } - pub(crate) fn new(stream: mio_uds::UnixStream) -> UnixStream { - let io = PollEvented::new(stream); - UnixStream { io } + pub(crate) fn new(stream: mio_uds::UnixStream) -> io::Result { + let io = PollEvented::new(stream)?; + Ok(UnixStream { io }) } /// Returns the socket address of the local half of this connection. @@ -134,9 +133,9 @@ impl TryFrom for UnixStream { /// Consumes stream, returning the tokio I/O object. /// /// This is equivalent to - /// [`UnixStream::from_std(stream, &Handle::default())`](UnixStream::from_std). + /// [`UnixStream::from_std(stream)`](UnixStream::from_std). fn try_from(stream: net::UnixStream) -> io::Result { - Self::from_std(stream, &Handle::default()) + Self::from_std(stream) } } diff --git a/tokio-net/src/uds/ucred.rs b/tokio-net/src/uds/ucred.rs index 96683fe2a..37ed334fa 100644 --- a/tokio-net/src/uds/ucred.rs +++ b/tokio-net/src/uds/ucred.rs @@ -157,12 +157,12 @@ pub(crate) mod impl_solaris { #[cfg(not(target_os = "dragonfly"))] #[cfg(test)] mod test { - use crate::uds::UnixStream; + use tokio::net::UnixStream; use libc::getegid; use libc::geteuid; - #[test] + #[tokio::test] #[cfg_attr( target_os = "freebsd", ignore = "Requires FreeBSD 12.0 or later. https://bugs.freebsd.org/bugzilla/show_bug.cgi?id=176419" @@ -171,7 +171,7 @@ mod test { target_os = "netbsd", ignore = "NetBSD does not support getpeereid() for sockets created by socketpair()" )] - fn test_socket_pair() { + async fn test_socket_pair() { let (a, b) = UnixStream::pair().unwrap(); let cred_a = a.peer_cred().unwrap(); let cred_b = b.peer_cred().unwrap(); diff --git a/tokio-net/src/util/poll_evented.rs b/tokio-net/src/util/poll_evented.rs index 2d94a9b84..310dcadb4 100644 --- a/tokio-net/src/util/poll_evented.rs +++ b/tokio-net/src/util/poll_evented.rs @@ -1,4 +1,4 @@ -use crate::driver::{platform, Handle, Registration}; +use crate::driver::{platform, Registration}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -121,8 +121,6 @@ struct Inner { macro_rules! poll_ready { ($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{ - $me.register()?; - // Load cached & encoded readiness. let mut cached = $me.inner.$cache.load(Relaxed); let mask = $mask | platform::hup(); @@ -168,28 +166,16 @@ where E: Evented, { /// Creates a new `PollEvented` associated with the default reactor. - pub fn new(io: E) -> PollEvented { - PollEvented { + pub fn new(io: E) -> io::Result { + let registration = Registration::new(&io)?; + Ok(Self { io: Some(io), inner: Inner { - registration: Registration::new(), + registration, read_readiness: AtomicUsize::new(0), write_readiness: AtomicUsize::new(0), }, - } - } - - /// Creates a new `PollEvented` associated with the specified reactor. - pub fn new_with_handle(io: E, handle: &Handle) -> io::Result { - let ret = PollEvented::new(io); - - if let Some(handle) = handle.as_priv() { - ret.inner - .registration - .register_with_priv(ret.io.as_ref().unwrap(), handle)?; - } - - Ok(ret) + }) } /// Returns a shared reference to the underlying I/O object this readiness @@ -341,14 +327,6 @@ where Ok(()) } - - /// Ensure that the I/O resource is registered with the reactor. - fn register(&self) -> io::Result<()> { - self.inner - .registration - .register(self.io.as_ref().unwrap())?; - Ok(()) - } } // ===== Read / Write impls ===== diff --git a/tokio-net/tests/bind_resource.rs b/tokio-net/tests/bind_resource.rs new file mode 100644 index 000000000..89a1656df --- /dev/null +++ b/tokio-net/tests/bind_resource.rs @@ -0,0 +1,22 @@ +#![cfg(unix)] +#![cfg(feature = "signal")] + +mod support; + +use std::convert::TryFrom; +use std::net; +use support::*; +use tokio::net::TcpListener; + +#[test] +#[should_panic] +fn no_runtime_panics_binding_net_tcp_listener() { + let listener = net::TcpListener::bind("127.0.0.1:0").expect("failed to bind listener"); + let _ = TcpListener::try_from(listener); +} + +#[test] +#[should_panic] +fn no_runtime_panics_creating_signals() { + let _ = signal(SignalKind::hangup()); +} diff --git a/tokio-net/tests/process_issue_42.rs b/tokio-net/tests/process_issue_42.rs index 03745e0d2..a25d22f52 100644 --- a/tokio-net/tests/process_issue_42.rs +++ b/tokio-net/tests/process_issue_42.rs @@ -4,7 +4,6 @@ use futures_util::future::FutureExt; use futures_util::stream::FuturesOrdered; -use futures_util::stream::StreamExt; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -21,24 +20,23 @@ fn run_test() { let finished_clone = finished.clone(); thread::spawn(move || { - let mut futures = FuturesOrdered::new(); - for i in 0..2 { - futures.push( - Command::new("echo") - .arg(format!("I am spawned process #{}", i)) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn() - .unwrap() - .boxed(), - ) - } - let mut rt = current_thread::Runtime::new().expect("failed to get runtime"); - rt.block_on(with_timeout(futures.collect::>())); + let mut futures = FuturesOrdered::new(); + run_with_timeout(&mut rt, async { + for i in 0..2 { + futures.push( + Command::new("echo") + .arg(format!("I am spawned process #{}", i)) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .unwrap() + .boxed(), + ) + } + }); drop(rt); - finished_clone.store(true, Ordering::SeqCst); }); diff --git a/tokio-net/tests/signal_drop_multi_loop.rs b/tokio-net/tests/signal_drop_multi_loop.rs index 04b087f22..ee3aa1bdc 100644 --- a/tokio-net/tests/signal_drop_multi_loop.rs +++ b/tokio-net/tests/signal_drop_multi_loop.rs @@ -7,30 +7,25 @@ use support::*; #[test] fn dropping_loops_does_not_cause_starvation() { - let (mut rt, signal) = { - let kind = SignalKind::user_defined1(); + let kind = SignalKind::user_defined1(); - let mut first_rt = CurrentThreadRuntime::new().expect("failed to init first runtime"); - let mut first_signal = signal(kind).expect("failed to register first signal"); + let mut first_rt = CurrentThreadRuntime::new().expect("failed to init first runtime"); + let mut first_signal = + first_rt.block_on(async { signal(kind).expect("failed to register first signal") }); - let mut second_rt = CurrentThreadRuntime::new().expect("failed to init second runtime"); - let mut second_signal = signal(kind).expect("failed to register second signal"); - - send_signal(libc::SIGUSR1); - - let _ = run_with_timeout(&mut first_rt, first_signal.next()) - .expect("failed to await first signal"); - - let _ = run_with_timeout(&mut second_rt, second_signal.next()) - .expect("failed to await second signal"); - - drop(first_rt); - drop(first_signal); - - (second_rt, second_signal) - }; + let mut second_rt = CurrentThreadRuntime::new().expect("failed to init second runtime"); + let mut second_signal = + second_rt.block_on(async { signal(kind).expect("failed to register second signal") }); send_signal(libc::SIGUSR1); - let _ = run_with_timeout(&mut rt, signal.into_future()); + let _ = + run_with_timeout(&mut first_rt, first_signal.next()).expect("failed to await first signal"); + + drop(first_rt); + drop(first_signal); + + send_signal(libc::SIGUSR1); + + let _ = run_with_timeout(&mut second_rt, second_signal.next()); } diff --git a/tokio-net/tests/signal_multi_loop.rs b/tokio-net/tests/signal_multi_loop.rs index e94c8fb2e..58265d879 100644 --- a/tokio-net/tests/signal_multi_loop.rs +++ b/tokio-net/tests/signal_multi_loop.rs @@ -20,9 +20,11 @@ fn multi_loop() { let sender = sender.clone(); thread::spawn(move || { let mut rt = CurrentThreadRuntime::new().unwrap(); - let signal = signal(SignalKind::hangup()).unwrap(); - sender.send(()).unwrap(); - let _ = run_with_timeout(&mut rt, signal.into_future()); + let _ = run_with_timeout(&mut rt, async { + let signal = signal(SignalKind::hangup()).unwrap(); + sender.send(()).unwrap(); + signal.into_future().await + }); }) }) .collect(); diff --git a/tokio/tests/drop-core.rs b/tokio/tests/drop-core.rs index 3b860e4db..f1906ab37 100644 --- a/tokio/tests/drop-core.rs +++ b/tokio/tests/drop-core.rs @@ -2,15 +2,19 @@ #![cfg(feature = "default")] use tokio::net::TcpListener; -use tokio_net::driver::Reactor; +use tokio_net::driver::{self, Reactor}; use tokio_test::{assert_err, assert_pending, assert_ready, task}; #[test] fn tcp_doesnt_block() { let reactor = Reactor::new().unwrap(); let handle = reactor.handle(); + + // Set the current reactor for this thread + let _reactor_guard = driver::set_default(&handle); + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let mut listener = TcpListener::from_std(listener, &handle).unwrap(); + let mut listener = TcpListener::from_std(listener).unwrap(); drop(reactor); let mut task = task::spawn(async move { @@ -24,8 +28,12 @@ fn tcp_doesnt_block() { fn drop_wakes() { let reactor = Reactor::new().unwrap(); let handle = reactor.handle(); + + // Set the current reactor for this thread + let _reactor_guard = driver::set_default(&handle); + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); - let mut listener = TcpListener::from_std(listener, &handle).unwrap(); + let mut listener = TcpListener::from_std(listener).unwrap(); let mut task = task::spawn(async move { assert_err!(listener.accept().await);