mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-09-28 12:10:37 +00:00
Merge 'tokio-1.25.3' into 'tokio-1.32.x' (#6227)
This commit is contained in:
commit
22b3a65934
@ -360,6 +360,13 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559])
|
||||
[#5513]: https://github.com/tokio-rs/tokio/pull/5513
|
||||
[#5517]: https://github.com/tokio-rs/tokio/pull/5517
|
||||
|
||||
# 1.25.3 (December 17th, 2023)
|
||||
|
||||
### Fixed
|
||||
- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221])
|
||||
|
||||
[#6221]: https://github.com/tokio-rs/tokio/pull/6221
|
||||
|
||||
# 1.25.2 (September 22, 2023)
|
||||
|
||||
Forward ports 1.20.6 changes.
|
||||
|
@ -80,23 +80,23 @@ tokio_thread_local! {
|
||||
#[cfg(feature = "rt")]
|
||||
thread_id: Cell::new(None),
|
||||
|
||||
/// Tracks the current runtime handle to use when spawning,
|
||||
/// accessing drivers, etc...
|
||||
// Tracks the current runtime handle to use when spawning,
|
||||
// accessing drivers, etc...
|
||||
#[cfg(feature = "rt")]
|
||||
current: current::HandleCell::new(),
|
||||
|
||||
/// Tracks the current scheduler internal context
|
||||
// Tracks the current scheduler internal context
|
||||
#[cfg(feature = "rt")]
|
||||
scheduler: Scoped::new(),
|
||||
|
||||
#[cfg(feature = "rt")]
|
||||
current_task_id: Cell::new(None),
|
||||
|
||||
/// Tracks if the current thread is currently driving a runtime.
|
||||
/// Note, that if this is set to "entered", the current scheduler
|
||||
/// handle may not reference the runtime currently executing. This
|
||||
/// is because other runtime handles may be set to current from
|
||||
/// within a runtime.
|
||||
// Tracks if the current thread is currently driving a runtime.
|
||||
// Note, that if this is set to "entered", the current scheduler
|
||||
// handle may not reference the runtime currently executing. This
|
||||
// is because other runtime handles may be set to current from
|
||||
// within a runtime.
|
||||
#[cfg(feature = "rt")]
|
||||
runtime: Cell::new(EnterRuntime::NotEntered),
|
||||
|
||||
|
@ -219,11 +219,16 @@ impl Registration {
|
||||
loop {
|
||||
let event = self.readiness(interest).await?;
|
||||
|
||||
let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await;
|
||||
|
||||
match f() {
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
self.clear_readiness(event);
|
||||
}
|
||||
x => return x,
|
||||
x => {
|
||||
coop.made_progress();
|
||||
return x;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
77
tokio/tests/coop_budger.rs
Normal file
77
tokio/tests/coop_budger.rs
Normal file
@ -0,0 +1,77 @@
|
||||
#![warn(rust_2018_idioms)]
|
||||
#![cfg(all(feature = "full", target_os = "linux"))]
|
||||
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
/// Ensure that UDP sockets have functional budgeting
|
||||
///
|
||||
/// # Design
|
||||
/// Two sockets communicate by spamming packets from one to the other.
|
||||
///
|
||||
/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
|
||||
/// send system call because we are using the loopback interface.
|
||||
/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
|
||||
/// entirety of the lifecycle of a packet within the kernel network stack.
|
||||
///
|
||||
/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
|
||||
/// is through budgeting.
|
||||
///
|
||||
/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
|
||||
/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
|
||||
/// and there are two budget events per packet, a send and a recv.
|
||||
#[tokio::test]
|
||||
async fn coop_budget_udp_send_recv() {
|
||||
const BUDGET: usize = 128;
|
||||
const N_ITERATIONS: usize = 1024;
|
||||
|
||||
const PACKET: &[u8] = b"Hello, world";
|
||||
const PACKET_LEN: usize = 12;
|
||||
|
||||
assert_eq!(
|
||||
PACKET_LEN,
|
||||
PACKET.len(),
|
||||
"Defect in test, programmer can't do math"
|
||||
);
|
||||
|
||||
// bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
|
||||
let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
|
||||
|
||||
tx.connect(rx.local_addr().unwrap()).await.unwrap();
|
||||
rx.connect(tx.local_addr().unwrap()).await.unwrap();
|
||||
|
||||
let tracker = Arc::new(AtomicUsize::default());
|
||||
|
||||
let tracker_clone = Arc::clone(&tracker);
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tracker_clone.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
});
|
||||
|
||||
for _ in 0..N_ITERATIONS {
|
||||
tx.send(PACKET).await.unwrap();
|
||||
|
||||
let mut tmp = [0; PACKET_LEN];
|
||||
|
||||
// ensure that we aren't somehow accumulating other
|
||||
assert_eq!(
|
||||
PACKET_LEN,
|
||||
rx.recv(&mut tmp).await.unwrap(),
|
||||
"Defect in test case, received unexpected result from socket"
|
||||
);
|
||||
assert_eq!(
|
||||
PACKET, &tmp,
|
||||
"Defect in test case, received unexpected result from socket"
|
||||
);
|
||||
}
|
||||
|
||||
assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user