From e3115231dd9388d1594b117a6890ec76b200965d Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 19 Feb 2019 16:26:05 -0800 Subject: [PATCH] sync: fix mpsc/sempahore when releasing permits (#904) This patch fixes Semaphore by adding a missing code path to the release routine that handles the case where the waiter's node is queued in the sempahore but has not yet been assigned the permit. This fix is used by mpsc to handle the case when the Sender has called `poll_ready` and is dropped before the permit is acquired. Fixes #900 --- tokio-sync/src/mpsc/chan.rs | 4 +- tokio-sync/src/semaphore.rs | 63 +++++++++++++++++++++++------- tokio-sync/tests/fuzz_semaphore.rs | 30 ++++++++++++++ tokio-sync/tests/mpsc.rs | 34 ++++++++++++++++ 4 files changed, 114 insertions(+), 17 deletions(-) diff --git a/tokio-sync/src/mpsc/chan.rs b/tokio-sync/src/mpsc/chan.rs index ea8aebfd5..de7ba0f9a 100644 --- a/tokio-sync/src/mpsc/chan.rs +++ b/tokio-sync/src/mpsc/chan.rs @@ -346,9 +346,7 @@ impl Semaphore for (::semaphore::Semaphore, usize) { } fn drop_permit(&self, permit: &mut Permit) { - if permit.is_acquired() { - permit.release(&self.0); - } + permit.release(&self.0); } fn add_permit(&self) { diff --git a/tokio-sync/src/semaphore.rs b/tokio-sync/src/semaphore.rs index 544c0171a..24c7fa395 100644 --- a/tokio-sync/src/semaphore.rs +++ b/tokio-sync/src/semaphore.rs @@ -643,14 +643,10 @@ impl Permit { } /// Release a permit back to the semaphore - /// - /// # Panics - /// - /// This function panics if called when the permit has not yet been - /// acquired. pub fn release(&mut self, semaphore: &Semaphore) { - self.forget(); - semaphore.add_permits(1); + if self.forget2() { + semaphore.add_permits(1); + } } /// Forget the permit **without** releasing it back to the semaphore. @@ -660,14 +656,27 @@ impl Permit { /// /// Repeatedly calling `forget` without associated calls to `add_permit` /// will result in the semaphore losing all permits. - /// - /// # Panics - /// - /// This function panics if called when the permit has not yet been - /// acquired. pub fn forget(&mut self) { - assert!(self.is_acquired(), "permit not acquired; state = {:?}", self.state); - self.state = PermitState::Idle; + self.forget2(); + } + + /// Returns `true` if the permit was acquired + fn forget2(&mut self) -> bool { + match self.state { + PermitState::Idle => false, + PermitState::Waiting => { + let ret = self.waiter + .as_ref() + .unwrap() + .cancel_interest(); + self.state = PermitState::Idle; + ret + } + PermitState::Acquired => { + self.state = PermitState::Idle; + true + } + } } } @@ -776,6 +785,32 @@ impl WaiterNode { self.task.register() } + /// Returns `true` if the permit has been acquired + fn cancel_interest(&self) -> bool { + use self::NodeState::*; + + match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) { + // Successfully removed interest from the queued node. The permit + // has not been assigned to the node. + Ok(_) => false, + // The semaphore has been closed, there is no further action to + // take. + Err(Closed) => false, + // The permit has been assigned. It must be acquired in order to + // be released back to the semaphore. + Err(Assigned) => { + match self.acquire2() { + Ok(true) => true, + // Not a reachable state + Ok(false) => panic!(), + // The semaphore has been closed, no further action to take. + Err(_) => false, + } + } + Err(state) => panic!("unexpected state = {:?}", state), + } + } + /// Transition the state to `QueuedWaiting`. /// /// This step can only happen from `Queued` or from `Idle`. diff --git a/tokio-sync/tests/fuzz_semaphore.rs b/tokio-sync/tests/fuzz_semaphore.rs index a281d74bf..9bd88467f 100644 --- a/tokio-sync/tests/fuzz_semaphore.rs +++ b/tokio-sync/tests/fuzz_semaphore.rs @@ -76,6 +76,36 @@ fn basic_usage() { }); } +#[test] +fn release() { + loom::fuzz(|| { + let semaphore = Arc::new(Semaphore::new(1)); + + { + let semaphore = semaphore.clone(); + thread::spawn(move || { + let mut permit = Permit::new(); + + block_on(future::lazy(|| { + permit.poll_acquire(&semaphore).unwrap(); + Ok::<_, ()>(()) + + })).unwrap(); + + permit.release(&semaphore); + }); + } + + let mut permit = Permit::new(); + + block_on(future::poll_fn(|| { + permit.poll_acquire(&semaphore) + })).unwrap(); + + permit.release(&semaphore); + }); +} + #[test] fn basic_closing() { const NUM: usize = 2; diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index b7ecd9efe..5083e4ccf 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -65,6 +65,40 @@ fn send_recv_with_buffer() { assert!(val.is_none()); } +#[test] +fn start_send_past_cap() { + let (mut tx1, mut rx) = mpsc::channel(1); + let mut tx2 = tx1.clone(); + + let mut task1 = MockTask::new(); + let mut task2 = MockTask::new(); + + let res = tx1.start_send(()).unwrap(); + assert!(res.is_ready()); + + task1.enter(|| { + let res = tx1.start_send(()).unwrap(); + assert!(!res.is_ready()); + }); + + task2.enter(|| { + assert_not_ready!(tx2.poll_ready()); + }); + + drop(tx1); + + let val = assert_ready!(rx.poll()); + assert!(val.is_some()); + + assert!(task2.is_notified()); + assert!(!task1.is_notified()); + + drop(tx2); + + let val = assert_ready!(rx.poll()); + assert!(val.is_none()); +} + #[test] #[should_panic] fn buffer_gteq_one() {