io: make tokio::io::empty cooperative (#4300)

Reads and buffered reads from a `tokio::io::empty` were always marked
as ready. That makes sense, given that there is nothing to wait for.
However, doing repeated reads on the `empty` could stall the event
loop and prevent other tasks from making progress.

This change uses tokio's coop system to yield control back to the
executor when appropriate.

Note that the issue that originally triggered this PR is not fixed
yet, because the `timeout` function will not poll the timer after
empty::read runs out of budget. A different change will be needed to
address that.

Refs: #4291
This commit is contained in:
Braulio Valdivielso Martínez 2021-12-10 10:08:49 +00:00 committed by GitHub
parent 0bc9160e25
commit eb1af7f29c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 2 deletions

View File

@ -50,16 +50,18 @@ impl AsyncRead for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
cx: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}
}
impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(&[]))
}
@ -73,6 +75,20 @@ impl fmt::Debug for Empty {
}
}
cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> {
let coop = ready!(crate::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(())
}
}
cfg_not_coop! {
fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -0,0 +1,32 @@
#![cfg(feature = "full")]
use tokio::io::{AsyncBufReadExt, AsyncReadExt};
#[tokio::test]
async fn empty_read_is_cooperative() {
tokio::select! {
biased;
_ = async {
loop {
let mut buf = [0u8; 4096];
let _ = tokio::io::empty().read(&mut buf).await;
}
} => {},
_ = tokio::task::yield_now() => {}
}
}
#[tokio::test]
async fn empty_buf_reads_are_cooperative() {
tokio::select! {
biased;
_ = async {
loop {
let mut buf = String::new();
let _ = tokio::io::empty().read_line(&mut buf).await;
}
} => {},
_ = tokio::task::yield_now() => {}
}
}