Remove Send from Future/Stream

This bound existed for two primary reasons, both detail below, and both of which
have now been solved.

One of the primary reasons this existed was due to the presence of `tailcall`.
Each standard combinator will call `tailcall` as appropriate, storing the
resulting trait object. Storing trait objects influences the applicatoin of the
`Send` and `Sync` bounds normally, but a key insight here is that we're not
storing trait objects but rather just pieces of otherwise internal futures.

With this insight the main storage for these futures, `Collapsed`, could simply
implement `Send` so long as the future itself originally implemented `Send`.
This in turn means that `tailcall` must be an `unsafe` method, but it seems well
worth the benefit of relaxing the `Send` bound.

The second primary reason for this bound was so the `Task` itself could be send.
This is critical for ensuring that futures can receive notifications from
multiple threads (e.g. be a future waiting on sources of multiple events).
Another key insight here, however, is that only the *outer* future needs to be
`Send`. We already have a solution, with `LoopData`, to make non-`Send` data
`Send`. By implementing `Future` directly for `LoopData<F: Future>`, this means
that it's trivial to make any future sendable by simply pinning it to an event
loop!

With these two pieces combined, it means that `Send` is no longer needed as a
bound on the `Future` and `Stream` traits. It may practically mean that
`LoopData` is used commonly in some scenarios, but that's quite a small price to
pay for relaxing the requirements of the core trait.

Some other ramifications of this change are:

* The `Future::boxed` and `Stream::boxed` methods now require `Self` to adhere
  to `Send`. This is expected to be the most common case, and in the less common
  case of not-`Send` `Box::new` can be used.
* Two new type aliases, `BoxFuture` and `BoxStream` have been introduced to
  assist in writing APIs that return a trait object which is `Send`. Both of
  these type aliases package in the `Send` bound.
* A new `LoopPin` type, added in the previous commit, can be used to easily
  generate handles that can be used to pin futures to an event loop without
  having a literal reference to the event loop itself.
This commit is contained in:
Alex Crichton 2016-08-12 11:54:19 -07:00
parent 9911f421eb
commit b508964db6
5 changed files with 41 additions and 16 deletions

View File

