Filesystem manipulation APIs. (#323)

This patch adds a new crate: tokio-fs. This crate provides a wrapper
around `std` functionality that can only be performed using blocking
operations. This primarily includes filesystem operations, but it also
includes standard input, output, and error access as these streams
cannot be safely switched to non-blocking mode in a portable way.

These wrappers call the `std` functions from within a `blocking`
annotation which allows the runtime to compensate for the fact that the
thread will potentially remain blocked in a system call.
This commit is contained in:
Carl Lerche 2018-05-02 11:19:58 -07:00 committed by GitHub
parent 7a2b5db15c
commit f768163982
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 727 additions and 1 deletions

View File

@ -24,6 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures"]
members = [
"./",
"tokio-executor",
"tokio-fs",
"tokio-io",
"tokio-reactor",
"tokio-threadpool",
@ -45,6 +46,7 @@ tokio-threadpool = { version = "0.1.2", path = "tokio-threadpool" }
tokio-tcp = { version = "0.1.0", path = "tokio-tcp" }
tokio-udp = { version = "0.1.0", path = "tokio-udp" }
tokio-timer = { version = "0.2.1", path = "tokio-timer" }
tokio-fs = { version = "0.1.0", path = "tokio-fs" }
futures = "0.1.20"

13
src/fs.rs Normal file
View File

@ -0,0 +1,13 @@
//! Asynchronous filesystem manipulation operations.
//!
//! This module contains basic methods and types for manipulating the contents
//! of the local filesystem from within the context of the Tokio runtime.
//!
//! Unlike *most* other Tokio APIs, the filesystem APIs **must** be used from
//! the context of the Tokio runtime as they require Tokio specific features to
//! function.
pub use tokio_fs::{
file,
File,
};

View File

@ -8,6 +8,7 @@
//! * A [reactor][reactor] backed by the operating system's event queue (epoll, kqueue,
//! IOCP, etc...).
//! * Asynchronous [TCP and UDP][net] sockets.
//! * Asynchronous [filesystem][fs] operations.
//! * [Timer][timer] API for scheduling work in the future.
//!
//! Tokio is built using [futures] as the abstraction for managing the
@ -71,6 +72,7 @@ extern crate futures;
extern crate mio;
extern crate tokio_io;
extern crate tokio_executor;
extern crate tokio_fs;
extern crate tokio_reactor;
extern crate tokio_threadpool;
extern crate tokio_timer;
@ -81,6 +83,7 @@ extern crate tokio_udp;
extern crate futures2;
pub mod executor;
pub mod fs;
pub mod net;
pub mod reactor;
pub mod runtime;
@ -100,15 +103,35 @@ pub mod io {
//! defines two traits, [`AsyncRead`] and [`AsyncWrite`], which extend the
//! `Read` and `Write` traits of the standard library.
//!
//! # AsyncRead and AsyncWrite
//!
//! [`AsyncRead`] and [`AsyncWrite`] must only be implemented for
//! non-blocking I/O types that integrate with the futures type system. In
//! other words, these types must never block the thread, and instead the
//! current task is notified when the I/O resource is ready.
//!
//! # Standard input and output
//!
//! Tokio provides asynchronous APIs to standard [input], [output], and [error].
//! These APIs are very similar to the ones provided by `std`, but they also
//! implement [`AsyncRead`] and [`AsyncWrite`].
//!
//! Unlike *most* other Tokio APIs, the standard input / output APIs
//! **must** be used from the context of the Tokio runtime as they require
//! Tokio specific features to function.
//!
//! [input]: fn.stdin.html
//! [output]: fn.stdout.html
//! [error]: fn.stderr.html
//!
//! # Utility functions
//!
//! Utilities functions are provided for working with [`AsyncRead`] /
//! [`AsyncWrite`] types. For example, [`copy`] asynchronously copies all
//! data from a source to a destination.
//!
//! # `std` re-exports
//!
//! Additionally, [`Read`], [`Write`], [`Error`], [`ErrorKind`], and
//! [`Result`] are re-exported from `std::io` for ease of use.
//!
@ -126,6 +149,16 @@ pub mod io {
AsyncWrite,
};
// standard input, output, and error
pub use tokio_fs::{
stdin,
Stdin,
stdout,
Stdout,
stderr,
Stderr,
};
// Utils
pub use tokio_io::io::{
copy,

3
tokio-fs/CHANGELOG.md Normal file
View File

@ -0,0 +1,3 @@
# 0.1.0 (unreleased)
* Initial release

30
tokio-fs/Cargo.toml Normal file
View File

@ -0,0 +1,30 @@
[package]
name = "tokio-fs"
# When releasing to crates.io:
# - Update html_root_url.
# - Update CHANGELOG.md.
# - Create "v0.1.x" git tag.
version = "0.1.0"
authors = ["Carl Lerche <me@carllerche.com>"]
license = "MIT"
readme = "README.md"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
documentation = "https://docs.rs/tokio-fs/0.1"
description = """
Filesystem API for Tokio.
"""
keywords = ["tokio", "futures", "fs", "file", "async"]
categories = ["asynchronous", "network-programming", "filesystem"]
[dependencies]
futures = "0.1.21"
# TODO: Set real version
tokio-threadpool = { version = "0.1.1", path = "../tokio-threadpool" }
tokio-io = { version = "0.1.6", path = "../tokio-io" }
[dev-dependencies]
rand = "0.4.2"
tempdir = "0.3.7"
tokio-io = { version = "0.1.6", path = "../tokio-io" }

25
tokio-fs/LICENSE Normal file
View File

@ -0,0 +1,25 @@
Copyright (c) 2018 Tokio Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

19
tokio-fs/README.md Normal file
View File

@ -0,0 +1,19 @@
# Tokio FS
Asynchronous filesystem manipulation operations (and stdin, stdout, stderr).
[Documentation](https://tokio-rs.github.io/tokio/tokio_fs/)
## Overview
This crate provides filesystem manipulation facilities for usage with Tokio.
## License
This project is licensed under the [MIT license](LICENSE).
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
terms or conditions.

View File

@ -0,0 +1,47 @@
//! Echo everything received on STDIN to STDOUT.
extern crate futures;
extern crate tokio_fs;
extern crate tokio_io;
extern crate tokio_threadpool;
use tokio_fs::{stdin, stdout, stderr};
use tokio_io::codec::{FramedRead, FramedWrite, LinesCodec};
use tokio_threadpool::Builder;
use futures::{Future, Stream, Sink};
use std::io;
pub fn main() {
let pool = Builder::new()
.pool_size(1)
.build();
pool.spawn({
let input = FramedRead::new(stdin(), LinesCodec::new());
let output = FramedWrite::new(stdout(), LinesCodec::new())
.with(|line: String| {
let mut out = "OUT: ".to_string();
out.push_str(&line);
Ok::<_, io::Error>(out)
});
let error = FramedWrite::new(stderr(), LinesCodec::new())
.with(|line: String| {
let mut out = "ERR: ".to_string();
out.push_str(&line);
Ok::<_, io::Error>(out)
});
let dst = output.fanout(error);
input
.forward(dst)
.map(|_| ())
.map_err(|e| panic!("io error = {:?}", e))
});
pool.shutdown_on_idle().wait().unwrap();
}

View File

@ -0,0 +1,37 @@
use super::File;
use futures::{Future, Poll};
use std::fs::File as StdFile;
use std::io;
use std::path::Path;
/// Future returned by `File::create` and resolves to a `File` instance.
#[derive(Debug)]
pub struct CreateFuture<P> {
path: P,
}
impl<P> CreateFuture<P>
where P: AsRef<Path> + Send + 'static,
{
pub(crate) fn new(path: P) -> Self {
CreateFuture { path }
}
}
impl<P> Future for CreateFuture<P>
where P: AsRef<Path> + Send + 'static,
{
type Item = File;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let std = try_ready!(::blocking_io(|| {
StdFile::create(&self.path)
}));
let file = File::from_std(std);
Ok(file.into())
}
}

201
tokio-fs/src/file/mod.rs Normal file
View File

@ -0,0 +1,201 @@
//! Types for working with [`File`].
//!
//! [`File`]: struct.File.html
mod create;
mod open;
pub use self::create::CreateFuture;
pub use self::open::OpenFuture;
use tokio_io::{AsyncRead, AsyncWrite};
use futures::Poll;
use std::fs::{File as StdFile, Metadata, Permissions};
use std::io::{self, Read, Write, Seek};
use std::path::Path;
/// A reference to an open file on the filesystem.
///
/// This is a specialized version of [`std::fs::File`][std] for usage from the
/// Tokio runtime.
///
/// An instance of a `File` can be read and/or written depending on what options
/// it was opened with. Files also implement Seek to alter the logical cursor
/// that the file contains internally.
///
/// Files are automatically closed when they go out of scope.
///
/// [std]: https://doc.rust-lang.org/std/fs/struct.File.html
#[derive(Debug)]
pub struct File {
std: Option<StdFile>,
}
impl File {
/// Attempts to open a file in read-only mode.
///
/// # Errors
///
/// `OpenFuture` results in an error if called from outside of the Tokio
/// runtime or if the underlying [`open`] call results in an error.
///
/// [`open`]: https://doc.rust-lang.org/std/fs/struct.OpenOptions.html#method.open
pub fn open<P>(path: P) -> OpenFuture<P>
where P: AsRef<Path> + Send + 'static,
{
OpenFuture::new(path)
}
/// Opens a file in write-only mode.
///
/// This function will create a file if it does not exist, and will truncate
/// it if it does.
///
/// `CreateFuture` results in an error if called from outside of the Tokio
/// runtime or if the underlying [`create`] call results in an error.
///
/// [`open`]: https://doc.rust-lang.org/std/fs/struct.File.html#method.create
pub fn create<P>(path: P) -> CreateFuture<P>
where P: AsRef<Path> + Send + 'static,
{
CreateFuture::new(path)
}
/// Convert a [`std::fs::File`][std] to a `tokio_fs::File`.
///
/// [std]: https://doc.rust-lang.org/std/fs/struct.File.html
pub(crate) fn from_std(std: StdFile) -> File {
File { std: Some(std) }
}
/// Seek to an offset, in bytes, in a stream.
///
/// A seek beyond the end of a stream is allowed, but implementation
/// defined.
///
/// If the seek operation completed successfully, this method returns the
/// new position from the start of the stream. That position can be used
/// later with `SeekFrom::Start`.
///
/// # Errors
///
/// Seeking to a negative offset is considered an error.
pub fn poll_seek(&mut self, pos: io::SeekFrom) -> Poll<u64, io::Error> {
::blocking_io(|| self.std().seek(pos))
}
/// Attempts to sync all OS-internal metadata to disk.
///
/// This function will attempt to ensure that all in-core data reaches the
/// filesystem before returning.
pub fn poll_sync_all(&mut self) -> Poll<(), io::Error> {
::blocking_io(|| self.std().sync_all())
}
/// This function is similar to `poll_sync_all`, except that it may not
/// synchronize file metadata to the filesystem.
///
/// This is intended for use cases that must synchronize content, but don't
/// need the metadata on disk. The goal of this method is to reduce disk
/// operations.
///
/// Note that some platforms may simply implement this in terms of `poll_sync_all`.
pub fn poll_sync_data(&mut self) -> Poll<(), io::Error> {
::blocking_io(|| self.std().sync_data())
}
/// Truncates or extends the underlying file, updating the size of this file to become size.
///
/// If the size is less than the current file's size, then the file will be
/// shrunk. If it is greater than the current file's size, then the file
/// will be extended to size and have all of the intermediate data filled in
/// with 0s.
///
/// # Errors
///
/// This function will return an error if the file is not opened for
/// writing.
pub fn poll_set_len(&mut self, size: u64) -> Poll<(), io::Error> {
::blocking_io(|| self.std().set_len(size))
}
/// Queries metadata about the underlying file.
pub fn poll_metadata(&mut self) -> Poll<Metadata, io::Error> {
::blocking_io(|| self.std().metadata())
}
/// Create a new `File` instance that shares the same underlying file handle
/// as the existing `File` instance. Reads, writes, and seeks will affect both
/// File instances simultaneously.
pub fn poll_try_clone(&mut self) -> Poll<File, io::Error> {
::blocking_io(|| {
let std = self.std().try_clone()?;
Ok(File::from_std(std))
})
}
/// Changes the permissions on the underlying file.
///
/// # Platform-specific behavior
///
/// This function currently corresponds to the `fchmod` function on Unix and
/// the `SetFileInformationByHandle` function on Windows. Note that, this
/// [may change in the future][changes].
///
/// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
///
/// # Errors
///
/// This function will return an error if the user lacks permission change
/// attributes on the underlying file. It may also return an error in other
/// os-specific unspecified cases.
pub fn poll_set_permissions(&mut self, perm: Permissions) -> Poll<(), io::Error> {
::blocking_io(|| self.std().set_permissions(perm))
}
fn std(&mut self) -> &mut StdFile {
self.std.as_mut().expect("`File` instance already shutdown")
}
}
impl Read for File {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
::would_block(|| self.std().read(buf))
}
}
impl AsyncRead for File {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
}
impl Write for File {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
::would_block(|| self.std().write(buf))
}
fn flush(&mut self) -> io::Result<()> {
::would_block(|| self.std().flush())
}
}
impl AsyncWrite for File {
fn shutdown(&mut self) -> Poll<(), io::Error> {
::blocking_io(|| {
self.std = None;
Ok(())
})
}
}
impl Drop for File {
fn drop(&mut self) {
if let Some(_std) = self.std.take() {
// This is probably fine as closing a file *shouldn't* be a blocking
// operation. That said, ideally `shutdown` is called first.
}
}
}

37
tokio-fs/src/file/open.rs Normal file
View File

@ -0,0 +1,37 @@
use super::File;
use futures::{Future, Poll};
use std::fs::File as StdFile;
use std::io;
use std::path::Path;
/// Future returned by `File::open` and resolves to a `File` instance.
#[derive(Debug)]
pub struct OpenFuture<P> {
path: P,
}
impl<P> OpenFuture<P>
where P: AsRef<Path> + Send + 'static,
{
pub(crate) fn new(path: P) -> Self {
OpenFuture { path }
}
}
impl<P> Future for OpenFuture<P>
where P: AsRef<Path> + Send + 'static,
{
type Item = File;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let std = try_ready!(::blocking_io(|| {
StdFile::open(&self.path)
}));
let file = File::from_std(std);
Ok(file.into())
}
}

64
tokio-fs/src/lib.rs Normal file
View File

@ -0,0 +1,64 @@
//! Asynchronous filesystem manipulation operations (and stdin, stdout, stderr).
//!
//! This module contains basic methods and types for manipulating the contents
//! of the local filesystem from within the context of the Tokio runtime.
//!
//! Tasks running on the Tokio runtime are expected to be asynchronous, i.e.,
//! they will not block the thread of execution. Filesystem operations do not
//! satisfy this requirement. In order to perform filesystem operations
//! asynchronously, this library uses the [`blocking`][blocking] annotation
//! to signal to the runtime that a blocking operation is being performed. This
//! allows the runtime to compensate.
//!
//! [blocking]: https://docs.rs/tokio-threadpool/0.1/tokio_threadpool/fn.blocking.html
#[macro_use]
extern crate futures;
extern crate tokio_io;
extern crate tokio_threadpool;
pub mod file;
mod stdin;
mod stdout;
mod stderr;
pub use file::File;
pub use stdin::{stdin, Stdin};
pub use stdout::{stdout, Stdout};
pub use stderr::{stderr, Stderr};
use futures::Poll;
use futures::Async::*;
use std::io;
use std::io::ErrorKind::{Other, WouldBlock};
fn blocking_io<F, T>(f: F) -> Poll<T, io::Error>
where F: FnOnce() -> io::Result<T>,
{
match tokio_threadpool::blocking(f) {
Ok(Ready(Ok(v))) => Ok(v.into()),
Ok(Ready(Err(err))) => Err(err),
Ok(NotReady) => Ok(NotReady),
Err(_) => Err(blocking_err()),
}
}
fn would_block<F, T>(f: F) -> io::Result<T>
where F: FnOnce() -> io::Result<T>,
{
match tokio_threadpool::blocking(f) {
Ok(Ready(Ok(v))) => Ok(v),
Ok(Ready(Err(err))) => {
debug_assert_ne!(err.kind(), WouldBlock);
Err(err)
}
Ok(NotReady) => Err(WouldBlock.into()),
Err(_) => Err(blocking_err()),
}
}
fn blocking_err() -> io::Error {
io::Error::new(Other, "tokio-fs::File::open must be called \
from the context of the Tokio runtime.")
}

45
tokio-fs/src/stderr.rs Normal file
View File

@ -0,0 +1,45 @@
use tokio_io::{AsyncWrite};
use futures::Poll;
use std::io::{self, Write, Stderr as StdStderr};
/// A handle to the standard error stream of a process.
///
/// The handle implements the [`AsyncWrite`] trait, but beware that concurrent
/// writes to `Stderr` must be executed with care.
///
/// Created by the [`stderr`] function.
///
/// [`stderr`]: fn.stderr.html
/// [`AsyncWrite`]: trait.AsyncWrite.html
#[derive(Debug)]
pub struct Stderr {
std: StdStderr,
}
/// Constructs a new handle to the standard error of the current process.
///
/// The returned handle allows writing to standard error from the within the
/// Tokio runtime.
pub fn stderr() -> Stderr {
let std = io::stderr();
Stderr { std }
}
impl Write for Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
::would_block(|| self.std.write(buf))
}
fn flush(&mut self) -> io::Result<()> {
::would_block(|| self.std.flush())
}
}
impl AsyncWrite for Stderr {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}

