From 9563707aaa73a802fa4d3c51c12869a037641070 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 28 May 2025 20:01:31 +0800 Subject: [PATCH] time: cumulative minor improvements (#7358) --- tokio/src/runtime/local_runtime/runtime.rs | 1 - tokio/src/runtime/runtime.rs | 1 - tokio/src/runtime/time/entry.rs | 107 +++++++++++++-------- tokio/src/runtime/time/mod.rs | 1 - tokio/src/runtime/time/tests/mod.rs | 5 +- 5 files changed, 66 insertions(+), 49 deletions(-) diff --git a/tokio/src/runtime/local_runtime/runtime.rs b/tokio/src/runtime/local_runtime/runtime.rs index c0cb327f2..2e9d14e13 100644 --- a/tokio/src/runtime/local_runtime/runtime.rs +++ b/tokio/src/runtime/local_runtime/runtime.rs @@ -376,7 +376,6 @@ impl LocalRuntime { } } -#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let impl Drop for LocalRuntime { fn drop(&mut self) { if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler { diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 2f2b07d32..1bbb0205b 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -471,7 +471,6 @@ impl Runtime { } } -#[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let impl Drop for Runtime { fn drop(&mut self) { match &mut self.scheduler { diff --git a/tokio/src/runtime/time/entry.rs b/tokio/src/runtime/time/entry.rs index 751c77334..627fcbc5e 100644 --- a/tokio/src/runtime/time/entry.rs +++ b/tokio/src/runtime/time/entry.rs @@ -63,6 +63,7 @@ use crate::sync::AtomicWaker; use crate::time::Instant; use crate::util::linked_list; +use pin_project_lite::pin_project; use std::task::{Context, Poll, Waker}; use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull}; @@ -276,29 +277,36 @@ impl StateCell { } } -/// A timer entry. -/// -/// This is the handle to a timer that is controlled by the requester of the -/// timer. As this participates in intrusive data structures, it must be pinned -/// before polling. -#[derive(Debug)] -pub(crate) struct TimerEntry { - /// Arc reference to the runtime handle. We can only free the driver after - /// deregistering everything from their respective timer wheels. - driver: scheduler::Handle, - /// Shared inner structure; this is part of an intrusive linked list, and - /// therefore other references can exist to it while mutable references to - /// Entry exist. - /// - /// This is manipulated only under the inner mutex. - inner: Option, - /// Deadline for the timer. This is used to register on the first - /// poll, as we can't register prior to being pinned. - deadline: Instant, - /// Whether the deadline has been registered. - registered: bool, - /// Ensure the type is !Unpin - _m: std::marker::PhantomPinned, +pin_project! { + // A timer entry. + // + // This is the handle to a timer that is controlled by the requester of the + // timer. As this participates in intrusive data structures, it must be pinned + // before polling. + #[derive(Debug)] + pub(crate) struct TimerEntry { + // Arc reference to the runtime handle. We can only free the driver after + // deregistering everything from their respective timer wheels. + driver: scheduler::Handle, + // Shared inner structure; this is part of an intrusive linked list, and + // therefore other references can exist to it while mutable references to + // Entry exist. + // + // This is manipulated only under the inner mutex. + #[pin] + inner: Option, + // Deadline for the timer. This is used to register on the first + // poll, as we can't register prior to being pinned. + deadline: Instant, + // Whether the deadline has been registered. + registered: bool, + } + + impl PinnedDrop for TimerEntry { + fn drop(this: Pin<&mut Self>) { + this.cancel(); + } + } } unsafe impl Send for TimerEntry {} @@ -480,7 +488,6 @@ impl TimerEntry { inner: None, deadline, registered: false, - _m: std::marker::PhantomPinned, } } @@ -488,10 +495,10 @@ impl TimerEntry { self.inner.as_ref() } - fn init_inner(&mut self) { + fn init_inner(self: Pin<&mut Self>) { match self.inner { Some(_) => {} - None => self.inner = Some(TimerShared::new()), + None => self.project().inner.set(Some(TimerShared::new())), } } @@ -504,7 +511,29 @@ impl TimerEntry { return false; }; - !inner.state.might_be_registered() && self.registered + // Is this timer still in the timer wheel? + let deregistered = !inner.might_be_registered(); + + // Once the timer has expired, + // it will be taken out of the wheel and be fired. + // + // So if we have already registered the timer into the wheel, + // but now it is not in the wheel, it means that it has been + // fired. + // + // +--------------+-----------------+----------+ + // | deregistered | self.registered | output | + // +--------------+-----------------+----------+ + // | true | false | false | <- never been registered + // +--------------+-----------------+----------+ + // | false | false | false | <- never been registered + // +--------------+-----------------+----------+ + // | true | true | true | <- registered into the wheel, + // | | | | and then taken out of the wheel. + // +--------------+-----------------+----------+ + // | false | true | false | <- still registered in the wheel + // +--------------+-----------------+----------+ + deregistered && self.registered } /// Cancels and deregisters the timer. This operation is irreversible. @@ -540,16 +569,16 @@ impl TimerEntry { } pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) { - let this = unsafe { self.as_mut().get_unchecked_mut() }; - this.deadline = new_time; - this.registered = reregister; + let this = self.as_mut().project(); + *this.deadline = new_time; + *this.registered = reregister; - let tick = this.driver().time_source().deadline_to_tick(new_time); - let inner = match this.inner() { + let tick = self.driver().time_source().deadline_to_tick(new_time); + let inner = match self.inner() { Some(inner) => inner, None => { - this.init_inner(); - this.inner() + self.as_mut().init_inner(); + self.inner() .expect("inner should already be initialized by `this.init_inner()`") } }; @@ -560,8 +589,8 @@ impl TimerEntry { if reregister { unsafe { - this.driver() - .reregister(&this.driver.driver().io, tick, inner.into()); + self.driver() + .reregister(&self.driver.driver().io, tick, inner.into()); } } } @@ -656,9 +685,3 @@ impl TimerHandle { self.inner.as_ref().state.fire(completed_state) } } - -impl Drop for TimerEntry { - fn drop(&mut self) { - unsafe { Pin::new_unchecked(self) }.as_mut().cancel(); - } -} diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 024fe77e9..3250dce97 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -246,7 +246,6 @@ impl Driver { } impl Handle { - /// Runs timer related logic, and returns the next wakeup time pub(self) fn process(&self, clock: &Clock) { let now = self.time_source().now(clock); diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index c2138a82d..33c4a5366 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -62,9 +62,7 @@ fn single_timer() { let time = handle.inner.driver().time(); let clock = handle.inner.driver().clock(); - // This may or may not return Some (depending on how it races with the - // thread). If it does return None, however, the timer should complete - // synchronously. + // advance 2s time.process_at_time(time.time_source().now(clock) + 2_000_000_000); jh.join().unwrap(); @@ -170,7 +168,6 @@ fn reset_future() { let handle = handle.inner.driver().time(); - // This may or may not return a wakeup time. handle.process_at_time( handle .time_source()