diff --git a/tests/reactor.rs b/tests/reactor.rs
new file mode 100644
index 000000000..1bac13ad4
--- /dev/null
+++ b/tests/reactor.rs
@@ -0,0 +1,89 @@
+extern crate futures;
+extern crate tokio_executor;
+extern crate tokio_reactor;
+extern crate tokio_tcp;
+
+use tokio_reactor::Reactor;
+use tokio_tcp::TcpListener;
+
+use futures::{Future, Stream};
+use futures::executor::{spawn, Notify, Spawn};
+
+use std::mem;
+use std::net::TcpStream;
+use std::sync::{Arc, Mutex};
+
+#[test]
+fn test_drop_on_notify() {
+    // When the reactor receives a kernel notification, it notifies the
+    // task that holds the associated socket. If this notification results in
+    // the task being dropped, the socket will also be dropped.
+    //
+    // Previously, there was a deadlock scenario where the reactor, while
+    // notifying, held a lock and the task being dropped attempted to acquire
+    // that same lock in order to clean up state.
+    //
+    // To simulate this case, we create a fake executor that does nothing when
+    // the task is notified. This simulates an executor in the process of
+    // shutting down. Then, when the task handle is dropped, the task itself is
+    // dropped.
+
+    struct MyNotify;
+
+    type Task = Mutex<Spawn<Box<Future<Item = (), Error = ()> + Send>>>;
+
+    impl Notify for MyNotify {
+        fn notify(&self, _: usize) {
+            // Do nothing
+        }
+
+        fn clone_id(&self, id: usize) -> usize {
+            let ptr = id as *const Task;
+            let task = unsafe { Arc::from_raw(ptr) };
+
+            mem::forget(task.clone());
+            mem::forget(task);
+
+            id
+        }
+
+        fn drop_id(&self, id: usize) {
+            let ptr = id as *const Task;
+            let _ = unsafe { Arc::from_raw(ptr) };
+        }
+    }
+
+    let addr = "127.0.0.1:0".parse().unwrap();
+    let mut reactor = Reactor::new().unwrap();
+
+    // Create a listener
+    let listener = TcpListener::bind(&addr).unwrap();
+    let addr = listener.local_addr().unwrap();
+
+    // Define a task that just drains the listener
+    let task = Box::new({
+        listener.incoming()
+            .for_each(|_| Ok(()))
+            .map_err(|_| panic!())
+    }) as Box<Future<Item = (), Error = ()> + Send>;
+
+    let task = Arc::new(Mutex::new(spawn(task)));
+    let notify = Arc::new(MyNotify);
+
+    let mut enter = tokio_executor::enter().unwrap();
+
+    tokio_reactor::with_default(&reactor.handle(), &mut enter, |_| {
+        let id = &*task as *const Task as usize;
+
+        task.lock().unwrap()
+            .poll_future_notify(&notify, id)
+            .unwrap();
+    });
+
+    drop(task);
+
+    // Establish a connection to the acceptor
+    let _s = TcpStream::connect(&addr).unwrap();
+
+    reactor.turn(None).unwrap();
+}
diff --git a/tokio-reactor/src/atomic_task.rs b/tokio-reactor/src/atomic_task.rs
index 6f895ac01..b48dca402 100644
--- a/tokio-reactor/src/atomic_task.rs
+++ b/tokio-reactor/src/atomic_task.rs
@@ -237,10 +237,9 @@ impl AtomicTask {
         }
     }
 
-    /// Notifies the task that last called `register`.
-    ///
-    /// If `register` has not been called yet, then this does nothing.
-    pub fn notify(&self) {
+    /// Attempts to take the `Task` value out of the `AtomicTask` with the
+    /// intention that the caller will notify the task.
+    pub fn take_to_notify(&self) -> Option<Task> {
         // AcqRel ordering is used in order to acquire the value of the `task`
         // cell as well as to establish a `release` ordering with whatever
         // memory the `AtomicTask` is associated with.
@@ -252,9 +251,7 @@ impl AtomicTask {
                 // Release the lock
                 self.state.fetch_and(!NOTIFYING, Release);
 
-                if let Some(task) = task {
-                    task.notify();
-                }
+                task
             }
             state => {
                 // There is a concurrent thread currently updating the
@@ -268,9 +265,20 @@ impl AtomicTask {
                        state == REGISTERING ||
                        state == REGISTERING | NOTIFYING ||
                        state == NOTIFYING);
+
+                None
             }
         }
     }
+
+    /// Notifies the task that last called `register`.
+    ///
+    /// If `register` has not been called yet, then this does nothing.
+    pub fn notify(&self) {
+        if let Some(task) = self.take_to_notify() {
+            task.notify();
+        }
+    }
 }
 
 impl Default for AtomicTask {
diff --git a/tokio-reactor/src/lib.rs b/tokio-reactor/src/lib.rs
index 4c4dfc218..8706fc3cc 100644
--- a/tokio-reactor/src/lib.rs
+++ b/tokio-reactor/src/lib.rs
@@ -392,9 +392,19 @@ impl Reactor {
         let aba_guard = token.0 & !MAX_SOURCES;
         let token = token.0 & MAX_SOURCES;
 
-        let io_dispatch = self.inner.io_dispatch.read().unwrap();
+        let mut rd = None;
+        let mut wr = None;
+
+        // Create a scope to ensure that notifying the tasks stays out of the
+        // lock's critical section.
+        {
+            let io_dispatch = self.inner.io_dispatch.read().unwrap();
+
+            let io = match io_dispatch.get(token) {
+                Some(io) => io,
+                None => return,
+            };
 
-        if let Some(io) = io_dispatch.get(token) {
             if aba_guard != io.aba_guard {
                 return;
             }
@@ -402,13 +412,21 @@ impl Reactor {
             io.readiness.fetch_or(ready.as_usize(), Relaxed);
 
             if ready.is_writable() || platform::is_hup(&ready) {
-                io.writer.notify();
+                wr = io.writer.take_to_notify();
            }
 
             if !(ready & (!mio::Ready::writable())).is_empty() {
-                io.reader.notify();
+                rd = io.reader.take_to_notify();
             }
         }
+
+        if let Some(task) = rd {
+            task.notify();
+        }
+
+        if let Some(task) = wr {
+            task.notify();
+        }
 
     }
 }