Track futures tokio-reform branch (#88)

This patch also updates tests and examples to remove deprecated API
usage.
This commit is contained in:
Carl Lerche 2018-02-01 10:31:07 -08:00 committed by GitHub
parent b9db119b45
commit 2e94b658ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 66 additions and 57 deletions

View File

@ -52,4 +52,5 @@ serde_json = "1.0"
time = "0.1" time = "0.1"
[patch.crates-io] [patch.crates-io]
futures = { git = "https://github.com/rust-lang-nursery/futures-rs", branch = "tokio-reform" }
mio = { git = "https://github.com/carllerche/mio" } mio = { git = "https://github.com/carllerche/mio" }

View File

@ -29,7 +29,7 @@ use std::io::{Error, ErrorKind, BufReader};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use futures::Future; use futures::Future;
use futures::future::Executor; use futures::future::{self, Executor};
use futures::stream::{self, Stream}; use futures::stream::{self, Stream};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -134,5 +134,5 @@ fn main() {
}); });
// execute server // execute server
srv.wait().unwrap(); future::blocking(srv).wait().unwrap();
} }

View File

@ -29,7 +29,7 @@ use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::{Future, Stream, Poll}; use futures::{Future, Stream, Poll};
use futures::future::Executor; use futures::future::{self, Executor};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -62,7 +62,7 @@ fn main() {
Ok(()) Ok(())
}); });
server.wait().unwrap(); future::blocking(server).wait().unwrap();
} }
/// The main workhorse of this example. This'll compress all data read from /// The main workhorse of this example. This'll compress all data read from

View File

@ -26,7 +26,7 @@ use std::net::SocketAddr;
use std::thread; use std::thread;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::{Sink, Future, Stream}; use futures::{future, Sink, Stream};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
fn main() { fn main() {
@ -71,9 +71,9 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't // loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful. // otherwise running anything useful.
let mut out = io::stdout(); let mut out = io::stdout();
stdout.for_each(|chunk| { future::blocking(stdout.for_each(|chunk| {
out.write_all(&chunk) out.write_all(&chunk)
}).wait().unwrap(); })).wait().unwrap();
} }
mod tcp { mod tcp {
@ -244,7 +244,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
Ok(n) => n, Ok(n) => n,
}; };
buf.truncate(n); buf.truncate(n);
tx = match tx.send(buf).wait() { tx = match future::blocking(tx.send(buf)).wait() {
Ok(tx) => tx, Ok(tx) => tx,
Err(_) => break, Err(_) => break,
}; };

View File

@ -24,7 +24,7 @@ use std::net::SocketAddr;
use std::thread; use std::thread;
use futures::prelude::*; use futures::prelude::*;
use futures::future::Executor; use futures::future::{self, Executor};
use futures::sync::mpsc; use futures::sync::mpsc;
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
@ -61,7 +61,7 @@ fn main() {
next = (next + 1) % channels.len(); next = (next + 1) % channels.len();
Ok(()) Ok(())
}); });
srv.wait().unwrap(); future::blocking(srv).wait().unwrap();
} }
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
@ -88,5 +88,5 @@ fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
Ok(()) Ok(())
}); });
done.wait().unwrap(); future::blocking(done).wait().unwrap();
} }

View File

@ -18,7 +18,7 @@ extern crate tokio_io;
use std::{env, io}; use std::{env, io};
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::{Future, Poll}; use futures::{future, Future, Poll};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
struct Server { struct Server {
@ -58,9 +58,9 @@ fn main() {
// Next we'll create a future to spawn (the one we defined above) and then // Next we'll create a future to spawn (the one we defined above) and then
// we'll block our current thread waiting on the result of the future // we'll block our current thread waiting on the result of the future
Server { future::blocking(Server {
socket: socket, socket: socket,
buf: vec![0; 1024], buf: vec![0; 1024],
to_send: None, to_send: None,
}.wait().unwrap(); }).wait().unwrap();
} }

View File

@ -26,7 +26,7 @@ use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::Future; use futures::Future;
use futures::future::Executor; use futures::future::{self, Executor};
use futures::stream::Stream; use futures::stream::Stream;
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
@ -114,5 +114,5 @@ fn main() {
// And finally now that we've define what our server is, we run it! Here we // And finally now that we've define what our server is, we run it! Here we
// just need to execute the future we've created and wait for it to complete // just need to execute the future we've created and wait for it to complete
// using the standard methods in the `futures` crate. // using the standard methods in the `futures` crate.
done.wait().unwrap(); future::blocking(done).wait().unwrap();
} }

