Merge branch 'master' into mox692/add_uring_write

This commit is contained in:
Motoyuki Kimura 2025-09-04 18:04:21 +09:00 committed by GitHub
commit 8bb7cac95d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 118 additions and 59 deletions

View File

@ -377,7 +377,7 @@ pub struct FramedParts<T, U> {
/// This private field allows us to add additional fields in the future in a /// This private field allows us to add additional fields in the future in a
/// backwards compatible way. /// backwards compatible way.
_priv: (), pub(crate) _priv: (),
} }
impl<T, U> FramedParts<T, U> { impl<T, U> FramedParts<T, U> {

View File

@ -11,6 +11,8 @@ use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use super::FramedParts;
pin_project! { pin_project! {
/// A [`Stream`] of messages decoded from an [`AsyncRead`]. /// A [`Stream`] of messages decoded from an [`AsyncRead`].
/// ///
@ -153,6 +155,18 @@ impl<T, D> FramedRead<T, D> {
pub fn read_buffer_mut(&mut self) -> &mut BytesMut { pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
&mut self.inner.state.buffer &mut self.inner.state.buffer
} }
/// Consumes the `FramedRead`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
pub fn into_parts(self) -> FramedParts<T, D> {
FramedParts {
io: self.inner.inner,
codec: self.inner.codec,
read_buf: self.inner.state.buffer,
write_buf: BytesMut::new(),
_priv: (),
}
}
} }
// This impl just defers to the underlying FramedImpl // This impl just defers to the underlying FramedImpl

View File

@ -12,6 +12,8 @@ use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use super::FramedParts;
pin_project! { pin_project! {
/// A [`Sink`] of frames encoded to an `AsyncWrite`. /// A [`Sink`] of frames encoded to an `AsyncWrite`.
/// ///
@ -159,6 +161,18 @@ impl<T, E> FramedWrite<T, E> {
pub fn set_backpressure_boundary(&mut self, boundary: usize) { pub fn set_backpressure_boundary(&mut self, boundary: usize) {
self.inner.state.backpressure_boundary = boundary; self.inner.state.backpressure_boundary = boundary;
} }
/// Consumes the `FramedWrite`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
pub fn into_parts(self) -> FramedParts<T, E> {
FramedParts {
io: self.inner.inner,
codec: self.inner.codec,
read_buf: BytesMut::new(),
write_buf: self.inner.state.buffer,
_priv: (),
}
}
} }
// This impl just defers to the underlying FramedImpl // This impl just defers to the underlying FramedImpl

View File

@ -40,7 +40,7 @@ use std::task::{self, ready, Poll, Waker};
/// # `Stream` implementation /// # `Stream` implementation
/// ///
/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have /// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have
/// expired, no items are returned. In this case, `Poll::Pending` is returned and the /// expired, no items are returned. In this case, [`Poll::Pending`] is returned and the
/// current task is registered to be notified once the next item's delay has /// current task is registered to be notified once the next item's delay has
/// expired. /// expired.
/// ///
@ -66,9 +66,13 @@ use std::task::{self, ready, Poll, Waker};
/// Capacity can be checked using [`capacity`] and allocated preemptively by using /// Capacity can be checked using [`capacity`] and allocated preemptively by using
/// the [`reserve`] method. /// the [`reserve`] method.
/// ///
/// # Cancellation safety
///
/// [`DelayQueue`]'s implementation of [`StreamExt::next`] is cancellation safe.
///
/// # Usage /// # Usage
/// ///
/// Using `DelayQueue` to manage cache entries. /// Using [`DelayQueue`] to manage cache entries.
/// ///
/// ```rust,no_run /// ```rust,no_run
/// use tokio_util::time::{DelayQueue, delay_queue}; /// use tokio_util::time::{DelayQueue, delay_queue};
@ -118,7 +122,8 @@ use std::task::{self, ready, Poll, Waker};
/// [`insert`]: method@Self::insert /// [`insert`]: method@Self::insert
/// [`insert_at`]: method@Self::insert_at /// [`insert_at`]: method@Self::insert_at
/// [`Key`]: struct@Key /// [`Key`]: struct@Key
/// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html /// [`Stream`]: https://docs.rs/futures/0.3.31/futures/stream/trait.Stream.html
/// [`StreamExt::next`]: https://docs.rs/tokio-stream/0.1.17/tokio_stream/trait.StreamExt.html#method.next
/// [`poll_expired`]: method@Self::poll_expired /// [`poll_expired`]: method@Self::poll_expired
/// [`Stream::poll_expired`]: method@Self::poll_expired /// [`Stream::poll_expired`]: method@Self::poll_expired
/// [`DelayQueue`]: struct@DelayQueue /// [`DelayQueue`]: struct@DelayQueue

View File

@ -16,7 +16,7 @@ use std::{fmt, num::NonZeroU64};
/// [`task::id()`](crate::task::id()) functions and from outside the task via /// [`task::id()`](crate::task::id()) functions and from outside the task via
/// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function. /// the [`JoinHandle::id()`](crate::task::JoinHandle::id()) function.
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt"))))] #[cfg_attr(docsrs, doc(cfg(all(feature = "rt"))))]
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
pub struct Id(pub(crate) NonZeroU64); pub struct Id(pub(crate) NonZeroU64);
/// Returns the [`Id`] of the currently running task. /// Returns the [`Id`] of the currently running task.

View File

@ -741,12 +741,14 @@ impl Notify {
/// } /// }
/// ``` /// ```
pub fn notify_waiters(&self) { pub fn notify_waiters(&self) {
let mut waiters = self.waiters.lock(); self.lock_waiter_list().notify_waiters();
}
// The state must be loaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let curr = self.state.load(SeqCst);
fn inner_notify_waiters<'a>(
&'a self,
curr: usize,
mut waiters: crate::loom::sync::MutexGuard<'a, LinkedList<Waiter, Waiter>>,
) {
if matches!(get_state(curr), EMPTY | NOTIFIED) { if matches!(get_state(curr), EMPTY | NOTIFIED) {
// There are no waiting tasks. All we need to do is increment the // There are no waiting tasks. All we need to do is increment the
// number of times this method was called. // number of times this method was called.
@ -814,6 +816,20 @@ impl Notify {
wakers.wake_all(); wakers.wake_all();
} }
pub(crate) fn lock_waiter_list(&self) -> NotifyGuard<'_> {
let guarded_waiters = self.waiters.lock();
// The state must be loaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let current_state = self.state.load(SeqCst);
NotifyGuard {
guarded_notify: self,
guarded_waiters,
current_state,
}
}
} }
impl Default for Notify { impl Default for Notify {
@ -1374,3 +1390,20 @@ unsafe impl linked_list::Link for Waiter {
} }
fn is_unpin<T: Unpin>() {} fn is_unpin<T: Unpin>() {}
/// A guard that provides exclusive access to a `Notify`'s internal
/// waiters list.
///
/// While this guard is held, the `Notify` instance's waiter list is locked.
pub(crate) struct NotifyGuard<'a> {
guarded_notify: &'a Notify,
guarded_waiters: crate::loom::sync::MutexGuard<'a, WaitList>,
current_state: usize,
}
impl NotifyGuard<'_> {
pub(crate) fn notify_waiters(self) {
self.guarded_notify
.inner_notify_waiters(self.current_state, self.guarded_waiters);
}
}

View File

@ -1,14 +1,16 @@
use super::Notify; use super::Notify;
use crate::loom::cell::UnsafeCell; use crate::loom::cell::UnsafeCell;
use crate::loom::sync::{atomic::AtomicBool, Mutex}; use crate::loom::sync::atomic::AtomicBool;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::future::{poll_fn, Future};
use std::mem::MaybeUninit; use std::mem::MaybeUninit;
use std::ops::Drop; use std::ops::Drop;
use std::ptr; use std::ptr;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::task::Poll;
// This file contains an implementation of an SetOnce. The value of SetOnce // This file contains an implementation of an SetOnce. The value of SetOnce
// can only be modified once during initialization. // can only be modified once during initialization.
@ -90,9 +92,6 @@ pub struct SetOnce<T> {
value_set: AtomicBool, value_set: AtomicBool,
value: UnsafeCell<MaybeUninit<T>>, value: UnsafeCell<MaybeUninit<T>>,
notify: Notify, notify: Notify,
// we lock the mutex inside set to ensure
// only one caller of set can run at a time
lock: Mutex<()>,
} }
impl<T> Default for SetOnce<T> { impl<T> Default for SetOnce<T> {
@ -140,7 +139,6 @@ impl<T> From<T> for SetOnce<T> {
value_set: AtomicBool::new(true), value_set: AtomicBool::new(true),
value: UnsafeCell::new(MaybeUninit::new(value)), value: UnsafeCell::new(MaybeUninit::new(value)),
notify: Notify::new(), notify: Notify::new(),
lock: Mutex::new(()),
} }
} }
} }
@ -152,7 +150,6 @@ impl<T> SetOnce<T> {
value_set: AtomicBool::new(false), value_set: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()), value: UnsafeCell::new(MaybeUninit::uninit()),
notify: Notify::new(), notify: Notify::new(),
lock: Mutex::new(()),
} }
} }
@ -195,7 +192,6 @@ impl<T> SetOnce<T> {
value_set: AtomicBool::new(false), value_set: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()), value: UnsafeCell::new(MaybeUninit::uninit()),
notify: Notify::const_new(), notify: Notify::const_new(),
lock: Mutex::const_new(()),
} }
} }
@ -246,7 +242,6 @@ impl<T> SetOnce<T> {
value_set: AtomicBool::new(true), value_set: AtomicBool::new(true),
value: UnsafeCell::new(MaybeUninit::new(value)), value: UnsafeCell::new(MaybeUninit::new(value)),
notify: Notify::const_new(), notify: Notify::const_new(),
lock: Mutex::const_new(()),
} }
} }
@ -287,19 +282,16 @@ impl<T> SetOnce<T> {
return Err(SetOnceError(value)); return Err(SetOnceError(value));
} }
// SAFETY: lock the mutex to ensure only one caller of set // SAFETY: lock notify to ensure only one caller of set
// can run at a time. // can run at a time.
let guard = self.lock.lock(); let guard = self.notify.lock_waiter_list();
if self.initialized() { if self.initialized() {
// If the value is already set, we return an error
drop(guard);
return Err(SetOnceError(value)); return Err(SetOnceError(value));
} }
// SAFETY: We have locked the mutex and checked if the value is // SAFETY: We have locked the mutex and checked if the value is
// initalized or not, so we can safely write to the value // initialized or not, so we can safely write to the value
unsafe { unsafe {
self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value));
} }
@ -308,10 +300,8 @@ impl<T> SetOnce<T> {
// atomic is able to read the value we just stored. // atomic is able to read the value we just stored.
self.value_set.store(true, Ordering::Release); self.value_set.store(true, Ordering::Release);
drop(guard);
// notify the waiting wakers that the value is set // notify the waiting wakers that the value is set
self.notify.notify_waiters(); guard.notify_waiters();
Ok(()) Ok(())
} }
@ -353,20 +343,17 @@ impl<T> SetOnce<T> {
} }
let notify_fut = self.notify.notified(); let notify_fut = self.notify.notified();
{ pin!(notify_fut);
// Taking the lock here ensures that a concurrent call to `set`
// will see the creation of `notify_fut` in case the check
// fails.
let _guard = self.lock.lock();
poll_fn(|cx| {
// Register under the notify's internal lock.
let ret = notify_fut.as_mut().poll(cx);
if self.value_set.load(Ordering::Relaxed) { if self.value_set.load(Ordering::Relaxed) {
// SAFETY: the state is initialized return Poll::Ready(());
return unsafe { self.get_unchecked() };
} }
} ret
})
// wait until the value is set .await;
notify_fut.await;
} }
} }
} }

