io: use intrusive wait list for I/O driver (#2828)

This refactors I/O registration in a few ways:

- Cleans up the cached readiness in `PollEvented`. This cache used to
  be helpful when readiness was a linked list of `*mut Node`s in
  `Registration`. Previous refactors have turned `Registration` into just
  an `AtomicUsize` holding the current readiness, so the cache is just
  extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
  which includes the driver tick. This event must be passed back into
  `clear_readiness`, so that the readiness is only cleared from `Registration`
  if the tick hasn't changed. Previously, it was possible to clear the
  readiness even though another thread had *just* polled the driver and
  found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
  wakers in an instrusive linked list. This allows an unbounded number
  of tasks to register for readiness (previously, only 1 per direction (read
  and write)). By using the intrusive linked list, there is no concern of
  leaking the storage of the wakers, since they are stored inside the `async fn`
  and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
  `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
  so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
  now take advantage of this new `async fn readiness`, such as `UdpSocket`
  and `UnixDatagram`.

Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.

After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.

Refs: #2779, #2728
This commit is contained in:
Sean McArthur 2020-09-23 13:02:15 -07:00 committed by GitHub
parent f25f12d576
commit a0557840eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 885 additions and 1426 deletions

View File

@ -150,11 +150,11 @@ jobs:
run: cargo install cargo-hack run: cargo install cargo-hack
- name: check --each-feature - name: check --each-feature
run: cargo hack check --all --each-feature -Z avoid-dev-deps run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
# Try with unstable feature flags # Try with unstable feature flags
- name: check --each-feature --unstable - name: check --each-feature --unstable
run: cargo hack check --all --each-feature -Z avoid-dev-deps run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
env: env:
RUSTFLAGS: --cfg tokio_unstable -Dwarnings RUSTFLAGS: --cfg tokio_unstable -Dwarnings

View File

@ -96,7 +96,6 @@ mod udp {
use std::error::Error; use std::error::Error;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
pub async fn connect( pub async fn connect(
@ -114,16 +113,15 @@ mod udp {
let socket = UdpSocket::bind(&bind_addr).await?; let socket = UdpSocket::bind(&bind_addr).await?;
socket.connect(addr).await?; socket.connect(addr).await?;
let (mut r, mut w) = socket.split();
future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?; future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?;
Ok(()) Ok(())
} }
async fn send( async fn send(
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin, mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
writer: &mut SendHalf, writer: &UdpSocket,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await { while let Some(item) = stdin.next().await {
let buf = item?; let buf = item?;
@ -135,7 +133,7 @@ mod udp {
async fn recv( async fn recv(
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin, mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
reader: &mut RecvHalf, reader: &UdpSocket,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
loop { loop {
let mut buf = vec![0; 1024]; let mut buf = vec![0; 1024];

View File

@ -26,7 +26,7 @@ struct Server {
impl Server { impl Server {
async fn run(self) -> Result<(), io::Error> { async fn run(self) -> Result<(), io::Error> {
let Server { let Server {
mut socket, socket,
mut buf, mut buf,
mut to_send, mut to_send,
} = self; } = self;

View File

@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
.parse()?; .parse()?;
let mut socket = UdpSocket::bind(local_addr).await?; let socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507; const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr).await?; socket.connect(&remote_addr).await?;
let data = get_stdin_data()?; let data = get_stdin_data()?;

View File

@ -1,3 +1,8 @@
fn main() {}
// Disabled while future of UdpFramed is decided on.
// See https://github.com/tokio-rs/tokio/issues/2830
/*
//! This example leverages `BytesCodec` to create a UDP client and server which //! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol. //! speak a custom protocol.
//! //!
@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
Ok(()) Ok(())
} }
*/

View File

@ -25,11 +25,10 @@ publish = false
default = [] default = []
# Shorthand for enabling everything # Shorthand for enabling everything
full = ["codec", "udp", "compat", "io"] full = ["codec", "compat", "io"]
compat = ["futures-io",] compat = ["futures-io",]
codec = ["tokio/stream"] codec = ["tokio/stream"]
udp = ["tokio/udp"]
io = [] io = []
[dependencies] [dependencies]

View File

@ -18,6 +18,7 @@ macro_rules! cfg_compat {
} }
} }
/*
macro_rules! cfg_udp { macro_rules! cfg_udp {
($($item:item)*) => { ($($item:item)*) => {
$( $(
@ -27,6 +28,7 @@ macro_rules! cfg_udp {
)* )*
} }
} }
*/
macro_rules! cfg_io { macro_rules! cfg_io {
($($item:item)*) => { ($($item:item)*) => {

View File

@ -30,9 +30,14 @@ cfg_codec! {
pub mod codec; pub mod codec;
} }
/*
Disabled due to removal of poll_ functions on UdpSocket.
See https://github.com/tokio-rs/tokio/issues/2830
cfg_udp! { cfg_udp! {
pub mod udp; pub mod udp;
} }
*/
cfg_compat! { cfg_compat! {
pub mod compat; pub mod compat;

View File

@ -1,3 +1,4 @@
/*
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
use tokio::{net::UdpSocket, stream::StreamExt}; use tokio::{net::UdpSocket, stream::StreamExt};
@ -100,3 +101,4 @@ async fn send_framed_lines_codec() -> std::io::Result<()> {
Ok(()) Ok(())
} }
*/

View File

@ -33,7 +33,6 @@ full = [
"blocking", "blocking",
"dns", "dns",
"fs", "fs",
"io-driver",
"io-util", "io-util",
"io-std", "io-std",
"macros", "macros",
@ -51,7 +50,8 @@ full = [
blocking = ["rt-core"] blocking = ["rt-core"]
dns = ["rt-core"] dns = ["rt-core"]
fs = ["rt-core", "io-util"] fs = ["rt-core", "io-util"]
io-driver = ["mio", "lazy_static"] io-driver = ["mio", "lazy_static"] # internal only
io-readiness = [] # internal only
io-util = ["memchr"] io-util = ["memchr"]
# stdin, stdout, stderr # stdin, stdout, stderr
io-std = ["rt-core"] io-std = ["rt-core"]
@ -85,8 +85,8 @@ sync = ["fnv"]
test-util = [] test-util = []
tcp = ["io-driver", "iovec"] tcp = ["io-driver", "iovec"]
time = ["slab"] time = ["slab"]
udp = ["io-driver"] udp = ["io-driver", "io-readiness"]
uds = ["io-driver", "mio-uds", "libc"] uds = ["io-driver", "io-readiness", "mio-uds", "libc"]
[dependencies] [dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }

View File

@ -12,14 +12,13 @@ use mio::event::Evented;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration; use std::time::Duration;
/// I/O driver, backed by Mio /// I/O driver, backed by Mio
pub(crate) struct Driver { pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap /// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()` /// as it is mostly used to determine when to call `compact()`
tick: u16, tick: u8,
/// Reuse the `mio::Events` value across calls to poll. /// Reuse the `mio::Events` value across calls to poll.
events: Option<mio::Events>, events: Option<mio::Events>,
@ -40,6 +39,11 @@ pub(crate) struct Handle {
inner: Weak<Inner>, inner: Weak<Inner>,
} }
pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
}
pub(super) struct Inner { pub(super) struct Inner {
/// The underlying system event queue. /// The underlying system event queue.
io: mio::Poll, io: mio::Poll,
@ -57,6 +61,11 @@ pub(super) enum Direction {
Write, Write,
} }
enum Tick {
Set(u8),
Clear(u8),
}
// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup // TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token. // token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31); const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
@ -122,11 +131,11 @@ impl Driver {
fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> { fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab // How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256; const COMPACT_INTERVAL: u8 = 255;
self.tick = self.tick.wrapping_add(1); self.tick = self.tick.wrapping_add(1);
if self.tick % COMPACT_INTERVAL == 0 { if self.tick == COMPACT_INTERVAL {
self.resources.compact(); self.resources.compact();
} }
@ -160,9 +169,6 @@ impl Driver {
} }
fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) { fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));
let io = match self.resources.get(addr) { let io = match self.resources.get(addr) {
@ -170,29 +176,15 @@ impl Driver {
None => return, None => return,
}; };
if io let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
.set_readiness(Some(token.0), |curr| curr | ready.as_usize()) curr | ready.as_usize()
.is_err() });
{ if set.is_err() {
// token no longer valid! // token no longer valid!
return; return;
} }
if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) { io.wake(ready);
wr = io.writer.take_waker();
}
if !(ready & (!mio::Ready::writable())).is_empty() {
rd = io.reader.take_waker();
}
if let Some(w) = rd {
w.wake();
}
if let Some(w) = wr {
w.wake();
}
} }
} }
@ -202,8 +194,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task // If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the // will then attempt to use the I/O resource and fail due to the
// driver being shutdown. // driver being shutdown.
io.reader.wake(); io.wake(mio::Ready::all());
io.writer.wake();
}) })
} }
} }
@ -310,16 +301,6 @@ impl Inner {
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> { pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source) self.io.deregister(source)
} }
/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};
waker.register(w);
}
} }
impl Direction { impl Direction {

View File

@ -1,8 +1,21 @@
use crate::loom::future::AtomicWaker; use super::{platform, Direction, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry; use crate::util::slab::Entry;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
cfg_io_readiness! {
use crate::util::linked_list::{self, LinkedList};
use std::cell::UnsafeCell;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
}
/// Stored in the I/O driver resource slab. /// Stored in the I/O driver resource slab.
#[derive(Debug)] #[derive(Debug)]
@ -10,19 +23,84 @@ pub(crate) struct ScheduledIo {
/// Packs the resource's readiness with the resource's generation. /// Packs the resource's readiness with the resource's generation.
readiness: AtomicUsize, readiness: AtomicUsize,
/// Task waiting on read readiness waiters: Mutex<Waiters>,
pub(crate) reader: AtomicWaker,
/// Task waiting on write readiness
pub(crate) writer: AtomicWaker,
} }
#[cfg(feature = "io-readiness")]
type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
#[derive(Debug, Default)]
struct Waiters {
#[cfg(feature = "io-readiness")]
/// List of all current waiters
list: WaitList,
/// Waker used for AsyncRead
reader: Option<Waker>,
/// Waker used for AsyncWrite
writer: Option<Waker>,
}
cfg_io_readiness! {
#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
/// The waker for this task
waker: Option<Waker>,
/// The interest this waiter is waiting on
interest: mio::Ready,
is_ready: bool,
/// Should never be `!Unpin`
_p: PhantomPinned,
}
/// Future returned by `readiness()`
struct Readiness<'a> {
scheduled_io: &'a ScheduledIo,
state: State,
/// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
}
enum State {
Init,
Waiting,
Done,
}
}
// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | reserved | generation | driver tick | readinesss |
// |----------+------------+--------------+------------|
// | 1 bit | 7 bits + 8 bits + 16 bits |
const READINESS: bit::Pack = bit::Pack::least_significant(16);
const TICK: bit::Pack = READINESS.then(8);
const GENERATION: bit::Pack = TICK.then(7);
#[test]
fn test_generations_assert_same() {
assert_eq!(super::GENERATION, GENERATION);
}
// ===== impl ScheduledIo =====
impl Entry for ScheduledIo { impl Entry for ScheduledIo {
fn reset(&self) { fn reset(&self) {
let state = self.readiness.load(Acquire); let state = self.readiness.load(Acquire);
let generation = super::GENERATION.unpack(state); let generation = GENERATION.unpack(state);
let next = super::GENERATION.pack_lossy(generation + 1, 0); let next = GENERATION.pack_lossy(generation + 1, 0);
self.readiness.store(next, Release); self.readiness.store(next, Release);
} }
@ -32,15 +110,14 @@ impl Default for ScheduledIo {
fn default() -> ScheduledIo { fn default() -> ScheduledIo {
ScheduledIo { ScheduledIo {
readiness: AtomicUsize::new(0), readiness: AtomicUsize::new(0),
reader: AtomicWaker::new(), waiters: Mutex::new(Default::default()),
writer: AtomicWaker::new(),
} }
} }
} }
impl ScheduledIo { impl ScheduledIo {
pub(crate) fn generation(&self) -> usize { pub(crate) fn generation(&self) -> usize {
super::GENERATION.unpack(self.readiness.load(Acquire)) GENERATION.unpack(self.readiness.load(Acquire))
} }
/// Sets the readiness on this `ScheduledIo` by invoking the given closure on /// Sets the readiness on this `ScheduledIo` by invoking the given closure on
@ -48,6 +125,8 @@ impl ScheduledIo {
/// ///
/// # Arguments /// # Arguments
/// - `token`: the token for this `ScheduledIo`. /// - `token`: the token for this `ScheduledIo`.
/// - `tick`: whether setting the tick or trying to clear readiness for a
/// specific tick.
/// - `f`: a closure returning a new readiness value given the previous /// - `f`: a closure returning a new readiness value given the previous
/// readiness. /// readiness.
/// ///
@ -57,51 +136,330 @@ impl ScheduledIo {
/// generation, then the corresponding IO resource has been removed and /// generation, then the corresponding IO resource has been removed and
/// replaced with a new resource. In that case, this method returns `Err`. /// replaced with a new resource. In that case, this method returns `Err`.
/// Otherwise, this returns the previous readiness. /// Otherwise, this returns the previous readiness.
pub(crate) fn set_readiness( pub(super) fn set_readiness(
&self, &self,
token: Option<usize>, token: Option<usize>,
tick: Tick,
f: impl Fn(usize) -> usize, f: impl Fn(usize) -> usize,
) -> Result<usize, ()> { ) -> Result<usize, ()> {
let mut current = self.readiness.load(Acquire); let mut current = self.readiness.load(Acquire);
loop { loop {
let current_generation = super::GENERATION.unpack(current); let current_generation = GENERATION.unpack(current);
if let Some(token) = token { if let Some(token) = token {
// Check that the generation for this access is still the // Check that the generation for this access is still the
// current one. // current one.
if super::GENERATION.unpack(token) != current_generation { if GENERATION.unpack(token) != current_generation {
return Err(()); return Err(());
} }
} }
// Mask out the generation bits so that the modifying function // Mask out the tick/generation bits so that the modifying
// doesn't see them. // function doesn't see them.
let current_readiness = current & mio::Ready::all().as_usize(); let current_readiness = current & mio::Ready::all().as_usize();
let new = f(current_readiness); let mut new = f(current_readiness);
debug_assert!( debug_assert!(
new <= super::ADDRESS.max_value(), new <= READINESS.max_value(),
"new readiness value would overwrite generation bits!" "new readiness value would overwrite tick/generation bits!"
); );
match self.readiness.compare_exchange( match tick {
current, Tick::Set(t) => {
super::GENERATION.pack(current_generation, new), new = TICK.pack(t as usize, new);
AcqRel, }
Acquire, Tick::Clear(t) => {
) { if TICK.unpack(current) as u8 != t {
// Trying to clear readiness with an old event!
return Err(());
}
new = TICK.pack(t as usize, new);
}
}
new = GENERATION.pack(current_generation, new);
match self
.readiness
.compare_exchange(current, new, AcqRel, Acquire)
{
Ok(_) => return Ok(current), Ok(_) => return Ok(current),
// we lost the race, retry! // we lost the race, retry!
Err(actual) => current = actual, Err(actual) => current = actual,
} }
} }
} }
pub(super) fn wake(&self, ready: mio::Ready) {
let mut waiters = self.waiters.lock().unwrap();
// check for AsyncRead slot
if !(ready & (!mio::Ready::writable())).is_empty() {
if let Some(waker) = waiters.reader.take() {
waker.wake();
}
}
// check for AsyncWrite slot
if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
if let Some(waker) = waiters.writer.take() {
waker.wake();
}
}
#[cfg(feature = "io-readiness")]
{
// check list of waiters
for waiter in waiters
.list
.drain_filter(|w| !(w.interest & ready).is_empty())
{
let waiter = unsafe { &mut *waiter.as_ptr() };
if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
waker.wake();
}
}
}
}
/// Poll version of checking readiness for a certain direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
/// which cannot use the `async fn` version. This uses reserved reader
/// and writer slots.
pub(in crate::io) fn poll_readiness(
&self,
cx: &mut Context<'_>,
direction: Direction,
) -> Poll<ReadyEvent> {
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr));
if ready.is_empty() {
// Update the task info
let mut waiters = self.waiters.lock().unwrap();
let slot = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};
*slot = Some(cx.waker().clone());
// Try again, in case the readiness was changed while we were
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & mio::Ready::from_usize(READINESS.unpack(curr));
if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
readiness: ready,
})
}
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
readiness: ready,
})
}
}
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
// This consumes the current readiness state **except** for HUP and
// error. HUP and error are excluded because a) they are final states
// and never transition out and b) both the read AND the write
// directions need to be able to obvserve these states.
//
// # Platform-specific behavior
//
// HUP and error readiness are platform-specific. On epoll platforms,
// HUP has specific conditions that must be met by both peers of a
// connection in order to be triggered.
//
// On epoll platforms, `EPOLLERR` is signaled through
// `UnixReady::error()` and is important to be observable by both read
// AND write. A specific case that `EPOLLERR` occurs is when the read
// end of a pipe is closed. When this occurs, a peer blocked by
// writing to the pipe should be notified.
let mask_no_hup = (event.readiness - platform::hup() - platform::error()).as_usize();
// result isn't important
let _ = self.set_readiness(None, Tick::Clear(event.tick), |curr| curr & (!mask_no_hup));
}
} }
impl Drop for ScheduledIo { impl Drop for ScheduledIo {
fn drop(&mut self) { fn drop(&mut self) {
self.writer.wake(); self.wake(mio::Ready::all());
self.reader.wake();
} }
} }
unsafe impl Send for ScheduledIo {}
unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers
pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent {
self.readiness_fut(interest).await
}
// This is in a separate function so that the borrow checker doesn't think
// we are borrowing the `UnsafeCell` possibly over await boundaries.
//
// Go figure.
fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> {
Readiness {
scheduled_io: self,
state: State::Init,
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
is_ready: false,
interest,
_p: PhantomPinned,
}),
}
}
}
unsafe impl linked_list::Link for Waiter {
type Handle = NonNull<Waiter>;
type Target = Waiter;
fn as_raw(handle: &NonNull<Waiter>) -> NonNull<Waiter> {
*handle
}
unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
ptr
}
unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
}
}
// ===== impl Readiness =====
impl Future for Readiness<'_> {
type Output = ReadyEvent;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
let (scheduled_io, state, waiter) = unsafe {
let me = self.get_unchecked_mut();
(&me.scheduled_io, &mut me.state, &me.waiter)
};
loop {
match *state {
State::Init => {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
if readiness.contains(interest) {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { readiness, tick });
}
// Wasn't ready, take the lock (and check again while locked).
let mut waiters = scheduled_io.waiters.lock().unwrap();
let curr = scheduled_io.readiness.load(SeqCst);
let readiness = mio::Ready::from_usize(READINESS.unpack(curr));
if readiness.contains(interest) {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { readiness, tick });
}
// Not ready even after locked, insert into list...
// Safety: called while locked
unsafe {
(*waiter.get()).waker = Some(cx.waker().clone());
}
// Insert the waiter into the linked list
//
// safety: pointers from `UnsafeCell` are never null.
waiters
.list
.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });
*state = State::Waiting;
}
State::Waiting => {
// Currently in the "Waiting" state, implying the caller has
// a waiter stored in the waiter list (guarded by
// `notify.waiters`). In order to access the waker fields,
// we must hold the lock.
let waiters = scheduled_io.waiters.lock().unwrap();
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };
if w.is_ready {
// Our waker has been notified.
*state = State::Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}
return Poll::Pending;
}
// Explicit drop of the lock to indicate the scope that the
// lock is held. Because holding the lock is required to
// ensure safe access to fields not held within the lock, it
// is helpful to visualize the scope of the critical
// section.
drop(waiters);
}
State::Done => {
let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;
// Safety: State::Done means it is no longer shared
let w = unsafe { &mut *waiter.get() };
return Poll::Ready(ReadyEvent {
tick,
readiness: w.interest,
});
}
}
}
}
}
impl Drop for Readiness<'_> {
fn drop(&mut self) {
let mut waiters = self.scheduled_io.waiters.lock().unwrap();
// Safety: `waiter` is only ever stored in `waiters`
unsafe {
waiters
.list
.remove(NonNull::new_unchecked(self.waiter.get()))
};
}
}
unsafe impl Send for Readiness<'_> {}
unsafe impl Sync for Readiness<'_> {}
}

