Fix bug related to spawning optimization (#375)

The thread pool takes a fast path when a task currently running on the
pool spawns a new future. However, the optimization did not account for
the case where two thread pools interact: a task submitted from a worker
thread of a different pool was pushed onto that worker's own pool rather
than onto the pool it was submitted to.

This patch fixes the optimization and includes a test.
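For orientation, here is a self-contained sketch of the decision the fast
path needs to make; `Pool`, `Worker`, and `can_take_fast_path` below are
illustrative stand-ins, not tokio's actual API:

    use std::sync::Arc;

    struct Pool { /* run queues, sleep stack, ... */ }

    struct Worker {
        pool: Arc<Pool>,
        blocking: bool,
    }

    // Illustrative stand-in for the decision made in `Pool::submit`:
    // the fast path is only sound when the current worker is usable
    // *and* owned by the pool being submitted to.
    fn can_take_fast_path(target: &Pool, current: Option<&Worker>) -> bool {
        match current {
            // Before this patch only `!worker.blocking` was checked, so a
            // task submitted from a worker of a *different* pool was
            // pushed onto the wrong pool's queue.
            Some(worker) => !worker.blocking && std::ptr::eq(target, &*worker.pool),
            None => false,
        }
    }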

Fixes #342
Carl Lerche 2018-05-24 22:06:32 -07:00 committed by GitHub
parent 96f3ec903c
commit 4af6109398
2 changed files with 54 additions and 7 deletions


@@ -269,24 +269,30 @@ impl Pool {
     /// Called from either inside or outside of the scheduler. If currently on
     /// the scheduler, then a fast path is taken.
     pub fn submit(&self, task: Arc<Task>, inner: &Arc<Pool>) {
+        debug_assert_eq!(*self, **inner);
+
         Worker::with_current(|worker| {
-            match worker {
+            if let Some(worker) = worker {
                 // If the worker is in blocking mode, then even though the
                 // thread-local variable is set, the current thread does not
                 // have ownership of that worker entry. This is because the
                 // worker entry has already been handed off to another thread.
-                Some(worker) if !worker.is_blocking() => {
+                //
+                // The second check handles the case where the current thread is
+                // part of a different threadpool than the one being submitted
+                // to.
+                if !worker.is_blocking() && *self == *worker.inner {
                     let idx = worker.id.0;
 
                     trace!(" -> submit internal; idx={}", idx);
 
                     worker.inner.workers[idx].submit_internal(task);
                     worker.inner.signal_work(inner);
+                    return;
                 }
-                _ => {
-                    self.submit_external(task, inner);
-                }
             }
+
+            self.submit_external(task, inner);
         });
     }
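Restructuring the `match` into an `if let` with an early `return` lets both
ways the fast path can be rejected, a worker in blocking mode and a worker
belonging to a different pool, fall through to the single `submit_external`
call at the end of the closure.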
@@ -295,6 +301,8 @@ impl Pool {
     /// Called from outside of the scheduler, this function is how new tasks
     /// enter the system.
     pub fn submit_external(&self, task: Arc<Task>, inner: &Arc<Pool>) {
+        debug_assert_eq!(*self, **inner);
+
         use worker::Lifecycle::Notified;
 
         // First try to get a handle to a sleeping worker. This ensures that
@@ -322,6 +330,8 @@ impl Pool {
                           state: worker::State,
                           inner: &Arc<Pool>)
     {
+        debug_assert_eq!(*self, **inner);
+
         let entry = &self.workers[idx];
 
         if !entry.submit_external(task, state) {
@@ -338,12 +348,15 @@ impl Pool {
         self.backup_stack.push(&self.backup, backup_id)
     }
 
-    pub fn notify_blocking_task(&self, pool: &Arc<Pool>) {
-        self.blocking.notify_task(&pool);
+    pub fn notify_blocking_task(&self, inner: &Arc<Pool>) {
+        debug_assert_eq!(*self, **inner);
+        self.blocking.notify_task(&inner);
     }
 
     /// Provision a thread to run a worker
     pub fn spawn_thread(&self, id: WorkerId, inner: &Arc<Pool>) {
+        debug_assert_eq!(*self, **inner);
+
         let backup_id = match self.backup_stack.pop(&self.backup, false) {
             Ok(Some(backup_id)) => backup_id,
             Ok(None) => panic!("no thread available"),
@@ -454,6 +467,8 @@ impl Pool {
     /// If there are any other workers currently relaxing, signal them that work
     /// is available so that they can try to find more work to process.
     pub fn signal_work(&self, inner: &Arc<Pool>) {
+        debug_assert_eq!(*self, **inner);
+
         use worker::Lifecycle::*;
 
         if let Some((idx, mut worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
@@ -534,5 +549,11 @@ impl Pool {
     }
 }
 
+impl PartialEq for Pool {
+    fn eq(&self, other: &Pool) -> bool {
+        self as *const _ == other as *const _
+    }
+}
+
 unsafe impl Send for Pool {}
 unsafe impl Sync for Pool {}
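The new `PartialEq` impl compares addresses rather than contents, so a
`Pool` is only ever equal to itself. That identity semantics is what the
`*self == *worker.inner` fast-path check and the `debug_assert_eq!(*self,
**inner)` assertions rely on. A minimal sketch of the behavior
(illustrative, not part of the patch):

    use std::sync::Arc;

    struct Pool { _state: usize }

    impl PartialEq for Pool {
        fn eq(&self, other: &Pool) -> bool {
            // Identity, not structural, equality: compare addresses.
            self as *const _ == other as *const _
        }
    }

    fn main() {
        let a = Arc::new(Pool { _state: 0 });
        let b = Arc::new(Pool { _state: 0 });

        assert!(*a == *Arc::clone(&a)); // the same pool is equal to itself
        assert!(*a != *b);              // two distinct pools are never equal
    }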


@@ -560,3 +560,29 @@ fn panic_in_task() {
     await_shutdown(pool.shutdown_on_idle());
 }
+
+#[test]
+fn multi_threadpool() {
+    use futures::sync::oneshot;
+
+    let pool1 = ThreadPool::new();
+    let pool2 = ThreadPool::new();
+
+    let (tx, rx) = oneshot::channel();
+    let (done_tx, done_rx) = mpsc::channel();
+
+    pool2.spawn({
+        rx.and_then(move |_| {
+            done_tx.send(()).unwrap();
+            Ok(())
+        })
+        .map_err(|e| panic!("err={:?}", e))
+    });
+
+    pool1.spawn(lazy(move || {
+        tx.send(()).unwrap();
+        Ok(())
+    }));
+
+    done_rx.recv().unwrap();
+}
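The test drives exactly the cross-pool interaction described above:
`tx.send(())` completes on a `pool1` worker thread, and completing the
oneshot notifies the receiving task, which belongs to `pool2`. That
notification re-enters `Pool::submit` while the current thread is a
`pool1` worker, so before this patch the fast path enqueued the task onto
`pool1`. With the identity check in place the fast path is skipped and the
task reaches `pool2` via `submit_external`; `done_rx.recv()` then blocks
the test thread until the task has actually run.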