mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
fs: add vectored writes to tokio::fs::File
(#5958)
This commit is contained in:
parent
cb1e10b745
commit
37bb47c4a2
@ -725,6 +725,81 @@ impl AsyncWrite for File {
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[io::IoSlice<'_>],
|
||||
) -> Poll<Result<usize, io::Error>> {
|
||||
ready!(crate::trace::trace_leaf(cx));
|
||||
let me = self.get_mut();
|
||||
let inner = me.inner.get_mut();
|
||||
|
||||
if let Some(e) = inner.last_write_err.take() {
|
||||
return Ready(Err(e.into()));
|
||||
}
|
||||
|
||||
loop {
|
||||
match inner.state {
|
||||
Idle(ref mut buf_cell) => {
|
||||
let mut buf = buf_cell.take().unwrap();
|
||||
|
||||
let seek = if !buf.is_empty() {
|
||||
Some(SeekFrom::Current(buf.discard_read()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let n = buf.copy_from_bufs(bufs);
|
||||
let std = me.std.clone();
|
||||
|
||||
let blocking_task_join_handle = spawn_mandatory_blocking(move || {
|
||||
let res = if let Some(seek) = seek {
|
||||
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
|
||||
} else {
|
||||
buf.write_to(&mut &*std)
|
||||
};
|
||||
|
||||
(Operation::Write(res), buf)
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
io::Error::new(io::ErrorKind::Other, "background task failed")
|
||||
})?;
|
||||
|
||||
inner.state = Busy(blocking_task_join_handle);
|
||||
|
||||
return Ready(Ok(n));
|
||||
}
|
||||
Busy(ref mut rx) => {
|
||||
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
|
||||
inner.state = Idle(Some(buf));
|
||||
|
||||
match op {
|
||||
Operation::Read(_) => {
|
||||
// We don't care about the result here. The fact
|
||||
// that the cursor has advanced will be reflected in
|
||||
// the next iteration of the loop
|
||||
continue;
|
||||
}
|
||||
Operation::Write(res) => {
|
||||
// If the previous write was successful, continue.
|
||||
// Otherwise, error.
|
||||
res?;
|
||||
continue;
|
||||
}
|
||||
Operation::Seek(_) => {
|
||||
// Ignore the seek
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn is_write_vectored(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
|
||||
ready!(crate::trace::trace_leaf(cx));
|
||||
let inner = self.inner.get_mut();
|
||||
|
@ -276,5 +276,22 @@ cfg_fs! {
|
||||
self.buf.truncate(0);
|
||||
ret
|
||||
}
|
||||
|
||||
pub(crate) fn copy_from_bufs(&mut self, bufs: &[io::IoSlice<'_>]) -> usize {
|
||||
assert!(self.is_empty());
|
||||
|
||||
let mut rem = MAX_BUF;
|
||||
for buf in bufs {
|
||||
if rem == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
let len = buf.len().min(rem);
|
||||
self.buf.extend_from_slice(&buf[..len]);
|
||||
rem -= len;
|
||||
}
|
||||
|
||||
MAX_BUF - rem
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,6 +2,7 @@
|
||||
#![cfg(all(feature = "full", not(target_os = "wasi")))] // WASI does not support all fs operations
|
||||
|
||||
use std::io::prelude::*;
|
||||
use std::io::IoSlice;
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
|
||||
@ -49,6 +50,40 @@ async fn basic_write_and_shutdown() {
|
||||
assert_eq!(file, HELLO);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_vectored() {
|
||||
let tempfile = tempfile();
|
||||
|
||||
let mut file = File::create(tempfile.path()).await.unwrap();
|
||||
|
||||
let ret = file
|
||||
.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(ret, HELLO.len() * 2);
|
||||
file.flush().await.unwrap();
|
||||
|
||||
let file = std::fs::read(tempfile.path()).unwrap();
|
||||
assert_eq!(file, [HELLO, HELLO].concat());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_vectored_and_shutdown() {
|
||||
let tempfile = tempfile();
|
||||
|
||||
let mut file = File::create(tempfile.path()).await.unwrap();
|
||||
|
||||
let ret = file
|
||||
.write_vectored(&[IoSlice::new(HELLO), IoSlice::new(HELLO)])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(ret, HELLO.len() * 2);
|
||||
file.shutdown().await.unwrap();
|
||||
|
||||
let file = std::fs::read(tempfile.path()).unwrap();
|
||||
assert_eq!(file, [HELLO, HELLO].concat());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewind_seek_position() {
|
||||
let tempfile = tempfile();
|
||||
|
Loading…
x
Reference in New Issue
Block a user