rt: reorganize I/O driver source (#5828)

Moves `Driver` into its own file and eliminates a bunch of code defined
in macros.
This commit is contained in:
Carl Lerche 2023-06-27 16:24:36 -07:00 committed by GitHub
parent 48c55768fd
commit ce23db6bc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 608 additions and 607 deletions

View File

@ -1,5 +1,7 @@
#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]
use crate::io::interest::Interest;
use std::fmt;
use std::ops;
@ -208,41 +210,35 @@ impl Ready {
pub(crate) fn as_usize(self) -> usize {
self.0
}
}
cfg_io_readiness! {
use crate::io::Interest;
pub(crate) fn from_interest(interest: Interest) -> Ready {
let mut ready = Ready::EMPTY;
impl Ready {
pub(crate) fn from_interest(interest: Interest) -> Ready {
let mut ready = Ready::EMPTY;
if interest.is_readable() {
ready |= Ready::READABLE;
ready |= Ready::READ_CLOSED;
}
if interest.is_writable() {
ready |= Ready::WRITABLE;
ready |= Ready::WRITE_CLOSED;
}
#[cfg(any(target_os = "linux", target_os = "android"))]
if interest.is_priority() {
ready |= Ready::PRIORITY;
ready |= Ready::READ_CLOSED;
}
ready
if interest.is_readable() {
ready |= Ready::READABLE;
ready |= Ready::READ_CLOSED;
}
pub(crate) fn intersection(self, interest: Interest) -> Ready {
Ready(self.0 & Ready::from_interest(interest).0)
if interest.is_writable() {
ready |= Ready::WRITABLE;
ready |= Ready::WRITE_CLOSED;
}
pub(crate) fn satisfies(self, interest: Interest) -> bool {
self.0 & Ready::from_interest(interest).0 != 0
#[cfg(any(target_os = "linux", target_os = "android"))]
if interest.is_priority() {
ready |= Ready::PRIORITY;
ready |= Ready::READ_CLOSED;
}
ready
}
pub(crate) fn intersection(self, interest: Interest) -> Ready {
Ready(self.0 & Ready::from_interest(interest).0)
}
pub(crate) fn satisfies(self, interest: Interest) -> bool {
self.0 & Ready::from_interest(interest).0 != 0
}
}

View File

@ -0,0 +1,332 @@
// Signal handling
cfg_signal_internal_and_unix! {
mod signal;
}
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::runtime::driver;
use crate::runtime::io::{IoDriverMetrics, ScheduledIo};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::RwLock, util::bit};
use std::fmt;
use std::io;
use std::time::Duration;
/// I/O driver, backed by Mio.
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`.
tick: u8,
/// True when an event with the signal token is received
signal_ready: bool,
/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
/// Primary slab handle containing the state for each resource registered
/// with this driver.
resources: Slab<ScheduledIo>,
/// The system event queue.
poll: mio::Poll,
}
/// A reference to an I/O driver.
pub(crate) struct Handle {
/// Registers I/O resources.
registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
io_dispatch: RwLock<IoDispatcher>,
/// Used to wake up the reactor from a call to `turn`.
/// Not supported on Wasi due to lack of threading support.
#[cfg(not(tokio_wasi))]
waker: mio::Waker,
pub(crate) metrics: IoDriverMetrics,
}
#[derive(Debug)]
pub(crate) struct ReadyEvent {
pub(super) tick: u8,
pub(crate) ready: Ready,
pub(super) is_shutdown: bool,
}
cfg_net_unix!(
impl ReadyEvent {
pub(crate) fn with_ready(&self, ready: Ready) -> Self {
Self {
ready,
tick: self.tick,
is_shutdown: self.is_shutdown,
}
}
}
);
struct IoDispatcher {
allocator: slab::Allocator<ScheduledIo>,
is_shutdown: bool,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
Read,
Write,
}
pub(super) enum Tick {
Set(u8),
Clear(u8),
}
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31));
const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
// Packs the generation value in the `readiness` field.
//
// The generation prevents a race condition where a slab slot is reused for a
// new socket while the I/O driver is about to apply a readiness event. The
// generation value is checked when setting new readiness. If the generation do
// not match, then the readiness event is discarded.
pub(super) const GENERATION: bit::Pack = ADDRESS.then(7);
fn _assert_kinds() {
fn _assert<T: Send + Sync>() {}
_assert::<Handle>();
}
// ===== impl Driver =====
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;
let slab = Slab::new();
let allocator = slab.allocator();
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};
let handle = Handle {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(tokio_wasi))]
waker,
metrics: IoDriverMetrics::default(),
};
Ok((driver, handle))
}
pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
self.turn(handle, None);
}
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.io();
self.turn(handle, Some(duration));
}
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
if handle.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it that the
// runtime is being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
}
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u8 = 255;
self.tick = self.tick.wrapping_add(1);
if self.tick == COMPACT_INTERVAL {
self.resources.compact()
}
let events = &mut self.events;
// Block waiting for an event to happen, peeling out how many events
// happened.
match self.poll.poll(events, max_wait) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
#[cfg(tokio_wasi)]
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
// In case of wasm32_wasi this error happens, when trying to poll without subscriptions
// just return from the park, as there would be nothing, which wakes us up.
}
Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
}
// Process all the events that came in, dispatching appropriately
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
// Nothing to do, the event is used to unblock the I/O driver
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
Self::dispatch(
&mut self.resources,
self.tick,
token,
Ready::from_mio(event),
);
ready_count += 1;
}
}
handle.metrics.incr_ready_count_by(ready_count);
}
fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
let io = match resources.get(addr) {
Some(io) => io,
None => return,
};
let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready);
if res.is_err() {
// token no longer valid!
return;
}
io.wake(ready);
}
}
impl fmt::Debug for Driver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Driver")
}
}
impl Handle {
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
///
/// This method is intended to be used in situations where a notification
/// needs to otherwise be sent to the main reactor. If the reactor is
/// currently blocked inside of `turn` then it will wake up and soon return
/// after this method has been called. If the reactor is not currently
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
pub(crate) fn unpark(&self) {
#[cfg(not(tokio_wasi))]
self.waker.wake().expect("failed to wake I/O driver");
}
/// Registers an I/O resource with the reactor for a given `mio::Ready` state.
///
/// The registration token is returned.
pub(super) fn add_source(
&self,
source: &mut impl mio::event::Source,
interest: Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.allocate()?;
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
self.registry
.register(source, mio::Token(token), interest.to_mio())?;
self.metrics.incr_fd_count();
Ok(shared)
}
/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)?;
self.metrics.dec_fd_count();
Ok(())
}
/// shutdown the dispatcher.
fn shutdown(&self) -> bool {
let mut io = self.io_dispatch.write().unwrap();
if io.is_shutdown {
return false;
}
io.is_shutdown = true;
true
}
fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}
io.allocator.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
}
}
// ===== impl IoDispatcher =====
impl IoDispatcher {
fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
Self {
allocator,
is_shutdown: false,
}
}
}
impl Direction {
pub(super) fn mask(self) -> Ready {
match self {
Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}

View File

@ -0,0 +1,22 @@
use super::{Driver, Handle, TOKEN_SIGNAL};
use std::io;
impl Handle {
pub(crate) fn register_signal_receiver(
&self,
receiver: &mut mio::net::UnixStream,
) -> io::Result<()> {
self.registry
.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}
}
impl Driver {
pub(crate) fn consume_signal_ready(&mut self) -> bool {
let ret = self.signal_ready;
self.signal_ready = false;
ret
}
}

View File

@ -1,4 +1,7 @@
#![cfg_attr(not(all(feature = "rt", feature = "net")), allow(dead_code))]
mod driver;
use driver::{Direction, Tick};
pub(crate) use driver::{Driver, Handle, ReadyEvent};
mod registration;
pub(crate) use registration::Registration;
@ -7,350 +10,4 @@ mod scheduled_io;
use scheduled_io::ScheduledIo;
mod metrics;
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::runtime::driver;
use crate::util::slab::{self, Slab};
use crate::{loom::sync::RwLock, util::bit};
use metrics::IoDriverMetrics;
use std::fmt;
use std::io;
use std::time::Duration;
/// I/O driver, backed by Mio.
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`.
tick: u8,
/// True when an event with the signal token is received
signal_ready: bool,
/// Reuse the `mio::Events` value across calls to poll.
events: mio::Events,
/// Primary slab handle containing the state for each resource registered
/// with this driver.
resources: Slab<ScheduledIo>,
/// The system event queue.
poll: mio::Poll,
}
/// A reference to an I/O driver.
pub(crate) struct Handle {
/// Registers I/O resources.
registry: mio::Registry,
/// Allocates `ScheduledIo` handles when creating new resources.
io_dispatch: RwLock<IoDispatcher>,
/// Used to wake up the reactor from a call to `turn`.
/// Not supported on Wasi due to lack of threading support.
#[cfg(not(tokio_wasi))]
waker: mio::Waker,
pub(crate) metrics: IoDriverMetrics,
}
#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
is_shutdown: bool,
}
cfg_net_unix!(
impl ReadyEvent {
pub(crate) fn with_ready(&self, ready: Ready) -> Self {
Self {
ready,
tick: self.tick,
is_shutdown: self.is_shutdown,
}
}
}
);
struct IoDispatcher {
allocator: slab::Allocator<ScheduledIo>,
is_shutdown: bool,
}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
enum Direction {
Read,
Write,
}
enum Tick {
Set(u8),
Clear(u8),
}
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
const TOKEN_SIGNAL: mio::Token = mio::Token(1 + (1 << 31));
const ADDRESS: bit::Pack = bit::Pack::least_significant(24);
// Packs the generation value in the `readiness` field.
//
// The generation prevents a race condition where a slab slot is reused for a
// new socket while the I/O driver is about to apply a readiness event. The
// generation value is checked when setting new readiness. If the generation do
// not match, then the readiness event is discarded.
const GENERATION: bit::Pack = ADDRESS.then(7);
fn _assert_kinds() {
fn _assert<T: Send + Sync>() {}
_assert::<Handle>();
}
// ===== impl Driver =====
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;
let slab = Slab::new();
let allocator = slab.allocator();
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};
let handle = Handle {
registry,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
#[cfg(not(tokio_wasi))]
waker,
metrics: IoDriverMetrics::default(),
};
Ok((driver, handle))
}
pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
self.turn(handle, None);
}
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.io();
self.turn(handle, Some(duration));
}
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
if handle.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it that the
// runtime is being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
}
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u8 = 255;
self.tick = self.tick.wrapping_add(1);
if self.tick == COMPACT_INTERVAL {
self.resources.compact()
}
let events = &mut self.events;
// Block waiting for an event to happen, peeling out how many events
// happened.
match self.poll.poll(events, max_wait) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
#[cfg(tokio_wasi)]
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
// In case of wasm32_wasi this error happens, when trying to poll without subscriptions
// just return from the park, as there would be nothing, which wakes us up.
}
Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e),
}
// Process all the events that came in, dispatching appropriately
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
// Nothing to do, the event is used to unblock the I/O driver
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
Self::dispatch(
&mut self.resources,
self.tick,
token,
Ready::from_mio(event),
);
ready_count += 1;
}
}
handle.metrics.incr_ready_count_by(ready_count);
}
fn dispatch(resources: &mut Slab<ScheduledIo>, tick: u8, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
let io = match resources.get(addr) {
Some(io) => io,
None => return,
};
let res = io.set_readiness(Some(token.0), Tick::Set(tick), |curr| curr | ready);
if res.is_err() {
// token no longer valid!
return;
}
io.wake(ready);
}
}
impl fmt::Debug for Driver {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Driver")
}
}
impl Handle {
/// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
/// makes the next call to `turn` return immediately.
///
/// This method is intended to be used in situations where a notification
/// needs to otherwise be sent to the main reactor. If the reactor is
/// currently blocked inside of `turn` then it will wake up and soon return
/// after this method has been called. If the reactor is not currently
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
pub(crate) fn unpark(&self) {
#[cfg(not(tokio_wasi))]
self.waker.wake().expect("failed to wake I/O driver");
}
/// Registers an I/O resource with the reactor for a given `mio::Ready` state.
///
/// The registration token is returned.
pub(super) fn add_source(
&self,
source: &mut impl mio::event::Source,
interest: Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.allocate()?;
let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));
self.registry
.register(source, mio::Token(token), interest.to_mio())?;
self.metrics.incr_fd_count();
Ok(shared)
}
/// Deregisters an I/O resource from the reactor.
pub(super) fn deregister_source(&self, source: &mut impl mio::event::Source) -> io::Result<()> {
self.registry.deregister(source)?;
self.metrics.dec_fd_count();
Ok(())
}
/// shutdown the dispatcher.
fn shutdown(&self) -> bool {
let mut io = self.io_dispatch.write().unwrap();
if io.is_shutdown {
return false;
}
io.is_shutdown = true;
true
}
fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}
io.allocator.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Handle")
}
}
// ===== impl IoDispatcher =====
impl IoDispatcher {
fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
Self {
allocator,
is_shutdown: false,
}
}
}
impl Direction {
pub(super) fn mask(self) -> Ready {
match self {
Direction::Read => Ready::READABLE | Ready::READ_CLOSED,
Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED,
}
}
}
// Signal handling
cfg_signal_internal_and_unix! {
impl Handle {
pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> {
self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?;
Ok(())
}
}
impl Driver {
pub(crate) fn consume_signal_ready(&mut self) -> bool {
let ret = self.signal_ready;
self.signal_ready = false;
ret
}
}
}