View File

@ -207,10 +207,10 @@ cfg_io_driver! {
pub(crate) mod driver; pub(crate) mod driver;
mod poll_evented; mod poll_evented;
pub use poll_evented::PollEvented; #[cfg(not(loom))]
pub(crate) use poll_evented::PollEvented;
mod registration; mod registration;
pub use registration::Registration;
} }
cfg_io_std! { cfg_io_std! {

View File

@ -1,13 +1,12 @@
use crate::io::driver::platform; use crate::io::driver::{Direction, Handle, ReadyEvent};
use crate::io::{AsyncRead, AsyncWrite, ReadBuf, Registration}; use crate::io::registration::Registration;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use mio::event::Evented; use mio::event::Evented;
use std::fmt; use std::fmt;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
use std::marker::Unpin; use std::marker::Unpin;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
cfg_io_driver! { cfg_io_driver! {
@ -53,37 +52,6 @@ cfg_io_driver! {
/// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
/// [`clear_read_ready`]. /// [`clear_read_ready`].
/// ///
/// ```rust
/// use tokio::io::PollEvented;
///
/// use futures::ready;
/// use mio::Ready;
/// use mio::net::{TcpStream, TcpListener};
/// use std::io;
/// use std::task::{Context, Poll};
///
/// struct MyListener {
/// poll_evented: PollEvented<TcpListener>,
/// }
///
/// impl MyListener {
/// pub fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<Result<TcpStream, io::Error>> {
/// let ready = Ready::readable();
///
/// ready!(self.poll_evented.poll_read_ready(cx, ready))?;
///
/// match self.poll_evented.get_ref().accept() {
/// Ok((socket, _)) => Poll::Ready(Ok(socket)),
/// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
/// self.poll_evented.clear_read_ready(cx, ready)?;
/// Poll::Pending
/// }
/// Err(e) => Poll::Ready(Err(e)),
/// }
/// }
/// }
/// ```
///
/// ## Platform-specific events /// ## Platform-specific events
/// ///
/// `PollEvented` also allows receiving platform-specific `mio::Ready` events. /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
@ -101,66 +69,14 @@ cfg_io_driver! {
/// [`clear_write_ready`]: method@Self::clear_write_ready /// [`clear_write_ready`]: method@Self::clear_write_ready
/// [`poll_read_ready`]: method@Self::poll_read_ready /// [`poll_read_ready`]: method@Self::poll_read_ready
/// [`poll_write_ready`]: method@Self::poll_write_ready /// [`poll_write_ready`]: method@Self::poll_write_ready
pub struct PollEvented<E: Evented> { pub(crate) struct PollEvented<E: Evented> {
io: Option<E>, io: Option<E>,
inner: Inner, registration: Registration,
} }
} }
struct Inner {
registration: Registration,
/// Currently visible read readiness
read_readiness: AtomicUsize,
/// Currently visible write readiness
write_readiness: AtomicUsize,
}
// ===== impl PollEvented ===== // ===== impl PollEvented =====
macro_rules! poll_ready {
($me:expr, $mask:expr, $cache:ident, $take:ident, $poll:expr) => {{
// Load cached & encoded readiness.
let mut cached = $me.inner.$cache.load(Relaxed);
let mask = $mask | platform::hup() | platform::error();
// See if the current readiness matches any bits.
let mut ret = mio::Ready::from_usize(cached) & $mask;
if ret.is_empty() {
// Readiness does not match, consume the registration's readiness
// stream. This happens in a loop to ensure that the stream gets
// drained.
loop {
let ready = match $poll? {
Poll::Ready(v) => v,
Poll::Pending => return Poll::Pending,
};
cached |= ready.as_usize();
// Update the cache store
$me.inner.$cache.store(cached, Relaxed);
ret |= ready & mask;
if !ret.is_empty() {
return Poll::Ready(Ok(ret));
}
}
} else {
// Check what's new with the registration stream. This will not
// request to be notified
if let Some(ready) = $me.inner.registration.$take()? {
cached |= ready.as_usize();
$me.inner.$cache.store(cached, Relaxed);
}
Poll::Ready(Ok(mio::Ready::from_usize(cached)))
}
}};
}
impl<E> PollEvented<E> impl<E> PollEvented<E>
where where
E: Evented, E: Evented,
@ -174,7 +90,8 @@ where
/// The runtime is usually set implicitly when this function is called /// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set /// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> { #[cfg_attr(feature = "signal", allow(unused))]
pub(crate) fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all()) PollEvented::new_with_ready(io, mio::Ready::all())
} }
@ -202,27 +119,39 @@ where
/// The runtime is usually set implicitly when this function is called /// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set /// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> { #[cfg_attr(feature = "signal", allow(unused))]
let registration = Registration::new_with_ready(&io, ready)?; pub(crate) fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
Self::new_with_ready_and_handle(io, ready, Handle::current())
}
pub(crate) fn new_with_ready_and_handle(
io: E,
ready: mio::Ready,
handle: Handle,
) -> io::Result<Self> {
let registration = Registration::new_with_ready_and_handle(&io, ready, handle)?;
Ok(Self { Ok(Self {
io: Some(io), io: Some(io),
inner: Inner { registration,
registration,
read_readiness: AtomicUsize::new(0),
write_readiness: AtomicUsize::new(0),
},
}) })
} }
/// Returns a shared reference to the underlying I/O object this readiness /// Returns a shared reference to the underlying I/O object this readiness
/// stream is wrapping. /// stream is wrapping.
pub fn get_ref(&self) -> &E { #[cfg(any(
feature = "process",
feature = "tcp",
feature = "udp",
feature = "uds",
feature = "signal"
))]
pub(crate) fn get_ref(&self) -> &E {
self.io.as_ref().unwrap() self.io.as_ref().unwrap()
} }
/// Returns a mutable reference to the underlying I/O object this readiness /// Returns a mutable reference to the underlying I/O object this readiness
/// stream is wrapping. /// stream is wrapping.
pub fn get_mut(&mut self) -> &mut E { pub(crate) fn get_mut(&mut self) -> &mut E {
self.io.as_mut().unwrap() self.io.as_mut().unwrap()
} }
@ -234,12 +163,17 @@ where
/// Note that deregistering does not guarantee that the I/O resource can be /// Note that deregistering does not guarantee that the I/O resource can be
/// registered with a different reactor. Some I/O resource types can only be /// registered with a different reactor. Some I/O resource types can only be
/// associated with a single reactor instance for their lifetime. /// associated with a single reactor instance for their lifetime.
pub fn into_inner(mut self) -> io::Result<E> { #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))]
pub(crate) fn into_inner(mut self) -> io::Result<E> {
let io = self.io.take().unwrap(); let io = self.io.take().unwrap();
self.inner.registration.deregister(&io)?; self.registration.deregister(&io)?;
Ok(io) Ok(io)
} }
pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
self.registration.clear_readiness(event);
}
/// Checks the I/O resource's read readiness state. /// Checks the I/O resource's read readiness state.
/// ///
/// The mask argument allows specifying what readiness to notify on. This /// The mask argument allows specifying what readiness to notify on. This
@ -266,51 +200,8 @@ where
/// ///
/// This method may not be called concurrently. It takes `&self` to allow /// This method may not be called concurrently. It takes `&self` to allow
/// calling it concurrently with `poll_write_ready`. /// calling it concurrently with `poll_write_ready`.
pub fn poll_read_ready( pub(crate) fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
&self, self.registration.poll_readiness(cx, Direction::Read)
cx: &mut Context<'_>,
mask: mio::Ready,
) -> Poll<io::Result<mio::Ready>> {
assert!(!mask.is_writable(), "cannot poll for write readiness");
poll_ready!(
self,
mask,
read_readiness,
take_read_ready,
self.inner.registration.poll_read_ready(cx)
)
}
/// Clears the I/O resource's read readiness state and registers the current
/// task to be notified once a read readiness event is received.
///
/// After calling this function, `poll_read_ready` will return
/// `Poll::Pending` until a new read readiness event has been received.
///
/// The `mask` argument specifies the readiness bits to clear. This may not
/// include `writable` or `hup`.
///
/// # Panics
///
/// This function panics if:
///
/// * `ready` includes writable or HUP
/// * called from outside of a task context.
pub fn clear_read_ready(&self, cx: &mut Context<'_>, ready: mio::Ready) -> io::Result<()> {
// Cannot clear write readiness
assert!(!ready.is_writable(), "cannot clear write readiness");
assert!(!platform::is_hup(ready), "cannot clear HUP readiness");
self.inner
.read_readiness
.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_read_ready(cx, ready)?.is_ready() {
// Notify the current task
cx.waker().wake_by_ref();
}
Ok(())
} }
/// Checks the I/O resource's write readiness state. /// Checks the I/O resource's write readiness state.
@ -337,41 +228,35 @@ where
/// ///
/// This method may not be called concurrently. It takes `&self` to allow /// This method may not be called concurrently. It takes `&self` to allow
/// calling it concurrently with `poll_read_ready`. /// calling it concurrently with `poll_read_ready`.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { pub(crate) fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<ReadyEvent>> {
poll_ready!( self.registration.poll_readiness(cx, Direction::Write)
self,
mio::Ready::writable(),
write_readiness,
take_write_ready,
self.inner.registration.poll_write_ready(cx)
)
} }
}
/// Resets the I/O resource's write readiness state and registers the current cfg_io_readiness! {
/// task to be notified once a write readiness event is received. impl<E> PollEvented<E>
/// where
/// This only clears writable readiness. HUP (on platforms that support HUP) E: Evented,
/// cannot be cleared as it is a final state. {
/// pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> {
/// After calling this function, `poll_write_ready(Ready::writable())` will self.registration.readiness(interest).await
/// return `NotReady` until a new write readiness event has been received.
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn clear_write_ready(&self, cx: &mut Context<'_>) -> io::Result<()> {
let ready = mio::Ready::writable();
self.inner
.write_readiness
.fetch_and(!ready.as_usize(), Relaxed);
if self.poll_write_ready(cx)?.is_ready() {
// Notify the current task
cx.waker().wake_by_ref();
} }
Ok(()) pub(crate) async fn async_io<F, R>(&self, interest: mio::Ready, mut op: F) -> io::Result<R>
where
F: FnMut(&E) -> io::Result<R>,
{
loop {
let event = self.readiness(interest).await?;
match op(self.get_ref()) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
}
}
}
} }
} }
@ -386,20 +271,22 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<()>> {
ready!(self.poll_read_ready(cx, mio::Ready::readable()))?; loop {
let ev = ready!(self.poll_read_ready(cx))?;
// We can't assume the `Read` won't look at the read buffer, // We can't assume the `Read` won't look at the read buffer,
// so we have to force initialization here. // so we have to force initialization here.
let r = (*self).get_mut().read(buf.initialize_unfilled()); let r = (*self).get_mut().read(buf.initialize_unfilled());
if is_wouldblock(&r) { if is_wouldblock(&r) {
self.clear_read_ready(cx, mio::Ready::readable())?; self.clear_readiness(ev);
return Poll::Pending; continue;
}
return Poll::Ready(r.map(|n| {
buf.add_filled(n);
}));
} }
Poll::Ready(r.map(|n| {
buf.add_filled(n);
}))
} }
} }
@ -412,29 +299,33 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(self.poll_write_ready(cx))?; loop {
let ev = ready!(self.poll_write_ready(cx))?;
let r = (*self).get_mut().write(buf); let r = (*self).get_mut().write(buf);
if is_wouldblock(&r) { if is_wouldblock(&r) {
self.clear_write_ready(cx)?; self.clear_readiness(ev);
return Poll::Pending; continue;
}
return Poll::Ready(r);
} }
Poll::Ready(r)
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.poll_write_ready(cx))?; loop {
let ev = ready!(self.poll_write_ready(cx))?;
let r = (*self).get_mut().flush(); let r = (*self).get_mut().flush();
if is_wouldblock(&r) { if is_wouldblock(&r) {
self.clear_write_ready(cx)?; self.clear_readiness(ev);
return Poll::Pending; continue;
}
return Poll::Ready(r);
} }
Poll::Ready(r)
} }
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -459,7 +350,7 @@ impl<E: Evented> Drop for PollEvented<E> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(io) = self.io.take() { if let Some(io) = self.io.take() {
// Ignore errors // Ignore errors
let _ = self.inner.registration.deregister(&io); let _ = self.registration.deregister(&io);
} }
} }
} }

