Merge 'tokio-1.38.x' into 'tokio-1.42.x'

This commit is contained in:
Alice Ryhl 2025-04-07 16:18:11 +02:00
commit 9faea740df
6 changed files with 80 additions and 33 deletions

View File

@ -1,7 +1,7 @@
only_if: $CIRRUS_TAG == '' && ($CIRRUS_PR != '' || $CIRRUS_BRANCH == 'master' || $CIRRUS_BRANCH =~ 'tokio-.*')
auto_cancellation: $CIRRUS_BRANCH != 'master' && $CIRRUS_BRANCH !=~ 'tokio-.*'
freebsd_instance:
image_family: freebsd-14-1
image_family: freebsd-14-2
env:
RUST_STABLE: 1.81
RUST_NIGHTLY: nightly-2024-05-05

View File

@ -460,10 +460,18 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Check semver
- name: Check `tokio` semver
uses: obi1kenobi/cargo-semver-checks-action@v2
with:
rust-toolchain: ${{ env.rust_stable }}
package: tokio
release-type: minor
- name: Check semver for rest of the workspace
if: ${{ !startsWith(github.event.pull_request.base.ref, 'tokio-1.') }}
uses: obi1kenobi/cargo-semver-checks-action@v2
with:
rust-toolchain: ${{ env.rust_stable }}
exclude: tokio
release-type: minor
cross-check:
@ -694,7 +702,14 @@ jobs:
toolchain: ${{ env.rust_min }}
- uses: Swatinem/rust-cache@v2
- name: "check --workspace --all-features"
run: cargo check --workspace --all-features
run: |
if [[ "${{ github.event.pull_request.base.ref }}" =~ ^tokio-1\..* ]]; then
# Only check `tokio` crate as the PR is backporting to an earlier tokio release.
cargo check -p tokio --all-features
else
# Check all crates in the workspace
cargo check --workspace --all-features
fi
env:
RUSTFLAGS: "" # remove -Dwarnings
@ -993,7 +1008,7 @@ jobs:
- name: Install cargo-hack, wasmtime, and cargo-wasi
uses: taiki-e/install-action@v2
with:
tool: cargo-hack,wasmtime,cargo-wasi
tool: cargo-hack,wasmtime
- uses: Swatinem/rust-cache@v2
- name: WASI test tokio full
@ -1019,9 +1034,12 @@ jobs:
- name: test tests-integration --features wasi-rt
# TODO: this should become: `cargo hack wasi test --each-feature`
run: cargo wasi test --test rt_yield --features wasi-rt
run: cargo test --target ${{ matrix.target }} --test rt_yield --features wasi-rt
if: matrix.target == 'wasm32-wasip1'
working-directory: tests-integration
env:
CARGO_TARGET_WASM32_WASIP1_RUNNER: "wasmtime run --"
RUSTFLAGS: -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864
- name: test tests-integration --features wasi-threads-rt
run: cargo test --target ${{ matrix.target }} --features wasi-threads-rt

View File

