mirror of
https://github.com/tower-rs/tower.git
synced 2026-03-03 20:36:28 +00:00
In the past, any errors thrown by a `Service` wrapped in a `tower_buffer::Buffer` were silently swallowed, and the handles were simply informed that the connection to the `Service` was closed. This patch captures errors from a wrapped `Service`, and communicates that error to all pending and future requests. It does so by wrapping up the error in an `Arc`, which is sent to all pending `oneshot` request channels, and is stored in a shared location so that future requests will see the error when their send to the `Worker` fail. Note that this patch also removes the `open` field from `State`, as it is no longer necessary following #120, since bounded channels have a `try_ready` method we can rely on instead. Note that this change is not entirely backwards compatible -- the error type for a `Service` that is wrapped in `Buffer` must now be `Send + Sync` so that it can safely be communicated back to callers. Furthermore, `tower_buffer::Error::Closed` now contains the error that the failed `Service` produced, which may trip up old code.
135 lines
3.4 KiB
Rust
135 lines
3.4 KiB
Rust
extern crate futures;
|
|
extern crate tower_buffer;
|
|
extern crate tower_mock;
|
|
extern crate tower_service;
|
|
|
|
use futures::prelude::*;
|
|
use tower_buffer::*;
|
|
use tower_service::*;
|
|
|
|
use std::thread;
|
|
|
|
#[test]
|
|
fn req_and_res() {
|
|
let (mut service, mut handle) = new_service();
|
|
|
|
let response = service.call("hello");
|
|
|
|
let request = handle.next_request().unwrap();
|
|
assert_eq!(*request, "hello");
|
|
request.respond("world");
|
|
|
|
assert_eq!(response.wait().unwrap(), "world");
|
|
}
|
|
|
|
#[test]
|
|
fn clears_canceled_requests() {
|
|
let (mut service, mut handle) = new_service();
|
|
|
|
handle.allow(1);
|
|
|
|
let res1 = service.call("hello");
|
|
|
|
let req1 = handle.next_request().unwrap();
|
|
assert_eq!(*req1, "hello");
|
|
|
|
// don't respond yet, new requests will get buffered
|
|
|
|
let res2 = service.call("hello2");
|
|
with_task(|| {
|
|
assert!(handle.poll_request().unwrap().is_not_ready());
|
|
});
|
|
|
|
let res3 = service.call("hello3");
|
|
|
|
drop(res2);
|
|
|
|
req1.respond("world");
|
|
assert_eq!(res1.wait().unwrap(), "world");
|
|
|
|
// res2 was dropped, so it should have been canceled in the buffer
|
|
handle.allow(1);
|
|
|
|
let req3 = handle.next_request().unwrap();
|
|
assert_eq!(*req3, "hello3");
|
|
req3.respond("world3");
|
|
assert_eq!(res3.wait().unwrap(), "world3");
|
|
}
|
|
|
|
#[test]
|
|
fn when_inner_is_not_ready() {
|
|
let (mut service, mut handle) = new_service();
|
|
|
|
// Make the service NotReady
|
|
handle.allow(0);
|
|
|
|
let mut res1 = service.call("hello");
|
|
|
|
// Allow the Buffer's executor to do work
|
|
::std::thread::sleep(::std::time::Duration::from_millis(100));
|
|
with_task(|| {
|
|
assert!(res1.poll().expect("res1.poll").is_not_ready());
|
|
assert!(handle.poll_request().expect("poll_request").is_not_ready());
|
|
});
|
|
|
|
handle.allow(1);
|
|
|
|
let req1 = handle.next_request().expect("next_request1");
|
|
assert_eq!(*req1, "hello");
|
|
req1.respond("world");
|
|
|
|
assert_eq!(res1.wait().expect("res1.wait"), "world");
|
|
}
|
|
|
|
#[test]
|
|
fn when_inner_fails() {
|
|
let (mut service, mut handle) = new_service();
|
|
|
|
// Make the service NotReady
|
|
handle.allow(0);
|
|
handle.error("foobar");
|
|
|
|
let mut res1 = service.call("hello");
|
|
|
|
// Allow the Buffer's executor to do work
|
|
::std::thread::sleep(::std::time::Duration::from_millis(100));
|
|
with_task(|| {
|
|
let e = res1.poll().unwrap_err();
|
|
if let Error::Closed(e) = e {
|
|
assert!(format!("{:?}", e).contains("poll_ready"));
|
|
assert_eq!(e.error(), &tower_mock::Error::Other("foobar"));
|
|
} else {
|
|
panic!("unexpected error type: {:?}", e);
|
|
}
|
|
});
|
|
}
|
|
|
|
type Mock = tower_mock::Mock<&'static str, &'static str, &'static str>;
|
|
type Handle = tower_mock::Handle<&'static str, &'static str, &'static str>;
|
|
|
|
struct Exec;
|
|
|
|
impl<F> futures::future::Executor<F> for Exec
|
|
where
|
|
F: Future<Item = (), Error = ()> + Send + 'static,
|
|
{
|
|
fn execute(&self, fut: F) -> Result<(), futures::future::ExecuteError<F>> {
|
|
thread::spawn(move || {
|
|
fut.wait().unwrap();
|
|
});
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn new_service() -> (Buffer<Mock, &'static str>, Handle) {
|
|
let (service, handle) = Mock::new();
|
|
// bound is >0 here because clears_canceled_requests needs multiple outstanding requests
|
|
let service = Buffer::with_executor(service, 10, &Exec).unwrap();
|
|
(service, handle)
|
|
}
|
|
|
|
fn with_task<F: FnOnce() -> U, U>(f: F) -> U {
|
|
use futures::future::{Future, lazy};
|
|
lazy(|| Ok::<_, ()>(f())).wait().unwrap()
|
|
}
|