diff --git a/src/unix.rs b/src/unix.rs index 0e11b00df..039abedca 100644 --- a/src/unix.rs +++ b/src/unix.rs @@ -15,7 +15,7 @@ use std::io::prelude::*; use std::io; use std::mem; use std::os::unix::prelude::*; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use std::sync::{Mutex, Once, ONCE_INIT}; use futures::future; @@ -39,7 +39,7 @@ const SIGNUM: usize = 32; struct SignalInfo { pending: AtomicBool, // The ones interested in this signal - recipients: Mutex>>, + recipients: Mutex)>>, init: Once, initialized: UnsafeCell, @@ -269,7 +269,7 @@ impl Driver { // TODO: This thing probably generates unnecessary wakups of // this task. But let's optimise it later on, when we // know this works. - match recipients[i].start_send(signum) { + match recipients[i].1.start_send(signum) { Ok(AsyncSink::Ready) => {} Ok(AsyncSink::NotReady(_)) => {} Err(_) => { recipients.swap_remove(i); } @@ -313,7 +313,11 @@ impl Driver { /// If you've got any questions about this feel free to open an issue on the /// repo, though, as I'd love to chat about this! In other words, I'd love to /// alleviate some of these limitations if possible! -pub struct Signal(Receiver); +pub struct Signal { + signal: c_int, + token: usize, + rx: Receiver, +} impl Signal { /// Creates a new stream which will receive notifications when the current @@ -335,6 +339,8 @@ impl Signal { /// multiple times. When a signal is received then all the associated /// channels will receive the signal notification. pub fn new(signal: c_int, handle: &Handle) -> IoFuture { + static TOKENS: AtomicUsize = ATOMIC_USIZE_INIT; + let result = (|| { // Turn the signal delivery on once we are ready for it try!(signal_enable(signal)); @@ -352,8 +358,14 @@ impl Signal { // One wakeup in a queue is enough, no need for us to buffer up any // more. let (tx, rx) = channel(1); - globals().signals[signal as usize].recipients.lock().unwrap().push(tx); - Ok(Signal(rx)) + let token = TOKENS.fetch_add(1, Ordering::SeqCst); + let idx = signal as usize; + globals().signals[idx].recipients.lock().unwrap().push((token, tx)); + Ok(Signal { + rx: rx, + token: token, + signal: signal, + }) })(); future::result(result).boxed() @@ -365,9 +377,15 @@ impl Stream for Signal { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - // It seems the channel doesn't generate any errors anyway - self.0.poll().map_err(|_| io::Error::new(io::ErrorKind::Other, "Unknown futures::sync::mpsc error")) + // receivers don't generate errors + self.rx.poll().map_err(|_| panic!()) } } -// TODO: Drop for Signal and remove the other end proactively? +impl Drop for Signal { + fn drop(&mut self) { + let idx = self.signal as usize; + let mut list = globals().signals[idx].recipients.lock().unwrap(); + list.retain(|pair| pair.0 != self.token); + } +}