task: fix leak in LocalSet (#3978)

Alice Ryhl 2021-07-22 12:05:02 +02:00
parent 378409d15d
commit 1eb468be08
3 changed files with 74 additions and 7 deletions

tokio/src/runtime/tests/loom_local.rs

@@ -0,0 +1,47 @@
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::Builder;
use crate::task::LocalSet;

use std::task::Poll;

/// Waking a runtime will attempt to push a task into a queue of notifications
/// in the runtime, however the tasks in such a queue usually have a reference
/// to the runtime itself. This means that if they are not properly removed at
/// runtime shutdown, this will cause a memory leak.
///
/// This test verifies that waking something during shutdown of a LocalSet does
/// not result in tasks lingering in the queue once shutdown is complete. This
/// is verified using loom's leak finder.
#[test]
fn wake_during_shutdown() {
    loom::model(|| {
        let rt = Builder::new_current_thread().build().unwrap();
        let ls = LocalSet::new();

        let (send, recv) = oneshot::channel();

        ls.spawn_local(async move {
            let mut send = Some(send);

            let () = futures::future::poll_fn(|cx| {
                if let Some(send) = send.take() {
                    send.send(cx.waker().clone());
                }

                Poll::Pending
            })
            .await;
        });

        let handle = loom::thread::spawn(move || {
            let waker = recv.recv();
            waker.wake();
        });

        ls.block_on(&rt, crate::task::yield_now());

        drop(ls);
        handle.join().unwrap();
        drop(rt);
    });
}
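
The doc comment above describes a reference cycle: a queued notification holds an Arc to the runtime's shared state, and the shared state owns the queue. A minimal, self-contained sketch of that cycle follows; Shared, Notified, and late_wake are illustrative stand-ins, not the real tokio types.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Stand-in for the runtime's shared state: it owns the notification queue.
struct Shared {
    queue: Mutex<VecDeque<Notified>>,
}

// Stand-in for a queued notification: it keeps the shared state alive.
struct Notified {
    shared: Arc<Shared>,
}

fn late_wake(shared: &Arc<Shared>) {
    // A wake that races with shutdown pushes an entry holding another
    // Arc<Shared>. If nothing ever drains the queue again, the cycle
    // Shared -> queue -> Notified -> Arc<Shared> is never broken.
    let entry = Notified {
        shared: shared.clone(),
    };
    shared.queue.lock().unwrap().push_back(entry);
}

fn main() {
    let shared = Arc::new(Shared {
        queue: Mutex::new(VecDeque::new()),
    });
    late_wake(&shared);
    // Two strong references exist: ours and the one inside the queue,
    // so dropping our handle cannot free the allocation.
    assert_eq!(Arc::strong_count(&shared), 2);
}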

tokio/src/runtime/tests/mod.rs

@@ -21,6 +21,7 @@ mod joinable_wrapper {
 cfg_loom! {
     mod loom_basic_scheduler;
+    mod loom_local;
     mod loom_blocking;
     mod loom_oneshot;
     mod loom_pool;
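
For readers unfamiliar with the cfg_loom! gate used above: it restricts these modules to loom builds. A sketch of how such a gating macro is typically written (an assumption about its definition, not part of this commit):

// Applies #[cfg(loom)] to each wrapped item, so the modules only
// compile when the crate is built with RUSTFLAGS="--cfg loom".
macro_rules! cfg_loom {
    ($($item:item)*) => {
        $( #[cfg(loom)] $item )*
    };
}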

tokio/src/task/local.rs

@@ -242,7 +242,7 @@ struct Tasks {
 /// LocalSet state shared between threads.
 struct Shared {
     /// Remote run queue sender
-    queue: Mutex<VecDeque<task::Notified<Arc<Shared>>>>,
+    queue: Mutex<Option<VecDeque<task::Notified<Arc<Shared>>>>>,

     /// Wake the `LocalSet` task
     waker: AtomicWaker,

@@ -338,7 +338,7 @@ impl LocalSet {
                     queue: VecDeque::with_capacity(INITIAL_CAPACITY),
                 }),
                 shared: Arc::new(Shared {
-                    queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+                    queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))),
                     waker: AtomicWaker::new(),
                 }),
             },

@@ -538,7 +538,8 @@ impl LocalSet {
                 .shared
                 .queue
                 .lock()
-                .pop_front()
+                .as_mut()
+                .and_then(|queue| queue.pop_front())
                 .or_else(|| self.context.tasks.borrow_mut().queue.pop_front())
         } else {
             self.context

@@ -546,7 +547,14 @@ impl LocalSet {
                 .borrow_mut()
                 .queue
                 .pop_front()
-                .or_else(|| self.context.shared.queue.lock().pop_front())
+                .or_else(|| {
+                    self.context
+                        .shared
+                        .queue
+                        .lock()
+                        .as_mut()
+                        .and_then(|queue| queue.pop_front())
+                })
         }
     }

@@ -610,7 +618,10 @@ impl Drop for LocalSet {
                 task.shutdown();
             }

-            for task in self.context.shared.queue.lock().drain(..) {
+            // Take the queue from the Shared object to prevent pushing
+            // notifications to it in the future.
+            let queue = self.context.shared.queue.lock().take().unwrap();
+            for task in queue {
                 task.shutdown();
             }

@@ -660,8 +671,16 @@ impl Shared {
                 cx.tasks.borrow_mut().queue.push_back(task);
             }
             _ => {
-                self.queue.lock().push_back(task);
-                self.waker.wake();
+                // First check whether the queue is still there (if not, the
+                // LocalSet is dropped). Then push to it if so, and if not,
+                // do nothing.
+                let mut lock = self.queue.lock();
+                if let Some(queue) = lock.as_mut() {
+                    queue.push_back(task);
+                    drop(lock);
+                    self.waker.wake();
+                }
             }
         });
     }
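
All three hunks in local.rs apply the same pattern: wrap the remote run queue in an Option, take it in Drop so late wake-ups find nothing to push into, and make every producer and consumer check for None first. A minimal, self-contained sketch of that close-on-drop pattern follows; Shared, schedule, and close are illustrative names, not the real tokio internals.

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

struct Shared<T> {
    // None once the owner has shut down; producers must check for this.
    queue: Mutex<Option<VecDeque<T>>>,
}

impl<T> Shared<T> {
    fn new() -> Self {
        Shared {
            queue: Mutex::new(Some(VecDeque::new())),
        }
    }

    // Called from a waker: only enqueue while the queue still exists.
    // After shutdown the Option is None and the item is dropped here
    // instead of lingering in a queue that will never be drained.
    fn schedule(&self, item: T) {
        let mut lock = self.queue.lock().unwrap();
        if let Some(queue) = lock.as_mut() {
            queue.push_back(item);
        }
    }

    // Called at shutdown: take the queue so later schedule() calls see
    // None, and hand back whatever was still pending so the caller can
    // shut those entries down.
    fn close(&self) -> VecDeque<T> {
        self.queue.lock().unwrap().take().unwrap_or_default()
    }
}

fn main() {
    let shared = Arc::new(Shared::new());
    shared.schedule("woken before shutdown");
    assert_eq!(shared.close().len(), 1);

    // A wake that races with shutdown is now a no-op instead of a leak.
    shared.schedule("woken after shutdown");
    assert!(shared.queue.lock().unwrap().is_none());
}

One detail worth noting from the actual patch: in Shared::schedule the guard is dropped explicitly before self.waker.wake(), so the LocalSet is not woken while the queue lock is still held.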