mirror of
https://github.com/tokio-rs/axum.git
synced 2025-09-27 13:00:39 +00:00
axum::serve: Enable Hyper request header timeout (#3478)
It's possible for an HTTP request to get stuck (due to network issues or malicious clients) halfway through the request line or header, in which case no amount of timeouts configured by an application using axum/tower will help, since Hyper hasn't yet handed off the request to the service. This has two consequences: - Wasted memory: Up to ~1 MB per connection in the worst case, compared to the ~2.5 kB used by a regular idle connection. - A `with_graceful_shutdown` signal will cause the server to stop accepting new requests, but then hang instead of actually stopping. This changes axum::serve to configure a Timer for HTTP/1 connections, which activates Hyper's default timeout (currently 30 seconds). The timeout applies only to the request line and request header, not the request body or subsequent response (where axum applications can instead simply configure a timeout using `tower_http::timeout::TimeoutLayer`). The timeout does not currently apply to newly opened connections, even though Hyper's `header_read_timeout` SHOULD apply here too since 1.4.0; discussion on tokio-rs#2741 suggests a possible TokioIo issue. However, the graceful shutdown still works as expected for such connections. The timer is only enabled for HTTP/1 here since similar functionality does not currently appear to exist for HTTP/2 connections in Hyper. This is a breaking change for any axum::serve users who currently rely on being able to begin sending a HTTP/1 request, and then take more than 30 seconds to finish the request line and headers, or rely on keeping connections idle for 30+ seconds between requests.
This commit is contained in:
parent
5c090dcb3e
commit
1073468163
@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- **breaking:** Router fallbacks are now properly merged for nested routers ([#3158])
|
||||
- **breaking:** `#[from_request(via(Extractor))]` now uses the extractor's
|
||||
rejection type instead of `axum::response::Response` ([#3261])
|
||||
- **breaking:** `axum::serve` now applies hyper's default `header_read_timeout` ([#3478])
|
||||
- **added:** Implement `OptionalFromRequest` for `Multipart` ([#3220])
|
||||
- **changed:** `serve` has an additional generic argument and can now work with any response body
|
||||
type, not just `axum::body::Body` ([#3205])
|
||||
@ -20,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
[#3205]: https://github.com/tokio-rs/axum/pull/3205
|
||||
[#3220]: https://github.com/tokio-rs/axum/pull/3220
|
||||
[#3412]: https://github.com/tokio-rs/axum/pull/3412
|
||||
[#3478]: https://github.com/tokio-rs/axum/pull/3478
|
||||
|
||||
# 0.8.4
|
||||
|
||||
|
@ -14,7 +14,7 @@ use axum_core::{body::Body, extract::Request, response::Response};
|
||||
use futures_util::FutureExt;
|
||||
use http_body::Body as HttpBody;
|
||||
use hyper::body::Incoming;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
|
||||
#[cfg(any(feature = "http1", feature = "http2"))]
|
||||
use hyper_util::{server::conn::auto::Builder, service::TowerToHyperService};
|
||||
use tokio::sync::watch;
|
||||
@ -28,7 +28,8 @@ pub use self::listener::{Listener, ListenerExt, TapIo};
|
||||
/// Serve the service with the supplied listener.
|
||||
///
|
||||
/// This method of running a service is intentionally simple and doesn't support any configuration.
|
||||
/// Use hyper or hyper-util if you need configuration.
|
||||
/// hyper's default configuration applies (including [timeouts]); use hyper or hyper-util if you
|
||||
/// need configuration.
|
||||
///
|
||||
/// It supports both HTTP/1 as well as HTTP/2.
|
||||
///
|
||||
@ -88,6 +89,7 @@ pub use self::listener::{Listener, ListenerExt, TapIo};
|
||||
/// error. Errors on the TCP socket will be handled by sleeping for a short while (currently, one
|
||||
/// second).
|
||||
///
|
||||
/// [timeouts]: hyper::server::conn::http1::Builder::header_read_timeout
|
||||
/// [`Router`]: crate::Router
|
||||
/// [`Router::into_make_service_with_connect_info`]: crate::Router::into_make_service_with_connect_info
|
||||
/// [`MethodRouter`]: crate::routing::MethodRouter
|
||||
@ -409,6 +411,11 @@ async fn handle_connection<L, M, S, B>(
|
||||
tokio::spawn(async move {
|
||||
#[allow(unused_mut)]
|
||||
let mut builder = Builder::new(TokioExecutor::new());
|
||||
|
||||
// Enable Hyper's default HTTP/1 request header timeout.
|
||||
#[cfg(feature = "http1")]
|
||||
builder.http1().timer(TokioTimer::new());
|
||||
|
||||
// CONNECT protocol needed for HTTP/2 websockets
|
||||
#[cfg(feature = "http2")]
|
||||
builder.http2().enable_connect_protocol();
|
||||
@ -495,6 +502,7 @@ mod tests {
|
||||
use std::{
|
||||
future::{pending, IntoFuture as _},
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use axum_core::{body::Body, extract::Request};
|
||||
@ -503,7 +511,7 @@ mod tests {
|
||||
#[cfg(unix)]
|
||||
use tokio::net::UnixListener;
|
||||
use tokio::{
|
||||
io::{self, AsyncRead, AsyncWrite},
|
||||
io::{self, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
|
||||
net::TcpListener,
|
||||
};
|
||||
use tower::ServiceBuilder;
|
||||
@ -521,6 +529,27 @@ mod tests {
|
||||
Router, ServiceExt,
|
||||
};
|
||||
|
||||
struct ReadyListener<T>(Option<T>);
|
||||
|
||||
impl<T> Listener for ReadyListener<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Io = T;
|
||||
type Addr = ();
|
||||
|
||||
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
|
||||
match self.0.take() {
|
||||
Some(server) => (server, ()),
|
||||
None => std::future::pending().await,
|
||||
}
|
||||
}
|
||||
|
||||
fn local_addr(&self) -> io::Result<Self::Addr> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code, unused_must_use)]
|
||||
async fn if_it_compiles_it_works() {
|
||||
#[derive(Clone, Debug)]
|
||||
@ -686,6 +715,67 @@ mod tests {
|
||||
assert_ne!(address.port(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_with_graceful_shutdown_request_header_timeout() {
|
||||
for (timeout, req) in [
|
||||
// Idle connections (between requests) are closed immediately
|
||||
// when a graceful shutdown is triggered.
|
||||
(0, ""), // idle before request sent
|
||||
(0, "GET / HTTP/1.1\r\n\r\n"), // idle after complete exchange
|
||||
// hyper times stalled request lines/headers out after 30 sec,
|
||||
// after which the graceful shutdown can be completed.
|
||||
(30, "GET / HT"), // stall during request line
|
||||
(30, "GET / HTTP/1.0\r\nAccept: "), // stall during request headers
|
||||
] {
|
||||
let (mut client, server) = io::duplex(1024);
|
||||
client.write_all(req.as_bytes()).await.unwrap();
|
||||
|
||||
let server_task = async {
|
||||
serve(ReadyListener(Some(server)), Router::new())
|
||||
.with_graceful_shutdown(tokio::time::sleep(Duration::from_secs(1)))
|
||||
.await
|
||||
.unwrap();
|
||||
};
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(timeout + 2), server_task)
|
||||
.await
|
||||
.expect("server_task didn't exit in time");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn test_hyper_header_read_timeout_is_enabled() {
|
||||
let header_read_timeout_default = 30;
|
||||
for req in [
|
||||
"GET / HT", // stall during request line
|
||||
"GET / HTTP/1.0\r\nAccept: ", // stall during request headers
|
||||
] {
|
||||
let (mut client, server) = io::duplex(1024);
|
||||
client.write_all(req.as_bytes()).await.unwrap();
|
||||
|
||||
let server_task = async {
|
||||
serve(ReadyListener(Some(server)), Router::new())
|
||||
.await
|
||||
.unwrap();
|
||||
};
|
||||
|
||||
let wait_for_server_to_close_conn = async {
|
||||
tokio::time::timeout(
|
||||
Duration::from_secs(header_read_timeout_default + 1),
|
||||
client.read_to_end(&mut Vec::new()),
|
||||
)
|
||||
.await
|
||||
.expect("timeout: server didn't close connection in time")
|
||||
.expect("read_to_end");
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = server_task => unreachable!(),
|
||||
_ = wait_for_server_to_close_conn => (),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn into_future_outside_tokio() {
|
||||
let router: Router = Router::new();
|
||||
@ -704,27 +794,6 @@ mod tests {
|
||||
|
||||
#[crate::test]
|
||||
async fn serving_on_custom_io_type() {
|
||||
struct ReadyListener<T>(Option<T>);
|
||||
|
||||
impl<T> Listener for ReadyListener<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Io = T;
|
||||
type Addr = ();
|
||||
|
||||
async fn accept(&mut self) -> (Self::Io, Self::Addr) {
|
||||
match self.0.take() {
|
||||
Some(server) => (server, ()),
|
||||
None => std::future::pending().await,
|
||||
}
|
||||
}
|
||||
|
||||
fn local_addr(&self) -> io::Result<Self::Addr> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let (client, server) = io::duplex(1024);
|
||||
let listener = ReadyListener(Some(server));
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user