Mirror of https://github.com/tower-rs/tower.git (synced 2025-09-30 14:31:41 +00:00)
ready-cache: Ensure cancelation can be observed (#668)
`tokio::task` enforces a cooperative scheduling regime that can cause `oneshot::Receiver::poll` to return pending after the sender has sent an update. `ReadyCache` uses a oneshot to notify pending services that they should not become ready. When a cancelation is not observed, the ready cache returns service instances that should have been canceled, which breaks assumptions and causes an invalid state.

This branch replaces the use of `tokio::sync::oneshot` for canceling pending futures with a custom cancelation handle built from an `AtomicBool` and a `futures::task::AtomicWaker`. This ensures that canceled `Pending` services are always woken, even when the task's budget is exceeded. Additionally, the cancelation status is now always known to the `Pending` future, because the `AtomicBool` is checked immediately on every poll, even when a canceled `Pending` future is woken by the inner `Service` becoming ready rather than by the cancelation itself.

Fixes #415

Signed-off-by: Oliver Gould <ver@buoyant.io>
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
parent 22b6fc743b
commit edd922d6d0
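To make the mechanism concrete before reading the diff, here is a condensed, self-contained sketch of the cancelation handle this commit introduces. The `Cancel`, `CancelTx`, `CancelRx`, and `cancelable` names mirror the code in the diff below; the `WaitForCancel` future and the `main` driver are illustrative additions only, assuming the `futures` and `tokio` crates are available. This is a sketch of the idea, not the library itself.

// Minimal sketch of the AtomicBool + AtomicWaker cancelation handle.
// `Cancel`/`CancelTx`/`CancelRx`/`cancelable` mirror the diff below;
// `WaitForCancel` and `main` are illustrative stand-ins.
use futures::task::AtomicWaker;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

struct Cancel {
    waker: AtomicWaker,
    canceled: AtomicBool,
}

struct CancelTx(Arc<Cancel>);
struct CancelRx(Arc<Cancel>);

fn cancelable() -> (CancelTx, CancelRx) {
    let shared = Arc::new(Cancel {
        waker: AtomicWaker::new(),
        canceled: AtomicBool::new(false),
    });
    (CancelTx(shared.clone()), CancelRx(shared))
}

impl CancelTx {
    fn cancel(self) {
        // Publish the flag before waking, so a woken task is guaranteed to
        // observe `canceled == true` on its next poll, regardless of any
        // cooperative-scheduling budget.
        self.0.canceled.store(true, Ordering::SeqCst);
        self.0.waker.wake();
    }
}

// A stand-in for `Pending`: a future that stays pending until canceled.
struct WaitForCancel(CancelRx);

impl Future for WaitForCancel {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let WaitForCancel(CancelRx(cancel)) = self.get_mut();
        // Check the flag on every poll, even if this poll was triggered for
        // some other reason, so a cancelation can never be missed.
        if cancel.canceled.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        // Register interest before re-checking: a concurrent `cancel()`
        // either sees the registered waker or is seen by the load below.
        cancel.waker.register(cx.waker());
        if cancel.canceled.load(Ordering::SeqCst) {
            return Poll::Ready(());
        }
        Poll::Pending
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, rx) = cancelable();
    let task = tokio::spawn(WaitForCancel(rx));
    tx.cancel();
    task.await.unwrap();
    println!("cancelation observed");
}

The ordering property is the point: the flag is stored before the wake and re-checked after registering the waker, so neither side can race past the other. The full change follows.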
@@ -2,15 +2,16 @@

 use super::error;
 use futures_core::Stream;
-use futures_util::stream::FuturesUnordered;
+use futures_util::{stream::FuturesUnordered, task::AtomicWaker};
 pub use indexmap::Equivalent;
 use indexmap::IndexMap;
 use std::fmt;
 use std::future::Future;
 use std::hash::Hash;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 use std::task::{Context, Poll};
-use tokio::sync::oneshot;
 use tower_service::Service;
 use tracing::{debug, trace};

@@ -75,8 +76,18 @@ where
 // Safety: This is safe because we do not use `Pin::new_unchecked`.
 impl<S, K: Eq + Hash, Req> Unpin for ReadyCache<K, S, Req> {}

-type CancelRx = oneshot::Receiver<()>;
-type CancelTx = oneshot::Sender<()>;
+#[derive(Debug)]
+struct Cancel {
+    waker: AtomicWaker,
+    canceled: AtomicBool,
+}
+
+#[derive(Debug)]
+struct CancelRx(Arc<Cancel>);
+
+#[derive(Debug)]
+struct CancelTx(Arc<Cancel>);
+
 type CancelPair = (CancelTx, CancelRx);

 #[derive(Debug)]
@@ -195,7 +206,7 @@ where
     /// must be called to cause the service to be dropped.
     pub fn evict<Q: Hash + Equivalent<K>>(&mut self, key: &Q) -> bool {
         let canceled = if let Some(c) = self.pending_cancel_txs.swap_remove(key) {
-            c.send(()).expect("cancel receiver lost");
+            c.cancel();
             true
         } else {
             false
@@ -226,14 +237,14 @@ where
     ///
     /// [`poll_pending`]: crate::ready_cache::cache::ReadyCache::poll_pending
     pub fn push(&mut self, key: K, svc: S) {
-        let cancel = oneshot::channel();
+        let cancel = cancelable();
         self.push_pending(key, svc, cancel);
     }

     fn push_pending(&mut self, key: K, svc: S, (cancel_tx, cancel_rx): CancelPair) {
         if let Some(c) = self.pending_cancel_txs.insert(key.clone(), cancel_tx) {
             // If there is already a service for this key, cancel it.
-            c.send(()).expect("cancel receiver lost");
+            c.cancel();
         }
         self.pending.push(Pending {
             key: Some(key),
@@ -270,21 +281,10 @@ where
                         // recreated after the service is used.
                         self.ready.insert(key, (svc, (cancel_tx, cancel_rx)));
                     } else {
-                        // This should not technically be possible. We must have decided to cancel
-                        // a Service (by sending on the CancelTx), yet that same service then
-                        // returns Ready. Since polling a Pending _first_ polls the CancelRx, that
-                        // _should_ always see our CancelTx send. Yet empirically, that isn't true:
-                        //
-                        // https://github.com/tower-rs/tower/issues/415
-                        //
-                        // So, we instead detect the endpoint as canceled at this point. That
-                        // should be fine, since the oneshot is only really there to ensure that
-                        // the Pending is polled again anyway.
-                        //
-                        // We assert that this can't happen in debug mode so that hopefully one day
-                        // we can find a test that triggers this reliably.
-                        debug_assert!(cancel_tx.is_some());
-                        debug!("canceled endpoint removed when ready");
+                        assert!(
+                            cancel_tx.is_some(),
+                            "services that become ready must have a pending cancelation"
+                        );
                     }
                 }
                 Poll::Ready(Some(Err(PendingError::Canceled(_)))) => {
@@ -294,13 +294,11 @@ where
                 }
                 Poll::Ready(Some(Err(PendingError::Inner(key, e)))) => {
                     let cancel_tx = self.pending_cancel_txs.swap_remove(&key);
-                    if cancel_tx.is_some() {
-                        return Err(error::Failed(key, e.into())).into();
-                    } else {
-                        // See comment for the same clause under Ready(Some(Ok)).
-                        debug_assert!(cancel_tx.is_some());
-                        debug!("canceled endpoint removed on error");
-                    }
+                    assert!(
+                        cancel_tx.is_some(),
+                        "services that return an error must have a pending cancelation"
+                    );
+                    return Err(error::Failed(key, e.into())).into();
                 }
             }
         }
@@ -400,6 +398,28 @@ where
     }
 }

+// === impl Cancel ===
+
+/// Creates a cancelation sender and receiver.
+///
+/// A `tokio::sync::oneshot` is NOT used, as a `Receiver` is not guaranteed to
+/// observe results as soon as a `Sender` fires. Using an `AtomicBool` allows
+/// the state to be observed as soon as the cancelation is triggered.
+fn cancelable() -> CancelPair {
+    let cx = Arc::new(Cancel {
+        waker: AtomicWaker::new(),
+        canceled: AtomicBool::new(false),
+    });
+    (CancelTx(cx.clone()), CancelRx(cx))
+}
+
+impl CancelTx {
+    fn cancel(self) {
+        self.0.canceled.store(true, Ordering::SeqCst);
+        self.0.waker.wake();
+    }
+}
+
 // === Pending ===

 impl<K, S, Req> Future for Pending<K, S, Req>
@@ -410,9 +430,10 @@ where

     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.project();
-        let mut fut = this.cancel.as_mut().expect("polled after complete");
-        if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
-            assert!(r.is_ok(), "cancel sender lost");
+        // Before checking whether the service is ready, check to see whether
+        // readiness has been canceled.
+        let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
+        if cancel.canceled.load(Ordering::SeqCst) {
             let key = this.key.take().expect("polled after complete");
             return Err(PendingError::Canceled(key)).into();
         }
@@ -423,7 +444,21 @@
             .expect("polled after ready")
             .poll_ready(cx)
         {
-            Poll::Pending => Poll::Pending,
+            Poll::Pending => {
+                // Before returning Pending, register interest in cancelation so
+                // that this future is polled again if the state changes.
+                let CancelRx(cancel) = this.cancel.as_mut().expect("polled after complete");
+                cancel.waker.register(cx.waker());
+                // Because both the cancel receiver and cancel sender are held
+                // by the `ReadyCache` (i.e., on a single task), then it must
+                // not be possible for the cancelation state to change while
+                // polling a `Pending` service.
+                assert!(
+                    !cancel.canceled.load(Ordering::SeqCst),
+                    "cancelation cannot be notified while polling a pending service"
+                );
+                Poll::Pending
+            }
             Poll::Ready(Ok(())) => {
                 let key = this.key.take().expect("polled after complete");
                 let cancel = this.cancel.take().expect("polled after complete");
@@ -2,8 +2,9 @@
 #[path = "../support.rs"]
 mod support;

+use std::pin::Pin;
 use tokio_test::{assert_pending, assert_ready, task};
-use tower::ready_cache::ReadyCache;
+use tower::ready_cache::{error, ReadyCache};
 use tower_test::mock;

 type Req = &'static str;
@@ -191,3 +192,32 @@ fn duplicate_key_by_index()
     // _and_ service 0 should now be callable
     assert!(task.enter(|cx, _| cache.check_ready(cx, &0)).unwrap());
 }
+
+// Tests https://github.com/tower-rs/tower/issues/415
+#[tokio::test(flavor = "current_thread")]
+async fn cancelation_observed() {
+    let mut cache = ReadyCache::default();
+    let mut handles = vec![];
+
+    // NOTE This test passes at 129 items, but fails at 130 items (if coop
+    // schedulding interferes with cancelation).
+    for _ in 0..130 {
+        let (svc, mut handle) = tower_test::mock::pair::<(), ()>();
+        handle.allow(1);
+        cache.push("ep0", svc);
+        handles.push(handle);
+    }
+
+    struct Ready(ReadyCache<&'static str, tower_test::mock::Mock<(), ()>, ()>);
+    impl Unpin for Ready {}
+    impl std::future::Future for Ready {
+        type Output = Result<(), error::Failed<&'static str>>;
+        fn poll(
+            self: Pin<&mut Self>,
+            cx: &mut std::task::Context<'_>,
+        ) -> std::task::Poll<Self::Output> {
+            self.get_mut().0.poll_pending(cx)
+        }
+    }
+    Ready(cache).await.unwrap();
+}