Replace some more futures_util APIs with std variants (#3874)

This commit is contained in:
Paolo Barbolini 2025-06-16 00:18:39 +02:00 committed by GitHub
parent 90797200ee
commit df47ffedd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 55 additions and 67 deletions

15
Cargo.lock generated
View File

@ -1385,7 +1385,6 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -1493,7 +1492,6 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@ -3401,7 +3399,7 @@ dependencies = [
"criterion",
"dotenvy",
"env_logger",
"futures",
"futures-util",
"hex",
"libsqlite3-sys",
"paste",
@ -3437,7 +3435,7 @@ dependencies = [
"dialoguer",
"dotenvy",
"filetime",
"futures",
"futures-util",
"glob",
"openssl",
"serde_json",
@ -3502,7 +3500,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"futures",
"sqlx",
"tokio",
]
@ -3536,7 +3533,6 @@ name = "sqlx-example-postgres-chat"
version = "0.1.0"
dependencies = [
"crossterm",
"futures",
"ratatui",
"sqlx",
"tokio",
@ -3560,7 +3556,6 @@ dependencies = [
"anyhow",
"clap",
"dotenvy",
"futures",
"serde",
"serde_json",
"sqlx",
@ -3571,7 +3566,7 @@ dependencies = [
name = "sqlx-example-postgres-listen"
version = "0.1.0"
dependencies = [
"futures",
"futures-util",
"sqlx",
"tokio",
]
@ -3584,7 +3579,6 @@ dependencies = [
"async-trait",
"clap",
"dotenvy",
"futures",
"mockall",
"sqlx",
"tokio",
@ -3597,7 +3591,6 @@ dependencies = [
"anyhow",
"clap",
"dotenvy",
"futures",
"sqlx",
"tokio",
]
@ -3606,7 +3599,6 @@ dependencies = [
name = "sqlx-example-postgres-transaction"
version = "0.1.0"
dependencies = [
"futures",
"sqlx",
"tokio",
]
@ -3617,7 +3609,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"futures",
"sqlx",
"tokio",
]

View File

@ -169,7 +169,7 @@ sqlx-sqlite = { workspace = true, optional = true }
[dev-dependencies]
anyhow = "1.0.52"
time_ = { version = "0.3.2", package = "time" }
futures = "0.3.19"
futures-util = { version = "0.3.19", default-features = false, features = ["alloc"] }
env_logger = "0.11"
async-std = { workspace = true, features = ["attributes"] }
tokio = { version = "1.15.0", features = ["full"] }

View File

@ -326,7 +326,7 @@ The `fetch` query finalizer returns a stream-like type that iterates through the
```rust
// provides `try_next`
use futures::TryStreamExt;
use futures_util::TryStreamExt;
// provides `try_get`
use sqlx::Row;

View File

@ -6,7 +6,6 @@ workspace = "../../../"
[dependencies]
anyhow = "1.0"
futures = "0.3"
sqlx = { path = "../../../", features = [ "mysql", "runtime-tokio", "tls-native-tls" ] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.20.0", features = ["rt", "macros"]}

View File

@ -6,7 +6,6 @@ workspace = "../../../"
[dependencies]
sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio", "tls-native-tls" ] }
futures = "0.3.1"
tokio = { version = "1.20.0", features = [ "rt-multi-thread", "macros" ] }
ratatui = "0.27.0"
crossterm = "0.27.0"

View File

@ -7,7 +7,6 @@ workspace = "../../../"
[dependencies]
anyhow = "1.0"
dotenvy = "0.15.0"
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqlx = { path = "../../../", features = [ "runtime-tokio", "postgres", "json" ] }

View File

@ -6,5 +6,5 @@ workspace = "../../../"
[dependencies]
sqlx = { path = "../../../", features = [ "runtime-tokio", "postgres" ] }
futures = "0.3.1"
futures-util = { version = "0.3.1", default-features = false }
tokio = { version = "1.20.0", features = ["rt-multi-thread", "macros", "time"]}

View File

@ -1,4 +1,4 @@
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use sqlx::postgres::PgListener;
use sqlx::{Executor, PgPool};
use std::pin::pin;

View File

@ -6,7 +6,6 @@ workspace = "../../../"
[dependencies]
anyhow = "1.0"
futures = "0.3"
sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio", "tls-native-tls" ] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.20.0", features = ["rt", "macros"]}

View File

@ -6,7 +6,6 @@ workspace = "../../../"
[dependencies]
anyhow = "1.0"
futures = "0.3"
sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio", "tls-native-tls" ] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.20.0", features = ["rt", "macros"]}

View File

@ -6,5 +6,4 @@ workspace = "../../../"
[dependencies]
sqlx = { path = "../../../", features = [ "postgres", "runtime-tokio", "tls-native-tls" ] }
futures = "0.3.1"
tokio = { version = "1.20.0", features = ["rt-multi-thread", "macros"]}

View File

@ -6,7 +6,6 @@ workspace = "../../../"
[dependencies]
anyhow = "1.0"
futures = "0.3"
sqlx = { path = "../../../", features = [ "sqlite", "runtime-tokio", "tls-native-tls" ] }
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.20.0", features = ["rt", "macros"]}

View File

@ -33,7 +33,7 @@ sqlx = { workspace = true, default-features = false, features = [
"migrate",
"any",
] }
futures = "0.3.19"
futures-util = { version = "0.3.19", features = ["alloc"] }
clap = { version = "4.3.10", features = ["derive", "env", "wrap_help"] }
clap_complete = { version = "4.3.1", optional = true }
chrono = { version = "0.4.19", default-features = false, features = ["clock"] }

View File

@ -1,8 +1,9 @@
use std::future::Future;
use std::io;
use std::time::Duration;
use anyhow::Result;
use futures::{Future, TryFutureExt};
use futures_util::TryFutureExt;
use sqlx::{AnyConnection, Connection};
use tokio::{select, signal};

View File

@ -111,7 +111,7 @@ macro_rules! impl_acquire {
self,
) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, $crate::error::Error>>
{
Box::pin(futures_util::future::ok(self))
Box::pin(std::future::ready(Ok(self)))
}
#[inline]

View File

@ -1,4 +1,4 @@
use futures_util::future::BoxFuture;
use futures_core::future::BoxFuture;
use std::borrow::Cow;
use crate::any::{Any, AnyConnection};

View File

@ -5,8 +5,8 @@ use crate::error::{BoxDynError, Error};
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use std::fmt::Debug;
use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use std::{fmt::Debug, future};
/// A type that contains or can provide a database
/// connection to use for executing queries against the database.
@ -121,9 +121,11 @@ pub trait Executor<'c>: Send + Debug + Sized {
E: 'q + Execute<'q, Self::Database>,
{
self.fetch_optional(query)
.and_then(|row| match row {
Some(row) => future::ok(row),
None => future::err(Error::RowNotFound),
.and_then(|row| {
future::ready(match row {
Some(row) => Ok(row),
None => Err(Error::RowNotFound),
})
})
.boxed()
}

View File

@ -3,7 +3,7 @@
//! This was created initially to get around some weird compiler errors we were getting with
//! `async-stream`, and now it'd just be more work to replace.
use std::future::Future;
use std::future::{self, Future};
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
@ -76,7 +76,7 @@ impl<T> Yielder<T> {
//
// Note that because this has no way to schedule a wakeup, this could deadlock the task
// if called in the wrong place.
futures_util::future::poll_fn(|_cx| {
future::poll_fn(|_cx| {
if !yielded {
yielded = true;
Poll::Pending

View File

@ -1,7 +1,7 @@
use futures_util::future;
use std::future;
use std::io::{self, Read, Write};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use rustls::{
client::{
@ -33,7 +33,7 @@ impl<S: Socket> RustlsSocket<S> {
loop {
match self.state.complete_io(&mut self.inner) {
Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
futures_util::ready!(self.inner.poll_ready(cx))?;
ready!(self.inner.poll_ready(cx))?;
}
ready => return Poll::Ready(ready.map(|_| ())),
}
@ -76,12 +76,12 @@ impl<S: Socket> Socket for RustlsSocket<S> {
self.close_notify_sent = true;
}
futures_util::ready!(self.poll_complete_io(cx))?;
ready!(self.poll_complete_io(cx))?;
// Server can close socket as soon as it receives the connection shutdown request.
// We shouldn't expect it to stick around for the TLS session to close cleanly.
// https://security.stackexchange.com/a/82034
let _ = futures_util::ready!(self.inner.socket.poll_shutdown(cx));
let _ = ready!(self.inner.socket.poll_shutdown(cx));
Poll::Ready(Ok(()))
}

View File

@ -1,10 +1,9 @@
use crate::net::Socket;
use std::future;
use std::io::{self, Read, Write};
use std::task::{ready, Context, Poll};
use futures_util::future;
pub struct StdSocket<S> {
pub socket: S,
wants_read: bool,

View File

@ -1,4 +1,5 @@
use std::fmt::{self, Debug, Formatter};
use std::future::{self, Future};
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -11,7 +12,6 @@ use crate::error::Error;
use super::inner::{is_beyond_max_lifetime, DecrementSizeGuard, PoolInner};
use crate::pool::options::PoolConnectionMetadata;
use std::future::Future;
const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);
@ -183,7 +183,7 @@ impl<'c, DB: Database> crate::acquire::Acquire<'c> for &'c mut PoolConnection<DB
#[inline]
fn acquire(self) -> futures_core::future::BoxFuture<'c, Result<Self::Connection, Error>> {
Box::pin(futures_util::future::ok(&mut **self))
Box::pin(future::ready(Ok(&mut **self)))
}
#[inline]

View File

@ -9,7 +9,7 @@ use crossbeam_queue::ArrayQueue;
use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};
use std::cmp;
use std::future::Future;
use std::future::{self, Future};
use std::pin::pin;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
@ -18,7 +18,6 @@ use std::task::Poll;
use crate::logger::private_level_filter_to_trace_level;
use crate::pool::options::PoolConnectionMetadata;
use crate::private_tracing_dynamic_event;
use futures_util::future::{self};
use futures_util::FutureExt;
use std::time::{Duration, Instant};
use tracing::Level;

View File

@ -59,7 +59,7 @@ use std::fmt;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::task::{ready, Context, Poll};
use std::time::{Duration, Instant};
use event_listener::EventListener;
@ -627,7 +627,7 @@ impl Future for CloseEvent {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(listener) = &mut self.listener {
futures_core::ready!(listener.poll_unpin(cx));
ready!(listener.poll_unpin(cx));
}
// `EventListener` doesn't like being polled after it yields, and even if it did it

View File

@ -1,8 +1,8 @@
use std::marker::PhantomData;
use std::{future, marker::PhantomData};
use either::Either;
use futures_core::stream::BoxStream;
use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
use futures_util::{StreamExt, TryFutureExt, TryStreamExt};
use crate::arguments::{Arguments, IntoArguments};
use crate::database::{Database, HasStatementCache};
@ -459,9 +459,11 @@ where
O: 'e,
{
self.fetch_optional(executor)
.and_then(|row| match row {
Some(row) => future::ok(row),
None => future::err(Error::RowNotFound),
.and_then(|row| {
future::ready(match row {
Some(row) => Ok(row),
None => Err(Error::RowNotFound),
})
})
.await
}

View File

@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::fmt::{self, Debug, Formatter};
use std::future;
use std::ops::{Deref, DerefMut};
use futures_core::future::BoxFuture;
@ -247,7 +248,7 @@ impl<'t, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<'_, D
#[inline]
fn acquire(self) -> BoxFuture<'t, Result<Self::Connection, Error>> {
Box::pin(futures_util::future::ok(&mut **self))
Box::pin(future::ready(Ok(&mut **self)))
}
#[inline]

View File

@ -3,6 +3,7 @@ use std::cmp::Ordering;
use std::ffi::CStr;
use std::fmt::Write;
use std::fmt::{self, Debug, Formatter};
use std::future;
use std::os::raw::{c_char, c_int, c_void};
use std::panic::catch_unwind;
use std::ptr;
@ -10,7 +11,6 @@ use std::ptr::NonNull;
use futures_core::future::BoxFuture;
use futures_intrusive::sync::MutexGuard;
use futures_util::future;
use libsqlite3_sys::{
sqlite3, sqlite3_commit_hook, sqlite3_get_autocommit, sqlite3_progress_handler,
sqlite3_rollback_hook, sqlite3_update_hook, SQLITE_DELETE, SQLITE_INSERT, SQLITE_UPDATE,
@ -286,7 +286,7 @@ impl Connection for SqliteConnection {
// Well, we could use this to ensure that the command channel has been cleared,
// but it would only develop a backlog if a lot of queries are executed and then cancelled
// partway through, and then this would only make that situation worse.
Box::pin(future::ok(()))
Box::pin(future::ready(Ok(())))
}
#[doc(hidden)]

View File

@ -108,7 +108,7 @@ macro_rules! test_unprepared_type {
#[sqlx_macros::test]
async fn [< test_unprepared_type_ $name >] () -> anyhow::Result<()> {
use sqlx::prelude::*;
use futures::TryStreamExt;
use futures_util::TryStreamExt;
let mut conn = sqlx_test::new::<$db>().await?;

View File

@ -46,7 +46,7 @@ async fn it_executes_with_pool() -> anyhow::Result<()> {
#[sqlx_macros::test]
async fn it_does_not_stop_stream_after_decoding_error() -> anyhow::Result<()> {
use futures::stream::StreamExt;
use futures_util::stream::StreamExt;
sqlx::any::install_default_drivers();

View File

@ -1,4 +1,4 @@
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use sqlx::mssql::{Mssql, MssqlPoolOptions};
use sqlx::{Column, Connection, Executor, MssqlConnection, Row, Statement, TypeInfo};
use sqlx_core::mssql::MssqlRow;

View File

@ -1,5 +1,5 @@
use anyhow::Context;
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use sqlx::mysql::{MySql, MySqlConnection, MySqlPool, MySqlPoolOptions, MySqlRow};
use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
use sqlx_core::connection::ConnectOptions;
@ -460,7 +460,7 @@ async fn test_issue_622() -> anyhow::Result<()> {
}));
}
futures::future::try_join_all(handles).await?;
futures_util::future::try_join_all(handles).await?;
Ok(())
}

View File

@ -1,4 +1,4 @@
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use sqlx::postgres::types::PgRange;
use sqlx::{Connection, Executor, FromRow, Postgres};
use sqlx_postgres::PgHasArrayType;

View File

@ -2,7 +2,7 @@ use sqlx::{Connection, PgConnection, Postgres, Transaction};
use sqlx_postgres::types::PgHstore;
use sqlx_test::new;
use futures::TryStreamExt;
use futures_util::TryStreamExt;
#[sqlx_macros::test]
async fn test_query() -> anyhow::Result<()> {

View File

@ -1,4 +1,4 @@
use futures::{Stream, StreamExt, TryStreamExt};
use futures_util::{Stream, StreamExt, TryStreamExt};
use sqlx::postgres::types::Oid;
use sqlx::postgres::{
@ -604,7 +604,7 @@ async fn it_can_drop_multiple_transactions() -> anyhow::Result<()> {
#[ignore]
#[sqlx_macros::test]
async fn pool_smoke_test() -> anyhow::Result<()> {
use futures::{future, task::Poll, Future};
use futures_util::{task::Poll, Future};
eprintln!("starting pool");
@ -645,7 +645,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
let mut acquire = pin!(pool.acquire());
// poll the acquire future once to put the waiter in the queue
future::poll_fn(move |cx| {
std::future::poll_fn(move |cx| {
let _ = acquire.as_mut().poll(cx);
Poll::Ready(())
})
@ -943,7 +943,7 @@ async fn test_issue_622() -> anyhow::Result<()> {
}));
}
futures::future::try_join_all(handles).await?;
futures_util::future::try_join_all(handles).await?;
Ok(())
}

View File

@ -1,4 +1,4 @@
use futures::TryStreamExt;
use futures_util::TryStreamExt;
use rand::{Rng, SeedableRng};
use rand_xoshiro::Xoshiro256PlusPlus;
use sqlx::sqlite::{SqliteConnectOptions, SqliteOperation, SqlitePoolOptions};