Merge remote-tracking branch 'origin/master' into rt-threaded-rework

This commit is contained in:
Carl Lerche 2023-06-26 17:28:14 +00:00
commit 81885fbb1d
No known key found for this signature in database
GPG Key ID: DF9E42ECB951A75D
6 changed files with 111 additions and 5 deletions

View File

@ -34,7 +34,7 @@ rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"]
__docs_rs = ["futures-util"]
[dependencies]
tokio = { version = "1.22.0", path = "../tokio", features = ["sync"] }
tokio = { version = "1.28.0", path = "../tokio", features = ["sync"] }
bytes = "1.0.0"
futures-core = "0.3.0"
futures-sink = "0.3.0"
@ -56,6 +56,7 @@ async-stream = "0.3.0"
futures = "0.3.0"
futures-test = "0.3.5"
parking_lot = "0.12.0"
tempfile = "3.1.0"
[package.metadata.docs.rs]
all-features = true

View File

@ -227,6 +227,8 @@ impl<T: tokio::io::AsyncSeek> futures_io::AsyncSeek for Compat<T> {
pos: io::SeekFrom,
) -> Poll<io::Result<u64>> {
if self.seek_pos != Some(pos) {
// Ensure previous seeks have finished before starting a new one
ready!(self.as_mut().project().inner.poll_complete(cx))?;
self.as_mut().project().inner.start_seek(pos)?;
*self.as_mut().project().seek_pos = Some(pos);
}

View File

@ -316,6 +316,60 @@ where
self.insert(key, task);
}
/// Spawn the blocking code on the blocking threadpool and store it in this `JoinMap` with the provided
/// key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// Note that blocking tasks cannot be cancelled after execution starts.
/// Replaced blocking tasks will still run to completion if the task has begun
/// to execute when it is replaced. A blocking task which is replaced before
/// it has been scheduled on a blocking worker thread will be cancelled.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_blocking<F>(&mut self, key: K, f: F)
where
F: FnOnce() -> V,
F: Send + 'static,
V: Send,
{
let task = self.tasks.spawn_blocking(f);
self.insert(key, task)
}
/// Spawn the blocking code on the blocking threadpool of the provided runtime and store it in this
/// `JoinMap` with the provided key.
///
/// If a task previously existed in the `JoinMap` for this key, that task
/// will be cancelled and replaced with the new one. The previous task will
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
/// *not* return a cancelled [`JoinError`] for that task.
///
/// Note that blocking tasks cannot be cancelled after execution starts.
/// Replaced blocking tasks will still run to completion if the task has begun
/// to execute when it is replaced. A blocking task which is replaced before
/// it has been scheduled on a blocking worker thread will be cancelled.
///
/// [`join_next`]: Self::join_next
#[track_caller]
pub fn spawn_blocking_on<F>(&mut self, key: K, f: F, handle: &Handle)
where
F: FnOnce() -> V,
F: Send + 'static,
V: Send,
{
let task = self.tasks.spawn_blocking_on(f, handle);
self.insert(key, task);
}
/// Spawn the provided task on the current [`LocalSet`] and store it in this
/// `JoinMap` with the provided key.
///

View File

@ -0,0 +1,43 @@
#![cfg(all(feature = "compat"))]
#![cfg(not(target_os = "wasi"))] // WASI does not support all fs operations
#![warn(rust_2018_idioms)]
use futures_io::SeekFrom;
use futures_util::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tempfile::NamedTempFile;
use tokio::fs::OpenOptions;
use tokio_util::compat::TokioAsyncWriteCompatExt;
#[tokio::test]
async fn compat_file_seek() -> futures_util::io::Result<()> {
let temp_file = NamedTempFile::new()?;
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(temp_file)
.await?
.compat_write();
file.write_all(&[0, 1, 2, 3, 4, 5]).await?;
file.write_all(&[6, 7]).await?;
assert_eq!(file.stream_position().await?, 8);
// Modify elements at position 2.
assert_eq!(file.seek(SeekFrom::Start(2)).await?, 2);
file.write_all(&[8, 9]).await?;
file.flush().await?;
// Verify we still have 8 elements.
assert_eq!(file.seek(SeekFrom::End(0)).await?, 8);
// Seek back to the start of the file to read and verify contents.
file.seek(SeekFrom::Start(0)).await?;
let mut buf = Vec::new();
let num_bytes = file.read_to_end(&mut buf).await?;
assert_eq!(&buf[..num_bytes], &[0, 1, 8, 9, 4, 5, 6, 7]);
Ok(())
}

View File

@ -139,7 +139,9 @@ impl ReadDir {
target_os = "solaris",
target_os = "illumos",
target_os = "haiku",
target_os = "vxworks"
target_os = "vxworks",
target_os = "nto",
target_os = "vita",
)))]
file_type: std.file_type().ok(),
std: Arc::new(std),
@ -200,7 +202,9 @@ pub struct DirEntry {
target_os = "solaris",
target_os = "illumos",
target_os = "haiku",
target_os = "vxworks"
target_os = "vxworks",
target_os = "nto",
target_os = "vita",
)))]
file_type: Option<FileType>,
std: Arc<std::fs::DirEntry>,
@ -331,7 +335,9 @@ impl DirEntry {
target_os = "solaris",
target_os = "illumos",
target_os = "haiku",
target_os = "vxworks"
target_os = "vxworks",
target_os = "nto",
target_os = "vita",
)))]
if let Some(file_type) = self.file_type {
return Ok(file_type);

View File

@ -801,7 +801,7 @@ impl<T> Sender<T> {
if state.is_closed() {
coop.made_progress();
return Poll::Ready(());
return Ready(());
}
if state.is_tx_task_set() {