38
tokio-fs/src/stdin.rs Normal file
View File

@ -0,0 +1,38 @@
use tokio_io::{AsyncRead};
use std::io::{self, Read, Stdin as StdStdin};
/// A handle to the standard input stream of a process.
///
/// The handle implements the [`AsyncRead`] trait, but beware that concurrent
/// reads of `Stdin` must be executed with care.
///
/// Created by the [`stdin`] function.
///
/// [`stdin`]: fn.stdin.html
/// [`AsyncRead`]: trait.AsyncRead.html
#[derive(Debug)]
pub struct Stdin {
std: StdStdin,
}
/// Constructs a new handle to the standard input of the current process.
///
/// The returned handle allows reading from standard input from the within the
/// Tokio runtime.
pub fn stdin() -> Stdin {
let std = io::stdin();
Stdin { std }
}
impl Read for Stdin {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
::would_block(|| self.std.read(buf))
}
}
impl AsyncRead for Stdin {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
false
}
}

44
tokio-fs/src/stdout.rs Normal file
View File

@ -0,0 +1,44 @@
use tokio_io::{AsyncWrite};
use futures::Poll;
use std::io::{self, Write, Stdout as StdStdout};
/// A handle to the standard output stream of a process.
///
/// The handle implements the [`AsyncWrite`] trait, but beware that concurrent
/// writes to `Stdout` must be executed with care.
///
/// Created by the [`stdout`] function.
///
/// [`stdout`]: fn.stdout.html
/// [`AsyncWrite`]: trait.AsyncWrite.html
#[derive(Debug)]
pub struct Stdout {
std: StdStdout,
}
/// Constructs a new handle to the standard output of the current process.
///
/// The returned handle allows writing to standard out from the within the Tokio
/// runtime.
pub fn stdout() -> Stdout {
let std = io::stdout();
Stdout { std }
}
impl Write for Stdout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
::would_block(|| self.std.write(buf))
}
fn flush(&mut self) -> io::Result<()> {
::would_block(|| self.std.flush())
}
}
impl AsyncWrite for Stdout {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}