@ -17,3 +17,16 @@ members = [
[workspace.metadata.spellcheck]
config = "spellcheck.toml"
[workspace.lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = [
'cfg(fuzzing)',
'cfg(loom)',
'cfg(mio_unsupported_force_poll_poll)',
'cfg(tokio_allow_from_blocking_fd)',
'cfg(tokio_internal_mt_counters)',
'cfg(tokio_no_parking_lot)',
'cfg(tokio_no_tuning_tests)',
'cfg(tokio_taskdump)',
'cfg(tokio_unstable)',
] }

View File

@ -95,3 +95,6 @@ path = "named-pipe-multi-client.rs"
[[example]]
name = "dump"
path = "dump.rs"
[lints]
workspace = true

View File

@ -249,6 +249,20 @@ Yanked. Please use 1.39.1 instead.
[#6709]: https://github.com/tokio-rs/tokio/pull/6709
[#6710]: https://github.com/tokio-rs/tokio/pull/6710
# 1.38.2 (April 2nd, 2025)
This release fixes a soundness issue in the broadcast channel. The channel
accepts values that are `Send` but `!Sync`. Previously, the channel called
`clone()` on these values without synchronizing. This release fixes the channel
by synchronizing calls to `.clone()` (Thanks Austin Bonander for finding and
reporting the issue).
### Fixed
- sync: synchronize `clone()` call in broadcast channel ([#7232])
[#7232]: https://github.com/tokio-rs/tokio/pull/7232
# 1.38.1 (July 16th, 2024)
This release fixes the bug identified as ([#6682]), which caused timers not

View File

@ -118,7 +118,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::loom::sync::{Arc, Mutex, MutexGuard};
use crate::runtime::coop::cooperative;
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
use crate::util::WakeList;
@ -304,7 +304,7 @@ use self::error::{RecvError, SendError, TryRecvError};
/// Data shared between senders and receivers.
struct Shared<T> {
/// slots in the channel.
buffer: Box<[RwLock<Slot<T>>]>,
buffer: Box<[Mutex<Slot<T>>]>,
/// Mask a position -> index.
mask: usize,
@ -348,7 +348,7 @@ struct Slot<T> {
///
/// The value is set by `send` when the write lock is held. When a reader
/// drops, `rem` is decremented. When it hits zero, the value is dropped.
val: UnsafeCell<Option<T>>,
val: Option<T>,
}
/// An entry in the wait queue.
@ -386,7 +386,7 @@ generate_addr_of_methods! {
}
struct RecvGuard<'a, T> {
slot: RwLockReadGuard<'a, Slot<T>>,
slot: MutexGuard<'a, Slot<T>>,
}
/// Receive a value future.
@ -395,11 +395,15 @@ struct Recv<'a, T> {
receiver: &'a mut Receiver<T>,
/// Entry in the waiter `LinkedList`.
waiter: UnsafeCell<Waiter>,
waiter: WaiterCell,
}
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
// from `Recv`.
struct WaiterCell(UnsafeCell<Waiter>);
unsafe impl Send for WaiterCell {}
unsafe impl Sync for WaiterCell {}
/// Max number of receivers. Reserve space to lock.
const MAX_RECEIVERS: usize = usize::MAX >> 2;
@ -467,12 +471,6 @@ pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
(tx, rx)
}
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}
unsafe impl<T: Send> Send for Receiver<T> {}
unsafe impl<T: Send> Sync for Receiver<T> {}
impl<T> Sender<T> {
/// Creates the sending-half of the [`broadcast`] channel.
///
@ -511,10 +509,10 @@ impl<T> Sender<T> {
let mut buffer = Vec::with_capacity(capacity);
for i in 0..capacity {
buffer.push(RwLock::new(Slot {
buffer.push(Mutex::new(Slot {
rem: AtomicUsize::new(0),
pos: (i as u64).wrapping_sub(capacity as u64),
val: UnsafeCell::new(None),
val: None,
}));
}
@ -600,7 +598,7 @@ impl<T> Sender<T> {
tail.pos = tail.pos.wrapping_add(1);
// Get the slot
let mut slot = self.shared.buffer[idx].write();
let mut slot = self.shared.buffer[idx].lock();
// Track the position
slot.pos = pos;
@ -609,7 +607,7 @@ impl<T> Sender<T> {
slot.rem.with_mut(|v| *v = rem);
// Write the value
slot.val = UnsafeCell::new(Some(value));
slot.val = Some(value);
// Release the slot lock before notifying the receivers.
drop(slot);
@ -696,7 +694,7 @@ impl<T> Sender<T> {
while low < high {
let mid = low + (high - low) / 2;
let idx = base_idx.wrapping_add(mid) & self.shared.mask;
if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
low = mid + 1;
} else {
high = mid;
@ -738,7 +736,7 @@ impl<T> Sender<T> {
let tail = self.shared.tail.lock();
let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
self.shared.buffer[idx].read().rem.load(SeqCst) == 0
self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
}
/// Returns the number of active receivers.
@ -1058,7 +1056,7 @@ impl<T> Receiver<T> {
let idx = (self.next & self.shared.mask as u64) as usize;
// The slot holding the next value to read
let mut slot = self.shared.buffer[idx].read();
let mut slot = self.shared.buffer[idx].lock();
if slot.pos != self.next {
// Release the `slot` lock before attempting to acquire the `tail`
@ -1075,7 +1073,7 @@ impl<T> Receiver<T> {
let mut tail = self.shared.tail.lock();
// Acquire slot lock again
slot = self.shared.buffer[idx].read();
slot = self.shared.buffer[idx].lock();
// Make sure the position did not change. This could happen in the
// unlikely event that the buffer is wrapped between dropping the
@ -1367,12 +1365,12 @@ impl<'a, T> Recv<'a, T> {
fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
Recv {
receiver,
waiter: UnsafeCell::new(Waiter {
waiter: WaiterCell(UnsafeCell::new(Waiter {
queued: AtomicBool::new(false),
waker: None,
pointers: linked_list::Pointers::new(),
_p: PhantomPinned,
}),
})),
}
}
@ -1384,7 +1382,7 @@ impl<'a, T> Recv<'a, T> {
is_unpin::<&mut Receiver<T>>();
let me = self.get_unchecked_mut();
(me.receiver, &me.waiter)
(me.receiver, &me.waiter.0)
}
}
}
@ -1418,6 +1416,7 @@ impl<'a, T> Drop for Recv<'a, T> {
// `Shared::notify_rx` before we drop the object.
let queued = self
.waiter
.0
.with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
// If the waiter is queued, we need to unlink it from the waiters list.
@ -1432,6 +1431,7 @@ impl<'a, T> Drop for Recv<'a, T> {
// `Relaxed` order suffices because we hold the tail lock.
let queued = self
.waiter
.0
.with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
if queued {
@ -1440,7 +1440,7 @@ impl<'a, T> Drop for Recv<'a, T> {
// safety: tail lock is held and the wait node is verified to be in
// the list.
unsafe {
self.waiter.with_mut(|ptr| {
self.waiter.0.with_mut(|ptr| {
tail.waiters.remove((&mut *ptr).into());
});
}
@ -1486,7 +1486,7 @@ impl<'a, T> RecvGuard<'a, T> {
where
T: Clone,
{
self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
self.slot.val.clone()
}
}
@ -1494,8 +1494,7 @@ impl<'a, T> Drop for RecvGuard<'a, T> {
fn drop(&mut self) {
// Decrement the remaining counter
if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
// Safety: Last receiver, drop the value
self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
self.slot.val = None;
}
}
}