From 6aeeeff6e8a30ac59941bc00d03c29e7d66d2204 Mon Sep 17 00:00:00 2001 From: Danny Browning Date: Mon, 11 May 2020 13:47:03 -0600 Subject: [PATCH] io: add mio::Ready argument to PollEvented (#2419) Add additional methods to allow PollEvented to be created with an appropriate mio::Ready state, so that it can be properly registered with the reactor. Fixes #2413 --- tokio/src/io/driver/mod.rs | 24 +++++++++++++--------- tokio/src/io/poll_evented.rs | 30 ++++++++++++++++++++++++++- tokio/src/io/registration.rs | 39 +++++++++++++++++++++++++++++++++++- 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index d8535d9ab..dbfb6e16e 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -237,10 +237,14 @@ impl fmt::Debug for Handle { // ===== impl Inner ===== impl Inner { - /// Registers an I/O resource with the reactor. + /// Registers an I/O resource with the reactor for a given `mio::Ready` state. /// /// The registration token is returned. - pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result
{ + pub(super) fn add_source( + &self, + source: &dyn Evented, + ready: mio::Ready, + ) -> io::Result
{ let address = self.io_dispatch.alloc().ok_or_else(|| { io::Error::new( io::ErrorKind::Other, @@ -253,7 +257,7 @@ impl Inner { self.io.register( source, mio::Token(address.to_usize()), - mio::Ready::all(), + ready, mio::PollOpt::edge(), )?; @@ -339,12 +343,12 @@ mod tests { let inner = reactor.inner; let inner2 = inner.clone(); - let token_1 = inner.add_source(&NotEvented).unwrap(); + let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); let thread = thread::spawn(move || { inner2.drop_source(token_1); }); - let token_2 = inner.add_source(&NotEvented).unwrap(); + let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); thread.join().unwrap(); assert!(token_1 != token_2); @@ -360,15 +364,15 @@ mod tests { // add sources to fill up the first page so that the dropped index // may be reused. for _ in 0..31 { - inner.add_source(&NotEvented).unwrap(); + inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); } - let token_1 = inner.add_source(&NotEvented).unwrap(); + let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); let thread = thread::spawn(move || { inner2.drop_source(token_1); }); - let token_2 = inner.add_source(&NotEvented).unwrap(); + let token_2 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); thread.join().unwrap(); assert!(token_1 != token_2); @@ -383,11 +387,11 @@ mod tests { let inner2 = inner.clone(); let thread = thread::spawn(move || { - let token_2 = inner2.add_source(&NotEvented).unwrap(); + let token_2 = inner2.add_source(&NotEvented, mio::Ready::all()).unwrap(); token_2 }); - let token_1 = inner.add_source(&NotEvented).unwrap(); + let token_1 = inner.add_source(&NotEvented, mio::Ready::all()).unwrap(); let token_2 = thread.join().unwrap(); assert!(token_1 != token_2); diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 3ca30a82b..085a7e842 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -175,7 +175,35 @@ where /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. pub fn new(io: E) -> io::Result { - let registration = Registration::new(&io)?; + PollEvented::new_with_ready(io, mio::Ready::all()) + } + + /// Creates a new `PollEvented` associated with the default reactor, for specific `mio::Ready` + /// state. `new_with_ready` should be used over `new` when you need control over the readiness + /// state, such as when a file descriptor only allows reads. This does not add `hup` or `error` + /// so if you are interested in those states, you will need to add them to the readiness state + /// passed to this function. + /// + /// An example to listen to read only + /// + /// ```rust + /// ##[cfg(unix)] + /// mio::Ready::from_usize( + /// mio::Ready::readable().as_usize() + /// | mio::unix::UnixReady::error().as_usize() + /// | mio::unix::UnixReady::hup().as_usize() + /// ); + /// ``` + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result { + let registration = Registration::new_with_ready(&io, ready)?; Ok(Self { io: Some(io), inner: Inner { diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 4df11999f..6e7d84b4f 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -63,12 +63,49 @@ impl Registration { /// from a future driven by a tokio runtime, otherwise runtime can be set /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. pub fn new(io: &T) -> io::Result + where + T: Evented, + { + Registration::new_with_ready(io, mio::Ready::all()) + } + + /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state. + /// `new_with_ready` should be used over `new` when you need control over the readiness state, + /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if + /// you are interested in those states, you will need to add them to the readiness state passed + /// to this function. + /// + /// An example to listen to read only + /// + /// ```rust + /// ##[cfg(unix)] + /// mio::Ready::from_usize( + /// mio::Ready::readable().as_usize() + /// | mio::unix::UnixReady::error().as_usize() + /// | mio::unix::UnixReady::hup().as_usize() + /// ); + /// ``` + /// + /// # Return + /// + /// - `Ok` if the registration happened successfully + /// - `Err` if an error was encountered during registration + /// + /// + /// # Panics + /// + /// This function panics if thread-local runtime is not set. + /// + /// The runtime is usually set implicitly when this function is called + /// from a future driven by a tokio runtime, otherwise runtime can be set + /// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function. + pub fn new_with_ready(io: &T, ready: mio::Ready) -> io::Result where T: Evented, { let handle = Handle::current(); let address = if let Some(inner) = handle.inner() { - inner.add_source(io)? + inner.add_source(io, ready)? } else { return Err(io::Error::new( io::ErrorKind::Other,