diff --git a/Cargo.toml b/Cargo.toml
index 058234fb..de46001b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,7 @@
 members = [
   "tower-balance",
   "tower-buffer",
+  "tower-direct-service",
   "tower-discover",
   "tower-filter",
   "tower-in-flight-limit",
diff --git a/tower-balance/Cargo.toml b/tower-balance/Cargo.toml
index 7d43eb39..48612b55 100644
--- a/tower-balance/Cargo.toml
+++ b/tower-balance/Cargo.toml
@@ -10,6 +10,7 @@
 log = "0.4.1"
 rand = "0.5"
 tokio-timer = "0.2.4"
 tower-service = { version = "0.1", path = "../tower-service" }
+tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
 tower-discover = { version = "0.1", path = "../tower-discover" }
 indexmap = "1"
diff --git a/tower-balance/examples/demo.rs b/tower-balance/examples/demo.rs
index 3705adfc..00c4604b 100644
--- a/tower-balance/examples/demo.rs
+++ b/tower-balance/examples/demo.rs
@@ -232,11 +232,7 @@ impl Discover for Disco {
     }
 }
 
-type DemoService =
-    InFlightLimit<
-        Buffer<
-            lb::Balance,
-            Req>>;
+type DemoService = InFlightLimit, Req>>;
 
 struct SendRequests
 where
diff --git a/tower-balance/src/lib.rs b/tower-balance/src/lib.rs
index 53eea2c5..01ff288c 100644
--- a/tower-balance/src/lib.rs
+++ b/tower-balance/src/lib.rs
@@ -1,5 +1,3 @@
-#![deny(dead_code)]
-
 #[macro_use]
 extern crate futures;
 #[macro_use]
@@ -11,14 +9,16 @@ extern crate rand;
 extern crate tokio_timer;
 extern crate tower_discover;
 extern crate tower_service;
+extern crate tower_direct_service;
 
 use futures::{Async, Future, Poll};
 use indexmap::IndexMap;
-use rand::{SeedableRng, rngs::SmallRng};
+use rand::{rngs::SmallRng, SeedableRng};
 use std::{fmt, error};
 use std::marker::PhantomData;
 use tower_discover::Discover;
 use tower_service::Service;
+use tower_direct_service::DirectService;
 
 pub mod choose;
 pub mod load;
@@ -156,11 +156,7 @@
     /// Polls `discover` for updates, adding new items to `not_ready`.
     ///
     /// Removals may alter the order of either `ready` or `not_ready`.
-    fn update_from_discover(&mut self)
-        -> Result<(), Error<>::Error, D::Error>>
-    where
-        D::Service: Service
-    {
+    fn update_from_discover(&mut self) -> Result<(), Error> {
         debug!("updating from discover");
 
         use tower_discover::Change::*;
@@ -182,6 +178,7 @@
                 };
                 // XXX is it safe to just drop the Service? Or do we need some sort of
                 // graceful teardown?
+                // TODO: poll_close
             }
         }
     }
@@ -193,10 +190,9 @@
     ///
     /// When `poll_ready` returns ready, the service is removed from `not_ready` and inserted
     /// into `ready`, potentially altering the order of `ready` and/or `not_ready`.
-    fn promote_to_ready(&mut self)
-        -> Result<(), Error<>::Error, D::Error>>
+    fn promote_to_ready(&mut self, mut poll_ready: F) -> Result<(), Error>
     where
-        D::Service: Service,
+        F: FnMut(&mut D::Service) -> Poll<(), E>,
     {
         let n = self.not_ready.len();
         if n == 0 {
@@ -212,7 +208,7 @@
             let (_, svc) = self.not_ready
                 .get_index_mut(idx)
                 .expect("invalid not_ready index");;
-            svc.poll_ready().map_err(Error::Inner)?.is_ready()
+            poll_ready(svc).map_err(Error::Inner)?.is_ready()
         };
         trace!("not_ready[{:?}]: is_ready={:?};", idx, is_ready);
         if is_ready {
@@ -235,15 +231,18 @@
     ///
     /// If the service exists in `ready` and does not poll as ready, it is moved to
     /// `not_ready`, potentially altering the order of `ready` and/or `not_ready`.
-    fn poll_ready_index(&mut self, idx: usize)
-        -> Option>::Error, D::Error>>>
+    fn poll_ready_index(
+        &mut self,
+        idx: usize,
+        mut poll_ready: F,
+    ) -> Option>>
     where
-        D::Service: Service,
+        F: FnMut(&mut D::Service) -> Poll<(), E>,
     {
         match self.ready.get_index_mut(idx) {
             None => return None,
             Some((_, svc)) => {
-                match svc.poll_ready() {
+                match poll_ready(svc) {
                     Ok(Async::Ready(())) => return Some(Ok(Async::Ready(()))),
                     Err(e) => return Some(Err(Error::Inner(e))),
                     Ok(Async::NotReady) => {}
@@ -259,10 +258,9 @@
     /// Chooses the next service to which a request will be dispatched.
     ///
     /// Ensures that .
-    fn choose_and_poll_ready(&mut self)
-        -> Poll<(), Error<>::Error, D::Error>>
+    fn choose_and_poll_ready(&mut self, mut poll_ready: F) -> Poll<(), Error>
     where
-        D::Service: Service,
+        F: FnMut(&mut D::Service) -> Poll<(), E>,
     {
         loop {
             let n = self.ready.len();
@@ -277,12 +275,52 @@
             };
 
             // XXX Should we handle per-endpoint errors?
-            if self.poll_ready_index(idx).expect("invalid ready index")?.is_ready() {
+            if self
+                .poll_ready_index(idx, &mut poll_ready)
+                .expect("invalid ready index")?
+                .is_ready()
+            {
                 self.chosen_ready_index = Some(idx);
                 return Ok(Async::Ready(()));
             }
         }
     }
+
+    fn poll_ready_inner(&mut self, mut poll_ready: F) -> Poll<(), Error>
+    where
+        F: FnMut(&mut D::Service) -> Poll<(), E>,
+    {
+        // Clear before `ready` is altered.
+        self.chosen_ready_index = None;
+
+        // Before `ready` is altered, check the readiness of the last-used service, moving it
+        // to `not_ready` if appropriate.
+        if let Some(idx) = self.dispatched_ready_index.take() {
+            // XXX Should we handle per-endpoint errors?
+            self.poll_ready_index(idx, &mut poll_ready)
+                .expect("invalid dispatched ready key")?;
+        }
+
+        // Update `not_ready` and `ready`.
+        self.update_from_discover()?;
+        self.promote_to_ready(&mut poll_ready)?;
+
+        // Choose the next service to be used by `call`.
+        self.choose_and_poll_ready(&mut poll_ready)
+    }
+
+    fn call(&mut self, call: F, request: Request) -> ResponseFuture
+    where
+        F: FnOnce(&mut D::Service, Request) -> FF,
+        FF: Future,
+    {
+        let idx = self.chosen_ready_index.take().expect("not ready");
+        let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
+        self.dispatched_ready_index = Some(idx);
+
+        let rsp = call(svc, request);
+        ResponseFuture(rsp, PhantomData)
+    }
 }
 
 impl Service for Balance
 where
@@ -300,31 +338,84 @@
     /// When `Async::Ready` is returned, `chosen_ready_index` is set with a valid index
     /// into `ready` referring to a `Service` that is ready to disptach a request.
     fn poll_ready(&mut self) -> Poll<(), Self::Error> {
-        // Clear before `ready` is altered.
-        self.chosen_ready_index = None;
-
-        // Before `ready` is altered, check the readiness of the last-used service, moving it
-        // to `not_ready` if appropriate.
-        if let Some(idx) = self.dispatched_ready_index.take() {
-            // XXX Should we handle per-endpoint errors?
-            self.poll_ready_index(idx).expect("invalid dispatched ready key")?;
-        }
-
-        // Update `not_ready` and `ready`.
-        self.update_from_discover()?;
-        self.promote_to_ready()?;
-
-        // Choose the next service to be used by `call`.
-        self.choose_and_poll_ready()
+        self.poll_ready_inner(D::Service::poll_ready)
     }
 
     fn call(&mut self, request: Request) -> Self::Future {
-        let idx = self.chosen_ready_index.take().expect("not ready");
-        let (_, svc) = self.ready.get_index_mut(idx).expect("invalid chosen ready index");
-        self.dispatched_ready_index = Some(idx);
+        self.call(D::Service::call, request)
+    }
+}
 
-        let rsp = svc.call(request);
-        ResponseFuture(rsp, PhantomData)
+impl DirectService for Balance
+where
+    D: Discover,
+    D::Service: DirectService,
+    C: Choose,
+{
+    type Response = >::Response;
+    type Error = Error<>::Error, D::Error>;
+    type Future = ResponseFuture<>::Future, D::Error>;
+
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        self.poll_ready_inner(D::Service::poll_ready)
+    }
+
+    fn call(&mut self, request: Request) -> Self::Future {
+        self.call(D::Service::call, request)
+    }
+
+    fn poll_service(&mut self) -> Poll<(), Self::Error> {
+        let mut any_not_ready = false;
+
+        // TODO: don't re-poll services that return Ready until call is invoked on them
+
+        for (_, svc) in &mut self.ready {
+            if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
+                any_not_ready = true;
+            }
+        }
+
+        for (_, svc) in &mut self.not_ready {
+            if let Async::NotReady = svc.poll_service().map_err(Error::Inner)? {
+                any_not_ready = true;
+            }
+        }
+
+        if any_not_ready {
+            Ok(Async::NotReady)
+        } else {
+            Ok(Async::Ready(()))
+        }
+    }
+
+    fn poll_close(&mut self) -> Poll<(), Self::Error> {
+        let mut err = None;
+        self.ready.retain(|_, svc| match svc.poll_close() {
+            Ok(Async::Ready(())) => return false,
+            Ok(Async::NotReady) => return true,
+            Err(e) => {
+                err = Some(e);
+                return false;
+            }
+        });
+        self.not_ready.retain(|_, svc| match svc.poll_close() {
+            Ok(Async::Ready(())) => return false,
+            Ok(Async::NotReady) => return true,
+            Err(e) => {
+                err = Some(e);
+                return false;
+            }
+        });
+
+        if let Some(e) = err {
+            return Err(Error::Inner(e));
+        }
+
+        if self.ready.is_empty() && self.not_ready.is_empty() {
+            Ok(Async::Ready(()))
+        } else {
+            Ok(Async::NotReady)
+        }
     }
 }
diff --git a/tower-buffer/Cargo.toml b/tower-buffer/Cargo.toml
index aa6ced75..558ca0da 100644
--- a/tower-buffer/Cargo.toml
+++ b/tower-buffer/Cargo.toml
@@ -7,6 +7,7 @@ publish = false
 
 [dependencies]
 futures = "0.1"
 tower-service = { version = "0.1", path = "../tower-service" }
+tower-direct-service = { version = "0.1", path = "../tower-direct-service" }
 
 [dev-dependencies]
 tower-mock = { version = "0.1", path = "../tower-mock" }
diff --git a/tower-buffer/src/lib.rs b/tower-buffer/src/lib.rs
index 5e4cdb2c..20ee90cc 100644
--- a/tower-buffer/src/lib.rs
+++ b/tower-buffer/src/lib.rs
@@ -12,28 +12,34 @@
 #[macro_use]
 extern crate futures;
 extern crate tower_service;
+extern crate tower_direct_service;
 
-use futures::{Future, Stream, Poll, Async};
 use futures::future::Executor;
 use futures::sync::oneshot;
 use futures::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
-use tower_service::Service;
-
-use std::{error, fmt};
-use std::sync::Arc;
+use futures::{Async, Future, Poll, Stream};
+use std::marker::PhantomData;
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::{error, fmt};
+use tower_service::Service;
+use tower_direct_service::DirectService;
 
 /// Adds a buffer in front of an inner service.
 ///
 /// See crate level documentation for more details.
 pub struct Buffer
-where T: Service,
+where
+    T: Service,
 {
     tx: UnboundedSender>,
     state: Arc,
 }
 
+/// A [`Buffer`] that is backed by a `DirectService`.
+pub type DirectBuffer = Buffer, Request>;
+
 /// Future eventually completed with the response to the original request.
 pub struct ResponseFuture {
     state: ResponseState,
@@ -42,18 +48,81 @@ pub struct ResponseFuture {
 /// Errors produced by `Buffer`.
 #[derive(Debug)]
 pub enum Error {
+    /// The `Service` call errored.
     Inner(T),
+    /// The underlying `Service` failed.
     Closed,
 }
 
+/// An adapter that exposes the associated types of a `DirectService` through `Service`.
+/// This type does *not* let you pretend that a `DirectService` is a `Service`; that would be
+/// incorrect, as the caller would then not call `poll_service` and `poll_close` as necessary on
+/// the underlying `DirectService`. Instead, it merely provides a type-level adapter which allows
+/// types that are generic over `T: Service`, but only need access to associated types of `T`, to
+/// also take a `DirectService` ([`Buffer`] is an example of such a type).
+pub struct DirectServiceRef {
+    _marker: PhantomData,
+}
+
+impl Service for DirectServiceRef
+where
+    T: DirectService,
+{
+    type Response = T::Response;
+    type Error = T::Error;
+    type Future = T::Future;
+
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        unreachable!("tried to poll a DirectService through a marker reference")
+    }
+
+    fn call(&mut self, _: Request) -> Self::Future {
+        unreachable!("tried to call a DirectService through a marker reference")
+    }
+}
+
+/// A wrapper that exposes a `Service` (which does not need to be driven) as a `DirectService` so
+/// that a construct that is *able* to take a `DirectService` can also take instances of
+/// `Service`.
+pub struct DirectedService(T);
+
+impl DirectService for DirectedService
+where
+    T: Service,
+{
+    type Response = T::Response;
+    type Error = T::Error;
+    type Future = T::Future;
+
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        self.0.poll_ready()
+    }
+
+    fn poll_service(&mut self) -> Poll<(), Self::Error> {
+        // TODO: is this the right thing to do?
+        Ok(Async::Ready(()))
+    }
+
+    fn poll_close(&mut self) -> Poll<(), Self::Error> {
+        // TODO: is this the right thing to do?
+        Ok(Async::Ready(()))
+    }
+
+    fn call(&mut self, req: Request) -> Self::Future {
+        self.0.call(req)
+    }
+}
+
 /// Task that handles processing the buffer. This type should not be used
 /// directly, instead `Buffer` requires an `Executor` that can accept this task.
 pub struct Worker
-where T: Service,
+where
+    T: DirectService,
 {
     current_message: Option>,
     rx: UnboundedReceiver>,
     service: T,
+    finish: bool,
     state: Arc,
 }
 
@@ -90,6 +159,44 @@ where
     /// draining the buffer and dispatching the requests to the internal
     /// service.
     pub fn new(service: T, executor: &E) -> Result>
+    where
+        E: Executor, Request>>,
+    {
+        let (tx, rx) = mpsc::unbounded();
+
+        let state = Arc::new(State {
+            open: AtomicBool::new(true),
+        });
+
+        let worker = Worker {
+            current_message: None,
+            rx,
+            service: DirectedService(service),
+            finish: false,
+            state: state.clone(),
+        };
+
+        // TODO: handle error
+        executor.execute(worker)
+            .ok().unwrap();
+
+        Ok(Buffer {
+            tx,
+            state: state,
+        })
+    }
+}
+
+impl Buffer, Request>
+where
+    T: DirectService,
+{
+    /// Creates a new `Buffer` wrapping the given directly driven `service`.
+    ///
+    /// `executor` is used to spawn a new `Worker` task that is dedicated to
+    /// draining the buffer and dispatching the requests to the internal
+    /// service.
+    pub fn new_direct(service: T, executor: &E) -> Result>
     where
         E: Executor>,
     {
@@ -103,17 +210,14 @@
             current_message: None,
             rx,
             service,
+            finish: false,
             state: state.clone(),
         };
 
         // TODO: handle error
-        executor.execute(worker)
-            .ok().unwrap();
+        executor.execute(worker).ok().unwrap();
 
-        Ok(Buffer {
-            tx,
-            state: state,
-        })
+        Ok(Buffer { tx, state: state })
     }
 }
 
@@ -201,10 +305,15 @@ where
 impl Worker
 where
-    T: Service,
+    T: DirectService,
 {
     /// Return the next queued Message that hasn't been canceled.
     fn poll_next_msg(&mut self) -> Poll>, ()> {
+        if self.finish {
+            // We've already received None and are shutting down
+            return Ok(Async::Ready(None));
+        }
+
         if let Some(mut msg) = self.current_message.take() {
             // poll_cancel returns Async::Ready is the receiver is dropped.
             // Returning NotReady means it is still alive, so we should still
@@ -228,35 +337,69 @@
 impl Future for Worker
 where
-    T: Service,
+    T: DirectService,
 {
     type Item = ();
     type Error = ();
 
     fn poll(&mut self) -> Poll<(), ()> {
-        while let Some(msg) = try_ready!(self.poll_next_msg()) {
-            // Wait for the service to be ready
-            match self.service.poll_ready() {
-                Ok(Async::Ready(())) => {
-                    let response = self.service.call(msg.request);
+        let mut any_outstanding = true;
+        loop {
+            match self.poll_next_msg()? {
+                Async::Ready(Some(msg)) => {
+                    // Wait for the service to be ready
+                    match self.service.poll_ready() {
+                        Ok(Async::Ready(())) => {
+                            let response = self.service.call(msg.request);
 
-                    // Send the response future back to the sender.
-                    //
-                    // An error means the request had been canceled in-between
-                    // our calls, the response future will just be dropped.
-                    let _ = msg.tx.send(response);
+                            // Send the response future back to the sender.
+                            //
+                            // An error means the request had been canceled in-between
+                            // our calls, the response future will just be dropped.
+                            let _ = msg.tx.send(response);
+
+                            // Try to queue another request before we poll outstanding requests.
+                            any_outstanding = true;
+                            continue;
+                        }
+                        Ok(Async::NotReady) => {
+                            // Put our current message back in its slot.
+                            self.current_message = Some(msg);
+                            // We don't want to return quite yet
+                            // We want to also make progress on current requests
+                            break;
+                        }
+                        Err(_) => {
+                            self.state.open.store(false, Ordering::Release);
+                            return Ok(().into());
+                        }
+                    }
                 }
-                Ok(Async::NotReady) => {
-                    // Put out current message back in its slot.
-                    self.current_message = Some(msg);
+                Async::Ready(None) => {
+                    // No more requests _ever_.
+                    self.finish = true;
+                }
+                Async::NotReady if any_outstanding => {
+                    // Make some progress on the service if we can.
+                }
+                Async::NotReady => {
+                    // There are no outstanding requests to make progress on.
+                    // And we don't have any new requests to enqueue.
+                    // So we yield.
                     return Ok(Async::NotReady);
                 }
-                Err(_) => {
-                    self.state.open.store(false, Ordering::Release);
-                    return Ok(().into())
-                }
             }
+            if self.finish {
+                try_ready!(self.service.poll_close().map_err(|_| ()));
+                // We are all done!
+                break;
+            } else {
+                if let Async::Ready(()) = self.service.poll_service().map_err(|_| ())? {
+                    // Note to future iterations that there's no reason to call poll_service.
+                    any_outstanding = false;
+                }
+            }
         }
 
         // All senders are dropped... the task is no longer needed
diff --git a/tower-buffer/tests/buffer.rs b/tower-buffer/tests/buffer.rs
index 5b6d4264..5a7f33f7 100644
--- a/tower-buffer/tests/buffer.rs
+++ b/tower-buffer/tests/buffer.rs
@@ -58,12 +58,13 @@ fn clears_canceled_requests() {
 
 type Mock = tower_mock::Mock<&'static str, &'static str, ()>;
 type Handle = tower_mock::Handle<&'static str, &'static str, ()>;
+type DirectedMock = tower_buffer::DirectedService;
 
 struct Exec;
 
-impl futures::future::Executor> for Exec {
-    fn execute(&self, fut: Worker)
-        -> Result<(), futures::future::ExecuteError>>
+impl futures::future::Executor> for Exec {
+    fn execute(&self, fut: Worker)
+        -> Result<(), futures::future::ExecuteError>>
     {
         thread::spawn(move || {
             fut.wait().unwrap();
diff --git a/tower-direct-service/Cargo.toml b/tower-direct-service/Cargo.toml
new file mode 100644
index 00000000..459b0a9e
--- /dev/null
+++ b/tower-direct-service/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+
+name = "tower-direct-service"
+# When releasing to crates.io:
+# - Update html_root_url.
+# - Update CHANGELOG.md.
+# - Update documentation URL
+# - Create "v0.1.x" git tag.
+version = "0.1.0"
+authors = ["Carl Lerche ", "Jon Gjengset "]
+license = "MIT"
+readme = "README.md"
+repository = "https://github.com/tower-rs/tower"
+homepage = "https://github.com/tower-rs/tower"
+documentation = "https://docs.rs/tower-direct-service/0.1.0"
+description = """
+Trait representing an asynchronous, request / response based service that must be driven.
+"""
+categories = ["asynchronous", "network-programming"]
+
+[dependencies]
+futures = "0.1.23"
diff --git a/tower-direct-service/LICENSE b/tower-direct-service/LICENSE
new file mode 100644
index 00000000..58fb29a1
--- /dev/null
+++ b/tower-direct-service/LICENSE
@@ -0,0 +1,25 @@
+Copyright (c) 2018 Carl Lerche
+
+Permission is hereby granted, free of charge, to any
+person obtaining a copy of this software and associated
+documentation files (the "Software"), to deal in the
+Software without restriction, including without
+limitation the rights to use, copy, modify, merge,
+publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software
+is furnished to do so, subject to the following
+conditions:
+
+The above copyright notice and this permission notice
+shall be included in all copies or substantial portions
+of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
+ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
+TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
+PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
+SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
+IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/tower-direct-service/README.md b/tower-direct-service/README.md
new file mode 100644
index 00000000..7cb0cd4b
--- /dev/null
+++ b/tower-direct-service/README.md
@@ -0,0 +1,36 @@
+# Tower Service
+
+The foundational `Service` trait that Tower is based on.
+
+## Overview
+
+The [`Service`] trait provides the foundation upon which Tower is built. It is a
+simple, but powerful trait. At its heart, `Service` is just an asynchronous
+function of request to response.
+
+```
+fn(Request) -> Future
+```
+
+Implementations of `Service` take a request, the type of which varies per
+protocol, and return a future representing the eventual completion or failure
+of the response.
+
+Services are used to represent both clients and servers. An *instance* of
+`Service` is used by a client; a server *implements* `Service`.
+
+By standardizing the interface, middleware can be created. Middleware
+*implement* `Service` by passing the request to another `Service`. The
+middleware may take actions such as modifying the request.
+
+[`Service`]: https://docs.rs/tower-service/0.1/tower_service/trait.Service.html
+
+## License
+
+This project is licensed under the [MIT license](LICENSE).
+
+### Contribution
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in Tower by you, shall be licensed as MIT, without any additional
+terms or conditions.
diff --git a/tower-direct-service/src/lib.rs b/tower-direct-service/src/lib.rs
new file mode 100644
index 00000000..c379af74
--- /dev/null
+++ b/tower-direct-service/src/lib.rs
@@ -0,0 +1,126 @@
+#![deny(missing_docs)]
+#![doc(html_root_url = "https://docs.rs/tower/0.1.0")]
+
+//! Definition of the `DirectService` trait for Tower
+//!
+//! This trait provides the necessary abstractions for defining a request /
+//! response service that needs to be driven in order to make progress. It
+//! is akin to the `Service` trait in `tower-service`, with the additional
+//! requirement that `poll_service` must also be called to make progress on
+//! pending requests. The trait is simple but powerful, and is used alongside
+//! `Service` as the foundation for the rest of Tower.
+//!
+//! * [`DirectService`](trait.DirectService.html) is the primary trait and
+//!   defines the request / response exchange. See that trait for more details.
+
+extern crate futures;
+
+use futures::{Future, Poll};
+
+/// An asynchronous function from `Request` to a `Response` that requires polling.
+///
+/// A service that implements this trait acts like a future, and needs to be
+/// polled for the futures returned from `call` to make progress. In particular,
+/// `poll_service` must be called in a similar manner as `Future::poll`: whenever
+/// the task driving the `DirectService` is notified.
+pub trait DirectService {
+    /// Responses given by the service.
+    type Response;
+
+    /// Errors produced by the service.
+    type Error;
+
+    /// The future response value.
+    type Future: Future;
+
+    /// Returns `Ready` when the service is able to process requests.
+    ///
+    /// If the service is at capacity, then `NotReady` is returned and the task
+    /// is notified when the service becomes ready again. This function is
+    /// expected to be called while on a task.
+    ///
+    /// This is a **best effort** implementation. False positives are permitted.
+    /// It is permitted for the service to return `Ready` from a `poll_ready`
+    /// call and have the next invocation of `call` result in an error.
+    ///
+    /// Implementors should call `poll_service` as necessary to finish in-flight
+    /// requests.
+    fn poll_ready(&mut self) -> Poll<(), Self::Error>;
+
+    /// Returns `Ready` whenever there is no more work to be done until `call`
+    /// is invoked again.
+    ///
+    /// Note that this method may return `NotReady` even if there are no
+    /// outstanding requests, if the service has to perform non-request-driven
+    /// operations (e.g., heartbeats).
+    fn poll_service(&mut self) -> Poll<(), Self::Error>;
+
+    /// A method to indicate that no more requests will be sent to this service.
+    ///
+    /// This method is used to indicate that a service will no longer be given
+    /// another request by the caller. That is, `call` will no longer be
+    /// invoked (nor will `poll_service`). This method is intended to model
+    /// "graceful shutdown" in various protocols where the intent to shut
+    /// down is followed by a little more blocking work.
+    ///
+    /// Callers of this function should work with it in a similar fashion to
+    /// `poll_service`. Once called, it may return `NotReady`, which indicates
+    /// that more external work needs to happen to make progress. The current
+    /// task will be scheduled to receive a notification in such an event,
+    /// however.
+    ///
+    /// Note that this function implies `poll_service`. That is, if a
+    /// service has pending requests, they will be flushed out during a
+    /// `poll_close` operation. It is not necessary to have `poll_service`
+    /// return `Ready` before `poll_close` is called. Once `poll_close`
+    /// has been called, though, `poll_service` cannot be called.
+    ///
+    /// # Return value
+    ///
+    /// This function, like `poll_service`, returns a `Poll`. The value is
+    /// `Ready` once the close operation has completed. At that point it should
+    /// be safe to drop the service and deallocate associated resources, and all
+    /// futures returned from `call` should have resolved.
+    ///
+    /// If the value returned is `NotReady` then the service is not yet closed and
+    /// work needs to be done to close it. The work has been scheduled and the
+    /// current task will receive a notification when it's next ready to call
+    /// this method again.
+    ///
+    /// Finally, this function may also return an error.
+    ///
+    /// # Errors
+    ///
+    /// This function will return an `Err` if any operation along the way during
+    /// the close operation fails. An error is typically fatal for a service and
+    /// cannot be recovered from, but in specific situations this may not
+    /// always be true.
+    ///
+    /// Note that it's also typically an error to call `call` or `poll_service`
+    /// after the `poll_close` function is called. This method will *initiate*
+    /// a close, and continuing to send requests after that (or attempting to
+    /// flush) may result in strange behavior, panics, errors, etc. Once this
+    /// method is called, it must be the only method called on this
+    /// `DirectService`.
+    ///
+    /// # Panics
+    ///
+    /// This method may panic or cause panics if:
+    ///
+    /// * It is called outside the context of a future's task
+    /// * It is called and then `call` or `poll_service` is called
+    fn poll_close(&mut self) -> Poll<(), Self::Error>;
+
+    /// Process the request and return the response asynchronously.
+    ///
+    /// This function is expected to be callable off task. As such,
+    /// implementations should take care to not call any of the `poll_*`
+    /// methods. If the service is at capacity and the request is unable
+    /// to be handled, the returned `Future` should resolve to an error.
+    ///
+    /// Calling `call` without calling `poll_ready` is permitted. The
+    /// implementation must be resilient to this fact.
+    ///
+    /// Note that for the returned future to resolve, this `DirectService`
+    /// must be driven through calls to `poll_service` or `poll_close`.
+    fn call(&mut self, req: Request) -> Self::Future;
+}
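For reference, here is a minimal sketch of what an implementation of the new trait might look like. The `Echo` type and its queueing strategy are invented for illustration only and are not part of this diff; the point is the contract described above: `call` merely enqueues work and hands back a future, and that future only resolves once the service is driven through `poll_service` (or flushed by `poll_close`).

```rust
extern crate futures;
extern crate tower_direct_service;

use futures::sync::oneshot;
use futures::{Async, Poll};
use tower_direct_service::DirectService;

/// A toy service: `call` only queues the request, and the returned future
/// resolves once `poll_service` (or `poll_close`) has driven the queued work.
pub struct Echo {
    pending: Vec<(String, oneshot::Sender<String>)>,
}

impl Echo {
    pub fn new() -> Self {
        Echo { pending: Vec::new() }
    }
}

impl DirectService<String> for Echo {
    type Response = String;
    type Error = oneshot::Canceled;
    type Future = oneshot::Receiver<String>;

    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
        // This toy service can always accept another request.
        Ok(Async::Ready(()))
    }

    fn call(&mut self, req: String) -> Self::Future {
        // Only enqueue here; the actual work happens when the service is driven.
        let (tx, rx) = oneshot::channel();
        self.pending.push((req, tx));
        rx
    }

    fn poll_service(&mut self) -> Poll<(), Self::Error> {
        // Drive all queued work to completion, fulfilling the response futures.
        for (req, tx) in self.pending.drain(..) {
            let _ = tx.send(req);
        }
        Ok(Async::Ready(()))
    }

    fn poll_close(&mut self) -> Poll<(), Self::Error> {
        // Flush anything still queued, then report that shutdown is complete.
        self.poll_service()
    }
}
```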