View File

@ -19,6 +19,7 @@ extern crate tokio_io;
use std::env; use std::env;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::future;
use futures::prelude::*; use futures::prelude::*;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -40,5 +41,5 @@ fn main() {
Ok(()) Ok(())
}); });
server.wait().unwrap(); future::blocking(server).wait().unwrap();
} }

View File

@ -28,7 +28,7 @@ use std::io::{self, Read, Write};
use futures::stream::Stream; use futures::stream::Stream;
use futures::{Future, Poll}; use futures::{Future, Poll};
use futures::future::Executor; use futures::future::{self, Executor};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
@ -92,7 +92,7 @@ fn main() {
Ok(()) Ok(())
}); });
done.wait().unwrap(); future::blocking(done).wait().unwrap();
} }
// This is a custom type used to have a custom implementation of the // This is a custom type used to have a custom implementation of the

View File

@ -26,7 +26,7 @@ use std::iter;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::Future; use futures::Future;
use futures::future::Executor; use futures::future::{self, Executor};
use futures::stream::{self, Stream}; use futures::stream::{self, Stream};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio_io::IoFuture; use tokio_io::IoFuture;
@ -46,7 +46,7 @@ fn main() {
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(()) Ok(())
}); });
server.wait().unwrap(); future::blocking(server).wait().unwrap();
} }
fn write(socket: TcpStream) -> IoFuture<()> { fn write(socket: TcpStream) -> IoFuture<()> {

View File

@ -51,7 +51,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use futures::prelude::*; use futures::prelude::*;
use futures::future::Executor; use futures::future::{self, Executor};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
@ -160,7 +160,7 @@ fn main() {
Ok(()) Ok(())
}); });
done.wait().unwrap(); future::blocking(done).wait().unwrap();
} }
impl Request { impl Request {

View File

@ -31,8 +31,7 @@ use std::net::{self, SocketAddr};
use std::thread; use std::thread;
use bytes::BytesMut; use bytes::BytesMut;
use futures::future::Executor; use futures::future::{self, Executor};
use futures::future;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::{Stream, Future, Sink}; use futures::{Stream, Future, Sink};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
@ -91,7 +90,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
})).unwrap(); })).unwrap();
Ok(()) Ok(())
}); });
done.wait().unwrap(); future::blocking(done).wait().unwrap();
} }
/// "Server logic" is implemented in this function. /// "Server logic" is implemented in this function.

View File

@ -15,7 +15,7 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::{Future, Stream, Sink}; use futures::{Future, Stream, Sink};
use futures::future::Executor; use futures::future::{self, Executor};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio::net::{UdpSocket, UdpCodec}; use tokio::net::{UdpSocket, UdpCodec};
@ -76,5 +76,5 @@ fn main() {
// Spawn the sender of pongs and then wait for our pinger to finish. // Spawn the sender of pongs and then wait for our pinger to finish.
pool.execute(b.then(|_| Ok(()))).unwrap(); pool.execute(b.then(|_| Ok(()))).unwrap();
drop(a.wait()); drop(future::blocking(a).wait());
} }

View File

@ -8,6 +8,7 @@ use std::thread;
use std::io::{Read, Write, BufReader, BufWriter}; use std::io::{Read, Write, BufReader, BufWriter};
use futures::Future; use futures::Future;
use futures::future::blocking;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_io::io::copy; use tokio_io::io::copy;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -54,7 +55,7 @@ fn echo_server() {
copy(a, b) copy(a, b)
}); });
let (amt, _, _) = t!(copied.wait()); let (amt, _, _) = t!(blocking(copied).wait());
let (expected, t2) = t.join().unwrap(); let (expected, t2) = t.join().unwrap();
let actual = t2.join().unwrap(); let actual = t2.join().unwrap();

