mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
Threadpool refactor (#294)
* Switch worker lifecycle to an enum * Move some files around * Rename State -> PoolState
This commit is contained in:
parent
3ba5595233
commit
79afc7ee68
@ -2,24 +2,18 @@ use callback::Callback;
|
||||
use config::{Config, MAX_WORKERS};
|
||||
use park::{BoxPark, BoxedPark, DefaultPark};
|
||||
use sender::Sender;
|
||||
use shutdown_task::ShutdownTask;
|
||||
use sleep_stack::SleepStack;
|
||||
use state::State;
|
||||
use pool::Inner;
|
||||
use thread_pool::ThreadPool;
|
||||
use inner::Inner;
|
||||
use worker::{Worker, WorkerId};
|
||||
use worker_entry::WorkerEntry;
|
||||
use worker::{self, Worker, WorkerId};
|
||||
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::time::Duration;
|
||||
|
||||
use num_cpus;
|
||||
use tokio_executor::Enter;
|
||||
use tokio_executor::park::Park;
|
||||
use futures::task::AtomicTask;
|
||||
|
||||
#[cfg(feature = "unstable-futures")]
|
||||
use futures2;
|
||||
@ -330,30 +324,20 @@ impl Builder {
|
||||
let park = (self.new_park)(&id);
|
||||
let unpark = park.unpark();
|
||||
|
||||
workers.push(WorkerEntry::new(park, unpark));
|
||||
workers.push(worker::Entry::new(park, unpark));
|
||||
}
|
||||
|
||||
let inner = Arc::new(Inner {
|
||||
state: AtomicUsize::new(State::new().into()),
|
||||
sleep_stack: AtomicUsize::new(SleepStack::new().into()),
|
||||
num_workers: AtomicUsize::new(self.pool_size),
|
||||
next_thread_id: AtomicUsize::new(0),
|
||||
workers: workers.into_boxed_slice(),
|
||||
shutdown_task: ShutdownTask {
|
||||
task1: AtomicTask::new(),
|
||||
#[cfg(feature = "unstable-futures")]
|
||||
task2: futures2::task::AtomicWaker::new(),
|
||||
},
|
||||
config: self.config.clone(),
|
||||
// Create the pool
|
||||
let inner = Arc::new(
|
||||
Inner::new(
|
||||
workers.into_boxed_slice(),
|
||||
self.config.clone()));
|
||||
|
||||
// Wrap with `Sender`
|
||||
let inner = Some(Sender {
|
||||
inner
|
||||
});
|
||||
|
||||
// Now, we prime the sleeper stack
|
||||
for i in 0..self.pool_size {
|
||||
inner.push_sleeper(i).unwrap();
|
||||
}
|
||||
|
||||
let inner = Some(Sender { inner });
|
||||
|
||||
ThreadPool { inner }
|
||||
}
|
||||
}
|
||||
|
@ -20,20 +20,17 @@ pub mod park;
|
||||
mod builder;
|
||||
mod callback;
|
||||
mod config;
|
||||
mod inner;
|
||||
#[cfg(feature = "unstable-futures")]
|
||||
mod futures2_wake;
|
||||
mod notifier;
|
||||
mod pool;
|
||||
mod sender;
|
||||
mod shutdown;
|
||||
mod shutdown_task;
|
||||
mod sleep_stack;
|
||||
mod state;
|
||||
mod task;
|
||||
mod thread_pool;
|
||||
mod worker;
|
||||
mod worker_entry;
|
||||
mod worker_state;
|
||||
|
||||
pub use builder::Builder;
|
||||
pub use sender::Sender;
|
||||
|
@ -1,4 +1,4 @@
|
||||
use inner::Inner;
|
||||
use pool::Inner;
|
||||
use task::Task;
|
||||
|
||||
use std::mem;
|
||||
|
@ -1,3 +1,13 @@
|
||||
mod state;
|
||||
|
||||
pub(crate) use self::state::{
|
||||
// TODO: Rename `State`
|
||||
PoolState,
|
||||
SHUTDOWN_ON_IDLE,
|
||||
SHUTDOWN_NOW,
|
||||
MAX_FUTURES,
|
||||
};
|
||||
|
||||
use config::{Config, MAX_WORKERS};
|
||||
use sleep_stack::{
|
||||
SleepStack,
|
||||
@ -5,19 +15,10 @@ use sleep_stack::{
|
||||
TERMINATED,
|
||||
};
|
||||
use shutdown_task::ShutdownTask;
|
||||
use state::{State, SHUTDOWN_ON_IDLE, SHUTDOWN_NOW};
|
||||
use task::Task;
|
||||
use worker::{Worker, WorkerId};
|
||||
use worker_entry::WorkerEntry;
|
||||
use worker_state::{
|
||||
WorkerState,
|
||||
PUSHED_MASK,
|
||||
WORKER_SHUTDOWN,
|
||||
WORKER_RUNNING,
|
||||
WORKER_SLEEPING,
|
||||
WORKER_NOTIFIED,
|
||||
WORKER_SIGNALED,
|
||||
};
|
||||
use worker::{self, Worker, WorkerId, WorkerState, PUSHED_MASK};
|
||||
|
||||
use futures::task::AtomicTask;
|
||||
|
||||
use std::cell::UnsafeCell;
|
||||
use std::sync::atomic::Ordering::{Acquire, AcqRel, Release, Relaxed};
|
||||
@ -26,6 +27,7 @@ use std::sync::Arc;
|
||||
|
||||
use rand::{Rng, SeedableRng, XorShiftRng};
|
||||
|
||||
// TODO: Rename this
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Inner {
|
||||
// ThreadPool state
|
||||
@ -46,7 +48,7 @@ pub(crate) struct Inner {
|
||||
// Storage for workers
|
||||
//
|
||||
// This will *usually* be a small number
|
||||
pub workers: Box<[WorkerEntry]>,
|
||||
pub workers: Box<[worker::Entry]>,
|
||||
|
||||
// Task notified when the worker shuts down
|
||||
pub shutdown_task: ShutdownTask,
|
||||
@ -56,10 +58,36 @@ pub(crate) struct Inner {
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
/// Create a new `Inner`
|
||||
pub fn new(workers: Box<[worker::Entry]>, config: Config) -> Inner {
|
||||
let pool_size = workers.len();
|
||||
|
||||
let ret = Inner {
|
||||
state: AtomicUsize::new(PoolState::new().into()),
|
||||
sleep_stack: AtomicUsize::new(SleepStack::new().into()),
|
||||
num_workers: AtomicUsize::new(pool_size),
|
||||
next_thread_id: AtomicUsize::new(0),
|
||||
workers,
|
||||
shutdown_task: ShutdownTask {
|
||||
task1: AtomicTask::new(),
|
||||
#[cfg(feature = "unstable-futures")]
|
||||
task2: futures2::task::AtomicWaker::new(),
|
||||
},
|
||||
config,
|
||||
};
|
||||
|
||||
// Now, we prime the sleeper stack
|
||||
for i in 0..pool_size {
|
||||
ret.push_sleeper(i).unwrap();
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
/// Start shutting down the pool. This means that no new futures will be
|
||||
/// accepted.
|
||||
pub fn shutdown(&self, now: bool, purge_queue: bool) {
|
||||
let mut state: State = self.state.load(Acquire).into();
|
||||
let mut state: PoolState = self.state.load(Acquire).into();
|
||||
|
||||
trace!("shutdown; state={:?}", state);
|
||||
|
||||
@ -119,10 +147,12 @@ impl Inner {
|
||||
}
|
||||
|
||||
pub fn terminate_sleeping_workers(&self) {
|
||||
use worker::Lifecycle::Signaled;
|
||||
|
||||
trace!(" -> shutting down workers");
|
||||
// Wakeup all sleeping workers. They will wake up, see the state
|
||||
// transition, and terminate.
|
||||
while let Some((idx, worker_state)) = self.pop_sleeper(WORKER_SIGNALED, TERMINATED) {
|
||||
while let Some((idx, worker_state)) = self.pop_sleeper(Signaled, TERMINATED) {
|
||||
trace!(" -> shutdown worker; idx={:?}; state={:?}", idx, worker_state);
|
||||
self.signal_stop(idx, worker_state);
|
||||
}
|
||||
@ -130,6 +160,8 @@ impl Inner {
|
||||
|
||||
/// Signals to the worker that it should stop
|
||||
fn signal_stop(&self, idx: usize, mut state: WorkerState) {
|
||||
use worker::Lifecycle::*;
|
||||
|
||||
let worker = &self.workers[idx];
|
||||
|
||||
// Transition the worker state to signaled
|
||||
@ -137,7 +169,7 @@ impl Inner {
|
||||
let mut next = state;
|
||||
|
||||
match state.lifecycle() {
|
||||
WORKER_SHUTDOWN => {
|
||||
Shutdown => {
|
||||
trace!("signal_stop -- WORKER_SHUTDOWN; idx={}", idx);
|
||||
// If the worker is in the shutdown state, then it will never be
|
||||
// started again.
|
||||
@ -145,16 +177,25 @@ impl Inner {
|
||||
|
||||
return;
|
||||
}
|
||||
WORKER_RUNNING | WORKER_SLEEPING => {}
|
||||
_ => {
|
||||
Running | Sleeping => {}
|
||||
Notified | Signaled => {
|
||||
trace!("signal_stop -- skipping; idx={}; state={:?}", idx, state);
|
||||
// All other states will naturally converge to a state of
|
||||
// shutdown.
|
||||
// These two states imply that the worker is active, thus it
|
||||
// will eventually see the shutdown signal, so we don't need
|
||||
// to do anything.
|
||||
//
|
||||
// The worker is forced to see the shutdown signal
|
||||
// eventually as:
|
||||
//
|
||||
// a) No more work will arrive
|
||||
// b) The shutdown signal is stored as the head of the
|
||||
// sleep, stack which will prevent the worker from going to
|
||||
// sleep again.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
next.set_lifecycle(WORKER_SIGNALED);
|
||||
next.set_lifecycle(Signaled);
|
||||
|
||||
let actual = worker.state.compare_and_swap(
|
||||
state.into(), next.into(), AcqRel).into();
|
||||
@ -208,9 +249,11 @@ impl Inner {
|
||||
/// Called from outside of the scheduler, this function is how new tasks
|
||||
/// enter the system.
|
||||
fn submit_external(&self, task: Task, inner: &Arc<Inner>) {
|
||||
use worker::Lifecycle::Notified;
|
||||
|
||||
// First try to get a handle to a sleeping worker. This ensures that
|
||||
// sleeping tasks get woken up
|
||||
if let Some((idx, state)) = self.pop_sleeper(WORKER_NOTIFIED, EMPTY) {
|
||||
if let Some((idx, state)) = self.pop_sleeper(Notified, EMPTY) {
|
||||
trace!("submit to existing worker; idx={}; state={:?}", idx, state);
|
||||
self.submit_to_external(idx, task, state, inner);
|
||||
return;
|
||||
@ -236,23 +279,30 @@ impl Inner {
|
||||
let entry = &self.workers[idx];
|
||||
|
||||
if !entry.submit_external(task, state) {
|
||||
Worker::spawn(WorkerId::new(idx), inner);
|
||||
self.spawn_worker(idx, inner);
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_worker(&self, idx: usize, inner: &Arc<Inner>) {
|
||||
Worker::spawn(WorkerId::new(idx), inner);
|
||||
}
|
||||
|
||||
/// If there are any other workers currently relaxing, signal them that work
|
||||
/// is available so that they can try to find more work to process.
|
||||
pub fn signal_work(&self, inner: &Arc<Inner>) {
|
||||
if let Some((idx, mut state)) = self.pop_sleeper(WORKER_SIGNALED, EMPTY) {
|
||||
use worker::Lifecycle::*;
|
||||
|
||||
if let Some((idx, mut state)) = self.pop_sleeper(Signaled, EMPTY) {
|
||||
let entry = &self.workers[idx];
|
||||
|
||||
debug_assert!(state.lifecycle() != Signaled, "actual={:?}", state.lifecycle());
|
||||
|
||||
// Transition the worker state to signaled
|
||||
loop {
|
||||
let mut next = state;
|
||||
|
||||
// pop_sleeper should skip these
|
||||
debug_assert!(state.lifecycle() != WORKER_SIGNALED);
|
||||
next.set_lifecycle(WORKER_SIGNALED);
|
||||
next.set_lifecycle(Signaled);
|
||||
|
||||
let actual = entry.state.compare_and_swap(
|
||||
state.into(), next.into(), AcqRel).into();
|
||||
@ -267,15 +317,17 @@ impl Inner {
|
||||
// The state has been transitioned to signal, now we need to wake up
|
||||
// the worker if necessary.
|
||||
match state.lifecycle() {
|
||||
WORKER_SLEEPING => {
|
||||
Sleeping => {
|
||||
trace!("signal_work -- wakeup; idx={}", idx);
|
||||
self.workers[idx].wakeup();
|
||||
}
|
||||
WORKER_SHUTDOWN => {
|
||||
Shutdown => {
|
||||
trace!("signal_work -- spawn; idx={}", idx);
|
||||
Worker::spawn(WorkerId::new(idx), inner);
|
||||
}
|
||||
_ => {}
|
||||
Running | Notified | Signaled => {
|
||||
// The workers are already active. No need to wake them up.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -313,7 +365,7 @@ impl Inner {
|
||||
}
|
||||
|
||||
/// Pop a worker from the sleep stack
|
||||
fn pop_sleeper(&self, max_lifecycle: usize, terminal: usize)
|
||||
fn pop_sleeper(&self, max_lifecycle: worker::Lifecycle, terminal: usize)
|
||||
-> Option<(usize, WorkerState)>
|
||||
{
|
||||
debug_assert!(terminal == EMPTY || terminal == TERMINATED);
|
||||
@ -371,6 +423,7 @@ impl Inner {
|
||||
|
||||
// Unset the PUSHED flag and get the current state.
|
||||
let state: WorkerState = self.workers[head].state
|
||||
// TODO This should be fetch_and(!PUSHED_MASK)
|
||||
.fetch_sub(PUSHED_MASK, Release).into();
|
||||
|
||||
if state.lifecycle() >= max_lifecycle {
|
@ -6,7 +6,7 @@ use std::{fmt, usize};
|
||||
/// shutdown on idle, 2 for shutting down). The remaining bits represent the
|
||||
/// number of futures that still need to complete.
|
||||
#[derive(Eq, PartialEq, Clone, Copy)]
|
||||
pub(crate) struct State(usize);
|
||||
pub(crate) struct PoolState(usize);
|
||||
|
||||
/// Flag used to track if the pool is running
|
||||
pub(crate) const SHUTDOWN_ON_IDLE: usize = 1;
|
||||
@ -20,10 +20,10 @@ const NUM_FUTURES_OFFSET: usize = 2;
|
||||
/// Max number of futures the pool can handle.
|
||||
pub(crate) const MAX_FUTURES: usize = usize::MAX >> NUM_FUTURES_OFFSET;
|
||||
|
||||
impl State {
|
||||
impl PoolState {
|
||||
#[inline]
|
||||
pub fn new() -> State {
|
||||
State(0)
|
||||
pub fn new() -> PoolState {
|
||||
PoolState(0)
|
||||
}
|
||||
|
||||
/// Returns the number of futures still pending completion.
|
||||
@ -75,19 +75,19 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<usize> for State {
|
||||
impl From<usize> for PoolState {
|
||||
fn from(src: usize) -> Self {
|
||||
State(src)
|
||||
PoolState(src)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<State> for usize {
|
||||
fn from(src: State) -> Self {
|
||||
impl From<PoolState> for usize {
|
||||
fn from(src: PoolState) -> Self {
|
||||
src.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for State {
|
||||
impl fmt::Debug for PoolState {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("State")
|
||||
.field("lifecycle", &self.lifecycle())
|
@ -1,5 +1,4 @@
|
||||
use inner::Inner;
|
||||
use state::{State, SHUTDOWN_NOW, MAX_FUTURES};
|
||||
use pool::{Inner, PoolState, SHUTDOWN_NOW, MAX_FUTURES};
|
||||
use task::Task;
|
||||
|
||||
use std::sync::Arc;
|
||||
@ -90,7 +89,7 @@ impl Sender {
|
||||
|
||||
/// Logic to prepare for spawning
|
||||
fn prepare_for_spawn(&self) -> Result<(), SpawnError> {
|
||||
let mut state: State = self.inner.state.load(Acquire).into();
|
||||
let mut state: PoolState = self.inner.state.load(Acquire).into();
|
||||
|
||||
// Increment the number of futures spawned on the pool as well as
|
||||
// validate that the pool is still running/
|
||||
@ -145,7 +144,7 @@ impl tokio_executor::Executor for Sender {
|
||||
|
||||
impl<'a> tokio_executor::Executor for &'a Sender {
|
||||
fn status(&self) -> Result<(), tokio_executor::SpawnError> {
|
||||
let state: State = self.inner.state.load(Acquire).into();
|
||||
let state: PoolState = self.inner.state.load(Acquire).into();
|
||||
|
||||
if state.num_futures() == MAX_FUTURES {
|
||||
// No capacity
|
||||
|
@ -1,5 +1,5 @@
|
||||
use pool::Inner;
|
||||
use sender::Sender;
|
||||
use inner::Inner;
|
||||
|
||||
use std::sync::atomic::Ordering::{Acquire};
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
use builder::Builder;
|
||||
use inner::Inner;
|
||||
use pool::Inner;
|
||||
use sender::Sender;
|
||||
use shutdown::Shutdown;
|
||||
|
||||
|
@ -1,10 +1,6 @@
|
||||
use park::{BoxPark, BoxUnpark};
|
||||
use task::{Task, Queue};
|
||||
use worker_state::{
|
||||
WorkerState,
|
||||
WORKER_SHUTDOWN,
|
||||
WORKER_SLEEPING,
|
||||
};
|
||||
use worker::WorkerState;
|
||||
|
||||
use std::cell::UnsafeCell;
|
||||
use std::fmt;
|
||||
@ -13,6 +9,7 @@ use std::sync::atomic::AtomicUsize;
|
||||
|
||||
use deque;
|
||||
|
||||
// TODO: None of the fields should be public
|
||||
pub(crate) struct WorkerEntry {
|
||||
// Worker state. This is mutated when notifying the worker.
|
||||
pub state: AtomicUsize,
|
||||
@ -62,6 +59,8 @@ impl WorkerEntry {
|
||||
///
|
||||
/// Returns `false` if the worker needs to be spawned.
|
||||
pub fn submit_external(&self, task: Task, mut state: WorkerState) -> bool {
|
||||
use worker::Lifecycle::*;
|
||||
|
||||
// Push the task onto the external queue
|
||||
self.push_external(task);
|
||||
|
||||
@ -81,14 +80,18 @@ impl WorkerEntry {
|
||||
}
|
||||
|
||||
match state.lifecycle() {
|
||||
WORKER_SLEEPING => {
|
||||
Sleeping => {
|
||||
// The worker is currently sleeping, the condition variable must
|
||||
// be signaled
|
||||
self.wakeup();
|
||||
true
|
||||
}
|
||||
WORKER_SHUTDOWN => false,
|
||||
_ => true,
|
||||
Shutdown => false,
|
||||
Running | Notified | Signaled => {
|
||||
// In these states, the worker is active and will eventually see
|
||||
// the task that was just submitted.
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,17 +1,20 @@
|
||||
use inner::Inner;
|
||||
mod entry;
|
||||
mod state;
|
||||
|
||||
pub(crate) use self::entry::{
|
||||
WorkerEntry as Entry,
|
||||
};
|
||||
pub(crate) use self::state::{
|
||||
// TODO: Rename `State`
|
||||
WorkerState,
|
||||
Lifecycle,
|
||||
PUSHED_MASK,
|
||||
};
|
||||
|
||||
use pool::{Inner, PoolState};
|
||||
use notifier::Notifier;
|
||||
use sender::Sender;
|
||||
use state::State;
|
||||
use task::Task;
|
||||
use worker_entry::WorkerEntry;
|
||||
use worker_state::{
|
||||
WorkerState,
|
||||
WORKER_SHUTDOWN,
|
||||
WORKER_RUNNING,
|
||||
WORKER_SLEEPING,
|
||||
WORKER_NOTIFIED,
|
||||
WORKER_SIGNALED,
|
||||
};
|
||||
|
||||
use tokio_executor;
|
||||
|
||||
@ -197,10 +200,12 @@ impl Worker {
|
||||
/// Returns `true` if the worker should run.
|
||||
#[inline]
|
||||
fn check_run_state(&self, first: bool) -> bool {
|
||||
use self::Lifecycle::*;
|
||||
|
||||
let mut state: WorkerState = self.entry().state.load(Acquire).into();
|
||||
|
||||
loop {
|
||||
let pool_state: State = self.inner.state.load(Acquire).into();
|
||||
let pool_state: PoolState = self.inner.state.load(Acquire).into();
|
||||
|
||||
if pool_state.is_terminated() {
|
||||
return false;
|
||||
@ -209,12 +214,16 @@ impl Worker {
|
||||
let mut next = state;
|
||||
|
||||
match state.lifecycle() {
|
||||
WORKER_RUNNING => break,
|
||||
WORKER_NOTIFIED | WORKER_SIGNALED => {
|
||||
Running => break,
|
||||
Notified | Signaled => {
|
||||
// transition back to running
|
||||
next.set_lifecycle(WORKER_RUNNING);
|
||||
next.set_lifecycle(Running);
|
||||
}
|
||||
Shutdown | Sleeping => {
|
||||
// The worker should never be in these states when calling
|
||||
// this function.
|
||||
panic!("unexpected worker state; lifecycle={:?}", state.lifecycle());
|
||||
}
|
||||
lifecycle => panic!("unexpected worker state; lifecycle={}", lifecycle),
|
||||
}
|
||||
|
||||
let actual = self.entry().state.compare_and_swap(
|
||||
@ -311,7 +320,7 @@ impl Worker {
|
||||
self.entry().push_internal(task);
|
||||
}
|
||||
Complete => {
|
||||
let mut state: State = self.inner.state.load(Acquire).into();
|
||||
let mut state: PoolState = self.inner.state.load(Acquire).into();
|
||||
|
||||
loop {
|
||||
let mut next = state;
|
||||
@ -387,7 +396,9 @@ impl Worker {
|
||||
///
|
||||
/// Returns `true` if woken up due to new work arriving.
|
||||
fn sleep(&self) -> bool {
|
||||
trace!("Worker::sleep; idx={}", self.id.idx);
|
||||
use self::Lifecycle::*;
|
||||
|
||||
trace!("Worker::sleep; worker={:?}", self);
|
||||
|
||||
let mut state: WorkerState = self.entry().state.load(Acquire).into();
|
||||
|
||||
@ -399,18 +410,22 @@ impl Worker {
|
||||
let mut next = state;
|
||||
|
||||
match state.lifecycle() {
|
||||
WORKER_RUNNING => {
|
||||
Running => {
|
||||
// Try setting the pushed state
|
||||
next.set_pushed();
|
||||
|
||||
// Transition the worker state to sleeping
|
||||
next.set_lifecycle(WORKER_SLEEPING);
|
||||
next.set_lifecycle(Sleeping);
|
||||
}
|
||||
WORKER_NOTIFIED | WORKER_SIGNALED => {
|
||||
Notified | Signaled => {
|
||||
// No need to sleep, transition back to running and move on.
|
||||
next.set_lifecycle(WORKER_RUNNING);
|
||||
next.set_lifecycle(Running);
|
||||
}
|
||||
Shutdown | Sleeping => {
|
||||
// The worker cannot transition to sleep when already in a
|
||||
// sleeping state.
|
||||
panic!("unexpected worker state; actual={:?}", state.lifecycle());
|
||||
}
|
||||
actual => panic!("unexpected worker state; {}", actual),
|
||||
}
|
||||
|
||||
let actual = self.entry().state.compare_and_swap(
|
||||
@ -489,12 +504,12 @@ impl Worker {
|
||||
|
||||
loop {
|
||||
match state.lifecycle() {
|
||||
WORKER_SLEEPING => {}
|
||||
WORKER_NOTIFIED | WORKER_SIGNALED => {
|
||||
Sleeping => {}
|
||||
Notified | Signaled => {
|
||||
// Transition back to running
|
||||
loop {
|
||||
let mut next = state;
|
||||
next.set_lifecycle(WORKER_RUNNING);
|
||||
next.set_lifecycle(Running);
|
||||
|
||||
let actual = self.entry().state.compare_and_swap(
|
||||
state.into(), next.into(), AcqRel).into();
|
||||
@ -506,7 +521,12 @@ impl Worker {
|
||||
state = actual;
|
||||
}
|
||||
}
|
||||
_ => unreachable!(),
|
||||
Shutdown | Running => {
|
||||
// To get here, the block above transitioned the tate to
|
||||
// `Sleeping`. No other thread can concurrently
|
||||
// transition to `Shutdown` or `Running`.
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
|
||||
if !drop_thread {
|
||||
@ -515,7 +535,7 @@ impl Worker {
|
||||
}
|
||||
|
||||
let mut next = state;
|
||||
next.set_lifecycle(WORKER_SHUTDOWN);
|
||||
next.set_lifecycle(Shutdown);
|
||||
|
||||
let actual = self.entry().state.compare_and_swap(
|
||||
state.into(), next.into(), AcqRel).into();
|
||||
@ -543,7 +563,7 @@ impl Worker {
|
||||
}
|
||||
}
|
||||
|
||||
fn entry(&self) -> &WorkerEntry {
|
||||
fn entry(&self) -> &Entry {
|
||||
&self.inner.workers[self.id.idx]
|
||||
}
|
||||
}
|
169
tokio-threadpool/src/worker/state.rs
Normal file
169
tokio-threadpool/src/worker/state.rs
Normal file
@ -0,0 +1,169 @@
|
||||
use std::cmp;
|
||||
use std::fmt;
|
||||
|
||||
/// Tracks worker state
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
pub(crate) struct WorkerState(usize);
|
||||
|
||||
/// Set when the worker is pushed onto the scheduler's stack of sleeping
|
||||
/// threads.
|
||||
pub(crate) const PUSHED_MASK: usize = 0b001;
|
||||
|
||||
/// Manages the worker lifecycle part of the state
|
||||
const LIFECYCLE_MASK: usize = 0b1110;
|
||||
const LIFECYCLE_SHIFT: usize = 1;
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
|
||||
#[repr(usize)]
|
||||
pub(crate) enum Lifecycle {
|
||||
/// The worker does not currently have an associated thread.
|
||||
Shutdown = 0 << LIFECYCLE_SHIFT,
|
||||
|
||||
/// The worker is currently processing its task.
|
||||
Running = 1 << LIFECYCLE_SHIFT,
|
||||
|
||||
/// The worker is currently asleep in the condvar
|
||||
Sleeping = 2 << LIFECYCLE_SHIFT,
|
||||
|
||||
/// The worker has been notified it should process more work.
|
||||
Notified = 3 << LIFECYCLE_SHIFT,
|
||||
|
||||
/// A stronger form of notification. In this case, the worker is expected to
|
||||
/// wakeup and try to acquire more work... if it enters this state while
|
||||
/// already busy with other work, it is expected to signal another worker.
|
||||
Signaled = 4 << LIFECYCLE_SHIFT,
|
||||
}
|
||||
|
||||
impl WorkerState {
|
||||
/// Returns true if the worker entry is pushed in the sleeper stack
|
||||
pub fn is_pushed(&self) -> bool {
|
||||
self.0 & PUSHED_MASK == PUSHED_MASK
|
||||
}
|
||||
|
||||
pub fn set_pushed(&mut self) {
|
||||
self.0 |= PUSHED_MASK
|
||||
}
|
||||
|
||||
pub fn is_notified(&self) -> bool {
|
||||
use self::Lifecycle::*;
|
||||
|
||||
match self.lifecycle() {
|
||||
Notified | Signaled => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lifecycle(&self) -> Lifecycle {
|
||||
Lifecycle::from(self.0 & LIFECYCLE_MASK)
|
||||
}
|
||||
|
||||
pub fn set_lifecycle(&mut self, val: Lifecycle) {
|
||||
self.0 = (self.0 & !LIFECYCLE_MASK) | (val as usize)
|
||||
}
|
||||
|
||||
pub fn is_signaled(&self) -> bool {
|
||||
self.lifecycle() == Lifecycle::Signaled
|
||||
}
|
||||
|
||||
pub fn notify(&mut self) {
|
||||
use self::Lifecycle::Signaled;
|
||||
|
||||
if self.lifecycle() != Signaled {
|
||||
self.set_lifecycle(Signaled)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WorkerState {
|
||||
fn default() -> WorkerState {
|
||||
// All workers will start pushed in the sleeping stack
|
||||
WorkerState(PUSHED_MASK)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<usize> for WorkerState {
|
||||
fn from(src: usize) -> Self {
|
||||
WorkerState(src)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WorkerState> for usize {
|
||||
fn from(src: WorkerState) -> Self {
|
||||
src.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for WorkerState {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("WorkerState")
|
||||
.field("lifecycle", &self.lifecycle())
|
||||
.field("is_pushed", &self.is_pushed())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
// ===== impl Lifecycle =====
|
||||
|
||||
impl From<usize> for Lifecycle {
|
||||
fn from(src: usize) -> Lifecycle {
|
||||
use self::Lifecycle::*;
|
||||
|
||||
debug_assert!(
|
||||
src == Shutdown as usize ||
|
||||
src == Running as usize ||
|
||||
src == Sleeping as usize ||
|
||||
src == Notified as usize ||
|
||||
src == Signaled as usize);
|
||||
|
||||
unsafe { ::std::mem::transmute(src) }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Lifecycle> for usize {
|
||||
fn from(src: Lifecycle) -> usize {
|
||||
let v = src as usize;
|
||||
debug_assert!(v & LIFECYCLE_MASK == v);
|
||||
v
|
||||
}
|
||||
}
|
||||
|
||||
impl cmp::PartialOrd for Lifecycle {
|
||||
#[inline]
|
||||
fn partial_cmp(&self, other: &Lifecycle) -> Option<cmp::Ordering> {
|
||||
let a: usize = (*self).into();
|
||||
let b: usize = (*other).into();
|
||||
|
||||
a.partial_cmp(&b)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use super::Lifecycle::*;
|
||||
|
||||
#[test]
|
||||
fn lifecycle_encode() {
|
||||
let lifecycles = &[
|
||||
Shutdown,
|
||||
Running,
|
||||
Sleeping,
|
||||
Notified,
|
||||
Signaled,
|
||||
];
|
||||
|
||||
for &lifecycle in lifecycles {
|
||||
let mut v: usize = lifecycle.into();
|
||||
v &= LIFECYCLE_MASK;
|
||||
|
||||
assert_eq!(lifecycle, Lifecycle::from(v));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn lifecycle_ord() {
|
||||
assert!(Running >= Shutdown);
|
||||
assert!(Signaled >= Notified);
|
||||
assert!(Signaled >= Sleeping);
|
||||
}
|
||||
}
|
@ -1,109 +0,0 @@
|
||||
use std::fmt;
|
||||
|
||||
/// Tracks worker state
|
||||
#[derive(Clone, Copy, Eq, PartialEq)]
|
||||
pub(crate) struct WorkerState(usize);
|
||||
|
||||
// Some constants used to work with State
|
||||
// const A: usize: 0;
|
||||
|
||||
// TODO: This should be split up between what is accessed by each thread and
|
||||
// what is concurrent. The bits accessed by each thread should be sized to
|
||||
// exactly one cache line.
|
||||
|
||||
/// Set when the worker is pushed onto the scheduler's stack of sleeping
|
||||
/// threads.
|
||||
pub(crate) const PUSHED_MASK: usize = 0b001;
|
||||
|
||||
/// Manages the worker lifecycle part of the state
|
||||
const WORKER_LIFECYCLE_MASK: usize = 0b1110;
|
||||
const WORKER_LIFECYCLE_SHIFT: usize = 1;
|
||||
|
||||
/// The worker does not currently have an associated thread.
|
||||
pub(crate) const WORKER_SHUTDOWN: usize = 0;
|
||||
|
||||
/// The worker is currently processing its task.
|
||||
pub(crate) const WORKER_RUNNING: usize = 1;
|
||||
|
||||
/// The worker is currently asleep in the condvar
|
||||
pub(crate) const WORKER_SLEEPING: usize = 2;
|
||||
|
||||
/// The worker has been notified it should process more work.
|
||||
pub(crate) const WORKER_NOTIFIED: usize = 3;
|
||||
|
||||
/// A stronger form of notification. In this case, the worker is expected to
|
||||
/// wakeup and try to acquire more work... if it enters this state while already
|
||||
/// busy with other work, it is expected to signal another worker.
|
||||
pub(crate) const WORKER_SIGNALED: usize = 4;
|
||||
|
||||
impl WorkerState {
|
||||
/// Returns true if the worker entry is pushed in the sleeper stack
|
||||
pub fn is_pushed(&self) -> bool {
|
||||
self.0 & PUSHED_MASK == PUSHED_MASK
|
||||
}
|
||||
|
||||
pub fn set_pushed(&mut self) {
|
||||
self.0 |= PUSHED_MASK
|
||||
}
|
||||
|
||||
pub fn is_notified(&self) -> bool {
|
||||
match self.lifecycle() {
|
||||
WORKER_NOTIFIED | WORKER_SIGNALED => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn lifecycle(&self) -> usize {
|
||||
(self.0 & WORKER_LIFECYCLE_MASK) >> WORKER_LIFECYCLE_SHIFT
|
||||
}
|
||||
|
||||
pub fn set_lifecycle(&mut self, val: usize) {
|
||||
self.0 = (self.0 & !WORKER_LIFECYCLE_MASK) |
|
||||
(val << WORKER_LIFECYCLE_SHIFT)
|
||||
}
|
||||
|
||||
pub fn is_signaled(&self) -> bool {
|
||||
self.lifecycle() == WORKER_SIGNALED
|
||||
}
|
||||
|
||||
pub fn notify(&mut self) {
|
||||
if self.lifecycle() != WORKER_SIGNALED {
|
||||
self.set_lifecycle(WORKER_NOTIFIED)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for WorkerState {
|
||||
fn default() -> WorkerState {
|
||||
// All workers will start pushed in the sleeping stack
|
||||
WorkerState(PUSHED_MASK)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<usize> for WorkerState {
|
||||
fn from(src: usize) -> Self {
|
||||
WorkerState(src)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<WorkerState> for usize {
|
||||
fn from(src: WorkerState) -> Self {
|
||||
src.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for WorkerState {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
fmt.debug_struct("WorkerState")
|
||||
.field("lifecycle", &match self.lifecycle() {
|
||||
WORKER_SHUTDOWN => "WORKER_SHUTDOWN",
|
||||
WORKER_RUNNING => "WORKER_RUNNING",
|
||||
WORKER_SLEEPING => "WORKER_SLEEPING",
|
||||
WORKER_NOTIFIED => "WORKER_NOTIFIED",
|
||||
WORKER_SIGNALED => "WORKER_SIGNALED",
|
||||
_ => unreachable!(),
|
||||
})
|
||||
.field("is_pushed", &self.is_pushed())
|
||||
.finish()
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user