util: Fix call_all hang when stream is pending (#656)

Currently `call_all` will hang in a busy loop if called when the input
stream is pending.
This commit is contained in:
Leonardo Yvens 2022-03-29 20:25:51 +01:00 committed by Eliza Weisman
parent 7d81577d17
commit d989a82731
2 changed files with 33 additions and 14 deletions

View File

@ -99,20 +99,18 @@ where
.expect("Using CallAll after extracing inner Service");
ready!(svc.poll_ready(cx)).map_err(Into::into)?;
// If it is, gather the next request (if there is one)
match this.stream.as_mut().poll_next(cx) {
Poll::Ready(r) => match r {
Some(req) => {
this.queue.push(svc.call(req));
}
None => {
// We're all done once any outstanding requests have completed
*this.eof = true;
}
},
Poll::Pending => {
// TODO: We probably want to "release" the slot we reserved in Svc here.
// It may be a while until we get around to actually using it.
// If it is, gather the next request (if there is one), or return `Pending` if the
// stream is not ready.
// TODO: We probably want to "release" the slot we reserved in Svc if the
// stream returns `Pending`. It may be a while until we get around to actually
// using it.
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(req) => {
this.queue.push(svc.call(req));
}
None => {
// We're all done once any outstanding requests have completed
*this.eof = true;
}
}
}

View File

@ -143,3 +143,24 @@ async fn unordered() {
.unwrap();
assert!(v.is_none());
}
#[tokio::test]
async fn pending() {
let _t = support::trace_init();
let (mock, mut handle) = mock::pair::<_, &'static str>();
let mut task = task::spawn(());
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let ca = mock.call_all(support::IntoStream::new(rx));
pin_mut!(ca);
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
tx.send("req").unwrap();
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
assert_request_eq!(handle, "req").send_response("res");
let res = assert_ready!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
assert_eq!(res.transpose().unwrap(), Some("res"));
assert_pending!(task.enter(|cx, _| ca.as_mut().poll_next(cx)));
}