Reduce dependency on futures-util (#3358)

This commit is contained in:
Paolo Barbolini 2025-05-25 09:39:35 +02:00 committed by GitHub
parent d0aff24c85
commit 869ba86e51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 50 additions and 67 deletions

29
Cargo.lock generated
View File

@ -311,6 +311,8 @@ dependencies = [
"base64 0.22.1",
"bytes",
"form_urlencoded",
"futures-core",
"futures-sink",
"futures-util",
"http 1.2.0",
"http-body 1.0.1",
@ -382,6 +384,7 @@ dependencies = [
"cookie",
"fastrand",
"form_urlencoded",
"futures-core",
"futures-util",
"headers",
"http 1.2.0",
@ -1341,7 +1344,7 @@ name = "example-chat"
version = "0.1.0"
dependencies = [
"axum",
"futures",
"futures-util",
"tokio",
"tracing",
"tracing-subscriber",
@ -1780,7 +1783,7 @@ dependencies = [
"axum",
"axum-extra",
"eventsource-stream",
"futures",
"futures-util",
"headers",
"reqwest 0.12.12",
"reqwest-eventsource",
@ -1808,7 +1811,7 @@ name = "example-stream-to-file"
version = "0.1.0"
dependencies = [
"axum",
"futures",
"futures-util",
"tokio",
"tokio-util",
"tracing",
@ -1858,7 +1861,8 @@ name = "example-testing-websockets"
version = "0.1.0"
dependencies = [
"axum",
"futures",
"futures-channel",
"futures-util",
"tokio",
"tokio-tungstenite",
]
@ -1983,7 +1987,6 @@ version = "0.1.0"
dependencies = [
"axum",
"axum-extra",
"futures",
"futures-util",
"headers",
"tokio",
@ -2080,21 +2083,6 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
@ -2174,7 +2162,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",

View File

@ -87,6 +87,7 @@ __private_docs = [
axum = { path = "../axum", version = "0.8.4", default-features = false, features = ["original-uri"] }
axum-core = { path = "../axum-core", version = "0.5.2" }
bytes = "1.1.0"
futures-core = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
http = "1.0.0"
http-body = "1.0.0"

View File

@ -10,7 +10,7 @@ use axum::{
};
use axum_core::__composite_rejection as composite_rejection;
use axum_core::__define_rejection as define_rejection;
use futures_util::stream::Stream;
use futures_core::stream::Stream;
use http::{
header::{HeaderMap, CONTENT_TYPE},
Request, StatusCode,

View File

@ -7,7 +7,8 @@ use axum::{
handler::Handler,
response::{IntoResponse, Response},
};
use futures_util::future::{BoxFuture, FutureExt, Map};
use futures_core::future::BoxFuture;
use futures_util::future::{FutureExt, Map};
use std::{future::Future, marker::PhantomData};
mod or;

View File

@ -5,7 +5,8 @@ use axum::{
handler::Handler,
response::{IntoResponse, Response},
};
use futures_util::future::{BoxFuture, Either as EitherFuture, FutureExt, Map};
use futures_core::future::BoxFuture;
use futures_util::future::{Either as EitherFuture, FutureExt, Map};
use std::{future::Future, marker::PhantomData};
/// [`Handler`] that runs one [`Handler`] and if that rejects it'll fallback to another

View File

@ -7,7 +7,8 @@ use axum::{
BoxError,
};
use bytes::{BufMut, BytesMut};
use futures_util::stream::{BoxStream, Stream, TryStream, TryStreamExt};
use futures_core::{stream::BoxStream, Stream, TryStream};
use futures_util::stream::TryStreamExt;
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
@ -44,7 +45,7 @@ pin_project! {
/// ```rust
/// use axum::{BoxError, response::{IntoResponse, Response}};
/// use axum_extra::json_lines::JsonLines;
/// use futures_util::stream::Stream;
/// use futures_core::stream::Stream;
///
/// fn stream_of_values() -> impl Stream<Item = Result<serde_json::Value, BoxError>> {
/// # futures_util::stream::empty()

View File

@ -4,7 +4,7 @@ use axum::{
BoxError,
};
use bytes::Bytes;
use futures_util::TryStream;
use futures_core::TryStream;
use http::{header, StatusCode};
use std::{io, path::Path};
use tokio::{

View File

@ -88,6 +88,8 @@ __private = ["tokio", "http1", "dep:reqwest"]
[dependencies]
axum-core = { path = "../axum-core", version = "0.5.2" }
bytes = "1.0"
futures-core = "0.3"
futures-sink = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["alloc"] }
http = "1.0.0"
http-body = "1.0.0"

View File

@ -208,7 +208,7 @@ use axum::{
body::Body,
extract::Request,
};
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use tower::{Service, Layer};
use std::task::{Context, Poll};

View File

@ -9,7 +9,7 @@ use axum_core::{
response::{IntoResponse, Response},
RequestExt,
};
use futures_util::stream::Stream;
use futures_core::Stream;
use http::{
header::{HeaderMap, CONTENT_TYPE},
StatusCode,

View File

@ -94,10 +94,9 @@ use self::rejection::*;
use super::FromRequestParts;
use crate::{body::Bytes, response::Response, Error};
use axum_core::body::Body;
use futures_util::{
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use futures_core::Stream;
use futures_sink::Sink;
use futures_util::{sink::SinkExt, stream::StreamExt};
use http::{
header::{self, HeaderMap, HeaderName, HeaderValue},
request::Parts,

View File

@ -2,7 +2,7 @@ use crate::{
extract::FromRequestParts,
response::{IntoResponse, Response},
};
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use http::Request;
use pin_project_lite::pin_project;
use std::{

View File

@ -1,5 +1,5 @@
use axum_core::extract::{FromRequest, FromRequestParts, Request};
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use std::{
any::type_name,
convert::Infallible,

View File

@ -2,7 +2,7 @@ use crate::body::{Body, Bytes, HttpBody};
use crate::response::{IntoResponse, Response};
use crate::BoxError;
use axum_core::extract::{FromRequest, FromRequestParts};
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use http::Request;
use std::{
any::type_name,

View File

@ -1,6 +1,6 @@
use crate::response::{IntoResponse, Response};
use axum_core::extract::FromRequestParts;
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use http::Request;
use std::{
any::type_name,

View File

@ -34,7 +34,8 @@ use axum_core::{
response::{IntoResponse, Response},
};
use bytes::{BufMut, BytesMut};
use futures_util::stream::{Stream, TryStream};
use futures_core::Stream;
use futures_util::stream::TryStream;
use http_body::Frame;
use pin_project_lite::pin_project;
use std::{

View File

@ -472,7 +472,7 @@ mod private {
task::{Context, Poll},
};
pub struct ServeFuture(pub(super) futures_util::future::BoxFuture<'static, io::Result<()>>);
pub struct ServeFuture(pub(super) futures_core::future::BoxFuture<'static, io::Result<()>>);
impl Future for ServeFuture {
type Output = io::Result<()>;

View File

@ -1,6 +1,6 @@
use super::{serve, Request, Response};
use bytes::Bytes;
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use http::header::{HeaderName, HeaderValue};
use std::ops::Deref;
use std::{convert::Infallible, future::IntoFuture, net::SocketAddr};

View File

@ -6,7 +6,7 @@ publish = false
[dependencies]
axum = { path = "../../axum", features = ["ws"] }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -15,7 +15,7 @@ use axum::{
routing::get,
Router,
};
use futures::{sink::SinkExt, stream::StreamExt};
use futures_util::{sink::SinkExt, stream::StreamExt};
use std::{
collections::HashSet,
sync::{Arc, Mutex},

View File

@ -5,7 +5,6 @@
//! ```
use axum::{extract::Request, routing::get, Router};
use futures_util::pin_mut;
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::path::PathBuf;
@ -43,7 +42,6 @@ async fn main() {
info!("HTTPS server listening on {bind}. To contact curl -k https://localhost:3000");
let app = Router::new().route("/", get(handler));
pin_mut!(tcp_listener);
loop {
let tower_service = app.clone();
let tls_acceptor = tls_acceptor.clone();

View File

@ -5,7 +5,6 @@
//! ```
use axum::{http::Request, routing::get, Router};
use futures_util::pin_mut;
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use openssl::ssl::{Ssl, SslAcceptor, SslFiletype, SslMethod};
@ -55,8 +54,6 @@ async fn main() {
info!("HTTPS server listening on {bind}. To contact curl -k https://localhost:3000");
let app = Router::new().route("/", get(handler));
pin_mut!(tcp_listener);
loop {
let tower_service = app.clone();
let tls_acceptor = tls_acceptor.clone();

View File

@ -5,7 +5,6 @@
//! ```
use axum::{extract::Request, routing::get, Router};
use futures_util::pin_mut;
use hyper::body::Incoming;
use hyper_util::rt::{TokioExecutor, TokioIo};
use std::{
@ -47,7 +46,6 @@ async fn main() {
info!("HTTPS server listening on {bind}. To contact curl -k https://localhost:3000");
let app = Router::new().route("/", get(handler));
pin_mut!(tcp_listener);
loop {
let tower_service = app.clone();
let tls_acceptor = tls_acceptor.clone();

View File

@ -7,7 +7,7 @@ publish = false
[dependencies]
axum = { path = "../../axum" }
axum-extra = { path = "../../axum-extra", features = ["typed-header"] }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
headers = "0.4"
tokio = { version = "1.0", features = ["full"] }
tokio-stream = "0.1"

View File

@ -14,7 +14,7 @@ use axum::{
Router,
};
use axum_extra::TypedHeader;
use futures::stream::{self, Stream};
use futures_util::stream::{self, Stream};
use std::{convert::Infallible, path::PathBuf, time::Duration};
use tokio_stream::StreamExt as _;
use tower_http::{services::ServeDir, trace::TraceLayer};

View File

@ -6,7 +6,7 @@ publish = false
[dependencies]
axum = { path = "../../axum", features = ["multipart"] }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
tokio = { version = "1.0", features = ["full"] }
tokio-util = { version = "0.7", features = ["io"] }
tracing = "0.1"

View File

@ -12,8 +12,8 @@ use axum::{
routing::{get, post},
BoxError, Router,
};
use futures::{Stream, TryStreamExt};
use std::io;
use futures_util::{Stream, TryStreamExt};
use std::{io, pin::pin};
use tokio::{fs::File, io::BufWriter};
use tokio_util::io::StreamReader;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@ -112,8 +112,7 @@ where
async {
// Convert the stream into an `AsyncRead`.
let body_with_io_error = stream.map_err(io::Error::other);
let body_reader = StreamReader::new(body_with_io_error);
futures::pin_mut!(body_reader);
let mut body_reader = pin!(StreamReader::new(body_with_io_error));
// Create the file. `File` implements `AsyncWrite`.
let path = std::path::Path::new(UPLOADS_DIRECTORY).join(path);

View File

@ -6,6 +6,7 @@ publish = false
[dependencies]
axum = { path = "../../axum", features = ["ws"] }
futures = "0.3"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
tokio = { version = "1.0", features = ["full"] }
tokio-tungstenite = "0.26"

View File

@ -13,7 +13,7 @@ use axum::{
routing::get,
Router,
};
use futures::{Sink, SinkExt, Stream, StreamExt};
use futures_util::{Sink, SinkExt, Stream, StreamExt};
#[tokio::main]
async fn main() {
@ -131,8 +131,8 @@ mod tests {
async fn unit_test() {
// Need to use "futures" channels rather than "tokio" channels as they implement `Sink` and
// `Stream`
let (socket_write, mut test_rx) = futures::channel::mpsc::channel(1024);
let (mut test_tx, socket_read) = futures::channel::mpsc::channel(1024);
let (socket_write, mut test_rx) = futures_channel::mpsc::channel(1024);
let (mut test_tx, socket_read) = futures_channel::mpsc::channel(1024);
tokio::spawn(unit_testable_handle_socket(socket_write, socket_read));

View File

@ -15,7 +15,6 @@ path = "src/client.rs"
[dependencies]
axum = { path = "../../axum", features = ["ws"] }
axum-extra = { path = "../../axum-extra", features = ["typed-header"] }
futures = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
headers = "0.4"
tokio = { version = "1.0", features = ["full"] }

View File

@ -10,10 +10,10 @@
//! websocket server and how the client-side and server-side code can be quite similar.
//!
use futures_util::stream::FuturesUnordered;
use futures_util::{SinkExt, StreamExt};
use std::ops::ControlFlow;
use std::time::Instant;
use tokio::task::JoinSet;
use tokio_tungstenite::tungstenite::Utf8Bytes;
// we will use tungstenite for websocket client impl (same library as what axum is using)
@ -29,12 +29,10 @@ const SERVER: &str = "ws://127.0.0.1:3000/ws";
async fn main() {
let start_time = Instant::now();
//spawn several clients that will concurrently talk to the server
let mut clients = (0..N_CLIENTS)
.map(|cli| tokio::spawn(spawn_client(cli)))
.collect::<FuturesUnordered<_>>();
let mut clients = (0..N_CLIENTS).map(spawn_client).collect::<JoinSet<_>>();
//wait for all our clients to exit
while clients.next().await.is_some() {}
while clients.join_next().await.is_some() {}
let end_time = Instant::now();

View File

@ -39,7 +39,7 @@ use axum::extract::connect_info::ConnectInfo;
use axum::extract::ws::CloseFrame;
//allows to split the websocket stream into separate TX and RX branches
use futures::{sink::SinkExt, stream::StreamExt};
use futures_util::{sink::SinkExt, stream::StreamExt};
#[tokio::main]
async fn main() {