From 2322d39800630c33d13074473a5ee2d9753719d8 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 22 Aug 2021 14:41:51 +0200 Subject: [PATCH] Add `StreamBody` (#237) This adds `StreamBody` which converts a `Stream` of `Bytes` into a `http_body::Body`. --- As suggested by Kestrer on Discord it would make sense for axum to provide different kinds of body types other than `Empty`, `Full`, and `hyper::Body`. There is also some talk about [splitting up `hyper::Body`](https://github.com/hyperium/hyper/issues/2345) so this can be seen as getting started on that effort. axum's body types could be moved to hyper or http-body if thats the direction we decide on. The types I'm thinking about adding are: - `StreamBody`- added in this PR - `AsyncReadBody` - similar to [http-body#41](https://github.com/hyperium/http-body/pull/41/files) - `ChannelBody` - similar to `hyper::Body::channel` --- CHANGELOG.md | 1 + src/body.rs | 4 ++ src/body/stream_body.rs | 103 ++++++++++++++++++++++++++++++++++++++++ src/response/mod.rs | 1 + src/tests/mod.rs | 1 + 5 files changed, 110 insertions(+) create mode 100644 src/body/stream_body.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index bfb4d2ed..e1b0a88b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Responses: - **added:** Add `Headers` for easily customizing headers on a response ([#193](https://github.com/tokio-rs/axum/pull/193)) - **added:** Add `Redirect` response ([#192](https://github.com/tokio-rs/axum/pull/192)) + - **added:** Add `body::StreamBody` for easily responding with a stream of byte chunks ([#237](https://github.com/tokio-rs/axum/pull/237)) - **changed:** Add associated `Body` and `BodyError` types to `IntoResponse`. This is required for returning responses with bodies other than `hyper::Body` from handlers. See the docs for advice on how to implement `IntoResponse` ([#86](https://github.com/tokio-rs/axum/pull/86)) diff --git a/src/body.rs b/src/body.rs index e6e83e1e..8e9c6a94 100644 --- a/src/body.rs +++ b/src/body.rs @@ -3,6 +3,10 @@ use crate::BoxError; use crate::Error; +mod stream_body; + +pub use self::stream_body::StreamBody; + #[doc(no_inline)] pub use http_body::{Body as HttpBody, Empty, Full}; diff --git a/src/body/stream_body.rs b/src/body/stream_body.rs new file mode 100644 index 00000000..5e054d05 --- /dev/null +++ b/src/body/stream_body.rs @@ -0,0 +1,103 @@ +use crate::{BoxError, Error}; +use bytes::Bytes; +use futures_util::stream::{self, Stream, TryStreamExt}; +use http::HeaderMap; +use http_body::Body; +use std::convert::Infallible; +use std::{ + fmt, + pin::Pin, + task::{Context, Poll}, +}; +use sync_wrapper::SyncWrapper; + +/// An [`http_body::Body`] created from a [`Stream`]. +/// +/// # Example +/// +/// ``` +/// use axum::{ +/// Router, +/// handler::get, +/// body::StreamBody, +/// }; +/// use futures::stream; +/// +/// async fn handler() -> StreamBody { +/// let chunks: Vec> = vec![ +/// Ok("Hello,"), +/// Ok(" "), +/// Ok("world!"), +/// ]; +/// let stream = stream::iter(chunks); +/// StreamBody::new(stream) +/// } +/// +/// let app = Router::new().route("/", get(handler)); +/// # async { +/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap(); +/// # }; +/// ``` +/// +/// [`Stream`]: futures_util::stream::Stream +// this should probably be extracted to `http_body`, eventually... +pub struct StreamBody { + stream: SyncWrapper> + Send>>>, +} + +impl StreamBody { + /// Create a new `StreamBody` from a [`Stream`]. + /// + /// [`Stream`]: futures_util::stream::Stream + pub fn new(stream: S) -> Self + where + S: Stream> + Send + 'static, + T: Into + 'static, + E: Into + 'static, + { + let stream = stream + .map_ok(Into::into) + .map_err(|err| Error::new(err.into())); + Self { + stream: SyncWrapper::new(Box::pin(stream)), + } + } +} + +impl Default for StreamBody { + fn default() -> Self { + Self::new(stream::empty::>()) + } +} + +impl fmt::Debug for StreamBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("StreamBody").finish() + } +} + +impl Body for StreamBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(self.stream.get_mut()).poll_next(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +#[test] +fn stream_body_traits() { + crate::tests::assert_send::(); + crate::tests::assert_sync::(); + crate::tests::assert_unpin::(); +} diff --git a/src/response/mod.rs b/src/response/mod.rs index fbed2fb8..3dcf2c6f 100644 --- a/src/response/mod.rs +++ b/src/response/mod.rs @@ -198,6 +198,7 @@ macro_rules! impl_into_response_for_body { impl_into_response_for_body!(hyper::Body); impl_into_response_for_body!(Full); impl_into_response_for_body!(Empty); +impl_into_response_for_body!(crate::body::StreamBody); impl IntoResponse for http_body::combinators::BoxBody where diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 62af8a17..aea1b288 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -701,3 +701,4 @@ where pub(crate) fn assert_send() {} pub(crate) fn assert_sync() {} +pub(crate) fn assert_unpin() {}