fs: reorganize tokio::fs::file's mock tests (#3988)

This commit is contained in:
Alan Somers 2021-07-31 01:55:32 -06:00 committed by GitHub
parent d01bda86a4
commit cf02b3f32d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 564 additions and 544 deletions

View File

@ -123,6 +123,7 @@ version = "0.3.6"
tokio-test = { version = "0.4.0", path = "../tokio-test" }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
futures = { version = "0.3.0", features = ["async-await"] }
mockall = "0.10.2"
proptest = "1"
rand = "0.8.0"
tempfile = "3.1.0"

View File

@ -3,7 +3,7 @@
//! [`File`]: File
use self::State::*;
use crate::fs::{asyncify, sys};
use crate::fs::asyncify;
use crate::io::blocking::Buf;
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
use crate::sync::Mutex;
@ -19,6 +19,19 @@ use std::task::Context;
use std::task::Poll;
use std::task::Poll::*;
#[cfg(test)]
use super::mocks::spawn_blocking;
#[cfg(test)]
use super::mocks::JoinHandle;
#[cfg(test)]
use super::mocks::MockFile as StdFile;
#[cfg(not(test))]
use crate::blocking::spawn_blocking;
#[cfg(not(test))]
use crate::blocking::JoinHandle;
#[cfg(not(test))]
use std::fs::File as StdFile;
/// A reference to an open file on the filesystem.
///
/// This is a specialized version of [`std::fs::File`][std] for usage from the
@ -78,7 +91,7 @@ use std::task::Poll::*;
/// # }
/// ```
pub struct File {
std: Arc<sys::File>,
std: Arc<StdFile>,
inner: Mutex<Inner>,
}
@ -96,7 +109,7 @@ struct Inner {
#[derive(Debug)]
enum State {
Idle(Option<Buf>),
Busy(sys::Blocking<(Operation, Buf)>),
Busy(JoinHandle<(Operation, Buf)>),
}
#[derive(Debug)]
@ -142,7 +155,7 @@ impl File {
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
let std = asyncify(|| sys::File::open(path)).await?;
let std = asyncify(|| StdFile::open(path)).await?;
Ok(File::from_std(std))
}
@ -182,7 +195,7 @@ impl File {
/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
let path = path.as_ref().to_owned();
let std_file = asyncify(move || sys::File::create(path)).await?;
let std_file = asyncify(move || StdFile::create(path)).await?;
Ok(File::from_std(std_file))
}
@ -199,7 +212,7 @@ impl File {
/// let std_file = std::fs::File::open("foo.txt").unwrap();
/// let file = tokio::fs::File::from_std(std_file);
/// ```
pub fn from_std(std: sys::File) -> File {
pub fn from_std(std: StdFile) -> File {
File {
std: Arc::new(std),
inner: Mutex::new(Inner {
@ -323,7 +336,7 @@ impl File {
let std = self.std.clone();
inner.state = Busy(sys::run(move || {
inner.state = Busy(spawn_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
} else {
@ -409,7 +422,7 @@ impl File {
/// # Ok(())
/// # }
/// ```
pub async fn into_std(mut self) -> sys::File {
pub async fn into_std(mut self) -> StdFile {
self.inner.get_mut().complete_inflight().await;
Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
}
@ -434,7 +447,7 @@ impl File {
/// # Ok(())
/// # }
/// ```
pub fn try_into_std(mut self) -> Result<sys::File, Self> {
pub fn try_into_std(mut self) -> Result<StdFile, Self> {
match Arc::try_unwrap(self.std) {
Ok(file) => Ok(file),
Err(std_file_arc) => {
@ -502,7 +515,7 @@ impl AsyncRead for File {
buf.ensure_capacity_for(dst);
let std = me.std.clone();
inner.state = Busy(sys::run(move || {
inner.state = Busy(spawn_blocking(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
}));
@ -569,7 +582,7 @@ impl AsyncSeek for File {
let std = me.std.clone();
inner.state = Busy(sys::run(move || {
inner.state = Busy(spawn_blocking(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
@ -636,7 +649,7 @@ impl AsyncWrite for File {
let n = buf.copy_from(src);
let std = me.std.clone();
inner.state = Busy(sys::run(move || {
inner.state = Busy(spawn_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
} else {
@ -685,8 +698,8 @@ impl AsyncWrite for File {
}
}
impl From<sys::File> for File {
fn from(std: sys::File) -> Self {
impl From<StdFile> for File {
fn from(std: StdFile) -> Self {
Self::from_std(std)
}
}
@ -709,7 +722,7 @@ impl std::os::unix::io::AsRawFd for File {
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
sys::File::from_raw_fd(fd).into()
StdFile::from_raw_fd(fd).into()
}
}
@ -723,7 +736,7 @@ impl std::os::windows::io::AsRawHandle for File {
#[cfg(windows)]
impl std::os::windows::io::FromRawHandle for File {
unsafe fn from_raw_handle(handle: std::os::windows::io::RawHandle) -> Self {
sys::File::from_raw_handle(handle).into()
StdFile::from_raw_handle(handle).into()
}
}
@ -756,3 +769,6 @@ impl Inner {
}
}
}
#[cfg(test)]
mod tests;

View File

@ -1,80 +1,21 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}
#[macro_export]
macro_rules! cfg_fs {
($($item:item)*) => { $($item)* }
}
#[macro_export]
macro_rules! cfg_io_std {
($($item:item)*) => { $($item)* }
}
use futures::future;
// Load source
#[allow(warnings)]
#[path = "../src/fs/file.rs"]
mod file;
use file::File;
#[allow(warnings)]
#[path = "../src/io/blocking.rs"]
mod blocking;
// Load mocked types
mod support {
pub(crate) mod mock_file;
pub(crate) mod mock_pool;
}
pub(crate) use support::mock_pool as pool;
// Place them where the source expects them
pub(crate) mod io {
pub(crate) use tokio::io::*;
pub(crate) use crate::blocking;
pub(crate) mod sys {
pub(crate) use crate::support::mock_pool::{run, Blocking};
}
}
pub(crate) mod fs {
pub(crate) mod sys {
pub(crate) use crate::support::mock_file::File;
pub(crate) use crate::support::mock_pool::{run, Blocking};
}
pub(crate) use crate::support::mock_pool::asyncify;
}
pub(crate) mod sync {
pub(crate) use tokio::sync::Mutex;
}
use fs::sys;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
use std::io::SeekFrom;
use super::*;
use crate::{
fs::mocks::*,
io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use mockall::{predicate::eq, Sequence};
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
const HELLO: &[u8] = b"hello world...";
const FOO: &[u8] = b"foo bar baz...";
#[test]
fn open_read() {
let (mock, file) = sys::File::mock();
mock.read(HELLO);
let mut file = MockFile::default();
file.expect_inner_read().once().returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
let mut file = File::from_std(file);
let mut buf = [0; 1024];
@ -83,12 +24,10 @@ fn open_read() {
assert_eq!(0, pool::len());
assert_pending!(t.poll());
assert_eq!(1, mock.remaining());
assert_eq!(1, pool::len());
pool::run_one();
assert_eq!(0, mock.remaining());
assert!(t.is_woken());
let n = assert_ready_ok!(t.poll());
@ -98,9 +37,11 @@ fn open_read() {
#[test]
fn read_twice_before_dispatch() {
let (mock, file) = sys::File::mock();
mock.read(HELLO);
let mut file = MockFile::default();
file.expect_inner_read().once().returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
let mut file = File::from_std(file);
let mut buf = [0; 1024];
@ -120,8 +61,11 @@ fn read_twice_before_dispatch() {
#[test]
fn read_with_smaller_buf() {
let (mock, file) = sys::File::mock();
mock.read(HELLO);
let mut file = MockFile::default();
file.expect_inner_read().once().returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
let mut file = File::from_std(file);
@ -153,8 +97,22 @@ fn read_with_smaller_buf() {
#[test]
fn read_with_bigger_buf() {
let (mock, file) = sys::File::mock();
mock.read(&HELLO[..4]).read(&HELLO[4..]);
let mut seq = Sequence::new();
let mut file = MockFile::default();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..4].copy_from_slice(&HELLO[..4]);
Ok(4)
});
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len() - 4].copy_from_slice(&HELLO[4..]);
Ok(HELLO.len() - 4)
});
let mut file = File::from_std(file);
@ -194,8 +152,19 @@ fn read_with_bigger_buf() {
#[test]
fn read_err_then_read_success() {
let (mock, file) = sys::File::mock();
mock.read_err().read(&HELLO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|_| Err(io::ErrorKind::Other.into()));
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
let mut file = File::from_std(file);
@ -225,8 +194,11 @@ fn read_err_then_read_success() {
#[test]
fn open_write() {
let (mock, file) = sys::File::mock();
mock.write(HELLO);
let mut file = MockFile::default();
file.expect_inner_write()
.once()
.with(eq(HELLO))
.returning(|buf| Ok(buf.len()));
let mut file = File::from_std(file);
@ -235,12 +207,10 @@ fn open_write() {
assert_eq!(0, pool::len());
assert_ready_ok!(t.poll());
assert_eq!(1, mock.remaining());
assert_eq!(1, pool::len());
pool::run_one();
assert_eq!(0, mock.remaining());
assert!(!t.is_woken());
let mut t = task::spawn(file.flush());
@ -249,7 +219,7 @@ fn open_write() {
#[test]
fn flush_while_idle() {
let (_mock, file) = sys::File::mock();
let file = MockFile::default();
let mut file = File::from_std(file);
@ -271,13 +241,42 @@ fn read_with_buffer_larger_than_max() {
for i in 0..(chunk_d - 1) {
data.push((i % 151) as u8);
}
let data = Arc::new(data);
let d0 = data.clone();
let d1 = data.clone();
let d2 = data.clone();
let d3 = data.clone();
let (mock, file) = sys::File::mock();
mock.read(&data[0..chunk_a])
.read(&data[chunk_a..chunk_b])
.read(&data[chunk_b..chunk_c])
.read(&data[chunk_c..]);
let mut seq = Sequence::new();
let mut file = MockFile::default();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(move |buf| {
buf[0..chunk_a].copy_from_slice(&d0[0..chunk_a]);
Ok(chunk_a)
});
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(move |buf| {
buf[..chunk_a].copy_from_slice(&d1[chunk_a..chunk_b]);
Ok(chunk_b - chunk_a)
});
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(move |buf| {
buf[..chunk_a].copy_from_slice(&d2[chunk_b..chunk_c]);
Ok(chunk_c - chunk_b)
});
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(move |buf| {
buf[..chunk_a - 1].copy_from_slice(&d3[chunk_c..]);
Ok(chunk_a - 1)
});
let mut file = File::from_std(file);
let mut actual = vec![0; chunk_d];
@ -296,8 +295,7 @@ fn read_with_buffer_larger_than_max() {
pos += n;
}
assert_eq!(mock.remaining(), 0);
assert_eq!(data, &actual[..data.len()]);
assert_eq!(&data[..], &actual[..data.len()]);
}
#[test]
@ -314,12 +312,34 @@ fn write_with_buffer_larger_than_max() {
for i in 0..(chunk_d - 1) {
data.push((i % 151) as u8);
}
let data = Arc::new(data);
let d0 = data.clone();
let d1 = data.clone();
let d2 = data.clone();
let d3 = data.clone();
let (mock, file) = sys::File::mock();
mock.write(&data[0..chunk_a])
.write(&data[chunk_a..chunk_b])
.write(&data[chunk_b..chunk_c])
.write(&data[chunk_c..]);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.withf(move |buf| buf == &d0[0..chunk_a])
.returning(|buf| Ok(buf.len()));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.withf(move |buf| buf == &d1[chunk_a..chunk_b])
.returning(|buf| Ok(buf.len()));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.withf(move |buf| buf == &d2[chunk_b..chunk_c])
.returning(|buf| Ok(buf.len()));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.withf(move |buf| buf == &d3[chunk_c..chunk_d - 1])
.returning(|buf| Ok(buf.len()));
let mut file = File::from_std(file);
@ -344,14 +364,22 @@ fn write_with_buffer_larger_than_max() {
}
pool::run_one();
assert_eq!(mock.remaining(), 0);
}
#[test]
fn write_twice_before_dispatch() {
let (mock, file) = sys::File::mock();
mock.write(HELLO).write(FOO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(HELLO))
.returning(|buf| Ok(buf.len()));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(FOO))
.returning(|buf| Ok(buf.len()));
let mut file = File::from_std(file);
@ -380,10 +408,24 @@ fn write_twice_before_dispatch() {
#[test]
fn incomplete_read_followed_by_write() {
let (mock, file) = sys::File::mock();
mock.read(HELLO)
.seek_current_ok(-(HELLO.len() as i64), 0)
.write(FOO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
file.expect_inner_seek()
.once()
.with(eq(SeekFrom::Current(-(HELLO.len() as i64))))
.in_sequence(&mut seq)
.returning(|_| Ok(0));
file.expect_inner_write()
.once()
.with(eq(FOO))
.returning(|_| Ok(FOO.len()));
let mut file = File::from_std(file);
@ -406,8 +448,25 @@ fn incomplete_read_followed_by_write() {
#[test]
fn incomplete_partial_read_followed_by_write() {
let (mock, file) = sys::File::mock();
mock.read(HELLO).seek_current_ok(-10, 0).write(FOO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
file.expect_inner_seek()
.once()
.in_sequence(&mut seq)
.with(eq(SeekFrom::Current(-10)))
.returning(|_| Ok(0));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(FOO))
.returning(|_| Ok(FOO.len()));
let mut file = File::from_std(file);
@ -433,10 +492,25 @@ fn incomplete_partial_read_followed_by_write() {
#[test]
fn incomplete_read_followed_by_flush() {
let (mock, file) = sys::File::mock();
mock.read(HELLO)
.seek_current_ok(-(HELLO.len() as i64), 0)
.write(FOO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
file.expect_inner_seek()
.once()
.in_sequence(&mut seq)
.with(eq(SeekFrom::Current(-(HELLO.len() as i64))))
.returning(|_| Ok(0));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(FOO))
.returning(|_| Ok(FOO.len()));
let mut file = File::from_std(file);
@ -458,8 +532,18 @@ fn incomplete_read_followed_by_flush() {
#[test]
fn incomplete_flush_followed_by_write() {
let (mock, file) = sys::File::mock();
mock.write(HELLO).write(FOO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(HELLO))
.returning(|_| Ok(HELLO.len()));
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(FOO))
.returning(|_| Ok(FOO.len()));
let mut file = File::from_std(file);
@ -484,8 +568,10 @@ fn incomplete_flush_followed_by_write() {
#[test]
fn read_err() {
let (mock, file) = sys::File::mock();
mock.read_err();
let mut file = MockFile::default();
file.expect_inner_read()
.once()
.returning(|_| Err(io::ErrorKind::Other.into()));
let mut file = File::from_std(file);
@ -502,8 +588,10 @@ fn read_err() {
#[test]
fn write_write_err() {
let (mock, file) = sys::File::mock();
mock.write_err();
let mut file = MockFile::default();
file.expect_inner_write()
.once()
.returning(|_| Err(io::ErrorKind::Other.into()));
let mut file = File::from_std(file);
@ -518,8 +606,19 @@ fn write_write_err() {
#[test]
fn write_read_write_err() {
let (mock, file) = sys::File::mock();
mock.write_err().read(HELLO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.returning(|_| Err(io::ErrorKind::Other.into()));
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
let mut file = File::from_std(file);
@ -541,8 +640,19 @@ fn write_read_write_err() {
#[test]
fn write_read_flush_err() {
let (mock, file) = sys::File::mock();
mock.write_err().read(HELLO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.returning(|_| Err(io::ErrorKind::Other.into()));
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
let mut file = File::from_std(file);
@ -564,8 +674,17 @@ fn write_read_flush_err() {
#[test]
fn write_seek_write_err() {
let (mock, file) = sys::File::mock();
mock.write_err().seek_start_ok(0);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.returning(|_| Err(io::ErrorKind::Other.into()));
file.expect_inner_seek()
.once()
.with(eq(SeekFrom::Start(0)))
.in_sequence(&mut seq)
.returning(|_| Ok(0));
let mut file = File::from_std(file);
@ -587,8 +706,17 @@ fn write_seek_write_err() {
#[test]
fn write_seek_flush_err() {
let (mock, file) = sys::File::mock();
mock.write_err().seek_start_ok(0);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.returning(|_| Err(io::ErrorKind::Other.into()));
file.expect_inner_seek()
.once()
.with(eq(SeekFrom::Start(0)))
.in_sequence(&mut seq)
.returning(|_| Ok(0));
let mut file = File::from_std(file);
@ -610,8 +738,14 @@ fn write_seek_flush_err() {
#[test]
fn sync_all_ordered_after_write() {
let (mock, file) = sys::File::mock();
mock.write(HELLO).sync_all();
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(HELLO))
.returning(|_| Ok(HELLO.len()));
file.expect_sync_all().once().returning(|| Ok(()));
let mut file = File::from_std(file);
let mut t = task::spawn(file.write(HELLO));
@ -635,8 +769,16 @@ fn sync_all_ordered_after_write() {
#[test]
fn sync_all_err_ordered_after_write() {
let (mock, file) = sys::File::mock();
mock.write(HELLO).sync_all_err();
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(HELLO))
.returning(|_| Ok(HELLO.len()));
file.expect_sync_all()
.once()
.returning(|| Err(io::ErrorKind::Other.into()));
let mut file = File::from_std(file);
let mut t = task::spawn(file.write(HELLO));
@ -660,8 +802,14 @@ fn sync_all_err_ordered_after_write() {
#[test]
fn sync_data_ordered_after_write() {
let (mock, file) = sys::File::mock();
mock.write(HELLO).sync_data();
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(HELLO))
.returning(|_| Ok(HELLO.len()));
file.expect_sync_data().once().returning(|| Ok(()));
let mut file = File::from_std(file);
let mut t = task::spawn(file.write(HELLO));
@ -685,8 +833,16 @@ fn sync_data_ordered_after_write() {
#[test]
fn sync_data_err_ordered_after_write() {
let (mock, file) = sys::File::mock();
mock.write(HELLO).sync_data_err();
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_write()
.once()
.in_sequence(&mut seq)
.with(eq(HELLO))
.returning(|_| Ok(HELLO.len()));
file.expect_sync_data()
.once()
.returning(|| Err(io::ErrorKind::Other.into()));
let mut file = File::from_std(file);
let mut t = task::spawn(file.write(HELLO));
@ -710,17 +866,15 @@ fn sync_data_err_ordered_after_write() {
#[test]
fn open_set_len_ok() {
let (mock, file) = sys::File::mock();
mock.set_len(123);
let mut file = MockFile::default();
file.expect_set_len().with(eq(123)).returning(|_| Ok(()));
let file = File::from_std(file);
let mut t = task::spawn(file.set_len(123));
assert_pending!(t.poll());
assert_eq!(1, mock.remaining());
pool::run_one();
assert_eq!(0, mock.remaining());
assert!(t.is_woken());
assert_ready_ok!(t.poll());
@ -728,17 +882,17 @@ fn open_set_len_ok() {
#[test]
fn open_set_len_err() {
let (mock, file) = sys::File::mock();
mock.set_len_err(123);
let mut file = MockFile::default();
file.expect_set_len()
.with(eq(123))
.returning(|_| Err(io::ErrorKind::Other.into()));
let file = File::from_std(file);
let mut t = task::spawn(file.set_len(123));
assert_pending!(t.poll());
assert_eq!(1, mock.remaining());
pool::run_one();
assert_eq!(0, mock.remaining());
assert!(t.is_woken());
assert_ready_err!(t.poll());
@ -746,11 +900,32 @@ fn open_set_len_err() {
#[test]
fn partial_read_set_len_ok() {
let (mock, file) = sys::File::mock();
mock.read(HELLO)
.seek_current_ok(-14, 0)
.set_len(123)
.read(FOO);
let mut file = MockFile::default();
let mut seq = Sequence::new();
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..HELLO.len()].copy_from_slice(HELLO);
Ok(HELLO.len())
});
file.expect_inner_seek()
.once()
.with(eq(SeekFrom::Current(-(HELLO.len() as i64))))
.in_sequence(&mut seq)
.returning(|_| Ok(0));
file.expect_set_len()
.once()
.in_sequence(&mut seq)
.with(eq(123))
.returning(|_| Ok(()));
file.expect_inner_read()
.once()
.in_sequence(&mut seq)
.returning(|buf| {
buf[0..FOO.len()].copy_from_slice(FOO);
Ok(FOO.len())
});
let mut buf = [0; 32];
let mut file = File::from_std(file);

136
tokio/src/fs/mocks.rs Normal file
View File

@ -0,0 +1,136 @@
//! Mock version of std::fs::File;
use mockall::mock;
use crate::sync::oneshot;
use std::{
cell::RefCell,
collections::VecDeque,
fs::{Metadata, Permissions},
future::Future,
io::{self, Read, Seek, SeekFrom, Write},
path::PathBuf,
pin::Pin,
task::{Context, Poll},
};
mock! {
#[derive(Debug)]
pub File {
pub fn create(pb: PathBuf) -> io::Result<Self>;
// These inner_ methods exist because std::fs::File has two
// implementations for each of these methods: one on "&mut self" and
// one on "&&self". Defining both of those in terms of an inner_ method
// allows us to specify the expectation the same way, regardless of
// which method is used.
pub fn inner_flush(&self) -> io::Result<()>;
pub fn inner_read(&self, dst: &mut [u8]) -> io::Result<usize>;
pub fn inner_seek(&self, pos: SeekFrom) -> io::Result<u64>;
pub fn inner_write(&self, src: &[u8]) -> io::Result<usize>;
pub fn metadata(&self) -> io::Result<Metadata>;
pub fn open(pb: PathBuf) -> io::Result<Self>;
pub fn set_len(&self, size: u64) -> io::Result<()>;
pub fn set_permissions(&self, _perm: Permissions) -> io::Result<()>;
pub fn sync_all(&self) -> io::Result<()>;
pub fn sync_data(&self) -> io::Result<()>;
pub fn try_clone(&self) -> io::Result<Self>;
}
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for File {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle;
}
#[cfg(windows)]
impl std::os::windows::io::FromRawHandle for File {
unsafe fn from_raw_handle(h: std::os::windows::io::RawHandle) -> Self;
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd;
}
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
unsafe fn from_raw_fd(h: std::os::unix::io::RawFd) -> Self;
}
}
impl Read for MockFile {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.inner_read(dst)
}
}
impl Read for &'_ MockFile {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
self.inner_read(dst)
}
}
impl Seek for &'_ MockFile {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.inner_seek(pos)
}
}
impl Write for &'_ MockFile {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
self.inner_write(src)
}
fn flush(&mut self) -> io::Result<()> {
self.inner_flush()
}
}
thread_local! {
static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new())
}
#[derive(Debug)]
pub(super) struct JoinHandle<T> {
rx: oneshot::Receiver<T>,
}
pub(super) fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let task = Box::new(move || {
let _ = tx.send(f());
});
QUEUE.with(|cell| cell.borrow_mut().push_back(task));
JoinHandle { rx }
}
impl<T> Future for JoinHandle<T> {
type Output = Result<T, io::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::task::Poll::*;
match Pin::new(&mut self.rx).poll(cx) {
Ready(Ok(v)) => Ready(Ok(v)),
Ready(Err(e)) => panic!("error = {:?}", e),
Pending => Pending,
}
}
}
pub(super) mod pool {
use super::*;
pub(in super::super) fn len() -> usize {
QUEUE.with(|cell| cell.borrow().len())
}
pub(in super::super) fn run_one() {
let task = QUEUE
.with(|cell| cell.borrow_mut().pop_front())
.expect("expected task to run, but none ready");
task();
}
}

View File

@ -84,6 +84,9 @@ pub use self::write::write;
mod copy;
pub use self::copy::copy;
#[cfg(test)]
mod mocks;
feature! {
#![unix]
@ -103,12 +106,17 @@ feature! {
use std::io;
#[cfg(not(test))]
use crate::blocking::spawn_blocking;
#[cfg(test)]
use mocks::spawn_blocking;
pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T>
where
F: FnOnce() -> io::Result<T> + Send + 'static,
T: Send + 'static,
{
match sys::run(f).await {
match spawn_blocking(f).await {
Ok(res) => res,
Err(_) => Err(io::Error::new(
io::ErrorKind::Other,
@ -116,12 +124,3 @@ where
)),
}
}
/// Types in this module can be mocked out in tests.
mod sys {
pub(crate) use std::fs::File;
// TODO: don't rename
pub(crate) use crate::blocking::spawn_blocking as run;
pub(crate) use crate::blocking::JoinHandle as Blocking;
}

View File

@ -3,6 +3,13 @@ use crate::fs::{asyncify, File};
use std::io;
use std::path::Path;
#[cfg(test)]
mod mock_open_options;
#[cfg(test)]
use mock_open_options::MockOpenOptions as StdOpenOptions;
#[cfg(not(test))]
use std::fs::OpenOptions as StdOpenOptions;
/// Options and flags which can be used to configure how a file is opened.
///
/// This builder exposes the ability to configure how a [`File`] is opened and
@ -69,7 +76,7 @@ use std::path::Path;
/// }
/// ```
#[derive(Clone, Debug)]
pub struct OpenOptions(std::fs::OpenOptions);
pub struct OpenOptions(StdOpenOptions);
impl OpenOptions {
/// Creates a blank new set of options ready for configuration.
@ -89,7 +96,7 @@ impl OpenOptions {
/// let future = options.read(true).open("foo.txt");
/// ```
pub fn new() -> OpenOptions {
OpenOptions(std::fs::OpenOptions::new())
OpenOptions(StdOpenOptions::new())
}
/// Sets the option for read access.
@ -384,7 +391,7 @@ impl OpenOptions {
}
/// Returns a mutable reference to the underlying `std::fs::OpenOptions`
pub(super) fn as_inner_mut(&mut self) -> &mut std::fs::OpenOptions {
pub(super) fn as_inner_mut(&mut self) -> &mut StdOpenOptions {
&mut self.0
}
}
@ -645,8 +652,8 @@ feature! {
}
}
impl From<std::fs::OpenOptions> for OpenOptions {
fn from(options: std::fs::OpenOptions) -> OpenOptions {
impl From<StdOpenOptions> for OpenOptions {
fn from(options: StdOpenOptions) -> OpenOptions {
OpenOptions(options)
}
}

View File

@ -0,0 +1,38 @@
//! Mock version of std::fs::OpenOptions;
use mockall::mock;
use crate::fs::mocks::MockFile;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(windows)]
use std::os::windows::fs::OpenOptionsExt;
use std::{io, path::Path};
mock! {
#[derive(Debug)]
pub OpenOptions {
pub fn append(&mut self, append: bool) -> &mut Self;
pub fn create(&mut self, create: bool) -> &mut Self;
pub fn create_new(&mut self, create_new: bool) -> &mut Self;
pub fn open<P: AsRef<Path> + 'static>(&self, path: P) -> io::Result<MockFile>;
pub fn read(&mut self, read: bool) -> &mut Self;
pub fn truncate(&mut self, truncate: bool) -> &mut Self;
pub fn write(&mut self, write: bool) -> &mut Self;
}
impl Clone for OpenOptions {
fn clone(&self) -> Self;
}
#[cfg(unix)]
impl OpenOptionsExt for OpenOptions {
fn custom_flags(&mut self, flags: i32) -> &mut Self;
fn mode(&mut self, mode: u32) -> &mut Self;
}
#[cfg(windows)]
impl OpenOptionsExt for OpenOptions {
fn access_mode(&mut self, access: u32) -> &mut Self;
fn share_mode(&mut self, val: u32) -> &mut Self;
fn custom_flags(&mut self, flags: u32) -> &mut Self;
fn attributes(&mut self, val: u32) -> &mut Self;
fn security_qos_flags(&mut self, flags: u32) -> &mut Self;
}
}

View File

@ -1,4 +1,4 @@
use crate::fs::{asyncify, sys};
use crate::fs::asyncify;
use std::ffi::OsString;
use std::fs::{FileType, Metadata};
@ -10,6 +10,15 @@ use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
#[cfg(test)]
use super::mocks::spawn_blocking;
#[cfg(test)]
use super::mocks::JoinHandle;
#[cfg(not(test))]
use crate::blocking::spawn_blocking;
#[cfg(not(test))]
use crate::blocking::JoinHandle;
/// Returns a stream over the entries within a directory.
///
/// This is an async version of [`std::fs::read_dir`](std::fs::read_dir)
@ -50,7 +59,7 @@ pub struct ReadDir(State);
#[derive(Debug)]
enum State {
Idle(Option<std::fs::ReadDir>),
Pending(sys::Blocking<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>),
Pending(JoinHandle<(Option<io::Result<std::fs::DirEntry>>, std::fs::ReadDir)>),
}
impl ReadDir {
@ -88,7 +97,7 @@ impl ReadDir {
State::Idle(ref mut std) => {
let mut std = std.take().unwrap();
self.0 = State::Pending(sys::run(move || {
self.0 = State::Pending(spawn_blocking(move || {
let ret = std.next();
(ret, std)
}));

View File

@ -1,295 +0,0 @@
#![allow(clippy::unnecessary_operation)]
use std::collections::VecDeque;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::io;
use std::io::prelude::*;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
pub struct File {
shared: Arc<Mutex<Shared>>,
}
pub struct Handle {
shared: Arc<Mutex<Shared>>,
}
struct Shared {
calls: VecDeque<Call>,
}
#[derive(Debug)]
enum Call {
Read(io::Result<Vec<u8>>),
Write(io::Result<Vec<u8>>),
Seek(SeekFrom, io::Result<u64>),
SyncAll(io::Result<()>),
SyncData(io::Result<()>),
SetLen(u64, io::Result<()>),
}
impl Handle {
pub fn read(&self, data: &[u8]) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls.push_back(Call::Read(Ok(data.to_owned())));
self
}
pub fn read_err(&self) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::Read(Err(io::ErrorKind::Other.into())));
self
}
pub fn write(&self, data: &[u8]) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls.push_back(Call::Write(Ok(data.to_owned())));
self
}
pub fn write_err(&self) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::Write(Err(io::ErrorKind::Other.into())));
self
}
pub fn seek_start_ok(&self, offset: u64) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::Seek(SeekFrom::Start(offset), Ok(offset)));
self
}
pub fn seek_current_ok(&self, offset: i64, ret: u64) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::Seek(SeekFrom::Current(offset), Ok(ret)));
self
}
pub fn sync_all(&self) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls.push_back(Call::SyncAll(Ok(())));
self
}
pub fn sync_all_err(&self) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::SyncAll(Err(io::ErrorKind::Other.into())));
self
}
pub fn sync_data(&self) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls.push_back(Call::SyncData(Ok(())));
self
}
pub fn sync_data_err(&self) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::SyncData(Err(io::ErrorKind::Other.into())));
self
}
pub fn set_len(&self, size: u64) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls.push_back(Call::SetLen(size, Ok(())));
self
}
pub fn set_len_err(&self, size: u64) -> &Self {
let mut s = self.shared.lock().unwrap();
s.calls
.push_back(Call::SetLen(size, Err(io::ErrorKind::Other.into())));
self
}
pub fn remaining(&self) -> usize {
let s = self.shared.lock().unwrap();
s.calls.len()
}
}
impl Drop for Handle {
fn drop(&mut self) {
if !std::thread::panicking() {
let s = self.shared.lock().unwrap();
assert_eq!(0, s.calls.len());
}
}
}
impl File {
pub fn open(_: PathBuf) -> io::Result<File> {
unimplemented!();
}
pub fn create(_: PathBuf) -> io::Result<File> {
unimplemented!();
}
pub fn mock() -> (Handle, File) {
let shared = Arc::new(Mutex::new(Shared {
calls: VecDeque::new(),
}));
let handle = Handle {
shared: shared.clone(),
};
let file = File { shared };
(handle, file)
}
pub fn sync_all(&self) -> io::Result<()> {
use self::Call::*;
let mut s = self.shared.lock().unwrap();
match s.calls.pop_front() {
Some(SyncAll(ret)) => ret,
Some(op) => panic!("expected next call to be {:?}; was sync_all", op),
None => panic!("did not expect call"),
}
}
pub fn sync_data(&self) -> io::Result<()> {
use self::Call::*;
let mut s = self.shared.lock().unwrap();
match s.calls.pop_front() {
Some(SyncData(ret)) => ret,
Some(op) => panic!("expected next call to be {:?}; was sync_all", op),
None => panic!("did not expect call"),
}
}
pub fn set_len(&self, size: u64) -> io::Result<()> {
use self::Call::*;
let mut s = self.shared.lock().unwrap();
match s.calls.pop_front() {
Some(SetLen(arg, ret)) => {
assert_eq!(arg, size);
ret
}
Some(op) => panic!("expected next call to be {:?}; was sync_all", op),
None => panic!("did not expect call"),
}
}
pub fn metadata(&self) -> io::Result<Metadata> {
unimplemented!();
}
pub fn set_permissions(&self, _perm: Permissions) -> io::Result<()> {
unimplemented!();
}
pub fn try_clone(&self) -> io::Result<Self> {
unimplemented!();
}
}
impl Read for &'_ File {
fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
use self::Call::*;
let mut s = self.shared.lock().unwrap();
match s.calls.pop_front() {
Some(Read(Ok(data))) => {
assert!(dst.len() >= data.len());
assert!(dst.len() <= 16 * 1024, "actual = {}", dst.len()); // max buffer
dst[..data.len()].copy_from_slice(&data);
Ok(data.len())
}
Some(Read(Err(e))) => Err(e),
Some(op) => panic!("expected next call to be {:?}; was a read", op),
None => panic!("did not expect call"),
}
}
}
impl Write for &'_ File {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
use self::Call::*;
let mut s = self.shared.lock().unwrap();
match s.calls.pop_front() {
Some(Write(Ok(data))) => {
assert_eq!(src, &data[..]);
Ok(src.len())
}
Some(Write(Err(e))) => Err(e),
Some(op) => panic!("expected next call to be {:?}; was write", op),
None => panic!("did not expect call"),
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Seek for &'_ File {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
use self::Call::*;
let mut s = self.shared.lock().unwrap();
match s.calls.pop_front() {
Some(Seek(expect, res)) => {
assert_eq!(expect, pos);
res
}
Some(op) => panic!("expected call {:?}; was `seek`", op),
None => panic!("did not expect call; was `seek`"),
}
}
}
impl fmt::Debug for File {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("mock::File").finish()
}
}
#[cfg(unix)]
impl std::os::unix::io::AsRawFd for File {
fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
unimplemented!();
}
}
#[cfg(unix)]
impl std::os::unix::io::FromRawFd for File {
unsafe fn from_raw_fd(_: std::os::unix::io::RawFd) -> Self {
unimplemented!();
}
}
#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for File {
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
unimplemented!();
}
}
#[cfg(windows)]
impl std::os::windows::io::FromRawHandle for File {
unsafe fn from_raw_handle(_: std::os::windows::io::RawHandle) -> Self {
unimplemented!();
}
}

View File

@ -1,66 +0,0 @@
use tokio::sync::oneshot;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
thread_local! {
static QUEUE: RefCell<VecDeque<Box<dyn FnOnce() + Send>>> = RefCell::new(VecDeque::new())
}
#[derive(Debug)]
pub(crate) struct Blocking<T> {
rx: oneshot::Receiver<T>,
}
pub(crate) fn run<F, R>(f: F) -> Blocking<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let task = Box::new(move || {
let _ = tx.send(f());
});
QUEUE.with(|cell| cell.borrow_mut().push_back(task));
Blocking { rx }
}
impl<T> Future for Blocking<T> {
type Output = Result<T, io::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::task::Poll::*;
match Pin::new(&mut self.rx).poll(cx) {
Ready(Ok(v)) => Ready(Ok(v)),
Ready(Err(e)) => panic!("error = {:?}", e),
Pending => Pending,
}
}
}
pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T>
where
F: FnOnce() -> io::Result<T> + Send + 'static,
T: Send + 'static,
{
run(f).await?
}
pub(crate) fn len() -> usize {
QUEUE.with(|cell| cell.borrow().len())
}
pub(crate) fn run_one() {
let task = QUEUE
.with(|cell| cell.borrow_mut().pop_front())
.expect("expected task to run, but none ready");
task();
}