runtime: rename current_thread -> basic_scheduler (#1769)

It no longer supports executing !Send futures. The use case for
It is wanting a “light” runtime. There will be “local” task execution
using a different strategy coming later.

This patch also renames `thread_pool` -> `threaded_scheduler`, but
only in public APIs for now.
This commit is contained in:
Carl Lerche 2019-11-16 07:19:45 -08:00 committed by GitHub
parent 1474794055
commit 3f0eabe779
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 122 additions and 115 deletions

View File

@ -18,9 +18,9 @@ extern crate proc_macro;
use proc_macro::TokenStream;
use quote::quote;
enum RuntimeType {
Single,
Multi,
enum Runtime {
Basic,
Threaded,
Auto,
}
@ -28,8 +28,8 @@ enum RuntimeType {
///
/// ## Options:
///
/// - `current_thread` - Uses the `current_thread` runtime.
/// - `threadpool` - Uses the multi-threaded `threadpool` runtime. Used by default.
/// - `basic_scheduler` - All tasks are executed on the current thread.
/// - `threaded_scheduler` - Uses the multi-threaded scheduler. Used by default.
///
/// ## Function arguments:
///
@ -37,14 +37,6 @@ enum RuntimeType {
///
/// ## Usage
///
/// ### Select runtime
///
/// ```rust
/// #[tokio::main(current_thread)]
/// async fn main() {
/// println!("Hello world");
/// }
/// ```
/// ### Using default
///
/// ```rust
@ -53,6 +45,15 @@ enum RuntimeType {
/// println!("Hello world");
/// }
/// ```
///
/// ### Select runtime
///
/// ```rust
/// #[tokio::main(basic_scheduler)]
/// async fn main() {
/// println!("Hello world");
/// }
/// ```
#[proc_macro_attribute]
#[cfg(not(test))] // Work around for rust-lang/rust#62127
pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
@ -77,7 +78,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
.into();
}
let mut runtime = RuntimeType::Auto;
let mut runtime = Runtime::Auto;
for arg in args {
if let syn::NestedMeta::Meta(syn::Meta::Path(path)) = arg {
@ -87,10 +88,10 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
return syn::Error::new_spanned(path, msg).to_compile_error().into();
}
match ident.unwrap().to_string().to_lowercase().as_str() {
"threadpool" => runtime = RuntimeType::Multi,
"current_thread" => runtime = RuntimeType::Single,
"threaded_scheduler" => runtime = Runtime::Threaded,
"basic_scheduler" => runtime = Runtime::Basic,
name => {
let msg = format!("Unknown attribute {} is specified; expected `current_thread` or `threadpool`", name);
let msg = format!("Unknown attribute {} is specified; expected `basic_scheduler` or `threaded_scheduler`", name);
return syn::Error::new_spanned(path, msg).to_compile_error().into();
}
}
@ -98,17 +99,17 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
}
let result = match runtime {
RuntimeType::Multi | RuntimeType::Auto => quote! {
Runtime::Threaded | Runtime::Auto => quote! {
#(#attrs)*
fn #name(#inputs) #ret {
tokio::runtime::Runtime::new().unwrap().block_on(async { #body })
}
},
RuntimeType::Single => quote! {
Runtime::Basic => quote! {
#(#attrs)*
fn #name(#inputs) #ret {
tokio::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
.block_on(async { #body })
@ -123,15 +124,15 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
///
/// ## Options:
///
/// - `current_thread` - Uses the `current_thread` runtime. Used by default.
/// - `threadpool` - Uses multi-threaded runtime.
/// - `basic_scheduler` - All tasks are executed on the current thread. Used by default.
/// - `threaded_scheduler` - Use multi-threaded scheduler.
///
/// ## Usage
///
/// ### Select runtime
///
/// ```no_run
/// #[tokio::test(threadpool)]
/// #[tokio::test(threaded_scheduler)]
/// async fn my_test() {
/// assert!(true);
/// }
@ -176,7 +177,7 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
.into();
}
let mut runtime = RuntimeType::Auto;
let mut runtime = Runtime::Auto;
for arg in args {
if let syn::NestedMeta::Meta(syn::Meta::Path(path)) = arg {
@ -186,10 +187,10 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
return syn::Error::new_spanned(path, msg).to_compile_error().into();
}
match ident.unwrap().to_string().to_lowercase().as_str() {
"threadpool" => runtime = RuntimeType::Multi,
"current_thread" => runtime = RuntimeType::Single,
"threaded_scheduler" => runtime = Runtime::Threaded,
"basic_scheduler" => runtime = Runtime::Basic,
name => {
let msg = format!("Unknown attribute {} is specified; expected `current_thread` or `threadpool`", name);
let msg = format!("Unknown attribute {} is specified; expected `basic_scheduler` or `threaded_scheduler`", name);
return syn::Error::new_spanned(path, msg).to_compile_error().into();
}
}
@ -197,19 +198,19 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
}
let result = match runtime {
RuntimeType::Multi => quote! {
Runtime::Threaded => quote! {
#[test]
#(#attrs)*
fn #name() #ret {
tokio::runtime::Runtime::new().unwrap().block_on(async { #body })
}
},
RuntimeType::Single | RuntimeType::Auto => quote! {
Runtime::Basic | Runtime::Auto => quote! {
#[test]
#(#attrs)*
fn #name() #ret {
tokio::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
.block_on(async { #body })

View File

@ -27,7 +27,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;
let mut rt = runtime::Builder::new().current_thread().build().unwrap();
let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
rt.block_on(future)
}

View File

@ -12,24 +12,24 @@ use std::time::Duration;
/// Executes tasks on the current thread
#[derive(Debug)]
pub(crate) struct CurrentThread<P>
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
/// Scheduler component
scheduler: Arc<Scheduler>,
scheduler: Arc<SchedulerPriv>,
/// Local state
local: Local<P>,
local: LocalState<P>,
}
#[derive(Debug, Clone)]
pub(crate) struct Spawner {
scheduler: Arc<Scheduler>,
scheduler: Arc<SchedulerPriv>,
}
/// The scheduler component.
pub(super) struct Scheduler {
pub(super) struct SchedulerPriv {
/// List of all active tasks spawned onto this executor.
///
/// # Safety
@ -45,7 +45,7 @@ pub(super) struct Scheduler {
///
/// References should not be handed out. Only call `push` / `pop` functions.
/// Only call from the owning thread.
local_queue: UnsafeCell<VecDeque<Task<Scheduler>>>,
local_queue: UnsafeCell<VecDeque<Task<SchedulerPriv>>>,
/// Remote run queue.
///
@ -59,12 +59,12 @@ pub(super) struct Scheduler {
unpark: Box<dyn Unpark>,
}
unsafe impl Send for Scheduler {}
unsafe impl Sync for Scheduler {}
unsafe impl Send for SchedulerPriv {}
unsafe impl Sync for SchedulerPriv {}
/// Local state
#[derive(Debug)]
struct Local<P> {
struct LocalState<P> {
/// Current tick
tick: u8,
@ -75,7 +75,7 @@ struct Local<P> {
#[derive(Debug)]
struct RemoteQueue {
/// FIFO list of tasks
queue: VecDeque<Task<Scheduler>>,
queue: VecDeque<Task<SchedulerPriv>>,
/// `true` when a task can be pushed into the queue, false otherwise.
open: bool,
@ -87,15 +87,15 @@ const MAX_TASKS_PER_TICK: usize = 61;
/// How often to check the remote queue first
const CHECK_REMOTE_INTERVAL: u8 = 13;
impl<P> CurrentThread<P>
impl<P> BasicScheduler<P>
where
P: Park,
{
pub(crate) fn new(park: P) -> CurrentThread<P> {
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = park.unpark();
CurrentThread {
scheduler: Arc::new(Scheduler {
BasicScheduler {
scheduler: Arc::new(SchedulerPriv {
owned_tasks: UnsafeCell::new(task::OwnedList::new()),
local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
remote_queue: Mutex::new(RemoteQueue {
@ -105,7 +105,7 @@ where
pending_drop: task::TransferStack::new(),
unpark: Box::new(unpark),
}),
local: Local { tick: 0, park },
local: LocalState { tick: 0, park },
}
}
@ -138,11 +138,11 @@ where
let local = &mut self.local;
let scheduler = &*self.scheduler;
runtime::global::with_current_thread(scheduler, || {
runtime::global::with_basic_scheduler(scheduler, || {
let mut _enter = runtime::enter();
let raw_waker = RawWaker::new(
scheduler as *const Scheduler as *const (),
scheduler as *const SchedulerPriv as *const (),
&RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
);
@ -181,8 +181,8 @@ impl Spawner {
}
}
impl Scheduler {
fn tick(&self, local: &mut Local<impl Park>) {
impl SchedulerPriv {
fn tick(&self, local: &mut LocalState<impl Park>) {
for _ in 0..MAX_TASKS_PER_TICK {
// Get the current tick
let tick = local.tick;
@ -223,7 +223,7 @@ impl Scheduler {
/// # Safety
///
/// Must be called from the same thread that holds the `CurrentThread`
/// Must be called from the same thread that holds the `BasicScheduler`
/// value.
pub(super) unsafe fn spawn_background<F>(&self, future: F)
where
@ -254,7 +254,7 @@ impl Scheduler {
}
}
impl Schedule for Scheduler {
impl Schedule for SchedulerPriv {
fn bind(&self, task: &Task<Self>) {
unsafe {
(*self.owned_tasks.get()).insert(task);
@ -274,7 +274,7 @@ impl Schedule for Scheduler {
fn schedule(&self, task: Task<Self>) {
use crate::runtime::global;
if global::current_thread_is_current(self) {
if global::basic_scheduler_is_current(self) {
unsafe { self.schedule_local(task) };
} else {
let mut lock = self.remote_queue.lock().unwrap();
@ -293,7 +293,7 @@ impl Schedule for Scheduler {
}
}
impl<P> Drop for CurrentThread<P>
impl<P> Drop for BasicScheduler<P>
where
P: Park,
{
@ -328,36 +328,36 @@ where
}
}
impl fmt::Debug for Scheduler {
impl fmt::Debug for SchedulerPriv {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Scheduler").finish()
}
}
unsafe fn sched_clone_waker(ptr: *const ()) -> RawWaker {
let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const Scheduler));
let s1 = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
#[allow(clippy::redundant_clone)]
let s2 = s1.clone();
RawWaker::new(
&**s2 as *const Scheduler as *const (),
&**s2 as *const SchedulerPriv as *const (),
&RawWakerVTable::new(sched_clone_waker, sched_wake, sched_wake_by_ref, sched_drop),
)
}
unsafe fn sched_wake(ptr: *const ()) {
let scheduler = Arc::from_raw(ptr as *const Scheduler);
let scheduler = Arc::from_raw(ptr as *const SchedulerPriv);
scheduler.unpark.unpark();
}
unsafe fn sched_wake_by_ref(ptr: *const ()) {
let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const Scheduler));
let scheduler = ManuallyDrop::new(Arc::from_raw(ptr as *const SchedulerPriv));
scheduler.unpark.unpark();
}
unsafe fn sched_drop(ptr: *const ()) {
let _ = Arc::from_raw(ptr as *const Scheduler);
let _ = Arc::from_raw(ptr as *const SchedulerPriv);
}
unsafe fn sched_noop(_ptr: *const ()) {

View File

@ -61,7 +61,7 @@ pub struct Builder {
enum Kind {
Shell,
#[cfg(feature = "rt-core")]
CurrentThread,
Basic,
#[cfg(feature = "rt-full")]
ThreadPool,
}
@ -121,19 +121,19 @@ impl Builder {
self
}
/// Use only the current thread for executing tasks.
/// Use a simpler scheduler that runs all tasks on the current-thread.
///
/// The executor and all necessary drivers will all be run on the current
/// thread during `block_on` calls.
#[cfg(feature = "rt-core")]
pub fn current_thread(&mut self) -> &mut Self {
self.kind = Kind::CurrentThread;
pub fn basic_scheduler(&mut self) -> &mut Self {
self.kind = Kind::Basic;
self
}
/// Use a thread-pool for executing tasks.
/// Use a multi-threaded scheduler for executing tasks.
#[cfg(feature = "rt-full")]
pub fn thread_pool(&mut self) -> &mut Self {
pub fn threaded_scheduler(&mut self) -> &mut Self {
self.kind = Kind::ThreadPool;
self
}
@ -252,15 +252,15 @@ impl Builder {
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
match self.kind {
Kind::Shell => self.build_shell(),
Kind::Shell => self.build_shell_runtime(),
#[cfg(feature = "rt-core")]
Kind::CurrentThread => self.build_current_thread(),
Kind::Basic => self.build_basic_runtime(),
#[cfg(feature = "rt-full")]
Kind::ThreadPool => self.build_threadpool(),
Kind::ThreadPool => self.build_threaded_runtime(),
}
}
fn build_shell(&mut self) -> io::Result<Runtime> {
fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::Kind;
let clock = time::create_clock();
@ -289,8 +289,8 @@ impl Builder {
}
#[cfg(feature = "rt-core")]
fn build_current_thread(&mut self) -> io::Result<Runtime> {
use crate::runtime::{CurrentThread, Kind};
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};
let clock = time::create_clock();
@ -305,7 +305,7 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = CurrentThread::new(driver);
let scheduler = BasicScheduler::new(driver);
let spawner = scheduler.spawner();
// Blocking pool
@ -313,9 +313,9 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();
Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
kind: Kind::Basic(scheduler),
handle: Handle {
kind: handle::Kind::CurrentThread(spawner),
kind: handle::Kind::Basic(spawner),
io_handles,
time_handles,
clock,
@ -326,7 +326,7 @@ impl Builder {
}
#[cfg(feature = "rt-full")]
fn build_threadpool(&mut self) -> io::Result<Runtime> {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Kind, ThreadPool};
use std::sync::Mutex;

View File

@ -1,4 +1,4 @@
use crate::runtime::current_thread;
use crate::runtime::basic_scheduler;
#[cfg(feature = "rt-full")]
use crate::runtime::thread_pool;
@ -11,8 +11,8 @@ enum State {
// default executor not defined
Empty,
// Current-thread executor
CurrentThread(*const current_thread::Scheduler),
// Basic scheduler (runs on the current-thread)
Basic(*const basic_scheduler::SchedulerPriv),
// default executor is a thread pool instance.
#[cfg(feature = "rt-full")]
@ -73,17 +73,17 @@ where
{
EXECUTOR.with(|current_executor| match current_executor.get() {
#[cfg(feature = "rt-full")]
State::ThreadPool(threadpool_ptr) => {
let thread_pool = unsafe { &*threadpool_ptr };
State::ThreadPool(thread_pool_ptr) => {
let thread_pool = unsafe { &*thread_pool_ptr };
thread_pool.spawn_background(future);
}
State::CurrentThread(current_thread_ptr) => {
let current_thread = unsafe { &*current_thread_ptr };
State::Basic(basic_scheduler_ptr) => {
let basic_scheduler = unsafe { &*basic_scheduler_ptr };
// Safety: The `CurrentThread` value set the thread-local (same
// Safety: The `BasicScheduler` value set the thread-local (same
// thread).
unsafe {
current_thread.spawn_background(future);
basic_scheduler.spawn_background(future);
}
}
State::Empty => {
@ -95,19 +95,22 @@ where
})
}
pub(super) fn with_current_thread<F, R>(current_thread: &current_thread::Scheduler, f: F) -> R
pub(super) fn with_basic_scheduler<F, R>(
basic_scheduler: &basic_scheduler::SchedulerPriv,
f: F,
) -> R
where
F: FnOnce() -> R,
{
with_state(
State::CurrentThread(current_thread as *const current_thread::Scheduler),
State::Basic(basic_scheduler as *const basic_scheduler::SchedulerPriv),
f,
)
}
pub(super) fn current_thread_is_current(current_thread: &current_thread::Scheduler) -> bool {
pub(super) fn basic_scheduler_is_current(basic_scheduler: &basic_scheduler::SchedulerPriv) -> bool {
EXECUTOR.with(|current_executor| match current_executor.get() {
State::CurrentThread(ptr) => ptr == current_thread as *const _,
State::Basic(ptr) => ptr == basic_scheduler as *const _,
_ => false,
})
}

View File

@ -1,5 +1,5 @@
#[cfg(feature = "rt-core")]
use crate::runtime::current_thread;
use crate::runtime::basic_scheduler;
#[cfg(feature = "rt-full")]
use crate::runtime::thread_pool;
use crate::runtime::{blocking, io, time};
@ -30,7 +30,7 @@ pub struct Handle {
pub(super) enum Kind {
Shell,
#[cfg(feature = "rt-core")]
CurrentThread(current_thread::Spawner),
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-full")]
ThreadPool(thread_pool::Spawner),
}
@ -76,7 +76,7 @@ impl Handle {
match &self.kind {
Kind::Shell => panic!("spawning not enabled for runtime"),
#[cfg(feature = "rt-core")]
Kind::CurrentThread(spawner) => spawner.spawn(future),
Kind::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-full")]
Kind::ThreadPool(spawner) => spawner.spawn(future),
}

View File

@ -132,17 +132,17 @@
#[macro_use]
mod tests;
#[cfg(feature = "rt-core")]
mod basic_scheduler;
#[cfg(feature = "rt-core")]
use self::basic_scheduler::BasicScheduler;
mod blocking;
use blocking::BlockingPool;
mod builder;
pub use self::builder::Builder;
#[cfg(feature = "rt-core")]
mod current_thread;
#[cfg(feature = "rt-core")]
use self::current_thread::CurrentThread;
pub(crate) mod enter;
use self::enter::enter;
@ -220,7 +220,7 @@ enum Kind {
/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
CurrentThread(CurrentThread<time::Driver>),
Basic(BasicScheduler<time::Driver>),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-full")]
@ -255,10 +255,10 @@ impl Runtime {
/// [mod]: index.html
pub fn new() -> io::Result<Self> {
#[cfg(feature = "rt-full")]
let ret = Builder::new().thread_pool().build();
let ret = Builder::new().threaded_scheduler().build();
#[cfg(all(not(feature = "rt-full"), feature = "rt-core"))]
let ret = Builder::new().current_thread().build();
let ret = Builder::new().basic_scheduler().build();
#[cfg(not(feature = "rt-core"))]
let ret = Builder::new().build();
@ -306,7 +306,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-full")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::CurrentThread(exec) => exec.spawn(future),
Kind::Basic(exec) => exec.spawn(future),
}
}
@ -329,7 +329,7 @@ impl Runtime {
self.handle.enter(|| match kind {
Kind::Shell(exec) => exec.block_on(future),
#[cfg(feature = "rt-core")]
Kind::CurrentThread(exec) => exec.block_on(future),
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-full")]
Kind::ThreadPool(exec) => exec.block_on(future),
})

View File

@ -304,7 +304,7 @@ mod tests {
}
fn rt() -> Runtime {
runtime::Builder::new().current_thread().build().unwrap()
runtime::Builder::new().basic_scheduler().build().unwrap()
}
async fn collect(mut rx: crate::sync::mpsc::Receiver<()>) -> Vec<()> {

View File

@ -221,7 +221,7 @@ mod tests {
fn rt() -> Runtime {
crate::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
}

View File

@ -18,7 +18,7 @@ fn run_test() {
let finished_clone = finished.clone();
thread::spawn(move || {
let mut rt = runtime::Builder::new().current_thread().build().unwrap();
let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
let mut futures = FuturesOrdered::new();
rt.block_on(async {

View File

@ -28,7 +28,7 @@ fn spawned_task_does_not_progress_without_block_on() {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
}

View File

@ -4,12 +4,12 @@
macro_rules! rt_test {
($($t:tt)*) => {
mod current_thread {
mod basic_scheduler {
$($t)*
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
}

View File

@ -16,7 +16,10 @@ use std::task::{Context, Poll};
#[test]
fn single_thread() {
// No panic when starting a runtime w/ a single thread
let _ = runtime::Builder::new().thread_pool().num_threads(1).build();
let _ = runtime::Builder::new()
.threaded_scheduler()
.num_threads(1)
.build();
}
#[test]
@ -185,7 +188,7 @@ fn drop_threadpool_drops_futures() {
let b = num_dec.clone();
let rt = runtime::Builder::new()
.thread_pool()
.threaded_scheduler()
.after_start(move || {
a.fetch_add(1, Relaxed);
})
@ -224,7 +227,7 @@ fn after_start_and_before_stop_is_called() {
let after_inner = after_start.clone();
let before_inner = before_stop.clone();
let mut rt = tokio::runtime::Builder::new()
.thread_pool()
.threaded_scheduler()
.after_start(move || {
after_inner.clone().fetch_add(1, Ordering::Relaxed);
})

View File

@ -37,7 +37,7 @@ fn dropping_loops_does_not_cause_starvation() {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
}

View File

@ -47,7 +47,7 @@ fn multi_loop() {
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.basic_scheduler()
.build()
.unwrap()
}

View File

@ -24,10 +24,10 @@ fn timer_with_threaded_runtime() {
}
#[test]
fn timer_with_current_thread_runtime() {
fn timer_with_basic_scheduler() {
use tokio::runtime::Builder;
let mut rt = Builder::new().current_thread().build().unwrap();
let mut rt = Builder::new().basic_scheduler().build().unwrap();
let (tx, rx) = mpsc::channel();
rt.block_on(async move {