73
tokio-fs/tests/file.rs Normal file
View File

@ -0,0 +1,73 @@
extern crate futures;
extern crate rand;
extern crate tempdir;
extern crate tokio_fs;
extern crate tokio_io;
extern crate tokio_threadpool;
use tokio_fs::*;
use tokio_io::io;
use tokio_threadpool::*;
use futures::Future;
use futures::future::poll_fn;
use futures::sync::oneshot;
use rand::{thread_rng, Rng};
use tempdir::TempDir;
use std::fs::File as StdFile;
use std::io::Read;
#[test]
fn read_write() {
const NUM_CHARS: usize = 16 * 1_024;
let dir = TempDir::new("tokio-fs-tests").unwrap();
let file_path = dir.path().join("read_write.txt");
let contents: Vec<u8> = thread_rng().gen_ascii_chars()
.take(NUM_CHARS)
.collect::<String>()
.into();
let pool = Builder::new()
.pool_size(1)
.build();
let (tx, rx) = oneshot::channel();
pool.spawn({
let file_path = file_path.clone();
let contents = contents.clone();
File::create(file_path)
.and_then(move |file| io::write_all(file, contents))
.and_then(|(mut file, _)| {
poll_fn(move || file.poll_sync_all())
})
.then(|res| {
let _ = res.unwrap();
tx.send(()).unwrap();
Ok(())
})
});
rx.wait().unwrap();
let mut file = StdFile::open(&file_path).unwrap();
let mut dst = vec![];
file.read_to_end(&mut dst).unwrap();
assert_eq!(dst, contents);
pool.spawn({
File::open(file_path)
.and_then(|file| io::read_to_end(file, vec![]))
.then(move |res| {
let (_, buf) = res.unwrap();
assert_eq!(buf, contents);
Ok(())
})
});
}

View File

@ -2,6 +2,9 @@ use worker::Worker;
use futures::Poll;
use std::error::Error;
use std::fmt;
/// Error raised by `blocking`.
#[derive(Debug)]
pub struct BlockingError {
@ -124,7 +127,7 @@ where F: FnOnce() -> T,
};
// Transition the worker state to blocking. This will exit the fn early
// with `NotRead` if the pool does not have enough capacity to enter
// with `NotReady` if the pool does not have enough capacity to enter
// blocking mode.
worker.transition_to_blocking()
});
@ -146,3 +149,15 @@ where F: FnOnce() -> T,
// Return the result
Ok(ret.into())
}
impl fmt::Display for BlockingError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.description())
}
}
impl Error for BlockingError {
fn description(&self) -> &str {
"`blocking` annotation used from outside the context of a thread pool"
}
}