net: switch socket methods to &self (#2934)

Switches various socket methods from &mut self to &self. This uses the intrusive
waker infrastructure to handle multiple waiters.

Refs: #2928
This commit is contained in:
Carl Lerche 2020-10-09 09:16:42 -07:00 committed by GitHub
parent 41ac1ae2bc
commit ee597347c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 38 additions and 127 deletions

View File

@ -81,7 +81,7 @@ impl ReadHalf<'_> {
/// ///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.0.poll_peek2(cx, buf) self.0.poll_peek(cx, buf)
} }
/// Receives data on the socket from the remote address to which it is /// Receives data on the socket from the remote address to which it is

View File

@ -136,7 +136,7 @@ impl OwnedReadHalf {
/// ///
/// [`TcpStream::poll_peek`]: TcpStream::poll_peek /// [`TcpStream::poll_peek`]: TcpStream::poll_peek
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.inner.poll_peek2(cx, buf) self.inner.poll_peek(cx, buf)
} }
/// Receives data on the socket from the remote address to which it is /// Receives data on the socket from the remote address to which it is

View File

@ -257,7 +257,7 @@ impl TcpStream {
/// ///
/// #[tokio::main] /// #[tokio::main]
/// async fn main() -> io::Result<()> { /// async fn main() -> io::Result<()> {
/// let mut stream = TcpStream::connect("127.0.0.1:8000").await?; /// let stream = TcpStream::connect("127.0.0.1:8000").await?;
/// let mut buf = [0; 10]; /// let mut buf = [0; 10];
/// ///
/// poll_fn(|cx| { /// poll_fn(|cx| {
@ -267,15 +267,7 @@ impl TcpStream {
/// Ok(()) /// Ok(())
/// } /// }
/// ``` /// ```
pub fn poll_peek(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> { pub fn poll_peek(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
self.poll_peek2(cx, buf)
}
pub(super) fn poll_peek2(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
loop { loop {
let ev = ready!(self.io.poll_read_ready(cx))?; let ev = ready!(self.io.poll_read_ready(cx))?;
@ -326,8 +318,10 @@ impl TcpStream {
/// ///
/// [`read`]: fn@crate::io::AsyncReadExt::read /// [`read`]: fn@crate::io::AsyncReadExt::read
/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
pub async fn peek(&mut self, buf: &mut [u8]) -> io::Result<usize> { pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_peek(cx, buf)).await self.io
.async_io(mio::Interest::READABLE, |io| io.peek(buf))
.await
} }
/// Shuts down the read, write, or both halves of this connection. /// Shuts down the read, write, or both halves of this connection.

View File

@ -314,7 +314,7 @@ impl UnixDatagram {
/// let bytes = b"bytes"; /// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned /// // We use a socket pair so that they are assigned
/// // each other as a peer. /// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?; /// let (first, second) = UnixDatagram::pair()?;
/// ///
/// let size = first.try_send(bytes)?; /// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len()); /// assert_eq!(size, bytes.len());
@ -327,7 +327,7 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub fn try_send(&mut self, buf: &[u8]) -> io::Result<usize> { pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
self.io.get_ref().send(buf) self.io.get_ref().send(buf)
} }
@ -346,10 +346,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap(); /// let tmp = tempdir().unwrap();
/// ///
/// let server_path = tmp.path().join("server"); /// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?; /// let server = UnixDatagram::bind(&server_path)?;
/// ///
/// let client_path = tmp.path().join("client"); /// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?; /// let client = UnixDatagram::bind(&client_path)?;
/// ///
/// let size = client.try_send_to(bytes, &server_path)?; /// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len()); /// assert_eq!(size, bytes.len());
@ -363,7 +363,7 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub fn try_send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> pub fn try_send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where where
P: AsRef<Path>, P: AsRef<Path>,
{ {
@ -413,7 +413,7 @@ impl UnixDatagram {
/// let bytes = b"bytes"; /// let bytes = b"bytes";
/// // We use a socket pair so that they are assigned /// // We use a socket pair so that they are assigned
/// // each other as a peer. /// // each other as a peer.
/// let (mut first, mut second) = UnixDatagram::pair()?; /// let (first, second) = UnixDatagram::pair()?;
/// ///
/// let size = first.try_send(bytes)?; /// let size = first.try_send(bytes)?;
/// assert_eq!(size, bytes.len()); /// assert_eq!(size, bytes.len());
@ -426,7 +426,7 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub fn try_recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.io.get_ref().recv(buf) self.io.get_ref().recv(buf)
} }
@ -531,10 +531,10 @@ impl UnixDatagram {
/// let tmp = tempdir().unwrap(); /// let tmp = tempdir().unwrap();
/// ///
/// let server_path = tmp.path().join("server"); /// let server_path = tmp.path().join("server");
/// let mut server = UnixDatagram::bind(&server_path)?; /// let server = UnixDatagram::bind(&server_path)?;
/// ///
/// let client_path = tmp.path().join("client"); /// let client_path = tmp.path().join("client");
/// let mut client = UnixDatagram::bind(&client_path)?; /// let client = UnixDatagram::bind(&client_path)?;
/// ///
/// let size = client.try_send_to(bytes, &server_path)?; /// let size = client.try_send_to(bytes, &server_path)?;
/// assert_eq!(size, bytes.len()); /// assert_eq!(size, bytes.len());
@ -548,7 +548,7 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub fn try_recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
let (n, addr) = self.io.get_ref().recv_from(buf)?; let (n, addr) = self.io.get_ref().recv_from(buf)?;
Ok((n, SocketAddr(addr))) Ok((n, SocketAddr(addr)))
} }

View File

@ -1,42 +0,0 @@
use crate::net::unix::{UnixListener, UnixStream};
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Stream of listeners
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Incoming<'a> {
inner: &'a mut UnixListener,
}
impl Incoming<'_> {
pub(crate) fn new(listener: &mut UnixListener) -> Incoming<'_> {
Incoming { inner: listener }
}
/// Attempts to poll `UnixStream` by polling inner `UnixListener` to accept
/// connection.
///
/// If `UnixListener` isn't ready yet, `Poll::Pending` is returned and
/// current task will be notified by a waker. Otherwise `Poll::Ready` with
/// `Result` containing `UnixStream` will be returned.
pub fn poll_accept(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<UnixStream>> {
let (socket, _) = ready!(self.inner.poll_accept(cx))?;
Poll::Ready(Ok(socket))
}
}
#[cfg(feature = "stream")]
impl crate::stream::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.inner.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket)))
}
}

