From b508964db63b526ccd88805195a50b5a66da8438 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Fri, 12 Aug 2016 11:54:19 -0700 Subject: [PATCH] Remove Send from Future/Stream This bound existed for two primary reasons, both detail below, and both of which have now been solved. One of the primary reasons this existed was due to the presence of `tailcall`. Each standard combinator will call `tailcall` as appropriate, storing the resulting trait object. Storing trait objects influences the applicatoin of the `Send` and `Sync` bounds normally, but a key insight here is that we're not storing trait objects but rather just pieces of otherwise internal futures. With this insight the main storage for these futures, `Collapsed`, could simply implement `Send` so long as the future itself originally implemented `Send`. This in turn means that `tailcall` must be an `unsafe` method, but it seems well worth the benefit of relaxing the `Send` bound. The second primary reason for this bound was so the `Task` itself could be send. This is critical for ensuring that futures can receive notifications from multiple threads (e.g. be a future waiting on sources of multiple events). Another key insight here, however, is that only the *outer* future needs to be `Send`. We already have a solution, with `LoopData`, to make non-`Send` data `Send`. By implementing `Future` directly for `LoopData`, this means that it's trivial to make any future sendable by simply pinning it to an event loop! With these two pieces combined, it means that `Send` is no longer needed as a bound on the `Future` and `Stream` traits. It may practically mean that `LoopData` is used commonly in some scenarios, but that's quite a small price to pay for relaxing the requirements of the core trait. Some other ramifications of this change are: * The `Future::boxed` and `Stream::boxed` methods now require `Self` to adhere to `Send`. This is expected to be the most common case, and in the less common case of not-`Send` `Box::new` can be used. * Two new type aliases, `BoxFuture` and `BoxStream` have been introduced to assist in writing APIs that return a trait object which is `Send`. Both of these type aliases package in the `Send` bound. * A new `LoopPin` type, added in the previous commit, can be used to easily generate handles that can be used to pin futures to an event loop without having a literal reference to the event loop itself. --- src/bin/sink.rs | 3 ++- src/event_loop.rs | 30 +++++++++++++++++++++++++++--- src/tcp.rs | 14 +++++++------- src/timeout.rs | 4 ++-- src/udp.rs | 6 +++--- 5 files changed, 41 insertions(+), 16 deletions(-) diff --git a/src/bin/sink.rs b/src/bin/sink.rs index a506833e3..9f81b30c0 100644 --- a/src/bin/sink.rs +++ b/src/bin/sink.rs @@ -14,6 +14,7 @@ use std::net::SocketAddr; use futures::Future; use futures::stream::Stream; +use futures_io::IoFuture; fn main() { let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); @@ -33,7 +34,7 @@ fn main() { l.run(server).unwrap(); } -fn write(socket: futures_mio::TcpStream) -> Box> { +fn write(socket: futures_mio::TcpStream) -> IoFuture<()> { static BUF: &'static [u8] = &[0; 64 * 1024]; socket.into_future().map_err(|e| e.0).and_then(move |(ready, mut socket)| { let ready = match ready { diff --git a/src/event_loop.rs b/src/event_loop.rs index 9727ee2d9..aab1062a3 100644 --- a/src/event_loop.rs +++ b/src/event_loop.rs @@ -193,10 +193,10 @@ impl Loop { pub fn run(&mut self, f: F) -> Result { let (tx_res, rx_res) = mpsc::channel(); let handle = self.handle(); - f.then(move |res| { + self.add_loop_data(f.then(move |res| { handle.shutdown(); tx_res.send(res) - }).forget(); + })).forget(); self._run(); @@ -780,6 +780,30 @@ impl LoopData { } } +impl Future for LoopData { + type Item = A::Item; + type Error = A::Error; + + fn poll(&mut self, task: &mut Task) -> Poll { + // If we're on the right thread, then we can proceed. Otherwise we need + // to go and get polled on the right thread. + if let Some(inner) = self.get_mut() { + return inner.poll(task) + } + task.poll_on(self.executor()); + Poll::NotReady + } + + fn schedule(&mut self, task: &mut Task) { + // If we're on the right thread, then we're good to go, otherwise we + // need to get poll'd to tell the task to move somewhere else. + match self.get_mut() { + Some(inner) => inner.schedule(task), + None => task.notify(), + } + } +} + impl Drop for LoopData { fn drop(&mut self) { // The `DropBox` we store internally will cause a memory leak if it's @@ -944,7 +968,7 @@ struct LoopFuture { } impl LoopFuture - where T: Send + 'static, + where T: 'static, { fn poll(&mut self, f: F) -> Poll where F: FnOnce(&Loop, U) -> io::Result, diff --git a/src/tcp.rs b/src/tcp.rs index 7fb28b21e..0f8384981 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -24,7 +24,7 @@ pub struct TcpListener { impl TcpListener { fn new(listener: mio::tcp::TcpListener, - handle: LoopHandle) -> Box> { + handle: LoopHandle) -> IoFuture { let listener = Arc::new(Source::new(listener)); ReadinessStream::new(handle.clone(), listener.clone()).map(|r| { TcpListener { @@ -64,7 +64,7 @@ impl TcpListener { /// well (same for IPv6). pub fn from_listener(listener: net::TcpListener, addr: &SocketAddr, - handle: LoopHandle) -> Box> { + handle: LoopHandle) -> IoFuture { mio::tcp::TcpListener::from_listener(listener, addr) .into_future() .and_then(|l| TcpListener::new(l, handle)) @@ -84,7 +84,7 @@ impl TcpListener { /// /// This method returns an implementation of the `Stream` trait which /// resolves to the sockets the are accepted on this listener. - pub fn incoming(self) -> Box> { + pub fn incoming(self) -> IoStream<(TcpStream, SocketAddr)> { let TcpListener { loop_handle, listener, ready } = self; ready @@ -169,7 +169,7 @@ impl LoopHandle { /// The TCP listener will bind to the provided `addr` address, if available, /// and will be returned as a future. The returned future, if resolved /// successfully, can then be used to accept incoming connections. - pub fn tcp_listen(self, addr: &SocketAddr) -> Box> { + pub fn tcp_listen(self, addr: &SocketAddr) -> IoFuture { match mio::tcp::TcpListener::bind(addr) { Ok(l) => TcpListener::new(l, self), Err(e) => failed(e).boxed(), @@ -183,7 +183,7 @@ impl LoopHandle { /// stream has successfully connected. If an error happens during the /// connection or during the socket creation, that error will be returned to /// the future instead. - pub fn tcp_connect(self, addr: &SocketAddr) -> Box> { + pub fn tcp_connect(self, addr: &SocketAddr) -> IoFuture { match mio::tcp::TcpStream::connect(addr) { Ok(tcp) => TcpStream::new(tcp, self), Err(e) => failed(e).boxed(), @@ -194,7 +194,7 @@ impl LoopHandle { impl TcpStream { fn new(connected_stream: mio::tcp::TcpStream, handle: LoopHandle) - -> Box> { + -> IoFuture { // Once we've connected, wait for the stream to be writable as that's // when the actual connection has been initiated. Once we're writable we // check for `take_socket_error` to see if the connect actually hit an @@ -230,7 +230,7 @@ impl TcpStream { /// (perhaps to `INADDR_ANY`) before this method is called. pub fn connect_stream(stream: net::TcpStream, addr: &SocketAddr, - handle: LoopHandle) -> Box> { + handle: LoopHandle) -> IoFuture { match mio::tcp::TcpStream::connect_stream(stream, addr) { Ok(tcp) => TcpStream::new(tcp, handle), Err(e) => failed(e).boxed(), diff --git a/src/timeout.rs b/src/timeout.rs index e8ef7a225..c0f6eb4db 100644 --- a/src/timeout.rs +++ b/src/timeout.rs @@ -26,7 +26,7 @@ impl LoopHandle { /// This function will return a future that will resolve to the actual /// timeout object. The timeout object itself is then a future which will be /// set to fire at the specified point in the future. - pub fn timeout(self, dur: Duration) -> Box> { + pub fn timeout(self, dur: Duration) -> IoFuture { self.timeout_at(Instant::now() + dur) } @@ -35,7 +35,7 @@ impl LoopHandle { /// This function will return a future that will resolve to the actual /// timeout object. The timeout object itself is then a future which will be /// set to fire at the specified point in the future. - pub fn timeout_at(self, at: Instant) -> Box> { + pub fn timeout_at(self, at: Instant) -> IoFuture { self.add_timeout(at).map(move |token| { Timeout { at: at, diff --git a/src/udp.rs b/src/udp.rs index 2c0009aaf..dd2b3d861 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -25,7 +25,7 @@ impl LoopHandle { /// `addr` provided. The returned future will be resolved once the socket /// has successfully bound. If an error happens during the binding or during /// the socket creation, that error will be returned to the future instead. - pub fn udp_bind(self, addr: &SocketAddr) -> Box> { + pub fn udp_bind(self, addr: &SocketAddr) -> IoFuture { match mio::udp::UdpSocket::bind(addr) { Ok(udp) => UdpSocket::new(udp, self), Err(e) => failed(e).boxed(), @@ -35,7 +35,7 @@ impl LoopHandle { impl UdpSocket { fn new(socket: mio::udp::UdpSocket, handle: LoopHandle) - -> Box> { + -> IoFuture { let socket = Arc::new(Source::new(socket)); ReadinessStream::new(handle, socket.clone()).map(|ready| { UdpSocket { @@ -55,7 +55,7 @@ impl UdpSocket { /// configure a socket before it's handed off, such as setting options like /// `reuse_address` or binding to multiple addresses. pub fn from_socket(socket: net::UdpSocket, - handle: LoopHandle) -> Box> { + handle: LoopHandle) -> IoFuture { match mio::udp::UdpSocket::from_socket(socket) { Ok(tcp) => UdpSocket::new(tcp, handle), Err(e) => failed(e).boxed(),