From ce23db6bc7c6a2853377db629ca8b761a45e0476 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 27 Jun 2023 16:24:36 -0700 Subject: [PATCH] rt: reorganize I/O driver source (#5828) Moves `Driver` into its own file and eliminates a bunch of code defined in macros. --- tokio/src/io/ready.rs | 52 ++-- tokio/src/runtime/io/driver.rs | 332 +++++++++++++++++++++ tokio/src/runtime/io/driver/signal.rs | 22 ++ tokio/src/runtime/io/mod.rs | 349 +--------------------- tokio/src/runtime/io/registration.rs | 54 ++-- tokio/src/runtime/io/scheduled_io.rs | 404 +++++++++++++------------- tokio/src/util/linked_list.rs | 2 +- 7 files changed, 608 insertions(+), 607 deletions(-) create mode 100644 tokio/src/runtime/io/driver.rs create mode 100644 tokio/src/runtime/io/driver/signal.rs diff --git a/tokio/src/io/ready.rs b/tokio/src/io/ready.rs index 33e2ef0b6..c2a80264b 100644 --- a/tokio/src/io/ready.rs +++ b/tokio/src/io/ready.rs @@ -1,5 +1,7 @@ #![cfg_attr(not(feature = "net"), allow(unreachable_pub))] +use crate::io::interest::Interest; + use std::fmt; use std::ops; @@ -208,41 +210,35 @@ impl Ready { pub(crate) fn as_usize(self) -> usize { self.0 } -} -cfg_io_readiness! { - use crate::io::Interest; + pub(crate) fn from_interest(interest: Interest) -> Ready { + let mut ready = Ready::EMPTY; - impl Ready { - pub(crate) fn from_interest(interest: Interest) -> Ready { - let mut ready = Ready::EMPTY; - - if interest.is_readable() { - ready |= Ready::READABLE; - ready |= Ready::READ_CLOSED; - } - - if interest.is_writable() { - ready |= Ready::WRITABLE; - ready |= Ready::WRITE_CLOSED; - } - - #[cfg(any(target_os = "linux", target_os = "android"))] - if interest.is_priority() { - ready |= Ready::PRIORITY; - ready |= Ready::READ_CLOSED; - } - - ready + if interest.is_readable() { + ready |= Ready::READABLE; + ready |= Ready::READ_CLOSED; } - pub(crate) fn intersection(self, interest: Interest) -> Ready { - Ready(self.0 & Ready::from_interest(interest).0) + if interest.is_writable() { + ready |= Ready::WRITABLE; + ready |= Ready::WRITE_CLOSED; } - pub(crate) fn satisfies(self, interest: Interest) -> bool { - self.0 & Ready::from_interest(interest).0 != 0 + #[cfg(any(target_os = "linux", target_os = "android"))] + if interest.is_priority() { + ready |= Ready::PRIORITY; + ready |= Ready::READ_CLOSED; } + + ready + } + + pub(crate) fn intersection(self, interest: Interest) -> Ready { + Ready(self.0 & Ready::from_interest(interest).0) + } + + pub(crate) fn satisfies(self, interest: Interest) -> bool { + self.0 & Ready::from_interest(interest).0 != 0 } } diff --git a/tokio/src/runtime/io/driver.rs b/tokio/src/runtime/io/driver.rs new file mode 100644 index 000000000..0c1debaa0 --- /dev/null +++ b/tokio/src/runtime/io/driver.rs @@ -0,0 +1,332 @@ +// Signal handling +cfg_signal_internal_and_unix! { + mod signal; +} + +use crate::io::interest::Interest; +use crate::io::ready::Ready; +use crate::runtime::driver; +use crate::runtime::io::{IoDriverMetrics, ScheduledIo}; +use crate::util::slab::{self, Slab}; +use crate::{loom::sync::RwLock, util::bit}; + +use std::fmt; +use std::io; +use std::time::Duration; + +/// I/O driver, backed by Mio. +pub(crate) struct Driver { + /// Tracks the number of times `turn` is called. It is safe for this to wrap + /// as it is mostly used to determine when to call `compact()`. + tick: u8, + + /// True when an event with the signal token is received + signal_ready: bool, + + /// Reuse the `mio::Events` value across calls to poll. + events: mio::Events, + + /// Primary slab handle containing the state for each resource registered + /// with this driver. + resources: Slab, + + /// The system event queue. + poll: mio::Poll, +} + +/// A reference to an I/O driver. +pub(crate) struct Handle { + /// Registers I/O resources. + registry: mio::Registry, + + /// Allocates `ScheduledIo` handles when creating new resources. + io_dispatch: RwLock, + + /// Used to wake up the reactor from a call to `turn`. + /// Not supported on Wasi due to lack of threading support. + #[cfg(not(tokio_wasi))] + waker: mio::Waker, + + pub(crate) metrics: IoDriverMetrics, +} + +#[derive(Debug)] +pub(crate) struct ReadyEvent { + pub(super) tick: u8, + pub(crate) ready: Ready, + pub(super) is_shutdown: bool, +} + +cfg_net_unix!( + impl ReadyEvent { + pub(crate) fn with_ready(&self, ready: Ready) -> Self { + Self { + ready, + tick: self.tick, + is_shutdown: self.is_shutdown, + } + } + } +); + +struct IoDispatcher { + allocator: slab::Allocator, + is_shutdown: bool, +} + +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub(super) enum Direction { + Read, + Write, +} + +pub(super) enum Tick { + Set(u8), + Clear(u8), +} + +// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup +// token. +const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); +const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31)); + +const ADDRESS: bit::Pack = bit::Pack::least_significant(24); + +// Packs the generation value in the `readiness` field. +// +// The generation prevents a race condition where a slab slot is reused for a +// new socket while the I/O driver is about to apply a readiness event. The +// generation value is checked when setting new readiness. If the generation do +// not match, then the readiness event is discarded. +pub(super) const GENERATION: bit::Pack = ADDRESS.then(7); + +fn _assert_kinds() { + fn _assert() {} + + _assert::(); +} + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new event loop, returning any error that happened during the + /// creation. + pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { + let poll = mio::Poll::new()?; + #[cfg(not(tokio_wasi))] + let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; + let registry = poll.registry().try_clone()?; + + let slab = Slab::new(); + let allocator = slab.allocator(); + + let driver = Driver { + tick: 0, + signal_ready: false, + events: mio::Events::with_capacity(nevents), + poll, + resources: slab, + }; + + let handle = Handle { + registry, + io_dispatch: RwLock::new(IoDispatcher::new(allocator)), + #[cfg(not(tokio_wasi))] + waker, + metrics: IoDriverMetrics::default(), + }; + + Ok((driver, handle)) + } + + pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.io(); + self.turn(handle, None); + } + + pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { + let handle = rt_handle.io(); + self.turn(handle, Some(duration)); + } + + pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { + let handle = rt_handle.io(); + + if handle.shutdown() { + self.resources.for_each(|io| { + // If a task is waiting on the I/O resource, notify it that the + // runtime is being shutdown. And shutdown will clear all wakers. + io.shutdown(); + }); + } + } + + fn turn(&mut self, handle: &Handle, max_wait: Option) { + // How often to call `compact()` on the resource slab + const COMPACT_INTERVAL: u8 = 255; + + self.tick = self.tick.wrapping_add(1); + + if self.tick == COMPACT_INTERVAL { + self.resources.compact() + } + + let events = &mut self.events; + + // Block waiting for an event to happen, peeling out how many events + // happened. + match self.poll.poll(events, max_wait) { + Ok(_) => {} + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + #[cfg(tokio_wasi)] + Err(e) if e.kind() == io::ErrorKind::InvalidInput => { + // In case of wasm32_wasi this error happens, when trying to poll without subscriptions + // just return from the park, as there would be nothing, which wakes us up. + } + Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e), + } + + // Process all the events that came in, dispatching appropriately + let mut ready_count = 0; + for event in events.iter() { + let token = event.token(); + + if token == TOKEN_WAKEUP { + // Nothing to do, the event is used to unblock the I/O driver + } else if token == TOKEN_SIGNAL { + self.signal_ready = true; + } else { + Self::dispatch( + &mut self.resources, + self.tick, + token, + Ready::from_mio(event), + ); + ready_count += 1; + } + } + + handle.metrics.incr_ready_count_by(ready_count); + } + + fn dispatch(resources: &mut Slab, tick: u8, token: mio::Token, ready: Ready) { + let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); + + let io = match resources.get(addr) { + Some(io) => io, + None => return, + }; + + let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready); + + if res.is_err() { + // token no longer valid! + return; + } + + io.wake(ready); + } +} + +impl fmt::Debug for Driver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Driver") + } +} + +impl Handle { + /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise + /// makes the next call to `turn` return immediately. + /// + /// This method is intended to be used in situations where a notification + /// needs to otherwise be sent to the main reactor. If the reactor is + /// currently blocked inside of `turn` then it will wake up and soon return + /// after this method has been called. If the reactor is not currently + /// blocked in `turn`, then the next call to `turn` will not block and + /// return immediately. + pub(crate) fn unpark(&self) { + #[cfg(not(tokio_wasi))] + self.waker.wake().expect("failed to wake I/O driver"); + } + + /// Registers an I/O resource with the reactor for a given `mio::Ready` state. + /// + /// The registration token is returned. + pub(super) fn add_source( + &self, + source: &mut impl mio::event::Source, + interest: Interest, + ) -> io::Result> { + let (address, shared) = self.allocate()?; + + let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); + + self.registry + .register(source, mio::Token(token), interest.to_mio())?; + + self.metrics.incr_fd_count(); + + Ok(shared) + } + + /// Deregisters an I/O resource from the reactor. + pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { + self.registry.deregister(source)?; + + self.metrics.dec_fd_count(); + + Ok(()) + } + + /// shutdown the dispatcher. + fn shutdown(&self) -> bool { + let mut io = self.io_dispatch.write().unwrap(); + if io.is_shutdown { + return false; + } + io.is_shutdown = true; + true + } + + fn allocate(&self) -> io::Result<(slab::Address, slab::Ref)> { + let io = self.io_dispatch.read().unwrap(); + if io.is_shutdown { + return Err(io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + )); + } + io.allocator.allocate().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "reactor at max registered I/O resources", + ) + }) + } +} + +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl IoDispatcher ===== + +impl IoDispatcher { + fn new(allocator: slab::Allocator) -> Self { + Self { + allocator, + is_shutdown: false, + } + } +} + +impl Direction { + pub(super) fn mask(self) -> Ready { + match self { + Direction::Read => Ready::READABLE | Ready::READ_CLOSED, + Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, + } + } +} diff --git a/tokio/src/runtime/io/driver/signal.rs b/tokio/src/runtime/io/driver/signal.rs new file mode 100644 index 000000000..ea3ef07b6 --- /dev/null +++ b/tokio/src/runtime/io/driver/signal.rs @@ -0,0 +1,22 @@ +use super::{Driver, Handle, TOKEN_SIGNAL}; + +use std::io; + +impl Handle { + pub(crate) fn register_signal_receiver( + &self, + receiver: &mut mio::net::UnixStream, + ) -> io::Result<()> { + self.registry + .register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; + Ok(()) + } +} + +impl Driver { + pub(crate) fn consume_signal_ready(&mut self) -> bool { + let ret = self.signal_ready; + self.signal_ready = false; + ret + } +} diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 2dd426f11..c9f771cd8 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -1,4 +1,7 @@ #![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))] +mod driver; +use driver::{Direction, Tick}; +pub(crate) use driver::{Driver, Handle, ReadyEvent}; mod registration; pub(crate) use registration::Registration; @@ -7,350 +10,4 @@ mod scheduled_io; use scheduled_io::ScheduledIo; mod metrics; - -use crate::io::interest::Interest; -use crate::io::ready::Ready; -use crate::runtime::driver; -use crate::util::slab::{self, Slab}; -use crate::{loom::sync::RwLock, util::bit}; - use metrics::IoDriverMetrics; - -use std::fmt; -use std::io; -use std::time::Duration; - -/// I/O driver, backed by Mio. -pub(crate) struct Driver { - /// Tracks the number of times `turn` is called. It is safe for this to wrap - /// as it is mostly used to determine when to call `compact()`. - tick: u8, - - /// True when an event with the signal token is received - signal_ready: bool, - - /// Reuse the `mio::Events` value across calls to poll. - events: mio::Events, - - /// Primary slab handle containing the state for each resource registered - /// with this driver. - resources: Slab, - - /// The system event queue. - poll: mio::Poll, -} - -/// A reference to an I/O driver. -pub(crate) struct Handle { - /// Registers I/O resources. - registry: mio::Registry, - - /// Allocates `ScheduledIo` handles when creating new resources. - io_dispatch: RwLock, - - /// Used to wake up the reactor from a call to `turn`. - /// Not supported on Wasi due to lack of threading support. - #[cfg(not(tokio_wasi))] - waker: mio::Waker, - - pub(crate) metrics: IoDriverMetrics, -} - -#[derive(Debug)] -pub(crate) struct ReadyEvent { - tick: u8, - pub(crate) ready: Ready, - is_shutdown: bool, -} - -cfg_net_unix!( - impl ReadyEvent { - pub(crate) fn with_ready(&self, ready: Ready) -> Self { - Self { - ready, - tick: self.tick, - is_shutdown: self.is_shutdown, - } - } - } -); - -struct IoDispatcher { - allocator: slab::Allocator, - is_shutdown: bool, -} - -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -enum Direction { - Read, - Write, -} - -enum Tick { - Set(u8), - Clear(u8), -} - -// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup -// token. -const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); -const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31)); - -const ADDRESS: bit::Pack = bit::Pack::least_significant(24); - -// Packs the generation value in the `readiness` field. -// -// The generation prevents a race condition where a slab slot is reused for a -// new socket while the I/O driver is about to apply a readiness event. The -// generation value is checked when setting new readiness. If the generation do -// not match, then the readiness event is discarded. -const GENERATION: bit::Pack = ADDRESS.then(7); - -fn _assert_kinds() { - fn _assert() {} - - _assert::(); -} - -// ===== impl Driver ===== - -impl Driver { - /// Creates a new event loop, returning any error that happened during the - /// creation. - pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> { - let poll = mio::Poll::new()?; - #[cfg(not(tokio_wasi))] - let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?; - let registry = poll.registry().try_clone()?; - - let slab = Slab::new(); - let allocator = slab.allocator(); - - let driver = Driver { - tick: 0, - signal_ready: false, - events: mio::Events::with_capacity(nevents), - poll, - resources: slab, - }; - - let handle = Handle { - registry, - io_dispatch: RwLock::new(IoDispatcher::new(allocator)), - #[cfg(not(tokio_wasi))] - waker, - metrics: IoDriverMetrics::default(), - }; - - Ok((driver, handle)) - } - - pub(crate) fn park(&mut self, rt_handle: &driver::Handle) { - let handle = rt_handle.io(); - self.turn(handle, None); - } - - pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) { - let handle = rt_handle.io(); - self.turn(handle, Some(duration)); - } - - pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { - let handle = rt_handle.io(); - - if handle.shutdown() { - self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it that the - // runtime is being shutdown. And shutdown will clear all wakers. - io.shutdown(); - }); - } - } - - fn turn(&mut self, handle: &Handle, max_wait: Option) { - // How often to call `compact()` on the resource slab - const COMPACT_INTERVAL: u8 = 255; - - self.tick = self.tick.wrapping_add(1); - - if self.tick == COMPACT_INTERVAL { - self.resources.compact() - } - - let events = &mut self.events; - - // Block waiting for an event to happen, peeling out how many events - // happened. - match self.poll.poll(events, max_wait) { - Ok(_) => {} - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - #[cfg(tokio_wasi)] - Err(e) if e.kind() == io::ErrorKind::InvalidInput => { - // In case of wasm32_wasi this error happens, when trying to poll without subscriptions - // just return from the park, as there would be nothing, which wakes us up. - } - Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e), - } - - // Process all the events that came in, dispatching appropriately - let mut ready_count = 0; - for event in events.iter() { - let token = event.token(); - - if token == TOKEN_WAKEUP { - // Nothing to do, the event is used to unblock the I/O driver - } else if token == TOKEN_SIGNAL { - self.signal_ready = true; - } else { - Self::dispatch( - &mut self.resources, - self.tick, - token, - Ready::from_mio(event), - ); - ready_count += 1; - } - } - - handle.metrics.incr_ready_count_by(ready_count); - } - - fn dispatch(resources: &mut Slab, tick: u8, token: mio::Token, ready: Ready) { - let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - - let io = match resources.get(addr) { - Some(io) => io, - None => return, - }; - - let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready); - - if res.is_err() { - // token no longer valid! - return; - } - - io.wake(ready); - } -} - -impl fmt::Debug for Driver { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Driver") - } -} - -impl Handle { - /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise - /// makes the next call to `turn` return immediately. - /// - /// This method is intended to be used in situations where a notification - /// needs to otherwise be sent to the main reactor. If the reactor is - /// currently blocked inside of `turn` then it will wake up and soon return - /// after this method has been called. If the reactor is not currently - /// blocked in `turn`, then the next call to `turn` will not block and - /// return immediately. - pub(crate) fn unpark(&self) { - #[cfg(not(tokio_wasi))] - self.waker.wake().expect("failed to wake I/O driver"); - } - - /// Registers an I/O resource with the reactor for a given `mio::Ready` state. - /// - /// The registration token is returned. - pub(super) fn add_source( - &self, - source: &mut impl mio::event::Source, - interest: Interest, - ) -> io::Result> { - let (address, shared) = self.allocate()?; - - let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); - - self.registry - .register(source, mio::Token(token), interest.to_mio())?; - - self.metrics.incr_fd_count(); - - Ok(shared) - } - - /// Deregisters an I/O resource from the reactor. - pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> { - self.registry.deregister(source)?; - - self.metrics.dec_fd_count(); - - Ok(()) - } - - /// shutdown the dispatcher. - fn shutdown(&self) -> bool { - let mut io = self.io_dispatch.write().unwrap(); - if io.is_shutdown { - return false; - } - io.is_shutdown = true; - true - } - - fn allocate(&self) -> io::Result<(slab::Address, slab::Ref)> { - let io = self.io_dispatch.read().unwrap(); - if io.is_shutdown { - return Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, - )); - } - io.allocator.allocate().ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "reactor at max registered I/O resources", - ) - }) - } -} - -impl fmt::Debug for Handle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Handle") - } -} - -// ===== impl IoDispatcher ===== - -impl IoDispatcher { - fn new(allocator: slab::Allocator) -> Self { - Self { - allocator, - is_shutdown: false, - } - } -} - -impl Direction { - pub(super) fn mask(self) -> Ready { - match self { - Direction::Read => Ready::READABLE | Ready::READ_CLOSED, - Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED, - } - } -} - -// Signal handling -cfg_signal_internal_and_unix! { - impl Handle { - pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { - self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; - Ok(()) - } - } - - impl Driver { - pub(crate) fn consume_signal_ready(&mut self) -> bool { - let ret = self.signal_ready; - self.signal_ready = false; - ret - } - } -} diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 341fa0539..91c773960 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -199,6 +199,33 @@ impl Registration { } } + pub(crate) async fn readiness(&self, interest: Interest) -> io::Result { + let ev = self.shared.readiness(interest).await; + + if ev.is_shutdown { + return Err(gone()); + } + + Ok(ev) + } + + pub(crate) async fn async_io( + &self, + interest: Interest, + mut f: impl FnMut() -> io::Result, + ) -> io::Result { + loop { + let event = self.readiness(interest).await?; + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(event); + } + x => return x, + } + } + } + fn handle(&self) -> &Handle { self.handle.driver().io() } @@ -223,30 +250,3 @@ fn gone() -> io::Error { crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, ) } - -cfg_io_readiness! { - impl Registration { - pub(crate) async fn readiness(&self, interest: Interest) -> io::Result { - let ev = self.shared.readiness(interest).await; - - if ev.is_shutdown { - return Err(gone()) - } - - Ok(ev) - } - - pub(crate) async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result) -> io::Result { - loop { - let event = self.readiness(interest).await?; - - match f() { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.clear_readiness(event); - } - x => return x, - } - } - } - } -} diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index 197a4e0e2..bf78aa68a 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -1,27 +1,21 @@ -use super::{ReadyEvent, Tick}; use crate::io::interest::Interest; use crate::io::ready::Ready; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; +use crate::runtime::io::{Direction, ReadyEvent, Tick}; use crate::util::bit; +use crate::util::linked_list::{self, LinkedList}; use crate::util::slab::Entry; use crate::util::WakeList; +use std::cell::UnsafeCell; +use std::future::Future; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; -use super::Direction; - -cfg_io_readiness! { - use crate::util::linked_list::{self, LinkedList}; - - use std::cell::UnsafeCell; - use std::future::Future; - use std::marker::PhantomPinned; - use std::pin::Pin; - use std::ptr::NonNull; -} - /// Stored in the I/O driver resource slab. #[derive(Debug)] pub(crate) struct ScheduledIo { @@ -31,13 +25,10 @@ pub(crate) struct ScheduledIo { waiters: Mutex, } -cfg_io_readiness! { - type WaitList = LinkedList::Target>; -} +type WaitList = LinkedList::Target>; #[derive(Debug, Default)] struct Waiters { - #[cfg(feature = "net")] /// List of all current waiters. list: WaitList, @@ -48,46 +39,44 @@ struct Waiters { writer: Option, } -cfg_io_readiness! { - #[derive(Debug)] - struct Waiter { - pointers: linked_list::Pointers, +#[derive(Debug)] +struct Waiter { + pointers: linked_list::Pointers, - /// The waker for this task. - waker: Option, + /// The waker for this task. + waker: Option, - /// The interest this waiter is waiting on. - interest: Interest, + /// The interest this waiter is waiting on. + interest: Interest, - is_ready: bool, + is_ready: bool, - /// Should never be `!Unpin`. - _p: PhantomPinned, - } + /// Should never be `!Unpin`. + _p: PhantomPinned, +} - generate_addr_of_methods! { - impl<> Waiter { - unsafe fn addr_of_pointers(self: NonNull) -> NonNull> { - &self.pointers - } +generate_addr_of_methods! { + impl<> Waiter { + unsafe fn addr_of_pointers(self: NonNull) -> NonNull> { + &self.pointers } } +} - /// Future returned by `readiness()`. - struct Readiness<'a> { - scheduled_io: &'a ScheduledIo, +/// Future returned by `readiness()`. +struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, - state: State, + state: State, - /// Entry in the waiter `LinkedList`. - waiter: UnsafeCell, - } + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell, +} - enum State { - Init, - Waiting, - Done, - } +enum State { + Init, + Waiting, + Done, } // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. @@ -106,7 +95,7 @@ const SHUTDOWN: bit::Pack = GENERATION.then(1); #[test] fn test_generations_assert_same() { - assert_eq!(super::GENERATION, GENERATION); + assert_eq!(super::driver::GENERATION, GENERATION); } // ===== impl ScheduledIo ===== @@ -238,7 +227,6 @@ impl ScheduledIo { } } - #[cfg(feature = "net")] 'outer: loop { let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); @@ -372,187 +360,193 @@ impl Drop for ScheduledIo { unsafe impl Send for ScheduledIo {} unsafe impl Sync for ScheduledIo {} -cfg_io_readiness! { - impl ScheduledIo { - /// An async version of `poll_readiness` which uses a linked list of wakers. - pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { - self.readiness_fut(interest).await - } - - // This is in a separate function so that the borrow checker doesn't think - // we are borrowing the `UnsafeCell` possibly over await boundaries. - // - // Go figure. - fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { - Readiness { - scheduled_io: self, - state: State::Init, - waiter: UnsafeCell::new(Waiter { - pointers: linked_list::Pointers::new(), - waker: None, - is_ready: false, - interest, - _p: PhantomPinned, - }), - } - } +impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers. + pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { + self.readiness_fut(interest).await } - unsafe impl linked_list::Link for Waiter { - type Handle = NonNull; - type Target = Waiter; - - fn as_raw(handle: &NonNull) -> NonNull { - *handle - } - - unsafe fn from_raw(ptr: NonNull) -> NonNull { - ptr - } - - unsafe fn pointers(target: NonNull) -> NonNull> { - Waiter::addr_of_pointers(target) + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: Interest) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + is_ready: false, + interest, + _p: PhantomPinned, + }), } } +} - // ===== impl Readiness ===== +unsafe impl linked_list::Link for Waiter { + type Handle = NonNull; + type Target = Waiter; - impl Future for Readiness<'_> { - type Output = ReadyEvent; + fn as_raw(handle: &NonNull) -> NonNull { + *handle + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use std::sync::atomic::Ordering::SeqCst; + unsafe fn from_raw(ptr: NonNull) -> NonNull { + ptr + } - let (scheduled_io, state, waiter) = unsafe { - let me = self.get_unchecked_mut(); - (&me.scheduled_io, &mut me.state, &me.waiter) - }; + unsafe fn pointers(target: NonNull) -> NonNull> { + Waiter::addr_of_pointers(target) + } +} - loop { - match *state { - State::Init => { - // Optimistically check existing readiness - let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); - let is_shutdown = SHUTDOWN.unpack(curr) != 0; +// ===== impl Readiness ===== - // Safety: `waiter.interest` never changes - let interest = unsafe { (*waiter.get()).interest }; - let ready = ready.intersection(interest); +impl Future for Readiness<'_> { + type Output = ReadyEvent; - if !ready.is_empty() || is_shutdown { - // Currently ready! - let tick = TICK.unpack(curr) as u8; - *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use std::sync::atomic::Ordering::SeqCst; - // Wasn't ready, take the lock (and check again while locked). - let mut waiters = scheduled_io.waiters.lock(); + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; - let curr = scheduled_io.readiness.load(SeqCst); - let mut ready = Ready::from_usize(READINESS.unpack(curr)); - let is_shutdown = SHUTDOWN.unpack(curr) != 0; + loop { + match *state { + State::Init => { + // Optimistically check existing readiness + let curr = scheduled_io.readiness.load(SeqCst); + let ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if is_shutdown { - ready = Ready::ALL; - } + // Safety: `waiter.interest` never changes + let interest = unsafe { (*waiter.get()).interest }; + let ready = ready.intersection(interest); - let ready = ready.intersection(interest); - - if !ready.is_empty() || is_shutdown { - // Currently ready! - let tick = TICK.unpack(curr) as u8; - *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); - } - - // Not ready even after locked, insert into list... - - // Safety: called while locked - unsafe { - (*waiter.get()).waker = Some(cx.waker().clone()); - } - - // Insert the waiter into the linked list - // - // safety: pointers from `UnsafeCell` are never null. - waiters - .list - .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); - *state = State::Waiting; - } - State::Waiting => { - // Currently in the "Waiting" state, implying the caller has - // a waiter stored in the waiter list (guarded by - // `notify.waiters`). In order to access the waker fields, - // we must hold the lock. - - let waiters = scheduled_io.waiters.lock(); - - // Safety: called while locked - let w = unsafe { &mut *waiter.get() }; - - if w.is_ready { - // Our waker has been notified. - *state = State::Done; - } else { - // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); - } - - return Poll::Pending; - } - - // Explicit drop of the lock to indicate the scope that the - // lock is held. Because holding the lock is required to - // ensure safe access to fields not held within the lock, it - // is helpful to visualize the scope of the critical - // section. - drop(waiters); - } - State::Done => { - // Safety: State::Done means it is no longer shared - let w = unsafe { &mut *waiter.get() }; - - let curr = scheduled_io.readiness.load(Acquire); - let is_shutdown = SHUTDOWN.unpack(curr) != 0; - - // The returned tick might be newer than the event - // which notified our waker. This is ok because the future - // still didn't return `Poll::Ready`. + if !ready.is_empty() || is_shutdown { + // Currently ready! let tick = TICK.unpack(curr) as u8; - - // The readiness state could have been cleared in the meantime, - // but we allow the returned ready set to be empty. - let curr_ready = Ready::from_usize(READINESS.unpack(curr)); - let ready = curr_ready.intersection(w.interest); - + *state = State::Done; return Poll::Ready(ReadyEvent { tick, ready, is_shutdown, }); } + + // Wasn't ready, take the lock (and check again while locked). + let mut waiters = scheduled_io.waiters.lock(); + + let curr = scheduled_io.readiness.load(SeqCst); + let mut ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + + if is_shutdown { + ready = Ready::ALL; + } + + let ready = ready.intersection(interest); + + if !ready.is_empty() || is_shutdown { + // Currently ready! + let tick = TICK.unpack(curr) as u8; + *state = State::Done; + return Poll::Ready(ReadyEvent { + tick, + ready, + is_shutdown, + }); + } + + // Not ready even after locked, insert into list... + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); + } + + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; + } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.is_ready { + // Our waker has been notified. + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } + + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; + + let curr = scheduled_io.readiness.load(Acquire); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + + // The returned tick might be newer than the event + // which notified our waker. This is ok because the future + // still didn't return `Poll::Ready`. + let tick = TICK.unpack(curr) as u8; + + // The readiness state could have been cleared in the meantime, + // but we allow the returned ready set to be empty. + let curr_ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = curr_ready.intersection(w.interest); + + return Poll::Ready(ReadyEvent { + tick, + ready, + is_shutdown, + }); } } } } - - impl Drop for Readiness<'_> { - fn drop(&mut self) { - let mut waiters = self.scheduled_io.waiters.lock(); - - // Safety: `waiter` is only ever stored in `waiters` - unsafe { - waiters - .list - .remove(NonNull::new_unchecked(self.waiter.get())) - }; - } - } - - unsafe impl Send for Readiness<'_> {} - unsafe impl Sync for Readiness<'_> {} } + +impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock(); + + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } +} + +unsafe impl Send for Readiness<'_> {} +unsafe impl Sync for Readiness<'_> {} diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index ca6ef0e7b..412ffc089 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -297,7 +297,7 @@ impl Default for LinkedList { // ===== impl DrainFilter ===== -cfg_io_readiness! { +cfg_io_driver_impl! { pub(crate) struct DrainFilter<'a, T: Link, F> { list: &'a mut LinkedList, filter: F,