From 5970203b211a48b0a4b1ed4fbdac0f586bb2d281 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 1 Aug 2016 17:41:58 -0700 Subject: [PATCH] Finish docs on futures-mio Also remove mio BufReader/BufWriter for now, they should come back shortly though --- src/bin/sink.rs | 3 +- src/buf_reader.rs | 174 -------------------------------------- src/buf_writer.rs | 179 ---------------------------------------- src/event_loop.rs | 5 +- src/lib.rs | 8 -- src/readiness_stream.rs | 23 +++++- src/tcp.rs | 4 +- 7 files changed, 27 insertions(+), 369 deletions(-) delete mode 100755 src/buf_reader.rs delete mode 100755 src/buf_writer.rs diff --git a/src/bin/sink.rs b/src/bin/sink.rs index 3ee6743ff..a506833e3 100644 --- a/src/bin/sink.rs +++ b/src/bin/sink.rs @@ -5,6 +5,7 @@ #[macro_use] extern crate futures; +extern crate futures_io; extern crate futures_mio; use std::env; @@ -32,7 +33,7 @@ fn main() { l.run(server).unwrap(); } -fn write(socket: futures_mio::TcpStream) -> Box> { +fn write(socket: futures_mio::TcpStream) -> Box> { static BUF: &'static [u8] = &[0; 64 * 1024]; socket.into_future().map_err(|e| e.0).and_then(move |(ready, mut socket)| { let ready = match ready { diff --git a/src/buf_reader.rs b/src/buf_reader.rs deleted file mode 100755 index 66fad5dee..000000000 --- a/src/buf_reader.rs +++ /dev/null @@ -1,174 +0,0 @@ -#![allow(missing_docs)] - -use std::io::{self, Read}; -use std::ops::Deref; -use std::sync::Arc; - -use ReadinessStream; - -use futures::{Task, Poll}; -use futures::stream::Stream; - -const INPUT_BUF_SIZE: usize = 8 * 1024; - -/// A cheap to copy, read-only slice of an input buffer. -#[derive(Clone)] -pub struct InputBuf { - buf: Arc>, - pos: usize, - len: usize, -} - -impl Deref for InputBuf { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - &self.buf[self.pos..self.pos + self.len] - } -} - -// TODO: implement direct slicing (which clones the Arc) - -impl InputBuf { - fn new() -> InputBuf { - InputBuf { - buf: Arc::new(Vec::with_capacity(INPUT_BUF_SIZE)), - pos: 0, - len: 0, - } - } - - pub fn take(&mut self, len: usize) -> InputBuf { - assert!(len <= self.len); - let new = InputBuf { - buf: self.buf.clone(), - pos: self.pos, - len: len, - }; - self.pos += len; - self.len -= len; - new - } - - pub fn skip(&mut self, len: usize) { - assert!(len <= self.len); - self.pos += len; - } - - fn with_mut(&mut self, f: F) -> R - where F: FnOnce(&mut Vec) -> R - { - // Fast path if we can get mutable access to our own current - // buffer. - if let Some(buf) = Arc::get_mut(&mut self.buf) { - buf.drain(..self.pos); - self.pos = 0; - let ret = f(buf); - self.len = buf.len(); - return ret; - } - - // If we couldn't get access above then we give ourself a new buffer - // here. - - let mut v = Vec::with_capacity(INPUT_BUF_SIZE); - v.extend_from_slice(&self.buf[self.pos..]); - let ret = f(&mut v); - - self.buf = Arc::new(v); - self.pos = 0; - self.len = self.buf.len(); - ret - } - - fn read(&mut self, socket: &mut R) -> io::Result<(usize, bool)> { - unsafe fn slice_to_end(v: &mut Vec) -> &mut [u8] { - use std::slice; - if v.capacity() == 0 { - v.reserve(16); - } - if v.capacity() == v.len() { - v.reserve(1); - } - slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize), - v.capacity() - v.len()) - } - - self.with_mut(|buf| { - match socket.read(unsafe { slice_to_end(buf) }) { - Ok(0) => { - trace!("socket EOF"); - Ok((0, true)) - } - Ok(n) => { - trace!("socket read {} bytes", n); - unsafe { - let len = buf.len(); - buf.set_len(len + n); - } - Ok((n, false)) - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok((0, false)), - Err(e) => Err(e), - } - }) - } -} - -/// A stream for parsing from an underlying reader, using an unbounded internal -/// buffer. -pub struct BufReader { - source: R, - source_ready: ReadinessStream, - read_ready: bool, - buf: InputBuf, -} - -impl BufReader { - pub fn new(source: R, source_ready: ReadinessStream) -> BufReader { - BufReader { - source: source, - source_ready: source_ready, - read_ready: false, - buf: InputBuf::new(), - } - } - - pub fn buf(&mut self) -> &mut InputBuf { - &mut self.buf - } -} - -impl Stream for BufReader { - type Item = (); - type Error = io::Error; - - fn poll(&mut self, task: &mut Task) -> Poll, io::Error> { - if !self.read_ready { - match self.source_ready.poll(task) { - Poll::NotReady => return Poll::NotReady, - Poll::Err(e) => return Poll::Err(e.into()), - Poll::Ok(Some(ref r)) if !r.is_read() => return Poll::NotReady, - Poll::Ok(Some(_)) => self.read_ready = true, - _ => unreachable!(), - } - } - - match self.buf.read(&mut self.source) { - Ok((0, true)) => Poll::Ok(None), - Ok((0, false)) => { - self.read_ready = false; - Poll::NotReady - } - Ok(_) => { - self.read_ready = true; - Poll::Ok(Some(())) - } - Err(e) => Poll::Err(e.into()), - } - } - - fn schedule(&mut self, task: &mut Task) { - self.source_ready.schedule(task) - } -} diff --git a/src/buf_writer.rs b/src/buf_writer.rs deleted file mode 100755 index 2e87db98d..000000000 --- a/src/buf_writer.rs +++ /dev/null @@ -1,179 +0,0 @@ -#![allow(missing_docs)] - -use std::io::{self, Write}; - -use futures::{Future, Task, Poll}; -use futures::stream::Stream; -use ReadinessStream; - -const OUTPUT_BUF_SIZE: usize = 8 * 1024; - -pub struct BufWriter { - sink: W, - sink_ready: ReadinessStream, - write_ready: bool, - buf: Vec, -} - -impl BufWriter { - pub fn new(sink: W, sink_ready: ReadinessStream) -> BufWriter { - BufWriter { - sink: sink, - sink_ready: sink_ready, - write_ready: false, - buf: Vec::with_capacity(OUTPUT_BUF_SIZE), - } - } - - pub fn extend(&mut self, data: &[u8]) { - extend(&mut self.buf, data) - } - - pub fn flush(self) -> Flush { - Flush { writer: Some(self) } - } - - pub fn reserve(self, amt: usize) -> Reserve { - Reserve { amt: amt, writer: Some(self) } - } - - /// Is there buffered data waiting to be sent? - pub fn is_dirty(&self) -> bool { - self.buf.len() > 0 - } - - fn poll_flush(&mut self, task: &mut Task) -> Poll<(), io::Error> { - let mut task = task.scoped(); - while self.is_dirty() { - if !self.write_ready { - match self.sink_ready.poll(&mut task) { - Poll::Err(e) => return Poll::Err(e), - Poll::Ok(Some(ref r)) if !r.is_write() => return Poll::NotReady, - Poll::Ok(Some(_)) => self.write_ready = true, - Poll::Ok(None) | // TODO: this should translate to an error - Poll::NotReady => return Poll::NotReady, - } - } - - debug!("trying to write some data"); - match self.sink.write(&self.buf) { - Ok(0) => return Poll::Err(io::Error::new(io::ErrorKind::Other, "early eof")), - Ok(n) => { - // TODO: consider draining more lazily, i.e. only just - // before returning - self.buf.drain(..n); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.write_ready = false; - } - Err(e) => return Poll::Err(e), - } - - task.ready(); - } - - debug!("fully flushed"); - Poll::Ok(()) - } -} - -impl Write for BufWriter { - fn write(&mut self, data: &[u8]) -> io::Result { - extend(&mut self.buf, data); - Ok(data.len()) - } - - fn flush(&mut self) -> io::Result<()> { - // TODO: something reasonable - unimplemented!() - } -} - -pub struct Flush { - writer: Option>, -} - -impl Flush { - pub fn is_dirty(&self) -> bool { - self.writer.as_ref().unwrap().is_dirty() - } - - pub fn into_inner(mut self) -> BufWriter { - self.writer.take().unwrap() - } -} - -impl Future for Flush { - type Item = BufWriter; - type Error = (io::Error, BufWriter); - - fn poll(&mut self, task: &mut Task) - -> Poll, (io::Error, BufWriter)> { - match self.writer.as_mut().unwrap().poll_flush(task) { - Poll::Ok(()) => Poll::Ok(self.writer.take().unwrap()), - Poll::Err(e) => Poll::Err((e, self.writer.take().unwrap())), - Poll::NotReady => Poll::NotReady, - } - } - - fn schedule(&mut self, task: &mut Task) { - let writer = self.writer.as_mut().unwrap(); - - assert!(!writer.write_ready); - writer.sink_ready.schedule(task) - } -} - -// TODO: why doesn't extend_from_slice optimize to this? -fn extend(dst: &mut Vec, data: &[u8]) { - use std::ptr; - dst.reserve(data.len()); - let prev = dst.len(); - unsafe { - ptr::copy_nonoverlapping(data.as_ptr(), - dst.as_mut_ptr().offset(prev as isize), - data.len()); - dst.set_len(prev + data.len()); - } -} - -pub struct Reserve { - amt: usize, - writer: Option>, -} - -impl Future for Reserve { - type Item = BufWriter; - type Error = (io::Error, BufWriter); - - fn poll(&mut self, task: &mut Task) - -> Poll, (io::Error, BufWriter)> { - loop { - let (cap, len) = { - let buf = &mut self.writer.as_mut().unwrap().buf; - (buf.capacity(), buf.len()) - }; - - if self.amt <= cap - len { - return Poll::Ok(self.writer.take().unwrap()) - } else if self.amt > cap { - let mut writer = self.writer.take().unwrap(); - writer.buf.reserve(self.amt); - return Poll::Ok(writer) - } - - match self.writer.as_mut().unwrap().poll_flush(task) { - Poll::Ok(()) => {}, - Poll::Err(e) => return Poll::Err((e, self.writer.take().unwrap())), - Poll::NotReady => return Poll::NotReady, - } - } - } - - fn schedule(&mut self, task: &mut Task) { - let writer = self.writer.as_mut().unwrap(); - - assert!(!writer.write_ready); - writer.sink_ready.schedule(task) - } -} diff --git a/src/event_loop.rs b/src/event_loop.rs index dbb4eb7a5..93584a30a 100644 --- a/src/event_loop.rs +++ b/src/event_loop.rs @@ -111,7 +111,10 @@ impl Loop { } } - #[allow(missing_docs)] + /// Runs a future until completion, driving the event loop while we're + /// otherwise waiting for the future to complete. + /// + /// Returns the value that the future resolves to. pub fn run(&mut self, f: F) -> Result { let (tx_res, rx_res) = mpsc::channel(); let handle = self.handle(); diff --git a/src/lib.rs b/src/lib.rs index 5c4214d10..adad073b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,15 +19,9 @@ extern crate scoped_tls; #[macro_use] extern crate log; -use std::io; -use futures::Future; -use futures::stream::Stream; - mod readiness_stream; mod event_loop; mod tcp; -mod buf_reader; -mod buf_writer; #[path = "../../src/slot.rs"] mod slot; #[path = "../../src/lock.rs"] @@ -36,5 +30,3 @@ mod lock; pub use event_loop::{Loop, LoopHandle}; pub use readiness_stream::ReadinessStream; pub use tcp::{TcpListener, TcpStream}; -pub use buf_reader::{BufReader, InputBuf}; -pub use buf_writer::{BufWriter, Flush, Reserve}; diff --git a/src/readiness_stream.rs b/src/readiness_stream.rs index b89868208..886e009bd 100755 --- a/src/readiness_stream.rs +++ b/src/readiness_stream.rs @@ -1,13 +1,10 @@ -#![allow(missing_docs)] // TODO: document this module - use std::io; use std::sync::Arc; use futures::stream::Stream; use futures::{Future, Task, Poll}; -use futures_io::Ready; +use futures_io::{Ready, IoFuture}; -use IoFuture; use event_loop::{IoSource, LoopHandle}; use readiness_stream::drop_source::DropSource; @@ -40,6 +37,19 @@ mod drop_source { } } +/// A concrete implementation of a stream of readiness notifications for I/O +/// objects that originates from an event loop. +/// +/// Created by the `ReadinessStream::new` method, each `ReadinessStream` is +/// associated with a specific event loop and source of events that will be +/// registered with an event loop. +/// +/// Currently readiness streams have "edge" semantics. That is, if a stream +/// receives a readable notification it will not receive another readable +/// notification until all bytes have been read from the stream. +/// +/// Note that the precise semantics of when notifications are received will +/// likely be configurable in the future. pub struct ReadinessStream { io_token: usize, loop_handle: LoopHandle, @@ -48,6 +58,11 @@ pub struct ReadinessStream { } impl ReadinessStream { + /// Creates a new readiness stream associated with the provided + /// `loop_handle` and for the given `source`. + /// + /// This method returns a future which will resolve to the readiness stream + /// when it's ready. pub fn new(loop_handle: LoopHandle, source: IoSource) -> Box> { loop_handle.add_source(source.clone()).map(|token| { diff --git a/src/tcp.rs b/src/tcp.rs index 3aec07723..3ab77e645 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -5,10 +5,10 @@ use std::sync::Arc; use futures::stream::{self, Stream}; use futures::{Future, IntoFuture, failed, Task, Poll}; -use futures_io::Ready; +use futures_io::{Ready, IoFuture, IoStream}; use mio; -use {IoFuture, IoStream, ReadinessStream, LoopHandle}; +use {ReadinessStream, LoopHandle}; use event_loop::Source; /// An I/O object representing a TCP socket listening for incoming connections.