sync: close the broadcast::Sender in broadcast::Sender::new() (#7629)

This commit is contained in:
Martin Grigorov 2025-09-20 09:44:55 +03:00 committed by GitHub
parent 3b5a15dfdf
commit 6d1ae62868
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 1 deletions

View File

@ -566,7 +566,7 @@ impl<T> Sender<T> {
tail: Mutex::new(Tail {
pos: 0,
rx_cnt: receiver_count,
closed: false,
closed: receiver_count == 0,
waiters: LinkedList::new(),
}),
num_tx: AtomicUsize::new(1),

View File

@ -706,3 +706,17 @@ fn broadcast_sender_closed_with_extra_subscribe() {
assert!(task3.is_woken());
assert_ready!(task3.poll());
}
#[tokio::test]
async fn broadcast_sender_new_must_be_closed() {
let capacity = 1;
let tx: broadcast::Sender<()> = broadcast::Sender::new(capacity);
let mut task = task::spawn(tx.closed());
assert_ready!(task.poll());
let _rx = tx.subscribe();
let mut task2 = task::spawn(tx.closed());
assert_pending!(task2.poll());
}