mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
537 lines
13 KiB
Rust
537 lines
13 KiB
Rust
extern crate env_logger;
|
|
extern crate futures;
|
|
extern crate tokio_executor;
|
|
extern crate tokio_threadpool;
|
|
|
|
use tokio_executor::park::{Park, Unpark};
|
|
use tokio_threadpool::park::{DefaultPark, DefaultUnpark};
|
|
use tokio_threadpool::*;
|
|
|
|
use futures::future::lazy;
|
|
use futures::{Async, Future, Poll, Sink, Stream};
|
|
|
|
use std::cell::Cell;
|
|
use std::sync::atomic::Ordering::Relaxed;
|
|
use std::sync::atomic::*;
|
|
use std::sync::{mpsc, Arc};
|
|
use std::time::Duration;
|
|
|
|
// Per-thread marker whose initial value is 0 on every thread.
// `natural_shutdown_simple_futures` sets it to 1 on the test thread and then
// asserts that spawned tasks still observe 0.
thread_local!(static FOO: Cell<u32> = Cell::new(0));
|
|
|
|
/// Boxes `f` as a `Send` future whose item and error are both replaced by `()`.
fn ignore_results<F: Future + Send + 'static>(f: F) -> Box<Future<Item = (), Error = ()> + Send> {
    // Discard the success value first, then the error value.
    let without_item = f.map(|_| ());
    let without_error = without_item.map_err(|_| ());
    Box::new(without_error)
}
|
|
|
|
/// Spawns two trivial futures, waits for `shutdown()` to complete, and checks
/// that every worker that entered the `around_worker` hook also left it.
#[test]
fn natural_shutdown_simple_futures() {
    /// Spawns a task on `sender` that sends `msg` over a fresh std channel and
    /// returns the receiving half of that channel.
    fn spawn_reporter(sender: &Sender, msg: &'static str) -> mpsc::Receiver<&'static str> {
        let (report_tx, report_rx) = mpsc::channel();
        sender
            .spawn(lazy(move || {
                // The test thread sets FOO to 1; reading 0 here means the task
                // is running on some other thread.
                FOO.with(|f| assert_eq!(f.get(), 0));

                report_tx.send(msg).unwrap();
                Ok(())
            }))
            .unwrap();
        report_rx
    }

    let _ = ::env_logger::try_init();

    for _ in 0..1_000 {
        let num_inc = Arc::new(AtomicUsize::new(0));
        let num_dec = Arc::new(AtomicUsize::new(0));

        FOO.with(|marker| {
            marker.set(1);

            // Count workers entering and leaving their run loop.
            let entered = num_inc.clone();
            let exited = num_dec.clone();
            let pool = Builder::new()
                .around_worker(move |worker, _| {
                    entered.fetch_add(1, Relaxed);
                    worker.run();
                    exited.fetch_add(1, Relaxed);
                })
                .build();

            let sender = pool.sender().clone();
            let first = spawn_reporter(&sender, "one");
            let second = spawn_reporter(&sender, "two");
            drop(sender);

            assert_eq!("one", first.recv().unwrap());
            assert_eq!("two", second.recv().unwrap());

            // Block until the pool has shut down.
            pool.shutdown().wait().unwrap();

            // At least one worker must have started...
            let num_inc = num_inc.load(Relaxed);
            assert!(num_inc > 0);

            // ...and every worker that started must have stopped.
            let num_dec = num_dec.load(Relaxed);
            assert_eq!(num_inc, num_dec);
        });
    }
}
|
|
|
|
/// Checks that `shutdown_now()` drops a spawned future that never completes.
///
/// Each iteration also verifies that every worker that entered the
/// `around_worker` hook left it again.
#[test]
fn force_shutdown_drops_futures() {
    /// Future that is never ready; increments the wrapped counter when dropped.
    struct Never(Arc<AtomicUsize>);

    impl Future for Never {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            Ok(Async::NotReady)
        }
    }

    impl Drop for Never {
        fn drop(&mut self) {
            self.0.fetch_add(1, Relaxed);
        }
    }

    let _ = ::env_logger::try_init();

    for _ in 0..1_000 {
        let num_inc = Arc::new(AtomicUsize::new(0));
        let num_dec = Arc::new(AtomicUsize::new(0));
        let num_drop = Arc::new(AtomicUsize::new(0));

        let a = num_inc.clone();
        let b = num_dec.clone();

        let pool = Builder::new()
            .around_worker(move |w, _| {
                a.fetch_add(1, Relaxed);
                w.run();
                b.fetch_add(1, Relaxed);
            })
            .build();
        // `spawn` only needs a shared reference, so the sender is not `mut`.
        let tx = pool.sender().clone();

        tx.spawn(Never(num_drop.clone())).unwrap();

        // Force the shutdown and wait for it to complete
        pool.shutdown_now().wait().unwrap();

        // Assert that at least one worker thread was started
        let a = num_inc.load(Relaxed);
        assert!(a >= 1);

        // Assert that all threads shutdown
        let b = num_dec.load(Relaxed);
        assert_eq!(a, b);

        // Assert that the future was dropped exactly once
        let c = num_drop.load(Relaxed);
        assert_eq!(c, 1);
    }
}
|
|
|
|
/// Checks that dropping the `ThreadPool` drops a spawned future that never
/// completes, and that every worker that started has exited by then.
#[test]
fn drop_threadpool_drops_futures() {
    /// Future that is never ready; increments the wrapped counter when dropped.
    struct Never(Arc<AtomicUsize>);

    impl Future for Never {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            Ok(Async::NotReady)
        }
    }

    impl Drop for Never {
        fn drop(&mut self) {
            self.0.fetch_add(1, Relaxed);
        }
    }

    let _ = ::env_logger::try_init();

    for _ in 0..1_000 {
        let num_inc = Arc::new(AtomicUsize::new(0));
        let num_dec = Arc::new(AtomicUsize::new(0));
        let num_drop = Arc::new(AtomicUsize::new(0));

        let a = num_inc.clone();
        let b = num_dec.clone();

        let pool = Builder::new()
            .max_blocking(2)
            .pool_size(20)
            .around_worker(move |w, _| {
                a.fetch_add(1, Relaxed);
                w.run();
                b.fetch_add(1, Relaxed);
            })
            .build();
        // `spawn` only needs a shared reference, so the sender is not `mut`.
        let tx = pool.sender().clone();

        tx.spawn(Never(num_drop.clone())).unwrap();

        // Drop the pool. The assertions below rely on the drop not returning
        // until the workers have exited and the future has been released.
        drop(pool);

        // Assert that at least one worker thread was started
        let a = num_inc.load(Relaxed);
        assert!(a >= 1);

        // Assert that all threads shutdown
        let b = num_dec.load(Relaxed);
        assert_eq!(a, b);

        // Assert that the future was dropped exactly once
        let c = num_drop.load(Relaxed);
        assert_eq!(c, 1);
    }
}
|
|
|
|
/// Spawns `NUM` one-shot futures per pool and checks that all of them ran
/// before `shutdown()` completed.
#[test]
fn many_oneshot_futures() {
    const NUM: usize = 10_000;

    let _ = ::env_logger::try_init();

    for _ in 0..50 {
        let pool = ThreadPool::new();
        // `spawn` only needs a shared reference, so the sender is not `mut`.
        let tx = pool.sender().clone();
        let cnt = Arc::new(AtomicUsize::new(0));

        for _ in 0..NUM {
            let cnt = cnt.clone();
            tx.spawn(lazy(move || {
                cnt.fetch_add(1, Relaxed);
                Ok(())
            }))
            .unwrap();
        }

        // Wait for the pool to shutdown
        pool.shutdown().wait().unwrap();

        // Every spawned future must have incremented the counter once.
        let num = cnt.load(Relaxed);
        assert_eq!(num, NUM);
    }
}
|
|
|
|
/// Stress test with long chains of forwarding tasks.
///
/// Each of the `TRACKS` tracks is a chain of `CHAIN` tasks that forward
/// messages from one channel to the next. A final task feeds the message back
/// to the start of the chain until it has seen `CYCLES` messages, then passes
/// the last one to the track's `final_rx`.
#[test]
fn many_multishot_futures() {
    use futures::sync::mpsc;

    const CHAIN: usize = 200;
    const CYCLES: usize = 5;
    const TRACKS: usize = 50;

    let _ = ::env_logger::try_init();

    for _ in 0..50 {
        let pool = ThreadPool::new();
        // `spawn` only needs a shared reference, so the sender is not `mut`.
        let pool_tx = pool.sender().clone();

        let mut start_txs = Vec::with_capacity(TRACKS);
        let mut final_rxs = Vec::with_capacity(TRACKS);

        for _ in 0..TRACKS {
            let (start_tx, mut chain_rx) = mpsc::channel(10);

            for _ in 0..CHAIN {
                let (next_tx, next_rx) = mpsc::channel(10);

                let rx = chain_rx.map_err(|e| panic!("{:?}", e));

                // Forward all the messages
                pool_tx
                    .spawn(
                        next_tx
                            .send_all(rx)
                            .map(|_| ())
                            .map_err(|e| panic!("{:?}", e)),
                    )
                    .unwrap();

                // The next link reads from what this link writes to.
                chain_rx = next_rx;
            }

            // This final task cycles if needed
            let (final_tx, final_rx) = mpsc::channel(10);
            let cycle_tx = start_tx.clone();
            // Number of messages the final task still expects to see.
            let mut rem = CYCLES;

            let task = chain_rx.take(CYCLES as u64).for_each(move |msg| {
                rem -= 1;
                // The last expected message leaves the cycle; earlier ones are
                // sent around the chain again.
                let send = if rem == 0 {
                    final_tx.clone().send(msg)
                } else {
                    cycle_tx.clone().send(msg)
                };

                send.then(|res| {
                    res.unwrap();
                    Ok(())
                })
            });
            pool_tx.spawn(ignore_results(task)).unwrap();

            start_txs.push(start_tx);
            final_rxs.push(final_rx);
        }

        // Kick off every track with a single message.
        for start_tx in start_txs {
            start_tx.send("ping").wait().unwrap();
        }

        // Block until every track has delivered its final message.
        for final_rx in final_rxs {
            final_rx.wait().next().unwrap().unwrap();
        }

        // Shutdown the pool
        pool.shutdown().wait().unwrap();
    }
}
|
|
|
|
/// A task running on the pool must be able to spawn another task through
/// `tokio_executor::spawn`; the nested task signals the test thread.
#[test]
fn global_executor_is_configured() {
    let pool = ThreadPool::new();
    let sender = pool.sender().clone();

    let (done_tx, done_rx) = mpsc::channel();

    let outer = lazy(move || {
        // Spawned via the default executor rather than via `sender`.
        let inner = lazy(move || {
            done_tx.send(()).unwrap();
            Ok(())
        });
        tokio_executor::spawn(inner);

        Ok(())
    });
    sender.spawn(outer).unwrap();

    // Returns once the nested task has run.
    done_rx.recv().unwrap();

    pool.shutdown().wait().unwrap();
}
|
|
|
|
/// `shutdown_on_idle()` on a pool with no spawned work must complete.
#[test]
fn new_threadpool_is_idle() {
    let idle = ThreadPool::new().shutdown_on_idle();
    idle.wait().unwrap();
}
|
|
|
|
/// `shutdown_on_idle()` must not resolve while a spawned task is still
/// pending, and must resolve once that task has been released.
#[test]
fn busy_threadpool_is_not_idle() {
    use futures::sync::oneshot;

    /// Polls the borrowed shutdown future once, asserts that it is not ready,
    /// and then completes immediately.
    struct StillBusy<'a>(&'a mut Shutdown);

    impl<'a> Future for StillBusy<'a> {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            assert!(self.0.poll().unwrap().is_not_ready());
            Ok(Async::Ready(()))
        }
    }

    let pool = Builder::new().pool_size(4).max_blocking(2).build();
    let sender = pool.sender().clone();

    // The spawned task stays pending until `release_tx` fires.
    let (release_tx, release_rx) = oneshot::channel();
    sender.spawn(release_rx.then(|_| Ok(()))).unwrap();

    let mut idle = pool.shutdown_on_idle();

    // While the task is pending the pool must not report idle.
    StillBusy(&mut idle).wait().unwrap();

    // Release the task, after which the shutdown must complete.
    release_tx.send(()).unwrap();
    idle.wait().unwrap();
}
|
|
|
|
/// A task that panics while being polled must not stop the pool from shutting
/// down, and the task must be dropped while its thread is panicking.
#[test]
fn panic_in_task() {
    /// Future that panics as soon as it is polled.
    struct Boom;

    impl Future for Boom {
        type Item = ();
        type Error = ();

        fn poll(&mut self) -> Poll<(), ()> {
            panic!();
        }
    }

    impl Drop for Boom {
        fn drop(&mut self) {
            // The drop is expected to happen during unwinding from `poll`.
            assert!(::std::thread::panicking());
        }
    }

    let pool = ThreadPool::new();
    let sender = pool.sender().clone();

    sender.spawn(Boom).unwrap();

    pool.shutdown_on_idle().wait().unwrap();
}
|
|
|
|
/// Two independent pools: a task on `pool1` fires a oneshot that a task on
/// `pool2` is waiting on, and the latter then signals the test thread.
#[test]
fn multi_threadpool() {
    use futures::sync::oneshot;

    let pool1 = ThreadPool::new();
    let pool2 = ThreadPool::new();

    let (tx, rx) = oneshot::channel();
    let (done_tx, done_rx) = mpsc::channel();

    // Runs on `pool2`: waits for the oneshot, then reports completion.
    let waiter = rx
        .and_then(move |_| {
            done_tx.send(()).unwrap();
            Ok(())
        })
        .map_err(|e| panic!("err={:?}", e));
    pool2.spawn(waiter);

    // Runs on `pool1`: fires the oneshot.
    let notifier = lazy(move || {
        tx.send(()).unwrap();
        Ok(())
    });
    pool1.spawn(notifier);

    done_rx.recv().unwrap();
}
|
|
|
|
/// Dropping the pool must drop pending futures and the custom park/unpark
/// handles, even while the test still holds a `Task` handle for the future.
#[test]
fn eagerly_drops_futures() {
    use futures::future::{empty, lazy, Future};
    use futures::task;
    use std::sync::mpsc;

    /// Sends `()` on the wrapped channel when dropped.
    struct NotifyOnDrop(mpsc::Sender<()>);

    impl Drop for NotifyOnDrop {
        fn drop(&mut self) {
            self.0.send(()).unwrap();
        }
    }

    /// `Park` wrapper around `DefaultPark` that also owns two channel senders.
    /// Nothing is ever sent on them; the test only checks for disconnection.
    struct MyPark {
        inner: DefaultPark,
        #[allow(dead_code)]
        park_tx: mpsc::SyncSender<()>,
        unpark_tx: mpsc::SyncSender<()>,
    }

    impl Park for MyPark {
        type Unpark = MyUnpark;
        type Error = <DefaultPark as Park>::Error;

        fn unpark(&self) -> Self::Unpark {
            MyUnpark {
                inner: self.inner.unpark(),
                unpark_tx: self.unpark_tx.clone(),
            }
        }

        fn park(&mut self) -> Result<(), Self::Error> {
            self.inner.park()
        }

        fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
            self.inner.park_timeout(duration)
        }
    }

    /// `Unpark` wrapper around `DefaultUnpark` holding a clone of the unpark
    /// sender.
    struct MyUnpark {
        inner: DefaultUnpark,
        #[allow(dead_code)]
        unpark_tx: mpsc::SyncSender<()>,
    }

    impl Unpark for MyUnpark {
        fn unpark(&self) {
            self.inner.unpark()
        }
    }

    let (handle_tx, handle_rx) = mpsc::channel();
    let (dropped_tx, dropped_rx) = mpsc::channel();
    let (park_tx, park_rx) = mpsc::sync_channel(0);
    let (unpark_tx, unpark_rx) = mpsc::sync_channel(0);

    // Reports on `dropped_rx` once the future that owns it has been dropped.
    let notify_on_drop = NotifyOnDrop(dropped_tx);

    let pool = tokio_threadpool::Builder::new()
        .custom_park(move |_| MyPark {
            inner: DefaultPark::new(),
            park_tx: park_tx.clone(),
            unpark_tx: unpark_tx.clone(),
        })
        .build();

    pool.spawn(lazy(move || {
        // Hand a handle to the current task over to the test thread.
        let current = task::current();
        handle_tx.send(current).unwrap();

        // This future never resolves; it exists only to own `notify_on_drop`.
        empty::<(), ()>().then(move |_| {
            // This code path should never be reached.
            if true {
                panic!()
            }

            // Mentioning `notify_on_drop` here moves it into the task. It is
            // expected to be dropped when the pool is dropped, not here.
            drop(notify_on_drop);

            Ok(())
        })
    }));

    // Wait until the task handle arrives.
    let held_task = handle_rx.recv().unwrap();

    // Dropping the pool should forcefully drop the pending future.
    drop(pool);

    // Both channels must be disconnected, i.e. every `MyPark` and `MyUnpark`
    // (and the builder closure holding the original senders) is gone.
    assert_eq!(park_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));
    assert_eq!(unpark_rx.try_recv(), Err(mpsc::TryRecvError::Disconnected));

    // A message here means the future was dropped.
    dropped_rx.recv().unwrap();

    // Keep the task handle alive until every check above has run.
    drop(held_task);
}
|