diff --git a/tower-util/src/ext/mod.rs b/tower-util/src/ext/mod.rs index 340be1e9..0c17948b 100644 --- a/tower-util/src/ext/mod.rs +++ b/tower-util/src/ext/mod.rs @@ -8,6 +8,7 @@ mod apply; mod from_err; mod map; mod map_err; +mod oneshot; mod ready; mod then; @@ -16,6 +17,7 @@ pub use self::apply::Apply; pub use self::from_err::FromErr; pub use self::map::Map; pub use self::map_err::MapErr; +pub use self::oneshot::Oneshot; pub use self::ready::Ready; pub use self::then::Then; @@ -116,4 +118,12 @@ pub trait ServiceExt: Service { { MapErr::new(self, f) } + + /// Consume this `Service`, calling with the providing request once it is ready. + fn oneshot(self, req: Request) -> Oneshot + where + Self: Sized, + { + Oneshot::new(self, req) + } } diff --git a/tower-util/src/ext/oneshot.rs b/tower-util/src/ext/oneshot.rs new file mode 100644 index 00000000..fb672c6f --- /dev/null +++ b/tower-util/src/ext/oneshot.rs @@ -0,0 +1,62 @@ +use std::mem; + +use futures::{Async, Future, Poll}; +use tower_service::Service; + +/// A `Future` consuming a `Service` and request, waiting until the `Service` +/// is ready, and then calling `Service::call` with the request, and +/// waiting for that `Future`. +pub struct Oneshot, Req> { + state: State, +} + +enum State, Req> { + NotReady(S, Req), + Called(S::Future), + Tmp, +} + +impl Oneshot +where + S: Service, +{ + pub(super) fn new(svc: S, req: Req) -> Self { + Oneshot { + state: State::NotReady(svc, req), + } + } +} + +impl Future for Oneshot +where + S: Service, +{ + type Item = S::Response; + type Error = S::Error; + + fn poll(&mut self) -> Poll { + loop { + match mem::replace(&mut self.state, State::Tmp) { + State::NotReady(mut svc, req) => match svc.poll_ready()? { + Async::Ready(()) => { + self.state = State::Called(svc.call(req)); + } + Async::NotReady => { + self.state = State::NotReady(svc, req); + return Ok(Async::NotReady); + } + }, + State::Called(mut fut) => match fut.poll()? { + Async::Ready(res) => { + return Ok(Async::Ready(res)); + } + Async::NotReady => { + self.state = State::Called(fut); + return Ok(Async::NotReady); + } + }, + State::Tmp => panic!("polled after complete"), + } + } + } +}