rt: update TimerEntry to use runtime::Handle

The `TimerEntry` struct is the internal integration point for public
time APIs (`sleep`, `interval`, ...) with the time driver. Currently,
`TimerEntry` holds an ref-counted reference to the time driver handle.

This patch replaces the reference to the time driver handle with a
reference to the runtime handle. This is part of a larger effort to
consolate internal handles across the runtime.
This commit is contained in:
Carl Lerche 2022-09-13 20:40:22 -07:00
parent ac1ae2cfbc
commit 588408c060
12 changed files with 448 additions and 456 deletions

View File

@ -498,6 +498,12 @@ cfg_rt! {
} }
cfg_not_rt! { cfg_not_rt! {
// The `runtime` module is used when the IO or time driver is needed. // The `runtime` module is used when the IO or time driver is needed.
#[cfg(any(
feature = "net",
feature = "time",
all(unix, feature = "process"),
all(unix, feature = "signal"),
))]
pub(crate) mod runtime; pub(crate) mod runtime;
} }

View File

@ -50,16 +50,6 @@ cfg_signal_internal! {
} }
cfg_time! { cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().time_handle.clone()
}) {
Ok(time_handle) => time_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
}
cfg_test_util! { cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> { pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) { match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) {

View File

@ -1,4 +1,9 @@
//! Abstracts out the entire chain of runtime sub-drivers into common types. //! Abstracts out the entire chain of runtime sub-drivers into common types.
// Eventually, this file will see significant refactoring / cleanup. For now, we
// don't need to worry much about dead code with certain feature permutations.
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
use crate::park::thread::{ParkThread, UnparkThread}; use crate::park::thread::{ParkThread, UnparkThread};
use std::io; use std::io;

View File

@ -1,11 +1,9 @@
use crate::runtime::task::JoinHandle; // When the runtime refactor is done, this should be removed.
use crate::runtime::{blocking, context, driver, Spawner}; #![cfg_attr(not(feature = "rt"), allow(dead_code))]
use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
use crate::runtime::driver;
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use std::{error, fmt};
/// Handle to the runtime. /// Handle to the runtime.
/// ///
@ -14,6 +12,9 @@ use std::{error, fmt};
/// ///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle() /// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
// When the `rt` feature is *not* enabled, this type is still defined, but not
// included in the public API.
#[cfg_attr(not(feature = "rt"), allow(unreachable_pub))]
pub struct Handle { pub struct Handle {
pub(super) inner: Arc<HandleInner>, pub(super) inner: Arc<HandleInner>,
} }
@ -21,6 +22,7 @@ pub struct Handle {
/// All internal handles that are *not* the scheduler's spawner. /// All internal handles that are *not* the scheduler's spawner.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct HandleInner { pub(crate) struct HandleInner {
#[cfg(feature = "rt")]
pub(super) spawner: Spawner, pub(super) spawner: Spawner,
/// Handles to the I/O drivers /// Handles to the I/O drivers
@ -54,23 +56,33 @@ pub(crate) struct HandleInner {
pub(super) clock: driver::Clock, pub(super) clock: driver::Clock,
/// Blocking pool spawner /// Blocking pool spawner
#[cfg(feature = "rt")]
pub(crate) blocking_spawner: blocking::Spawner, pub(crate) blocking_spawner: blocking::Spawner,
} }
/// Runtime context guard. cfg_rt! {
/// use crate::runtime::task::JoinHandle;
/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits use crate::runtime::{blocking, context, Spawner};
/// the runtime context on drop. use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
///
/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter use std::future::Future;
#[derive(Debug)] use std::marker::PhantomData;
#[must_use = "Creating and dropping a guard does nothing"] use std::{error, fmt};
pub struct EnterGuard<'a> {
/// Runtime context guard.
///
/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
/// the runtime context on drop.
///
/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
_guard: context::EnterGuard, _guard: context::EnterGuard,
_handle_lifetime: PhantomData<&'a Handle>, _handle_lifetime: PhantomData<&'a Handle>,
} }
impl Handle { impl Handle {
/// Enters the runtime context. This allows you to construct types that must /// Enters the runtime context. This allows you to construct types that must
/// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. /// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
/// It will also allow you to call methods such as [`tokio::spawn`] and [`Handle::current`] /// It will also allow you to call methods such as [`tokio::spawn`] and [`Handle::current`]
@ -315,9 +327,9 @@ impl Handle {
pub(crate) fn shutdown(&self) { pub(crate) fn shutdown(&self) {
self.inner.spawner.shutdown(); self.inner.spawner.shutdown();
} }
} }
cfg_metrics! { cfg_metrics! {
use crate::runtime::RuntimeMetrics; use crate::runtime::RuntimeMetrics;
impl Handle { impl Handle {
@ -327,15 +339,15 @@ cfg_metrics! {
RuntimeMetrics::new(self.clone()) RuntimeMetrics::new(self.clone())
} }
} }
} }
/// Error returned by `try_current` when no Runtime has been started /// Error returned by `try_current` when no Runtime has been started
#[derive(Debug)] #[derive(Debug)]
pub struct TryCurrentError { pub struct TryCurrentError {
kind: TryCurrentErrorKind, kind: TryCurrentErrorKind,
} }
impl TryCurrentError { impl TryCurrentError {
pub(crate) fn new_no_context() -> Self { pub(crate) fn new_no_context() -> Self {
Self { Self {
kind: TryCurrentErrorKind::NoContext, kind: TryCurrentErrorKind::NoContext,
@ -360,14 +372,14 @@ impl TryCurrentError {
pub fn is_thread_local_destroyed(&self) -> bool { pub fn is_thread_local_destroyed(&self) -> bool {
matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed) matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
} }
} }
enum TryCurrentErrorKind { enum TryCurrentErrorKind {
NoContext, NoContext,
ThreadLocalDestroyed, ThreadLocalDestroyed,
} }
impl fmt::Debug for TryCurrentErrorKind { impl fmt::Debug for TryCurrentErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TryCurrentErrorKind::*; use TryCurrentErrorKind::*;
match self { match self {
@ -375,9 +387,9 @@ impl fmt::Debug for TryCurrentErrorKind {
ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
} }
} }
} }
impl fmt::Display for TryCurrentError { impl fmt::Display for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TryCurrentErrorKind::*; use TryCurrentErrorKind::*;
match self.kind { match self.kind {
@ -385,6 +397,25 @@ impl fmt::Display for TryCurrentError {
ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
} }
} }
}
impl error::Error for TryCurrentError {}
} }
impl error::Error for TryCurrentError {} cfg_not_rt! {
impl Handle {
pub(crate) fn current() -> Handle {
panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
}
}
}
cfg_time! {
impl Handle {
#[track_caller]
pub(crate) fn as_time_handle(&self) -> &crate::runtime::time::Handle {
self.inner.time_handle.as_ref()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
}

View File

@ -177,6 +177,9 @@
#[macro_use] #[macro_use]
mod tests; mod tests;
mod driver;
pub(crate) mod handle;
cfg_io_driver_impl! { cfg_io_driver_impl! {
pub(crate) mod io; pub(crate) mod io;
} }
@ -216,11 +219,9 @@ cfg_rt! {
} }
pub(crate) mod context; pub(crate) mod context;
mod driver;
use self::enter::enter; use self::enter::enter;
mod handle;
pub use handle::{EnterGuard, Handle, TryCurrentError}; pub use handle::{EnterGuard, Handle, TryCurrentError};
pub(crate) use handle::HandleInner; pub(crate) use handle::HandleInner;

View File

@ -58,12 +58,11 @@ use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicU64; use crate::loom::sync::atomic::AtomicU64;
use crate::loom::sync::atomic::Ordering; use crate::loom::sync::atomic::Ordering;
use crate::runtime::handle::Handle;
use crate::sync::AtomicWaker; use crate::sync::AtomicWaker;
use crate::time::Instant; use crate::time::Instant;
use crate::util::linked_list; use crate::util::linked_list;
use super::Handle;
use std::cell::UnsafeCell as StdUnsafeCell; use std::cell::UnsafeCell as StdUnsafeCell;
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll, Waker};
use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
@ -284,7 +283,7 @@ impl StateCell {
/// before polling. /// before polling.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct TimerEntry { pub(crate) struct TimerEntry {
/// Arc reference to the driver. We can only free the driver after /// Arc reference to the runtime handle. We can only free the driver after
/// deregistering everything from their respective timer wheels. /// deregistering everything from their respective timer wheels.
driver: Handle, driver: Handle,
/// Shared inner structure; this is part of an intrusive linked list, and /// Shared inner structure; this is part of an intrusive linked list, and
@ -490,7 +489,11 @@ unsafe impl linked_list::Link for TimerShared {
// ===== impl Entry ===== // ===== impl Entry =====
impl TimerEntry { impl TimerEntry {
#[track_caller]
pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self { pub(crate) fn new(handle: &Handle, deadline: Instant) -> Self {
// Panic if the time driver is not enabled
let _ = handle.as_time_handle();
let driver = handle.clone(); let driver = handle.clone();
Self { Self {
@ -533,20 +536,20 @@ impl TimerEntry {
// driver did so far and happens-before everything the driver does in // driver did so far and happens-before everything the driver does in
// the future. While we have the lock held, we also go ahead and // the future. While we have the lock held, we also go ahead and
// deregister the entry if necessary. // deregister the entry if necessary.
unsafe { self.driver.clear_entry(NonNull::from(self.inner())) }; unsafe { self.driver().clear_entry(NonNull::from(self.inner())) };
} }
pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) { pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant) {
unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None; unsafe { self.as_mut().get_unchecked_mut() }.initial_deadline = None;
let tick = self.driver.time_source().deadline_to_tick(new_time); let tick = self.driver().time_source().deadline_to_tick(new_time);
if self.inner().extend_expiration(tick).is_ok() { if self.inner().extend_expiration(tick).is_ok() {
return; return;
} }
unsafe { unsafe {
self.driver.reregister(tick, self.inner().into()); self.driver().reregister(tick, self.inner().into());
} }
} }
@ -554,7 +557,7 @@ impl TimerEntry {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Result<(), super::Error>> { ) -> Poll<Result<(), super::Error>> {
if self.driver.is_shutdown() { if self.driver().is_shutdown() {
panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR); panic!("{}", crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR);
} }
@ -566,6 +569,11 @@ impl TimerEntry {
this.inner().state.poll(cx.waker()) this.inner().state.poll(cx.waker())
} }
fn driver(&self) -> &super::Handle {
// At this point, we know the time_handle is Some.
self.driver.inner.time_handle.as_ref().unwrap()
}
} }
impl TimerHandle { impl TimerHandle {

View File

@ -32,35 +32,6 @@ impl Handle {
} }
} }
cfg_rt! {
impl Handle {
/// Tries to get a handle to the current timer.
///
/// # Panics
///
/// This function panics if there is no current timer set.
///
/// It can be triggered when [`Builder::enable_time`] or
/// [`Builder::enable_all`] are not included in the builder.
///
/// It can also panic whenever a timer is created outside of a
/// Tokio runtime. That is why `rt.block_on(sleep(...))` will panic,
/// since the function is executed outside of the runtime.
/// Whereas `rt.block_on(async {sleep(...).await})` doesn't panic.
/// And this is because wrapping the function on an async makes it lazy,
/// and so gets executed inside the runtime successfully without
/// panicking.
///
/// [`Builder::enable_time`]: crate::runtime::Builder::enable_time
/// [`Builder::enable_all`]: crate::runtime::Builder::enable_all
#[track_caller]
pub(crate) fn current() -> Self {
crate::runtime::context::time_handle()
.expect("A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers.")
}
}
}
cfg_not_rt! { cfg_not_rt! {
impl Handle { impl Handle {
/// Tries to get a handle to the current timer. /// Tries to get a handle to the current timer.

View File

@ -20,22 +20,10 @@ mod wheel;
use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Arc, Mutex}; use crate::loom::sync::{Arc, Mutex};
use crate::runtime::driver::{IoStack, IoUnpark};
use crate::time::error::Error; use crate::time::error::Error;
use crate::time::{Clock, Duration}; use crate::time::{Clock, Duration};
// This duplication should be cleaned up in a later refactor
cfg_io_driver! {
cfg_rt! {
use crate::runtime::driver::{IoStack, IoUnpark};
}
cfg_not_rt! {
use crate::runtime::io::{Driver as IoStack, Handle as IoUnpark};
}
}
cfg_not_io_driver! {
use crate::park::thread::{ParkThread as IoStack, UnparkThread as IoUnpark};
}
use std::fmt; use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; use std::{num::NonZeroU64, ptr::NonNull, task::Waker};

View File

@ -8,9 +8,8 @@ use futures::task::noop_waker_ref;
use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::Arc; use crate::loom::sync::Arc;
use crate::loom::thread; use crate::loom::thread;
use crate::runtime::driver::IoUnpark;
use super::{Handle, TimerEntry}; use super::TimerEntry;
fn block_on<T>(f: impl std::future::Future<Output = T>) -> T { fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
#[cfg(loom)] #[cfg(loom)]
@ -33,30 +32,24 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
f(); f();
} }
#[cfg(not(tokio_wasm))] fn rt(start_paused: bool) -> crate::runtime::Runtime {
fn unpark() -> IoUnpark { crate::runtime::Builder::new_current_thread()
use crate::park::thread::ParkThread; .enable_time()
IoUnpark::Disabled(ParkThread::new().unpark()) .start_paused(start_paused)
} .build()
.unwrap()
#[cfg(tokio_wasm)]
fn unpark() -> IoUnpark {
use crate::park::thread::ParkThread;
ParkThread::new().unpark()
} }
#[test] #[test]
fn single_timer() { fn single_timer() {
model(|| { model(|| {
let clock = crate::time::Clock::new(true, false); let rt = rt(false);
let time_source = super::TimeSource::new(clock.clone()); let handle = rt.handle();
let inner = super::Inner::new(time_source.clone(), unpark());
let handle = Handle::new(Arc::new(inner));
let handle_ = handle.clone(); let handle_ = handle.clone();
let jh = thread::spawn(move || { let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); let entry =
TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1));
pin!(entry); pin!(entry);
block_on(futures::future::poll_fn(|cx| { block_on(futures::future::poll_fn(|cx| {
@ -67,10 +60,12 @@ fn single_timer() {
thread::yield_now(); thread::yield_now();
let handle = handle.as_time_handle();
// This may or may not return Some (depending on how it races with the // This may or may not return Some (depending on how it races with the
// thread). If it does return None, however, the timer should complete // thread). If it does return None, however, the timer should complete
// synchronously. // synchronously.
handle.process_at_time(time_source.now() + 2_000_000_000); handle.process_at_time(handle.time_source().now() + 2_000_000_000);
jh.join().unwrap(); jh.join().unwrap();
}) })
@ -79,15 +74,13 @@ fn single_timer() {
#[test] #[test]
fn drop_timer() { fn drop_timer() {
model(|| { model(|| {
let clock = crate::time::Clock::new(true, false); let rt = rt(false);
let time_source = super::TimeSource::new(clock.clone()); let handle = rt.handle();
let inner = super::Inner::new(time_source.clone(), unpark());
let handle = Handle::new(Arc::new(inner));
let handle_ = handle.clone(); let handle_ = handle.clone();
let jh = thread::spawn(move || { let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); let entry =
TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1));
pin!(entry); pin!(entry);
let _ = entry let _ = entry
@ -100,8 +93,10 @@ fn drop_timer() {
thread::yield_now(); thread::yield_now();
let handle = handle.as_time_handle();
// advance 2s in the future. // advance 2s in the future.
handle.process_at_time(time_source.now() + 2_000_000_000); handle.process_at_time(handle.time_source().now() + 2_000_000_000);
jh.join().unwrap(); jh.join().unwrap();
}) })
@ -110,15 +105,13 @@ fn drop_timer() {
#[test] #[test]
fn change_waker() { fn change_waker() {
model(|| { model(|| {
let clock = crate::time::Clock::new(true, false); let rt = rt(false);
let time_source = super::TimeSource::new(clock.clone()); let handle = rt.handle();
let inner = super::Inner::new(time_source.clone(), unpark());
let handle = Handle::new(Arc::new(inner));
let handle_ = handle.clone(); let handle_ = handle.clone();
let jh = thread::spawn(move || { let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_, clock.now() + Duration::from_secs(1)); let entry =
TimerEntry::new(&handle_, handle_.inner.clock.now() + Duration::from_secs(1));
pin!(entry); pin!(entry);
let _ = entry let _ = entry
@ -133,8 +126,10 @@ fn change_waker() {
thread::yield_now(); thread::yield_now();
let handle = handle.as_time_handle();
// advance 2s // advance 2s
handle.process_at_time(time_source.now() + 2_000_000_000); handle.process_at_time(handle.time_source().now() + 2_000_000_000);
jh.join().unwrap(); jh.join().unwrap();
}) })
@ -145,15 +140,12 @@ fn reset_future() {
model(|| { model(|| {
let finished_early = Arc::new(AtomicBool::new(false)); let finished_early = Arc::new(AtomicBool::new(false));
let clock = crate::time::Clock::new(true, false); let rt = rt(false);
let time_source = super::TimeSource::new(clock.clone()); let handle = rt.handle();
let inner = super::Inner::new(time_source.clone(), unpark());
let handle = Handle::new(Arc::new(inner));
let handle_ = handle.clone(); let handle_ = handle.clone();
let finished_early_ = finished_early.clone(); let finished_early_ = finished_early.clone();
let start = clock.now(); let start = handle.inner.clock.now();
let jh = thread::spawn(move || { let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1)); let entry = TimerEntry::new(&handle_, start + Duration::from_secs(1));
@ -176,12 +168,22 @@ fn reset_future() {
thread::yield_now(); thread::yield_now();
let handle = handle.as_time_handle();
// This may or may not return a wakeup time. // This may or may not return a wakeup time.
handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(1500))); handle.process_at_time(
handle
.time_source()
.instant_to_tick(start + Duration::from_millis(1500)),
);
assert!(!finished_early.load(Ordering::Relaxed)); assert!(!finished_early.load(Ordering::Relaxed));
handle.process_at_time(time_source.instant_to_tick(start + Duration::from_millis(2500))); handle.process_at_time(
handle
.time_source()
.instant_to_tick(start + Duration::from_millis(2500)),
);
jh.join().unwrap(); jh.join().unwrap();
@ -201,20 +203,15 @@ fn normal_or_miri<T>(normal: T, miri: T) -> T {
#[test] #[test]
#[cfg(not(loom))] #[cfg(not(loom))]
fn poll_process_levels() { fn poll_process_levels() {
let clock = crate::time::Clock::new(true, false); let rt = rt(true);
clock.pause(); let handle = rt.handle();
let time_source = super::TimeSource::new(clock.clone());
let inner = super::Inner::new(time_source, unpark());
let handle = Handle::new(Arc::new(inner));
let mut entries = vec![]; let mut entries = vec![];
for i in 0..normal_or_miri(1024, 64) { for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new( let mut entry = Box::pin(TimerEntry::new(
&handle, &handle,
clock.now() + Duration::from_millis(i), handle.inner.clock.now() + Duration::from_millis(i),
)); ));
let _ = entry let _ = entry
@ -225,7 +222,8 @@ fn poll_process_levels() {
} }
for t in 1..normal_or_miri(1024, 64) { for t in 1..normal_or_miri(1024, 64) {
handle.process_at_time(t as u64); handle.as_time_handle().process_at_time(t as u64);
for (deadline, future) in entries.iter_mut().enumerate() { for (deadline, future) in entries.iter_mut().enumerate() {
let mut context = Context::from_waker(noop_waker_ref()); let mut context = Context::from_waker(noop_waker_ref());
if deadline <= t { if deadline <= t {
@ -242,17 +240,17 @@ fn poll_process_levels() {
fn poll_process_levels_targeted() { fn poll_process_levels_targeted() {
let mut context = Context::from_waker(noop_waker_ref()); let mut context = Context::from_waker(noop_waker_ref());
let clock = crate::time::Clock::new(true, false); let rt = rt(true);
clock.pause(); let handle = rt.handle();
let time_source = super::TimeSource::new(clock.clone()); let e1 = TimerEntry::new(
&handle,
let inner = super::Inner::new(time_source, unpark()); handle.inner.clock.now() + Duration::from_millis(193),
let handle = Handle::new(Arc::new(inner)); );
let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193));
pin!(e1); pin!(e1);
let handle = handle.as_time_handle();
handle.process_at_time(62); handle.process_at_time(62);
assert!(e1.as_mut().poll_elapsed(&mut context).is_pending()); assert!(e1.as_mut().poll_elapsed(&mut context).is_pending());
handle.process_at_time(192); handle.process_at_time(192);

View File

@ -1,6 +1,7 @@
use crate::runtime::handle::Handle;
#[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::runtime::time::TimeSource; use crate::runtime::time::TimeSource;
use crate::runtime::time::{Handle, TimerEntry}; use crate::runtime::time::TimerEntry;
use crate::time::{error::Error, Duration, Instant}; use crate::time::{error::Error, Duration, Instant};
use crate::util::trace; use crate::util::trace;
@ -262,6 +263,7 @@ impl Sleep {
#[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = { let inner = {
let handle = &handle.as_time_handle();
let time_source = handle.time_source().clone(); let time_source = handle.time_source().clone();
let deadline_tick = time_source.deadline_to_tick(deadline); let deadline_tick = time_source.deadline_to_tick(deadline);
let duration = deadline_tick.saturating_sub(time_source.now()); let duration = deadline_tick.saturating_sub(time_source.now());

View File

@ -1,15 +1,14 @@
// Some combinations of features may not use these constants.
#![cfg_attr(not(feature = "full"), allow(dead_code))]
/// Error string explaining that the Tokio context hasn't been instantiated. /// Error string explaining that the Tokio context hasn't been instantiated.
pub(crate) const CONTEXT_MISSING_ERROR: &str = pub(crate) const CONTEXT_MISSING_ERROR: &str =
"there is no reactor running, must be called from the context of a Tokio 1.x runtime"; "there is no reactor running, must be called from the context of a Tokio 1.x runtime";
// some combinations of features might not use this
#[allow(dead_code)]
/// Error string explaining that the Tokio context is shutting down and cannot drive timers. /// Error string explaining that the Tokio context is shutting down and cannot drive timers.
pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str = pub(crate) const RUNTIME_SHUTTING_DOWN_ERROR: &str =
"A Tokio 1.x context was found, but it is being shutdown."; "A Tokio 1.x context was found, but it is being shutdown.";
// some combinations of features might not use this
#[allow(dead_code)]
/// Error string explaining that the Tokio context is not available because the /// Error string explaining that the Tokio context is not available because the
/// thread-local storing it has been destroyed. This usually only happens during /// thread-local storing it has been destroyed. This usually only happens during
/// destructors of other thread-locals. /// destructors of other thread-locals.

View File

@ -74,11 +74,4 @@ pub(crate) mod trace;
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use self::rand::thread_rng_n; pub use self::rand::thread_rng_n;
#[cfg(any(
feature = "rt",
feature = "time",
feature = "net",
feature = "process",
all(unix, feature = "signal")
))]
pub(crate) mod error; pub(crate) mod error;