View File

@ -203,26 +203,32 @@ where
return Poll::Ready(Ok(v)); return Poll::Ready(Ok(v));
} }
let has_budget_now = coop::has_budget_remaining(); poll_delay(had_budget_before, me.delay, cx).map(Err)
}
let delay = me.delay; }
let poll_delay = || -> Poll<Self::Output> { // The T-invariant portion of Timeout::<T>::poll. Pulling this out reduces the
match delay.poll(cx) { // amount of code that gets duplicated during monomorphization.
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), fn poll_delay(
Poll::Pending => Poll::Pending, had_budget_before: bool,
} delay: Pin<&mut Sleep>,
}; cx: &mut task::Context<'_>,
) -> Poll<Elapsed> {
if let (true, false) = (had_budget_before, has_budget_now) { let delay_poll = || match delay.poll(cx) {
// if it is the underlying future that exhausted the budget, we poll Poll::Ready(()) => Poll::Ready(Elapsed::new()),
// the `delay` with an unconstrained one. This prevents pathological Poll::Pending => Poll::Pending,
// cases where the underlying future always exhausts the budget and };
// we never get a chance to evaluate whether the timeout was hit or
// not. let has_budget_now = coop::has_budget_remaining();
coop::with_unconstrained(poll_delay)
} else { if let (true, false) = (had_budget_before, has_budget_now) {
poll_delay() // if it is the underlying future that exhausted the budget, we poll
} // the `delay` with an unconstrained one. This prevents pathological
// cases where the underlying future always exhausts the budget and
// we never get a chance to evaluate whether the timeout was hit or
// not.
coop::with_unconstrained(delay_poll)
} else {
delay_poll()
} }
} }