mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
tcp: Express incoming
in terms of accept
This commit is contained in:
parent
936727fa52
commit
1864ecc46f
@ -145,38 +145,17 @@ impl TcpListener {
|
||||
}
|
||||
|
||||
impl Stream for MyIncoming {
|
||||
type Item = (mio::tcp::TcpStream, SocketAddr);
|
||||
type Item = (TcpStream, SocketAddr);
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
|
||||
if let Async::NotReady = self.inner.io.poll_read() {
|
||||
return Ok(Async::NotReady)
|
||||
}
|
||||
match self.inner.io.get_ref().accept() {
|
||||
Ok(pair) => Ok(Async::Ready(Some(pair))),
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.inner.io.need_read();
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
Err(e) => Err(e)
|
||||
}
|
||||
Ok(Async::Ready(Some(try_nb!(self.inner.accept()))))
|
||||
}
|
||||
}
|
||||
|
||||
let remote = self.io.remote().clone();
|
||||
let stream = MyIncoming { inner: self };
|
||||
Incoming {
|
||||
inner: stream.and_then(move |(tcp, addr)| {
|
||||
let (tx, rx) = futures::oneshot();
|
||||
remote.spawn(move |handle| {
|
||||
let res = PollEvented::new(tcp, handle).map(move |io| {
|
||||
(TcpStream { io: io }, addr)
|
||||
});
|
||||
tx.complete(res);
|
||||
Ok(())
|
||||
});
|
||||
rx.then(|r| r.expect("shouldn't be canceled"))
|
||||
}).boxed(),
|
||||
inner: stream.boxed(),
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user