signal: Clear out old Signal on drop

This commit is contained in:
Alex Crichton 2017-01-24 10:54:55 -08:00 committed by Carl Lerche
parent 70e4ed67ad
commit 955cd2836d

View File

@ -15,7 +15,7 @@ use std::io::prelude::*;
use std::io; use std::io;
use std::mem; use std::mem;
use std::os::unix::prelude::*; use std::os::unix::prelude::*;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::{Mutex, Once, ONCE_INIT}; use std::sync::{Mutex, Once, ONCE_INIT};
use futures::future; use futures::future;
@ -39,7 +39,7 @@ const SIGNUM: usize = 32;
struct SignalInfo { struct SignalInfo {
pending: AtomicBool, pending: AtomicBool,
// The ones interested in this signal // The ones interested in this signal
recipients: Mutex<Vec<Sender<c_int>>>, recipients: Mutex<Vec<(usize, Sender<c_int>)>>,
init: Once, init: Once,
initialized: UnsafeCell<bool>, initialized: UnsafeCell<bool>,
@ -269,7 +269,7 @@ impl Driver {
// TODO: This thing probably generates unnecessary wakups of // TODO: This thing probably generates unnecessary wakups of
// this task. But let's optimise it later on, when we // this task. But let's optimise it later on, when we
// know this works. // know this works.
match recipients[i].start_send(signum) { match recipients[i].1.start_send(signum) {
Ok(AsyncSink::Ready) => {} Ok(AsyncSink::Ready) => {}
Ok(AsyncSink::NotReady(_)) => {} Ok(AsyncSink::NotReady(_)) => {}
Err(_) => { recipients.swap_remove(i); } Err(_) => { recipients.swap_remove(i); }
@ -313,7 +313,11 @@ impl Driver {
/// If you've got any questions about this feel free to open an issue on the /// If you've got any questions about this feel free to open an issue on the
/// repo, though, as I'd love to chat about this! In other words, I'd love to /// repo, though, as I'd love to chat about this! In other words, I'd love to
/// alleviate some of these limitations if possible! /// alleviate some of these limitations if possible!
pub struct Signal(Receiver<c_int>); pub struct Signal {
signal: c_int,
token: usize,
rx: Receiver<c_int>,
}
impl Signal { impl Signal {
/// Creates a new stream which will receive notifications when the current /// Creates a new stream which will receive notifications when the current
@ -335,6 +339,8 @@ impl Signal {
/// multiple times. When a signal is received then all the associated /// multiple times. When a signal is received then all the associated
/// channels will receive the signal notification. /// channels will receive the signal notification.
pub fn new(signal: c_int, handle: &Handle) -> IoFuture<Signal> { pub fn new(signal: c_int, handle: &Handle) -> IoFuture<Signal> {
static TOKENS: AtomicUsize = ATOMIC_USIZE_INIT;
let result = (|| { let result = (|| {
// Turn the signal delivery on once we are ready for it // Turn the signal delivery on once we are ready for it
try!(signal_enable(signal)); try!(signal_enable(signal));
@ -352,8 +358,14 @@ impl Signal {
// One wakeup in a queue is enough, no need for us to buffer up any // One wakeup in a queue is enough, no need for us to buffer up any
// more. // more.
let (tx, rx) = channel(1); let (tx, rx) = channel(1);
globals().signals[signal as usize].recipients.lock().unwrap().push(tx); let token = TOKENS.fetch_add(1, Ordering::SeqCst);
Ok(Signal(rx)) let idx = signal as usize;
globals().signals[idx].recipients.lock().unwrap().push((token, tx));
Ok(Signal {
rx: rx,
token: token,
signal: signal,
})
})(); })();
future::result(result).boxed() future::result(result).boxed()
@ -365,9 +377,15 @@ impl Stream for Signal {
type Error = io::Error; type Error = io::Error;
fn poll(&mut self) -> Poll<Option<c_int>, io::Error> { fn poll(&mut self) -> Poll<Option<c_int>, io::Error> {
// It seems the channel doesn't generate any errors anyway // receivers don't generate errors
self.0.poll().map_err(|_| io::Error::new(io::ErrorKind::Other, "Unknown futures::sync::mpsc error")) self.rx.poll().map_err(|_| panic!())
} }
} }
// TODO: Drop for Signal and remove the other end proactively? impl Drop for Signal {
fn drop(&mut self) {
let idx = self.signal as usize;
let mut list = globals().signals[idx].recipients.lock().unwrap();
list.retain(|pair| pair.0 != self.token);
}
}