View File

@ -1,4 +1,4 @@
use crate::io::driver::{platform, Direction, Handle, ScheduledIo}; use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
use crate::util::slab; use crate::util::slab;
use mio::{self, Evented}; use mio::{self, Evented};
@ -38,7 +38,7 @@ cfg_io_driver! {
/// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)] #[derive(Debug)]
pub struct Registration { pub(crate) struct Registration {
/// Handle to the associated driver. /// Handle to the associated driver.
handle: Handle, handle: Handle,
@ -53,28 +53,6 @@ unsafe impl Sync for Registration {}
// ===== impl Registration ===== // ===== impl Registration =====
impl Registration { impl Registration {
/// Registers the I/O resource with the default reactor.
///
/// # Return
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
///
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
{
Registration::new_with_ready(io, mio::Ready::all())
}
/// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state. /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state.
/// `new_with_ready` should be used over `new` when you need control over the readiness state, /// `new_with_ready` should be used over `new` when you need control over the readiness state,
/// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
@ -96,23 +74,6 @@ impl Registration {
/// ///
/// - `Ok` if the registration happened successfully /// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration /// - `Err` if an error was encountered during registration
///
///
/// # Panics
///
/// This function panics if thread-local runtime is not set.
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
{
Self::new_with_ready_and_handle(io, ready, Handle::current())
}
/// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>( pub(crate) fn new_with_ready_and_handle<T>(
io: &T, io: &T,
ready: mio::Ready, ready: mio::Ready,
@ -149,7 +110,7 @@ impl Registration {
/// no longer result in notifications getting sent for this registration. /// no longer result in notifications getting sent for this registration.
/// ///
/// `Err` is returned if an error is encountered. /// `Err` is returned if an error is encountered.
pub fn deregister<T>(&mut self, io: &T) -> io::Result<()> pub(super) fn deregister<T>(&mut self, io: &T) -> io::Result<()>
where where
T: Evented, T: Evented,
{ {
@ -160,192 +121,36 @@ impl Registration {
inner.deregister_source(io) inner.deregister_source(io)
} }
/// Polls for events on the I/O resource's read readiness stream. pub(super) fn clear_readiness(&self, event: ReadyEvent) {
/// self.shared.clear_readiness(event);
/// If the I/O resource receives a new read readiness event since the last
/// call to `poll_read_ready`, it is returned. If it has not, the current
/// task is notified once a new event is received.
///
/// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
/// the function will always return `Ready(HUP)`. This should be treated as
/// the end of the readiness stream.
///
/// # Return value
///
/// There are several possible return values:
///
/// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
/// a new readiness event. The readiness value is included.
///
/// * `Poll::Pending` means that no new readiness events have been received
/// since the last call to `poll_read_ready`.
///
/// * `Poll::Ready(Err(err))` means that the registration has encountered an
/// error. This could represent a permanent internal error for example.
///
/// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| {
coop.made_progress();
e
})?;
match v {
Some(v) => {
coop.made_progress();
Poll::Ready(Ok(v))
}
None => Poll::Pending,
}
}
/// Consume any pending read readiness event.
///
/// This function is identical to [`poll_read_ready`] **except** that it
/// will not notify the current task when a new event is received. As such,
/// it is safe to call this function from outside of a task context.
///
/// [`poll_read_ready`]: method@Self::poll_read_ready
pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Read, None)
}
/// Polls for events on the I/O resource's write readiness stream.
///
/// If the I/O resource receives a new write readiness event since the last
/// call to `poll_write_ready`, it is returned. If it has not, the current
/// task is notified once a new event is received.
///
/// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
/// the function will always return `Ready(HUP)`. This should be treated as
/// the end of the readiness stream.
///
/// # Return value
///
/// There are several possible return values:
///
/// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
/// a new readiness event. The readiness value is included.
///
/// * `Poll::Pending` means that no new readiness events have been received
/// since the last call to `poll_write_ready`.
///
/// * `Poll::Ready(Err(err))` means that the registration has encountered an
/// error. This could represent a permanent internal error for example.
///
/// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
///
/// # Panics
///
/// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));
let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| {
coop.made_progress();
e
})?;
match v {
Some(v) => {
coop.made_progress();
Poll::Ready(Ok(v))
}
None => Poll::Pending,
}
}
/// Consumes any pending write readiness event.
///
/// This function is identical to [`poll_write_ready`] **except** that it
/// will not notify the current task when a new event is received. As such,
/// it is safe to call this function from outside of a task context.
///
/// [`poll_write_ready`]: method@Self::poll_write_ready
pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Write, None)
} }
/// Polls for events on the I/O resource's `direction` readiness stream. /// Polls for events on the I/O resource's `direction` readiness stream.
/// ///
/// If called with a task context, notify the task when a new event is /// If called with a task context, notify the task when a new event is
/// received. /// received.
fn poll_ready( pub(super) fn poll_readiness(
&self, &self,
cx: &mut Context<'_>,
direction: Direction, direction: Direction,
cx: Option<&mut Context<'_>>, ) -> Poll<io::Result<ReadyEvent>> {
) -> io::Result<Option<mio::Ready>> { if self.handle.inner().is_none() {
let inner = match self.handle.inner() { return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
// If the task should be notified about new events, ensure that it has
// been registered
if let Some(ref cx) = cx {
inner.register(&self.shared, direction, cx.waker().clone())
} }
let mask = direction.mask(); // Keep track of task budget
let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize(); let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));
// This consumes the current readiness state **except** for HUP and coop.made_progress();
// error. HUP and error are excluded because a) they are final states Poll::Ready(Ok(ev))
// and never transitition out and b) both the read AND the write
// directions need to be able to obvserve these states.
//
// # Platform-specific behavior
//
// HUP and error readiness are platform-specific. On epoll platforms,
// HUP has specific conditions that must be met by both peers of a
// connection in order to be triggered.
//
// On epoll platforms, `EPOLLERR` is signaled through
// `UnixReady::error()` and is important to be observable by both read
// AND write. A specific case that `EPOLLERR` occurs is when the read
// end of a pipe is closed. When this occurs, a peer blocked by
// writing to the pipe should be notified.
let curr_ready = self
.shared
.set_readiness(None, |curr| curr & (!mask_no_hup))
.unwrap_or_else(|_| unreachable!());
let mut ready = mask & mio::Ready::from_usize(curr_ready);
if ready.is_empty() {
if let Some(cx) = cx {
// Update the task info
match direction {
Direction::Read => self.shared.reader.register_by_ref(cx.waker()),
Direction::Write => self.shared.writer.register_by_ref(cx.waker()),
}
// Try again
let curr_ready = self
.shared
.set_readiness(None, |curr| curr & (!mask_no_hup))
.unwrap();
ready = mask & mio::Ready::from_usize(curr_ready);
}
}
if ready.is_empty() {
Ok(None)
} else {
Ok(Some(ready))
}
} }
} }
impl Drop for Registration { cfg_io_readiness! {
fn drop(&mut self) { impl Registration {
drop(self.shared.reader.take_waker()); pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> {
drop(self.shared.writer.take_waker()); // TODO: does this need to return a `Result`?
Ok(self.shared.readiness(interest).await)
}
} }
} }

