Merge branch 'tokio-1.6.x' into merge-1.6.x

This commit is contained in:
Carl Lerche 2021-06-14 17:51:30 -07:00
commit 606206ecad
No known key found for this signature in database
GPG Key ID: AA14CE6F061D8F7A
5 changed files with 190 additions and 32 deletions

View File

@ -1,3 +1,11 @@
# 1.6.2 (June 14, 2021)
### Fixes
- test: sub-ms `time:advance` regression introduced in 1.6 ([#3852])
[#3852]: https://github.com/tokio-rs/tokio/pull/3852
# 1.6.1 (May 28, 2021)
This release reverts [#3518] because it doesn't work on some kernels due to

View File

@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
version = "1.6.1"
version = "1.6.2"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
documentation = "https://docs.rs/tokio/1.6.0/tokio/"
documentation = "https://docs.rs/tokio/1.6.2/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """

View File

@ -7,7 +7,7 @@
//! configurable.
cfg_not_test_util! {
use crate::time::{Duration, Instant};
use crate::time::{Instant};
#[derive(Debug, Clone)]
pub(crate) struct Clock {}
@ -24,14 +24,6 @@ cfg_not_test_util! {
pub(crate) fn now(&self) -> Instant {
now()
}
pub(crate) fn is_paused(&self) -> bool {
false
}
pub(crate) fn advance(&self, _dur: Duration) {
unreachable!();
}
}
}
@ -121,10 +113,9 @@ cfg_test_util! {
/// runtime.
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let until = clock.now() + duration;
clock.advance(duration);
crate::time::sleep_until(until).await;
crate::task::yield_now().await;
}
/// Return the current instant, factoring in frozen time.

View File

@ -91,6 +91,15 @@ pub(crate) struct Driver<P: Park + 'static> {
/// Parker to delegate to
park: P,
// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}
/// A structure which handles conversion from Instants to u64 timestamps.
@ -178,6 +187,8 @@ where
time_source,
handle: Handle::new(Arc::new(inner)),
park,
#[cfg(feature = "test-util")]
did_wake: Arc::new(AtomicBool::new(false)),
}
}
@ -192,8 +203,6 @@ where
}
fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let clock = &self.time_source.clock;
let mut lock = self.handle.get().state.lock();
assert!(!self.handle.is_shutdown());
@ -217,26 +226,14 @@ where
duration = std::cmp::min(limit, duration);
}
if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;
// Simulate advancing time
clock.advance(duration);
} else {
self.park.park_timeout(duration)?;
}
self.park_timeout(duration)?;
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
}
None => {
if let Some(duration) = limit {
if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;
clock.advance(duration);
} else {
self.park.park_timeout(duration)?;
}
self.park_timeout(duration)?;
} else {
self.park.park()?;
}
@ -248,6 +245,39 @@ where
Ok(())
}
cfg_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
let clock = &self.time_source.clock;
if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;
// If the time driver was woken, then the park completed
// before the "duration" elapsed (usually caused by a
// yield in `Runtime::block_on`). In this case, we don't
// advance the clock.
if !self.did_wake() {
// Simulate advancing time
clock.advance(duration);
}
} else {
self.park.park_timeout(duration)?;
}
Ok(())
}
fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}
cfg_not_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
self.park.park_timeout(duration)
}
}
}
impl Handle {
@ -387,11 +417,11 @@ impl<P> Park for Driver<P>
where
P: Park + 'static,
{
type Unpark = P::Unpark;
type Unpark = TimerUnpark<P>;
type Error = P::Error;
fn unpark(&self) -> Self::Unpark {
self.park.unpark()
TimerUnpark::new(self)
}
fn park(&mut self) -> Result<(), Self::Error> {
@ -426,6 +456,33 @@ where
}
}
pub(crate) struct TimerUnpark<P: Park + 'static> {
inner: P::Unpark,
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}
impl<P: Park + 'static> TimerUnpark<P> {
fn new(driver: &Driver<P>) -> TimerUnpark<P> {
TimerUnpark {
inner: driver.park.unpark(),
#[cfg(feature = "test-util")]
did_wake: driver.did_wake.clone(),
}
}
}
impl<P: Park + 'static> Unpark for TimerUnpark<P> {
fn unpark(&self) {
#[cfg(feature = "test-util")]
self.did_wake.store(true, Ordering::SeqCst);
self.inner.unpark();
}
}
// ===== impl Inner =====
impl Inner {

View File

@ -4,7 +4,7 @@
use rand::SeedableRng;
use rand::{rngs::StdRng, Rng};
use tokio::time::{self, Duration, Instant, Sleep};
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task};
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready, assert_ready_eq, task};
use std::{
future::Future,
@ -215,6 +215,108 @@ async fn interval() {
assert_pending!(poll_next(&mut i));
}
#[tokio::test(start_paused = true)]
async fn test_time_advance_sub_ms() {
let now = Instant::now();
let dur = Duration::from_micros(51_592);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);
let now = Instant::now();
let dur = Duration::from_micros(1);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);
}
#[tokio::test(start_paused = true)]
async fn test_time_advance_3ms_and_change() {
let now = Instant::now();
let dur = Duration::from_micros(3_141_592);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);
let now = Instant::now();
let dur = Duration::from_micros(3_123_456);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);
}
#[tokio::test(start_paused = true)]
async fn regression_3710_with_submillis_advance() {
let start = Instant::now();
time::advance(Duration::from_millis(1)).await;
let mut sleep = task::spawn(time::sleep_until(start + Duration::from_secs(60)));
assert_pending!(sleep.poll());
let before = Instant::now();
let dur = Duration::from_micros(51_592);
time::advance(dur).await;
assert_eq!(before.elapsed(), dur);
assert_pending!(sleep.poll());
}
#[tokio::test(start_paused = true)]
async fn exact_1ms_advance() {
let now = Instant::now();
let dur = Duration::from_millis(1);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);
let now = Instant::now();
let dur = Duration::from_millis(1);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);
}
#[tokio::test(start_paused = true)]
async fn advance_once_with_timer() {
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());
time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());
time::advance(Duration::from_micros(1500)).await;
assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}
#[tokio::test(start_paused = true)]
async fn advance_multi_with_timer() {
// Round to the nearest ms
// time::sleep(Duration::from_millis(1)).await;
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());
time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());
time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());
time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());
time::advance(Duration::from_micros(250)).await;
assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}
fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
interval.enter(|cx, mut interval| interval.poll_tick(cx))
}