rt: switch io::handle refs with scheduler:Handle (#5128)

The `schedule::Handle` reference is the internal runtime handle. This
patch replaces owned refs to `runtime::io::Handle` with
`scheduler::Handle`.
This commit is contained in:
Carl Lerche 2022-10-27 08:51:03 -07:00 committed by GitHub
parent 58c457190b
commit cb67f28fe3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 96 additions and 134 deletions

View File

@ -1,5 +1,6 @@
use crate::io::Interest;
use crate::runtime::io::{Handle, ReadyEvent, Registration};
use crate::runtime::io::{ReadyEvent, Registration};
use crate::runtime::scheduler;
use mio::unix::SourceFd;
use std::io;
@ -200,12 +201,13 @@ impl<T: AsRawFd> AsyncFd<T> {
where
T: AsRawFd,
{
Self::new_with_handle_and_interest(inner, Handle::current(), interest)
Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
}
#[track_caller]
pub(crate) fn new_with_handle_and_interest(
inner: T,
handle: Handle,
handle: scheduler::Handle,
interest: Interest,
) -> io::Result<Self> {
let fd = inner.as_raw_fd();

View File

@ -1,7 +1,8 @@
//! Use POSIX AIO futures with Tokio.
use crate::io::interest::Interest;
use crate::runtime::io::{Handle, ReadyEvent, Registration};
use crate::runtime::io::{ReadyEvent, Registration};
use crate::runtime::scheduler;
use mio::event::Source;
use mio::Registry;
use mio::Token;
@ -118,7 +119,7 @@ impl<E: AioSource> Aio<E> {
fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
let mut io = MioSource(io);
let handle = Handle::current();
let handle = scheduler::Handle::current();
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io, registration })
}

View File

@ -1,5 +1,6 @@
use crate::io::interest::Interest;
use crate::runtime::io::{Handle, Registration};
use crate::runtime::io::Registration;
use crate::runtime::scheduler;
use mio::event::Source;
use std::fmt;
@ -103,13 +104,14 @@ impl<E: Source> PollEvented<E> {
#[track_caller]
#[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
Self::new_with_interest_and_handle(io, interest, Handle::current())
Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
}
#[track_caller]
pub(crate) fn new_with_interest_and_handle(
mut io: E,
interest: Interest,
handle: Handle,
handle: scheduler::Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self {

View File

@ -24,24 +24,6 @@ pub(crate) fn current() -> Handle {
}
}
cfg_io_driver! {
#[track_caller]
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref()
.expect(crate::util::error::CONTEXT_MISSING_ERROR)
.inner
.driver()
.io
.clone()
}) {
Ok(io_handle) => io_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
}
}
cfg_signal_internal! {
#[cfg(unix)]
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {

View File

@ -82,10 +82,11 @@ impl Handle {
}
cfg_io_driver! {
#[track_caller]
pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
self.io
.as_ref()
.expect("A Tokio 1.x context was found, but I/O is disabled. Call `enable_io` on the runtime builder to enable I/O.")
.expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
}
}
@ -170,14 +171,6 @@ cfg_io_driver! {
}
}
#[track_caller]
pub(crate) fn expect(self, msg: &'static str) -> crate::runtime::io::Handle {
match self {
IoHandle::Enabled(v) => v,
IoHandle::Disabled(..) => panic!("{}", msg),
}
}
pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> {
match self {
IoHandle::Enabled(v) => Some(v),

View File

@ -96,7 +96,9 @@ impl Handle {
/// ```
#[track_caller]
pub fn current() -> Self {
context::current()
Handle {
inner: scheduler::Handle::current(),
}
}
/// Returns a Handle view over the currently running Runtime

View File

@ -236,38 +236,6 @@ impl fmt::Debug for Driver {
}
}
// ===== impl Handle =====
cfg_rt! {
impl Handle {
/// Returns a handle to the current reactor.
///
/// # Panics
///
/// This function panics if there is no current reactor set and `rt` feature
/// flag is not enabled.
#[track_caller]
pub(crate) fn current() -> Self {
crate::runtime::context::io_handle().expect("A Tokio 1.x context was found, but IO is disabled. Call `enable_io` on the runtime builder to enable IO.")
}
}
}
cfg_not_rt! {
impl Handle {
/// Returns a handle to the current reactor.
///
/// # Panics
///
/// This function panics if there is no current reactor set, or if the `rt`
/// feature flag is not enabled.
#[track_caller]
pub(crate) fn current() -> Self {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
cfg_net! {
cfg_metrics! {
impl Handle {

View File

@ -2,6 +2,7 @@
use crate::io::interest::Interest;
use crate::runtime::io::{Direction, Handle, ReadyEvent, ScheduledIo};
use crate::runtime::scheduler;
use crate::util::slab;
use mio::event::Source;
@ -43,8 +44,8 @@ cfg_io_driver! {
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
pub(crate) struct Registration {
/// Handle to the associated driver.
handle: Handle,
/// Handle to the associated runtime.
handle: scheduler::Handle,
/// Reference to state stored by the driver.
shared: slab::Ref<ScheduledIo>,
@ -66,12 +67,13 @@ impl Registration {
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
#[track_caller]
pub(crate) fn new_with_interest_and_handle(
io: &mut impl Source,
interest: Interest,
handle: Handle,
handle: scheduler::Handle,
) -> io::Result<Registration> {
let shared = handle.inner.add_source(io, interest)?;
let shared = handle.io().inner.add_source(io, interest)?;
Ok(Registration { handle, shared })
}
@ -93,7 +95,7 @@ impl Registration {
///
/// `Err` is returned if an error is encountered.
pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
self.handle.inner.deregister_source(io)
self.handle().inner.deregister_source(io)
}
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
@ -146,7 +148,7 @@ impl Registration {
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
if self.handle.inner.is_shutdown() {
if self.handle().inner.is_shutdown() {
return Poll::Ready(Err(gone()));
}
@ -195,6 +197,10 @@ impl Registration {
res => res,
}
}
fn handle(&self) -> &Handle {
self.handle.io()
}
}
impl Drop for Registration {
@ -224,7 +230,7 @@ cfg_io_readiness! {
pin!(fut);
crate::future::poll_fn(|cx| {
if self.handle.inner.is_shutdown() {
if self.handle().inner.is_shutdown() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR

View File

@ -40,6 +40,13 @@ impl Handle {
}
}
cfg_io_driver! {
#[track_caller]
pub(crate) fn io(&self) -> &crate::runtime::io::Handle {
self.driver().io()
}
}
cfg_time! {
#[track_caller]
pub(crate) fn time(&self) -> &crate::runtime::time::Handle {
@ -62,6 +69,11 @@ cfg_rt! {
use crate::util::RngSeedGenerator;
impl Handle {
#[track_caller]
pub(crate) fn current() -> Handle {
crate::runtime::context::current().inner
}
pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
match self {
Handle::CurrentThread(h) => &h.blocking_spawner,
@ -156,3 +168,12 @@ cfg_rt! {
}
}
}
cfg_not_rt! {
impl Handle {
#[track_caller]
pub(crate) fn current() -> Handle {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}

View File

@ -248,76 +248,61 @@ cfg_not_trace! {
}
impl Sleep {
cfg_rt! {
#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
#[track_caller]
pub(crate) fn new_timeout(
deadline: Instant,
location: Option<&'static Location<'static>>,
) -> Sleep {
use crate::runtime::Handle;
#[cfg_attr(not(all(tokio_unstable, feature = "tracing")), allow(unused_variables))]
#[track_caller]
pub(crate) fn new_timeout(
deadline: Instant,
location: Option<&'static Location<'static>>,
) -> Sleep {
use crate::runtime::scheduler;
let handle = Handle::current().inner;
let entry = TimerEntry::new(&handle, deadline);
let handle = scheduler::Handle::current();
let entry = TimerEntry::new(&handle, deadline);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = &handle.time();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = &handle.time();
let time_source = handle.time_source();
let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now());
let location = location.expect("should have location if tracing");
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Sleep",
kind = "timer",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
let location = location.expect("should have location if tracing");
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Sleep",
kind = "timer",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
let async_op_span = resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
duration = duration,
duration.unit = "ms",
duration.op = "override",
);
let async_op_span = resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
duration = duration,
duration.unit = "ms",
duration.op = "override",
);
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout")
});
tracing::trace_span!("runtime.resource.async_op", source = "Sleep::new_timeout")
});
let async_op_poll_span =
async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
let async_op_poll_span =
async_op_span.in_scope(|| tracing::trace_span!("runtime.resource.async_op.poll"));
let ctx = trace::AsyncOpTracingCtx {
async_op_span,
async_op_poll_span,
resource_span,
};
Inner {
deadline,
ctx,
}
let ctx = trace::AsyncOpTracingCtx {
async_op_span,
async_op_poll_span,
resource_span,
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = Inner { deadline };
Inner { deadline, ctx }
};
Sleep { inner, entry }
}
}
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let inner = Inner { deadline };
cfg_not_rt! {
#[track_caller]
pub(crate) fn new_timeout(
_deadline: Instant,
_location: Option<&'static Location<'static>>,
) -> Sleep {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
Sleep { inner, entry }
}
pub(crate) fn far_future(location: Option<&'static Location<'static>>) -> Sleep {