From dd74c7c1bfd69ee997c57aade1db2326b85855df Mon Sep 17 00:00:00 2001 From: Asger Hautop Drewsen Date: Fri, 15 Aug 2025 09:52:12 +0200 Subject: [PATCH 1/5] task: implement `Ord` for `task::Id` (#7530) --- tokio/src/runtime/task/id.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/task/id.rs b/tokio/src/runtime/task/id.rs index 63d01700b..df946f2cf 100644 --- a/tokio/src/runtime/task/id.rs +++ b/tokio/src/runtime/task/id.rs @@ -16,7 +16,7 @@ use std::{fmt, num::NonZeroU64}; /// [`task::id()`](crate::task::id()) functions and from outside the task via /// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. #[cfg_attr(docsrs, doc(cfg(all(feature = "rt"))))] -#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)] pub struct Id(pub(crate) NonZeroU64); /// Returns the [`Id`] of the currently running task. From 925c614c89d0a26777a334612e2ed6ad0e7935c3 Mon Sep 17 00:00:00 2001 From: Alex Bakon Date: Tue, 19 Aug 2025 07:42:55 -0400 Subject: [PATCH 2/5] time: reduce the generated code size of `Timeout::poll` (#7535) --- tokio/src/time/timeout.rs | 48 ++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index e7fbe75dc..ce4bf16d5 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -203,26 +203,32 @@ where return Poll::Ready(Ok(v)); } - let has_budget_now = coop::has_budget_remaining(); - - let delay = me.delay; - - let poll_delay = || -> Poll { - match delay.poll(cx) { - Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), - Poll::Pending => Poll::Pending, - } - }; - - if let (true, false) = (had_budget_before, has_budget_now) { - // if it is the underlying future that exhausted the budget, we poll - // the `delay` with an unconstrained one. This prevents pathological - // cases where the underlying future always exhausts the budget and - // we never get a chance to evaluate whether the timeout was hit or - // not. - coop::with_unconstrained(poll_delay) - } else { - poll_delay() - } + poll_delay(had_budget_before, me.delay, cx).map(Err) + } +} + +// The T-invariant portion of Timeout::::poll. Pulling this out reduces the +// amount of code that gets duplicated during monomorphization. +fn poll_delay( + had_budget_before: bool, + delay: Pin<&mut Sleep>, + cx: &mut task::Context<'_>, +) -> Poll { + let delay_poll = || match delay.poll(cx) { + Poll::Ready(()) => Poll::Ready(Elapsed::new()), + Poll::Pending => Poll::Pending, + }; + + let has_budget_now = coop::has_budget_remaining(); + + if let (true, false) = (had_budget_before, has_budget_now) { + // if it is the underlying future that exhausted the budget, we poll + // the `delay` with an unconstrained one. This prevents pathological + // cases where the underlying future always exhausts the budget and + // we never get a chance to evaluate whether the timeout was hit or + // not. + coop::with_unconstrained(delay_poll) + } else { + delay_poll() } } From adc3e19ba7467a192ae9ef2379549bac03e2ca1c Mon Sep 17 00:00:00 2001 From: Daniel Sharifi <40335219+DSharifi@users.noreply.github.com> Date: Sun, 31 Aug 2025 12:57:17 +0000 Subject: [PATCH 3/5] time: clarify the cancellation safety of the `DelayQueue` (#7564) --- tokio-util/src/time/delay_queue.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index a65101d16..eff8cb1ae 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -40,7 +40,7 @@ use std::task::{self, ready, Poll, Waker}; /// # `Stream` implementation /// /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have -/// expired, no items are returned. In this case, `Poll::Pending` is returned and the +/// expired, no items are returned. In this case, [`Poll::Pending`] is returned and the /// current task is registered to be notified once the next item's delay has /// expired. /// @@ -66,9 +66,13 @@ use std::task::{self, ready, Poll, Waker}; /// Capacity can be checked using [`capacity`] and allocated preemptively by using /// the [`reserve`] method. /// +/// # Cancellation safety +/// +/// [`DelayQueue`]'s implementation of [`StreamExt::next`] is cancellation safe. +/// /// # Usage /// -/// Using `DelayQueue` to manage cache entries. +/// Using [`DelayQueue`] to manage cache entries. /// /// ```rust,no_run /// use tokio_util::time::{DelayQueue, delay_queue}; @@ -118,7 +122,8 @@ use std::task::{self, ready, Poll, Waker}; /// [`insert`]: method@Self::insert /// [`insert_at`]: method@Self::insert_at /// [`Key`]: struct@Key -/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html +/// [`Stream`]: https://docs.rs/futures/0.3.31/futures/stream/trait.Stream.html +/// [`StreamExt::next`]: https://docs.rs/tokio-stream/0.1.17/tokio_stream/trait.StreamExt.html#method.next /// [`poll_expired`]: method@Self::poll_expired /// [`Stream::poll_expired`]: method@Self::poll_expired /// [`DelayQueue`]: struct@DelayQueue From c8371d45bc0529f88dd168e83c1e8b14f9b9def6 Mon Sep 17 00:00:00 2001 From: Varun Doshi Date: Wed, 3 Sep 2025 17:08:51 +0530 Subject: [PATCH 4/5] codec: add `{FramedRead,FramedWrite}::into_parts()` (#7566) --- tokio-util/src/codec/framed.rs | 2 +- tokio-util/src/codec/framed_read.rs | 14 ++++++++++++++ tokio-util/src/codec/framed_write.rs | 14 ++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index da3fec88e..ec7aa172f 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -377,7 +377,7 @@ pub struct FramedParts { /// This private field allows us to add additional fields in the future in a /// backwards compatible way. - _priv: (), + pub(crate) _priv: (), } impl FramedParts { diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index 3ede5876b..0f2e16dc3 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -11,6 +11,8 @@ use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; +use super::FramedParts; + pin_project! { /// A [`Stream`] of messages decoded from an [`AsyncRead`]. /// @@ -153,6 +155,18 @@ impl FramedRead { pub fn read_buffer_mut(&mut self) -> &mut BytesMut { &mut self.inner.state.buffer } + + /// Consumes the `FramedRead`, returning its underlying I/O stream, the buffer + /// with unprocessed data, and the codec. + pub fn into_parts(self) -> FramedParts { + FramedParts { + io: self.inner.inner, + codec: self.inner.codec, + read_buf: self.inner.state.buffer, + write_buf: BytesMut::new(), + _priv: (), + } + } } // This impl just defers to the underlying FramedImpl diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 541aae9bf..efc369ebf 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -12,6 +12,8 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use super::FramedParts; + pin_project! { /// A [`Sink`] of frames encoded to an `AsyncWrite`. /// @@ -159,6 +161,18 @@ impl FramedWrite { pub fn set_backpressure_boundary(&mut self, boundary: usize) { self.inner.state.backpressure_boundary = boundary; } + + /// Consumes the `FramedWrite`, returning its underlying I/O stream, the buffer + /// with unprocessed data, and the codec. + pub fn into_parts(self) -> FramedParts { + FramedParts { + io: self.inner.inner, + codec: self.inner.codec, + read_buf: BytesMut::new(), + write_buf: self.inner.state.buffer, + _priv: (), + } + } } // This impl just defers to the underlying FramedImpl From 37ca2f049c552fde50a2535de6dfc61ee6e96eed Mon Sep 17 00:00:00 2001 From: Sam <130320493+srxg@users.noreply.github.com> Date: Wed, 3 Sep 2025 16:37:50 +0100 Subject: [PATCH 5/5] sync: remove inner mutex in `SetOnce` (#7554) --- tokio/src/sync/notify.rs | 43 +++++++++++++++++++++++++++++++++----- tokio/src/sync/set_once.rs | 43 +++++++++++++------------------------- 2 files changed, 53 insertions(+), 33 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index d46079793..b79c14d8e 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -741,12 +741,14 @@ impl Notify { /// } /// ``` pub fn notify_waiters(&self) { - let mut waiters = self.waiters.lock(); - - // The state must be loaded while the lock is held. The state may only - // transition out of WAITING while the lock is held. - let curr = self.state.load(SeqCst); + self.lock_waiter_list().notify_waiters(); + } + fn inner_notify_waiters<'a>( + &'a self, + curr: usize, + mut waiters: crate::loom::sync::MutexGuard<'a, LinkedList>, + ) { if matches!(get_state(curr), EMPTY | NOTIFIED) { // There are no waiting tasks. All we need to do is increment the // number of times this method was called. @@ -814,6 +816,20 @@ impl Notify { wakers.wake_all(); } + + pub(crate) fn lock_waiter_list(&self) -> NotifyGuard<'_> { + let guarded_waiters = self.waiters.lock(); + + // The state must be loaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let current_state = self.state.load(SeqCst); + + NotifyGuard { + guarded_notify: self, + guarded_waiters, + current_state, + } + } } impl Default for Notify { @@ -1374,3 +1390,20 @@ unsafe impl linked_list::Link for Waiter { } fn is_unpin() {} + +/// A guard that provides exclusive access to a `Notify`'s internal +/// waiters list. +/// +/// While this guard is held, the `Notify` instance's waiter list is locked. +pub(crate) struct NotifyGuard<'a> { + guarded_notify: &'a Notify, + guarded_waiters: crate::loom::sync::MutexGuard<'a, WaitList>, + current_state: usize, +} + +impl NotifyGuard<'_> { + pub(crate) fn notify_waiters(self) { + self.guarded_notify + .inner_notify_waiters(self.current_state, self.guarded_waiters); + } +} diff --git a/tokio/src/sync/set_once.rs b/tokio/src/sync/set_once.rs index e4adbf593..3170a9cda 100644 --- a/tokio/src/sync/set_once.rs +++ b/tokio/src/sync/set_once.rs @@ -1,14 +1,16 @@ use super::Notify; use crate::loom::cell::UnsafeCell; -use crate::loom::sync::{atomic::AtomicBool, Mutex}; +use crate::loom::sync::atomic::AtomicBool; use std::error::Error; use std::fmt; +use std::future::{poll_fn, Future}; use std::mem::MaybeUninit; use std::ops::Drop; use std::ptr; use std::sync::atomic::Ordering; +use std::task::Poll; // This file contains an implementation of an SetOnce. The value of SetOnce // can only be modified once during initialization. @@ -90,9 +92,6 @@ pub struct SetOnce { value_set: AtomicBool, value: UnsafeCell>, notify: Notify, - // we lock the mutex inside set to ensure - // only one caller of set can run at a time - lock: Mutex<()>, } impl Default for SetOnce { @@ -140,7 +139,6 @@ impl From for SetOnce { value_set: AtomicBool::new(true), value: UnsafeCell::new(MaybeUninit::new(value)), notify: Notify::new(), - lock: Mutex::new(()), } } } @@ -152,7 +150,6 @@ impl SetOnce { value_set: AtomicBool::new(false), value: UnsafeCell::new(MaybeUninit::uninit()), notify: Notify::new(), - lock: Mutex::new(()), } } @@ -195,7 +192,6 @@ impl SetOnce { value_set: AtomicBool::new(false), value: UnsafeCell::new(MaybeUninit::uninit()), notify: Notify::const_new(), - lock: Mutex::const_new(()), } } @@ -246,7 +242,6 @@ impl SetOnce { value_set: AtomicBool::new(true), value: UnsafeCell::new(MaybeUninit::new(value)), notify: Notify::const_new(), - lock: Mutex::const_new(()), } } @@ -287,19 +282,16 @@ impl SetOnce { return Err(SetOnceError(value)); } - // SAFETY: lock the mutex to ensure only one caller of set + // SAFETY: lock notify to ensure only one caller of set // can run at a time. - let guard = self.lock.lock(); + let guard = self.notify.lock_waiter_list(); if self.initialized() { - // If the value is already set, we return an error - drop(guard); - return Err(SetOnceError(value)); } // SAFETY: We have locked the mutex and checked if the value is - // initalized or not, so we can safely write to the value + // initialized or not, so we can safely write to the value unsafe { self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); } @@ -308,10 +300,8 @@ impl SetOnce { // atomic is able to read the value we just stored. self.value_set.store(true, Ordering::Release); - drop(guard); - // notify the waiting wakers that the value is set - self.notify.notify_waiters(); + guard.notify_waiters(); Ok(()) } @@ -353,20 +343,17 @@ impl SetOnce { } let notify_fut = self.notify.notified(); - { - // Taking the lock here ensures that a concurrent call to `set` - // will see the creation of `notify_fut` in case the check - // fails. - let _guard = self.lock.lock(); + pin!(notify_fut); + poll_fn(|cx| { + // Register under the notify's internal lock. + let ret = notify_fut.as_mut().poll(cx); if self.value_set.load(Ordering::Relaxed) { - // SAFETY: the state is initialized - return unsafe { self.get_unchecked() }; + return Poll::Ready(()); } - } - - // wait until the value is set - notify_fut.await; + ret + }) + .await; } } }