View File

@ -77,7 +77,6 @@
//! - `rt-core`: Enables `tokio::spawn` and the basic (single-threaded) scheduler. //! - `rt-core`: Enables `tokio::spawn` and the basic (single-threaded) scheduler.
//! - `rt-threaded`: Enables the heavier, multi-threaded, work-stealing scheduler. //! - `rt-threaded`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `rt-util`: Enables non-scheduler utilities. //! - `rt-util`: Enables non-scheduler utilities.
//! - `io-driver`: Enables the `mio` based IO driver.
//! - `io-util`: Enables the IO based `Ext` traits. //! - `io-util`: Enables the IO based `Ext` traits.
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types. //! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`. //! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`.
@ -269,8 +268,7 @@
//! the [`AsyncRead`], [`AsyncWrite`], and [`AsyncBufRead`] traits. In addition, //! the [`AsyncRead`], [`AsyncWrite`], and [`AsyncBufRead`] traits. In addition,
//! when the "io-util" feature flag is enabled, it also provides combinators and //! when the "io-util" feature flag is enabled, it also provides combinators and
//! functions for working with these traits, forming as an asynchronous //! functions for working with these traits, forming as an asynchronous
//! counterpart to [`std::io`]. When the "io-driver" feature flag is enabled, it //! counterpart to [`std::io`].
//! also provides utilities for library authors implementing I/O resources.
//! //!
//! Tokio also includes APIs for performing various kinds of I/O and interacting //! Tokio also includes APIs for performing various kinds of I/O and interacting
//! with the operating system asynchronously. These include: //! with the operating system asynchronously. These include:

View File

@ -129,7 +129,6 @@ macro_rules! cfg_io_driver {
($($item:item)*) => { ($($item:item)*) => {
$( $(
#[cfg(feature = "io-driver")] #[cfg(feature = "io-driver")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-driver")))]
$item $item
)* )*
} }
@ -144,6 +143,15 @@ macro_rules! cfg_not_io_driver {
} }
} }
macro_rules! cfg_io_readiness {
($($item:item)*) => {
$(
#[cfg(feature = "io-readiness")]
$item
)*
}
}
macro_rules! cfg_io_std { macro_rules! cfg_io_std {
($($item:item)*) => { ($($item:item)*) => {
$( $(

View File

@ -205,15 +205,16 @@ impl TcpListener {
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> { ) -> Poll<io::Result<(net::TcpStream, SocketAddr)>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
match self.io.get_ref().accept_std() { match self.io.get_ref().accept_std() {
Ok(pair) => Poll::Ready(Ok(pair)), Ok(pair) => return Poll::Ready(Ok(pair)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, mio::Ready::readable())?; self.io.clear_readiness(ev);
Poll::Pending }
Err(e) => return Poll::Ready(Err(e)),
} }
Err(e) => Poll::Ready(Err(e)),
} }
} }
@ -411,11 +412,6 @@ impl TryFrom<TcpListener> for mio::net::TcpListener {
type Error = io::Error; type Error = io::Error;
/// Consumes value, returning the mio I/O object. /// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpListener) -> Result<Self, Self::Error> { fn try_from(value: TcpListener) -> Result<Self, Self::Error> {
value.io.into_inner() value.io.into_inner()
} }

View File

@ -299,15 +299,16 @@ impl TcpStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
match self.io.get_ref().peek(buf) { match self.io.get_ref().peek(buf) {
Ok(ret) => Poll::Ready(Ok(ret)), Ok(ret) => return Poll::Ready(Ok(ret)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, mio::Ready::readable())?; self.io.clear_readiness(ev);
Poll::Pending }
Err(e) => return Poll::Ready(Err(e)),
} }
Err(e) => Poll::Ready(Err(e)),
} }
} }
@ -703,26 +704,28 @@ impl TcpStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<()>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
// Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes. // Safety: `TcpStream::read` will not peek at the maybe uinitialized bytes.
let b = let b = unsafe {
unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
match self.io.get_ref().read(b) { };
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { match self.io.get_ref().read(b) {
self.io.clear_read_ready(cx, mio::Ready::readable())?; Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Poll::Pending self.io.clear_readiness(ev);
}
Ok(n) => {
// Safety: We trust `TcpStream::read` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
} }
buf.add_filled(n); Ok(n) => {
Poll::Ready(Ok(())) // Safety: We trust `TcpStream::read` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.add_filled(n);
return Poll::Ready(Ok(()));
}
Err(e) => return Poll::Ready(Err(e)),
} }
Err(e) => Poll::Ready(Err(e)),
} }
} }
@ -731,14 +734,15 @@ impl TcpStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?; loop {
let ev = ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().write(buf) { match self.io.get_ref().write(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?; self.io.clear_readiness(ev);
Poll::Pending }
x => return Poll::Ready(x),
} }
x => Poll::Ready(x),
} }
} }
@ -749,99 +753,100 @@ impl TcpStream {
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
use std::io::IoSlice; use std::io::IoSlice;
ready!(self.io.poll_write_ready(cx))?; loop {
let ev = ready!(self.io.poll_write_ready(cx))?;
// The `IoVec` (v0.1.x) type can't have a zero-length size, so create // The `IoVec` (v0.1.x) type can't have a zero-length size, so create
// a dummy version from a 1-length slice which we'll overwrite with // a dummy version from a 1-length slice which we'll overwrite with
// the `bytes_vectored` method. // the `bytes_vectored` method.
static S: &[u8] = &[0]; static S: &[u8] = &[0];
const MAX_BUFS: usize = 64; const MAX_BUFS: usize = 64;
// IoSlice isn't Copy, so we must expand this manually ;_; // IoSlice isn't Copy, so we must expand this manually ;_;
let mut slices: [IoSlice<'_>; MAX_BUFS] = [ let mut slices: [IoSlice<'_>; MAX_BUFS] = [
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
IoSlice::new(S), IoSlice::new(S),
]; ];
let cnt = buf.bytes_vectored(&mut slices); let cnt = buf.bytes_vectored(&mut slices);
let iovec = <&IoVec>::from(S); let iovec = <&IoVec>::from(S);
let mut vecs = [iovec; MAX_BUFS]; let mut vecs = [iovec; MAX_BUFS];
for i in 0..cnt { for i in 0..cnt {
vecs[i] = (*slices[i]).into(); vecs[i] = (*slices[i]).into();
}
match self.io.get_ref().write_bufs(&vecs[..cnt]) {
Ok(n) => {
buf.advance(n);
Poll::Ready(Ok(n))
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?; match self.io.get_ref().write_bufs(&vecs[..cnt]) {
Poll::Pending Ok(n) => {
buf.advance(n);
return Poll::Ready(Ok(n));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(e) => return Poll::Ready(Err(e)),
} }
Err(e) => Poll::Ready(Err(e)),
} }
} }
} }
@ -850,11 +855,6 @@ impl TryFrom<TcpStream> for mio::net::TcpStream {
type Error = io::Error; type Error = io::Error;
/// Consumes value, returning the mio I/O object. /// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: TcpStream) -> Result<Self, Self::Error> { fn try_from(value: TcpStream) -> Result<Self, Self::Error> {
value.io.into_inner() value.io.into_inner()
} }

View File

@ -1,7 +1,3 @@
//! UDP utility types. //! UDP utility types.
pub(crate) mod socket; pub(crate) mod socket;
pub(crate) use socket::UdpSocket;
mod split;
pub use split::{RecvHalf, ReuniteError, SendHalf};

View File

