ready_cache: just use pin_project for Pending (#667)

This gets rid of the `Unpin` impl with the weird comment on it.

Alternatively, we could just put a `S: Unpin` bound on `Pending`, but
this changes the public API to require that the service type is `Unpin`.
In practice, it will be, but we could also just avoid the trait bound.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
Eliza Weisman 2022-06-17 09:15:23 -07:00 committed by GitHub
parent 3c170aaf19
commit 45a13b128d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 20 deletions

View File

@ -56,7 +56,7 @@ limit = ["__common", "tokio/time", "tokio/sync", "tokio-util", "tracing"]
load = ["__common", "tokio/time", "tracing"]
load-shed = ["__common"]
make = ["futures-util", "pin-project-lite", "tokio/io-std"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing"]
ready-cache = ["futures-core", "futures-util", "indexmap", "tokio/sync", "tracing", "pin-project-lite"]
reconnect = ["make", "tokio/io-std", "tracing"]
retry = ["__common", "tokio/time"]
spawn-ready = ["__common", "futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]

View File

@ -85,14 +85,16 @@ enum PendingError<K, E> {
Inner(K, E),
}
/// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: Option<S>,
_pd: std::marker::PhantomData<Req>,
pin_project_lite::pin_project! {
/// A [`Future`] that becomes satisfied when an `S`-typed service is ready.
///
/// May fail due to cancelation, i.e. if the service is evicted from the balancer.
struct Pending<K, S, Req> {
key: Option<K>,
cancel: Option<CancelRx>,
ready: Option<S>,
_pd: std::marker::PhantomData<Req>,
}
}
// === ReadyCache ===
@ -400,24 +402,22 @@ where
// === Pending ===
// Safety: No use unsafe access therefore this is safe.
impl<K, S, Req> Unpin for Pending<K, S, Req> {}
impl<K, S, Req> Future for Pending<K, S, Req>
where
S: Service<Req>,
{
type Output = Result<(K, S, CancelRx), PendingError<K, S::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut fut = self.cancel.as_mut().expect("polled after complete");
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut fut = this.cancel.as_mut().expect("polled after complete");
if let Poll::Ready(r) = Pin::new(&mut fut).poll(cx) {
assert!(r.is_ok(), "cancel sender lost");
let key = self.key.take().expect("polled after complete");
let key = this.key.take().expect("polled after complete");
return Err(PendingError::Canceled(key)).into();
}
match self
match this
.ready
.as_mut()
.expect("polled after ready")
@ -425,12 +425,12 @@ where
{
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(())) => {
let key = self.key.take().expect("polled after complete");
let cancel = self.cancel.take().expect("polled after complete");
Ok((key, self.ready.take().expect("polled after ready"), cancel)).into()
let key = this.key.take().expect("polled after complete");
let cancel = this.cancel.take().expect("polled after complete");
Ok((key, this.ready.take().expect("polled after ready"), cancel)).into()
}
Poll::Ready(Err(e)) => {
let key = self.key.take().expect("polled after compete");
let key = this.key.take().expect("polled after compete");
Err(PendingError::Inner(key, e)).into()
}
}