Move serve implementation out of WithGracefulShutdown

This commit is contained in:
Jonas Platte 2025-04-26 21:25:36 +02:00
parent 6587b65393
commit 085be6970b

View File

@ -267,14 +267,14 @@ where
fn into_future(self) -> Self::IntoFuture {
private::ServeFuture(Box::pin(async move {
self.run().await;
do_serve(self.listener, self.make_service, self.signal).await;
Ok(())
}))
}
}
#[cfg(all(feature = "tokio", any(feature = "http1", feature = "http2")))]
impl<L, M, S, F> WithGracefulShutdown<L, M, S, F>
async fn do_serve<L, M, F, S>(mut listener: L, mut make_service: M, signal: F)
where
L: Listener,
L::Addr: Debug,
@ -284,93 +284,84 @@ where
S::Future: Send,
F: Future<Output = ()> + Send + 'static,
{
async fn run(self) {
let Self {
mut listener,
mut make_service,
signal,
_marker: _,
} = self;
let (signal_tx, signal_rx) = watch::channel(());
tokio::spawn(async move {
signal.await;
trace!("received graceful shutdown signal. Telling tasks to shutdown");
drop(signal_rx);
});
let (close_tx, close_rx) = watch::channel(());
loop {
let (io, remote_addr) = tokio::select! {
conn = listener.accept() => conn,
_ = signal_tx.closed() => {
trace!("signal received, not accepting new connections");
break;
}
};
let io = TokioIo::new(io);
trace!("connection {remote_addr:?} accepted");
poll_fn(|cx| make_service.poll_ready(cx))
.await
.unwrap_or_else(|err| match err {});
let tower_service = make_service
.call(IncomingStream {
io: &io,
remote_addr,
})
.await
.unwrap_or_else(|err| match err {})
.map_request(|req: Request<Incoming>| req.map(Body::new));
let hyper_service = TowerToHyperService::new(tower_service);
let signal_tx = signal_tx.clone();
let close_rx = close_rx.clone();
let (signal_tx, signal_rx) = watch::channel(());
tokio::spawn(async move {
signal.await;
trace!("received graceful shutdown signal. Telling tasks to shutdown");
drop(signal_rx);
});
#[allow(unused_mut)]
let mut builder = Builder::new(TokioExecutor::new());
// CONNECT protocol needed for HTTP/2 websockets
#[cfg(feature = "http2")]
builder.http2().enable_connect_protocol();
let (close_tx, close_rx) = watch::channel(());
let mut conn = pin!(builder.serve_connection_with_upgrades(io, hyper_service));
let mut signal_closed = pin!(signal_tx.closed().fuse());
loop {
let (io, remote_addr) = tokio::select! {
conn = listener.accept() => conn,
_ = signal_tx.closed() => {
trace!("signal received, not accepting new connections");
break;
}
};
let io = TokioIo::new(io);
trace!("connection {remote_addr:?} accepted");
poll_fn(|cx| make_service.poll_ready(cx))
.await
.unwrap_or_else(|err| match err {});
let tower_service = make_service
.call(IncomingStream {
io: &io,
remote_addr,
})
.await
.unwrap_or_else(|err| match err {})
.map_request(|req: Request<Incoming>| req.map(Body::new));
let hyper_service = TowerToHyperService::new(tower_service);
let signal_tx = signal_tx.clone();
let close_rx = close_rx.clone();
tokio::spawn(async move {
#[allow(unused_mut)]
let mut builder = Builder::new(TokioExecutor::new());
// CONNECT protocol needed for HTTP/2 websockets
#[cfg(feature = "http2")]
builder.http2().enable_connect_protocol();
let mut conn = pin!(builder.serve_connection_with_upgrades(io, hyper_service));
let mut signal_closed = pin!(signal_tx.closed().fuse());
loop {
tokio::select! {
result = conn.as_mut() => {
if let Err(_err) = result {
trace!("failed to serve connection: {_err:#}");
}
break;
}
_ = &mut signal_closed => {
trace!("signal received in task, starting graceful shutdown");
conn.as_mut().graceful_shutdown();
loop {
tokio::select! {
result = conn.as_mut() => {
if let Err(_err) = result {
trace!("failed to serve connection: {_err:#}");
}
break;
}
_ = &mut signal_closed => {
trace!("signal received in task, starting graceful shutdown");
conn.as_mut().graceful_shutdown();
}
}
}
drop(close_rx);
});
}
drop(close_rx);
drop(listener);
trace!(
"waiting for {} task(s) to finish",
close_tx.receiver_count()
);
close_tx.closed().await;
drop(close_rx);
});
}
drop(close_rx);
drop(listener);
trace!(
"waiting for {} task(s) to finish",
close_tx.receiver_count()
);
close_tx.closed().await;
}
/// An incoming stream.