@ -14,6 +14,7 @@ use std::net::SocketAddr;
use futures::Future;
use futures::stream::Stream;
use futures_io::IoFuture;
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
@ -33,7 +34,7 @@ fn main() {
l.run(server).unwrap();
}
fn write(socket: futures_mio::TcpStream) -> Box<futures_io::IoFuture<()>> {
fn write(socket: futures_mio::TcpStream) -> IoFuture<()> {
static BUF: &'static [u8] = &[0; 64 * 1024];
socket.into_future().map_err(|e| e.0).and_then(move |(ready, mut socket)| {
let ready = match ready {

View File

@ -193,10 +193,10 @@ impl Loop {
pub fn run<F: Future>(&mut self, f: F) -> Result<F::Item, F::Error> {
let (tx_res, rx_res) = mpsc::channel();
let handle = self.handle();
f.then(move |res| {
self.add_loop_data(f.then(move |res| {
handle.shutdown();
tx_res.send(res)
}).forget();
})).forget();
self._run();
@ -780,6 +780,30 @@ impl<A: 'static> LoopData<A> {
}
}
impl<A: Future> Future for LoopData<A> {
type Item = A::Item;
type Error = A::Error;
fn poll(&mut self, task: &mut Task) -> Poll<A::Item, A::Error> {
// If we're on the right thread, then we can proceed. Otherwise we need
// to go and get polled on the right thread.
if let Some(inner) = self.get_mut() {
return inner.poll(task)
}
task.poll_on(self.executor());
Poll::NotReady
}
fn schedule(&mut self, task: &mut Task) {
// If we're on the right thread, then we're good to go, otherwise we
// need to get poll'd to tell the task to move somewhere else.
match self.get_mut() {
Some(inner) => inner.schedule(task),
None => task.notify(),
}
}
}
impl<A: 'static> Drop for LoopData<A> {
fn drop(&mut self) {
// The `DropBox` we store internally will cause a memory leak if it's
@ -944,7 +968,7 @@ struct LoopFuture<T, U> {
}
impl<T, U> LoopFuture<T, U>
where T: Send + 'static,
where T: 'static,
{
fn poll<F>(&mut self, f: F) -> Poll<T, io::Error>
where F: FnOnce(&Loop, U) -> io::Result<T>,

View File

@ -24,7 +24,7 @@ pub struct TcpListener {
impl TcpListener {
fn new(listener: mio::tcp::TcpListener,
handle: LoopHandle) -> Box<IoFuture<TcpListener>> {
handle: LoopHandle) -> IoFuture<TcpListener> {
let listener = Arc::new(Source::new(listener));
ReadinessStream::new(handle.clone(), listener.clone()).map(|r| {
TcpListener {
@ -64,7 +64,7 @@ impl TcpListener {
/// well (same for IPv6).
pub fn from_listener(listener: net::TcpListener,
addr: &SocketAddr,
handle: LoopHandle) -> Box<IoFuture<TcpListener>> {
handle: LoopHandle) -> IoFuture<TcpListener> {
mio::tcp::TcpListener::from_listener(listener, addr)
.into_future()
.and_then(|l| TcpListener::new(l, handle))
@ -84,7 +84,7 @@ impl TcpListener {
///
/// This method returns an implementation of the `Stream` trait which
/// resolves to the sockets the are accepted on this listener.
pub fn incoming(self) -> Box<IoStream<(TcpStream, SocketAddr)>> {
pub fn incoming(self) -> IoStream<(TcpStream, SocketAddr)> {
let TcpListener { loop_handle, listener, ready } = self;
ready
@ -169,7 +169,7 @@ impl LoopHandle {
/// The TCP listener will bind to the provided `addr` address, if available,
/// and will be returned as a future. The returned future, if resolved
/// successfully, can then be used to accept incoming connections.
pub fn tcp_listen(self, addr: &SocketAddr) -> Box<IoFuture<TcpListener>> {
pub fn tcp_listen(self, addr: &SocketAddr) -> IoFuture<TcpListener> {
match mio::tcp::TcpListener::bind(addr) {
Ok(l) => TcpListener::new(l, self),
Err(e) => failed(e).boxed(),
@ -183,7 +183,7 @@ impl LoopHandle {
/// stream has successfully connected. If an error happens during the
/// connection or during the socket creation, that error will be returned to
/// the future instead.
pub fn tcp_connect(self, addr: &SocketAddr) -> Box<IoFuture<TcpStream>> {
pub fn tcp_connect(self, addr: &SocketAddr) -> IoFuture<TcpStream> {
match mio::tcp::TcpStream::connect(addr) {
Ok(tcp) => TcpStream::new(tcp, self),
Err(e) => failed(e).boxed(),
@ -194,7 +194,7 @@ impl LoopHandle {
impl TcpStream {
fn new(connected_stream: mio::tcp::TcpStream,
handle: LoopHandle)
-> Box<IoFuture<TcpStream>> {
-> IoFuture<TcpStream> {
// Once we've connected, wait for the stream to be writable as that's
// when the actual connection has been initiated. Once we're writable we
// check for `take_socket_error` to see if the connect actually hit an
@ -230,7 +230,7 @@ impl TcpStream {
/// (perhaps to `INADDR_ANY`) before this method is called.
pub fn connect_stream(stream: net::TcpStream,
addr: &SocketAddr,
handle: LoopHandle) -> Box<IoFuture<TcpStream>> {
handle: LoopHandle) -> IoFuture<TcpStream> {
match mio::tcp::TcpStream::connect_stream(stream, addr) {
Ok(tcp) => TcpStream::new(tcp, handle),
Err(e) => failed(e).boxed(),

View File

@ -26,7 +26,7 @@ impl LoopHandle {
/// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn timeout(self, dur: Duration) -> Box<IoFuture<Timeout>> {
pub fn timeout(self, dur: Duration) -> IoFuture<Timeout> {
self.timeout_at(Instant::now() + dur)
}
@ -35,7 +35,7 @@ impl LoopHandle {
/// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn timeout_at(self, at: Instant) -> Box<IoFuture<Timeout>> {
pub fn timeout_at(self, at: Instant) -> IoFuture<Timeout> {
self.add_timeout(at).map(move |token| {
Timeout {
at: at,

View File

@ -25,7 +25,7 @@ impl LoopHandle {
/// `addr` provided. The returned future will be resolved once the socket
/// has successfully bound. If an error happens during the binding or during
/// the socket creation, that error will be returned to the future instead.
pub fn udp_bind(self, addr: &SocketAddr) -> Box<IoFuture<UdpSocket>> {
pub fn udp_bind(self, addr: &SocketAddr) -> IoFuture<UdpSocket> {
match mio::udp::UdpSocket::bind(addr) {
Ok(udp) => UdpSocket::new(udp, self),
Err(e) => failed(e).boxed(),
@ -35,7 +35,7 @@ impl LoopHandle {
impl UdpSocket {
fn new(socket: mio::udp::UdpSocket, handle: LoopHandle)
-> Box<IoFuture<UdpSocket>> {
-> IoFuture<UdpSocket> {
let socket = Arc::new(Source::new(socket));
ReadinessStream::new(handle, socket.clone()).map(|ready| {
UdpSocket {
@ -55,7 +55,7 @@ impl UdpSocket {
/// configure a socket before it's handed off, such as setting options like
/// `reuse_address` or binding to multiple addresses.
pub fn from_socket(socket: net::UdpSocket,
handle: LoopHandle) -> Box<IoFuture<UdpSocket>> {
handle: LoopHandle) -> IoFuture<UdpSocket> {
match mio::udp::UdpSocket::from_socket(socket) {
Ok(tcp) => UdpSocket::new(tcp, handle),
Err(e) => failed(e).boxed(),