Support current_thread::spawn from task drop (#157)

Currently, the thread-local tracking the current thread executor is not
set when a task is dropped. This means that one cannot spawn a new
future from within the drop implementation of another future.

This patch adds support for this by setting the thread-local before
releasing a task.

This implementation is a bit messy. It probably could be cleaned up, but
this is being put off in favor of trying a more comprehensive
reorganization once the current thread executor is feature complete.
This commit is contained in:
Carl Lerche 2018-02-27 09:56:29 -08:00 committed by GitHub
parent 427b7325d0
commit 8e1a9101f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 28 deletions

View File

@ -529,22 +529,9 @@ impl<'a, P: Park> Entered<'a, P> {
/// Returns `true` if any futures were processed
fn tick(&mut self) -> bool {
let num_futures = &mut self.executor.num_futures;
let enter = &mut *self.enter;
// work the scheduler
self.executor.scheduler.tick(|scheduler, scheduled| {
let mut borrow = Borrow {
scheduler,
num_futures,
};
// A future completed, decrement the future count
if borrow.enter(enter, || scheduled.tick()) {
debug_assert!(*borrow.num_futures > 0);
*borrow.num_futures -= 1;
}
})
self.executor.scheduler.tick(
&mut *self.enter,
&mut self.executor.num_futures)
}
}

View File

@ -1,3 +1,5 @@
use super::Borrow;
use tokio_executor::Enter;
use tokio_executor::park::Unpark;
use futures::{Future, Async};
@ -118,6 +120,7 @@ struct Node<U> {
enum Dequeue<U> {
Data(*const Node<U>),
Empty,
Yield,
Inconsistent,
}
@ -198,8 +201,7 @@ where U: Unpark,
///
/// This function should be called whenever the caller is notified via a
/// wakeup.
pub fn tick<F>(&mut self, mut f: F) -> bool
where F: FnMut(&mut Self, &mut Scheduled<U>),
pub fn tick(&mut self, enter: &mut Enter, num_futures: &mut usize) -> bool
{
let mut ret = false;
let tick = self.inner.tick_num.fetch_add(1, SeqCst)
@ -210,6 +212,10 @@ where U: Unpark,
Dequeue::Empty => {
return ret;
}
Dequeue::Yield => {
self.inner.unpark.unpark();
return ret;
}
Dequeue::Inconsistent => {
thread::yield_now();
continue;
@ -247,22 +253,31 @@ where U: Unpark,
// assume is is complete (will return Ready or panic), in
// which case we'll want to discard it regardless.
//
struct Bomb<'a, U: 'a> {
queue: &'a mut Scheduler<U>,
struct Bomb<'a, U: Unpark + 'a> {
borrow: &'a mut Borrow<'a, U>,
enter: &'a mut Enter,
node: Option<Arc<Node<U>>>,
}
impl<'a, U> Drop for Bomb<'a, U> {
impl<'a, U: Unpark> Drop for Bomb<'a, U> {
fn drop(&mut self) {
if let Some(node) = self.node.take() {
release_node(node);
self.borrow.enter(self.enter, || release_node(node))
}
}
}
let node = self.nodes.remove(node);
let mut borrow = Borrow {
scheduler: self,
num_futures,
};
let mut bomb = Bomb {
node: Some(self.nodes.remove(node)),
queue: self,
node: Some(node),
enter: enter,
borrow: &mut borrow,
};
let mut done = false;
@ -295,7 +310,8 @@ where U: Unpark,
// instances. These structs will basically just use `T` to size
// the internal allocation, appropriately accessing fields and
// deallocating the node if need be.
let queue = &mut *bomb.queue;
let borrow = &mut *bomb.borrow;
let enter = &mut *bomb.enter;
let notify = Notify(bomb.node.as_ref().unwrap());
let mut scheduled = Scheduled {
@ -304,14 +320,16 @@ where U: Unpark,
done: &mut done,
};
f(queue, &mut scheduled);
if borrow.enter(enter, || scheduled.tick()) {
*borrow.num_futures -= 1;
}
}
if !done {
// The future is not done, push it back into the "all
// node" list.
let node = bomb.node.take().unwrap();
bomb.queue.nodes.push_back(node);
bomb.borrow.scheduler.nodes.push_back(node);
}
}
}
@ -452,7 +470,7 @@ impl<U> Inner<U> {
//
// If, for some reason, this is not enough, calling `unpark`
// here will resolve the issue.
return Dequeue::Empty;
return Dequeue::Yield;
}
}
@ -496,6 +514,7 @@ impl<U> Drop for Inner<U> {
loop {
match self.dequeue(None) {
Dequeue::Empty => break,
Dequeue::Yield => unreachable!(),
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
}

View File

@ -4,6 +4,7 @@ extern crate futures;
use tokio::executor::current_thread::{self, block_on_all, CurrentThread};
use std::any::Any;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::thread;
@ -293,6 +294,48 @@ fn spawn_and_turn() {
assert_eq!(2, cnt.get());
}
#[test]
fn spawn_in_drop() {
let mut current_thread = CurrentThread::new();
let (tx, rx) = oneshot::channel();
struct OnDrop<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for OnDrop<F> {
fn drop(&mut self) {
(self.0.take().unwrap())();
}
}
struct MyFuture {
_data: Box<Any>,
}
impl Future for MyFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(().into())
}
}
current_thread.spawn({
MyFuture {
_data: Box::new(OnDrop(Some(move || {
current_thread::spawn(lazy(move || {
tx.send(()).unwrap();
Ok(())
}));
}))),
}
});
current_thread.block_on(rx).unwrap();
current_thread.run().unwrap();
}
#[test]
fn hammer_turn() {
use futures::sync::mpsc;