sync: drop old tasks in oneshot (#911)

This commit is contained in:
Sean McArthur 2019-02-20 12:50:29 -08:00 committed by Carl Lerche
parent ab206b976c
commit f9345f99bb
3 changed files with 113 additions and 15 deletions

View File

@ -156,13 +156,17 @@ impl<T> Sender<T> {
}
if state.is_tx_task_set() {
let tx_task = unsafe { inner.tx_task() };
let will_notify = inner.tx_task.with(|ptr| unsafe {
(&*ptr).will_notify_current()
});
if !tx_task.will_notify_current() {
if !will_notify {
state = State::unset_tx_task(&inner.state);
if state.is_closed() {
return Ok(Async::Ready(()));
} else {
unsafe { inner.drop_tx_task() };
}
}
}
@ -294,8 +298,9 @@ impl<T> Inner<T> {
}
if prev.is_rx_task_set() {
let rx_task = unsafe { self.rx_task() };
rx_task.notify();
self.rx_task.with(|ptr| unsafe {
(&*ptr).notify()
});
}
true
@ -316,18 +321,21 @@ impl<T> Inner<T> {
Err(RecvError(()))
} else {
if state.is_rx_task_set() {
let rx_task = unsafe { self.rx_task() };
let will_notify = self.rx_task.with(|ptr| unsafe {
(&*ptr).will_notify_current()
});
// Check if the task is still the same
if !rx_task.will_notify_current() {
if !will_notify {
// Unset the task
state = State::unset_rx_task(&self.state);
if state.is_complete() {
return match unsafe { self.consume_value() } {
Some(value) => Ok(Ready(value)),
None => Err(RecvError(())),
};
} else {
unsafe { self.drop_rx_task() };
}
}
}
@ -358,8 +366,9 @@ impl<T> Inner<T> {
let prev = State::set_closed(&self.state);
if prev.is_tx_task_set() && !prev.is_complete() {
let tx_task = unsafe { self.tx_task() };
tx_task.notify();
self.tx_task.with(|ptr| unsafe {
(&*ptr).notify()
});
}
}
@ -370,8 +379,16 @@ impl<T> Inner<T> {
})
}
unsafe fn rx_task(&self) -> &Task {
&*self.rx_task.with(|ptr| ptr)
unsafe fn drop_rx_task(&self) {
self.rx_task.with_mut(|ptr| {
ManuallyDrop::drop(&mut *ptr)
})
}
unsafe fn drop_tx_task(&self) {
self.tx_task.with_mut(|ptr| {
ManuallyDrop::drop(&mut *ptr)
})
}
unsafe fn set_rx_task(&self) {
@ -380,10 +397,6 @@ impl<T> Inner<T> {
});
}
unsafe fn tx_task(&self) -> &Task {
&*self.tx_task.with(|ptr| ptr)
}
unsafe fn set_tx_task(&self) {
self.tx_task.with_mut(|ptr| {
*ptr = ManuallyDrop::new(task::current())

View File

@ -5,6 +5,7 @@ extern crate loom;
#[allow(warnings)]
mod oneshot;
use futures::{Async, Future};
use loom::thread;
use loom::futures::block_on;
@ -21,3 +22,75 @@ fn smoke() {
assert_eq!(1, value);
});
}
#[test]
fn changing_rx_task() {
loom::fuzz(|| {
let (tx, mut rx) = oneshot::channel();
thread::spawn(move || {
tx.send(1).unwrap();
});
let rx = thread::spawn(move || {
let t1 = block_on(futures::future::poll_fn(|| {
Ok::<_, ()>(rx.poll().into())
})).unwrap();
match t1 {
Ok(Async::Ready(value)) => {
// ok
assert_eq!(1, value);
None
},
Ok(Async::NotReady) => {
Some(rx)
},
Err(_) => unreachable!(),
}
}).join().unwrap();
if let Some(rx) = rx {
// Previous task parked, use a new task...
let value = block_on(rx).unwrap();
assert_eq!(1, value);
}
});
}
#[test]
fn changing_tx_task() {
loom::fuzz(|| {
let (mut tx, rx) = oneshot::channel::<i32>();
thread::spawn(move || {
drop(rx);
});
let tx = thread::spawn(move || {
let t1 = block_on(futures::future::poll_fn(|| {
Ok::<_, ()>(tx.poll_close().into())
})).unwrap();
match t1 {
Ok(Async::Ready(())) => {
None
},
Ok(Async::NotReady) => {
Some(tx)
},
Err(_) => unreachable!(),
}
}).join().unwrap();
if let Some(mut tx) = tx {
// Previous task parked, use a new task...
block_on(futures::future::poll_fn(move || {
tx.poll_close()
})).unwrap();
}
});
}

View File

@ -208,10 +208,16 @@ fn receiver_changes_task() {
assert_not_ready!(rx.poll());
});
assert_eq!(2, task1.notifier_ref_count());
assert_eq!(1, task2.notifier_ref_count());
task2.enter(|| {
assert_not_ready!(rx.poll());
});
assert_eq!(1, task1.notifier_ref_count());
assert_eq!(2, task2.notifier_ref_count());
tx.send(1).unwrap();
assert!(!task1.is_notified());
@ -231,10 +237,16 @@ fn sender_changes_task() {
assert_not_ready!(tx.poll_close());
});
assert_eq!(2, task1.notifier_ref_count());
assert_eq!(1, task2.notifier_ref_count());
task2.enter(|| {
assert_not_ready!(tx.poll_close());
});
assert_eq!(1, task1.notifier_ref_count());
assert_eq!(2, task2.notifier_ref_count());
drop(rx);
assert!(!task1.is_notified());