From 73b1bafbf8da0016105ffe88ee42dfa73aefd29f Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Wed, 8 Jun 2022 11:02:42 +0200 Subject: [PATCH] Add `AsyncReadBody` (#1072) * Add `AsyncReadBody` * changelog * sort cargo.toml --- axum-extra/CHANGELOG.md | 2 + axum-extra/Cargo.toml | 16 +++-- axum-extra/src/body/async_read_body.rs | 96 ++++++++++++++++++++++++++ axum-extra/src/body/mod.rs | 7 ++ axum-extra/src/lib.rs | 1 + 5 files changed, 116 insertions(+), 6 deletions(-) create mode 100644 axum-extra/src/body/async_read_body.rs create mode 100644 axum-extra/src/body/mod.rs diff --git a/axum-extra/CHANGELOG.md b/axum-extra/CHANGELOG.md index 41a9ebe6..7188d354 100644 --- a/axum-extra/CHANGELOG.md +++ b/axum-extra/CHANGELOG.md @@ -8,8 +8,10 @@ and this project adheres to [Semantic Versioning]. # Unreleased - **fixed:** Use `impl IntoResponse` less in docs ([#1049]) +- **added:** Add `AsyncReadBody` for creating a body from a `tokio::io::AsyncRead` ([#1072]) [#1049]: https://github.com/tokio-rs/axum/pull/1049 +[#1072]: https://github.com/tokio-rs/axum/pull/1072 # 0.3.3 (18. May, 2022) diff --git a/axum-extra/Cargo.toml b/axum-extra/Cargo.toml index c1f0e88f..7ef03c99 100644 --- a/axum-extra/Cargo.toml +++ b/axum-extra/Cargo.toml @@ -12,14 +12,16 @@ version = "0.3.3" [features] default = [] -erased-json = ["serde_json", "serde"] -typed-routing = ["axum-macros", "serde", "percent-encoding"] + +async-read-body = ["tokio-util/io"] cookie = ["cookie-lib"] -cookie-signed = ["cookie", "cookie-lib/signed"] cookie-private = ["cookie", "cookie-lib/private"] +cookie-signed = ["cookie", "cookie-lib/signed"] +erased-json = ["serde_json", "serde"] form = ["serde", "serde_html_form"] query = ["serde", "serde_html_form"] spa = ["tower-http/fs"] +typed-routing = ["axum-macros", "serde", "percent-encoding"] [dependencies] axum = { path = "../axum", version = "0.5", default-features = false } @@ -27,6 +29,7 @@ bytes = "1.1.0" http = "0.2" mime = "0.3" pin-project-lite = "0.2" +tokio = "1.19" tower = { version = "0.4", default_features = false, features = ["util"] } tower-http = { version = "0.3", features = ["map-response-body"] } tower-layer = "0.3" @@ -34,11 +37,12 @@ tower-service = "0.3" # optional dependencies axum-macros = { path = "../axum-macros", version = "0.2.2", optional = true } -serde = { version = "1.0", optional = true } -serde_json = { version = "1.0.71", optional = true } -percent-encoding = { version = "2.1", optional = true } cookie-lib = { package = "cookie", version = "0.16", features = ["percent-encode"], optional = true } +percent-encoding = { version = "2.1", optional = true } +serde = { version = "1.0", optional = true } serde_html_form = { version = "0.1", optional = true } +serde_json = { version = "1.0.71", optional = true } +tokio-util = { version = "0.7", optional = true } [dev-dependencies] axum = { path = "../axum", version = "0.5", features = ["headers"] } diff --git a/axum-extra/src/body/async_read_body.rs b/axum-extra/src/body/async_read_body.rs new file mode 100644 index 00000000..6e66f0df --- /dev/null +++ b/axum-extra/src/body/async_read_body.rs @@ -0,0 +1,96 @@ +use axum::{ + body::{self, Bytes, HttpBody, StreamBody}, + http::HeaderMap, + response::{IntoResponse, Response}, + Error, +}; +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; + +pin_project! { + /// An [`HttpBody`] created from an [`AsyncRead`]. + /// + /// # Example + /// + /// `AsyncReadBody` can be used to stream the contents of a file: + /// + /// ```rust + /// use axum::{ + /// Router, + /// routing::get, + /// http::{StatusCode, header::CONTENT_TYPE}, + /// response::{Response, IntoResponse}, + /// }; + /// use axum_extra::body::AsyncReadBody; + /// use tokio::fs::File; + /// + /// async fn cargo_toml() -> Result { + /// let file = File::open("Cargo.toml") + /// .await + /// .map_err(|err| { + /// (StatusCode::NOT_FOUND, format!("File not found: {}", err)) + /// })?; + /// + /// let headers = [(CONTENT_TYPE, "text/x-toml")]; + /// let body = AsyncReadBody::new(file); + /// Ok((headers, body).into_response()) + /// } + /// + /// let app = Router::new().route("/Cargo.toml", get(cargo_toml)); + /// # let _: Router = app; + /// ``` + #[cfg(feature = "async-read-body")] + #[derive(Debug)] + pub struct AsyncReadBody { + #[pin] + read: StreamBody>, + } +} + +impl AsyncReadBody { + /// Create a new `AsyncReadBody`. + pub fn new(read: R) -> Self + where + R: AsyncRead + Send + 'static, + { + Self { + read: StreamBody::new(ReaderStream::new(read)), + } + } +} + +impl HttpBody for AsyncReadBody +where + R: AsyncRead + Send + 'static, +{ + type Data = Bytes; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + self.project().read.poll_data(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +impl IntoResponse for AsyncReadBody +where + R: AsyncRead + Send + 'static, +{ + fn into_response(self) -> Response { + Response::new(body::boxed(self)) + } +} diff --git a/axum-extra/src/body/mod.rs b/axum-extra/src/body/mod.rs new file mode 100644 index 00000000..b84ac240 --- /dev/null +++ b/axum-extra/src/body/mod.rs @@ -0,0 +1,7 @@ +//! Additional bodies. + +#[cfg(feature = "async-read-body")] +mod async_read_body; + +#[cfg(feature = "async-read-body")] +pub use self::async_read_body::AsyncReadBody; diff --git a/axum-extra/src/lib.rs b/axum-extra/src/lib.rs index 705bed73..84d224a5 100644 --- a/axum-extra/src/lib.rs +++ b/axum-extra/src/lib.rs @@ -43,6 +43,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(test, allow(clippy::float_cmp))] +pub mod body; pub mod extract; pub mod response; pub mod routing;