Finish docs on futures-mio

Also remove mio BufReader/BufWriter for now, they should come back shortly
though
This commit is contained in:
Alex Crichton 2016-08-01 17:41:58 -07:00
parent 7ffe72dc73
commit 5970203b21
7 changed files with 27 additions and 369 deletions

View File

@ -5,6 +5,7 @@
#[macro_use]
extern crate futures;
extern crate futures_io;
extern crate futures_mio;
use std::env;
@ -32,7 +33,7 @@ fn main() {
l.run(server).unwrap();
}
fn write(socket: futures_mio::TcpStream) -> Box<futures_mio::IoFuture<()>> {
fn write(socket: futures_mio::TcpStream) -> Box<futures_io::IoFuture<()>> {
static BUF: &'static [u8] = &[0; 64 * 1024];
socket.into_future().map_err(|e| e.0).and_then(move |(ready, mut socket)| {
let ready = match ready {

View File

@ -1,174 +0,0 @@
#![allow(missing_docs)]
use std::io::{self, Read};
use std::ops::Deref;
use std::sync::Arc;
use ReadinessStream;
use futures::{Task, Poll};
use futures::stream::Stream;
const INPUT_BUF_SIZE: usize = 8 * 1024;
/// A cheap to copy, read-only slice of an input buffer.
#[derive(Clone)]
pub struct InputBuf {
buf: Arc<Vec<u8>>,
pos: usize,
len: usize,
}
impl Deref for InputBuf {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.buf[self.pos..self.pos + self.len]
}
}
// TODO: implement direct slicing (which clones the Arc)
impl InputBuf {
fn new() -> InputBuf {
InputBuf {
buf: Arc::new(Vec::with_capacity(INPUT_BUF_SIZE)),
pos: 0,
len: 0,
}
}
pub fn take(&mut self, len: usize) -> InputBuf {
assert!(len <= self.len);
let new = InputBuf {
buf: self.buf.clone(),
pos: self.pos,
len: len,
};
self.pos += len;
self.len -= len;
new
}
pub fn skip(&mut self, len: usize) {
assert!(len <= self.len);
self.pos += len;
}
fn with_mut<R, F>(&mut self, f: F) -> R
where F: FnOnce(&mut Vec<u8>) -> R
{
// Fast path if we can get mutable access to our own current
// buffer.
if let Some(buf) = Arc::get_mut(&mut self.buf) {
buf.drain(..self.pos);
self.pos = 0;
let ret = f(buf);
self.len = buf.len();
return ret;
}
// If we couldn't get access above then we give ourself a new buffer
// here.
let mut v = Vec::with_capacity(INPUT_BUF_SIZE);
v.extend_from_slice(&self.buf[self.pos..]);
let ret = f(&mut v);
self.buf = Arc::new(v);
self.pos = 0;
self.len = self.buf.len();
ret
}
fn read<R: Read>(&mut self, socket: &mut R) -> io::Result<(usize, bool)> {
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] {
use std::slice;
if v.capacity() == 0 {
v.reserve(16);
}
if v.capacity() == v.len() {
v.reserve(1);
}
slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize),
v.capacity() - v.len())
}
self.with_mut(|buf| {
match socket.read(unsafe { slice_to_end(buf) }) {
Ok(0) => {
trace!("socket EOF");
Ok((0, true))
}
Ok(n) => {
trace!("socket read {} bytes", n);
unsafe {
let len = buf.len();
buf.set_len(len + n);
}
Ok((n, false))
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok((0, false)),
Err(e) => Err(e),
}
})
}
}
/// A stream for parsing from an underlying reader, using an unbounded internal
/// buffer.
pub struct BufReader<R> {
source: R,
source_ready: ReadinessStream,
read_ready: bool,
buf: InputBuf,
}
impl<R: Read + Send + 'static> BufReader<R> {
pub fn new(source: R, source_ready: ReadinessStream) -> BufReader<R> {
BufReader {
source: source,
source_ready: source_ready,
read_ready: false,
buf: InputBuf::new(),
}
}
pub fn buf(&mut self) -> &mut InputBuf {
&mut self.buf
}
}
impl<R: Read + Send + 'static> Stream for BufReader<R> {
type Item = ();
type Error = io::Error;
fn poll(&mut self, task: &mut Task) -> Poll<Option<()>, io::Error> {
if !self.read_ready {
match self.source_ready.poll(task) {
Poll::NotReady => return Poll::NotReady,
Poll::Err(e) => return Poll::Err(e.into()),
Poll::Ok(Some(ref r)) if !r.is_read() => return Poll::NotReady,
Poll::Ok(Some(_)) => self.read_ready = true,
_ => unreachable!(),
}
}
match self.buf.read(&mut self.source) {
Ok((0, true)) => Poll::Ok(None),
Ok((0, false)) => {
self.read_ready = false;
Poll::NotReady
}
Ok(_) => {
self.read_ready = true;
Poll::Ok(Some(()))
}
Err(e) => Poll::Err(e.into()),
}
}
fn schedule(&mut self, task: &mut Task) {
self.source_ready.schedule(task)
}
}

