From 311bfa07a362657c57037b44404a6e42cf560438 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 17 Aug 2016 11:23:32 -0700 Subject: [PATCH] Update futures-minihttp --- src/bin/echo.rs | 28 ++++------------------------ src/event_loop.rs | 2 ++ src/tcp.rs | 6 +++++- tests/echo.rs | 32 ++++---------------------------- tests/stream-buffered.rs | 30 ++++-------------------------- 5 files changed, 19 insertions(+), 79 deletions(-) diff --git a/src/bin/echo.rs b/src/bin/echo.rs index 6bc76b7c7..338d44d46 100644 --- a/src/bin/echo.rs +++ b/src/bin/echo.rs @@ -5,14 +5,11 @@ extern crate futures_io; extern crate futures_mio; use std::env; -use std::io::{self, Read, Write}; use std::net::SocketAddr; -use std::sync::Arc; use futures::Future; use futures::stream::Stream; -use futures_io::copy; -use futures_mio::TcpStream; +use futures_io::{copy, TaskIo}; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); @@ -36,8 +33,9 @@ fn main() { // Finally we use the `io::copy` future to copy all data from the // reading half onto the writing half. socket.incoming().for_each(|(socket, addr)| { - let socket = Arc::new(socket); - let amt = copy(SocketIo(socket.clone()), SocketIo(socket)); + let socket = futures::lazy(|| futures::finished(TaskIo::new(socket))); + let pair = socket.map(|s| s.split()); + let amt = pair.and_then(|(reader, writer)| copy(reader, writer)); // Once all that is done we print out how much we wrote, and then // critically we *forget* this future which allows it to run @@ -51,21 +49,3 @@ fn main() { }); l.run(done).unwrap(); } - -struct SocketIo(Arc); - -impl Read for SocketIo { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - (&*self.0).read(buf) - } -} - -impl Write for SocketIo { - fn write(&mut self, buf: &[u8]) -> io::Result { - (&*self.0).write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - (&*self.0).flush() - } -} diff --git a/src/event_loop.rs b/src/event_loop.rs index 0b00ba91a..522e96b8f 100644 --- a/src/event_loop.rs +++ b/src/event_loop.rs @@ -995,8 +995,10 @@ impl LoopFuture lp.map(|lp| f(lp, data.take().unwrap())) }); if let Some(ret) = ret { + debug!("loop future done immediately on event loop"); return ret.into() } + debug!("loop future needs to send info to event loop"); let task = task::park(); let result = Arc::new(Slot::new(None)); diff --git a/src/tcp.rs b/src/tcp.rs index f004c2698..e07063351 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -100,8 +100,12 @@ impl TcpListener { fn poll(&mut self) -> Poll, io::Error> { match self.inner.listener.io().accept() { - Ok(Some(pair)) => Poll::Ok(Some(pair)), + Ok(Some(pair)) => { + debug!("accepted a socket"); + Poll::Ok(Some(pair)) + } Ok(None) => { + debug!("waiting to accept another socket"); self.inner.ready.need_read(); Poll::NotReady } diff --git a/tests/echo.rs b/tests/echo.rs index 724886472..77635539e 100644 --- a/tests/echo.rs +++ b/tests/echo.rs @@ -3,14 +3,13 @@ extern crate futures; extern crate futures_io; extern crate futures_mio; -use std::io::{self, Read, Write}; -use std::sync::Arc; +use std::io::{Read, Write}; +use std::net::TcpStream; use std::thread; use futures::Future; use futures::stream::Stream; -use futures_io::copy; -use futures_mio::TcpStream; +use futures_io::{copy, TaskIo}; macro_rules! t { ($e:expr) => (match $e { @@ -30,8 +29,6 @@ fn echo_server() { let msg = "foo bar baz"; let t = thread::spawn(move || { - use std::net::TcpStream; - let mut s = TcpStream::connect(&addr).unwrap(); for _i in 0..1024 { @@ -44,10 +41,7 @@ fn echo_server() { let clients = srv.incoming(); let client = clients.into_future().map(|e| e.0.unwrap()).map_err(|e| e.0); - let halves = client.map(|s| { - let s = Arc::new(s.0); - (SocketIo(s.clone()), SocketIo(s)) - }); + let halves = client.map(|s| TaskIo::new(s.0).split()); let copied = halves.and_then(|(a, b)| copy(a, b)); let amt = t!(l.run(copied)); @@ -55,21 +49,3 @@ fn echo_server() { assert_eq!(amt, msg.len() as u64 * 1024); } - -struct SocketIo(Arc); - -impl Read for SocketIo { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - (&*self.0).read(buf) - } -} - -impl Write for SocketIo { - fn write(&mut self, buf: &[u8]) -> io::Result { - (&*self.0).write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - (&*self.0).flush() - } -} diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs index 86d5989a9..a472a3cc4 100644 --- a/tests/stream-buffered.rs +++ b/tests/stream-buffered.rs @@ -3,14 +3,13 @@ extern crate futures_io; extern crate futures_mio; extern crate env_logger; -use std::sync::Arc; +use std::io::{Read, Write}; +use std::net::TcpStream; use std::thread; -use std::io::{self, Read, Write}; use futures::Future; use futures::stream::Stream; -use futures_io::copy; -use futures_mio::TcpStream; +use futures_io::{copy, TaskIo}; macro_rules! t { ($e:expr) => (match $e { @@ -29,8 +28,6 @@ fn echo_server() { let addr = t!(srv.local_addr()); let t = thread::spawn(move || { - use std::net::TcpStream; - let mut s1 = t!(TcpStream::connect(&addr)); let mut s2 = t!(TcpStream::connect(&addr)); @@ -45,8 +42,7 @@ fn echo_server() { }); let future = srv.incoming() - .map(|s| Arc::new(s.0)) - .map(|i| (SocketIo(i.clone()), SocketIo(i))) + .map(|s| TaskIo::new(s.0).split()) .map(|(a, b)| copy(a, b).map(|_| ())) .buffered(10) .take(2) @@ -56,21 +52,3 @@ fn echo_server() { t.join().unwrap(); } - -struct SocketIo(Arc); - -impl Read for SocketIo { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - (&*self.0).read(buf) - } -} - -impl Write for SocketIo { - fn write(&mut self, buf: &[u8]) -> io::Result { - (&*self.0).write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - (&*self.0).flush() - } -}