diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 0bce72ac6..53b8bcc99 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -84,7 +84,7 @@ where F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.scheduler.schedule(task); + self.scheduler.schedule(task, true); handle } @@ -161,7 +161,7 @@ impl Spawner { F::Output: Send + 'static, { let (task, handle) = task::joinable(future); - self.scheduler.schedule(task); + self.scheduler.schedule(task, true); handle } @@ -230,6 +230,27 @@ impl SchedulerPriv { self.queues.push_local(task); handle } + + fn schedule(&self, task: Task, spawn: bool) { + let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); + + if is_current { + unsafe { + // safety: this function is safe to call only from the + // thread the basic scheduler is running on. If `is_current` is + // then we are on that thread. + self.queues.push_local(task) + }; + } else { + let mut lock = self.queues.remote(); + lock.schedule(task, spawn); + + // while locked, call unpark + self.unpark.unpark(); + + drop(lock); + } + } } impl Schedule for SchedulerPriv { @@ -260,24 +281,7 @@ impl Schedule for SchedulerPriv { } fn schedule(&self, task: Task) { - let is_current = ACTIVE.with(|cell| cell.get() == self as *const SchedulerPriv); - - if is_current { - unsafe { - // safety: this function is safe to call only from the - // thread the basic scheduler is running on. If `is_current` is - // then we are on that thread. - self.queues.push_local(task) - }; - } else { - let mut lock = self.queues.remote(); - lock.schedule(task); - - // while locked, call unpark - self.unpark.unpark(); - - drop(lock); - } + SchedulerPriv::schedule(self, task, false); } } @@ -298,10 +302,16 @@ where } // Wait until all tasks have been released. - while unsafe { self.scheduler.queues.has_tasks_remaining() } { - self.local.park.park().ok().expect("park failed"); + loop { unsafe { self.scheduler.queues.drain_pending_drop(); + self.scheduler.queues.drain_queues(); + + if !self.scheduler.queues.has_tasks_remaining() { + break; + } + + self.local.park.park().ok().expect("park failed"); } } } diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 203a2f396..d43187a8b 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -345,7 +345,7 @@ impl Schedule for Scheduler { unsafe { self.queues.push_local(task) }; } else { let mut lock = self.queues.remote(); - lock.schedule(task); + lock.schedule(task, false); self.waker.wake(); @@ -437,8 +437,15 @@ impl Drop for Scheduler { // Wait until all tasks have been released. // XXX: this is a busy loop, but we don't really have any way to park // the thread here? - while self.queues.has_tasks_remaining() { + loop { self.queues.drain_pending_drop(); + self.queues.drain_queues(); + + if !self.queues.has_tasks_remaining() { + break; + } + + std::thread::yield_now(); } } } diff --git a/tokio/src/task/queue.rs b/tokio/src/task/queue.rs index 6a004fc7e..a2badc8af 100644 --- a/tokio/src/task/queue.rs +++ b/tokio/src/task/queue.rs @@ -233,6 +233,16 @@ where self.drain_pending_drop(); } + /// Drain both the local and remote run queues, shutting down any tasks. + /// + /// # Safety + /// + /// This *must* be called only from the thread that owns the scheduler. + pub(crate) unsafe fn drain_queues(&self) { + self.close_local(); + self.close_remote(); + } + /// Shut down the scheduler's owned task list. /// /// # Safety @@ -300,8 +310,10 @@ where /// If the queue is open to accept new tasks, the task is pushed to the back /// of the queue. Otherwise, if the queue is closed (the scheduler is /// shutting down), the new task will be shut down immediately. - pub(crate) fn schedule(&mut self, task: Task) { - if self.open { + /// + /// `spawn` should be set if the caller is spawning a new task. + pub(crate) fn schedule(&mut self, task: Task, spawn: bool) { + if !spawn || self.open { self.queue.push_back(task); } else { task.shutdown(); diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 68e09ef33..4f0beab9c 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -27,6 +27,42 @@ fn spawned_task_does_not_progress_without_block_on() { assert_eq!(out, "hello"); } +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + use tokio::task; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let mut rt = rt(); + + rt.spawn(async move { + let _ = rx2.await; + unreachable!(); + }); + + rt.spawn(async move { + let _ = rx1.await; + let _ = tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + rt.spawn(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + rt.block_on(async { + task::yield_now().await; + }); + + // Drop the rt + drop(rt); +} + fn rt() -> Runtime { tokio::runtime::Builder::new() .basic_scheduler() diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs new file mode 100644 index 000000000..f50142750 --- /dev/null +++ b/tokio/tests/task_local_set.rs @@ -0,0 +1,50 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::Runtime; +use tokio::sync::oneshot; +use tokio::task::{self, LocalSet}; + +#[test] +fn acquire_mutex_in_drop() { + use futures::future::pending; + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let mut rt = rt(); + let local = LocalSet::new(); + + local.spawn_local(async move { + let _ = rx2.await; + unreachable!(); + }); + + local.spawn_local(async move { + let _ = rx1.await; + let _ = tx2.send(()).unwrap(); + unreachable!(); + }); + + // Spawn a task that will never notify + local.spawn_local(async move { + pending::<()>().await; + tx1.send(()).unwrap(); + }); + + // Tick the loop + local.block_on(&mut rt, async { + task::yield_now().await; + }); + + // Drop the LocalSet + drop(local); +} + +fn rt() -> Runtime { + tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .build() + .unwrap() +}