mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
rt: fix basic_scheduler
notification bug (#1861)
The "global executor" thread-local is to track where to spawn new tasks, **not** which scheduler is active on the current thread. This fixes a bug with scheduling tasks on the basic_scheduler by tracking the currently active basic_scheduler with a dedicated thread-local variable. Fixes: #1851
This commit is contained in:
parent
ec7f2ae306
commit
a2cfc877a7
@ -1,11 +1,12 @@
|
|||||||
use crate::park::{Park, Unpark};
|
use crate::park::{Park, Unpark};
|
||||||
use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};
|
use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};
|
||||||
|
|
||||||
use std::cell::UnsafeCell;
|
use std::cell::{Cell, UnsafeCell};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::mem::ManuallyDrop;
|
use std::mem::ManuallyDrop;
|
||||||
|
use std::ptr;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::task::{RawWaker, RawWakerVTable, Waker};
|
use std::task::{RawWaker, RawWakerVTable, Waker};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -87,6 +88,10 @@ const MAX_TASKS_PER_TICK: usize = 61;
|
|||||||
/// How often to check the remote queue first
|
/// How often to check the remote queue first
|
||||||
const CHECK_REMOTE_INTERVAL: u8 = 13;
|
const CHECK_REMOTE_INTERVAL: u8 = 13;
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static ACTIVE: Cell<*const SchedulerPriv> = Cell::new(ptr::null())
|
||||||
|
}
|
||||||
|
|
||||||
impl<P> BasicScheduler<P>
|
impl<P> BasicScheduler<P>
|
||||||
where
|
where
|
||||||
P: Park,
|
P: Park,
|
||||||
@ -138,6 +143,27 @@ where
|
|||||||
let local = &mut self.local;
|
let local = &mut self.local;
|
||||||
let scheduler = &*self.scheduler;
|
let scheduler = &*self.scheduler;
|
||||||
|
|
||||||
|
struct Guard {
|
||||||
|
old: *const SchedulerPriv,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Guard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
ACTIVE.with(|cell| cell.set(self.old));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Track the current scheduler
|
||||||
|
let _guard = ACTIVE.with(|cell| {
|
||||||
|
let guard = Guard {
|
||||||
|
old: cell.get(),
|
||||||
|
};
|
||||||
|
|
||||||
|
cell.set(scheduler as *const SchedulerPriv);
|
||||||
|
|
||||||
|
guard
|
||||||
|
});
|
||||||
|
|
||||||
runtime::global::with_basic_scheduler(scheduler, || {
|
runtime::global::with_basic_scheduler(scheduler, || {
|
||||||
let mut _enter = runtime::enter();
|
let mut _enter = runtime::enter();
|
||||||
|
|
||||||
@ -283,9 +309,11 @@ impl Schedule for SchedulerPriv {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn schedule(&self, task: Task<Self>) {
|
fn schedule(&self, task: Task<Self>) {
|
||||||
use crate::runtime::global;
|
let is_current = ACTIVE.with(|cell| {
|
||||||
|
cell.get() == self as *const SchedulerPriv
|
||||||
|
});
|
||||||
|
|
||||||
if global::basic_scheduler_is_current(self) {
|
if is_current {
|
||||||
unsafe { self.schedule_local(task) };
|
unsafe { self.schedule_local(task) };
|
||||||
} else {
|
} else {
|
||||||
let mut lock = self.remote_queue.lock().unwrap();
|
let mut lock = self.remote_queue.lock().unwrap();
|
||||||
|
@ -65,13 +65,6 @@ where
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) fn basic_scheduler_is_current(basic_scheduler: &basic_scheduler::SchedulerPriv) -> bool {
|
|
||||||
EXECUTOR.with(|current_executor| match current_executor.get() {
|
|
||||||
State::Basic(ptr) => ptr == basic_scheduler as *const _,
|
|
||||||
_ => false,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
cfg_rt_threaded! {
|
cfg_rt_threaded! {
|
||||||
use crate::runtime::thread_pool;
|
use crate::runtime::thread_pool;
|
||||||
|
|
||||||
|
20
tokio/tests/fs.rs
Normal file
20
tokio/tests/fs.rs
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
#![warn(rust_2018_idioms)]
|
||||||
|
#![cfg(feature = "full")]
|
||||||
|
|
||||||
|
use tokio::fs;
|
||||||
|
use tokio_test::assert_ok;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn path_read_write() {
|
||||||
|
let temp = tempdir();
|
||||||
|
let dir = temp.path();
|
||||||
|
|
||||||
|
assert_ok!(fs::write(dir.join("bar"), b"bytes").await);
|
||||||
|
let out = assert_ok!(fs::read(dir.join("bar")).await);
|
||||||
|
|
||||||
|
assert_eq!(out, b"bytes");
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tempdir() -> tempfile::TempDir {
|
||||||
|
tempfile::tempdir().unwrap()
|
||||||
|
}
|
@ -198,6 +198,21 @@ rt_test! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn spawn_await_chain() {
|
||||||
|
let mut rt = rt();
|
||||||
|
|
||||||
|
let out = rt.block_on(async {
|
||||||
|
assert_ok!(tokio::spawn(async {
|
||||||
|
assert_ok!(tokio::spawn(async {
|
||||||
|
"hello"
|
||||||
|
}).await)
|
||||||
|
}).await)
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(out, "hello");
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn outstanding_tasks_dropped() {
|
fn outstanding_tasks_dropped() {
|
||||||
let mut rt = rt();
|
let mut rt = rt();
|
||||||
|
29
tokio/tests/task_blocking.rs
Normal file
29
tokio/tests/task_blocking.rs
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
#![warn(rust_2018_idioms)]
|
||||||
|
#![cfg(feature = "full")]
|
||||||
|
|
||||||
|
use tokio::task;
|
||||||
|
use tokio_test::assert_ok;
|
||||||
|
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn basic_blocking() {
|
||||||
|
// Run a few times
|
||||||
|
for _ in 0..100 {
|
||||||
|
let out = assert_ok!(
|
||||||
|
tokio::spawn(async {
|
||||||
|
assert_ok!(
|
||||||
|
task::spawn_blocking(|| {
|
||||||
|
thread::sleep(Duration::from_millis(5));
|
||||||
|
"hello"
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(out, "hello");
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user