Extract handle_connection out of do_serve

This commit is contained in:
Jonas Platte 2025-04-26 21:37:14 +02:00
parent 4d4750e68e
commit 1c5b7dfc94
No known key found for this signature in database
GPG Key ID: 7D261D771D915378

View File

@ -304,55 +304,7 @@ where
}
};
let io = TokioIo::new(io);
trace!("connection {remote_addr:?} accepted");
make_service
.ready()
.await
.unwrap_or_else(|err| match err {});
let tower_service = make_service
.call(IncomingStream {
io: &io,
remote_addr,
})
.await
.unwrap_or_else(|err| match err {})
.map_request(|req: Request<Incoming>| req.map(Body::new));
let hyper_service = TowerToHyperService::new(tower_service);
let signal_tx = signal_tx.clone();
let close_rx = close_rx.clone();
tokio::spawn(async move {
#[allow(unused_mut)]
let mut builder = Builder::new(TokioExecutor::new());
// CONNECT protocol needed for HTTP/2 websockets
#[cfg(feature = "http2")]
builder.http2().enable_connect_protocol();
let mut conn = pin!(builder.serve_connection_with_upgrades(io, hyper_service));
let mut signal_closed = pin!(signal_tx.closed().fuse());
loop {
tokio::select! {
result = conn.as_mut() => {
if let Err(_err) = result {
trace!("failed to serve connection: {_err:#}");
}
break;
}
_ = &mut signal_closed => {
trace!("signal received in task, starting graceful shutdown");
conn.as_mut().graceful_shutdown();
}
}
}
drop(close_rx);
});
handle_connection(&mut make_service, &signal_tx, &close_rx, io, remote_addr).await;
}
drop(close_rx);
@ -365,6 +317,71 @@ where
close_tx.closed().await;
}
async fn handle_connection<L, M, S>(
make_service: &mut M,
signal_tx: &watch::Sender<()>,
close_rx: &watch::Receiver<()>,
io: <L as Listener>::Io,
remote_addr: <L as Listener>::Addr,
) where
L: Listener,
L::Addr: Debug,
M: for<'a> Service<IncomingStream<'a, L>, Error = Infallible, Response = S> + Send + 'static,
for<'a> <M as Service<IncomingStream<'a, L>>>::Future: Send,
S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
{
let io = TokioIo::new(io);
trace!("connection {remote_addr:?} accepted");
make_service
.ready()
.await
.unwrap_or_else(|err| match err {});
let tower_service = make_service
.call(IncomingStream {
io: &io,
remote_addr,
})
.await
.unwrap_or_else(|err| match err {})
.map_request(|req: Request<Incoming>| req.map(Body::new));
let hyper_service = TowerToHyperService::new(tower_service);
let signal_tx = signal_tx.clone();
let close_rx = close_rx.clone();
tokio::spawn(async move {
#[allow(unused_mut)]
let mut builder = Builder::new(TokioExecutor::new());
// CONNECT protocol needed for HTTP/2 websockets
#[cfg(feature = "http2")]
builder.http2().enable_connect_protocol();
let mut conn = pin!(builder.serve_connection_with_upgrades(io, hyper_service));
let mut signal_closed = pin!(signal_tx.closed().fuse());
loop {
tokio::select! {
result = conn.as_mut() => {
if let Err(_err) = result {
trace!("failed to serve connection: {_err:#}");
}
break;
}
_ = &mut signal_closed => {
trace!("signal received in task, starting graceful shutdown");
conn.as_mut().graceful_shutdown();
}
}
}
drop(close_rx);
});
}
/// An incoming stream.
///
/// Used with [`serve`] and [`IntoMakeServiceWithConnectInfo`].