diff --git a/src/executor/current_thread/mod.rs b/src/executor/current_thread/mod.rs index bb3fe0c87..e61fdd2ca 100644 --- a/src/executor/current_thread/mod.rs +++ b/src/executor/current_thread/mod.rs @@ -529,22 +529,9 @@ impl<'a, P: Park> Entered<'a, P> { /// Returns `true` if any futures were processed fn tick(&mut self) -> bool { - let num_futures = &mut self.executor.num_futures; - let enter = &mut *self.enter; - - // work the scheduler - self.executor.scheduler.tick(|scheduler, scheduled| { - let mut borrow = Borrow { - scheduler, - num_futures, - }; - - // A future completed, decrement the future count - if borrow.enter(enter, || scheduled.tick()) { - debug_assert!(*borrow.num_futures > 0); - *borrow.num_futures -= 1; - } - }) + self.executor.scheduler.tick( + &mut *self.enter, + &mut self.executor.num_futures) } } diff --git a/src/executor/current_thread/scheduler.rs b/src/executor/current_thread/scheduler.rs index 53c1fc289..351730bf8 100644 --- a/src/executor/current_thread/scheduler.rs +++ b/src/executor/current_thread/scheduler.rs @@ -1,3 +1,5 @@ +use super::Borrow; +use tokio_executor::Enter; use tokio_executor::park::Unpark; use futures::{Future, Async}; @@ -118,6 +120,7 @@ struct Node { enum Dequeue { Data(*const Node), Empty, + Yield, Inconsistent, } @@ -198,8 +201,7 @@ where U: Unpark, /// /// This function should be called whenever the caller is notified via a /// wakeup. - pub fn tick(&mut self, mut f: F) -> bool - where F: FnMut(&mut Self, &mut Scheduled), + pub fn tick(&mut self, enter: &mut Enter, num_futures: &mut usize) -> bool { let mut ret = false; let tick = self.inner.tick_num.fetch_add(1, SeqCst) @@ -210,6 +212,10 @@ where U: Unpark, Dequeue::Empty => { return ret; } + Dequeue::Yield => { + self.inner.unpark.unpark(); + return ret; + } Dequeue::Inconsistent => { thread::yield_now(); continue; @@ -247,22 +253,31 @@ where U: Unpark, // assume is is complete (will return Ready or panic), in // which case we'll want to discard it regardless. // - struct Bomb<'a, U: 'a> { - queue: &'a mut Scheduler, + struct Bomb<'a, U: Unpark + 'a> { + borrow: &'a mut Borrow<'a, U>, + enter: &'a mut Enter, node: Option>>, } - impl<'a, U> Drop for Bomb<'a, U> { + impl<'a, U: Unpark> Drop for Bomb<'a, U> { fn drop(&mut self) { if let Some(node) = self.node.take() { - release_node(node); + self.borrow.enter(self.enter, || release_node(node)) } } } + let node = self.nodes.remove(node); + + let mut borrow = Borrow { + scheduler: self, + num_futures, + }; + let mut bomb = Bomb { - node: Some(self.nodes.remove(node)), - queue: self, + node: Some(node), + enter: enter, + borrow: &mut borrow, }; let mut done = false; @@ -295,7 +310,8 @@ where U: Unpark, // instances. These structs will basically just use `T` to size // the internal allocation, appropriately accessing fields and // deallocating the node if need be. - let queue = &mut *bomb.queue; + let borrow = &mut *bomb.borrow; + let enter = &mut *bomb.enter; let notify = Notify(bomb.node.as_ref().unwrap()); let mut scheduled = Scheduled { @@ -304,14 +320,16 @@ where U: Unpark, done: &mut done, }; - f(queue, &mut scheduled); + if borrow.enter(enter, || scheduled.tick()) { + *borrow.num_futures -= 1; + } } if !done { // The future is not done, push it back into the "all // node" list. let node = bomb.node.take().unwrap(); - bomb.queue.nodes.push_back(node); + bomb.borrow.scheduler.nodes.push_back(node); } } } @@ -452,7 +470,7 @@ impl Inner { // // If, for some reason, this is not enough, calling `unpark` // here will resolve the issue. - return Dequeue::Empty; + return Dequeue::Yield; } } @@ -496,6 +514,7 @@ impl Drop for Inner { loop { match self.dequeue(None) { Dequeue::Empty => break, + Dequeue::Yield => unreachable!(), Dequeue::Inconsistent => abort("inconsistent in drop"), Dequeue::Data(ptr) => drop(ptr2arc(ptr)), } diff --git a/tests/current_thread.rs b/tests/current_thread.rs index 2f3b98e5d..51695f40d 100644 --- a/tests/current_thread.rs +++ b/tests/current_thread.rs @@ -4,6 +4,7 @@ extern crate futures; use tokio::executor::current_thread::{self, block_on_all, CurrentThread}; +use std::any::Any; use std::cell::{Cell, RefCell}; use std::rc::Rc; use std::thread; @@ -293,6 +294,48 @@ fn spawn_and_turn() { assert_eq!(2, cnt.get()); } +#[test] +fn spawn_in_drop() { + let mut current_thread = CurrentThread::new(); + + let (tx, rx) = oneshot::channel(); + + struct OnDrop(Option); + + impl Drop for OnDrop { + fn drop(&mut self) { + (self.0.take().unwrap())(); + } + } + + struct MyFuture { + _data: Box, + } + + impl Future for MyFuture { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + Ok(().into()) + } + } + + current_thread.spawn({ + MyFuture { + _data: Box::new(OnDrop(Some(move || { + current_thread::spawn(lazy(move || { + tx.send(()).unwrap(); + Ok(()) + })); + }))), + } + }); + + current_thread.block_on(rx).unwrap(); + current_thread.run().unwrap(); +} + #[test] fn hammer_turn() { use futures::sync::mpsc;