@ -1,13 +1,10 @@
use crate::future::poll_fn;
use crate::io::PollEvented; use crate::io::PollEvented;
use crate::net::udp::split::{split, RecvHalf, SendHalf};
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::task::{Context, Poll};
cfg_udp! { cfg_udp! {
/// A UDP socket /// A UDP socket
@ -67,15 +64,7 @@ impl UdpSocket {
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?; let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?; UdpSocket::new(io)
Ok(UdpSocket { io })
}
/// Splits the `UdpSocket` into a receive half and a send half. The two parts
/// can be used to receive and send datagrams concurrently, even from two
/// different tasks.
pub fn split(self) -> (RecvHalf, SendHalf) {
split(self)
} }
/// Returns the local address that this socket is bound to. /// Returns the local address that this socket is bound to.
@ -112,8 +101,10 @@ impl UdpSocket {
/// will resolve to an error if the socket is not connected. /// will resolve to an error if the socket is not connected.
/// ///
/// [`connect`]: method@Self::connect /// [`connect`]: method@Self::connect
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_send(cx, buf)).await self.io
.async_io(mio::Ready::writable(), |sock| sock.send(buf))
.await
} }
/// Try to send data on the socket to the remote address to which it is /// Try to send data on the socket to the remote address to which it is
@ -130,29 +121,6 @@ impl UdpSocket {
self.io.get_ref().send(buf) self.io.get_ref().send(buf)
} }
// Poll IO functions that takes `&self` are provided for the split API.
//
// They are not public because (taken from the doc of `PollEvented`):
//
// While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
// caller must ensure that there are at most two tasks that use a
// `PollEvented` instance concurrently. One for reading and one for writing.
// While violating this requirement is "safe" from a Rust memory model point
// of view, it will result in unexpected behavior in the form of lost
// notifications and tasks hanging.
#[doc(hidden)]
pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Poll::Pending
}
x => Poll::Ready(x),
}
}
/// Returns a future that receives a single datagram message on the socket from /// Returns a future that receives a single datagram message on the socket from
/// the remote address to which it is connected. On success, the future will resolve /// the remote address to which it is connected. On success, the future will resolve
/// to the number of bytes read. /// to the number of bytes read.
@ -165,21 +133,10 @@ impl UdpSocket {
/// will fail if the socket is not connected. /// will fail if the socket is not connected.
/// ///
/// [`connect`]: method@Self::connect /// [`connect`]: method@Self::connect
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_recv(cx, buf)).await self.io
} .async_io(mio::Ready::readable(), |sock| sock.recv(buf))
.await
#[doc(hidden)]
pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => Poll::Ready(x),
}
} }
/// Returns a future that sends data on the socket to the given address. /// Returns a future that sends data on the socket to the given address.
@ -187,11 +144,11 @@ impl UdpSocket {
/// ///
/// The future will resolve to an error if the IP version of the socket does /// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`. /// not match that of `target`.
pub async fn send_to<A: ToSocketAddrs>(&mut self, buf: &[u8], target: A) -> io::Result<usize> { pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
let mut addrs = target.to_socket_addrs().await?; let mut addrs = target.to_socket_addrs().await?;
match addrs.next() { match addrs.next() {
Some(target) => poll_fn(|cx| self.poll_send_to(cx, buf, &target)).await, Some(target) => self.send_to_addr(buf, &target).await,
None => Err(io::Error::new( None => Err(io::Error::new(
io::ErrorKind::InvalidInput, io::ErrorKind::InvalidInput,
"no addresses to send data to", "no addresses to send data to",
@ -214,23 +171,10 @@ impl UdpSocket {
self.io.get_ref().send_to(buf, &target) self.io.get_ref().send_to(buf, &target)
} }
// TODO: Public or not? async fn send_to_addr(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
#[doc(hidden)] self.io
pub fn poll_send_to( .async_io(mio::Ready::writable(), |sock| sock.send_to(buf, target))
&self, .await
cx: &mut Context<'_>,
buf: &[u8],
target: &SocketAddr,
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send_to(buf, target) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Poll::Pending
}
x => Poll::Ready(x),
}
} }
/// Returns a future that receives a single datagram on the socket. On success, /// Returns a future that receives a single datagram on the socket. On success,
@ -239,25 +183,10 @@ impl UdpSocket {
/// The function must be called with valid byte array `buf` of sufficient size /// The function must be called with valid byte array `buf` of sufficient size
/// to hold the message bytes. If a message is too long to fit in the supplied /// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded. /// buffer, excess bytes may be discarded.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.poll_recv_from(cx, buf)).await self.io
} .async_io(mio::Ready::readable(), |sock| sock.recv_from(buf))
.await
#[doc(hidden)]
pub fn poll_recv_from(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<(usize, SocketAddr), io::Error>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv_from(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => Poll::Ready(x),
}
} }
/// Gets the value of the `SO_BROADCAST` option for this socket. /// Gets the value of the `SO_BROADCAST` option for this socket.
@ -399,11 +328,6 @@ impl TryFrom<UdpSocket> for mio::net::UdpSocket {
type Error = io::Error; type Error = io::Error;
/// Consumes value, returning the mio I/O object. /// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UdpSocket) -> Result<Self, Self::Error> { fn try_from(value: UdpSocket) -> Result<Self, Self::Error> {
value.io.into_inner() value.io.into_inner()
} }
@ -423,7 +347,7 @@ impl TryFrom<net::UdpSocket> for UdpSocket {
impl fmt::Debug for UdpSocket { impl fmt::Debug for UdpSocket {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.io.get_ref().fmt(f) self.io.fmt(f)
} }
} }

View File

@ -1,148 +0,0 @@
//! [`UdpSocket`](crate::net::UdpSocket) split support.
//!
//! The [`split`](method@crate::net::UdpSocket::split) method splits a
//! `UdpSocket` into a receive half and a send half, which can be used to
//! receive and send datagrams concurrently, even from two different tasks.
//!
//! The halves provide access to the underlying socket, implementing
//! `AsRef<UdpSocket>`. This allows you to call `UdpSocket` methods that takes
//! `&self`, e.g., to get local address, to get and set socket options, to join
//! or leave multicast groups, etc.
//!
//! The halves can be reunited to the original socket with their `reunite`
//! methods.
use crate::future::poll_fn;
use crate::net::udp::UdpSocket;
use std::error::Error;
use std::fmt;
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
/// The send half after [`split`](super::UdpSocket::split).
///
/// Use [`send_to`](method@Self::send_to) or [`send`](method@Self::send) to send
/// datagrams.
#[derive(Debug)]
pub struct SendHalf(Arc<UdpSocket>);
/// The recv half after [`split`](super::UdpSocket::split).
///
/// Use [`recv_from`](method@Self::recv_from) or [`recv`](method@Self::recv) to receive
/// datagrams.
#[derive(Debug)]
pub struct RecvHalf(Arc<UdpSocket>);
pub(crate) fn split(socket: UdpSocket) -> (RecvHalf, SendHalf) {
let shared = Arc::new(socket);
let send = shared.clone();
let recv = shared;
(RecvHalf(recv), SendHalf(send))
}
/// Error indicating that two halves were not from the same socket, and thus could
/// not be `reunite`d.
#[derive(Debug)]
pub struct ReuniteError(pub SendHalf, pub RecvHalf);
impl fmt::Display for ReuniteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"tried to reunite halves that are not from the same socket"
)
}
}
impl Error for ReuniteError {}
fn reunite(s: SendHalf, r: RecvHalf) -> Result<UdpSocket, ReuniteError> {
if Arc::ptr_eq(&s.0, &r.0) {
drop(r);
// Only two instances of the `Arc` are ever created, one for the
// receiver and one for the sender, and those `Arc`s are never exposed
// externally. And so when we drop one here, the other one must be the
// only remaining one.
Ok(Arc::try_unwrap(s.0).expect("udp: try_unwrap failed in reunite"))
} else {
Err(ReuniteError(s, r))
}
}
impl RecvHalf {
/// Attempts to put the two "halves" of a `UdpSocket` back together and
/// recover the original socket. Succeeds only if the two "halves"
/// originated from the same call to `UdpSocket::split`.
pub fn reunite(self, other: SendHalf) -> Result<UdpSocket, ReuniteError> {
reunite(other, self)
}
/// Returns a future that receives a single datagram on the socket. On success,
/// the future resolves to the number of bytes read and the origin.
///
/// The function must be called with valid byte array `buf` of sufficient size
/// to hold the message bytes. If a message is too long to fit in the supplied
/// buffer, excess bytes may be discarded.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.0.poll_recv_from(cx, buf)).await
}
/// Returns a future that receives a single datagram message on the socket from
/// the remote address to which it is connected. On success, the future will resolve
/// to the number of bytes read.
///
/// The function must be called with valid byte array `buf` of sufficient size to
/// hold the message bytes. If a message is too long to fit in the supplied buffer,
/// excess bytes may be discarded.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will fail if the socket is not connected.
///
/// [`connect`]: super::UdpSocket::connect
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_recv(cx, buf)).await
}
}
impl SendHalf {
/// Attempts to put the two "halves" of a `UdpSocket` back together and
/// recover the original socket. Succeeds only if the two "halves"
/// originated from the same call to `UdpSocket::split`.
pub fn reunite(self, other: RecvHalf) -> Result<UdpSocket, ReuniteError> {
reunite(self, other)
}
/// Returns a future that sends data on the socket to the given address.
/// On success, the future will resolve to the number of bytes written.
///
/// The future will resolve to an error if the IP version of the socket does
/// not match that of `target`.
pub async fn send_to(&mut self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_send_to(cx, buf, target)).await
}
/// Returns a future that sends data on the socket to the remote address to which it is connected.
/// On success, the future will resolve to the number of bytes written.
///
/// The [`connect`] method will connect this socket to a remote address. The future
/// will resolve to an error if the socket is not connected.
///
/// [`connect`]: super::UdpSocket::connect
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_send(cx, buf)).await
}
}
impl AsRef<UdpSocket> for SendHalf {
fn as_ref(&self) -> &UdpSocket {
&self.0
}
}
impl AsRef<UdpSocket> for RecvHalf {
fn as_ref(&self) -> &UdpSocket {
&self.0
}
}

View File

@ -1,8 +1,3 @@
//! Unix datagram types. //! Unix datagram types.
pub(crate) mod socket; pub(crate) mod socket;
pub(crate) mod split;
pub(crate) mod split_owned;
pub use split::{RecvHalf, SendHalf};
pub use split_owned::{OwnedRecvHalf, OwnedSendHalf, ReuniteError};

View File

