mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
tokio: Add io copy, read, and write (#1187)
This commit is contained in:
parent
9df1140340
commit
29e417c257
@ -11,13 +11,14 @@ jobs:
|
|||||||
# name: rustfmt
|
# name: rustfmt
|
||||||
|
|
||||||
# Test top level crate
|
# Test top level crate
|
||||||
# - template: ci/azure-test-stable.yml
|
- template: ci/azure-test-stable.yml
|
||||||
# parameters:
|
parameters:
|
||||||
# name: test_tokio
|
name: test_tokio
|
||||||
# displayName: Test tokio
|
rust: $(nightly)
|
||||||
# cross: true
|
displayName: Test tokio
|
||||||
# crates:
|
cross: true
|
||||||
# - tokio
|
crates:
|
||||||
|
- tokio
|
||||||
|
|
||||||
# Test crates that are platform specific
|
# Test crates that are platform specific
|
||||||
- template: ci/azure-test-stable.yml
|
- template: ci/azure-test-stable.yml
|
||||||
|
@ -85,6 +85,8 @@ tokio-tcp = { version = "0.2.0", optional = true, path = "../tokio-tcp" }
|
|||||||
tokio-uds = { version = "0.2.1", optional = true }
|
tokio-uds = { version = "0.2.1", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
tokio-test = { path = "../tokio-test" }
|
||||||
|
pin-utils = "0.1.0-alpha.4"
|
||||||
env_logger = { version = "0.5", default-features = false }
|
env_logger = { version = "0.5", default-features = false }
|
||||||
flate2 = { version = "1", features = ["tokio"] }
|
flate2 = { version = "1", features = ["tokio"] }
|
||||||
futures-cpupool = "0.1"
|
futures-cpupool = "0.1"
|
||||||
|
@ -40,7 +40,7 @@
|
|||||||
//! [`spawn`]: fn.spawn.html
|
//! [`spawn`]: fn.spawn.html
|
||||||
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
pub use tokio_executor::{Executor, TypedExecutor, DefaultExecutor, SpawnError};
|
pub use tokio_executor::{DefaultExecutor, Executor, SpawnError, TypedExecutor};
|
||||||
|
|
||||||
/// Return value from the `spawn` function.
|
/// Return value from the `spawn` function.
|
||||||
///
|
///
|
||||||
@ -68,7 +68,7 @@ pub struct Spawn(());
|
|||||||
/// In this example, a server is started and `spawn` is used to start a new task
|
/// In this example, a server is started and `spawn` is used to start a new task
|
||||||
/// that processes each received connection.
|
/// that processes each received connection.
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust,ignore
|
||||||
/// # use futures::{Future, Stream};
|
/// # use futures::{Future, Stream};
|
||||||
/// use tokio::net::TcpListener;
|
/// use tokio::net::TcpListener;
|
||||||
///
|
///
|
||||||
@ -100,7 +100,8 @@ pub struct Spawn(());
|
|||||||
///
|
///
|
||||||
/// [`DefaultExecutor`]: struct.DefaultExecutor.html
|
/// [`DefaultExecutor`]: struct.DefaultExecutor.html
|
||||||
pub fn spawn<F>(f: F) -> Spawn
|
pub fn spawn<F>(f: F) -> Spawn
|
||||||
where F: Future<Output = ()> + 'static + Send
|
where
|
||||||
|
F: Future<Output = ()> + 'static + Send,
|
||||||
{
|
{
|
||||||
::tokio_executor::spawn(f);
|
::tokio_executor::spawn(f);
|
||||||
Spawn(())
|
Spawn(())
|
||||||
|
107
tokio/src/io/copy.rs
Normal file
107
tokio/src/io/copy.rs
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
macro_rules! ready {
|
||||||
|
($e:expr) => {
|
||||||
|
match $e {
|
||||||
|
::std::task::Poll::Ready(t) => t,
|
||||||
|
::std::task::Poll::Pending => return ::std::task::Poll::Pending,
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A future which will copy all data from a reader into a writer.
|
||||||
|
///
|
||||||
|
/// Created by the [`copy`] function, this future will resolve to the number of
|
||||||
|
/// bytes copied or an error if one happens.
|
||||||
|
///
|
||||||
|
/// [`copy`]: fn.copy.html
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Copy<'a, R, W> {
|
||||||
|
reader: &'a mut R,
|
||||||
|
read_done: bool,
|
||||||
|
writer: &'a mut W,
|
||||||
|
pos: usize,
|
||||||
|
cap: usize,
|
||||||
|
amt: u64,
|
||||||
|
buf: Box<[u8]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a future which represents copying all the bytes from one object to
|
||||||
|
/// another.
|
||||||
|
///
|
||||||
|
/// The returned future will copy all the bytes read from `reader` into the
|
||||||
|
/// `writer` specified. This future will only complete once the `reader` has hit
|
||||||
|
/// EOF and all bytes have been written to and flushed from the `writer`
|
||||||
|
/// provided.
|
||||||
|
///
|
||||||
|
/// On success the number of bytes is returned and the `reader` and `writer` are
|
||||||
|
/// consumed. On error the error is returned and the I/O objects are consumed as
|
||||||
|
/// well.
|
||||||
|
pub fn copy<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> Copy<'a, R, W>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin,
|
||||||
|
W: AsyncWrite + Unpin,
|
||||||
|
{
|
||||||
|
Copy {
|
||||||
|
reader,
|
||||||
|
read_done: false,
|
||||||
|
writer,
|
||||||
|
amt: 0,
|
||||||
|
pos: 0,
|
||||||
|
cap: 0,
|
||||||
|
buf: Box::new([0; 2048]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, R, W> Future for Copy<'a, R, W>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin,
|
||||||
|
W: AsyncWrite + Unpin,
|
||||||
|
{
|
||||||
|
type Output = io::Result<u64>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
|
||||||
|
loop {
|
||||||
|
// If our buffer is empty, then we need to read some data to
|
||||||
|
// continue.
|
||||||
|
if self.pos == self.cap && !self.read_done {
|
||||||
|
let me = &mut *self;
|
||||||
|
let n = ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?;
|
||||||
|
if n == 0 {
|
||||||
|
self.read_done = true;
|
||||||
|
} else {
|
||||||
|
self.pos = 0;
|
||||||
|
self.cap = n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If our buffer has some data, let's write it out!
|
||||||
|
while self.pos < self.cap {
|
||||||
|
let me = &mut *self;
|
||||||
|
let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, &me.buf[me.pos..me.cap]))?;
|
||||||
|
if i == 0 {
|
||||||
|
return Poll::Ready(Err(io::Error::new(
|
||||||
|
io::ErrorKind::WriteZero,
|
||||||
|
"write zero byte into writer",
|
||||||
|
)));
|
||||||
|
} else {
|
||||||
|
self.pos += i;
|
||||||
|
self.amt += i as u64;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we've written al the data and we've seen EOF, flush out the
|
||||||
|
// data and finish the transfer.
|
||||||
|
// done with the entire transfer.
|
||||||
|
if self.pos == self.cap && self.read_done {
|
||||||
|
let me = &mut *self;
|
||||||
|
ready!(Pin::new(&mut *me.writer).poll_flush(cx))?;
|
||||||
|
return Poll::Ready(Ok(self.amt));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -45,3 +45,11 @@ pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout};
|
|||||||
// Re-export io::Error so that users don't have to deal
|
// Re-export io::Error so that users don't have to deal
|
||||||
// with conflicts when `use`ing `futures::io` and `std::io`.
|
// with conflicts when `use`ing `futures::io` and `std::io`.
|
||||||
pub use std::io::{Error, ErrorKind, Result};
|
pub use std::io::{Error, ErrorKind, Result};
|
||||||
|
|
||||||
|
mod copy;
|
||||||
|
mod read;
|
||||||
|
mod write;
|
||||||
|
|
||||||
|
pub use self::copy::{copy, Copy};
|
||||||
|
pub use self::read::{read, Read};
|
||||||
|
pub use self::write::{write, Write};
|
43
tokio/src/io/read.rs
Normal file
43
tokio/src/io/read.rs
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::marker::Unpin;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tokio_io::AsyncRead;
|
||||||
|
|
||||||
|
/// Tries to read some bytes directly into the given `buf` in asynchronous
|
||||||
|
/// manner, returning a future type.
|
||||||
|
///
|
||||||
|
/// The returned future will resolve to both the I/O stream and the buffer
|
||||||
|
/// as well as the number of bytes read once the read operation is completed.
|
||||||
|
pub fn read<'a, R>(reader: &'a mut R, buf: &'a mut [u8]) -> Read<'a, R>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin + ?Sized,
|
||||||
|
{
|
||||||
|
Read { reader, buf }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A future which can be used to easily read available number of bytes to fill
|
||||||
|
/// a buffer.
|
||||||
|
///
|
||||||
|
/// Created by the [`read`] function.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Read<'a, R: ?Sized> {
|
||||||
|
reader: &'a mut R,
|
||||||
|
buf: &'a mut [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
// forward Unpin
|
||||||
|
impl<'a, R: Unpin + ?Sized> Unpin for Read<'_, R> {}
|
||||||
|
|
||||||
|
impl<R> Future for Read<'_, R>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin + ?Sized,
|
||||||
|
{
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
|
||||||
|
let me = &mut *self;
|
||||||
|
Pin::new(&mut *me.reader).poll_read(cx, me.buf)
|
||||||
|
}
|
||||||
|
}
|
37
tokio/src/io/write.rs
Normal file
37
tokio/src/io/write.rs
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::marker::Unpin;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tokio_io::AsyncWrite;
|
||||||
|
|
||||||
|
/// A future to write some of the buffer to an `AsyncWrite`.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Write<'a, W: ?Sized> {
|
||||||
|
writer: &'a mut W,
|
||||||
|
buf: &'a [u8],
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tries to write some bytes from the given `buf` to the writer in an
|
||||||
|
/// asynchronous manner, returning a future.
|
||||||
|
pub fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W>
|
||||||
|
where
|
||||||
|
W: AsyncWrite + Unpin + ?Sized,
|
||||||
|
{
|
||||||
|
Write { writer, buf }
|
||||||
|
}
|
||||||
|
|
||||||
|
// forward Unpin
|
||||||
|
impl<'a, W: Unpin + ?Sized> Unpin for Write<'_, W> {}
|
||||||
|
|
||||||
|
impl<W> Future for Write<'_, W>
|
||||||
|
where
|
||||||
|
W: AsyncWrite + Unpin + ?Sized,
|
||||||
|
{
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
|
||||||
|
let me = &mut *self;
|
||||||
|
Pin::new(&mut *me.writer).poll_write(cx, me.buf)
|
||||||
|
}
|
||||||
|
}
|
@ -28,7 +28,7 @@
|
|||||||
//!
|
//!
|
||||||
//! A simple TCP echo server:
|
//! A simple TCP echo server:
|
||||||
//!
|
//!
|
||||||
//! ```no_run
|
//! ```no_run,ignore
|
||||||
//! use tokio::prelude::*;
|
//! use tokio::prelude::*;
|
||||||
//! use tokio::io::copy;
|
//! use tokio::io::copy;
|
||||||
//! use tokio::net::TcpListener;
|
//! use tokio::net::TcpListener;
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
//!
|
//!
|
||||||
//! Let's start with a basic example, establishing a TCP connection.
|
//! Let's start with a basic example, establishing a TCP connection.
|
||||||
//!
|
//!
|
||||||
//! ```rust
|
//! ```rust,ignore
|
||||||
//! # fn dox() {
|
//! # fn dox() {
|
||||||
//! use tokio::prelude::*;
|
//! use tokio::prelude::*;
|
||||||
//! use tokio::net::TcpStream;
|
//! use tokio::net::TcpStream;
|
||||||
@ -134,6 +134,4 @@
|
|||||||
//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
|
//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
|
||||||
//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
|
//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
|
||||||
|
|
||||||
pub use tokio_reactor::{
|
pub use tokio_reactor::{Handle, PollEvented, Reactor, Registration, Turn};
|
||||||
Handle, PollEvented, Reactor, Registration, Turn,
|
|
||||||
};
|
|
||||||
|
@ -20,7 +20,7 @@ use std::io;
|
|||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```ignore
|
||||||
/// use tokio::runtime::current_thread::Builder;
|
/// use tokio::runtime::current_thread::Builder;
|
||||||
/// use tokio_timer::clock::Clock;
|
/// use tokio_timer::clock::Clock;
|
||||||
///
|
///
|
||||||
@ -78,7 +78,8 @@ impl Builder {
|
|||||||
reactor_handle,
|
reactor_handle,
|
||||||
//timer_handle,
|
//timer_handle,
|
||||||
//self.clock.clone(),
|
//self.clock.clone(),
|
||||||
executor);
|
executor,
|
||||||
|
);
|
||||||
|
|
||||||
Ok(runtime)
|
Ok(runtime)
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@
|
|||||||
//!
|
//!
|
||||||
//! For example:
|
//! For example:
|
||||||
//!
|
//!
|
||||||
//! ```
|
//! ```ignore
|
||||||
//! use tokio::runtime::current_thread::Runtime;
|
//! use tokio::runtime::current_thread::Runtime;
|
||||||
//! use tokio::prelude::*;
|
//! use tokio::prelude::*;
|
||||||
//! use std::thread;
|
//! use std::thread;
|
||||||
@ -68,7 +68,7 @@ mod builder;
|
|||||||
mod runtime;
|
mod runtime;
|
||||||
|
|
||||||
pub use self::builder::Builder;
|
pub use self::builder::Builder;
|
||||||
pub use self::runtime::{Runtime, Handle};
|
pub use self::runtime::{Handle, Runtime};
|
||||||
pub use tokio_current_thread::spawn;
|
pub use tokio_current_thread::spawn;
|
||||||
pub use tokio_current_thread::TaskExecutor;
|
pub use tokio_current_thread::TaskExecutor;
|
||||||
|
|
||||||
@ -98,7 +98,6 @@ pub fn run<F>(future: F)
|
|||||||
where
|
where
|
||||||
F: Future<Output = ()> + 'static,
|
F: Future<Output = ()> + 'static,
|
||||||
{
|
{
|
||||||
|
|
||||||
let mut r = Runtime::new().expect("failed to start runtime on current thread");
|
let mut r = Runtime::new().expect("failed to start runtime on current thread");
|
||||||
r.spawn(future);
|
r.spawn(future);
|
||||||
r.run().expect("failed to resolve remaining futures");
|
r.run().expect("failed to resolve remaining futures");
|
||||||
|
@ -1,13 +1,13 @@
|
|||||||
use crate::runtime::current_thread::Builder;
|
use crate::runtime::current_thread::Builder;
|
||||||
use tokio_current_thread::{self as current_thread, CurrentThread};
|
|
||||||
use tokio_current_thread::Handle as ExecutorHandle;
|
use tokio_current_thread::Handle as ExecutorHandle;
|
||||||
|
use tokio_current_thread::{self as current_thread, CurrentThread};
|
||||||
use tokio_executor;
|
use tokio_executor;
|
||||||
use tokio_reactor::{self, Reactor};
|
use tokio_reactor::{self, Reactor};
|
||||||
//use tokio_timer::clock::{self, Clock};
|
//use tokio_timer::clock::{self, Clock};
|
||||||
//use tokio_timer::timer::{self, Timer};
|
//use tokio_timer::timer::{self, Timer};
|
||||||
|
use std::error::Error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::error::Error;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
/// Single-threaded runtime provides a way to start reactor
|
/// Single-threaded runtime provides a way to start reactor
|
||||||
@ -39,7 +39,9 @@ impl Handle {
|
|||||||
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
|
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
|
||||||
/// instance of the `Handle` does not exist anymore.
|
/// instance of the `Handle` does not exist anymore.
|
||||||
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
|
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
|
||||||
where F: Future<Output = ()> + Send + 'static {
|
where
|
||||||
|
F: Future<Output = ()> + Send + 'static,
|
||||||
|
{
|
||||||
self.0.spawn(future)
|
self.0.spawn(future)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -101,8 +103,8 @@ impl Runtime {
|
|||||||
reactor_handle: tokio_reactor::Handle,
|
reactor_handle: tokio_reactor::Handle,
|
||||||
//timer_handle: timer::Handle,
|
//timer_handle: timer::Handle,
|
||||||
//clock: Clock,
|
//clock: Clock,
|
||||||
executor: CurrentThread<Parker>) -> Runtime
|
executor: CurrentThread<Parker>,
|
||||||
{
|
) -> Runtime {
|
||||||
Runtime {
|
Runtime {
|
||||||
reactor_handle,
|
reactor_handle,
|
||||||
//timer_handle,
|
//timer_handle,
|
||||||
@ -127,7 +129,7 @@ impl Runtime {
|
|||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust,ignore
|
||||||
/// # use futures::{future, Future, Stream};
|
/// # use futures::{future, Future, Stream};
|
||||||
/// use tokio::runtime::current_thread::Runtime;
|
/// use tokio::runtime::current_thread::Runtime;
|
||||||
///
|
///
|
||||||
@ -149,7 +151,8 @@ impl Runtime {
|
|||||||
/// This function panics if the spawn fails. Failure occurs if the executor
|
/// This function panics if the spawn fails. Failure occurs if the executor
|
||||||
/// is currently at capacity and is unable to spawn a new future.
|
/// is currently at capacity and is unable to spawn a new future.
|
||||||
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
||||||
where F: Future<Output = ()> + 'static,
|
where
|
||||||
|
F: Future<Output = ()> + 'static,
|
||||||
{
|
{
|
||||||
self.executor.spawn(future);
|
self.executor.spawn(future);
|
||||||
self
|
self
|
||||||
@ -172,7 +175,8 @@ impl Runtime {
|
|||||||
/// The caller is responsible for ensuring that other spawned futures
|
/// The caller is responsible for ensuring that other spawned futures
|
||||||
/// complete execution by calling `block_on` or `run`.
|
/// complete execution by calling `block_on` or `run`.
|
||||||
pub fn block_on<F>(&mut self, f: F) -> F::Output
|
pub fn block_on<F>(&mut self, f: F) -> F::Output
|
||||||
where F: Future
|
where
|
||||||
|
F: Future,
|
||||||
{
|
{
|
||||||
self.enter(|executor| {
|
self.enter(|executor| {
|
||||||
// Run the provided future
|
// Run the provided future
|
||||||
@ -184,13 +188,12 @@ impl Runtime {
|
|||||||
/// spawned futures have completed.
|
/// spawned futures have completed.
|
||||||
pub fn run(&mut self) -> Result<(), RunError> {
|
pub fn run(&mut self) -> Result<(), RunError> {
|
||||||
self.enter(|executor| executor.run())
|
self.enter(|executor| executor.run())
|
||||||
.map_err(|e| RunError {
|
.map_err(|e| RunError { inner: e })
|
||||||
inner: e,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn enter<F, R>(&mut self, f: F) -> R
|
fn enter<F, R>(&mut self, f: F) -> R
|
||||||
where F: FnOnce(&mut current_thread::Entered<'_, Parker>) -> R
|
where
|
||||||
|
F: FnOnce(&mut current_thread::Entered<'_, Parker>) -> R,
|
||||||
{
|
{
|
||||||
let Runtime {
|
let Runtime {
|
||||||
ref reactor_handle,
|
ref reactor_handle,
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
//! "seed" the application, blocking the thread until the runtime becomes
|
//! "seed" the application, blocking the thread until the runtime becomes
|
||||||
//! [idle].
|
//! [idle].
|
||||||
//!
|
//!
|
||||||
//! ```rust
|
//! ```rust,ignore
|
||||||
//! # use futures::{Future, Stream};
|
//! # use futures::{Future, Stream};
|
||||||
//! use tokio::net::TcpListener;
|
//! use tokio::net::TcpListener;
|
||||||
//!
|
//!
|
||||||
@ -66,7 +66,7 @@
|
|||||||
//!
|
//!
|
||||||
//! A [`Runtime`] instance can also be used directly.
|
//! A [`Runtime`] instance can also be used directly.
|
||||||
//!
|
//!
|
||||||
//! ```rust
|
//! ```rust,ignore
|
||||||
//! # use futures::{Future, Stream};
|
//! # use futures::{Future, Stream};
|
||||||
//! use tokio::runtime::Runtime;
|
//! use tokio::runtime::Runtime;
|
||||||
//! use tokio::net::TcpListener;
|
//! use tokio::net::TcpListener;
|
||||||
@ -111,11 +111,7 @@
|
|||||||
pub mod current_thread;
|
pub mod current_thread;
|
||||||
//mod threadpool;
|
//mod threadpool;
|
||||||
|
|
||||||
pub use self::current_thread::{
|
pub use self::current_thread::{run, Builder, Runtime};
|
||||||
Builder,
|
|
||||||
Runtime,
|
|
||||||
run,
|
|
||||||
};
|
|
||||||
/*
|
/*
|
||||||
pub use self::threadpool::{
|
pub use self::threadpool::{
|
||||||
Builder,
|
Builder,
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use env_logger;
|
use env_logger;
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use env_logger;
|
use env_logger;
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use futures::future;
|
use futures::future;
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use futures::sync::mpsc;
|
use futures::sync::mpsc;
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use env_logger;
|
use env_logger;
|
||||||
|
134
tokio/tests/io.rs
Normal file
134
tokio/tests/io.rs
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
use bytes::BytesMut;
|
||||||
|
use pin_utils::pin_mut;
|
||||||
|
use std::future::Future;
|
||||||
|
use std::io;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_test::assert_ready_ok;
|
||||||
|
use tokio_test::task::MockTask;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn write() {
|
||||||
|
struct Wr(BytesMut);
|
||||||
|
|
||||||
|
impl AsyncWrite for Wr {
|
||||||
|
fn poll_write(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.0.extend(buf);
|
||||||
|
Ok(buf.len()).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Ok(()).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Ok(()).into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
task.enter(|cx| {
|
||||||
|
let mut wr = Wr(BytesMut::with_capacity(64));
|
||||||
|
|
||||||
|
let write = tokio::io::write(&mut wr, "hello world".as_bytes());
|
||||||
|
pin_mut!(write);
|
||||||
|
|
||||||
|
let n = assert_ready_ok!(write.poll(cx));
|
||||||
|
assert_eq!(n, 11);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read() {
|
||||||
|
struct Rd;
|
||||||
|
|
||||||
|
impl AsyncRead for Rd {
|
||||||
|
fn poll_read(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
buf[0..11].copy_from_slice(b"hello world");
|
||||||
|
Poll::Ready(Ok(11))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut buf = Box::new([0; 11]);
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
task.enter(|cx| {
|
||||||
|
let mut rd = Rd;
|
||||||
|
|
||||||
|
let read = tokio::io::read(&mut rd, &mut buf[..]);
|
||||||
|
pin_mut!(read);
|
||||||
|
|
||||||
|
let n = assert_ready_ok!(read.poll(cx));
|
||||||
|
assert_eq!(n, 11);
|
||||||
|
assert_eq!(buf[..], b"hello world"[..]);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn copy() {
|
||||||
|
struct Rd(bool);
|
||||||
|
|
||||||
|
impl AsyncRead for Rd {
|
||||||
|
fn poll_read(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
buf: &mut [u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
if self.0 {
|
||||||
|
buf[0..11].copy_from_slice(b"hello world");
|
||||||
|
self.0 = false;
|
||||||
|
Poll::Ready(Ok(11))
|
||||||
|
} else {
|
||||||
|
Poll::Ready(Ok(0))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Wr(BytesMut);
|
||||||
|
|
||||||
|
impl Unpin for Wr {}
|
||||||
|
impl AsyncWrite for Wr {
|
||||||
|
fn poll_write(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
_cx: &mut Context<'_>,
|
||||||
|
buf: &[u8],
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
self.0.extend(buf);
|
||||||
|
Ok(buf.len()).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Ok(()).into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||||
|
Ok(()).into()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let buf = BytesMut::with_capacity(64);
|
||||||
|
let mut task = MockTask::new();
|
||||||
|
|
||||||
|
task.enter(|cx| {
|
||||||
|
let mut rd = Rd(true);
|
||||||
|
let mut wr = Wr(buf);
|
||||||
|
|
||||||
|
let copy = tokio::io::copy(&mut rd, &mut wr);
|
||||||
|
pin_mut!(copy);
|
||||||
|
|
||||||
|
let n = assert_ready_ok!(copy.poll(cx));
|
||||||
|
|
||||||
|
assert_eq!(n, 11);
|
||||||
|
assert_eq!(wr.0[..], b"hello world"[..]);
|
||||||
|
});
|
||||||
|
}
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, BytesMut};
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![cfg(unix)]
|
#![cfg(unix)]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use futures::executor::{spawn, Notify, Spawn};
|
use futures::executor::{spawn, Notify, Spawn};
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use env_logger;
|
use env_logger;
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#![cfg(feature = "broken")]
|
||||||
#![deny(warnings, rust_2018_idioms)]
|
#![deny(warnings, rust_2018_idioms)]
|
||||||
|
|
||||||
use env_logger;
|
use env_logger;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user