io: add AsyncSeek trait (#1924)

Co-authored-by: Taiki Endo <te316e89@gmail.com>
This commit is contained in:
Michael Howell 2019-12-10 22:48:24 -07:00 committed by Carl Lerche
parent 975576952f
commit 24cd6d67f7
7 changed files with 280 additions and 61 deletions

View File

@ -5,7 +5,7 @@
use self::State::*;
use crate::fs::{asyncify, sys};
use crate::io::blocking::Buf;
use crate::io::{AsyncRead, AsyncWrite};
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite};
use std::fmt;
use std::fs::{Metadata, Permissions};
@ -176,63 +176,6 @@ impl File {
}
}
/// Seek to an offset, in bytes, in a stream.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// use std::io::SeekFrom;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
/// file.seek(SeekFrom::Start(6)).await?;
///
/// let mut contents = vec![0u8; 10];
/// file.read_exact(&mut contents).await?;
/// # Ok(())
/// # }
/// ```
pub async fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
self.complete_inflight().await;
let mut buf = match self.state {
Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
_ => unreachable!(),
};
// Factor in any unread data from the buf
if !buf.is_empty() {
let n = buf.discard_read();
if let SeekFrom::Current(ref mut offset) = pos {
*offset += n;
}
}
let std = self.std.clone();
// Start the operation
self.state = Busy(sys::run(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
let (op, buf) = match self.state {
Idle(_) => unreachable!(),
Busy(ref mut rx) => rx.await.unwrap(),
};
self.state = Idle(Some(buf));
match op {
Operation::Seek(res) => res,
_ => unreachable!(),
}
}
/// Attempts to sync all OS-internal metadata to disk.
///
/// This function will attempt to ensure that all in-core data reaches the
@ -548,6 +491,82 @@ impl AsyncRead for File {
}
}
impl AsyncSeek for File {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut pos: SeekFrom,
) -> Poll<io::Result<()>> {
if let Some(e) = self.last_write_err.take() {
return Ready(Err(e.into()));
}
loop {
match self.state {
Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
// Factor in any unread data from the buf
if !buf.is_empty() {
let n = buf.discard_read();
if let SeekFrom::Current(ref mut offset) = pos {
*offset += n;
}
}
let std = self.std.clone();
self.state = Busy(sys::run(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
return Ready(Ok(()));
}
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(_) => {}
}
}
}
}
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
if let Some(e) = self.last_write_err.take() {
return Ready(Err(e.into()));
}
loop {
match self.state {
Idle(_) => panic!("must call start_seek before calling poll_complete"),
Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
match op {
Operation::Read(_) => {}
Operation::Write(Err(e)) => {
self.last_write_err = Some(e.kind());
}
Operation::Write(_) => {}
Operation::Seek(res) => return Ready(res),
}
}
}
}
}
}
impl AsyncWrite for File {
fn poll_write(
mut self: Pin<&mut Self>,

View File

@ -0,0 +1,97 @@
use std::io::{self, SeekFrom};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Seek bytes asynchronously.
///
/// This trait is analogous to the `std::io::Seek` trait, but integrates
/// with the asynchronous task system. In particular, the `start_seek`
/// method, unlike `Seek::seek`, will not block the calling thread.
pub trait AsyncSeek {
/// Attempt to seek to an offset, in bytes, in a stream.
///
/// A seek beyond the end of a stream is allowed, but behavior is defined
/// by the implementation.
///
/// If this function returns successfully, then the job has been submitted.
/// To find out when it completes, call `poll_complete`.
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
position: SeekFrom,
) -> Poll<io::Result<()>>;
/// Wait for a seek operation to complete.
///
/// If the seek operation completed successfully,
/// this method returns the new position from the start of the stream.
/// That position can be used later with [`SeekFrom::Start`].
///
/// # Errors
///
/// Seeking to a negative offset is considered an error.
///
/// # Panics
///
/// Calling this method without calling `start_seek` first is an error.
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>>;
}
macro_rules! deref_async_seek {
() => {
fn start_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self).start_seek(cx, pos)
}
fn poll_complete(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<u64>> {
Pin::new(&mut **self).poll_complete(cx)
}
}
}
impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for Box<T> {
deref_async_seek!();
}
impl<T: ?Sized + AsyncSeek + Unpin> AsyncSeek for &mut T {
deref_async_seek!();
}
impl<P> AsyncSeek for Pin<P>
where
P: DerefMut + Unpin,
P::Target: AsyncSeek,
{
fn start_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
self.get_mut().as_mut().start_seek(cx, pos)
}
fn poll_complete(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
self.get_mut().as_mut().poll_complete(cx)
}
}
impl<T: AsRef<[u8]> + Unpin> AsyncSeek for io::Cursor<T> {
fn start_seek(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<()>> {
Poll::Ready(io::Seek::seek(&mut *self, pos).map(drop))
}
fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(self.get_mut().position()))
}
}

