signal: move driver to runtime thread (#2835)

Refactors the signal infrastructure to move the driver to the runtime
thread. This follows the model put forth by the I/O driver and time
driver.
Ivan Petkov authored on 2020-09-22 15:40:44 -07:00, committed by GitHub
parent e09b90ea32
commit 7ae5b7bd4f
14 changed files with 532 additions and 240 deletions
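
At a high level, the change inserts a dedicated signal driver into the runtime's park chain, so the composition becomes I/O driver → signal driver → time driver, with each layer implementing the runtime's internal `Park` trait and doing its own event processing after the layer beneath it wakes up. The sketch below is purely illustrative: the `Park` trait shown is a hypothetical stand-in for tokio's private trait, and `LayerDriver` does not exist in the codebase. The real wiring appears in `tokio/src/runtime/driver.rs` later in this diff.

use std::time::Duration;

// Hypothetical stand-in for tokio's private `Park` trait; illustration only.
trait Park {
    type Unpark;
    type Error;
    fn unpark(&self) -> Self::Unpark;
    fn park(&mut self) -> Result<(), Self::Error>;
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
}

// A driver layer owns the driver beneath it, delegates parking to it, and runs
// its own processing once the inner driver returns. The new signal driver
// wraps the I/O driver this way, and the time driver wraps the signal driver.
struct LayerDriver<P> {
    inner: P,
}

impl<P: Park> Park for LayerDriver<P> {
    type Unpark = P::Unpark;
    type Error = P::Error;

    fn unpark(&self) -> Self::Unpark {
        self.inner.unpark()
    }

    fn park(&mut self) -> Result<(), Self::Error> {
        self.inner.park()?;
        // Process this layer's events here (e.g. drain a wake-up pipe).
        Ok(())
    }

    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        self.inner.park_timeout(duration)?;
        Ok(())
    }
}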


@ -8,6 +8,9 @@ edition = "2018"
tokio = { version = "0.3.0", path = "../tokio", features = ["full"] }
bencher = "0.1.5"
[target.'cfg(unix)'.dependencies]
libc = "0.2.42"
[[bench]]
name = "spawn"
path = "spawn.rs"
@ -33,3 +36,8 @@ harness = false
name = "sync_semaphore"
path = "sync_semaphore.rs"
harness = false
[[bench]]
name = "signal"
path = "signal.rs"
harness = false
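
With the `[[bench]]` entry above, the benchmark can be run from the benches crate with `cargo bench --bench signal` (standard Cargo behavior, nothing specific to this change); the `libc` dependency added for `cfg(unix)` targets is what lets the benchmark raise signals against its own process.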

benches/signal.rs (new file, 96 lines)

@ -0,0 +1,96 @@
//! Benchmark the delay in propagating OS signals to any listeners.
#![cfg(unix)]
use bencher::{benchmark_group, benchmark_main, Bencher};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::runtime;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::mpsc;
struct Spinner {
count: usize,
}
impl Future for Spinner {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.count > 3 {
Poll::Ready(())
} else {
self.count += 1;
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
impl Spinner {
fn new() -> Self {
Self { count: 0 }
}
}
pub fn send_signal(signal: libc::c_int) {
use libc::{getpid, kill};
unsafe {
assert_eq!(kill(getpid(), signal), 0);
}
}
fn many_signals(bench: &mut Bencher) {
let num_signals = 10;
let (tx, mut rx) = mpsc::channel(num_signals);
let rt = runtime::Builder::new()
// Intentionally single threaded to measure delays in propagating wakes
.basic_scheduler()
.enable_all()
.build()
.unwrap();
let spawn_signal = |kind| {
let mut tx = tx.clone();
rt.spawn(async move {
let mut signal = signal(kind).expect("failed to create signal");
while signal.recv().await.is_some() {
if tx.send(()).await.is_err() {
break;
}
}
});
};
for _ in 0..num_signals {
// Pick some random signals which don't terminate the test harness
spawn_signal(SignalKind::child());
spawn_signal(SignalKind::io());
}
drop(tx);
// Turn the runtime for a while to ensure that all the spawned
// tasks have been polled at least once
rt.block_on(Spinner::new());
bench.iter(|| {
rt.block_on(async {
send_signal(libc::SIGCHLD);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
send_signal(libc::SIGIO);
for _ in 0..num_signals {
rx.recv().await.expect("channel closed");
}
});
});
}
benchmark_group!(signal_group, many_signals,);
benchmark_main!(signal_group);


@ -109,7 +109,18 @@ impl Registration {
where
T: Evented,
{
let handle = Handle::current();
Self::new_with_ready_and_handle(io, ready, Handle::current())
}
/// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>(
io: &T,
ready: mio::Ready,
handle: Handle,
) -> io::Result<Registration>
where
T: Evented,
{
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, ready)?
} else {

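(This new constructor is what allows the signal driver introduced below to register its wake-up pipe against a specific runtime's I/O driver handle, instead of going through the thread-local `Handle::current()` lookup.)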

@ -1,7 +1,7 @@
use crate::loom::sync::Mutex;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
use std::fmt;
#[cfg(feature = "blocking")]
@ -359,14 +359,17 @@ impl Builder {
}
}
fn get_cfg(&self) -> driver::Cfg {
driver::Cfg {
enable_io: self.enable_io,
enable_time: self.enable_time,
}
}
fn build_shell_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::Kind;
let clock = time::create_clock();
// Create I/O driver
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
let spawner = Spawner::Shell;
@ -377,9 +380,10 @@ impl Builder {
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
@ -478,12 +482,7 @@ cfg_rt_core! {
fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};
let clock = time::create_clock();
// Create I/O driver
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
@ -500,9 +499,10 @@ cfg_rt_core! {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
blocking_pool,
@ -533,10 +533,8 @@ cfg_rt_threaded! {
let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus()));
assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit");
let clock = time::create_clock();
let (driver, resources) = driver::Driver::new(self.get_cfg())?;
let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
@ -547,9 +545,10 @@ cfg_rt_threaded! {
// Create the runtime handle
let handle = Handle {
spawner,
io_handle,
time_handle,
clock,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};


@ -14,7 +14,7 @@ cfg_blocking_impl! {
}
cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::io::Handle {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.io_handle.clone(),
None => Default::default(),
@ -22,8 +22,18 @@ cfg_io_driver! {
}
}
cfg_signal! {
#[cfg(unix)]
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.signal_handle.clone(),
None => Default::default(),
})
}
}
cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::time::Handle {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => ctx.time_handle.clone(),
None => Default::default(),
@ -31,7 +41,7 @@ cfg_time! {
}
cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::time::Clock> {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
CONTEXT.with(|ctx| match *ctx.borrow() {
Some(ref ctx) => Some(ctx.clock.clone()),
None => None,
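(The new `signal_handle()` accessor mirrors the existing `io_handle()`/`time_handle()` lookups; it is what `signal::unix::driver::Handle::current()` further down uses to find the signal driver belonging to the current runtime.)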

tokio/src/runtime/driver.rs (new file, 196 lines)

@ -0,0 +1,196 @@
//! Abstracts out the entire chain of runtime sub-drivers into common types.
use crate::park::{Park, ParkThread};
use std::io;
use std::time::Duration;
// ===== io driver =====
cfg_io_driver! {
type IoDriver = crate::park::Either<crate::io::driver::Driver, crate::park::ParkThread>;
pub(crate) type IoHandle = Option<crate::io::driver::Handle>;
fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> {
use crate::park::Either;
#[cfg(loom)]
assert!(!enable);
if enable {
let driver = crate::io::driver::Driver::new()?;
let handle = driver.handle();
Ok((Either::A(driver), Some(handle)))
} else {
let driver = ParkThread::new();
Ok((Either::B(driver), None))
}
}
}
cfg_not_io_driver! {
type IoDriver = ParkThread;
pub(crate) type IoHandle = ();
fn create_io_driver(_enable: bool) -> io::Result<(IoDriver, IoHandle)> {
let driver = ParkThread::new();
Ok((driver, ()))
}
}
// ===== signal driver =====
macro_rules! cfg_unix_and_signal {
($($item:item)*) => {
$(
#[cfg(all(not(loom), unix, feature = "signal"))]
$item
)*
}
}
macro_rules! cfg_neither_unix_nor_windows {
($($item:item)*) => {
$(
#[cfg(any(loom, not(all(unix, feature = "signal"))))]
$item
)*
}
}
cfg_unix_and_signal! {
type SignalDriver = crate::park::Either<crate::signal::unix::driver::Driver, IoDriver>;
pub(crate) type SignalHandle = Option<crate::signal::unix::driver::Handle>;
fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
use crate::park::Either;
// Enable the signal driver if IO is also enabled
match io_driver {
Either::A(io_driver) => {
let driver = crate::signal::unix::driver::Driver::new(io_driver)?;
let handle = driver.handle();
Ok((Either::A(driver), Some(handle)))
}
Either::B(_) => Ok((Either::B(io_driver), None)),
}
}
}
cfg_neither_unix_nor_windows! {
type SignalDriver = IoDriver;
pub(crate) type SignalHandle = ();
fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> {
Ok((io_driver, ()))
}
}
// ===== time driver =====
cfg_time! {
type TimeDriver = crate::park::Either<crate::time::driver::Driver<SignalDriver>, SignalDriver>;
pub(crate) type Clock = crate::time::Clock;
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
fn create_clock() -> Clock {
crate::time::Clock::new()
}
fn create_time_driver(
enable: bool,
signal_driver: SignalDriver,
clock: Clock,
) -> (TimeDriver, TimeHandle) {
use crate::park::Either;
if enable {
let driver = crate::time::driver::Driver::new(signal_driver, clock);
let handle = driver.handle();
(Either::A(driver), Some(handle))
} else {
(Either::B(signal_driver), None)
}
}
}
cfg_not_time! {
type TimeDriver = SignalDriver;
pub(crate) type Clock = ();
pub(crate) type TimeHandle = ();
fn create_clock() -> Clock {
()
}
fn create_time_driver(
_enable: bool,
signal_driver: SignalDriver,
_clock: Clock,
) -> (TimeDriver, TimeHandle) {
(signal_driver, ())
}
}
// ===== runtime driver =====
#[derive(Debug)]
pub(crate) struct Driver {
inner: TimeDriver,
}
pub(crate) struct Resources {
pub(crate) io_handle: IoHandle,
pub(crate) signal_handle: SignalHandle,
pub(crate) time_handle: TimeHandle,
pub(crate) clock: Clock,
}
pub(crate) struct Cfg {
pub(crate) enable_io: bool,
pub(crate) enable_time: bool,
}
impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
let clock = create_clock();
let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?;
let (signal_driver, signal_handle) = create_signal_driver(io_driver)?;
let (time_driver, time_handle) =
create_time_driver(cfg.enable_time, signal_driver, clock.clone());
Ok((
Self { inner: time_driver },
Resources {
io_handle,
signal_handle,
time_handle,
clock,
},
))
}
}
impl Park for Driver {
type Unpark = <TimeDriver as Park>::Unpark;
type Error = <TimeDriver as Park>::Error;
fn unpark(&self) -> Self::Unpark {
self.inner.unpark()
}
fn park(&mut self) -> Result<(), Self::Error> {
self.inner.park()
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.inner.park_timeout(duration)
}
fn shutdown(&mut self) {
self.inner.shutdown()
}
}
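Two things are worth noting about the composition fixed by `Driver::new` above: the signal driver can only be enabled when the I/O driver is (it wraps the I/O driver's `Either::A` variant and otherwise passes the parker through untouched), and the time driver in turn wraps the signal driver, so parking always bottoms out in the I/O driver or, when I/O is disabled, in the condvar-based `ParkThread`. The composed `Driver` is handed to the scheduler's parker, while `Resources` carries the per-layer handles and the clock so the builder can store them in the runtime `Handle`.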


@ -1,4 +1,4 @@
use crate::runtime::{blocking, context, io, time, Spawner};
use crate::runtime::{blocking, context, driver, Spawner};
/// Handle to the runtime.
///
@ -11,13 +11,16 @@ pub(crate) struct Handle {
pub(super) spawner: Spawner,
/// Handles to the I/O drivers
pub(super) io_handle: io::Handle,
pub(super) io_handle: driver::IoHandle,
/// Handles to the signal drivers
pub(super) signal_handle: driver::SignalHandle,
/// Handles to the time drivers
pub(super) time_handle: time::Handle,
pub(super) time_handle: driver::TimeHandle,
/// Source of `Instant::now()`
pub(super) clock: time::Clock,
pub(super) clock: driver::Clock,
/// Blocking pool spawner
pub(super) blocking_spawner: blocking::Spawner,


@ -1,63 +0,0 @@
//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O
//! driver. When the `time` feature flag is **not** enabled. These APIs are
//! shells. This isolates the complexity of dealing with conditional
//! compilation.
/// Re-exported for convenience.
pub(crate) use std::io::Result;
pub(crate) use variant::*;
#[cfg(feature = "io-driver")]
mod variant {
use crate::io::driver;
use crate::park::{Either, ParkThread};
use std::io;
/// The driver value the runtime passes to the `timer` layer.
///
/// When the `io-driver` feature is enabled, this is the "real" I/O driver
/// backed by Mio. Without the `io-driver` feature, this is a thread parker
/// backed by a condition variable.
pub(crate) type Driver = Either<driver::Driver, ParkThread>;
/// The handle the runtime stores for future use.
///
/// When the `io-driver` feature is **not** enabled, this is `()`.
pub(crate) type Handle = Option<driver::Handle>;
pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> {
#[cfg(loom)]
assert!(!enable);
if enable {
let driver = driver::Driver::new()?;
let handle = driver.handle();
Ok((Either::A(driver), Some(handle)))
} else {
let driver = ParkThread::new();
Ok((Either::B(driver), None))
}
}
}
#[cfg(not(feature = "io-driver"))]
mod variant {
use crate::park::ParkThread;
use std::io;
/// I/O is not enabled, use a condition variable based parker
pub(crate) type Driver = ParkThread;
/// There is no handle
pub(crate) type Handle = ();
pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> {
let driver = ParkThread::new();
Ok((driver, ()))
}
}


@ -208,13 +208,18 @@ cfg_blocking_impl! {
mod builder;
pub use self::builder::Builder;
pub(crate) mod driver;
pub(crate) mod enter;
use self::enter::enter;
mod handle;
use handle::Handle;
mod io;
mod io {
/// Re-exported for convenience.
pub(crate) use std::io::Result;
}
cfg_rt_threaded! {
mod park;
@ -227,8 +232,6 @@ use self::shell::Shell;
mod spawner;
use self::spawner::Spawner;
mod time;
cfg_rt_threaded! {
mod queue;
@ -293,7 +296,7 @@ enum Kind {
/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Basic(Mutex<Option<BasicScheduler<time::Driver>>>),
Basic(Mutex<Option<BasicScheduler<driver::Driver>>>),
/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]


@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::park::{Park, Unpark};
use crate::runtime::time;
use crate::runtime::driver::Driver;
use crate::util::TryLock;
use std::sync::atomic::Ordering::SeqCst;
@ -42,14 +42,14 @@ const NOTIFIED: usize = 3;
/// Shared across multiple Parker handles
struct Shared {
/// Shared driver. Only one thread at a time can use this
driver: TryLock<time::Driver>,
driver: TryLock<Driver>,
/// Unpark handle
handle: <time::Driver as Park>::Unpark,
handle: <Driver as Park>::Unpark,
}
impl Parker {
pub(crate) fn new(driver: time::Driver) -> Parker {
pub(crate) fn new(driver: Driver) -> Parker {
let handle = driver.unpark();
Parker {
@ -180,7 +180,7 @@ impl Inner {
}
}
fn park_driver(&self, driver: &mut time::Driver) {
fn park_driver(&self, driver: &mut Driver) {
match self
.state
.compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)


@ -1,8 +1,8 @@
#![allow(clippy::redundant_clone)]
use crate::park::{Park, Unpark};
use crate::runtime::driver::Driver;
use crate::runtime::enter;
use crate::runtime::time;
use crate::util::{waker_ref, Wake};
use std::future::Future;
@ -12,17 +12,17 @@ use std::task::Poll::Ready;
#[derive(Debug)]
pub(super) struct Shell {
driver: time::Driver,
driver: Driver,
/// TODO: don't store this
unpark: Arc<Handle>,
}
#[derive(Debug)]
struct Handle(<time::Driver as Park>::Unpark);
struct Handle(<Driver as Park>::Unpark);
impl Shell {
pub(super) fn new(driver: time::Driver) -> Shell {
pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));
Shell { driver, unpark }


@ -1,59 +0,0 @@
//! Abstracts out the APIs necessary to `Runtime` for integrating the time
//! driver. When the `time` feature flag is **not** enabled. These APIs are
//! shells. This isolates the complexity of dealing with conditional
//! compilation.
pub(crate) use variant::*;
#[cfg(feature = "time")]
mod variant {
use crate::park::Either;
use crate::runtime::io;
use crate::time::{self, driver};
pub(crate) type Clock = time::Clock;
pub(crate) type Driver = Either<driver::Driver<io::Driver>, io::Driver>;
pub(crate) type Handle = Option<driver::Handle>;
pub(crate) fn create_clock() -> Clock {
Clock::new()
}
/// Create a new timer driver / handle pair
pub(crate) fn create_driver(
enable: bool,
io_driver: io::Driver,
clock: Clock,
) -> (Driver, Handle) {
if enable {
let driver = driver::Driver::new(io_driver, clock);
let handle = driver.handle();
(Either::A(driver), Some(handle))
} else {
(Either::B(io_driver), None)
}
}
}
#[cfg(not(feature = "time"))]
mod variant {
use crate::runtime::io;
pub(crate) type Clock = ();
pub(crate) type Driver = io::Driver;
pub(crate) type Handle = ();
pub(crate) fn create_clock() -> Clock {
()
}
/// Create a new timer driver / handle pair
pub(crate) fn create_driver(
_enable: bool,
io_driver: io::Driver,
_clock: Clock,
) -> (Driver, Handle) {
(io_driver, ())
}
}


@ -5,7 +5,6 @@
#![cfg(unix)]
use crate::io::{AsyncRead, PollEvented, ReadBuf};
use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage};
use crate::sync::mpsc::{channel, Receiver};
@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use std::task::{Context, Poll};
pub(crate) mod driver;
pub(crate) type OsStorage = Vec<SignalInfo>;
// Number of different unix signals
@ -202,9 +203,9 @@ impl Default for SignalInfo {
/// The purpose of this signal handler is to primarily:
///
/// 1. Flag that our specific signal was received (e.g. store an atomic flag)
/// 2. Wake up driver tasks by writing a byte to a pipe
/// 2. Wake up the driver by writing a byte to a pipe
///
/// Those two operations shoudl both be async-signal safe.
/// Those two operations should both be async-signal safe.
fn action(globals: Pin<&'static Globals>, signal: c_int) {
globals.record_event(signal as EventId);
@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
));
}
// Check that we have a signal driver running
driver::Handle::current().check_inner()?;
let globals = globals();
let siginfo = match globals.storage().get(signal as EventId) {
Some(slot) => slot,
@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> {
}
}
#[derive(Debug)]
struct Driver {
wakeup: PollEvented<UnixStream>,
}
impl Driver {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Drain the data from the pipe and maintain interest in getting more
self.drain(cx);
// Broadcast any signals which were received
globals().broadcast();
Poll::Pending
}
}
impl Driver {
fn new() -> io::Result<Driver> {
// NB: We give each driver a "fresh" reciever file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
// In the past we would reuse the actual receiver file descriptor and
// swallow any errors around double registration of the same descriptor.
// I'm not sure if the second (failed) registration simply doesn't end up
// receiving wake up notifications, or there could be some race condition
// when consuming readiness events, but having distinct descriptors for
// distinct PollEvented instances appears to mitigate this.
//
// Unfortunately we cannot just use a single global PollEvented instance
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
let stream = globals().receiver.try_clone()?;
let wakeup = PollEvented::new(stream)?;
Ok(Driver { wakeup })
}
/// Drain all data in the global receiver, ensuring we'll get woken up when
/// there is a write on the other end.
///
/// We do *NOT* use the existence of any read bytes as evidence a signal was
/// received since the `pending` flags would have already been set if that
/// was the case. See
/// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more
/// info.
fn drain(&mut self, cx: &mut Context<'_>) {
let mut buf = [0; 128];
let mut buf = ReadBuf::new(&mut buf);
loop {
match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) {
Poll::Ready(Ok(())) => {
if buf.filled().is_empty() {
panic!("EOF on self-pipe")
}
buf.clear();
}
Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e),
Poll::Pending => break,
}
}
}
}
/// A stream of events for receiving a particular type of OS signal.
///
/// In general signal handling on Unix is a pretty tricky topic, and this
@ -382,7 +323,6 @@ impl Driver {
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Signal {
driver: Driver,
rx: Receiver<()>,
}
@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result<Signal> {
// Turn the signal delivery on once we are ready for it
signal_enable(signal)?;
// Ensure there's a driver for our associated event loop processing
// signals.
let driver = Driver::new()?;
// One wakeup in a queue is enough, no need for us to buffer up any
// more.
let (tx, rx) = channel(1);
globals().register_listener(signal as EventId, tx);
Ok(Signal { driver, rx })
Ok(Signal { rx })
}
impl Signal {
@ -484,7 +420,6 @@ impl Signal {
/// }
/// ```
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
let _ = self.driver.poll(cx);
self.rx.poll_recv(cx)
}
}
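
The doc comment on `action` above describes the self-pipe technique this change is built on. Below is a minimal, self-contained sketch of that pattern (hypothetical names, a `UnixStream` pair standing in for tokio's process-global pipe; it is not the code in this diff): the handler limits itself to async-signal-safe work, and the draining and broadcast happen on the thread that owns the read end, which is what the new signal driver module introduced in the next file does from its `process` method.

use std::io::{Read, Write};
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};

// Flag set by the "signal handler"; real code keeps one slot per signal kind.
static SIGNAL_PENDING: AtomicBool = AtomicBool::new(false);

// Handler side: record the event, then wake the driver with a single byte.
// Both operations are async-signal safe.
fn on_signal(mut wake_tx: &UnixStream) {
    SIGNAL_PENDING.store(true, Ordering::SeqCst);
    let _ = wake_tx.write(&[1]);
}

// Driver side: after being woken, drain the pipe completely so the next write
// produces a fresh readiness event, then act on whatever flags were recorded.
fn process(mut wake_rx: &UnixStream) {
    let mut buf = [0u8; 128];
    loop {
        match wake_rx.read(&mut buf) {
            Ok(0) => panic!("EOF on self-pipe"),
            Ok(_) => continue,
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
            Err(e) => panic!("bad read on self-pipe: {}", e),
        }
    }
    if SIGNAL_PENDING.swap(false, Ordering::SeqCst) {
        // broadcast to registered listeners here
    }
}

fn main() -> std::io::Result<()> {
    let (tx, rx) = UnixStream::pair()?;
    rx.set_nonblocking(true)?;
    on_signal(&tx); // pretend a signal just arrived
    process(&rx);
    Ok(())
}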


@ -0,0 +1,153 @@
//! Signal driver
use crate::io::driver::Driver as IoDriver;
use crate::io::Registration;
use crate::park::Park;
use crate::runtime::context;
use crate::signal::registry::globals;
use mio_uds::UnixStream;
use std::io::{self, Read};
use std::sync::{Arc, Weak};
use std::time::Duration;
/// Responsible for registering wakeups when an OS signal is received, and
/// subsequently dispatching notifications to any signal listeners as appropriate.
///
/// Note: this driver relies on having an enabled IO driver in order to listen to
/// pipe write wakeups.
#[derive(Debug)]
pub(crate) struct Driver {
/// Thread parker. The `Driver` park implementation delegates to this.
park: IoDriver,
/// A pipe for receiving wake events from the signal handler
receiver: UnixStream,
/// The actual registration for `receiver` when active.
/// Lazily bound at the first signal registration.
registration: Registration,
/// Shared state
inner: Arc<Inner>,
}
#[derive(Clone, Debug)]
pub(crate) struct Handle {
inner: Weak<Inner>,
}
#[derive(Debug)]
pub(super) struct Inner(());
// ===== impl Driver =====
impl Driver {
/// Creates a new signal `Driver` instance that delegates wakeups to `park`.
pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
// NB: We give each driver a "fresh" receiver file descriptor to avoid
// the issues described in alexcrichton/tokio-process#42.
//
// In the past we would reuse the actual receiver file descriptor and
// swallow any errors around double registration of the same descriptor.
// I'm not sure if the second (failed) registration simply doesn't end up
// receiving wake up notifications, or there could be some race condition
// when consuming readiness events, but having distinct descriptors for
// distinct PollEvented instances appears to mitigate this.
//
// Unfortunately we cannot just use a single global PollEvented instance
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
let receiver = globals().receiver.try_clone()?;
let registration =
Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?;
Ok(Self {
park,
receiver,
registration,
inner: Arc::new(Inner(())),
})
}
/// Returns a handle to this event loop which can be sent across threads
/// and can be used as a proxy to the event loop itself.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::downgrade(&self.inner),
}
}
fn process(&self) {
// Check if the pipe is ready to read and therefore has "woken" us up
match self.registration.take_read_ready() {
Ok(Some(ready)) => assert!(ready.is_readable()),
Ok(None) => return, // No wake has arrived, bail
Err(e) => panic!("reactor gone: {}", e),
}
// Drain the pipe completely so we can receive a new readiness event
// if another signal has come in.
let mut buf = [0; 128];
loop {
match (&self.receiver).read(&mut buf) {
Ok(0) => panic!("EOF on self-pipe"),
Ok(_) => continue, // Keep reading
Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => panic!("Bad read on self-pipe: {}", e),
}
}
// Broadcast any signals which were received
globals().broadcast();
}
}
// ===== impl Park for Driver =====
impl Park for Driver {
type Unpark = <IoDriver as Park>::Unpark;
type Error = io::Error;
fn unpark(&self) -> Self::Unpark {
self.park.unpark()
}
fn park(&mut self) -> Result<(), Self::Error> {
self.park.park()?;
self.process();
Ok(())
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.park.park_timeout(duration)?;
self.process();
Ok(())
}
fn shutdown(&mut self) {
self.park.shutdown()
}
}
// ===== impl Handle =====
impl Handle {
/// Returns a handle to the current driver
///
/// # Panics
///
/// This function panics if there is no current signal driver set.
pub(super) fn current() -> Self {
context::signal_handle().expect(
"there is no signal driver running, must be called from the context of Tokio runtime",
)
}
pub(super) fn check_inner(&self) -> io::Result<()> {
if self.inner.strong_count() > 0 {
Ok(())
} else {
Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
}
}
}
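
Finally, a sketch of what this looks like from the user's side (tokio 0.3-era builder API, matching the benchmark earlier in this diff; illustrative rather than part of the change): because `Signal` no longer drives its own `PollEvented`, signal registration now requires a runtime whose I/O (and therefore signal) driver is enabled, and `signal()` returns an error otherwise thanks to the `check_inner` guard above.

use tokio::runtime;
use tokio::signal::unix::{signal, SignalKind};

fn main() {
    // Without `enable_all()` (or `enable_io()`) there is no signal driver, and
    // `signal()` below returns an error instead of silently never firing.
    let rt = runtime::Builder::new()
        .basic_scheduler()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        let mut sig = signal(SignalKind::hangup()).expect("failed to register SIGHUP");
        sig.recv().await;
        println!("received SIGHUP");
    });
}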