#![warn(rust_2018_idioms)] #![feature(async_await)] use tokio::sync::oneshot; use tokio_executor::current_thread::{self, block_on_all, CurrentThread, TaskExecutor}; use tokio_executor::TypedExecutor; use std::any::Any; use std::cell::{Cell, RefCell}; use std::future::Future; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use std::thread; use std::time::Duration; mod from_block_on_all { use super::*; fn test>>) + 'static>(spawn: F) { let cnt = Rc::new(Cell::new(0)); let c = cnt.clone(); let msg = block_on_all(async move { c.set(1 + c.get()); // Spawn! spawn(Box::pin(async move { c.set(1 + c.get()); })); "hello" }); assert_eq!(2, cnt.get()); assert_eq!(msg, "hello"); } #[test] fn spawn() { test(current_thread::spawn) } #[test] fn execute() { test(|f| { TaskExecutor::current().spawn(f).unwrap(); }); } } #[test] fn block_waits() { let (tx, rx) = oneshot::channel(); thread::spawn(|| { thread::sleep(Duration::from_millis(1000)); tx.send(()).unwrap(); }); let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); block_on_all(async move { rx.await.unwrap(); cnt.set(1 + cnt.get()); }); assert_eq!(1, cnt2.get()); } #[test] fn spawn_many() { const ITER: usize = 200; let cnt = Rc::new(Cell::new(0)); let mut tokio_current_thread = CurrentThread::new(); for _ in 0..ITER { let cnt = cnt.clone(); tokio_current_thread.spawn(async move { cnt.set(1 + cnt.get()); }); } tokio_current_thread.run().unwrap(); assert_eq!(cnt.get(), ITER); } mod does_not_set_global_executor_by_default { use super::*; fn test + Send>>) -> Result<(), E> + 'static, E>( spawn: F, ) { block_on_all(async { spawn(Box::pin(async {})).unwrap_err(); }); } #[test] fn spawn() { test(|f| tokio_executor::DefaultExecutor::current().spawn(f)) } } mod from_block_on_future { use super::*; fn test>>)>(spawn: F) { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); let mut tokio_current_thread = CurrentThread::new(); tokio_current_thread.block_on(async move { let cnt3 = cnt2.clone(); spawn(Box::pin(async move { cnt3.set(1 + cnt3.get()); })); }); tokio_current_thread.run().unwrap(); assert_eq!(1, cnt.get()); } #[test] fn spawn() { test(current_thread::spawn); } #[test] fn execute() { test(|f| { current_thread::TaskExecutor::current().spawn(f).unwrap(); }); } } mod outstanding_tasks_are_dropped_when_executor_is_dropped { use super::*; async fn never(_rc: Rc<()>) { loop { yield_once().await; } } fn test(spawn: F, dotspawn: G) where F: Fn(Pin>>) + 'static, G: Fn(&mut CurrentThread, Pin>>), { let mut rc = Rc::new(()); let mut tokio_current_thread = CurrentThread::new(); dotspawn(&mut tokio_current_thread, Box::pin(never(rc.clone()))); drop(tokio_current_thread); // Ensure the daemon is dropped assert!(Rc::get_mut(&mut rc).is_some()); // Using the global spawn fn let mut rc = Rc::new(()); let rc2 = rc.clone(); let mut tokio_current_thread = CurrentThread::new(); tokio_current_thread.block_on(async move { spawn(Box::pin(never(rc2))); }); drop(tokio_current_thread); // Ensure the daemon is dropped assert!(Rc::get_mut(&mut rc).is_some()); } #[test] fn spawn() { test(current_thread::spawn, |rt, f| { rt.spawn(f); }) } #[test] fn execute() { test( |f| { current_thread::TaskExecutor::current().spawn(f).unwrap(); }, // Note: `CurrentThread` doesn't currently implement // `futures::Executor`, so we'll call `.spawn(...)` rather than // `.execute(...)` for now. If `CurrentThread` is changed to // implement Executor, change this to `.execute(...).unwrap()`. |rt, f| { rt.spawn(f); }, ); } } #[test] #[should_panic] fn nesting_run() { block_on_all(async { block_on_all(async {}); }); } mod run_in_future { use super::*; #[test] #[should_panic] fn spawn() { block_on_all(async { current_thread::spawn(async { block_on_all(async {}); }); }); } #[test] #[should_panic] fn execute() { block_on_all(async { current_thread::TaskExecutor::current() .spawn(async { block_on_all(async {}); }) .unwrap(); }); } } #[test] fn tick_on_infini_future() { let num = Rc::new(Cell::new(0)); async fn infini(num: Rc>) { loop { num.set(1 + num.get()); yield_once().await } } CurrentThread::new() .spawn(infini(num.clone())) .turn(None) .unwrap(); assert_eq!(1, num.get()); } mod tasks_are_scheduled_fairly { use super::*; async fn spin(state: Rc>, idx: usize) { loop { // borrow_mut scope { let mut state = state.borrow_mut(); if idx == 0 { let diff = state[0] - state[1]; assert!(diff.abs() <= 1); if state[0] >= 50 { return; } } state[idx] += 1; if state[idx] >= 100 { return; } } yield_once().await; } } fn test>>)>(spawn: F) { let state = Rc::new(RefCell::new([0, 0])); block_on_all(async move { spawn(Box::pin(spin(state.clone(), 0))); spawn(Box::pin(spin(state, 1))); }); } #[test] fn spawn() { test(current_thread::spawn) } #[test] fn execute() { test(|f| { current_thread::TaskExecutor::current().spawn(f).unwrap(); }) } } mod and_turn { use super::*; fn test(spawn: F, dotspawn: G) where F: Fn(Pin>>) + 'static, G: Fn(&mut CurrentThread, Pin>>), { let cnt = Rc::new(Cell::new(0)); let c = cnt.clone(); let mut tokio_current_thread = CurrentThread::new(); // Spawn a basic task to get the executor to turn dotspawn(&mut tokio_current_thread, Box::pin(async {})); // Turn once... tokio_current_thread.turn(None).unwrap(); dotspawn( &mut tokio_current_thread, Box::pin(async move { c.set(1 + c.get()); // Spawn! spawn(Box::pin(async move { c.set(1 + c.get()); })); }), ); // This does not run the newly spawned thread tokio_current_thread.turn(None).unwrap(); assert_eq!(1, cnt.get()); // This runs the newly spawned thread tokio_current_thread.turn(None).unwrap(); assert_eq!(2, cnt.get()); } #[test] fn spawn() { test(current_thread::spawn, |rt, f| { rt.spawn(f); }) } #[test] fn execute() { test( |f| { current_thread::TaskExecutor::current().spawn(f).unwrap(); }, // Note: `CurrentThread` doesn't currently implement // `futures::Executor`, so we'll call `.spawn(...)` rather than // `.execute(...)` for now. If `CurrentThread` is changed to // implement Executor, change this to `.execute(...).unwrap()`. |rt, f| { rt.spawn(f); }, ); } } mod in_drop { use super::*; struct OnDrop(Option); impl Drop for OnDrop { fn drop(&mut self) { (self.0.take().unwrap())(); } } async fn noop(_data: Box) {} fn test(spawn: F, dotspawn: G) where F: Fn(Pin>>) + 'static, G: Fn(&mut CurrentThread, Pin>>), { let mut tokio_current_thread = CurrentThread::new(); let (tx, rx) = oneshot::channel(); dotspawn( &mut tokio_current_thread, Box::pin(noop(Box::new(OnDrop(Some(move || { spawn(Box::pin(async move { tx.send(()).unwrap(); })); }))))), ); tokio_current_thread.block_on(rx).unwrap(); tokio_current_thread.run().unwrap(); } #[test] fn spawn() { test(current_thread::spawn, |rt, f| { rt.spawn(f); }) } #[test] fn execute() { test( |f| { current_thread::TaskExecutor::current().spawn(f).unwrap(); }, // Note: `CurrentThread` doesn't currently implement // `futures::Executor`, so we'll call `.spawn(...)` rather than // `.execute(...)` for now. If `CurrentThread` is changed to // implement Executor, change this to `.execute(...).unwrap()`. |rt, f| { rt.spawn(f); }, ); } } /* #[test] fn hammer_turn() { use futures::sync::mpsc; const ITER: usize = 100; const N: usize = 100; const THREADS: usize = 4; for _ in 0..ITER { let mut ths = vec![]; // Add some jitter for _ in 0..THREADS { let th = thread::spawn(|| { let mut tokio_current_thread = CurrentThread::new(); let (tx, rx) = mpsc::unbounded(); tokio_current_thread.spawn({ let cnt = Rc::new(Cell::new(0)); let c = cnt.clone(); rx.for_each(move |_| { c.set(1 + c.get()); Ok(()) }) .map_err(|e| panic!("err={:?}", e)) .map(move |v| { assert_eq!(N, cnt.get()); v }) }); thread::spawn(move || { for _ in 0..N { tx.unbounded_send(()).unwrap(); thread::yield_now(); } }); while !tokio_current_thread.is_idle() { tokio_current_thread.turn(None).unwrap(); } }); ths.push(th); } for th in ths { th.join().unwrap(); } } } */ #[test] fn turn_has_polled() { let mut tokio_current_thread = CurrentThread::new(); // Spawn oneshot receiver let (sender, receiver) = oneshot::channel::<()>(); tokio_current_thread.spawn(async move { let _ = receiver.await; }); // Turn once... let res = tokio_current_thread .turn(Some(Duration::from_millis(0))) .unwrap(); // Should've polled the receiver once, but considered it not ready assert!(res.has_polled()); // Turn another time let res = tokio_current_thread .turn(Some(Duration::from_millis(0))) .unwrap(); // Should've polled nothing, the receiver is not ready yet assert!(!res.has_polled()); // Make the receiver ready sender.send(()).unwrap(); // Turn another time let res = tokio_current_thread .turn(Some(Duration::from_millis(0))) .unwrap(); // Should've polled the receiver, it's ready now assert!(res.has_polled()); // Now the executor should be empty assert!(tokio_current_thread.is_idle()); let res = tokio_current_thread .turn(Some(Duration::from_millis(0))) .unwrap(); // So should've polled nothing assert!(!res.has_polled()); } // Our own mock Park that is never really waiting and the only // thing it does is to send, on request, something (once) to a oneshot // channel struct MyPark { sender: Option>, send_now: Rc>, } struct MyUnpark; impl tokio_executor::park::Park for MyPark { type Unpark = MyUnpark; type Error = (); fn unpark(&self) -> Self::Unpark { MyUnpark } fn park(&mut self) -> Result<(), Self::Error> { // If called twice with send_now, this will intentionally panic if self.send_now.get() { self.sender.take().unwrap().send(()).unwrap(); } Ok(()) } fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> { self.park() } } impl tokio_executor::park::Unpark for MyUnpark { fn unpark(&self) {} } #[test] fn turn_fair() { let send_now = Rc::new(Cell::new(false)); let (sender, receiver) = oneshot::channel::<()>(); let (sender_2, receiver_2) = oneshot::channel::<()>(); let (sender_3, receiver_3) = oneshot::channel::<()>(); let my_park = MyPark { sender: Some(sender_3), send_now: send_now.clone(), }; let mut tokio_current_thread = CurrentThread::new_with_park(my_park); let receiver_1_done = Rc::new(Cell::new(false)); let receiver_1_done_clone = receiver_1_done.clone(); // Once an item is received on the oneshot channel, it will immediately // immediately make the second oneshot channel ready tokio_current_thread.spawn(async move { receiver.await.unwrap(); sender_2.send(()).unwrap(); receiver_1_done_clone.set(true); }); let receiver_2_done = Rc::new(Cell::new(false)); let receiver_2_done_clone = receiver_2_done.clone(); tokio_current_thread.spawn(async move { receiver_2.await.unwrap(); receiver_2_done_clone.set(true); }); // The third receiver is only woken up from our Park implementation, it simulates // e.g. a socket that first has to be polled to know if it is ready now let receiver_3_done = Rc::new(Cell::new(false)); let receiver_3_done_clone = receiver_3_done.clone(); tokio_current_thread.spawn(async move { receiver_3.await.unwrap(); receiver_3_done_clone.set(true); }); // First turn should've polled both and considered them not ready let res = tokio_current_thread .turn(Some(Duration::from_millis(0))) .unwrap(); assert!(res.has_polled()); // Next turn should've polled nothing let res = tokio_current_thread .turn(Some(Duration::from_millis(0))) .unwrap(); assert!(!res.has_polled()); assert!(!receiver_1_done.get()); assert!(!receiver_2_done.get()); assert!(!receiver_3_done.get()); // After this the receiver future will wake up the second receiver future, // so there are pending futures again sender.send(()).unwrap(); // Now the first receiver should be done, the second receiver should be ready // to be polled again and the socket not yet let res = tokio_current_thread.turn(None).unwrap(); assert!(res.has_polled()); assert!(receiver_1_done.get()); assert!(!receiver_2_done.get()); assert!(!receiver_3_done.get()); // Now let our park implementation know that it should send something to sender 3 send_now.set(true); // This should resolve the second receiver directly, but also poll the socket // and read the packet from it. If it didn't do both here, we would handle // futures that are woken up from the reactor and directly unfairly and would // favour the ones that are woken up directly. let res = tokio_current_thread.turn(None).unwrap(); assert!(res.has_polled()); assert!(receiver_1_done.get()); assert!(receiver_2_done.get()); assert!(receiver_3_done.get()); // Don't send again send_now.set(false); // Now we should be idle and turning should not poll anything assert!(tokio_current_thread.is_idle()); let res = tokio_current_thread.turn(None).unwrap(); assert!(!res.has_polled()); } #[test] fn spawn_from_other_thread() { let mut current_thread = CurrentThread::new(); let handle = current_thread.handle(); let (sender, receiver) = oneshot::channel::<()>(); thread::spawn(move || { handle .spawn(async move { sender.send(()).unwrap(); }) .unwrap(); }); let _ = current_thread.block_on(receiver).unwrap(); } #[test] fn spawn_from_other_thread_unpark() { use std::sync::mpsc::channel as mpsc_channel; let mut current_thread = CurrentThread::new(); let handle = current_thread.handle(); let (sender_1, receiver_1) = oneshot::channel::<()>(); let (sender_2, receiver_2) = mpsc_channel::<()>(); thread::spawn(move || { let _ = receiver_2.recv().unwrap(); handle .spawn(async move { sender_1.send(()).unwrap(); }) .unwrap(); }); // Ensure that unparking the executor works correctly. It will first // check if there are new futures (there are none), then execute the // lazy future below which will cause the future to be spawned from // the other thread. Then the executor will park but should be woken // up because *now* we have a new future to schedule let _ = current_thread.block_on(async move { // inlined 'lazy' async move { sender_2.send(()).unwrap(); } .await; receiver_1.await.unwrap(); }); } #[test] fn spawn_from_executor_with_handle() { let mut current_thread = CurrentThread::new(); let handle = current_thread.handle(); let (tx, rx) = oneshot::channel(); current_thread.spawn(async move { handle .spawn(async move { tx.send(()).unwrap(); }) .unwrap(); }); current_thread.block_on(rx).unwrap(); } #[test] fn handle_status() { let current_thread = CurrentThread::new(); let handle = current_thread.handle(); assert!(handle.status().is_ok()); drop(current_thread); assert!(handle.spawn(async { () }).is_err()); assert!(handle.status().is_err()); } #[test] fn handle_is_sync() { let current_thread = CurrentThread::new(); let handle = current_thread.handle(); let _box: Box = Box::new(handle); } async fn yield_once() { YieldOnce(false).await } struct YieldOnce(bool); impl Future for YieldOnce { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { if self.0 { Poll::Ready(()) } else { self.0 = true; // Push to the back of the executor's queue cx.waker().wake_by_ref(); Poll::Pending } } }