View File

@ -1,6 +1,5 @@
use crate::future::poll_fn;
use crate::io::PollEvented; use crate::io::PollEvented;
use crate::net::unix::{Incoming, SocketAddr, UnixStream}; use crate::net::unix::{SocketAddr, UnixStream};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt; use std::fmt;
@ -99,18 +98,26 @@ impl UnixListener {
} }
/// Accepts a new incoming connection to this listener. /// Accepts a new incoming connection to this listener.
pub async fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
poll_fn(|cx| self.poll_accept(cx)).await let (mio, addr) = self
.io
.async_io(mio::Interest::READABLE, |sock| sock.accept())
.await?;
let addr = SocketAddr(addr);
let stream = UnixStream::new(mio)?;
Ok((stream, addr))
} }
/// Polls to accept a new incoming connection to this listener. /// Polls to accept a new incoming connection to this listener.
/// ///
/// If there is no connection to accept, `Poll::Pending` is returned and /// If there is no connection to accept, `Poll::Pending` is returned and
/// the current task will be notified by a waker. /// the current task will be notified by a waker.
pub fn poll_accept( ///
&mut self, /// When ready, the most recent task that called `poll_accept` is notified.
cx: &mut Context<'_>, /// The caller is responsble to ensure that `poll_accept` is called from a
) -> Poll<io::Result<(UnixStream, SocketAddr)>> { /// single task. Failing to do this could result in tasks hanging.
pub fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<(UnixStream, SocketAddr)>> {
loop { loop {
let ev = ready!(self.io.poll_read_ready(cx))?; let ev = ready!(self.io.poll_read_ready(cx))?;
@ -127,57 +134,13 @@ impl UnixListener {
} }
} }
} }
/// Returns a stream over the connections being received on this listener.
///
/// Note that `UnixListener` also directly implements `Stream`.
///
/// The returned stream will never return `None` and will also not yield the
/// peer's `SocketAddr` structure. Iterating over it is equivalent to
/// calling accept in a loop.
///
/// # Errors
///
/// Note that accepting a connection can lead to various errors and not all
/// of them are necessarily fatal for example having too many open file
/// descriptors or the other side closing the connection while it waits in
/// an accept queue. These would terminate the stream if not handled in any
/// way.
///
/// # Examples
///
/// ```no_run
/// use tokio::net::UnixListener;
/// use tokio::stream::StreamExt;
///
/// #[tokio::main]
/// async fn main() {
/// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap();
/// let mut incoming = listener.incoming();
///
/// while let Some(stream) = incoming.next().await {
/// match stream {
/// Ok(stream) => {
/// println!("new client!");
/// }
/// Err(e) => { /* connection failed */ }
/// }
/// }
/// }
/// ```
pub fn incoming(&mut self) -> Incoming<'_> {
Incoming::new(self)
}
} }
#[cfg(feature = "stream")] #[cfg(feature = "stream")]
impl crate::stream::Stream for UnixListener { impl crate::stream::Stream for UnixListener {
type Item = io::Result<UnixStream>; type Item = io::Result<UnixStream>;
fn poll_next( fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let (socket, _) = ready!(self.poll_accept(cx))?; let (socket, _) = ready!(self.poll_accept(cx))?;
Poll::Ready(Some(Ok(socket))) Poll::Ready(Some(Ok(socket)))
} }

View File

@ -2,11 +2,7 @@
pub mod datagram; pub mod datagram;
mod incoming;
pub use incoming::Incoming;
pub(crate) mod listener; pub(crate) mod listener;
pub(crate) use listener::UnixListener;
mod split; mod split;
pub use split::{ReadHalf, WriteHalf}; pub use split::{ReadHalf, WriteHalf};

View File

@ -78,7 +78,7 @@ async fn try_send_recv_never_block() -> io::Result<()> {
let payload = b"PAYLOAD"; let payload = b"PAYLOAD";
let mut count = 0; let mut count = 0;
let (mut dgram1, mut dgram2) = UnixDatagram::pair()?; let (dgram1, dgram2) = UnixDatagram::pair()?;
// Send until we hit the OS `net.unix.max_dgram_qlen`. // Send until we hit the OS `net.unix.max_dgram_qlen`.
loop { loop {

View File

@ -15,7 +15,7 @@ async fn accept_read_write() -> std::io::Result<()> {
.unwrap(); .unwrap();
let sock_path = dir.path().join("connect.sock"); let sock_path = dir.path().join("connect.sock");
let mut listener = UnixListener::bind(&sock_path)?; let listener = UnixListener::bind(&sock_path)?;
let accept = listener.accept(); let accept = listener.accept();
let connect = UnixStream::connect(&sock_path); let connect = UnixStream::connect(&sock_path);
@ -42,7 +42,7 @@ async fn shutdown() -> std::io::Result<()> {
.unwrap(); .unwrap();
let sock_path = dir.path().join("connect.sock"); let sock_path = dir.path().join("connect.sock");
let mut listener = UnixListener::bind(&sock_path)?; let listener = UnixListener::bind(&sock_path)?;
let accept = listener.accept(); let accept = listener.accept();
let connect = UnixStream::connect(&sock_path); let connect = UnixStream::connect(&sock_path);