View File

@ -1,179 +0,0 @@
#![allow(missing_docs)]
use std::io::{self, Write};
use futures::{Future, Task, Poll};
use futures::stream::Stream;
use ReadinessStream;
const OUTPUT_BUF_SIZE: usize = 8 * 1024;
pub struct BufWriter<W> {
sink: W,
sink_ready: ReadinessStream,
write_ready: bool,
buf: Vec<u8>,
}
impl<W: Write + Send + 'static> BufWriter<W> {
pub fn new(sink: W, sink_ready: ReadinessStream) -> BufWriter<W> {
BufWriter {
sink: sink,
sink_ready: sink_ready,
write_ready: false,
buf: Vec::with_capacity(OUTPUT_BUF_SIZE),
}
}
pub fn extend(&mut self, data: &[u8]) {
extend(&mut self.buf, data)
}
pub fn flush(self) -> Flush<W> {
Flush { writer: Some(self) }
}
pub fn reserve(self, amt: usize) -> Reserve<W> {
Reserve { amt: amt, writer: Some(self) }
}
/// Is there buffered data waiting to be sent?
pub fn is_dirty(&self) -> bool {
self.buf.len() > 0
}
fn poll_flush(&mut self, task: &mut Task) -> Poll<(), io::Error> {
let mut task = task.scoped();
while self.is_dirty() {
if !self.write_ready {
match self.sink_ready.poll(&mut task) {
Poll::Err(e) => return Poll::Err(e),
Poll::Ok(Some(ref r)) if !r.is_write() => return Poll::NotReady,
Poll::Ok(Some(_)) => self.write_ready = true,
Poll::Ok(None) | // TODO: this should translate to an error
Poll::NotReady => return Poll::NotReady,
}
}
debug!("trying to write some data");
match self.sink.write(&self.buf) {
Ok(0) => return Poll::Err(io::Error::new(io::ErrorKind::Other, "early eof")),
Ok(n) => {
// TODO: consider draining more lazily, i.e. only just
// before returning
self.buf.drain(..n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.write_ready = false;
}
Err(e) => return Poll::Err(e),
}
task.ready();
}
debug!("fully flushed");
Poll::Ok(())
}
}
impl<W: Write> Write for BufWriter<W> {
fn write(&mut self, data: &[u8]) -> io::Result<usize> {
extend(&mut self.buf, data);
Ok(data.len())
}
fn flush(&mut self) -> io::Result<()> {
// TODO: something reasonable
unimplemented!()
}
}
pub struct Flush<W> {
writer: Option<BufWriter<W>>,
}
impl<W: Write + Send + 'static> Flush<W> {
pub fn is_dirty(&self) -> bool {
self.writer.as_ref().unwrap().is_dirty()
}
pub fn into_inner(mut self) -> BufWriter<W> {
self.writer.take().unwrap()
}
}
impl<W: Write + Send + 'static> Future for Flush<W> {
type Item = BufWriter<W>;
type Error = (io::Error, BufWriter<W>);
fn poll(&mut self, task: &mut Task)
-> Poll<BufWriter<W>, (io::Error, BufWriter<W>)> {
match self.writer.as_mut().unwrap().poll_flush(task) {
Poll::Ok(()) => Poll::Ok(self.writer.take().unwrap()),
Poll::Err(e) => Poll::Err((e, self.writer.take().unwrap())),
Poll::NotReady => Poll::NotReady,
}
}
fn schedule(&mut self, task: &mut Task) {
let writer = self.writer.as_mut().unwrap();
assert!(!writer.write_ready);
writer.sink_ready.schedule(task)
}
}
// TODO: why doesn't extend_from_slice optimize to this?
fn extend(dst: &mut Vec<u8>, data: &[u8]) {
use std::ptr;
dst.reserve(data.len());
let prev = dst.len();
unsafe {
ptr::copy_nonoverlapping(data.as_ptr(),
dst.as_mut_ptr().offset(prev as isize),
data.len());
dst.set_len(prev + data.len());
}
}
pub struct Reserve<W> {
amt: usize,
writer: Option<BufWriter<W>>,
}
impl<W: Write + Send + 'static> Future for Reserve<W> {
type Item = BufWriter<W>;
type Error = (io::Error, BufWriter<W>);
fn poll(&mut self, task: &mut Task)
-> Poll<BufWriter<W>, (io::Error, BufWriter<W>)> {
loop {
let (cap, len) = {
let buf = &mut self.writer.as_mut().unwrap().buf;
(buf.capacity(), buf.len())
};
if self.amt <= cap - len {
return Poll::Ok(self.writer.take().unwrap())
} else if self.amt > cap {
let mut writer = self.writer.take().unwrap();
writer.buf.reserve(self.amt);
return Poll::Ok(writer)
}
match self.writer.as_mut().unwrap().poll_flush(task) {
Poll::Ok(()) => {},
Poll::Err(e) => return Poll::Err((e, self.writer.take().unwrap())),
Poll::NotReady => return Poll::NotReady,
}
}
}
fn schedule(&mut self, task: &mut Task) {
let writer = self.writer.as_mut().unwrap();
assert!(!writer.write_ready);
writer.sink_ready.schedule(task)
}
}