View File

@ -7,6 +7,7 @@ use std::thread;
use std::io::{Write, Read}; use std::io::{Write, Read};
use futures::Future; use futures::Future;
use futures::future::blocking;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_io::io::read_to_end; use tokio_io::io::read_to_end;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -42,7 +43,7 @@ fn chain_clients() {
read_to_end(a.chain(b).chain(c), Vec::new()) read_to_end(a.chain(b).chain(c), Vec::new())
}); });
let (_, data) = t!(copied.wait()); let (_, data) = t!(blocking(copied).wait());
t.join().unwrap(); t.join().unwrap();
assert_eq!(data, b"foo bar baz"); assert_eq!(data, b"foo bar baz");

View File

@ -4,7 +4,7 @@ extern crate futures;
use std::thread; use std::thread;
use std::net; use std::net;
use futures::future; use futures::{future, stream};
use futures::prelude::*; use futures::prelude::*;
use futures::sync::oneshot; use futures::sync::oneshot;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -17,7 +17,7 @@ fn tcp_doesnt_block() {
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let listener = TcpListener::from_std(listener, &handle).unwrap(); let listener = TcpListener::from_std(listener, &handle).unwrap();
drop(core); drop(core);
assert!(listener.incoming().wait().next().unwrap().is_err()); assert!(stream::blocking(listener.incoming()).next().unwrap().is_err());
} }
#[test] #[test]
@ -34,9 +34,9 @@ fn drop_wakes() {
drop(tx); drop(tx);
future::ok(()) future::ok(())
}); });
assert!(new_socket.join(drop_tx).wait().is_err()); assert!(future::blocking(new_socket.join(drop_tx)).wait().is_err());
}); });
drop(rx.wait()); drop(future::blocking(rx).wait());
drop(core); drop(core);
t.join().unwrap(); t.join().unwrap();
} }

View File

@ -8,6 +8,7 @@ use std::net::TcpStream;
use std::thread; use std::thread;
use futures::Future; use futures::Future;
use futures::future::blocking;
use futures::stream::Stream; use futures::stream::Stream;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
@ -44,7 +45,7 @@ fn echo_server() {
let halves = client.map(|s| s.split()); let halves = client.map(|s| s.split());
let copied = halves.and_then(|(a, b)| copy(a, b)); let copied = halves.and_then(|(a, b)| copy(a, b));
let (amt, _, _) = t!(copied.wait()); let (amt, _, _) = t!(blocking(copied).wait());
t.join().unwrap(); t.join().unwrap();
assert_eq!(amt, msg.len() as u64 * 1024); assert_eq!(amt, msg.len() as u64 * 1024);

View File

@ -3,6 +3,7 @@ extern crate tokio;
use std::thread; use std::thread;
use futures::future::blocking;
use futures::prelude::*; use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener}; use tokio::net::{TcpStream, TcpListener};
@ -23,7 +24,7 @@ fn hammer() {
let theirs = srv.incoming().into_future() let theirs = srv.incoming().into_future()
.map(|(s, _)| s.unwrap()) .map(|(s, _)| s.unwrap())
.map_err(|(s, _)| s); .map_err(|(s, _)| s);
let (mine, theirs) = t!(mine.join(theirs).wait()); let (mine, theirs) = t!(blocking(mine.join(theirs)).wait());
assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));

View File

@ -7,6 +7,7 @@ use std::thread;
use std::io::{Write, Read}; use std::io::{Write, Read};
use futures::Future; use futures::Future;
use futures::future::blocking;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_io::io::read_to_end; use tokio_io::io::read_to_end;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -36,7 +37,7 @@ fn limit() {
read_to_end(a.take(4), Vec::new()) read_to_end(a.take(4), Vec::new())
}); });
let (_, data) = t!(copied.wait()); let (_, data) = t!(blocking(copied).wait());
t.join().unwrap(); t.join().unwrap();
assert_eq!(data, b"foo "); assert_eq!(data, b"foo ");

View File

