miri: make miri accept our intrusive linked lists (#4397)

This commit is contained in:
Taiki Endo 2022-02-09 19:11:17 +09:00 committed by GitHub
parent ca51f6a980
commit 1be8e9dfb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 168 additions and 61 deletions

View File

@ -188,11 +188,13 @@ jobs:
override: true
- uses: Swatinem/rust-cache@v1
- name: miri
run: |
set -e
rm -rf tests
cargo miri test --features rt,rt-multi-thread,sync task
# Many of tests in tokio/tests and doctests use #[tokio::test] or
# #[tokio::main] that calls epoll_create1 that Miri does not support.
run: cargo miri test --features full --lib --no-fail-fast
working-directory: tokio
env:
MIRIFLAGS: -Zmiri-disable-isolation -Zmiri-tag-raw-pointers
PROPTEST_CASES: 10
san:
name: san

View File

@ -173,6 +173,12 @@ LOOM_MAX_PREEMPTIONS=1 RUSTFLAGS="--cfg loom" \
cargo test --lib --release --features full -- --test-threads=1 --nocapture
```
You can run miri tests with
```
MIRIFLAGS="-Zmiri-disable-isolation -Zmiri-tag-raw-pointers" PROPTEST_CASES=10 \
cargo +nightly miri test --features full --lib
```
### Tests
If the change being proposed alters code (as opposed to only documentation for

View File

@ -228,6 +228,7 @@ fn flush_while_idle() {
}
#[test]
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn read_with_buffer_larger_than_max() {
// Chunks
let chunk_a = 16 * 1024;
@ -299,6 +300,7 @@ fn read_with_buffer_larger_than_max() {
}
#[test]
#[cfg_attr(miri, ignore)] // takes a really long time with miri
fn write_with_buffer_larger_than_max() {
// Chunks
let chunk_a = 16 * 1024;

View File

@ -280,6 +280,7 @@ pub(crate) mod test {
drop(signal_guard);
}
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let signal_driver = IoDriver::new().and_then(SignalDriver::new).unwrap();

View File

@ -26,6 +26,10 @@ where
}
}
fn header_ptr(&self) -> NonNull<Header> {
self.cell.cast()
}
fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
@ -93,7 +97,8 @@ where
match self.header().state.transition_to_running() {
TransitionToRunning::Success => {
let waker_ref = waker_ref::<T, S>(self.header());
let header_ptr = self.header_ptr();
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let res = poll_future(&self.core().stage, cx);

View File

@ -313,7 +313,7 @@ cfg_rt_multi_thread! {
impl<S: 'static> Task<S> {
fn into_raw(self) -> NonNull<Header> {
let ret = self.header().into();
let ret = self.raw.header_ptr();
mem::forget(self);
ret
}
@ -427,7 +427,7 @@ unsafe impl<S> linked_list::Link for Task<S> {
type Target = Header;
fn as_raw(handle: &Task<S>) -> NonNull<Header> {
handle.header().into()
handle.raw.header_ptr()
}
unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {

View File

@ -63,6 +63,10 @@ impl RawTask {
RawTask { ptr }
}
pub(super) fn header_ptr(&self) -> NonNull<Header> {
self.ptr
}
/// Returns a reference to the task's meta structure.
///
/// Safe as `Header` is `Sync`.

View File

@ -15,7 +15,7 @@ pub(super) struct WakerRef<'a, S: 'static> {
/// Returns a `WakerRef` which avoids having to pre-emptively increase the
/// refcount if there is no need to do so.
pub(super) fn waker_ref<T, S>(header: &Header) -> WakerRef<'_, S>
pub(super) fn waker_ref<T, S>(header: &NonNull<Header>) -> WakerRef<'_, S>
where
T: Future,
S: Schedule,
@ -28,7 +28,7 @@ where
// point and not an *owned* waker, we must ensure that `drop` is never
// called on this waker instance. This is done by wrapping it with
// `ManuallyDrop` and then never calling drop.
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(header))) };
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(raw_waker::<T, S>(*header))) };
WakerRef {
waker,
@ -77,7 +77,7 @@ where
let harness = Harness::<T, S>::from_raw(ptr);
trace!(harness, "waker.clone");
(*header).state.ref_inc();
raw_waker::<T, S>(header)
raw_waker::<T, S>(ptr)
}
unsafe fn drop_waker<T, S>(ptr: *const ())
@ -114,12 +114,12 @@ where
harness.wake_by_ref();
}
fn raw_waker<T, S>(header: *const Header) -> RawWaker
fn raw_waker<T, S>(header: NonNull<Header>) -> RawWaker
where
T: Future,
S: Schedule,
{
let ptr = header as *const ();
let ptr = header.as_ptr() as *const ();
let vtable = &RawWakerVTable::new(
clone_waker::<T, S>,
wake_by_val::<T, S>,

View File

@ -101,13 +101,21 @@ fn steal_batch() {
assert!(local1.pop().is_none());
}
const fn normal_or_miri(normal: usize, miri: usize) -> usize {
if cfg!(miri) {
miri
} else {
normal
}
}
#[test]
fn stress1() {
const NUM_ITER: usize = 1;
const NUM_STEAL: usize = 1_000;
const NUM_LOCAL: usize = 1_000;
const NUM_PUSH: usize = 500;
const NUM_POP: usize = 250;
const NUM_STEAL: usize = normal_or_miri(1_000, 10);
const NUM_LOCAL: usize = normal_or_miri(1_000, 10);
const NUM_PUSH: usize = normal_or_miri(500, 10);
const NUM_POP: usize = normal_or_miri(250, 10);
let mut metrics = MetricsBatch::new();
@ -169,8 +177,8 @@ fn stress1() {
#[test]
fn stress2() {
const NUM_ITER: usize = 1;
const NUM_TASKS: usize = 1_000_000;
const NUM_STEAL: usize = 1_000;
const NUM_TASKS: usize = normal_or_miri(1_000_000, 50);
const NUM_STEAL: usize = normal_or_miri(1_000, 10);
let mut metrics = MetricsBatch::new();

View File

@ -202,7 +202,12 @@ mod tests {
registry.broadcast();
// Yield so the previous broadcast can get received
crate::time::sleep(std::time::Duration::from_millis(10)).await;
//
// This yields many times since the block_on task is only polled every 61
// ticks.
for _ in 0..100 {
crate::task::yield_now().await;
}
// Send subsequent signal
registry.record_event(0);

View File

@ -151,6 +151,7 @@ impl<T> fmt::Debug for ReusableBoxFuture<T> {
}
#[cfg(test)]
#[cfg(not(miri))] // Miri breaks when you use Pin<&mut dyn Future>
mod test {
use super::ReusableBoxFuture;
use futures::future::FutureExt;

View File

@ -130,6 +130,7 @@ enum NotificationType {
}
#[derive(Debug)]
#[repr(C)] // required by `linked_list::Link` impl
struct Waiter {
/// Intrusive linked-list pointers.
pointers: linked_list::Pointers<Waiter>,
@ -731,8 +732,8 @@ unsafe impl linked_list::Link for Waiter {
ptr
}
unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
target.cast()
}
}

View File

@ -45,3 +45,37 @@ fn notify_clones_waker_before_lock() {
// The result doesn't matter, we're just testing that we don't deadlock.
let _ = future.poll(&mut cx);
}
#[test]
fn notify_simple() {
let notify = Notify::new();
let mut fut1 = tokio_test::task::spawn(notify.notified());
assert!(fut1.poll().is_pending());
let mut fut2 = tokio_test::task::spawn(notify.notified());
assert!(fut2.poll().is_pending());
notify.notify_waiters();
assert!(fut1.poll().is_ready());
assert!(fut2.poll().is_ready());
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn watch_test() {
let rt = crate::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {
let (tx, mut rx) = crate::sync::watch::channel(());
crate::spawn(async move {
let _ = tx.send(());
});
let _ = rx.changed().await;
});
}

View File

@ -326,15 +326,16 @@ pub(super) type EntryList = crate::util::linked_list::LinkedList<TimerShared, Ti
///
/// Note that this structure is located inside the `TimerEntry` structure.
#[derive(Debug)]
#[repr(C)] // required by `link_list::Link` impl
pub(crate) struct TimerShared {
/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,
/// Current state. This records whether the timer entry is currently under
/// the ownership of the driver, and if not, its current state (not
/// complete, fired, error, etc).
state: StateCell,
/// Data manipulated by the driver thread itself, only.
driver_state: CachePadded<TimerSharedPadded>,
_p: PhantomPinned,
}
@ -420,7 +421,14 @@ impl TimerShared {
/// padded. This contains the information that the driver thread accesses most
/// frequently to minimize contention. In particular, we move it away from the
/// waker, as the waker is updated on every poll.
#[repr(C)] // required by `link_list::Link` impl
struct TimerSharedPadded {
/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
///
/// Only accessed under the entry lock.
pointers: linked_list::Pointers<TimerShared>,
/// The expiration time for which this entry is currently registered.
/// Generally owned by the driver, but is accessed by the entry when not
/// registered.
@ -428,12 +436,6 @@ struct TimerSharedPadded {
/// The true expiration time. Set by the timer future, read by the driver.
true_when: AtomicU64,
/// A link within the doubly-linked list of timers on a particular level and
/// slot. Valid only if state is equal to Registered.
///
/// Only accessed under the entry lock.
pointers: StdUnsafeCell<linked_list::Pointers<TimerShared>>,
}
impl std::fmt::Debug for TimerSharedPadded {
@ -450,7 +452,7 @@ impl TimerSharedPadded {
Self {
cached_when: AtomicU64::new(0),
true_when: AtomicU64::new(0),
pointers: StdUnsafeCell::new(linked_list::Pointers::new()),
pointers: linked_list::Pointers::new(),
}
}
}
@ -474,7 +476,7 @@ unsafe impl linked_list::Link for TimerShared {
unsafe fn pointers(
target: NonNull<Self::Target>,
) -> NonNull<linked_list::Pointers<Self::Target>> {
unsafe { NonNull::new(target.as_ref().driver_state.0.pointers.get()).unwrap() }
target.cast()
}
}

View File

@ -27,7 +27,12 @@ fn block_on<T>(f: impl std::future::Future<Output = T>) -> T {
return loom::future::block_on(f);
#[cfg(not(loom))]
return futures::executor::block_on(f);
{
let rt = crate::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(f)
}
}
fn model(f: impl Fn() + Send + Sync + 'static) {
@ -182,6 +187,15 @@ fn reset_future() {
})
}
#[cfg(not(loom))]
fn normal_or_miri<T>(normal: T, miri: T) -> T {
if cfg!(miri) {
miri
} else {
normal
}
}
#[test]
#[cfg(not(loom))]
fn poll_process_levels() {
@ -195,7 +209,7 @@ fn poll_process_levels() {
let mut entries = vec![];
for i in 0..1024 {
for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
&handle,
clock.now() + Duration::from_millis(i),
@ -208,7 +222,7 @@ fn poll_process_levels() {
entries.push(entry);
}
for t in 1..1024 {
for t in 1..normal_or_miri(1024, 64) {
handle.process_at_time(t as u64);
for (deadline, future) in entries.iter_mut().enumerate() {
let mut context = Context::from_waker(noop_waker_ref());

View File

@ -57,6 +57,13 @@ pub(crate) unsafe trait Link {
unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Self::Handle;
/// Return the pointers for a node
///
/// # Safety
///
/// The resulting pointer should have the same tag in the stacked-borrows
/// stack as the argument. In particular, the method may not create an
/// intermediate reference in the process of creating the resulting raw
/// pointer.
unsafe fn pointers(target: NonNull<Self::Target>) -> NonNull<Pointers<Self::Target>>;
}
@ -353,6 +360,7 @@ mod tests {
use std::pin::Pin;
#[derive(Debug)]
#[repr(C)]
struct Entry {
pointers: Pointers<Entry>,
val: i32,
@ -370,8 +378,8 @@ mod tests {
Pin::new_unchecked(&*ptr.as_ptr())
}
unsafe fn pointers(mut target: NonNull<Entry>) -> NonNull<Pointers<Entry>> {
NonNull::from(&mut target.as_mut().pointers)
unsafe fn pointers(target: NonNull<Entry>) -> NonNull<Pointers<Entry>> {
target.cast()
}
}

View File

@ -157,6 +157,12 @@ struct Slot<T> {
/// Next entry in the free list.
next: u32,
/// Makes miri happy by making mutable references not take exclusive access.
///
/// Could probably also be fixed by replacing `slots` with a raw-pointer
/// based equivalent.
_pin: std::marker::PhantomPinned,
}
/// Value paired with a reference to the page.
@ -409,7 +415,7 @@ impl<T: Entry> Page<T> {
slot.value.with(|ptr| unsafe { (*ptr).value.reset() });
// Return a reference to the slot
Some((me.addr(idx), slot.gen_ref(me)))
Some((me.addr(idx), locked.gen_ref(idx, me)))
} else if me.len == locked.slots.len() {
// The page is full
None
@ -428,9 +434,10 @@ impl<T: Entry> Page<T> {
locked.slots.push(Slot {
value: UnsafeCell::new(Value {
value: Default::default(),
page: &**me as *const _,
page: Arc::as_ptr(me),
}),
next: 0,
_pin: std::marker::PhantomPinned,
});
// Increment the head to indicate the free stack is empty
@ -443,7 +450,7 @@ impl<T: Entry> Page<T> {
debug_assert_eq!(locked.slots.len(), locked.head);
Some((me.addr(idx), locked.slots[idx].gen_ref(me)))
Some((me.addr(idx), locked.gen_ref(idx, me)))
}
}
}
@ -558,18 +565,15 @@ impl<T> Slots<T> {
idx
}
}
impl<T: Entry> Slot<T> {
/// Generates a `Ref` for the slot. This involves bumping the page's ref count.
fn gen_ref(&self, page: &Arc<Page<T>>) -> Ref<T> {
// The ref holds a ref on the page. The `Arc` is forgotten here and is
// resurrected in `release` when the `Ref` is dropped. By avoiding to
// hold on to an explicit `Arc` value, the struct size of `Ref` is
// reduced.
/// Generates a `Ref` for the slot at the given index. This involves bumping the page's ref count.
fn gen_ref(&self, idx: usize, page: &Arc<Page<T>>) -> Ref<T> {
assert!(idx < self.slots.len());
mem::forget(page.clone());
let slot = self as *const Slot<T>;
let value = slot as *const Value<T>;
let vec_ptr = self.slots.as_ptr();
let slot: *const Slot<T> = unsafe { vec_ptr.add(idx) };
let value: *const Value<T> = slot as *const Value<T>;
Ref { value }
}
@ -691,11 +695,13 @@ mod test {
#[test]
fn insert_many() {
const MANY: usize = normal_or_miri(10_000, 50);
let mut slab = Slab::<Foo>::new();
let alloc = slab.allocator();
let mut entries = vec![];
for i in 0..10_000 {
for i in 0..MANY {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
entries.push((addr, val));
@ -708,15 +714,15 @@ mod test {
entries.clear();
for i in 0..10_000 {
for i in 0..MANY {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(10_000 - i, SeqCst);
val.id.store(MANY - i, SeqCst);
entries.push((addr, val));
}
for (i, (addr, v)) in entries.iter().enumerate() {
assert_eq!(10_000 - i, v.id.load(SeqCst));
assert_eq!(10_000 - i, slab.get(*addr).unwrap().id.load(SeqCst));
assert_eq!(MANY - i, v.id.load(SeqCst));
assert_eq!(MANY - i, slab.get(*addr).unwrap().id.load(SeqCst));
}
}
@ -726,7 +732,7 @@ mod test {
let alloc = slab.allocator();
let mut entries = vec![];
for i in 0..10_000 {
for i in 0..normal_or_miri(10_000, 100) {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
entries.push((addr, val));
@ -734,7 +740,7 @@ mod test {
for _ in 0..10 {
// Drop 1000 in reverse
for _ in 0..1_000 {
for _ in 0..normal_or_miri(1_000, 10) {
entries.pop();
}
@ -753,7 +759,7 @@ mod test {
let mut entries1 = vec![];
let mut entries2 = vec![];
for i in 0..10_000 {
for i in 0..normal_or_miri(10_000, 100) {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
@ -771,6 +777,14 @@ mod test {
}
}
const fn normal_or_miri(normal: usize, miri: usize) -> usize {
if cfg!(miri) {
miri
} else {
normal
}
}
#[test]
fn compact_all() {
let mut slab = Slab::<Foo>::new();
@ -780,7 +794,7 @@ mod test {
for _ in 0..2 {
entries.clear();
for i in 0..10_000 {
for i in 0..normal_or_miri(10_000, 100) {
let (addr, val) = alloc.allocate().unwrap();
val.id.store(i, SeqCst);
@ -808,7 +822,7 @@ mod test {
let alloc = slab.allocator();
let mut entries = vec![];
for _ in 0..5 {
for _ in 0..normal_or_miri(5, 2) {
entries.clear();
// Allocate a few pages + 1

View File

@ -31,7 +31,7 @@ impl Deref for WakerRef<'_> {
/// Creates a reference to a `Waker` from a reference to `Arc<impl Wake>`.
pub(crate) fn waker_ref<W: Wake>(wake: &Arc<W>) -> WakerRef<'_> {
let ptr = &**wake as *const _ as *const ();
let ptr = Arc::as_ptr(wake) as *const ();
let waker = unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) };