@ -1,7 +1,4 @@
use crate::future::poll_fn;
use crate::io::PollEvented; use crate::io::PollEvented;
use crate::net::unix::datagram::split::{split, RecvHalf, SendHalf};
use crate::net::unix::datagram::split_owned::{split_owned, OwnedRecvHalf, OwnedSendHalf};
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt; use std::fmt;
@ -10,7 +7,6 @@ use std::net::Shutdown;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{self, SocketAddr}; use std::os::unix::net::{self, SocketAddr};
use std::path::Path; use std::path::Path;
use std::task::{Context, Poll};
cfg_uds! { cfg_uds! {
/// An I/O object representing a Unix datagram socket. /// An I/O object representing a Unix datagram socket.
@ -38,9 +34,9 @@ cfg_uds! {
/// ///
/// // Bind each socket to a filesystem path /// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx"); /// let tx_path = tmp.path().join("tx");
/// let mut tx = UnixDatagram::bind(&tx_path)?; /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx"); /// let rx_path = tmp.path().join("rx");
/// let mut rx = UnixDatagram::bind(&rx_path)?; /// let rx = UnixDatagram::bind(&rx_path)?;
/// ///
/// let bytes = b"hello world"; /// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?; /// tx.send_to(bytes, &rx_path).await?;
@ -64,7 +60,7 @@ cfg_uds! {
/// use tokio::net::UnixDatagram; /// use tokio::net::UnixDatagram;
/// ///
/// // Create the pair of sockets /// // Create the pair of sockets
/// let (mut sock1, mut sock2) = UnixDatagram::pair()?; /// let (sock1, sock2) = UnixDatagram::pair()?;
/// ///
/// // Since the sockets are paired, the paired send/recv /// // Since the sockets are paired, the paired send/recv
/// // functions can be used /// // functions can be used
@ -128,7 +124,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram; /// use tokio::net::UnixDatagram;
/// ///
/// // Create the pair of sockets /// // Create the pair of sockets
/// let (mut sock1, mut sock2) = UnixDatagram::pair()?; /// let (sock1, sock2) = UnixDatagram::pair()?;
/// ///
/// // Since the sockets are paired, the paired send/recv /// // Since the sockets are paired, the paired send/recv
/// // functions can be used /// // functions can be used
@ -208,12 +204,12 @@ impl UnixDatagram {
/// use tempfile::tempdir; /// use tempfile::tempdir;
/// ///
/// // Create an unbound socket /// // Create an unbound socket
/// let mut tx = UnixDatagram::unbound()?; /// let tx = UnixDatagram::unbound()?;
/// ///
/// // Create another, bound socket /// // Create another, bound socket
/// let tmp = tempdir()?; /// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx"); /// let rx_path = tmp.path().join("rx");
/// let mut rx = UnixDatagram::bind(&rx_path)?; /// let rx = UnixDatagram::bind(&rx_path)?;
/// ///
/// // Send to the bound socket /// // Send to the bound socket
/// let bytes = b"hello world"; /// let bytes = b"hello world";
@ -247,12 +243,12 @@ impl UnixDatagram {
/// use tempfile::tempdir; /// use tempfile::tempdir;
/// ///
/// // Create an unbound socket /// // Create an unbound socket
/// let mut tx = UnixDatagram::unbound()?; /// let tx = UnixDatagram::unbound()?;
/// ///
/// // Create another, bound socket /// // Create another, bound socket
/// let tmp = tempdir()?; /// let tmp = tempdir()?;
/// let rx_path = tmp.path().join("rx"); /// let rx_path = tmp.path().join("rx");
/// let mut rx = UnixDatagram::bind(&rx_path)?; /// let rx = UnixDatagram::bind(&rx_path)?;
/// ///
/// // Connect to the bound socket /// // Connect to the bound socket
/// tx.connect(&rx_path)?; /// tx.connect(&rx_path)?;
@ -284,7 +280,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram; /// use tokio::net::UnixDatagram;
/// ///
/// // Create the pair of sockets /// // Create the pair of sockets
/// let (mut sock1, mut sock2) = UnixDatagram::pair()?; /// let (sock1, sock2) = UnixDatagram::pair()?;
/// ///
/// // Since the sockets are paired, the paired send/recv /// // Since the sockets are paired, the paired send/recv
/// // functions can be used /// // functions can be used
@ -300,8 +296,10 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> { pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_send_priv(cx, buf)).await self.io
.async_io(mio::Ready::writable(), |sock| sock.send(buf))
.await
} }
/// Try to send a datagram to the peer without waiting. /// Try to send a datagram to the peer without waiting.
@ -371,32 +369,6 @@ impl UnixDatagram {
self.io.get_ref().send_to(buf, target) self.io.get_ref().send_to(buf, target)
} }
// Poll IO functions that takes `&self` are provided for the split API.
//
// They are not public because (taken from the doc of `PollEvented`):
//
// While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the
// caller must ensure that there are at most two tasks that use a
// `PollEvented` instance concurrently. One for reading and one for writing.
// While violating this requirement is "safe" from a Rust memory model point
// of view, it will result in unexpected behavior in the form of lost
// notifications and tasks hanging.
pub(crate) fn poll_send_priv(
&self,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Poll::Pending
}
x => Poll::Ready(x),
}
}
/// Receives data from the socket. /// Receives data from the socket.
/// ///
/// # Examples /// # Examples
@ -407,7 +379,7 @@ impl UnixDatagram {
/// use tokio::net::UnixDatagram; /// use tokio::net::UnixDatagram;
/// ///
/// // Create the pair of sockets /// // Create the pair of sockets
/// let (mut sock1, mut sock2) = UnixDatagram::pair()?; /// let (sock1, sock2) = UnixDatagram::pair()?;
/// ///
/// // Since the sockets are paired, the paired send/recv /// // Since the sockets are paired, the paired send/recv
/// // functions can be used /// // functions can be used
@ -423,8 +395,10 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> { pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.poll_recv_priv(cx, buf)).await self.io
.async_io(mio::Ready::readable(), |sock| sock.recv(buf))
.await
} }
/// Try to receive a datagram from the peer without waiting. /// Try to receive a datagram from the peer without waiting.
@ -455,22 +429,6 @@ impl UnixDatagram {
self.io.get_ref().recv(buf) self.io.get_ref().recv(buf)
} }
pub(crate) fn poll_recv_priv(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => Poll::Ready(x),
}
}
/// Sends data on the socket to the specified address. /// Sends data on the socket to the specified address.
/// ///
/// # Examples /// # Examples
@ -487,9 +445,9 @@ impl UnixDatagram {
/// ///
/// // Bind each socket to a filesystem path /// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx"); /// let tx_path = tmp.path().join("tx");
/// let mut tx = UnixDatagram::bind(&tx_path)?; /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx"); /// let rx_path = tmp.path().join("rx");
/// let mut rx = UnixDatagram::bind(&rx_path)?; /// let rx = UnixDatagram::bind(&rx_path)?;
/// ///
/// let bytes = b"hello world"; /// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?; /// tx.send_to(bytes, &rx_path).await?;
@ -504,28 +462,15 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize> pub async fn send_to<P>(&self, buf: &[u8], target: P) -> io::Result<usize>
where where
P: AsRef<Path> + Unpin, P: AsRef<Path>,
{ {
poll_fn(|cx| self.poll_send_to_priv(cx, buf, target.as_ref())).await self.io
} .async_io(mio::Ready::writable(), |sock| {
sock.send_to(buf, target.as_ref())
pub(crate) fn poll_send_to_priv( })
&self, .await
cx: &mut Context<'_>,
buf: &[u8],
target: &Path,
) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().send_to(buf, target) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?;
Poll::Pending
}
x => Poll::Ready(x),
}
} }
/// Receives data from the socket. /// Receives data from the socket.
@ -544,9 +489,9 @@ impl UnixDatagram {
/// ///
/// // Bind each socket to a filesystem path /// // Bind each socket to a filesystem path
/// let tx_path = tmp.path().join("tx"); /// let tx_path = tmp.path().join("tx");
/// let mut tx = UnixDatagram::bind(&tx_path)?; /// let tx = UnixDatagram::bind(&tx_path)?;
/// let rx_path = tmp.path().join("rx"); /// let rx_path = tmp.path().join("rx");
/// let mut rx = UnixDatagram::bind(&rx_path)?; /// let rx = UnixDatagram::bind(&rx_path)?;
/// ///
/// let bytes = b"hello world"; /// let bytes = b"hello world";
/// tx.send_to(bytes, &rx_path).await?; /// tx.send_to(bytes, &rx_path).await?;
@ -561,8 +506,10 @@ impl UnixDatagram {
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.poll_recv_from_priv(cx, buf)).await self.io
.async_io(mio::Ready::readable(), |sock| sock.recv_from(buf))
.await
} }
/// Try to receive data from the socket without waiting. /// Try to receive data from the socket without waiting.
@ -601,22 +548,6 @@ impl UnixDatagram {
self.io.get_ref().recv_from(buf) self.io.get_ref().recv_from(buf)
} }
pub(crate) fn poll_recv_from_priv(
&self,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<(usize, SocketAddr), io::Error>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
match self.io.get_ref().recv_from(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, mio::Ready::readable())?;
Poll::Pending
}
x => Poll::Ready(x),
}
}
/// Returns the local address that this socket is bound to. /// Returns the local address that this socket is bound to.
/// ///
/// # Examples /// # Examples
@ -748,7 +679,7 @@ impl UnixDatagram {
/// use std::net::Shutdown; /// use std::net::Shutdown;
/// ///
/// // Create an unbound socket /// // Create an unbound socket
/// let (mut socket, other) = UnixDatagram::pair()?; /// let (socket, other) = UnixDatagram::pair()?;
/// ///
/// socket.shutdown(Shutdown::Both)?; /// socket.shutdown(Shutdown::Both)?;
/// ///
@ -768,102 +699,12 @@ impl UnixDatagram {
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
self.io.get_ref().shutdown(how) self.io.get_ref().shutdown(how)
} }
// These lifetime markers also appear in the generated documentation, and make
// it more clear that this is a *borrowed* split.
#[allow(clippy::needless_lifetimes)]
/// Split a `UnixDatagram` into a receive half and a send half, which can be used
/// to receive and send the datagram concurrently.
///
/// This method is more efficient than [`into_split`], but the halves cannot
/// be moved into independently spawned tasks.
///
/// [`into_split`]: fn@crate::net::UnixDatagram::into_split
///
/// # Examples
/// ```
/// # use std::error::Error;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn Error>> {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
/// let (mut sock1, mut sock2) = UnixDatagram::pair()?;
///
/// // Split sock1
/// let (sock1_rx, mut sock1_tx) = sock1.split();
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
/// let bytes = b"hello world";
/// sock1_tx.send(bytes).await?;
///
/// let mut buff = vec![0u8; 24];
/// let size = sock2.recv(&mut buff).await?;
///
/// let dgram = &buff[..size];
/// assert_eq!(dgram, bytes);
///
/// # Ok(())
/// # }
/// ```
pub fn split<'a>(&'a mut self) -> (RecvHalf<'a>, SendHalf<'a>) {
split(self)
}
/// Split a `UnixDatagram` into a receive half and a send half, which can be used
/// to receive and send the datagram concurrently.
///
/// Unlike [`split`], the owned halves can be moved to separate tasks,
/// however this comes at the cost of a heap allocation.
///
/// **Note:** Dropping the write half will shut down the write half of the
/// datagram. This is equivalent to calling [`shutdown(Write)`].
///
/// # Examples
/// ```
/// # use std::error::Error;
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn Error>> {
/// use tokio::net::UnixDatagram;
///
/// // Create the pair of sockets
/// let (sock1, mut sock2) = UnixDatagram::pair()?;
///
/// // Split sock1
/// let (sock1_rx, mut sock1_tx) = sock1.into_split();
///
/// // Since the sockets are paired, the paired send/recv
/// // functions can be used
/// let bytes = b"hello world";
/// sock1_tx.send(bytes).await?;
///
/// let mut buff = vec![0u8; 24];
/// let size = sock2.recv(&mut buff).await?;
///
/// let dgram = &buff[..size];
/// assert_eq!(dgram, bytes);
///
/// # Ok(())
/// # }
/// ```
///
/// [`split`]: fn@crate::net::UnixDatagram::split
/// [`shutdown(Write)`]:fn@crate::net::UnixDatagram::shutdown
pub fn into_split(self) -> (OwnedRecvHalf, OwnedSendHalf) {
split_owned(self)
}
} }
impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram { impl TryFrom<UnixDatagram> for mio_uds::UnixDatagram {
type Error = io::Error; type Error = io::Error;
/// Consumes value, returning the mio I/O object. /// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> { fn try_from(value: UnixDatagram) -> Result<Self, Self::Error> {
value.io.into_inner() value.io.into_inner()
} }

View File

@ -1,68 +0,0 @@
//! `UnixDatagram` split support.
//!
//! A `UnixDatagram` can be split into a `RecvHalf` and a `SendHalf` with the
//! `UnixDatagram::split` method.
use crate::future::poll_fn;
use crate::net::UnixDatagram;
use std::io;
use std::os::unix::net::SocketAddr;
use std::path::Path;
/// Borrowed receive half of a [`UnixDatagram`], created by [`split`].
///
/// [`UnixDatagram`]: UnixDatagram
/// [`split`]: crate::net::UnixDatagram::split()
#[derive(Debug)]
pub struct RecvHalf<'a>(&'a UnixDatagram);
/// Borrowed send half of a [`UnixDatagram`], created by [`split`].
///
/// [`UnixDatagram`]: UnixDatagram
/// [`split`]: crate::net::UnixDatagram::split()
#[derive(Debug)]
pub struct SendHalf<'a>(&'a UnixDatagram);
pub(crate) fn split(stream: &mut UnixDatagram) -> (RecvHalf<'_>, SendHalf<'_>) {
(RecvHalf(&*stream), SendHalf(&*stream))
}
impl RecvHalf<'_> {
/// Receives data from the socket.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.0.poll_recv_from_priv(cx, buf)).await
}
/// Receives data from the socket.
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_recv_priv(cx, buf)).await
}
}
impl SendHalf<'_> {
/// Sends data on the socket to the specified address.
pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path> + Unpin,
{
poll_fn(|cx| self.0.poll_send_to_priv(cx, buf, target.as_ref())).await
}
/// Sends data on the socket to the socket's peer.
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.0.poll_send_priv(cx, buf)).await
}
}
impl AsRef<UnixDatagram> for RecvHalf<'_> {
fn as_ref(&self) -> &UnixDatagram {
self.0
}
}
impl AsRef<UnixDatagram> for SendHalf<'_> {
fn as_ref(&self) -> &UnixDatagram {
self.0
}
}