View File

@ -164,6 +164,9 @@ pub use self::async_buf_read::AsyncBufRead;
mod async_read;
pub use self::async_read::AsyncRead;
mod async_seek;
pub use self::async_seek::AsyncSeek;
mod async_write;
pub use self::async_write::AsyncWrite;
@ -192,10 +195,13 @@ cfg_io_util! {
mod split;
pub use split::{split, ReadHalf, WriteHalf};
pub(crate) mod seek;
pub use self::seek::Seek;
pub(crate) mod util;
pub use util::{
copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader, BufStream,
BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
copy, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader,
BufStream, BufWriter, Copy, Empty, Lines, Repeat, Sink, Split, Take,
};
// Re-export io::Error so that users don't have to deal with conflicts when

56
tokio/src/io/seek.rs Normal file
View File

@ -0,0 +1,56 @@
use crate::io::AsyncSeek;
use std::future::Future;
use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Seek<'a, S: ?Sized> {
seek: &'a mut S,
pos: Option<SeekFrom>,
}
pub(crate) fn seek<S>(seek: &mut S, pos: SeekFrom) -> Seek<'_, S>
where
S: AsyncSeek + ?Sized + Unpin,
{
Seek {
seek,
pos: Some(pos),
}
}
impl<S> Future for Seek<'_, S>
where
S: AsyncSeek + ?Sized + Unpin,
{
type Output = io::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = &mut *self;
match me.pos {
Some(pos) => {
match Pin::new(&mut me.seek).start_seek(cx, pos) {
Poll::Ready(Ok(())) => me.pos = None,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => (),
};
Poll::Pending
}
None => Pin::new(&mut me.seek).poll_complete(cx),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn assert_unpin() {
use std::marker::PhantomPinned;
crate::is_unpin::<Seek<'_, PhantomPinned>>();
}
}

View File

@ -0,0 +1,38 @@
use crate::io::seek::{seek, Seek};
use crate::io::AsyncSeek;
use std::io::SeekFrom;
/// An extension trait which adds utility methods to `AsyncSeek` types.
pub trait AsyncSeekExt: AsyncSeek {
/// Creates a future which will seek an IO object, and then yield the
/// new position in the object and the object itself.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded.
///
/// # Examples
///
/// ```no_run
/// use tokio::fs::File;
/// use tokio::prelude::*;
///
/// use std::io::SeekFrom;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut file = File::open("foo.txt").await?;
/// file.seek(SeekFrom::Start(6)).await?;
///
/// let mut contents = vec![0u8; 10];
/// file.read_exact(&mut contents).await?;
/// # Ok(())
/// # }
/// ```
fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
where
Self: Unpin,
{
seek(self, pos)
}
}
impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

View File

@ -7,6 +7,9 @@ cfg_io_util! {
mod async_read_ext;
pub use async_read_ext::AsyncReadExt;
mod async_seek_ext;
pub use async_seek_ext::AsyncSeekExt;
mod async_write_ext;
pub use async_write_ext::AsyncWriteExt;

View File

@ -17,5 +17,5 @@ pub use crate::io::{self, AsyncBufRead, AsyncRead, AsyncWrite};
cfg_io_util! {
#[doc(no_inline)]
pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _};
pub use crate::io::{AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _};
}