mirror of
https://github.com/tower-rs/tower.git
synced 2025-09-29 05:50:56 +00:00
util: fix oneshot dropping pending services immediately (#447)
## Motivation Commit #330 introduced a regression when porting `tower-util::Oneshot` from `futures` 0.1 to `std::future`. The *intended* behavior is that a oneshot future should repeatedly call `poll_ready` on the oneshotted service until it is ready, and then call the service and drive the returned future. However, #330 inadvertently changed the oneshot future to poll the service _once_, call it if it is ready, and then drop it, regardless of its readiness. In the #330 version of oneshot, an `Option` is used to store the request while waiting for the service to become ready, so that it can be `take`n and moved into the service's `call`. However, the `Option` contains both the request _and_ the service itself, and is taken the first time the service is polled. `futures::ready!` is then used when polling the service, so the method returns immediate if it is not ready. This means that the service itself (and the request), which were taken out of the `Option`, will be dropped, and if the oneshot future is polled again, it will panic. ## Solution This commit changes the `Oneshot` future so that only the request lives in the `Option`, and it is only taken when the service is called, rather than every time it is polled. This fixes the bug. I've also added a test for this which fails against master, but passes after this change. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
This commit is contained in:
parent
82e578b5b0
commit
8752a38117
@ -20,7 +20,7 @@ pub struct Oneshot<S: Service<Req>, Req> {
|
||||
|
||||
#[pin_project]
|
||||
enum State<S: Service<Req>, Req> {
|
||||
NotReady(Option<(S, Req)>),
|
||||
NotReady(S, Option<Req>),
|
||||
Called(#[pin] S::Future),
|
||||
Done,
|
||||
}
|
||||
@ -32,12 +32,12 @@ where
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
State::NotReady(Some((s, req))) => f
|
||||
State::NotReady(s, Some(req)) => f
|
||||
.debug_tuple("State::NotReady")
|
||||
.field(s)
|
||||
.field(req)
|
||||
.finish(),
|
||||
State::NotReady(None) => unreachable!(),
|
||||
State::NotReady(_, None) => unreachable!(),
|
||||
State::Called(_) => f.debug_tuple("State::Called").field(&"S::Future").finish(),
|
||||
State::Done => f.debug_tuple("State::Done").finish(),
|
||||
}
|
||||
@ -51,7 +51,7 @@ where
|
||||
#[allow(missing_docs)]
|
||||
pub fn new(svc: S, req: Req) -> Self {
|
||||
Oneshot {
|
||||
state: State::NotReady(Some((svc, req))),
|
||||
state: State::NotReady(svc, Some(req)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -68,10 +68,10 @@ where
|
||||
loop {
|
||||
#[project]
|
||||
match this.state.as_mut().project() {
|
||||
State::NotReady(nr) => {
|
||||
let (mut svc, req) = nr.take().expect("We immediately transition to ::Called");
|
||||
State::NotReady(svc, req) => {
|
||||
let _ = ready!(svc.poll_ready(cx))?;
|
||||
this.state.set(State::Called(svc.call(req)));
|
||||
let f = svc.call(req.take().expect("already called"));
|
||||
this.state.set(State::Called(f));
|
||||
}
|
||||
State::Called(fut) => {
|
||||
let res = ready!(fut.poll(cx))?;
|
||||
|
@ -1,4 +1,5 @@
|
||||
#![cfg(feature = "util")]
|
||||
|
||||
mod call_all;
|
||||
mod oneshot;
|
||||
mod service_fn;
|
||||
|
39
tower/tests/util/oneshot.rs
Normal file
39
tower/tests/util/oneshot.rs
Normal file
@ -0,0 +1,39 @@
|
||||
use std::task::{Context, Poll};
|
||||
use std::{future::Future, pin::Pin};
|
||||
use tower::util::ServiceExt;
|
||||
use tower_service::Service;
|
||||
|
||||
#[tokio::test]
|
||||
async fn service_driven_to_readiness() {
|
||||
// This test ensures that `oneshot` will repeatedly call `poll_ready` until
|
||||
// the service is ready.
|
||||
|
||||
struct PollMeTwice {
|
||||
ready: bool,
|
||||
};
|
||||
impl Service<()> for PollMeTwice {
|
||||
type Error = ();
|
||||
type Response = ();
|
||||
type Future = Pin<
|
||||
Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + Sync + 'static>,
|
||||
>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
|
||||
if self.ready {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
self.ready = true;
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&mut self, _: ()) -> Self::Future {
|
||||
assert!(self.ready, "service not driven to readiness!");
|
||||
Box::pin(async { Ok(()) })
|
||||
}
|
||||
}
|
||||
|
||||
let svc = PollMeTwice { ready: false };
|
||||
svc.oneshot(()).await;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user