mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
timer: improve memory ordering in Inner's increment (#2107)
This commit improves the memory ordering in the implementation of Inner's increment function. The former code did a sequentially consistent load of self.num, then entered a loop with a sequentially consistent compare and swap on the same, bailing out with and Err only if the loaded value was MAX_TIMEOUTS. The use of SeqCst means that all threads must observe all relevant memory operations in the same order, implying synchronization between all CPUs. This commit adjusts the implementation in two key ways. First, the initial load of self.num is now down with Relaxed ordering. If two threads entered this code simultaneously, formerly, tokio required that one proceed before the other, negating their parallelism. Now, either thread may proceed without coordination. Second, the SeqCst compare_and_swap is changed to a Release, Relaxed compare_exchange_weak. The first memory ordering referrs to success: if the value is swapped the load of that value for comparison will be Relaxed and the store will be Release. The second memory ordering referrs to failure: if the value is not swapped the load is Relaxed. The _weak variant may spuriously fail but will generate better code. These changes mean that it is possible for more loops to be taken per call than strictly necessary but with greater parallelism available on this operation, improved energy consumption as CPUs don't have to coordinate as much.
This commit is contained in:
parent
6cf1a5b6b8
commit
3fb213a861
@ -20,7 +20,8 @@ use crate::park::{Park, Unpark};
|
||||
use crate::time::{wheel, Error};
|
||||
use crate::time::{Clock, Duration, Instant};
|
||||
|
||||
use std::sync::atomic::Ordering::SeqCst;
|
||||
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release, SeqCst};
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::usize;
|
||||
use std::{cmp, fmt};
|
||||
@ -333,28 +334,32 @@ impl Inner {
|
||||
self.elapsed.load(SeqCst)
|
||||
}
|
||||
|
||||
#[cfg(all(test, loom))]
|
||||
fn num(&self, ordering: std::sync::atomic::Ordering) -> usize {
|
||||
self.num.load(ordering)
|
||||
}
|
||||
|
||||
/// Increments the number of active timeouts
|
||||
fn increment(&self) -> Result<(), Error> {
|
||||
let mut curr = self.num.load(SeqCst);
|
||||
|
||||
let mut curr = self.num.load(Relaxed);
|
||||
loop {
|
||||
if curr == MAX_TIMEOUTS {
|
||||
return Err(Error::at_capacity());
|
||||
}
|
||||
|
||||
let actual = self.num.compare_and_swap(curr, curr + 1, SeqCst);
|
||||
|
||||
if curr == actual {
|
||||
return Ok(());
|
||||
match self
|
||||
.num
|
||||
.compare_exchange_weak(curr, curr + 1, Release, Relaxed)
|
||||
{
|
||||
Ok(_) => return Ok(()),
|
||||
Err(next) => curr = next,
|
||||
}
|
||||
|
||||
curr = actual;
|
||||
}
|
||||
}
|
||||
|
||||
/// Decrements the number of active timeouts
|
||||
fn decrement(&self) {
|
||||
let prev = self.num.fetch_sub(1, SeqCst);
|
||||
let prev = self.num.fetch_sub(1, Acquire);
|
||||
debug_assert!(prev <= MAX_TIMEOUTS);
|
||||
}
|
||||
|
||||
@ -381,3 +386,6 @@ impl fmt::Debug for Inner {
|
||||
fmt.debug_struct("Inner").finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, loom))]
|
||||
mod tests;
|
||||
|
55
tokio/src/time/driver/tests/mod.rs
Normal file
55
tokio/src/time/driver/tests/mod.rs
Normal file
@ -0,0 +1,55 @@
|
||||
use crate::park::Unpark;
|
||||
use crate::time::driver::Inner;
|
||||
use crate::time::Instant;
|
||||
|
||||
use loom::thread;
|
||||
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
|
||||
struct MockUnpark;
|
||||
|
||||
impl Unpark for MockUnpark {
|
||||
fn unpark(&self) {}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn balanced_incr_and_decr() {
|
||||
const OPS: usize = 100;
|
||||
|
||||
fn incr(inner: Arc<Inner>) {
|
||||
for _ in 0..OPS {
|
||||
inner.increment().expect("increment should not have failed");
|
||||
thread::yield_now();
|
||||
}
|
||||
}
|
||||
|
||||
fn decr(inner: Arc<Inner>) {
|
||||
let mut ops_performed = 0;
|
||||
while ops_performed < OPS {
|
||||
if inner.num(Ordering::Relaxed) > 0 {
|
||||
ops_performed += 1;
|
||||
inner.decrement();
|
||||
}
|
||||
thread::yield_now();
|
||||
}
|
||||
}
|
||||
|
||||
loom::model(|| {
|
||||
let unpark = Box::new(MockUnpark);
|
||||
let instant = Instant::now();
|
||||
|
||||
let inner = Arc::new(Inner::new(instant, unpark));
|
||||
|
||||
let incr_inner = inner.clone();
|
||||
let decr_inner = inner.clone();
|
||||
|
||||
let incr_hndle = thread::spawn(move || incr(incr_inner));
|
||||
let decr_hndle = thread::spawn(move || decr(decr_inner));
|
||||
|
||||
incr_hndle.join().expect("should never fail");
|
||||
decr_hndle.join().expect("should never fail");
|
||||
|
||||
assert_eq!(inner.num(Ordering::SeqCst), 0);
|
||||
})
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user