From 57ba37c97854d32e691ea68006c8d69d58c79b23 Mon Sep 17 00:00:00 2001 From: Tudor Sidea Date: Tue, 24 Mar 2020 07:20:07 +0200 Subject: [PATCH] time: fix repeated pause/resume of time (#2253) The resume function was breaking the guarantee that Instants should never be less than any previously measured Instants when created. Altered the pause and resume function such that they will not break this guarantee. After resume, the time should continue from where it left off. Created test to prove that the advanced function still works as expected. Added additional tests for the pause/advance/resume functions. --- tokio/src/time/clock.rs | 88 +++++++++++++----------------- tokio/src/time/driver/mod.rs | 4 +- tokio/src/time/tests/test_delay.rs | 77 +++++++++++++++++--------- tokio/tests/test_clock.rs | 50 +++++++++++++++++ 4 files changed, 140 insertions(+), 79 deletions(-) create mode 100644 tokio/tests/test_clock.rs diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index bd3045a93..4ac24af3d 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -23,7 +23,7 @@ cfg_not_test_util! { now() } - pub(crate) fn is_frozen(&self) -> bool { + pub(crate) fn is_paused(&self) -> bool { false } @@ -41,16 +41,16 @@ cfg_test_util! { /// A handle to a source of time. #[derive(Debug, Clone)] pub(crate) struct Clock { - inner: Arc, + inner: Arc>, } #[derive(Debug)] struct Inner { - /// Instant at which the clock was created - start: std::time::Instant, + /// Instant to use as the clock's base instant. + base: std::time::Instant, - /// Current, "frozen" time as an offset from `start`. - frozen: Mutex>, + /// Instant at which the clock was last unfrozen + unfrozen: Option, } /// Pause time @@ -65,11 +65,7 @@ cfg_test_util! { /// runtime. pub fn pause() { let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime"); - let mut frozen = clock.inner.frozen.lock().unwrap(); - if frozen.is_some() { - panic!("time is already frozen"); - } - *frozen = Some(clock.inner.start.elapsed()); + clock.pause(); } /// Resume time @@ -83,13 +79,13 @@ cfg_test_util! { /// runtime. pub fn resume() { let clock = context::clock().expect("time cannot be frozen from outside the Tokio runtime"); - let mut frozen = clock.inner.frozen.lock().unwrap(); + let mut inner = clock.inner.lock().unwrap(); - if frozen.is_none() { + if inner.unfrozen.is_some() { panic!("time is not frozen"); } - *frozen = None; + inner.unfrozen = Some(std::time::Instant::now()); } /// Advance time @@ -110,11 +106,7 @@ cfg_test_util! { /// Return the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { if let Some(clock) = context::clock() { - if let Some(frozen) = *clock.inner.frozen.lock().unwrap() { - Instant::from_std(clock.inner.start + frozen) - } else { - Instant::from_std(std::time::Instant::now()) - } + clock.now() } else { Instant::from_std(std::time::Instant::now()) } @@ -124,53 +116,49 @@ cfg_test_util! { /// Return a new `Clock` instance that uses the current execution context's /// source of time. pub(crate) fn new() -> Clock { + let now = std::time::Instant::now(); + Clock { - inner: Arc::new(Inner { - start: std::time::Instant::now(), - frozen: Mutex::new(None), - }), + inner: Arc::new(Mutex::new(Inner { + base: now, + unfrozen: Some(now), + })), } } - // TODO: delete this. Some tests rely on this - #[cfg(all(test, not(loom)))] - /// Return a new `Clock` instance that uses the current execution context's - /// source of time. - pub(crate) fn new_frozen() -> Clock { - Clock { - inner: Arc::new(Inner { - start: std::time::Instant::now(), - frozen: Mutex::new(Some(Duration::from_millis(0))), - }), - } + pub(crate) fn pause(&self) { + let mut inner = self.inner.lock().unwrap(); + + let elapsed = inner.unfrozen.as_ref().expect("time is already frozen").elapsed(); + inner.base += elapsed; + inner.unfrozen = None; } - pub(crate) fn is_frozen(&self) -> bool { - self.inner.frozen.lock().unwrap().is_some() + pub(crate) fn is_paused(&self) -> bool { + let inner = self.inner.lock().unwrap(); + inner.unfrozen.is_none() } pub(crate) fn advance(&self, duration: Duration) { - let mut frozen = self.inner.frozen.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); - if let Some(ref mut elapsed) = *frozen { - *elapsed += duration; - } else { + if inner.unfrozen.is_some() { panic!("time is not frozen"); } - } - // TODO: delete this as well - #[cfg(all(test, not(loom)))] - pub(crate) fn advanced(&self) -> Duration { - self.inner.frozen.lock().unwrap().unwrap() + inner.base += duration; } pub(crate) fn now(&self) -> Instant { - Instant::from_std(if let Some(frozen) = *self.inner.frozen.lock().unwrap() { - self.inner.start + frozen - } else { - std::time::Instant::now() - }) + let inner = self.inner.lock().unwrap(); + + let mut ret = inner.base; + + if let Some(unfrozen) = inner.unfrozen { + ret += unfrozen.elapsed(); + } + + Instant::from_std(ret) } } } diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 914443416..bb45cb7e6 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -247,7 +247,7 @@ where if deadline > now { let dur = deadline - now; - if self.clock.is_frozen() { + if self.clock.is_paused() { self.park.park_timeout(Duration::from_secs(0))?; self.clock.advance(dur); } else { @@ -278,7 +278,7 @@ where if deadline > now { let duration = cmp::min(deadline - now, duration); - if self.clock.is_frozen() { + if self.clock.is_paused() { self.park.park_timeout(Duration::from_secs(0))?; self.clock.advance(duration); } else { diff --git a/tokio/src/time/tests/test_delay.rs b/tokio/src/time/tests/test_delay.rs index 388f9b8b3..f843434be 100644 --- a/tokio/src/time/tests/test_delay.rs +++ b/tokio/src/time/tests/test_delay.rs @@ -16,9 +16,20 @@ macro_rules! poll { }; } +#[test] +fn frozen_utility_returns_correct_advanced_duration() { + let clock = Clock::new(); + clock.pause(); + let start = clock.now(); + + clock.advance(ms(10)); + assert_eq!(clock.now() - start, ms(10)); +} + #[test] fn immediate_delay() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); let when = clock.now(); let mut e = task::spawn(delay_until(&handle, when)); @@ -28,13 +39,12 @@ fn immediate_delay() { assert_ok!(driver.park_timeout(Duration::from_millis(1000))); // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.advanced(), ms(1000)); + assert_eq!(clock.now() - start, ms(1000)); } #[test] fn delayed_delay_level_0() { let (mut driver, clock, handle) = setup(); - let start = clock.now(); for &i in &[1, 10, 60] { @@ -45,7 +55,7 @@ fn delayed_delay_level_0() { assert_pending!(poll!(e)); assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(i)); + assert_eq!(clock.now() - start, ms(i)); assert_ready_ok!(poll!(e)); } @@ -74,20 +84,21 @@ fn sub_ms_delayed_delay() { #[test] fn delayed_delay_wrapping_level_0() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); assert_ok!(driver.park_timeout(ms(5))); - assert_eq!(clock.advanced(), ms(5)); + assert_eq!(clock.now() - start, ms(5)); let mut e = task::spawn(delay_until(&handle, clock.now() + ms(60))); assert_pending!(poll!(e)); assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(64)); + assert_eq!(clock.now() - start, ms(64)); assert_pending!(poll!(e)); assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(65)); + assert_eq!(clock.now() - start, ms(65)); assert_ready_ok!(poll!(e)); } @@ -95,6 +106,7 @@ fn delayed_delay_wrapping_level_0() { #[test] fn timer_wrapping_with_higher_levels() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Set delay to hit level 1 let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(64))); @@ -109,13 +121,13 @@ fn timer_wrapping_with_higher_levels() { // This should result in s1 firing assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(64)); + assert_eq!(clock.now() - start, ms(64)); assert_ready_ok!(poll!(e1)); assert_pending!(poll!(e2)); assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(65)); + assert_eq!(clock.now() - start, ms(65)); assert_ready_ok!(poll!(e1)); } @@ -123,6 +135,7 @@ fn timer_wrapping_with_higher_levels() { #[test] fn delay_with_deadline_in_past() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Create `Delay` that elapsed immediately. let mut e = task::spawn(delay_until(&handle, clock.now() - ms(100))); @@ -135,12 +148,13 @@ fn delay_with_deadline_in_past() { assert_ok!(driver.park_timeout(ms(1000))); // The time has not advanced. The `turn` completed immediately. - assert_eq!(clock.advanced(), ms(1000)); + assert_eq!(clock.now() - start, ms(1000)); } #[test] fn delayed_delay_level_1() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Create a `Delay` that elapses in the future let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234))); @@ -150,19 +164,20 @@ fn delayed_delay_level_1() { // Turn the timer, this will wake up to cascade the timer down. assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.advanced(), ms(192)); + assert_eq!(clock.now() - start, ms(192)); // The delay has not elapsed. assert_pending!(poll!(e)); // Turn the timer again assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.advanced(), ms(234)); + assert_eq!(clock.now() - start, ms(234)); // The delay has elapsed. assert_ready_ok!(poll!(e)); let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Create a `Delay` that elapses in the future let mut e = task::spawn(delay_until(&handle, clock.now() + ms(234))); @@ -172,20 +187,20 @@ fn delayed_delay_level_1() { // Turn the timer with a smaller timeout than the cascade. assert_ok!(driver.park_timeout(ms(100))); - assert_eq!(clock.advanced(), ms(100)); + assert_eq!(clock.now() - start, ms(100)); assert_pending!(poll!(e)); // Turn the timer, this will wake up to cascade the timer down. assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.advanced(), ms(192)); + assert_eq!(clock.now() - start, ms(192)); // The delay has not elapsed. assert_pending!(poll!(e)); // Turn the timer again assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.advanced(), ms(234)); + assert_eq!(clock.now() - start, ms(234)); // The delay has elapsed. assert_ready_ok!(poll!(e)); @@ -194,6 +209,7 @@ fn delayed_delay_level_1() { #[test] fn concurrently_set_two_timers_second_one_shorter() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); let mut e1 = task::spawn(delay_until(&handle, clock.now() + ms(500))); let mut e2 = task::spawn(delay_until(&handle, clock.now() + ms(200))); @@ -204,24 +220,24 @@ fn concurrently_set_two_timers_second_one_shorter() { // Delay until a cascade assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(192)); + assert_eq!(clock.now() - start, ms(192)); // Delay until the second timer. assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(200)); + assert_eq!(clock.now() - start, ms(200)); // The shorter delay fires assert_ready_ok!(poll!(e2)); assert_pending!(poll!(e1)); assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(448)); + assert_eq!(clock.now() - start, ms(448)); assert_pending!(poll!(e1)); // Turn again, this time the time will advance to the second delay assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(500)); + assert_eq!(clock.now() - start, ms(500)); assert_ready_ok!(poll!(e1)); } @@ -229,6 +245,7 @@ fn concurrently_set_two_timers_second_one_shorter() { #[test] fn short_delay() { let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Create a `Delay` that elapses in the future let mut e = task::spawn(delay_until(&handle, clock.now() + ms(1))); @@ -243,7 +260,7 @@ fn short_delay() { assert_ready_ok!(poll!(e)); // The time has advanced to the point of the delay elapsing. - assert_eq!(clock.advanced(), ms(1)); + assert_eq!(clock.now() - start, ms(1)); } #[test] @@ -251,6 +268,7 @@ fn sorta_long_delay_until() { const MIN_5: u64 = 5 * 60 * 1000; let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Create a `Delay` that elapses in the future let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MIN_5))); @@ -262,13 +280,13 @@ fn sorta_long_delay_until() { for &elapsed in cascades { assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(elapsed)); + assert_eq!(clock.now() - start, ms(elapsed)); assert_pending!(poll!(e)); } assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(MIN_5)); + assert_eq!(clock.now() - start, ms(MIN_5)); // The delay has elapsed. assert_ready_ok!(poll!(e)); @@ -279,6 +297,7 @@ fn very_long_delay() { const MO_5: u64 = 5 * 30 * 24 * 60 * 60 * 1000; let (mut driver, clock, handle) = setup(); + let start = clock.now(); // Create a `Delay` that elapses in the future let mut e = task::spawn(delay_until(&handle, clock.now() + ms(MO_5))); @@ -295,7 +314,7 @@ fn very_long_delay() { for &elapsed in cascades { assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(elapsed)); + assert_eq!(clock.now() - start, ms(elapsed)); assert_pending!(poll!(e)); } @@ -304,7 +323,7 @@ fn very_long_delay() { assert_ok!(driver.park()); // The time has advanced to the point of the delay elapsing. - assert_eq!(clock.advanced(), ms(MO_5)); + assert_eq!(clock.now() - start, ms(MO_5)); // The delay has elapsed. assert_ready_ok!(poll!(e)); @@ -340,7 +359,9 @@ fn unpark_is_delayed() { fn unpark(&self) {} } - let clock = Clock::new_frozen(); + let clock = Clock::new(); + clock.pause(); + let start = clock.now(); let mut driver = Driver::new(MockPark(clock.clone()), clock.clone()); let handle = driver.handle(); @@ -354,7 +375,7 @@ fn unpark_is_delayed() { assert_ok!(driver.park()); - assert_eq!(clock.advanced(), ms(500)); + assert_eq!(clock.now() - start, ms(500)); assert_ready_ok!(poll!(e1)); assert_ready_ok!(poll!(e2)); @@ -367,6 +388,7 @@ fn set_timeout_at_deadline_greater_than_max_timer() { const YR_5: u64 = 5 * YR_1; let (mut driver, clock, handle) = setup(); + let start = clock.now(); for _ in 0..5 { assert_ok!(driver.park_timeout(ms(YR_1))); @@ -376,13 +398,14 @@ fn set_timeout_at_deadline_greater_than_max_timer() { assert_pending!(poll!(e)); assert_ok!(driver.park_timeout(ms(1000))); - assert_eq!(clock.advanced(), ms(YR_5) + ms(1)); + assert_eq!(clock.now() - start, ms(YR_5) + ms(1)); assert_ready_ok!(poll!(e)); } fn setup() -> (Driver, Clock, Handle) { - let clock = Clock::new_frozen(); + let clock = Clock::new(); + clock.pause(); let driver = Driver::new(MockPark(clock.clone()), clock.clone()); let handle = driver.handle(); diff --git a/tokio/tests/test_clock.rs b/tokio/tests/test_clock.rs new file mode 100644 index 000000000..891636fdb --- /dev/null +++ b/tokio/tests/test_clock.rs @@ -0,0 +1,50 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::time::{self, Duration, Instant}; + +#[tokio::test] +async fn resume_lets_time_move_forward_instead_of_resetting_it() { + let start = Instant::now(); + time::pause(); + time::advance(Duration::from_secs(10)).await; + let advanced_by_ten_secs = Instant::now(); + assert!(advanced_by_ten_secs - start > Duration::from_secs(10)); + assert!(advanced_by_ten_secs - start < Duration::from_secs(11)); + time::resume(); + assert!(advanced_by_ten_secs < Instant::now()); + assert!(Instant::now() - advanced_by_ten_secs < Duration::from_secs(1)); +} + +#[tokio::test] +async fn can_pause_after_resume() { + let start = Instant::now(); + time::pause(); + time::advance(Duration::from_secs(10)).await; + time::resume(); + time::pause(); + time::advance(Duration::from_secs(10)).await; + assert!(Instant::now() - start > Duration::from_secs(20)); + assert!(Instant::now() - start < Duration::from_secs(21)); +} + +#[tokio::test] +#[should_panic] +async fn freezing_time_while_frozen_panics() { + time::pause(); + time::pause(); +} + +#[tokio::test] +#[should_panic] +async fn advancing_time_when_time_is_not_frozen_panics() { + time::advance(Duration::from_secs(1)).await; +} + +#[tokio::test] +#[should_panic] +async fn resuming_time_when_not_frozen_panics() { + time::pause(); + time::resume(); + time::resume(); +}