View File

@ -1,148 +0,0 @@
//! `UnixDatagram` owned split support.
//!
//! A `UnixDatagram` can be split into an `OwnedSendHalf` and a `OwnedRecvHalf`
//! with the `UnixDatagram::into_split` method.
use crate::future::poll_fn;
use crate::net::UnixDatagram;
use std::error::Error;
use std::net::Shutdown;
use std::os::unix::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fmt, io};
pub(crate) fn split_owned(socket: UnixDatagram) -> (OwnedRecvHalf, OwnedSendHalf) {
let shared = Arc::new(socket);
let send = shared.clone();
let recv = shared;
(
OwnedRecvHalf { inner: recv },
OwnedSendHalf {
inner: send,
shutdown_on_drop: true,
},
)
}
/// Owned send half of a [`UnixDatagram`], created by [`into_split`].
///
/// [`UnixDatagram`]: UnixDatagram
/// [`into_split`]: UnixDatagram::into_split()
#[derive(Debug)]
pub struct OwnedSendHalf {
inner: Arc<UnixDatagram>,
shutdown_on_drop: bool,
}
/// Owned receive half of a [`UnixDatagram`], created by [`into_split`].
///
/// [`UnixDatagram`]: UnixDatagram
/// [`into_split`]: UnixDatagram::into_split()
#[derive(Debug)]
pub struct OwnedRecvHalf {
inner: Arc<UnixDatagram>,
}
/// Error indicating that two halves were not from the same socket, and thus could
/// not be `reunite`d.
#[derive(Debug)]
pub struct ReuniteError(pub OwnedSendHalf, pub OwnedRecvHalf);
impl fmt::Display for ReuniteError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"tried to reunite halves that are not from the same socket"
)
}
}
impl Error for ReuniteError {}
fn reunite(s: OwnedSendHalf, r: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
if Arc::ptr_eq(&s.inner, &r.inner) {
s.forget();
// Only two instances of the `Arc` are ever created, one for the
// receiver and one for the sender, and those `Arc`s are never exposed
// externally. And so when we drop one here, the other one must be the
// only remaining one.
Ok(Arc::try_unwrap(r.inner).expect("UnixDatagram: try_unwrap failed in reunite"))
} else {
Err(ReuniteError(s, r))
}
}
impl OwnedRecvHalf {
/// Attempts to put the two "halves" of a `UnixDatagram` back together and
/// recover the original socket. Succeeds only if the two "halves"
/// originated from the same call to [`into_split`].
///
/// [`into_split`]: UnixDatagram::into_split()
pub fn reunite(self, other: OwnedSendHalf) -> Result<UnixDatagram, ReuniteError> {
reunite(other, self)
}
/// Receives data from the socket.
pub async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
poll_fn(|cx| self.inner.poll_recv_from_priv(cx, buf)).await
}
/// Receives data from the socket.
pub async fn recv(&mut self, buf: &mut [u8]) -> io::Result<usize> {
poll_fn(|cx| self.inner.poll_recv_priv(cx, buf)).await
}
}
impl OwnedSendHalf {
/// Attempts to put the two "halves" of a `UnixDatagram` back together and
/// recover the original socket. Succeeds only if the two "halves"
/// originated from the same call to [`into_split`].
///
/// [`into_split`]: UnixDatagram::into_split()
pub fn reunite(self, other: OwnedRecvHalf) -> Result<UnixDatagram, ReuniteError> {
reunite(self, other)
}
/// Sends data on the socket to the specified address.
pub async fn send_to<P>(&mut self, buf: &[u8], target: P) -> io::Result<usize>
where
P: AsRef<Path> + Unpin,
{
poll_fn(|cx| self.inner.poll_send_to_priv(cx, buf, target.as_ref())).await
}
/// Sends data on the socket to the socket's peer.
pub async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
poll_fn(|cx| self.inner.poll_send_priv(cx, buf)).await
}
/// Destroy the send half, but don't close the send half of the stream
/// until the receive half is dropped. If the read half has already been
/// dropped, this closes the stream.
pub fn forget(mut self) {
self.shutdown_on_drop = false;
drop(self);
}
}
impl Drop for OwnedSendHalf {
fn drop(&mut self) {
if self.shutdown_on_drop {
let _ = self.inner.shutdown(Shutdown::Write);
}
}
}
impl AsRef<UnixDatagram> for OwnedSendHalf {
fn as_ref(&self) -> &UnixDatagram {
&self.inner
}
}
impl AsRef<UnixDatagram> for OwnedRecvHalf {
fn as_ref(&self) -> &UnixDatagram {
&self.inner
}
}

View File

@ -2,7 +2,6 @@ use crate::future::poll_fn;
use crate::io::PollEvented; use crate::io::PollEvented;
use crate::net::unix::{Incoming, UnixStream}; use crate::net::unix::{Incoming, UnixStream};
use mio::Ready;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt; use std::fmt;
use std::io; use std::io;
@ -122,19 +121,19 @@ impl UnixListener {
&mut self, &mut self,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> { ) -> Poll<io::Result<(net::UnixStream, SocketAddr)>> {
ready!(self.io.poll_read_ready(cx, Ready::readable()))?; loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
match self.io.get_ref().accept_std() { match self.io.get_ref().accept_std() {
Ok(None) => { Ok(None) => {
self.io.clear_read_ready(cx, Ready::readable())?; self.io.clear_readiness(ev);
Poll::Pending }
Ok(Some((sock, addr))) => return Ok((sock, addr)).into(),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_readiness(ev);
}
Err(err) => return Err(err).into(),
} }
Ok(Some((sock, addr))) => Ok((sock, addr)).into(),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_read_ready(cx, Ready::readable())?;
Poll::Pending
}
Err(err) => Err(err).into(),
} }
} }
@ -197,11 +196,6 @@ impl TryFrom<UnixListener> for mio_uds::UnixListener {
type Error = io::Error; type Error = io::Error;
/// Consumes value, returning the mio I/O object. /// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixListener) -> Result<Self, Self::Error> { fn try_from(value: UnixListener) -> Result<Self, Self::Error> {
value.io.into_inner() value.io.into_inner()
} }

View File

@ -143,11 +143,6 @@ impl TryFrom<UnixStream> for mio_uds::UnixStream {
type Error = io::Error; type Error = io::Error;
/// Consumes value, returning the mio I/O object. /// Consumes value, returning the mio I/O object.
///
/// See [`PollEvented::into_inner`] for more details about
/// resource deregistration that happens during the call.
///
/// [`PollEvented::into_inner`]: crate::io::PollEvented::into_inner
fn try_from(value: UnixStream) -> Result<Self, Self::Error> { fn try_from(value: UnixStream) -> Result<Self, Self::Error> {
value.io.into_inner() value.io.into_inner()
} }
@ -211,26 +206,28 @@ impl UnixStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>, buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> { ) -> Poll<io::Result<()>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?; loop {
let ev = ready!(self.io.poll_read_ready(cx))?;
// Safety: `UnixStream::read` will not peak at the maybe uinitialized bytes. // Safety: `UnixStream::read` will not peek at the maybe uinitialized bytes.
let b = let b = unsafe {
unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
match self.io.get_ref().read(b) { };
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { match self.io.get_ref().read(b) {
self.io.clear_read_ready(cx, mio::Ready::readable())?; Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Poll::Pending self.io.clear_readiness(ev);
}
Ok(n) => {
// Safety: We trust `UnixStream::read` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
} }
buf.add_filled(n); Ok(n) => {
Poll::Ready(Ok(())) // Safety: We trust `UnixStream::read` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
buf.add_filled(n);
return Poll::Ready(Ok(()));
}
Err(e) => return Poll::Ready(Err(e)),
} }
Err(e) => Poll::Ready(Err(e)),
} }
} }
@ -239,14 +236,15 @@ impl UnixStream {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(self.io.poll_write_ready(cx))?; loop {
let ev = ready!(self.io.poll_write_ready(cx))?;
match self.io.get_ref().write(buf) { match self.io.get_ref().write(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.io.clear_write_ready(cx)?; self.io.clear_readiness(ev);
Poll::Pending }
x => return Poll::Ready(x),
} }
x => Poll::Ready(x),
} }
} }
} }

View File

@ -1,13 +1,15 @@
//! Signal driver //! Signal driver
use crate::io::driver::Driver as IoDriver; use crate::io::driver::Driver as IoDriver;
use crate::io::Registration; use crate::io::PollEvented;
use crate::park::Park; use crate::park::Park;
use crate::runtime::context; use crate::runtime::context;
use crate::signal::registry::globals; use crate::signal::registry::globals;
use mio_uds::UnixStream; use mio_uds::UnixStream;
use std::io::{self, Read}; use std::io::{self, Read};
use std::ptr;
use std::sync::{Arc, Weak}; use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::Duration; use std::time::Duration;
/// Responsible for registering wakeups when an OS signal is received, and /// Responsible for registering wakeups when an OS signal is received, and
@ -21,11 +23,7 @@ pub(crate) struct Driver {
park: IoDriver, park: IoDriver,
/// A pipe for receiving wake events from the signal handler /// A pipe for receiving wake events from the signal handler
receiver: UnixStream, receiver: PollEvented<UnixStream>,
/// The actual registraiton for `receiver` when active.
/// Lazily bound at the first signal registration.
registration: Registration,
/// Shared state /// Shared state
inner: Arc<Inner>, inner: Arc<Inner>,
@ -58,13 +56,12 @@ impl Driver {
// either, since we can't compare Handles or assume they will always // either, since we can't compare Handles or assume they will always
// point to the exact same reactor. // point to the exact same reactor.
let receiver = globals().receiver.try_clone()?; let receiver = globals().receiver.try_clone()?;
let registration = let receiver =
Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?; PollEvented::new_with_ready_and_handle(receiver, mio::Ready::all(), park.handle())?;
Ok(Self { Ok(Self {
park, park,
receiver, receiver,
registration,
inner: Arc::new(Inner(())), inner: Arc::new(Inner(())),
}) })
} }
@ -79,17 +76,23 @@ impl Driver {
fn process(&self) { fn process(&self) {
// Check if the pipe is ready to read and therefore has "woken" us up // Check if the pipe is ready to read and therefore has "woken" us up
match self.registration.take_read_ready() { //
Ok(Some(ready)) => assert!(ready.is_readable()), // To do so, we will `poll_read_ready` with a noop waker, since we don't
Ok(None) => return, // No wake has arrived, bail // need to actually be notified when read ready...
Err(e) => panic!("reactor gone: {}", e), let waker = unsafe { Waker::from_raw(RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)) };
} let mut cx = Context::from_waker(&waker);
let ev = match self.receiver.poll_read_ready(&mut cx) {
Poll::Ready(Ok(ev)) => ev,
Poll::Ready(Err(e)) => panic!("reactor gone: {}", e),
Poll::Pending => return, // No wake has arrived, bail
};
// Drain the pipe completely so we can receive a new readiness event // Drain the pipe completely so we can receive a new readiness event
// if another signal has come in. // if another signal has come in.
let mut buf = [0; 128]; let mut buf = [0; 128];
loop { loop {
match (&self.receiver).read(&mut buf) { match self.receiver.get_ref().read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"), Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading Ok(_) => continue, // Keep reading
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
@ -97,11 +100,21 @@ impl Driver {
} }
} }
self.receiver.clear_readiness(ev);
// Broadcast any signals which were received // Broadcast any signals which were received
globals().broadcast(); globals().broadcast();
} }
} }
const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
unsafe fn noop_clone(_data: *const ()) -> RawWaker {
RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE)
}
unsafe fn noop(_data: *const ()) {}
// ===== impl Park for Driver ===== // ===== impl Park for Driver =====
impl Park for Driver { impl Park for Driver {

View File

@ -141,13 +141,12 @@ impl AtomicWaker {
} }
} }
/*
/// Registers the current waker to be notified on calls to `wake`. /// Registers the current waker to be notified on calls to `wake`.
///
/// This is the same as calling `register_task` with `task::current()`.
#[cfg(feature = "io-driver")]
pub(crate) fn register(&self, waker: Waker) { pub(crate) fn register(&self, waker: Waker) {
self.do_register(waker); self.do_register(waker);
} }
*/
/// Registers the provided waker to be notified on calls to `wake`. /// Registers the provided waker to be notified on calls to `wake`.
/// ///

