mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
Count in-transit spawned futures to current thread executor as pending (#478)
This commit is contained in:
parent
f212a2ab9d
commit
1e90e27720
@ -99,6 +99,40 @@ fn runtime_single_threaded_block_on_all() {
|
||||
assert_eq!(msg, "hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_single_threaded_racy_spawn() {
|
||||
let (trigger, exit) = futures::sync::oneshot::channel();
|
||||
let (handle_tx, handle_rx) = ::std::sync::mpsc::channel();
|
||||
let jh = ::std::thread::spawn(move || {
|
||||
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
|
||||
handle_tx.send(rt.handle()).unwrap();
|
||||
|
||||
// don't exit until we are told to
|
||||
rt.block_on(exit.map_err(|_| ())).unwrap();
|
||||
|
||||
// run until all spawned futures (incl. the "exit" signal future) have completed.
|
||||
rt.run().unwrap();
|
||||
});
|
||||
|
||||
let (tx, rx) = futures::sync::oneshot::channel();
|
||||
|
||||
let handle = handle_rx.recv().unwrap();
|
||||
handle
|
||||
.spawn(futures::future::lazy(move || {
|
||||
tx.send(()).unwrap();
|
||||
Ok(())
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
// signal runtime thread to exit
|
||||
trigger.send(()).unwrap();
|
||||
|
||||
// wait for runtime thread to exit
|
||||
jh.join().unwrap();
|
||||
|
||||
assert_eq!(rx.wait().unwrap(), ());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn runtime_multi_threaded() {
|
||||
let _ = env_logger::init();
|
||||
|
@ -42,8 +42,8 @@ use std::fmt;
|
||||
use std::cell::Cell;
|
||||
use std::error::Error;
|
||||
use std::rc::Rc;
|
||||
use std::sync::{atomic, mpsc, Arc};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::sync::mpsc;
|
||||
|
||||
#[cfg(feature = "unstable-futures")]
|
||||
use futures2;
|
||||
@ -53,8 +53,11 @@ pub struct CurrentThread<P: Park = ParkThread> {
|
||||
/// Execute futures and receive unpark notifications.
|
||||
scheduler: Scheduler<P::Unpark>,
|
||||
|
||||
/// Current number of futures being executed
|
||||
num_futures: usize,
|
||||
/// Current number of futures being executed.
|
||||
///
|
||||
/// The LSB is used to indicate that the runtime is preparing to shut down.
|
||||
/// Thus, to get the actual number of pending futures, `>>1`.
|
||||
num_futures: Arc<atomic::AtomicUsize>,
|
||||
|
||||
/// Thread park handle
|
||||
park: P,
|
||||
@ -177,11 +180,11 @@ impl<T: fmt::Debug> Error for BlockError<T> {
|
||||
/// This is mostly split out to make the borrow checker happy.
|
||||
struct Borrow<'a, U: 'a> {
|
||||
scheduler: &'a mut Scheduler<U>,
|
||||
num_futures: &'a mut usize,
|
||||
num_futures: &'a atomic::AtomicUsize,
|
||||
}
|
||||
|
||||
trait SpawnLocal {
|
||||
fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>);
|
||||
fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool);
|
||||
}
|
||||
|
||||
struct CurrentRunner {
|
||||
@ -260,11 +263,18 @@ impl<P: Park> CurrentThread<P> {
|
||||
let scheduler = Scheduler::new(unpark);
|
||||
let notify = scheduler.notify();
|
||||
|
||||
let num_futures = Arc::new(atomic::AtomicUsize::new(0));
|
||||
|
||||
CurrentThread {
|
||||
scheduler: scheduler,
|
||||
num_futures: 0,
|
||||
num_futures: num_futures.clone(),
|
||||
park,
|
||||
spawn_handle: Handle { sender: spawn_sender, notify: notify },
|
||||
spawn_handle: Handle {
|
||||
sender: spawn_sender,
|
||||
num_futures: num_futures,
|
||||
notify: notify,
|
||||
shut_down: Cell::new(false),
|
||||
},
|
||||
spawn_receiver: spawn_receiver,
|
||||
}
|
||||
}
|
||||
@ -272,8 +282,11 @@ impl<P: Park> CurrentThread<P> {
|
||||
/// Returns `true` if the executor is currently idle.
|
||||
///
|
||||
/// An idle executor is defined by not currently having any spawned tasks.
|
||||
///
|
||||
/// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`,
|
||||
/// this method may return `true` even though there are more futures to be executed.
|
||||
pub fn is_idle(&self) -> bool {
|
||||
self.num_futures == 0
|
||||
self.num_futures.load(atomic::Ordering::SeqCst) <= 1
|
||||
}
|
||||
|
||||
/// Spawn the future on the executor.
|
||||
@ -282,7 +295,7 @@ impl<P: Park> CurrentThread<P> {
|
||||
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
||||
where F: Future<Item = (), Error = ()> + 'static,
|
||||
{
|
||||
self.borrow().spawn_local(Box::new(future));
|
||||
self.borrow().spawn_local(Box::new(future), false);
|
||||
self
|
||||
}
|
||||
|
||||
@ -358,7 +371,7 @@ impl<P: Park> CurrentThread<P> {
|
||||
fn borrow(&mut self) -> Borrow<P::Unpark> {
|
||||
Borrow {
|
||||
scheduler: &mut self.scheduler,
|
||||
num_futures: &mut self.num_futures,
|
||||
num_futures: &*self.num_futures,
|
||||
}
|
||||
}
|
||||
|
||||
@ -371,11 +384,30 @@ impl<P: Park> CurrentThread<P> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<P: Park> Drop for CurrentThread<P> {
|
||||
fn drop(&mut self) {
|
||||
// Signal to Handles that no more futures can be spawned by setting LSB.
|
||||
//
|
||||
// NOTE: this isn't technically necessary since the send on the mpsc will fail once the
|
||||
// receiver is dropped, but it's useful to illustrate how clean shutdown will be
|
||||
// implemented (e.g., by setting the LSB).
|
||||
let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst);
|
||||
|
||||
// TODO: We currently ignore any pending futures at the time we shut down.
|
||||
//
|
||||
// The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`)
|
||||
// which sets LSB (as above) do make Handle::spawn stop working, and then runs until
|
||||
// num_futures.load() == 1.
|
||||
let _ = pending;
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_executor::Executor for CurrentThread {
|
||||
fn spawn(&mut self, future: Box<Future<Item = (), Error = ()> + Send>)
|
||||
-> Result<(), SpawnError>
|
||||
{
|
||||
self.borrow().spawn_local(future);
|
||||
fn spawn(
|
||||
&mut self,
|
||||
future: Box<Future<Item = (), Error = ()> + Send>,
|
||||
) -> Result<(), SpawnError> {
|
||||
self.borrow().spawn_local(future, false);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -391,7 +423,7 @@ impl<P: Park> fmt::Debug for CurrentThread<P> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("CurrentThread")
|
||||
.field("scheduler", &self.scheduler)
|
||||
.field("num_futures", &self.num_futures)
|
||||
.field("num_futures", &self.num_futures.load(atomic::Ordering::SeqCst))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@ -405,7 +437,7 @@ impl<'a, P: Park> Entered<'a, P> {
|
||||
pub fn spawn<F>(&mut self, future: F) -> &mut Self
|
||||
where F: Future<Item = (), Error = ()> + 'static,
|
||||
{
|
||||
self.executor.borrow().spawn_local(Box::new(future));
|
||||
self.executor.borrow().spawn_local(Box::new(future), false);
|
||||
self
|
||||
}
|
||||
|
||||
@ -546,13 +578,13 @@ impl<'a, P: Park> Entered<'a, P> {
|
||||
let (mut borrow, spawn_receiver) = (
|
||||
Borrow {
|
||||
scheduler: &mut self.executor.scheduler,
|
||||
num_futures: &mut self.executor.num_futures,
|
||||
num_futures: &*self.executor.num_futures,
|
||||
},
|
||||
&mut self.executor.spawn_receiver,
|
||||
);
|
||||
|
||||
while let Ok(future) = spawn_receiver.try_recv() {
|
||||
borrow.spawn_local(future);
|
||||
borrow.spawn_local(future, true);
|
||||
}
|
||||
|
||||
// After any pending futures were scheduled, do the actual tick
|
||||
@ -577,6 +609,8 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
|
||||
#[derive(Clone)]
|
||||
pub struct Handle {
|
||||
sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
|
||||
num_futures: Arc<atomic::AtomicUsize>,
|
||||
shut_down: Cell<bool>,
|
||||
notify: executor::NotifyHandle,
|
||||
}
|
||||
|
||||
@ -584,6 +618,7 @@ pub struct Handle {
|
||||
impl fmt::Debug for Handle {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("Handle")
|
||||
.field("shut_down", &self.shut_down.get())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
@ -596,8 +631,27 @@ impl Handle {
|
||||
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
|
||||
/// instance of the `Handle` does not exist anymore.
|
||||
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
|
||||
where F: Future<Item = (), Error = ()> + Send + 'static {
|
||||
self.sender.send(Box::new(future))
|
||||
where
|
||||
F: Future<Item = (), Error = ()> + Send + 'static,
|
||||
{
|
||||
if self.shut_down.get() {
|
||||
return Err(SpawnError::shutdown());
|
||||
}
|
||||
|
||||
// NOTE: += 2 since LSB is the shutdown bit
|
||||
let pending = self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
|
||||
if pending % 2 == 1 {
|
||||
// Bring the count back so we still know when the Runtime is idle.
|
||||
self.num_futures.fetch_sub(2, atomic::Ordering::SeqCst);
|
||||
|
||||
// Once the Runtime is shutting down, we know it won't come back.
|
||||
self.shut_down.set(true);
|
||||
|
||||
return Err(SpawnError::shutdown());
|
||||
}
|
||||
|
||||
self.sender
|
||||
.send(Box::new(future))
|
||||
.expect("CurrentThread does not exist anymore");
|
||||
// use 0 for the id, CurrentThread does not make use of it
|
||||
self.notify.notify(0);
|
||||
@ -628,7 +682,7 @@ impl TaskExecutor {
|
||||
CURRENT.with(|current| {
|
||||
match current.spawn.get() {
|
||||
Some(spawn) => {
|
||||
unsafe { (*spawn).spawn_local(future) };
|
||||
unsafe { (*spawn).spawn_local(future, false) };
|
||||
Ok(())
|
||||
}
|
||||
None => {
|
||||
@ -671,7 +725,7 @@ where F: Future<Item = (), Error = ()> + 'static
|
||||
CURRENT.with(|current| {
|
||||
match current.spawn.get() {
|
||||
Some(spawn) => {
|
||||
unsafe { (*spawn).spawn_local(Box::new(future)) };
|
||||
unsafe { (*spawn).spawn_local(Box::new(future), false) };
|
||||
Ok(())
|
||||
}
|
||||
None => {
|
||||
@ -697,8 +751,12 @@ impl<'a, U: Unpark> Borrow<'a, U> {
|
||||
}
|
||||
|
||||
impl<'a, U: Unpark> SpawnLocal for Borrow<'a, U> {
|
||||
fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>) {
|
||||
*self.num_futures += 1;
|
||||
fn spawn_local(&mut self, future: Box<Future<Item = (), Error = ()>>, already_counted: bool) {
|
||||
if !already_counted {
|
||||
// NOTE: we have a borrow of the Runtime, so we know that it isn't shut down.
|
||||
// NOTE: += 2 since LSB is the shutdown bit
|
||||
self.num_futures.fetch_add(2, atomic::Ordering::SeqCst);
|
||||
}
|
||||
self.scheduler.schedule(future);
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ use std::fmt::{self, Debug};
|
||||
use std::mem;
|
||||
use std::ptr;
|
||||
use std::sync::atomic::Ordering::{Relaxed, SeqCst, Acquire, Release, AcqRel};
|
||||
use std::sync::atomic::{AtomicPtr, AtomicBool, AtomicUsize};
|
||||
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::usize;
|
||||
use std::thread;
|
||||
@ -210,7 +210,7 @@ where U: Unpark,
|
||||
///
|
||||
/// This function should be called whenever the caller is notified via a
|
||||
/// wakeup.
|
||||
pub fn tick(&mut self, enter: &mut Enter, num_futures: &mut usize) -> bool
|
||||
pub fn tick(&mut self, enter: &mut Enter, num_futures: &AtomicUsize) -> bool
|
||||
{
|
||||
let mut ret = false;
|
||||
let tick = self.inner.tick_num.fetch_add(1, SeqCst)
|
||||
@ -330,7 +330,8 @@ where U: Unpark,
|
||||
};
|
||||
|
||||
if borrow.enter(enter, || scheduled.tick()) {
|
||||
*borrow.num_futures -= 1;
|
||||
// we have a borrow of the Runtime, so we know it's not shut down
|
||||
borrow.num_futures.fetch_sub(2, SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user