reactor: use AtomicTask::register to reduce unnecessary task clones (#899)

This commit is contained in:
Sean McArthur 2019-02-18 13:04:07 -08:00 committed by Carl Lerche
parent 27a42b980c
commit d1d72dc1c8

View File

@ -60,6 +60,12 @@ struct Inner {
token: usize, token: usize,
} }
#[derive(PartialEq)]
enum Notify {
Yes,
No,
}
/// Tasks waiting on readiness notifications. /// Tasks waiting on readiness notifications.
#[derive(Debug)] #[derive(Debug)]
struct Node { struct Node {
@ -276,7 +282,7 @@ impl Registration {
/// ///
/// This function will panic if called from outside of a task context. /// This function will panic if called from outside of a task context.
pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> { pub fn poll_read_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Read, true, || task::current()) self.poll_ready(Direction::Read, Notify::Yes)
.map(|v| match v { .map(|v| match v {
Some(v) => Async::Ready(v), Some(v) => Async::Ready(v),
_ => Async::NotReady, _ => Async::NotReady,
@ -291,7 +297,7 @@ impl Registration {
/// ///
/// [`poll_read_ready`]: #method.poll_read_ready /// [`poll_read_ready`]: #method.poll_read_ready
pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> { pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Read, false, || panic!()) self.poll_ready(Direction::Read, Notify::No)
} }
@ -328,7 +334,7 @@ impl Registration {
/// ///
/// This function will panic if called from outside of a task context. /// This function will panic if called from outside of a task context.
pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> { pub fn poll_write_ready(&self) -> Poll<mio::Ready, io::Error> {
self.poll_ready(Direction::Write, true, || task::current()) self.poll_ready(Direction::Write, Notify::Yes)
.map(|v| match v { .map(|v| match v {
Some(v) => Async::Ready(v), Some(v) => Async::Ready(v),
_ => Async::NotReady, _ => Async::NotReady,
@ -343,12 +349,11 @@ impl Registration {
/// ///
/// [`poll_write_ready`]: #method.poll_write_ready /// [`poll_write_ready`]: #method.poll_write_ready
pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> { pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
self.poll_ready(Direction::Write, false, || unreachable!()) self.poll_ready(Direction::Write, Notify::No)
} }
fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F) fn poll_ready(&self, direction: Direction, notify: Notify)
-> io::Result<Option<mio::Ready>> -> io::Result<Option<mio::Ready>>
where F: Fn() -> Task
{ {
let mut state = self.state.load(SeqCst); let mut state = self.state.load(SeqCst);
@ -363,17 +368,17 @@ impl Registration {
} }
READY => { READY => {
let inner = unsafe { (*self.inner.get()).as_ref().unwrap() }; let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
return inner.poll_ready(direction, notify, task); return inner.poll_ready(direction, notify);
} }
LOCKED => { LOCKED => {
if !notify { if let Notify::No = notify {
// Skip the notification tracking junk. // Skip the notification tracking junk.
return Ok(None); return Ok(None);
} }
let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node; let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;
let task = task(); let task = task::current();
// Get the node // Get the node
let mut n = node.take().unwrap_or_else(|| { let mut n = node.take().unwrap_or_else(|| {
@ -473,9 +478,8 @@ impl Inner {
inner.deregister_source(io) inner.deregister_source(io)
} }
fn poll_ready<F>(&self, direction: Direction, notify: bool, task: F) fn poll_ready(&self, direction: Direction, notify: Notify)
-> io::Result<Option<mio::Ready>> -> io::Result<Option<mio::Ready>>
where F: FnOnce() -> Task
{ {
if self.token == ERROR { if self.token == ERROR {
return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor")); return Err(io::Error::new(io::ErrorKind::Other, "failed to associate with reactor"));
@ -503,13 +507,12 @@ impl Inner {
let mut ready = mask & mio::Ready::from_usize( let mut ready = mask & mio::Ready::from_usize(
sched.readiness.fetch_and(!mask_no_hup, SeqCst)); sched.readiness.fetch_and(!mask_no_hup, SeqCst));
if ready.is_empty() && notify { if ready.is_empty() && notify == Notify::Yes {
debug!("scheduling {:?} for: {}", direction, self.token); debug!("scheduling {:?} for: {}", direction, self.token);
let task = task();
// Update the task info // Update the task info
match direction { match direction {
Direction::Read => sched.reader.register_task(task), Direction::Read => sched.reader.register(),
Direction::Write => sched.writer.register_task(task), Direction::Write => sched.writer.register(),
} }
// Try again // Try again