View File

@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
#[derive(Clone, Copy)] #[derive(Clone, Copy, PartialEq)]
pub(crate) struct Pack { pub(crate) struct Pack {
mask: usize, mask: usize,
shift: u32, shift: u32,

View File

@ -106,6 +106,7 @@ impl<L: Link> LinkedList<L, L::Target> {
/// Removes the last element from a list and returns it, or None if it is /// Removes the last element from a list and returns it, or None if it is
/// empty. /// empty.
#[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))]
pub(crate) fn pop_back(&mut self) -> Option<L::Handle> { pub(crate) fn pop_back(&mut self) -> Option<L::Handle> {
unsafe { unsafe {
let last = self.tail?; let last = self.tail?;
@ -125,6 +126,7 @@ impl<L: Link> LinkedList<L, L::Target> {
} }
/// Returns whether the linked list doesn not contain any node /// Returns whether the linked list doesn not contain any node
#[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))]
pub(crate) fn is_empty(&self) -> bool { pub(crate) fn is_empty(&self) -> bool {
if self.head.is_some() { if self.head.is_some() {
return false; return false;
@ -180,6 +182,12 @@ impl<L: Link> fmt::Debug for LinkedList<L, L::Target> {
} }
} }
impl<L: Link> Default for LinkedList<L, L::Target> {
fn default() -> Self {
Self::new()
}
}
cfg_sync! { cfg_sync! {
impl<L: Link> LinkedList<L, L::Target> { impl<L: Link> LinkedList<L, L::Target> {
pub(crate) fn last(&self) -> Option<&L::Target> { pub(crate) fn last(&self) -> Option<&L::Target> {
@ -222,6 +230,52 @@ cfg_rt_threaded! {
} }
} }
// ===== impl DrainFilter =====
cfg_io_readiness! {
pub(crate) struct DrainFilter<'a, T: Link, F> {
list: &'a mut LinkedList<T, T::Target>,
filter: F,
curr: Option<NonNull<T::Target>>,
}
impl<T: Link> LinkedList<T, T::Target> {
pub(crate) fn drain_filter<F>(&mut self, filter: F) -> DrainFilter<'_, T, F>
where
F: FnMut(&mut T::Target) -> bool,
{
let curr = self.head;
DrainFilter {
curr,
filter,
list: self,
}
}
}
impl<'a, T, F> Iterator for DrainFilter<'a, T, F>
where
T: Link,
F: FnMut(&mut T::Target) -> bool,
{
type Item = T::Handle;
fn next(&mut self) -> Option<Self::Item> {
while let Some(curr) = self.curr {
// safety: the pointer references data contained by the list
self.curr = unsafe { T::pointers(curr).as_ref() }.next;
// safety: the value is still owned by the linked list.
if (self.filter)(unsafe { &mut *curr.as_ptr() }) {
return unsafe { self.list.remove(curr) };
}
}
None
}
}
}
// ===== impl Pointers ===== // ===== impl Pointers =====
impl<T> Pointers<T> { impl<T> Pointers<T> {

View File

@ -3,7 +3,7 @@ cfg_io_driver! {
pub(crate) mod slab; pub(crate) mod slab;
} }
#[cfg(any(feature = "sync", feature = "rt-core"))] #[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))]
pub(crate) mod linked_list; pub(crate) mod linked_list;
#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))] #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]

View File

@ -141,6 +141,8 @@ unsafe impl<T: Sync> Sync for Page<T> {}
unsafe impl<T: Sync> Send for Page<T> {} unsafe impl<T: Sync> Send for Page<T> {}
unsafe impl<T: Sync> Sync for CachedPage<T> {} unsafe impl<T: Sync> Sync for CachedPage<T> {}
unsafe impl<T: Sync> Send for CachedPage<T> {} unsafe impl<T: Sync> Send for CachedPage<T> {}
unsafe impl<T: Sync> Sync for Ref<T> {}
unsafe impl<T: Sync> Send for Ref<T> {}
/// A slot in the slab. Contains slot-specific metadata. /// A slot in the slab. Contains slot-specific metadata.
/// ///

View File

@ -172,10 +172,6 @@ async_assert_fn!(tokio::net::UdpSocket::send(_, &[u8]): Send & Sync);
async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::recv(_, &mut [u8]): Send & Sync);
async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::send_to(_, &[u8], SocketAddr): Send & Sync);
async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync); async_assert_fn!(tokio::net::UdpSocket::recv_from(_, &mut [u8]): Send & Sync);
async_assert_fn!(tokio::net::udp::RecvHalf::recv(_, &mut [u8]): Send & Sync);
async_assert_fn!(tokio::net::udp::RecvHalf::recv_from(_, &mut [u8]): Send & Sync);
async_assert_fn!(tokio::net::udp::SendHalf::send(_, &[u8]): Send & Sync);
async_assert_fn!(tokio::net::udp::SendHalf::send_to(_, &[u8], &SocketAddr): Send & Sync);
#[cfg(unix)] #[cfg(unix)]
mod unix_datagram { mod unix_datagram {

View File

@ -827,6 +827,7 @@ rt_test! {
#[test] #[test]
fn io_notify_while_shutting_down() { fn io_notify_while_shutting_down() {
use std::net::Ipv6Addr; use std::net::Ipv6Addr;
use std::sync::Arc;
for _ in 1..10 { for _ in 1..10 {
let runtime = rt(); let runtime = rt();
@ -834,7 +835,8 @@ rt_test! {
runtime.block_on(async { runtime.block_on(async {
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap();
let addr = socket.local_addr().unwrap(); let addr = socket.local_addr().unwrap();
let (mut recv_half, mut send_half) = socket.split(); let send_half = Arc::new(socket);
let recv_half = send_half.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut buf = [0]; let mut buf = [0];

View File

@ -1,6 +1,7 @@
#![warn(rust_2018_idioms)] #![warn(rust_2018_idioms)]
#![cfg(feature = "full")] #![cfg(feature = "full")]
use std::sync::Arc;
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
const MSG: &[u8] = b"hello"; const MSG: &[u8] = b"hello";
@ -8,8 +9,8 @@ const MSG_LEN: usize = MSG.len();
#[tokio::test] #[tokio::test]
async fn send_recv() -> std::io::Result<()> { async fn send_recv() -> std::io::Result<()> {
let mut sender = UdpSocket::bind("127.0.0.1:0").await?; let sender = UdpSocket::bind("127.0.0.1:0").await?;
let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver = UdpSocket::bind("127.0.0.1:0").await?;
sender.connect(receiver.local_addr()?).await?; sender.connect(receiver.local_addr()?).await?;
receiver.connect(sender.local_addr()?).await?; receiver.connect(sender.local_addr()?).await?;
@ -25,8 +26,8 @@ async fn send_recv() -> std::io::Result<()> {
#[tokio::test] #[tokio::test]
async fn send_to_recv_from() -> std::io::Result<()> { async fn send_to_recv_from() -> std::io::Result<()> {
let mut sender = UdpSocket::bind("127.0.0.1:0").await?; let sender = UdpSocket::bind("127.0.0.1:0").await?;
let mut receiver = UdpSocket::bind("127.0.0.1:0").await?; let receiver = UdpSocket::bind("127.0.0.1:0").await?;
let receiver_addr = receiver.local_addr()?; let receiver_addr = receiver.local_addr()?;
sender.send_to(MSG, &receiver_addr).await?; sender.send_to(MSG, &receiver_addr).await?;
@ -42,9 +43,10 @@ async fn send_to_recv_from() -> std::io::Result<()> {
#[tokio::test] #[tokio::test]
async fn split() -> std::io::Result<()> { async fn split() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0").await?; let socket = UdpSocket::bind("127.0.0.1:0").await?;
let (mut r, mut s) = socket.split(); let s = Arc::new(socket);
let r = s.clone();
let addr = s.as_ref().local_addr()?; let addr = s.local_addr()?;
tokio::spawn(async move { tokio::spawn(async move {
s.send_to(MSG, &addr).await.unwrap(); s.send_to(MSG, &addr).await.unwrap();
}); });
@ -54,24 +56,6 @@ async fn split() -> std::io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test]
async fn reunite() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let (s, r) = socket.split();
assert!(s.reunite(r).is_ok());
Ok(())
}
#[tokio::test]
async fn reunite_error() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0").await?;
let socket1 = UdpSocket::bind("127.0.0.1:0").await?;
let (s, _) = socket.split();
let (_, r1) = socket1.split();
assert!(s.reunite(r1).is_err());
Ok(())
}
// # Note // # Note
// //
// This test is purposely written such that each time `sender` sends data on // This test is purposely written such that each time `sender` sends data on
@ -86,7 +70,7 @@ async fn try_send_spawn() {
const MSG2_LEN: usize = MSG2.len(); const MSG2_LEN: usize = MSG2.len();
let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap(); let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let mut receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap(); let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
receiver receiver
.connect(sender.local_addr().unwrap()) .connect(sender.local_addr().unwrap())

View File

@ -6,8 +6,9 @@ use tokio::net::UnixDatagram;
use tokio::try_join; use tokio::try_join;
use std::io; use std::io;
use std::sync::Arc;
async fn echo_server(mut socket: UnixDatagram) -> io::Result<()> { async fn echo_server(socket: UnixDatagram) -> io::Result<()> {
let mut recv_buf = vec![0u8; 1024]; let mut recv_buf = vec![0u8; 1024];
loop { loop {
let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?; let (len, peer_addr) = socket.recv_from(&mut recv_buf[..]).await?;
@ -32,7 +33,7 @@ async fn echo() -> io::Result<()> {
}); });
{ {
let mut socket = UnixDatagram::bind(&client_path).unwrap(); let socket = UnixDatagram::bind(&client_path).unwrap();
socket.connect(server_path)?; socket.connect(server_path)?;
socket.send(b"ECHO").await?; socket.send(b"ECHO").await?;
let mut recv_buf = [0u8; 16]; let mut recv_buf = [0u8; 16];
@ -87,8 +88,8 @@ async fn try_send_recv_never_block() -> io::Result<()> {
async fn split() -> std::io::Result<()> { async fn split() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("split.sock"); let path = dir.path().join("split.sock");
let socket = UnixDatagram::bind(path.clone())?; let s = Arc::new(UnixDatagram::bind(path.clone())?);
let (mut r, mut s) = socket.into_split(); let r = s.clone();
let msg = b"hello"; let msg = b"hello";
let ((), ()) = try_join! { let ((), ()) = try_join! {
@ -106,28 +107,3 @@ async fn split() -> std::io::Result<()> {
Ok(()) Ok(())
} }
#[tokio::test]
async fn reunite() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("reunite.sock");
let socket = UnixDatagram::bind(path)?;
let (s, r) = socket.into_split();
assert!(s.reunite(r).is_ok());
Ok(())
}
#[tokio::test]
async fn reunite_error() -> std::io::Result<()> {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("reunit.sock");
let dir = tempfile::tempdir().unwrap();
let path1 = dir.path().join("reunit.sock");
let socket = UnixDatagram::bind(path)?;
let socket1 = UnixDatagram::bind(path1)?;
let (s, _) = socket.into_split();
let (_, r1) = socket1.into_split();
assert!(s.reunite(r1).is_err());
Ok(())
}