ext: add ServiceExt::oneshot to call the service when it is ready (#164)

This commit is contained in:
Sean McArthur 2019-02-22 16:15:42 -08:00 committed by GitHub
parent f42338934a
commit 0dc8281ef6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 0 deletions

View File

@ -8,6 +8,7 @@ mod apply;
mod from_err;
mod map;
mod map_err;
mod oneshot;
mod ready;
mod then;
@ -16,6 +17,7 @@ pub use self::apply::Apply;
pub use self::from_err::FromErr;
pub use self::map::Map;
pub use self::map_err::MapErr;
pub use self::oneshot::Oneshot;
pub use self::ready::Ready;
pub use self::then::Then;
@ -116,4 +118,12 @@ pub trait ServiceExt<Request>: Service<Request> {
{
MapErr::new(self, f)
}
/// Consume this `Service`, calling with the providing request once it is ready.
fn oneshot(self, req: Request) -> Oneshot<Self, Request>
where
Self: Sized,
{
Oneshot::new(self, req)
}
}

View File

@ -0,0 +1,62 @@
use std::mem;
use futures::{Async, Future, Poll};
use tower_service::Service;
/// A `Future` consuming a `Service` and request, waiting until the `Service`
/// is ready, and then calling `Service::call` with the request, and
/// waiting for that `Future`.
pub struct Oneshot<S: Service<Req>, Req> {
state: State<S, Req>,
}
enum State<S: Service<Req>, Req> {
NotReady(S, Req),
Called(S::Future),
Tmp,
}
impl<S, Req> Oneshot<S, Req>
where
S: Service<Req>,
{
pub(super) fn new(svc: S, req: Req) -> Self {
Oneshot {
state: State::NotReady(svc, req),
}
}
}
impl<S, Req> Future for Oneshot<S, Req>
where
S: Service<Req>,
{
type Item = S::Response;
type Error = S::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.state, State::Tmp) {
State::NotReady(mut svc, req) => match svc.poll_ready()? {
Async::Ready(()) => {
self.state = State::Called(svc.call(req));
}
Async::NotReady => {
self.state = State::NotReady(svc, req);
return Ok(Async::NotReady);
}
},
State::Called(mut fut) => match fut.poll()? {
Async::Ready(res) => {
return Ok(Async::Ready(res));
}
Async::NotReady => {
self.state = State::Called(fut);
return Ok(Async::NotReady);
}
},
State::Tmp => panic!("polled after complete"),
}
}
}
}