View File

@ -111,7 +111,10 @@ impl Loop {
}
}
#[allow(missing_docs)]
/// Runs a future until completion, driving the event loop while we're
/// otherwise waiting for the future to complete.
///
/// Returns the value that the future resolves to.
pub fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
let (tx_res, rx_res) = mpsc::channel();
let handle = self.handle();

View File

@ -19,15 +19,9 @@ extern crate scoped_tls;
#[macro_use]
extern crate log;
use std::io;
use futures::Future;
use futures::stream::Stream;
mod readiness_stream;
mod event_loop;
mod tcp;
mod buf_reader;
mod buf_writer;
#[path = "../../src/slot.rs"]
mod slot;
#[path = "../../src/lock.rs"]
@ -36,5 +30,3 @@ mod lock;
pub use event_loop::{Loop, LoopHandle};
pub use readiness_stream::ReadinessStream;
pub use tcp::{TcpListener, TcpStream};
pub use buf_reader::{BufReader, InputBuf};
pub use buf_writer::{BufWriter, Flush, Reserve};

View File

@ -1,13 +1,10 @@
#![allow(missing_docs)] // TODO: document this module
use std::io;
use std::sync::Arc;
use futures::stream::Stream;
use futures::{Future, Task, Poll};
use futures_io::Ready;
use futures_io::{Ready, IoFuture};
use IoFuture;
use event_loop::{IoSource, LoopHandle};
use readiness_stream::drop_source::DropSource;
@ -40,6 +37,19 @@ mod drop_source {
}
}
/// A concrete implementation of a stream of readiness notifications for I/O
/// objects that originates from an event loop.
///
/// Created by the `ReadinessStream::new` method, each `ReadinessStream` is
/// associated with a specific event loop and source of events that will be
/// registered with an event loop.
///
/// Currently readiness streams have "edge" semantics. That is, if a stream
/// receives a readable notification it will not receive another readable
/// notification until all bytes have been read from the stream.
///
/// Note that the precise semantics of when notifications are received will
/// likely be configurable in the future.
pub struct ReadinessStream {
io_token: usize,
loop_handle: LoopHandle,
@ -48,6 +58,11 @@ pub struct ReadinessStream {
}
impl ReadinessStream {
/// Creates a new readiness stream associated with the provided
/// `loop_handle` and for the given `source`.
///
/// This method returns a future which will resolve to the readiness stream
/// when it's ready.
pub fn new(loop_handle: LoopHandle, source: IoSource)
-> Box<IoFuture<ReadinessStream>> {
loop_handle.add_source(source.clone()).map(|token| {

View File

@ -5,10 +5,10 @@ use std::sync::Arc;
use futures::stream::{self, Stream};
use futures::{Future, IntoFuture, failed, Task, Poll};
use futures_io::Ready;
use futures_io::{Ready, IoFuture, IoStream};
use mio;
use {IoFuture, IoStream, ReadinessStream, LoopHandle};
use {ReadinessStream, LoopHandle};
use event_loop::Source;
/// An I/O object representing a TCP socket listening for incoming connections.