mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-25 12:00:35 +00:00
time: make test-util paused time fully deterministic (#3492)
The time driver stores an Instant internally used as a "base" for future time calculations. Since this is generated as the Runtime is being constructed, it previously always happened before the user had a chance to pause time. The fractional-millisecond variations in the timing around the runtime construction and time pause cause tests running entirely in paused time to be very slightly deterministic, with the time driver advancing time by 1 millisecond more or less depending on how the sub-millisecond components of the `Instant`s involved compared. To avoid this, there is now a new option on `runtime::Builder` which will create a `Runtime` with time "instantly" paused. This, along with a small change to have the time driver use the provided clock as the source for its start time allow totally deterministic tests with paused time.
This commit is contained in:
parent
1c1e0e3fc9
commit
fcb6d041b9
@ -4,7 +4,7 @@ error: the async keyword is missing from the function declaration
|
||||
4 | fn main_is_not_async() {}
|
||||
| ^^
|
||||
|
||||
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`
|
||||
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused`
|
||||
--> $DIR/macros_invalid_input.rs:6:15
|
||||
|
|
||||
6 | #[tokio::main(foo)]
|
||||
@ -28,7 +28,7 @@ error: the test function cannot accept arguments
|
||||
16 | async fn test_fn_has_args(_x: u8) {}
|
||||
| ^^^^^^
|
||||
|
||||
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`
|
||||
error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused`
|
||||
--> $DIR/macros_invalid_input.rs:18:15
|
||||
|
|
||||
18 | #[tokio::test(foo)]
|
||||
|
@ -25,6 +25,7 @@ impl RuntimeFlavor {
|
||||
struct FinalConfig {
|
||||
flavor: RuntimeFlavor,
|
||||
worker_threads: Option<usize>,
|
||||
start_paused: Option<bool>,
|
||||
}
|
||||
|
||||
struct Configuration {
|
||||
@ -32,6 +33,7 @@ struct Configuration {
|
||||
default_flavor: RuntimeFlavor,
|
||||
flavor: Option<RuntimeFlavor>,
|
||||
worker_threads: Option<(usize, Span)>,
|
||||
start_paused: Option<(bool, Span)>,
|
||||
}
|
||||
|
||||
impl Configuration {
|
||||
@ -44,6 +46,7 @@ impl Configuration {
|
||||
},
|
||||
flavor: None,
|
||||
worker_threads: None,
|
||||
start_paused: None,
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,31 +82,57 @@ impl Configuration {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> {
|
||||
if self.start_paused.is_some() {
|
||||
return Err(syn::Error::new(span, "`start_paused` set multiple times."));
|
||||
}
|
||||
|
||||
let start_paused = parse_bool(start_paused, span, "start_paused")?;
|
||||
self.start_paused = Some((start_paused, span));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn build(&self) -> Result<FinalConfig, syn::Error> {
|
||||
let flavor = self.flavor.unwrap_or(self.default_flavor);
|
||||
use RuntimeFlavor::*;
|
||||
match (flavor, self.worker_threads) {
|
||||
(CurrentThread, Some((_, worker_threads_span))) => Err(syn::Error::new(
|
||||
worker_threads_span,
|
||||
"The `worker_threads` option requires the `multi_thread` runtime flavor.",
|
||||
)),
|
||||
(CurrentThread, None) => Ok(FinalConfig {
|
||||
flavor,
|
||||
worker_threads: None,
|
||||
}),
|
||||
(Threaded, worker_threads) if self.rt_multi_thread_available => Ok(FinalConfig {
|
||||
flavor,
|
||||
worker_threads: worker_threads.map(|(val, _span)| val),
|
||||
}),
|
||||
|
||||
let worker_threads = match (flavor, self.worker_threads) {
|
||||
(CurrentThread, Some((_, worker_threads_span))) => {
|
||||
return Err(syn::Error::new(
|
||||
worker_threads_span,
|
||||
"The `worker_threads` option requires the `multi_thread` runtime flavor.",
|
||||
))
|
||||
}
|
||||
(CurrentThread, None) => None,
|
||||
(Threaded, worker_threads) if self.rt_multi_thread_available => {
|
||||
worker_threads.map(|(val, _span)| val)
|
||||
}
|
||||
(Threaded, _) => {
|
||||
let msg = if self.flavor.is_none() {
|
||||
"The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled."
|
||||
} else {
|
||||
"The runtime flavor `multi_thread` requires the `rt-multi-thread` feature."
|
||||
};
|
||||
Err(syn::Error::new(Span::call_site(), msg))
|
||||
return Err(syn::Error::new(Span::call_site(), msg));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let start_paused = match (flavor, self.start_paused) {
|
||||
(Threaded, Some((_, start_paused_span))) => {
|
||||
return Err(syn::Error::new(
|
||||
start_paused_span,
|
||||
"The `start_paused` option requires the `current_thread` runtime flavor.",
|
||||
));
|
||||
}
|
||||
(CurrentThread, Some((start_paused, _))) => Some(start_paused),
|
||||
(_, None) => None,
|
||||
};
|
||||
|
||||
Ok(FinalConfig {
|
||||
flavor,
|
||||
worker_threads,
|
||||
start_paused,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,6 +163,16 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result<String, syn::E
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_bool(bool: syn::Lit, span: Span, field: &str) -> Result<bool, syn::Error> {
|
||||
match bool {
|
||||
syn::Lit::Bool(b) => Ok(b.value),
|
||||
_ => Err(syn::Error::new(
|
||||
span,
|
||||
format!("Failed to parse {} as bool.", field),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_knobs(
|
||||
mut input: syn::ItemFn,
|
||||
args: syn::AttributeArgs,
|
||||
@ -174,6 +213,9 @@ fn parse_knobs(
|
||||
"flavor" => {
|
||||
config.set_flavor(namevalue.lit.clone(), namevalue.span())?;
|
||||
}
|
||||
"start_paused" => {
|
||||
config.set_start_paused(namevalue.lit.clone(), namevalue.span())?;
|
||||
}
|
||||
"core_threads" => {
|
||||
let msg = "Attribute `core_threads` is renamed to `worker_threads`";
|
||||
return Err(syn::Error::new_spanned(namevalue, msg));
|
||||
@ -204,11 +246,11 @@ fn parse_knobs(
|
||||
macro_name
|
||||
)
|
||||
}
|
||||
"flavor" | "worker_threads" => {
|
||||
"flavor" | "worker_threads" | "start_paused" => {
|
||||
format!("The `{}` attribute requires an argument.", name)
|
||||
}
|
||||
name => {
|
||||
format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`", name)
|
||||
format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`", name)
|
||||
}
|
||||
};
|
||||
return Err(syn::Error::new_spanned(path, msg));
|
||||
@ -235,6 +277,9 @@ fn parse_knobs(
|
||||
if let Some(v) = config.worker_threads {
|
||||
rt = quote! { #rt.worker_threads(#v) };
|
||||
}
|
||||
if let Some(v) = config.start_paused {
|
||||
rt = quote! { #rt.start_paused(#v) };
|
||||
}
|
||||
|
||||
let header = {
|
||||
if is_test {
|
||||
|
@ -144,6 +144,30 @@ use proc_macro::TokenStream;
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// ### Configure the runtime to start with time paused
|
||||
///
|
||||
/// ```rust
|
||||
/// #[tokio::main(flavor = "current_thread", start_paused = true)]
|
||||
/// async fn main() {
|
||||
/// println!("Hello world");
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Equivalent code not using `#[tokio::main]`
|
||||
///
|
||||
/// ```rust
|
||||
/// fn main() {
|
||||
/// tokio::runtime::Builder::new_current_thread()
|
||||
/// .enable_all()
|
||||
/// .start_paused(true)
|
||||
/// .build()
|
||||
/// .unwrap()
|
||||
/// .block_on(async {
|
||||
/// println!("Hello world");
|
||||
/// })
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// ### NOTE:
|
||||
///
|
||||
/// If you rename the Tokio crate in your dependencies this macro will not work.
|
||||
@ -225,6 +249,15 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream {
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// ### Configure the runtime to start with time paused
|
||||
///
|
||||
/// ```no_run
|
||||
/// #[tokio::test(start_paused = true)]
|
||||
/// async fn my_test() {
|
||||
/// assert!(true);
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// ### NOTE:
|
||||
///
|
||||
/// If you rename the Tokio crate in your dependencies this macro will not work.
|
||||
|
@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" }
|
||||
tokio-stream = { version = "0.1", path = "../tokio-stream" }
|
||||
futures = { version = "0.3.0", features = ["async-await"] }
|
||||
proptest = "0.10.0"
|
||||
rand = "0.8.0"
|
||||
tempfile = "3.1.0"
|
||||
async-stream = "0.3"
|
||||
|
||||
|
@ -47,6 +47,9 @@ pub struct Builder {
|
||||
/// Whether or not to enable the time driver
|
||||
enable_time: bool,
|
||||
|
||||
/// Whether or not the clock should start paused.
|
||||
start_paused: bool,
|
||||
|
||||
/// The number of worker threads, used by Runtime.
|
||||
///
|
||||
/// Only used when not using the current-thread executor.
|
||||
@ -110,6 +113,9 @@ impl Builder {
|
||||
// Time defaults to "off"
|
||||
enable_time: false,
|
||||
|
||||
// The clock starts not-paused
|
||||
start_paused: false,
|
||||
|
||||
// Default to lazy auto-detection (one thread per CPU core)
|
||||
worker_threads: None,
|
||||
|
||||
@ -386,6 +392,7 @@ impl Builder {
|
||||
},
|
||||
enable_io: self.enable_io,
|
||||
enable_time: self.enable_time,
|
||||
start_paused: self.start_paused,
|
||||
}
|
||||
}
|
||||
|
||||
@ -489,6 +496,31 @@ cfg_time! {
|
||||
}
|
||||
}
|
||||
|
||||
cfg_test_util! {
|
||||
impl Builder {
|
||||
/// Controls if the runtime's clock starts paused or advancing.
|
||||
///
|
||||
/// Pausing time requires the current-thread runtime; construction of
|
||||
/// the runtime will panic otherwise.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use tokio::runtime;
|
||||
///
|
||||
/// let rt = runtime::Builder::new_current_thread()
|
||||
/// .enable_time()
|
||||
/// .start_paused(true)
|
||||
/// .build()
|
||||
/// .unwrap();
|
||||
/// ```
|
||||
pub fn start_paused(&mut self, start_paused: bool) -> &mut Self {
|
||||
self.start_paused = start_paused;
|
||||
self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cfg_rt_multi_thread! {
|
||||
impl Builder {
|
||||
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
|
||||
|
@ -103,8 +103,8 @@ cfg_time! {
|
||||
pub(crate) type Clock = crate::time::Clock;
|
||||
pub(crate) type TimeHandle = Option<crate::time::driver::Handle>;
|
||||
|
||||
fn create_clock(enable_pausing: bool) -> Clock {
|
||||
crate::time::Clock::new(enable_pausing)
|
||||
fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock {
|
||||
crate::time::Clock::new(enable_pausing, start_paused)
|
||||
}
|
||||
|
||||
fn create_time_driver(
|
||||
@ -131,7 +131,7 @@ cfg_not_time! {
|
||||
pub(crate) type Clock = ();
|
||||
pub(crate) type TimeHandle = ();
|
||||
|
||||
fn create_clock(_enable_pausing: bool) -> Clock {
|
||||
fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock {
|
||||
()
|
||||
}
|
||||
|
||||
@ -162,13 +162,15 @@ pub(crate) struct Cfg {
|
||||
pub(crate) enable_io: bool,
|
||||
pub(crate) enable_time: bool,
|
||||
pub(crate) enable_pause_time: bool,
|
||||
pub(crate) start_paused: bool,
|
||||
}
|
||||
|
||||
impl Driver {
|
||||
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> {
|
||||
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
|
||||
|
||||
let clock = create_clock(cfg.enable_pause_time);
|
||||
let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);
|
||||
|
||||
let (time_driver, time_handle) =
|
||||
create_time_driver(cfg.enable_time, io_stack, clock.clone());
|
||||
|
||||
|
@ -17,7 +17,7 @@ cfg_not_test_util! {
|
||||
}
|
||||
|
||||
impl Clock {
|
||||
pub(crate) fn new(_enable_pausing: bool) -> Clock {
|
||||
pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock {
|
||||
Clock {}
|
||||
}
|
||||
|
||||
@ -78,7 +78,8 @@ cfg_test_util! {
|
||||
/// that depend on time.
|
||||
///
|
||||
/// Pausing time requires the `current_thread` Tokio runtime. This is the
|
||||
/// default runtime used by `#[tokio::test]`
|
||||
/// default runtime used by `#[tokio::test]`. The runtime can be initialized
|
||||
/// with time in a paused state using the `Builder::start_paused` method.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
@ -149,16 +150,22 @@ cfg_test_util! {
|
||||
impl Clock {
|
||||
/// Return a new `Clock` instance that uses the current execution context's
|
||||
/// source of time.
|
||||
pub(crate) fn new(enable_pausing: bool) -> Clock {
|
||||
pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock {
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
Clock {
|
||||
let clock = Clock {
|
||||
inner: Arc::new(Mutex::new(Inner {
|
||||
enable_pausing,
|
||||
base: now,
|
||||
unfrozen: Some(now),
|
||||
})),
|
||||
};
|
||||
|
||||
if start_paused {
|
||||
clock.pause();
|
||||
}
|
||||
|
||||
clock
|
||||
}
|
||||
|
||||
pub(crate) fn pause(&self) {
|
||||
|
@ -102,8 +102,8 @@ pub(self) struct ClockTime {
|
||||
impl ClockTime {
|
||||
pub(self) fn new(clock: Clock) -> Self {
|
||||
Self {
|
||||
start_time: clock.now(),
|
||||
clock,
|
||||
start_time: super::clock::now(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) {
|
||||
#[test]
|
||||
fn single_timer() {
|
||||
model(|| {
|
||||
let clock = crate::time::clock::Clock::new(true);
|
||||
let clock = crate::time::clock::Clock::new(true, false);
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
@ -72,7 +72,7 @@ fn single_timer() {
|
||||
#[test]
|
||||
fn drop_timer() {
|
||||
model(|| {
|
||||
let clock = crate::time::clock::Clock::new(true);
|
||||
let clock = crate::time::clock::Clock::new(true, false);
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
@ -103,7 +103,7 @@ fn drop_timer() {
|
||||
#[test]
|
||||
fn change_waker() {
|
||||
model(|| {
|
||||
let clock = crate::time::clock::Clock::new(true);
|
||||
let clock = crate::time::clock::Clock::new(true, false);
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
@ -138,7 +138,7 @@ fn reset_future() {
|
||||
model(|| {
|
||||
let finished_early = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let clock = crate::time::clock::Clock::new(true);
|
||||
let clock = crate::time::clock::Clock::new(true, false);
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
||||
let inner = super::Inner::new(time_source.clone(), MockUnpark::mock());
|
||||
@ -185,7 +185,7 @@ fn reset_future() {
|
||||
#[test]
|
||||
#[cfg(not(loom))]
|
||||
fn poll_process_levels() {
|
||||
let clock = crate::time::clock::Clock::new(true);
|
||||
let clock = crate::time::clock::Clock::new(true, false);
|
||||
clock.pause();
|
||||
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
@ -226,7 +226,7 @@ fn poll_process_levels() {
|
||||
fn poll_process_levels_targeted() {
|
||||
let mut context = Context::from_waker(noop_waker_ref());
|
||||
|
||||
let clock = crate::time::clock::Clock::new(true);
|
||||
let clock = crate::time::clock::Clock::new(true, false);
|
||||
clock.pause();
|
||||
|
||||
let time_source = super::ClockTime::new(clock.clone());
|
||||
|
@ -24,7 +24,7 @@ cfg_rt! {
|
||||
}
|
||||
|
||||
cfg_rt_multi_thread! {
|
||||
pub(crate) use rand::FastRand;
|
||||
pub(crate) use self::rand::FastRand;
|
||||
|
||||
mod try_lock;
|
||||
pub(crate) use try_lock::TryLock;
|
||||
@ -34,7 +34,7 @@ pub(crate) mod trace;
|
||||
|
||||
#[cfg(any(feature = "macros"))]
|
||||
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
|
||||
pub use rand::thread_rng_n;
|
||||
pub use self::rand::thread_rng_n;
|
||||
|
||||
#[cfg(any(
|
||||
feature = "rt",
|
||||
|
@ -1,6 +1,9 @@
|
||||
#![warn(rust_2018_idioms)]
|
||||
#![cfg(feature = "full")]
|
||||
|
||||
use rand::SeedableRng;
|
||||
use rand::{rngs::StdRng, Rng};
|
||||
use tokio::time::{self, Duration, Instant};
|
||||
use tokio_test::assert_err;
|
||||
|
||||
#[tokio::test]
|
||||
@ -31,3 +34,26 @@ async fn pause_time_in_spawn_threads() {
|
||||
|
||||
assert_err!(t.await);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn paused_time_is_deterministic() {
|
||||
let run_1 = paused_time_stress_run();
|
||||
let run_2 = paused_time_stress_run();
|
||||
|
||||
assert_eq!(run_1, run_2);
|
||||
}
|
||||
|
||||
#[tokio::main(flavor = "current_thread", start_paused = true)]
|
||||
async fn paused_time_stress_run() -> Vec<Duration> {
|
||||
let mut rng = StdRng::seed_from_u64(1);
|
||||
|
||||
let mut times = vec![];
|
||||
let start = Instant::now();
|
||||
for _ in 0..10_000 {
|
||||
let sleep = rng.gen_range(Duration::from_secs(0)..Duration::from_secs(1));
|
||||
time::sleep(sleep).await;
|
||||
times.push(start.elapsed());
|
||||
}
|
||||
|
||||
times
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user