@ -10,7 +10,7 @@ use std::net::Shutdown;
use bytes::{BytesMut, BufMut}; use bytes::{BytesMut, BufMut};
use futures::{Future, Stream, Sink}; use futures::{Future, Stream, Sink};
use futures::future::Executor; use futures::future::{blocking, Executor};
use futures_cpupool::CpuPool; use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_io::codec::{Encoder, Decoder}; use tokio_io::codec::{Encoder, Decoder};
@ -68,20 +68,20 @@ fn echo() {
pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap();
let client = TcpStream::connect(&addr); let client = TcpStream::connect(&addr);
let client = client.wait().unwrap(); let client = blocking(client).wait().unwrap();
let (client, _) = write_all(client, b"a\n").wait().unwrap(); let (client, _) = blocking(write_all(client, b"a\n")).wait().unwrap();
let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); let (client, buf, amt) = blocking(read(client, vec![0; 1024])).wait().unwrap();
assert_eq!(amt, 2); assert_eq!(amt, 2);
assert_eq!(&buf[..2], b"a\n"); assert_eq!(&buf[..2], b"a\n");
let (client, _) = write_all(client, b"\n").wait().unwrap(); let (client, _) = blocking(write_all(client, b"\n")).wait().unwrap();
let (client, buf, amt) = read(client, buf).wait().unwrap(); let (client, buf, amt) = blocking(read(client, buf)).wait().unwrap();
assert_eq!(amt, 1); assert_eq!(amt, 1);
assert_eq!(&buf[..1], b"\n"); assert_eq!(&buf[..1], b"\n");
let (client, _) = write_all(client, b"b").wait().unwrap(); let (client, _) = blocking(write_all(client, b"b")).wait().unwrap();
client.shutdown(Shutdown::Write).unwrap(); client.shutdown(Shutdown::Write).unwrap();
let (_client, buf, amt) = read(client, buf).wait().unwrap(); let (_client, buf, amt) = blocking(read(client, buf)).wait().unwrap();
assert_eq!(amt, 1); assert_eq!(amt, 1);
assert_eq!(&buf[..1], b"b"); assert_eq!(&buf[..1], b"b");
} }

View File

@ -13,7 +13,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use futures::prelude::*; use futures::future::blocking;
use mio::event::Evented; use mio::event::Evented;
use mio::unix::{UnixReady, EventedFd}; use mio::unix::{UnixReady, EventedFd};
use mio::{PollOpt, Ready, Token}; use mio::{PollOpt, Ready, Token};
@ -81,7 +81,7 @@ fn hup() {
let source = PollEvented::new(MyFile::new(read), &handle).unwrap(); let source = PollEvented::new(MyFile::new(read), &handle).unwrap();
let reader = read_to_end(source, Vec::new()); let reader = read_to_end(source, Vec::new());
let (_, content) = t!(reader.wait()); let (_, content) = t!(blocking(reader).wait());
assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]);
t.join().unwrap(); t.join().unwrap();
} }

View File

@ -8,6 +8,7 @@ use std::net::TcpStream;
use std::thread; use std::thread;
use futures::Future; use futures::Future;
use futures::future::blocking;
use futures::stream::Stream; use futures::stream::Stream;
use tokio_io::io::copy; use tokio_io::io::copy;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
@ -48,7 +49,7 @@ fn echo_server() {
.take(2) .take(2)
.collect(); .collect();
t!(future.wait()); t!(blocking(future).wait());
t.join().unwrap(); t.join().unwrap();
} }

View File

