mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
io: make repeat
and sink
cooperative (#6254)
This commit is contained in:
parent
5f7fe8fd0d
commit
581cd41d79
@ -1,3 +1,4 @@
|
|||||||
|
use crate::io::util::poll_proceed_and_make_progress;
|
||||||
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
|
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
@ -138,20 +139,6 @@ impl fmt::Debug for Empty {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg_coop! {
|
|
||||||
fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> {
|
|
||||||
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
|
|
||||||
coop.made_progress();
|
|
||||||
Poll::Ready(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_not_coop! {
|
|
||||||
fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> {
|
|
||||||
Poll::Ready(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -85,6 +85,20 @@ cfg_io_util! {
|
|||||||
// used by `BufReader` and `BufWriter`
|
// used by `BufReader` and `BufWriter`
|
||||||
// https://github.com/rust-lang/rust/blob/master/library/std/src/sys_common/io.rs#L1
|
// https://github.com/rust-lang/rust/blob/master/library/std/src/sys_common/io.rs#L1
|
||||||
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
|
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
|
||||||
|
|
||||||
|
cfg_coop! {
|
||||||
|
fn poll_proceed_and_make_progress(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
|
||||||
|
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
|
||||||
|
coop.made_progress();
|
||||||
|
std::task::Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg_not_coop! {
|
||||||
|
fn poll_proceed_and_make_progress(_: &mut std::task::Context<'_>) -> std::task::Poll<()> {
|
||||||
|
std::task::Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg_not_io_util! {
|
cfg_not_io_util! {
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use crate::io::util::poll_proceed_and_make_progress;
|
||||||
use crate::io::{AsyncRead, ReadBuf};
|
use crate::io::{AsyncRead, ReadBuf};
|
||||||
|
|
||||||
use std::io;
|
use std::io;
|
||||||
@ -50,9 +51,11 @@ impl AsyncRead for Repeat {
|
|||||||
#[inline]
|
#[inline]
|
||||||
fn poll_read(
|
fn poll_read(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &mut ReadBuf<'_>,
|
buf: &mut ReadBuf<'_>,
|
||||||
) -> Poll<io::Result<()>> {
|
) -> Poll<io::Result<()>> {
|
||||||
|
ready!(crate::trace::trace_leaf(cx));
|
||||||
|
ready!(poll_proceed_and_make_progress(cx));
|
||||||
// TODO: could be faster, but should we unsafe it?
|
// TODO: could be faster, but should we unsafe it?
|
||||||
while buf.remaining() != 0 {
|
while buf.remaining() != 0 {
|
||||||
buf.put_slice(&[self.byte]);
|
buf.put_slice(&[self.byte]);
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
use crate::io::util::poll_proceed_and_make_progress;
|
||||||
use crate::io::AsyncWrite;
|
use crate::io::AsyncWrite;
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
@ -53,19 +54,25 @@ impl AsyncWrite for Sink {
|
|||||||
#[inline]
|
#[inline]
|
||||||
fn poll_write(
|
fn poll_write(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
_: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
buf: &[u8],
|
buf: &[u8],
|
||||||
) -> Poll<Result<usize, io::Error>> {
|
) -> Poll<Result<usize, io::Error>> {
|
||||||
|
ready!(crate::trace::trace_leaf(cx));
|
||||||
|
ready!(poll_proceed_and_make_progress(cx));
|
||||||
Poll::Ready(Ok(buf.len()))
|
Poll::Ready(Ok(buf.len()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||||
|
ready!(crate::trace::trace_leaf(cx));
|
||||||
|
ready!(poll_proceed_and_make_progress(cx));
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||||
|
ready!(crate::trace::trace_leaf(cx));
|
||||||
|
ready!(poll_proceed_and_make_progress(cx));
|
||||||
Poll::Ready(Ok(()))
|
Poll::Ready(Ok(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
18
tokio/tests/io_repeat.rs
Normal file
18
tokio/tests/io_repeat.rs
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
#![warn(rust_2018_idioms)]
|
||||||
|
#![cfg(all(feature = "full"))]
|
||||||
|
|
||||||
|
use tokio::io::AsyncReadExt;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn repeat_poll_read_is_cooperative() {
|
||||||
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = async {
|
||||||
|
loop {
|
||||||
|
let mut buf = [0u8; 4096];
|
||||||
|
tokio::io::repeat(0b101).read_exact(&mut buf).await.unwrap();
|
||||||
|
}
|
||||||
|
} => {},
|
||||||
|
_ = tokio::task::yield_now() => {}
|
||||||
|
}
|
||||||
|
}
|
44
tokio/tests/io_sink.rs
Normal file
44
tokio/tests/io_sink.rs
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
#![warn(rust_2018_idioms)]
|
||||||
|
#![cfg(all(feature = "full"))]
|
||||||
|
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn sink_poll_write_is_cooperative() {
|
||||||
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = async {
|
||||||
|
loop {
|
||||||
|
let buf = vec![1, 2, 3];
|
||||||
|
tokio::io::sink().write_all(&buf).await.unwrap();
|
||||||
|
}
|
||||||
|
} => {},
|
||||||
|
_ = tokio::task::yield_now() => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn sink_poll_flush_is_cooperative() {
|
||||||
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = async {
|
||||||
|
loop {
|
||||||
|
tokio::io::sink().flush().await.unwrap();
|
||||||
|
}
|
||||||
|
} => {},
|
||||||
|
_ = tokio::task::yield_now() => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn sink_poll_shutdown_is_cooperative() {
|
||||||
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
_ = async {
|
||||||
|
loop {
|
||||||
|
tokio::io::sink().shutdown().await.unwrap();
|
||||||
|
}
|
||||||
|
} => {},
|
||||||
|
_ = tokio::task::yield_now() => {}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user