View File

@ -199,6 +199,33 @@ impl Registration {
}
}
pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
let ev = self.shared.readiness(interest).await;
if ev.is_shutdown {
return Err(gone());
}
Ok(ev)
}
pub(crate) async fn async_io<R>(
&self,
interest: Interest,
mut f: impl FnMut() -> io::Result<R>,
) -> io::Result<R> {
loop {
let event = self.readiness(interest).await?;
match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
}
}
}
fn handle(&self) -> &Handle {
self.handle.driver().io()
}
@ -223,30 +250,3 @@ fn gone() -> io::Error {
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
)
}
cfg_io_readiness! {
impl Registration {
pub(crate) async fn readiness(&self, interest: Interest) -> io::Result<ReadyEvent> {
let ev = self.shared.readiness(interest).await;
if ev.is_shutdown {
return Err(gone())
}
Ok(ev)
}
pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
loop {
let event = self.readiness(interest).await?;
match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
}
}
}
}
}

View File

@ -1,27 +1,21 @@
use super::{ReadyEvent, Tick};
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::io::{Direction, ReadyEvent, Tick};
use crate::util::bit;
use crate::util::linked_list::{self, LinkedList};
use crate::util::slab::Entry;
use crate::util::WakeList;
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
use super::Direction;
cfg_io_readiness! {
use crate::util::linked_list::{self, LinkedList};
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
}
/// Stored in the I/O driver resource slab.
#[derive(Debug)]
pub(crate) struct ScheduledIo {
@ -31,13 +25,10 @@ pub(crate) struct ScheduledIo {
waiters: Mutex<Waiters>,
}
cfg_io_readiness! {
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
}
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
#[derive(Debug, Default)]
struct Waiters {
#[cfg(feature = "net")]
/// List of all current waiters.
list: WaitList,
@ -48,46 +39,44 @@ struct Waiters {
writer: Option<Waker>,
}
cfg_io_readiness! {
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
/// The waker for this task.
waker: Option<Waker>,
/// The waker for this task.
waker: Option<Waker>,
/// The interest this waiter is waiting on.
interest: Interest,
/// The interest this waiter is waiting on.
interest: Interest,
is_ready: bool,
is_ready: bool,
/// Should never be `!Unpin`.
_p: PhantomPinned,
}
/// Should never be `!Unpin`.
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
/// Future returned by `readiness()`.
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
/// Future returned by `readiness()`.
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
state: State,
state: State,
/// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
}
/// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
}
enum State {
Init,
Waiting,
Done,
}
enum State {
Init,
Waiting,
Done,
}
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
@ -106,7 +95,7 @@ const SHUTDOWN: bit::Pack = GENERATION.then(1);
#[test]
fn test_generations_assert_same() {
assert_eq!(super::GENERATION, GENERATION);
assert_eq!(super::driver::GENERATION, GENERATION);
}
// ===== impl ScheduledIo =====
@ -238,7 +227,6 @@ impl ScheduledIo {
}
}
#[cfg(feature = "net")]
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));
@ -372,187 +360,193 @@ impl Drop for ScheduledIo {
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
// This is in a separate function so that the borrow checker doesn't think
// we are borrowing the `UnsafeCell` possibly over await boundaries.
//
// Go figure.
fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
is_ready: false,
interest,
_p: PhantomPinned,
}),
}
}
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
self.readiness_fut(interest).await
}
unsafe impl linked_list::Link for Waiter {
type Handle = NonNull<Waiter>;
type Target = Waiter;
fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
*handle
}
unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
ptr
}
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
// This is in a separate function so that the borrow checker doesn't think
// we are borrowing the `UnsafeCell` possibly over await boundaries.
//
// Go figure.
fn readiness_fut(&self, interest: Interest) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
is_ready: false,
interest,
_p: PhantomPinned,
}),
}
}
}
// ===== impl Readiness =====
unsafe impl linked_list::Link for Waiter {
type Handle = NonNull<Waiter>;
type Target = Waiter;
impl Future for Readiness<'_> {
type Output = ReadyEvent;
fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
*handle
}
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
ptr
}
let (scheduled_io, state, waiter) = unsafe {
let me = self.get_unchecked_mut();
(&me.scheduled_io, &mut me.state, &me.waiter)
};
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
}
}
loop {
match *state {
State::Init => {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// ===== impl Readiness =====
// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
impl Future for Readiness<'_> {
type Output = ReadyEvent;
if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
// Wasn't ready, take the lock (and check again while locked).
let mut waiters = scheduled_io.waiters.lock();
let (scheduled_io, state, waiter) = unsafe {
let me = self.get_unchecked_mut();
(&me.scheduled_io, &mut me.state, &me.waiter)
};
let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
loop {
match *state {
State::Init => {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
if is_shutdown {
ready = Ready::ALL;
}
// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
let ready = ready.intersection(interest);
if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}
// Not ready even after locked, insert into list...
// Safety: called while locked
unsafe {
(*waiter.get()).waker = Some(cx.waker().clone());
}
// Insert the waiter into the linked list
//
// safety: pointers from `UnsafeCell` are never null.
waiters
.list
.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
*state = State::Waiting;
}
State::Waiting => {
// Currently in the "Waiting" state, implying the caller has
// a waiter stored in the waiter list (guarded by
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
let waiters = scheduled_io.waiters.lock();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
if w.is_ready {
// Our waker has been notified.
*state = State::Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}
return Poll::Pending;
}
// Explicit drop of the lock to indicate the scope that the
// lock is held. Because holding the lock is required to
// ensure safe access to fields not held within the lock, it
// is helpful to visualize the scope of the critical
// section.
drop(waiters);
}
State::Done => {
// Safety: State::Done means it is no longer shared
let w = unsafe { &mut *waiter.get() };
let curr = scheduled_io.readiness.load(Acquire);
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
// still didn't return `Poll::Ready`.
if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
// The readiness state could have been cleared in the meantime,
// but we allow the returned ready set to be empty.
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
let ready = curr_ready.intersection(w.interest);
*state = State::Done;
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
// Wasn't ready, take the lock (and check again while locked).
let mut waiters = scheduled_io.waiters.lock();
let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
if is_shutdown {
ready = Ready::ALL;
}
let ready = ready.intersection(interest);
if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
// Not ready even after locked, insert into list...
// Safety: called while locked
unsafe {
(*waiter.get()).waker = Some(cx.waker().clone());
}
// Insert the waiter into the linked list
//
// safety: pointers from `UnsafeCell` are never null.
waiters
.list
.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
*state = State::Waiting;
}
State::Waiting => {
// Currently in the "Waiting" state, implying the caller has
// a waiter stored in the waiter list (guarded by
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
let waiters = scheduled_io.waiters.lock();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
if w.is_ready {
// Our waker has been notified.
*state = State::Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}
return Poll::Pending;
}
// Explicit drop of the lock to indicate the scope that the
// lock is held. Because holding the lock is required to
// ensure safe access to fields not held within the lock, it
// is helpful to visualize the scope of the critical
// section.
drop(waiters);
}
State::Done => {
// Safety: State::Done means it is no longer shared
let w = unsafe { &mut *waiter.get() };
let curr = scheduled_io.readiness.load(Acquire);
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
// still didn't return `Poll::Ready`.
let tick = TICK.unpack(curr) as u8;
// The readiness state could have been cleared in the meantime,
// but we allow the returned ready set to be empty.
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
let ready = curr_ready.intersection(w.interest);
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
}
}
}
impl Drop for Readiness<'_> {
fn drop(&mut self) {
let mut waiters = self.scheduled_io.waiters.lock();
// Safety: `waiter` is only ever stored in `waiters`
unsafe {
waiters
.list
.remove(NonNull::new_unchecked(self.waiter.get()))
};
}
}
unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}
}
impl Drop for Readiness<'_> {
fn drop(&mut self) {
let mut waiters = self.scheduled_io.waiters.lock();
// Safety: `waiter` is only ever stored in `waiters`
unsafe {
waiters
.list
.remove(NonNull::new_unchecked(self.waiter.get()))
};
}
}
unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}

View File

@ -297,7 +297,7 @@ impl<L: Link> Default for LinkedList<L, L::Target> {
// ===== impl DrainFilter =====
cfg_io_readiness! {
cfg_io_driver_impl! {
pub(crate) struct DrainFilter<'a, T: Link, F> {
list: &'a mut LinkedList<T, T::Target>,
filter: F,