Use Crossbeam's Parker/Unparker (#528)

This commit is contained in:
Stjepan Glavina 2019-01-03 06:51:22 +01:00 committed by Carl Lerche
parent 9a8d087c69
commit 5e2d93f060
5 changed files with 31 additions and 199 deletions

View File

@ -19,4 +19,5 @@ keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]
[dependencies]
crossbeam-utils = "0.6.2"
futures = "0.1.19"

View File

@ -35,6 +35,7 @@
//! [`Park`]: park/index.html
//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
extern crate crossbeam_utils;
extern crate futures;
mod enter;

View File

@ -46,10 +46,10 @@
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use crossbeam_utils::sync::{Parker, Unparker};
/// Block the current thread.
///
/// See [module documentation][mod] for more details.
@ -167,26 +167,11 @@ pub struct ParkError {
/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
pub struct UnparkThread {
inner: Arc<Inner>,
inner: Unparker,
}
#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}
const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;
thread_local! {
static CURRENT_PARK_THREAD: Arc<Inner> = Arc::new(Inner {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
static CURRENT_PARKER: Parker = Parker::new();
}
// ===== impl ParkThread =====
@ -204,9 +189,9 @@ impl ParkThread {
/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> R
where F: FnOnce(&Arc<Inner>) -> R,
where F: FnOnce(&Parker) -> R,
{
CURRENT_PARK_THREAD.with(|inner| f(inner))
CURRENT_PARKER.with(|inner| f(inner))
}
}
@ -215,16 +200,18 @@ impl Park for ParkThread {
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
let inner = self.with_current(|inner| inner.clone());
let inner = self.with_current(|inner| inner.unparker().clone());
UnparkThread { inner }
}
fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(None))
self.with_current(|inner| inner.park());
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(Some(duration)))
self.with_current(|inner| inner.park_timeout(duration));
Ok(())
}
}
@ -235,74 +222,3 @@ impl Unpark for UnparkThread {
self.inner.unpark();
}
}
// ===== impl Inner =====
impl Inner {
/// Park the current thread for at most `dur`.
fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return Ok(()),
IDLE => {},
_ => unreachable!(),
}
// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();
// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
return Ok(());
}
IDLE => {},
_ => unreachable!(),
}
m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
};
// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification
self.state.store(IDLE, Ordering::SeqCst);
// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
drop(m);
Ok(())
}
fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}
// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
// Transition to NOTIFY
match self.state.swap(NOTIFY, Ordering::SeqCst) {
SLEEP => {}
NOTIFY => return,
IDLE => return,
_ => unreachable!(),
}
// Wakeup the sleeper
self.condvar.notify_one();
}
}

View File

@ -22,7 +22,7 @@ tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam-channel = "0.3.3"
crossbeam-deque = "0.6.1"
crossbeam-utils = "0.6.0"
crossbeam-utils = "0.6.2"
num_cpus = "1.2"
rand = "0.6"
log = "0.4"

View File

@ -2,21 +2,20 @@ use tokio_executor::park::{Park, Unpark};
use std::error::Error;
use std::fmt;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
use crossbeam_utils::sync::{Parker, Unparker};
/// Parks the thread.
#[derive(Debug)]
pub struct DefaultPark {
inner: Arc<Inner>,
inner: Parker,
}
/// Unparks threads that were parked by `DefaultPark`.
#[derive(Debug)]
pub struct DefaultUnpark {
inner: Arc<Inner>,
inner: Unparker,
}
/// Error returned by [`ParkThread`]
@ -29,40 +28,28 @@ pub struct ParkError {
_p: (),
}
#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}
const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;
// ===== impl DefaultPark =====
impl DefaultPark {
/// Creates a new `DefaultPark` instance.
pub fn new() -> DefaultPark {
let inner = Arc::new(Inner {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
DefaultPark { inner }
DefaultPark {
inner: Parker::new(),
}
}
/// Unpark the thread without having to clone the unpark handle.
///
/// Named `notify` to avoid conflicting with the `unpark` fn.
pub(crate) fn notify(&self) {
self.inner.unpark();
self.inner.unparker().unpark();
}
pub(crate) fn park_sync(&self, duration: Option<Duration>) {
self.inner.park(duration);
match duration {
None => self.inner.park(),
Some(duration) => self.inner.park_timeout(duration),
}
}
}
@ -71,17 +58,18 @@ impl Park for DefaultPark {
type Error = ParkError;
fn unpark(&self) -> Self::Unpark {
let inner = self.inner.clone();
DefaultUnpark { inner }
DefaultUnpark {
inner: self.inner.unparker().clone(),
}
}
fn park(&mut self) -> Result<(), Self::Error> {
self.inner.park(None);
self.inner.park();
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.inner.park(Some(duration));
self.inner.park_timeout(duration);
Ok(())
}
}
@ -94,80 +82,6 @@ impl Unpark for DefaultUnpark {
}
}
impl Inner {
/// Park the current thread for at most `dur`.
fn park(&self, timeout: Option<Duration>) {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, SeqCst) {
NOTIFY => return,
IDLE => {},
_ => unreachable!(),
}
// If the duration is zero, then there is no need to actually block
if let Some(ref dur) = timeout {
if *dur == Duration::from_millis(0) {
return;
}
}
// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();
// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, SeqCst);
return;
}
IDLE => {},
_ => unreachable!(),
}
m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
};
// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification.
self.state.store(IDLE, SeqCst);
// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
drop(m);
}
fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}
// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
// Transition to NOTIFY
match self.state.swap(NOTIFY, SeqCst) {
SLEEP => {}
NOTIFY => return,
IDLE => return,
_ => unreachable!(),
}
// Wakeup the sleeper
self.condvar.notify_one();
}
}
// ===== impl ParkError =====
impl fmt::Display for ParkError {