@ -7,6 +7,7 @@ use std::sync::mpsc::channel;
use std::thread; use std::thread;
use futures::Future; use futures::Future;
use futures::future::blocking;
use futures::stream::Stream; use futures::stream::Stream;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
@ -27,7 +28,7 @@ fn connect() {
}); });
let stream = TcpStream::connect(&addr); let stream = TcpStream::connect(&addr);
let mine = t!(stream.wait()); let mine = t!(blocking(stream).wait());
let theirs = t.join().unwrap(); let theirs = t.join().unwrap();
assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
@ -50,7 +51,7 @@ fn accept() {
net::TcpStream::connect(&addr).unwrap() net::TcpStream::connect(&addr).unwrap()
}); });
let (mine, _remaining) = t!(client.wait()); let (mine, _remaining) = t!(blocking(client).wait());
let mine = mine.unwrap(); let mine = mine.unwrap();
let theirs = t.join().unwrap(); let theirs = t.join().unwrap();
@ -75,7 +76,7 @@ fn accept2() {
}).into_future().map_err(|e| e.0); }).into_future().map_err(|e| e.0);
assert!(rx.try_recv().is_err()); assert!(rx.try_recv().is_err());
let (mine, _remaining) = t!(client.wait()); let (mine, _remaining) = t!(blocking(client).wait());
mine.unwrap(); mine.unwrap();
t.join().unwrap(); t.join().unwrap();
} }

View File

@ -7,6 +7,7 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use futures::{Future, Poll, Stream, Sink}; use futures::{Future, Poll, Stream, Sink};
use futures::future::blocking;
use tokio::net::{UdpSocket, UdpCodec}; use tokio::net::{UdpSocket, UdpCodec};
macro_rules! t { macro_rules! t {
@ -25,7 +26,7 @@ fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
{ {
let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); let send = SendMessage::new(a, send.clone(), b_addr, b"1234");
let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234");
let (sendt, received) = t!(send.join(recv).wait()); let (sendt, received) = t!(blocking(send.join(recv)).wait());
a = sendt; a = sendt;
b = received; b = received;
} }
@ -33,7 +34,7 @@ fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
{ {
let send = SendMessage::new(a, send, b_addr, b""); let send = SendMessage::new(a, send, b_addr, b"");
let recv = RecvMessage::new(b, recv, a_addr, b""); let recv = RecvMessage::new(b, recv, a_addr, b"");
t!(send.join(recv).wait()); t!(blocking(send.join(recv)).wait());
} }
} }
@ -172,7 +173,7 @@ fn send_dgrams() {
{ {
let send = a.send_dgram(&b"4321"[..], &b_addr); let send = a.send_dgram(&b"4321"[..], &b_addr);
let recv = b.recv_dgram(&mut buf[..]); let recv = b.recv_dgram(&mut buf[..]);
let (sendt, received) = t!(send.join(recv).wait()); let (sendt, received) = t!(blocking(send.join(recv)).wait());
assert_eq!(received.2, 4); assert_eq!(received.2, 4);
assert_eq!(&received.1[..4], b"4321"); assert_eq!(&received.1[..4], b"4321");
a = sendt.0; a = sendt.0;
@ -182,7 +183,7 @@ fn send_dgrams() {
{ {
let send = a.send_dgram(&b""[..], &b_addr); let send = a.send_dgram(&b""[..], &b_addr);
let recv = b.recv_dgram(&mut buf[..]); let recv = b.recv_dgram(&mut buf[..]);
let received = t!(send.join(recv).wait()).1; let received = t!(blocking(send.join(recv)).wait()).1;
assert_eq!(received.2, 0); assert_eq!(received.2, 0);
} }
} }
@ -225,7 +226,7 @@ fn send_framed() {
let send = a.send(&b"4567"[..]); let send = a.send(&b"4567"[..]);
let recv = b.into_future().map_err(|e| e.0); let recv = b.into_future().map_err(|e| e.0);
let (sendt, received) = t!(send.join(recv).wait()); let (sendt, received) = t!(blocking(send.join(recv)).wait());
assert_eq!(received.0, Some(())); assert_eq!(received.0, Some(()));
a_soc = sendt.into_inner(); a_soc = sendt.into_inner();
@ -238,7 +239,7 @@ fn send_framed() {
let send = a.send(&b""[..]); let send = a.send(&b""[..]);
let recv = b.into_future().map_err(|e| e.0); let recv = b.into_future().map_err(|e| e.0);
let received = t!(send.join(recv).wait()).1; let received = t!(blocking(send.join(recv)).wait()).1;
assert_eq!(received.0, Some(())); assert_eq!(received.0, Some(()));
} }
} }