From 8752a3811788e94670c62dc0acbc9613207931b1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 23 Apr 2020 16:07:48 -0700 Subject: [PATCH] util: fix oneshot dropping pending services immediately (#447) ## Motivation Commit #330 introduced a regression when porting `tower-util::Oneshot` from `futures` 0.1 to `std::future`. The *intended* behavior is that a oneshot future should repeatedly call `poll_ready` on the oneshotted service until it is ready, and then call the service and drive the returned future. However, #330 inadvertently changed the oneshot future to poll the service _once_, call it if it is ready, and then drop it, regardless of its readiness. In the #330 version of oneshot, an `Option` is used to store the request while waiting for the service to become ready, so that it can be `take`n and moved into the service's `call`. However, the `Option` contains both the request _and_ the service itself, and is taken the first time the service is polled. `futures::ready!` is then used when polling the service, so the method returns immediate if it is not ready. This means that the service itself (and the request), which were taken out of the `Option`, will be dropped, and if the oneshot future is polled again, it will panic. ## Solution This commit changes the `Oneshot` future so that only the request lives in the `Option`, and it is only taken when the service is called, rather than every time it is polled. This fixes the bug. I've also added a test for this which fails against master, but passes after this change. Signed-off-by: Eliza Weisman --- tower/src/util/oneshot.rs | 14 ++++++------- tower/tests/util/main.rs | 1 + tower/tests/util/oneshot.rs | 39 +++++++++++++++++++++++++++++++++++++ 3 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 tower/tests/util/oneshot.rs diff --git a/tower/src/util/oneshot.rs b/tower/src/util/oneshot.rs index 6f6e809b..b91e1573 100644 --- a/tower/src/util/oneshot.rs +++ b/tower/src/util/oneshot.rs @@ -20,7 +20,7 @@ pub struct Oneshot, Req> { #[pin_project] enum State, Req> { - NotReady(Option<(S, Req)>), + NotReady(S, Option), Called(#[pin] S::Future), Done, } @@ -32,12 +32,12 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - State::NotReady(Some((s, req))) => f + State::NotReady(s, Some(req)) => f .debug_tuple("State::NotReady") .field(s) .field(req) .finish(), - State::NotReady(None) => unreachable!(), + State::NotReady(_, None) => unreachable!(), State::Called(_) => f.debug_tuple("State::Called").field(&"S::Future").finish(), State::Done => f.debug_tuple("State::Done").finish(), } @@ -51,7 +51,7 @@ where #[allow(missing_docs)] pub fn new(svc: S, req: Req) -> Self { Oneshot { - state: State::NotReady(Some((svc, req))), + state: State::NotReady(svc, Some(req)), } } } @@ -68,10 +68,10 @@ where loop { #[project] match this.state.as_mut().project() { - State::NotReady(nr) => { - let (mut svc, req) = nr.take().expect("We immediately transition to ::Called"); + State::NotReady(svc, req) => { let _ = ready!(svc.poll_ready(cx))?; - this.state.set(State::Called(svc.call(req))); + let f = svc.call(req.take().expect("already called")); + this.state.set(State::Called(f)); } State::Called(fut) => { let res = ready!(fut.poll(cx))?; diff --git a/tower/tests/util/main.rs b/tower/tests/util/main.rs index 28a29320..1f7f773b 100644 --- a/tower/tests/util/main.rs +++ b/tower/tests/util/main.rs @@ -1,4 +1,5 @@ #![cfg(feature = "util")] mod call_all; +mod oneshot; mod service_fn; diff --git a/tower/tests/util/oneshot.rs b/tower/tests/util/oneshot.rs new file mode 100644 index 00000000..63ba0040 --- /dev/null +++ b/tower/tests/util/oneshot.rs @@ -0,0 +1,39 @@ +use std::task::{Context, Poll}; +use std::{future::Future, pin::Pin}; +use tower::util::ServiceExt; +use tower_service::Service; + +#[tokio::test] +async fn service_driven_to_readiness() { + // This test ensures that `oneshot` will repeatedly call `poll_ready` until + // the service is ready. + + struct PollMeTwice { + ready: bool, + }; + impl Service<()> for PollMeTwice { + type Error = (); + type Response = (); + type Future = Pin< + Box> + Send + Sync + 'static>, + >; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.ready { + Poll::Ready(Ok(())) + } else { + self.ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn call(&mut self, _: ()) -> Self::Future { + assert!(self.ready, "service not driven to readiness!"); + Box::pin(async { Ok(()) }) + } + } + + let svc = PollMeTwice { ready: false }; + svc.oneshot(()).await; +}