mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
sync: fix racy UnsafeCell
access on a closed oneshot (#4226)
This commit is contained in:
parent
79d7a625d0
commit
4b78ed4d68
@ -1031,11 +1031,38 @@ impl State {
|
||||
}
|
||||
|
||||
fn set_complete(cell: &AtomicUsize) -> State {
|
||||
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
|
||||
// the `RX_TASK_SET` flag is set. However, `loom` does not support
|
||||
// fences yet.
|
||||
let val = cell.fetch_or(VALUE_SENT, AcqRel);
|
||||
State(val)
|
||||
// This method is a compare-and-swap loop rather than a fetch-or like
|
||||
// other `set_$WHATEVER` methods on `State`. This is because we must
|
||||
// check if the state has been closed before setting the `VALUE_SENT`
|
||||
// bit.
|
||||
//
|
||||
// We don't want to set both the `VALUE_SENT` bit if the `CLOSED`
|
||||
// bit is already set, because `VALUE_SENT` will tell the receiver that
|
||||
// it's okay to access the inner `UnsafeCell`. Immediately after calling
|
||||
// `set_complete`, if the channel was closed, the sender will _also_
|
||||
// access the `UnsafeCell` to take the value back out, so if a
|
||||
// `poll_recv` or `try_recv` call is occurring concurrently, both
|
||||
// threads may try to access the `UnsafeCell` if we were to set the
|
||||
// `VALUE_SENT` bit on a closed channel.
|
||||
let mut state = cell.load(Ordering::Relaxed);
|
||||
loop {
|
||||
if State(state).is_closed() {
|
||||
break;
|
||||
}
|
||||
// TODO: This could be `Release`, followed by an `Acquire` fence *if*
|
||||
// the `RX_TASK_SET` flag is set. However, `loom` does not support
|
||||
// fences yet.
|
||||
match cell.compare_exchange_weak(
|
||||
state,
|
||||
state | VALUE_SENT,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Acquire,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(actual) => state = actual,
|
||||
}
|
||||
}
|
||||
State(state)
|
||||
}
|
||||
|
||||
fn is_rx_task_set(self) -> bool {
|
||||
|
@ -55,6 +55,35 @@ fn changing_rx_task() {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_recv_close() {
|
||||
// reproduces https://github.com/tokio-rs/tokio/issues/4225
|
||||
loom::model(|| {
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
thread::spawn(move || {
|
||||
let _ = tx.send(());
|
||||
});
|
||||
|
||||
rx.close();
|
||||
let _ = rx.try_recv();
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recv_closed() {
|
||||
// reproduces https://github.com/tokio-rs/tokio/issues/4225
|
||||
loom::model(|| {
|
||||
let (tx, mut rx) = oneshot::channel();
|
||||
|
||||
thread::spawn(move || {
|
||||
let _ = tx.send(1);
|
||||
});
|
||||
|
||||
rx.close();
|
||||
let _ = block_on(rx);
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: Move this into `oneshot` proper.
|
||||
|
||||
use std::future::Future;
|
||||
|
Loading…
x
Reference in New Issue
Block a user