Smol+async global executor 1.80 dev (#3791)

* Sqlx-core: rename async_io dependency for async-std

* Sqlx-core: simplify TimeoutError

* Sqlx-core: code for async-global-executor

* Sqlx: integrate async-global-executor feature

* Note to unsafe

* Step up MSRV as async-global-executor needs it

* Sqlx-core: fix of unix socket build

* Unsafe fixes, smol executor added

* Workflow fix

* Changes outside sqlx_rt

* Cleanup conditional rt compilation

* Warning

* Add executors to test matrix

* Fix of skipping code sqlite due to mismatch in cargo feature names

* Smol executor isolated

* Fix, reduce number of tests, remove async_std

* Fix of test_block_on, regression

* Format fixes

* async-global-executor added

* async-std changed to 1.13

* litemap, zerofrom requires rustc 1.81

* Fix of missing _sqlite in cargo.toml

* Clippy lints

* Clean up

* Remove features combinations

* Fixes after merge

* Fix of compiling connect_tcp_address with both tokio + other executor selected

* Try to fix CI -Z minimal-versions check

Try to fix CI -Z minimal-versions check
This commit is contained in:
martin-kolarik 2025-09-08 20:17:55 +02:00 committed by GitHub
parent 482c9427a9
commit 6b828e698f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1293 additions and 842 deletions

View File

@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
runtime: [ async-std, tokio ]
runtime: [ async-std, async-global-executor, smol, tokio ]
tls: [ native-tls, rustls, none ]
timeout-minutes: 30
steps:
@ -122,7 +122,7 @@ jobs:
runs-on: ubuntu-24.04
strategy:
matrix:
runtime: [ async-std, tokio ]
runtime: [ async-global-executor, smol, tokio ]
linking: [ sqlite, sqlite-unbundled ]
needs: check
timeout-minutes: 30
@ -208,7 +208,7 @@ jobs:
strategy:
matrix:
postgres: [ 17, 13 ]
runtime: [ async-std, tokio ]
runtime: [ async-global-executor, smol, tokio ]
tls: [ native-tls, rustls-aws-lc-rs, rustls-ring, none ]
needs: check
timeout-minutes: 30
@ -330,7 +330,7 @@ jobs:
strategy:
matrix:
mysql: [ 8 ]
runtime: [ async-std, tokio ]
runtime: [ async-global-executor, smol, tokio ]
tls: [ native-tls, rustls-aws-lc-rs, rustls-ring, none ]
needs: check
timeout-minutes: 30
@ -431,7 +431,7 @@ jobs:
strategy:
matrix:
mariadb: [ verylatest, 11_8, 11_4, 10_11, 10_6 ]
runtime: [ async-std, tokio ]
runtime: [ async-global-executor, smol, tokio ]
tls: [ native-tls, rustls-aws-lc-rs, rustls-ring, none ]
needs: check
timeout-minutes: 30

1442
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -93,7 +93,9 @@ _unstable-docs = [
]
# Base runtime features without TLS
runtime-async-global-executor = ["_rt-async-global-executor", "sqlx-core/_rt-async-global-executor", "sqlx-macros?/_rt-async-global-executor"]
runtime-async-std = ["_rt-async-std", "sqlx-core/_rt-async-std", "sqlx-macros?/_rt-async-std"]
runtime-smol = ["_rt-smol", "sqlx-core/_rt-smol", "sqlx-macros?/_rt-smol"]
runtime-tokio = ["_rt-tokio", "sqlx-core/_rt-tokio", "sqlx-macros?/_rt-tokio"]
# TLS features
@ -108,7 +110,9 @@ tls-rustls-ring-native-roots = ["sqlx-core/_tls-rustls-ring-native-roots", "sqlx
tls-none = []
# for conditional compilation
_rt-async-global-executor = []
_rt-async-std = []
_rt-smol = []
_rt-tokio = []
_sqlite = []
@ -184,11 +188,21 @@ time = { version = "0.3.36", features = ["formatting", "parsing", "macros"] }
uuid = "1.1.2"
# Common utility crates
dotenvy = { version = "0.15.7", default-features = false }
cfg-if = "1.0.0"
dotenvy = { version = "0.15.0", default-features = false }
# Runtimes
[workspace.dependencies.async-global-executor]
version = "3.1"
default-features = false
features = ["async-io"]
[workspace.dependencies.async-std]
version = "1.12"
version = "1.13"
[workspace.dependencies.smol]
version = "2.0"
default-features = false
[workspace.dependencies.tokio]
version = "1"

View File

@ -58,7 +58,6 @@ default = ["postgres", "sqlite", "mysql", "native-tls", "completions", "sqlx-tom
rustls = ["sqlx/tls-rustls"]
native-tls = ["sqlx/tls-native-tls"]
# databases
mysql = ["sqlx/mysql"]
postgres = ["sqlx/postgres"]
sqlite = ["sqlx/sqlite", "_sqlite"]

View File

@ -20,7 +20,10 @@ any = []
json = ["serde", "serde_json"]
# for conditional compilation
_rt-async-std = ["async-std", "async-io"]
_rt-async-global-executor = ["async-global-executor", "_rt-async-io"]
_rt-async-io = ["async-io", "async-fs"] # see note at async-fs declaration
_rt-async-std = ["async-std", "_rt-async-io"]
_rt-smol = ["smol", "_rt-async-io"]
_rt-tokio = ["tokio", "tokio-stream"]
_tls-native-tls = ["native-tls"]
_tls-rustls-aws-lc-rs = ["_tls-rustls", "rustls/aws-lc-rs", "webpki-roots"]
@ -42,7 +45,9 @@ _unstable-doc = ["sqlx-toml"]
[dependencies]
# Runtimes
async-global-executor = { workspace = true, optional = true }
async-std = { workspace = true, optional = true }
smol = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
# TLS
@ -63,8 +68,11 @@ mac_address = { workspace = true, optional = true }
uuid = { workspace = true, optional = true }
async-io = { version = "2.4.1", optional = true }
# work around bug in async-fs 2.0.0, which references futures-lite dependency wrongly, see https://github.com/launchbadge/sqlx/pull/3791#issuecomment-3043363281
async-fs = { version = "2.1", optional = true }
base64 = { version = "0.22.0", default-features = false, features = ["std"] }
bytes = "1.1.0"
cfg-if = { workspace = true }
chrono = { version = "0.4.34", default-features = false, features = ["clock"], optional = true }
crc = { version = "3", optional = true }
crossbeam-queue = "0.3.2"

View File

@ -1,14 +1,18 @@
use std::future::Future;
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::{
future::Future,
net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
};
use bytes::BufMut;
use cfg_if::cfg_if;
pub use buffered::{BufferedSocket, WriteBuffer};
use crate::io::ReadBuf;
use crate::{io::ReadBuf, rt::spawn_blocking};
mod buffered;
@ -192,53 +196,94 @@ pub async fn connect_tcp<Ws: WithSocket>(
// IPv6 addresses in URLs will be wrapped in brackets and the `url` crate doesn't trim those.
let host = host.trim_matches(&['[', ']'][..]);
#[cfg(feature = "_rt-tokio")]
if crate::rt::rt_tokio::available() {
use tokio::net::TcpStream;
let addresses = if let Ok(addr) = host.parse::<Ipv4Addr>() {
let addr = SocketAddrV4::new(addr, port);
vec![SocketAddr::V4(addr)].into_iter()
} else if let Ok(addr) = host.parse::<Ipv6Addr>() {
let addr = SocketAddrV6::new(addr, port, 0, 0);
vec![SocketAddr::V6(addr)].into_iter()
} else {
let host = host.to_string();
spawn_blocking(move || {
let addr = (host.as_str(), port);
ToSocketAddrs::to_socket_addrs(&addr)
})
.await?
};
let stream = TcpStream::connect((host, port)).await?;
stream.set_nodelay(true)?;
let mut last_err = None;
return Ok(with_socket.with_socket(stream).await);
// Loop through all the Socket Addresses that the hostname resolves to
for socket_addr in addresses {
match connect_tcp_address(socket_addr).await {
Ok(stream) => return Ok(with_socket.with_socket(stream).await),
Err(e) => last_err = Some(e),
}
}
#[cfg(feature = "_rt-async-std")]
{
use async_io::Async;
use async_std::net::ToSocketAddrs;
use std::net::TcpStream;
// If we reach this point, it means we failed to connect to any of the addresses.
// Return the last error we encountered, or a custom error if the hostname didn't resolve to any address.
Err(match last_err {
Some(err) => err,
None => io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Hostname did not resolve to any addresses",
)
.into(),
})
}
let mut last_err = None;
async fn connect_tcp_address(socket_addr: SocketAddr) -> crate::Result<impl Socket> {
cfg_if! {
if #[cfg(feature = "_rt-tokio")] {
if crate::rt::rt_tokio::available() {
use tokio::net::TcpStream;
// Loop through all the Socket Addresses that the hostname resolves to
for socket_addr in (host, port).to_socket_addrs().await? {
let stream = Async::<TcpStream>::connect(socket_addr)
.await
.and_then(|s| {
s.get_ref().set_nodelay(true)?;
Ok(s)
});
match stream {
Ok(stream) => return Ok(with_socket.with_socket(stream).await),
Err(e) => last_err = Some(e),
let stream = TcpStream::connect(socket_addr).await?;
stream.set_nodelay(true)?;
Ok(stream)
} else {
crate::rt::missing_rt(socket_addr)
}
}
} else if #[cfg(feature = "_rt-async-io")] {
use async_io::Async;
use std::net::TcpStream;
// If we reach this point, it means we failed to connect to any of the addresses.
// Return the last error we encountered, or a custom error if the hostname didn't resolve to any address.
match last_err {
Some(err) => Err(err.into()),
None => Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Hostname did not resolve to any addresses",
)
.into()),
let stream = Async::<TcpStream>::connect(socket_addr).await?;
stream.get_ref().set_nodelay(true)?;
Ok(stream)
} else {
crate::rt::missing_rt(socket_addr);
#[allow(unreachable_code)]
Ok(())
}
}
}
#[cfg(not(feature = "_rt-async-std"))]
{
crate::rt::missing_rt((host, port, with_socket))
// Work around `impl Socket`` and 'unability to specify test build cargo feature'.
// `connect_tcp_address` compilation would fail without this impl with
// 'cannot infer return type' error.
impl Socket for () {
fn try_read(&mut self, _: &mut dyn ReadBuf) -> io::Result<usize> {
unreachable!()
}
fn try_write(&mut self, _: &[u8]) -> io::Result<usize> {
unreachable!()
}
fn poll_read_ready(&mut self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_write_ready(&mut self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_shutdown(&mut self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
@ -260,19 +305,17 @@ pub async fn connect_uds<P: AsRef<Path>, Ws: WithSocket>(
return Ok(with_socket.with_socket(stream).await);
}
#[cfg(feature = "_rt-async-std")]
{
use async_io::Async;
use std::os::unix::net::UnixStream;
cfg_if! {
if #[cfg(feature = "_rt-async-io")] {
use async_io::Async;
use std::os::unix::net::UnixStream;
let stream = Async::<UnixStream>::connect(path).await?;
let stream = Async::<UnixStream>::connect(path).await?;
Ok(with_socket.with_socket(stream).await)
}
#[cfg(not(feature = "_rt-async-std"))]
{
crate::rt::missing_rt((path, with_socket))
Ok(with_socket.with_socket(stream).await)
} else {
crate::rt::missing_rt((path, with_socket))
}
}
}

View File

@ -4,19 +4,31 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
#[cfg(feature = "_rt-async-std")]
pub mod rt_async_std;
use cfg_if::cfg_if;
#[cfg(feature = "_rt-async-io")]
pub mod rt_async_io;
#[cfg(feature = "_rt-async-global-executor")]
pub mod rt_async_global_executor;
#[cfg(feature = "_rt-smol")]
pub mod rt_smol;
#[cfg(feature = "_rt-tokio")]
pub mod rt_tokio;
#[derive(Debug, thiserror::Error)]
#[error("operation timed out")]
pub struct TimeoutError(());
pub struct TimeoutError;
pub enum JoinHandle<T> {
#[cfg(feature = "_rt-async-global-executor")]
AsyncGlobalExecutor(rt_async_global_executor::JoinHandle<T>),
#[cfg(feature = "_rt-async-std")]
AsyncStd(async_std::task::JoinHandle<T>),
#[cfg(feature = "_rt-smol")]
Smol(rt_smol::JoinHandle<T>),
#[cfg(feature = "_rt-tokio")]
Tokio(tokio::task::JoinHandle<T>),
// `PhantomData<T>` requires `T: Unpin`
@ -32,18 +44,16 @@ pub async fn timeout<F: Future>(duration: Duration, f: F) -> Result<F::Output, T
#[allow(clippy::needless_return)]
return tokio::time::timeout(duration, f)
.await
.map_err(|_| TimeoutError(()));
.map_err(|_| TimeoutError);
}
#[cfg(feature = "_rt-async-std")]
{
async_std::future::timeout(duration, f)
.await
.map_err(|_| TimeoutError(()))
cfg_if! {
if #[cfg(feature = "_rt-async-io")] {
rt_async_io::timeout(duration, f).await
} else {
missing_rt((duration, f))
}
}
#[cfg(not(feature = "_rt-async-std"))]
missing_rt((duration, f))
}
pub async fn sleep(duration: Duration) {
@ -52,13 +62,13 @@ pub async fn sleep(duration: Duration) {
return tokio::time::sleep(duration).await;
}
#[cfg(feature = "_rt-async-std")]
{
async_std::task::sleep(duration).await
cfg_if! {
if #[cfg(feature = "_rt-async-io")] {
rt_async_io::sleep(duration).await
} else {
missing_rt(duration)
}
}
#[cfg(not(feature = "_rt-async-std"))]
missing_rt(duration)
}
#[track_caller]
@ -72,13 +82,21 @@ where
return JoinHandle::Tokio(handle.spawn(fut));
}
#[cfg(feature = "_rt-async-std")]
{
JoinHandle::AsyncStd(async_std::task::spawn(fut))
cfg_if! {
if #[cfg(feature = "_rt-async-global-executor")] {
JoinHandle::AsyncGlobalExecutor(rt_async_global_executor::JoinHandle {
task: Some(async_global_executor::spawn(fut)),
})
} else if #[cfg(feature = "_rt-async-std")] {
JoinHandle::AsyncStd(async_std::task::spawn(fut))
} else if #[cfg(feature = "_rt-smol")] {
JoinHandle::Smol(rt_smol::JoinHandle {
task: Some(smol::spawn(fut)),
})
} else {
missing_rt(fut)
}
}
#[cfg(not(feature = "_rt-async-std"))]
missing_rt(fut)
}
#[track_caller]
@ -92,13 +110,21 @@ where
return JoinHandle::Tokio(handle.spawn_blocking(f));
}
#[cfg(feature = "_rt-async-std")]
{
JoinHandle::AsyncStd(async_std::task::spawn_blocking(f))
cfg_if! {
if #[cfg(feature = "_rt-async-global-executor")] {
JoinHandle::AsyncGlobalExecutor(rt_async_global_executor::JoinHandle {
task: Some(async_global_executor::spawn_blocking(f)),
})
} else if #[cfg(feature = "_rt-async-std")] {
JoinHandle::AsyncStd(async_std::task::spawn_blocking(f))
} else if #[cfg(feature = "_rt-smol")] {
JoinHandle::Smol(rt_smol::JoinHandle {
task: Some(smol::unblock(f)),
})
} else {
missing_rt(f)
}
}
#[cfg(not(feature = "_rt-async-std"))]
missing_rt(f)
}
pub async fn yield_now() {
@ -107,44 +133,43 @@ pub async fn yield_now() {
return tokio::task::yield_now().await;
}
#[cfg(feature = "_rt-async-std")]
{
async_std::task::yield_now().await;
cfg_if! {
if #[cfg(feature = "_rt-async-global-executor")] {
rt_async_global_executor::yield_now().await
} else if #[cfg(feature = "_rt-async-std")] {
async_std::task::yield_now().await
} else if #[cfg(feature = "_rt-smol")] {
smol::future::yield_now().await
} else {
missing_rt(())
}
}
#[cfg(not(feature = "_rt-async-std"))]
missing_rt(())
}
#[track_caller]
pub fn test_block_on<F: Future>(f: F) -> F::Output {
#[cfg(feature = "_rt-tokio")]
{
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start Tokio runtime")
.block_on(f)
}
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
{
async_std::task::block_on(f)
}
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
{
missing_rt(f)
cfg_if! {
if #[cfg(feature = "_rt-async-io")] {
async_io::block_on(f)
} else if #[cfg(feature = "_rt-tokio")] {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start Tokio runtime")
.block_on(f)
} else {
missing_rt(f)
}
}
}
#[track_caller]
pub fn missing_rt<T>(_unused: T) -> ! {
pub const fn missing_rt<T>(_unused: T) -> ! {
if cfg!(feature = "_rt-tokio") {
panic!("this functionality requires a Tokio context")
}
panic!("either the `runtime-async-std` or `runtime-tokio` feature must be enabled")
panic!("one of the `runtime-async-global-executor`, `runtime-async-std`, `runtime-smol`, or `runtime-tokio` feature must be enabled")
}
impl<T: Send + 'static> Future for JoinHandle<T> {
@ -153,8 +178,12 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
#[track_caller]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
#[cfg(feature = "_rt-async-global-executor")]
Self::AsyncGlobalExecutor(handle) => Pin::new(handle).poll(cx),
#[cfg(feature = "_rt-async-std")]
Self::AsyncStd(handle) => Pin::new(handle).poll(cx),
#[cfg(feature = "_rt-smol")]
Self::Smol(handle) => Pin::new(handle).poll(cx),
#[cfg(feature = "_rt-tokio")]
Self::Tokio(handle) => Pin::new(handle)
.poll(cx)

View File

@ -0,0 +1,30 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use async_global_executor::Task;
pub struct JoinHandle<T> {
pub task: Option<Task<T>>,
}
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(task) = self.task.take() {
task.detach();
}
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.task.as_mut() {
Some(task) => Future::poll(Pin::new(task), cx),
None => unreachable!("JoinHandle polled after dropping"),
}
}
}

View File

@ -0,0 +1,5 @@
mod join_handle;
pub use join_handle::*;
pub mod yield_now;
pub use yield_now::*;

View File

@ -0,0 +1,28 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
pub fn yield_now() -> impl Future<Output = ()> {
YieldNow(false)
}
struct YieldNow(bool);
impl Future for YieldNow {
type Output = ();
// The futures executor is implemented as a FIFO queue, so all this future
// does is re-schedule the future back to the end of the queue, giving room
// for other futures to progress.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if !self.0 {
self.0 = true;
cx.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(())
}
}
}

View File

@ -0,0 +1,4 @@
mod socket;
mod timeout;
pub use timeout::*;

View File

@ -3,12 +3,12 @@ use crate::net::Socket;
use std::io;
use std::io::{Read, Write};
use std::net::{Shutdown, TcpStream};
use std::task::{Context, Poll};
use crate::io::ReadBuf;
use async_io::Async;
use crate::io::ReadBuf;
impl Socket for Async<TcpStream> {
fn try_read(&mut self, buf: &mut dyn ReadBuf) -> io::Result<usize> {
self.get_ref().read(buf.init_mut())

View File

@ -0,0 +1,20 @@
use std::{future::Future, pin::pin, time::Duration};
use futures_util::future::{select, Either};
use crate::rt::TimeoutError;
pub async fn sleep(duration: Duration) {
timeout_future(duration).await;
}
pub async fn timeout<F: Future>(duration: Duration, future: F) -> Result<F::Output, TimeoutError> {
match select(pin!(future), timeout_future(duration)).await {
Either::Left((result, _)) => Ok(result),
Either::Right(_) => Err(TimeoutError),
}
}
fn timeout_future(duration: Duration) -> impl Future {
async_io::Timer::after(duration)
}

View File

@ -1 +0,0 @@
mod socket;

View File

@ -0,0 +1,30 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use smol::Task;
pub struct JoinHandle<T> {
pub task: Option<Task<T>>,
}
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(task) = self.task.take() {
task.detach();
}
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.task.as_mut() {
Some(task) => Future::poll(Pin::new(task), cx),
None => unreachable!("JoinHandle polled after dropping"),
}
}
}

View File

@ -0,0 +1,2 @@
mod join_handle;
pub use join_handle::*;

View File

@ -1,3 +1,5 @@
use cfg_if::cfg_if;
// For types with identical signatures that don't require runtime support,
// we can just arbitrarily pick one to use based on what's enabled.
//
@ -5,7 +7,7 @@
// (including `tokio-console` support) and more widely deployed.
pub struct AsyncSemaphore {
// We use the semaphore from futures-intrusive as the one from async-std
// We use the semaphore from futures-intrusive as the one from async-lock
// is missing the ability to add arbitrary permits, and is not guaranteed to be fair:
// * https://github.com/smol-rs/async-lock/issues/22
// * https://github.com/smol-rs/async-lock/issues/23
@ -14,7 +16,14 @@ pub struct AsyncSemaphore {
// and there are some soundness concerns (although it turns out any intrusive future is unsound
// in MIRI due to the necessitated mutable aliasing):
// https://github.com/launchbadge/sqlx/issues/1668
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
#[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))]
inner: futures_intrusive::sync::Semaphore,
#[cfg(feature = "_rt-tokio")]
@ -24,12 +33,24 @@ pub struct AsyncSemaphore {
impl AsyncSemaphore {
#[track_caller]
pub fn new(fair: bool, permits: usize) -> Self {
if cfg!(not(any(feature = "_rt-async-std", feature = "_rt-tokio"))) {
if cfg!(not(any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol",
feature = "_rt-tokio"
))) {
crate::rt::missing_rt((fair, permits));
}
AsyncSemaphore {
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
#[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))]
inner: futures_intrusive::sync::Semaphore::new(fair, permits),
#[cfg(feature = "_rt-tokio")]
inner: {
@ -40,61 +61,93 @@ impl AsyncSemaphore {
}
pub fn permits(&self) -> usize {
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
return self.inner.permits();
#[cfg(feature = "_rt-tokio")]
return self.inner.available_permits();
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
crate::rt::missing_rt(())
cfg_if! {
if #[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))] {
self.inner.permits()
} else if #[cfg(feature = "_rt-tokio")] {
self.inner.available_permits()
} else {
crate::rt::missing_rt(())
}
}
}
pub async fn acquire(&self, permits: u32) -> AsyncSemaphoreReleaser<'_> {
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
return AsyncSemaphoreReleaser {
inner: self.inner.acquire(permits as usize).await,
};
#[cfg(feature = "_rt-tokio")]
return AsyncSemaphoreReleaser {
inner: self
.inner
// Weird quirk: `tokio::sync::Semaphore` mostly uses `usize` for permit counts,
// but `u32` for this and `try_acquire_many()`.
.acquire_many(permits)
.await
.expect("BUG: we do not expose the `.close()` method"),
};
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
crate::rt::missing_rt(permits)
cfg_if! {
if #[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))] {
AsyncSemaphoreReleaser {
inner: self.inner.acquire(permits as usize).await,
}
} else if #[cfg(feature = "_rt-tokio")] {
AsyncSemaphoreReleaser {
inner: self
.inner
// Weird quirk: `tokio::sync::Semaphore` mostly uses `usize` for permit counts,
// but `u32` for this and `try_acquire_many()`.
.acquire_many(permits)
.await
.expect("BUG: we do not expose the `.close()` method"),
}
} else {
crate::rt::missing_rt(permits)
}
}
}
pub fn try_acquire(&self, permits: u32) -> Option<AsyncSemaphoreReleaser<'_>> {
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
return Some(AsyncSemaphoreReleaser {
inner: self.inner.try_acquire(permits as usize)?,
});
#[cfg(feature = "_rt-tokio")]
return Some(AsyncSemaphoreReleaser {
inner: self.inner.try_acquire_many(permits).ok()?,
});
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
crate::rt::missing_rt(permits)
cfg_if! {
if #[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))] {
Some(AsyncSemaphoreReleaser {
inner: self.inner.try_acquire(permits as usize)?,
})
} else if #[cfg(feature = "_rt-tokio")] {
Some(AsyncSemaphoreReleaser {
inner: self.inner.try_acquire_many(permits).ok()?,
})
} else {
crate::rt::missing_rt(permits)
}
}
}
pub fn release(&self, permits: usize) {
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
return self.inner.release(permits);
#[cfg(feature = "_rt-tokio")]
return self.inner.add_permits(permits);
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
crate::rt::missing_rt(permits)
cfg_if! {
if #[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))] {
self.inner.release(permits);
} else if #[cfg(feature = "_rt-tokio")] {
self.inner.add_permits(permits);
} else {
crate::rt::missing_rt(permits);
}
}
}
}
@ -108,30 +161,46 @@ pub struct AsyncSemaphoreReleaser<'a> {
// and there are some soundness concerns (although it turns out any intrusive future is unsound
// in MIRI due to the necessitated mutable aliasing):
// https://github.com/launchbadge/sqlx/issues/1668
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
#[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))]
inner: futures_intrusive::sync::SemaphoreReleaser<'a>,
#[cfg(feature = "_rt-tokio")]
inner: tokio::sync::SemaphorePermit<'a>,
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
#[cfg(not(any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol",
feature = "_rt-tokio"
)))]
_phantom: std::marker::PhantomData<&'a ()>,
}
impl AsyncSemaphoreReleaser<'_> {
pub fn disarm(self) {
#[cfg(feature = "_rt-tokio")]
{
self.inner.forget();
cfg_if! {
if #[cfg(all(
any(
feature = "_rt-async-global-executor",
feature = "_rt-async-std",
feature = "_rt-smol"
),
not(feature = "_rt-tokio")
))] {
let mut this = self;
this.inner.disarm();
} else if #[cfg(feature = "_rt-tokio")] {
self.inner.forget();
} else {
crate::rt::missing_rt(());
}
}
#[cfg(all(feature = "_rt-async-std", not(feature = "_rt-tokio")))]
{
let mut this = self;
this.inner.disarm();
}
#[cfg(not(any(feature = "_rt-async-std", feature = "_rt-tokio")))]
crate::rt::missing_rt(())
}
}

View File

@ -12,7 +12,9 @@ rust-version.workspace = true
default = []
# for conditional compilation
_rt-async-global-executor = ["async-global-executor", "sqlx-core/_rt-async-global-executor"]
_rt-async-std = ["async-std", "sqlx-core/_rt-async-std"]
_rt-smol = ["smol", "sqlx-core/_rt-smol"]
_rt-tokio = ["tokio", "sqlx-core/_rt-tokio"]
_tls-native-tls = ["sqlx-core/_tls-native-tls"]
@ -57,9 +59,12 @@ sqlx-mysql = { workspace = true, features = ["offline", "migrate"], optional = t
sqlx-postgres = { workspace = true, features = ["offline", "migrate"], optional = true }
sqlx-sqlite = { workspace = true, features = ["offline", "migrate"], optional = true }
async-global-executor = { workspace = true, optional = true }
async-std = { workspace = true, optional = true }
smol = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
cfg-if = { workspace = true}
dotenvy = { workspace = true }
hex = { version = "0.4.3" }
@ -75,5 +80,4 @@ url = { version = "2.2.2" }
[lints.rust.unexpected_cfgs]
level = "warn"
# 1.80 will warn without this
check-cfg = ['cfg(sqlx_macros_unstable)', 'cfg(procmacro2_semver_exempt)']

View File

@ -19,6 +19,8 @@
feature(track_path)
)]
use cfg_if::cfg_if;
#[cfg(feature = "macros")]
use crate::query::QueryDriver;
@ -55,29 +57,30 @@ pub fn block_on<F>(f: F) -> F::Output
where
F: std::future::Future,
{
#[cfg(feature = "_rt-tokio")]
{
use std::sync::LazyLock;
cfg_if! {
if #[cfg(feature = "_rt-async-global-executor")] {
sqlx_core::rt::test_block_on(f)
} else if #[cfg(feature = "_rt-async-std")] {
async_std::task::block_on(f)
} else if #[cfg(feature = "_rt-smol")] {
sqlx_core::rt::test_block_on(f)
} else if #[cfg(feature = "_rt-tokio")] {
use std::sync::LazyLock;
use tokio::runtime::{self, Runtime};
use tokio::runtime::{self, Runtime};
// We need a single, persistent Tokio runtime since we're caching connections,
// otherwise we'll get "IO driver has terminated" errors.
static TOKIO_RT: LazyLock<Runtime> = LazyLock::new(|| {
runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start Tokio runtime")
});
// We need a single, persistent Tokio runtime since we're caching connections,
// otherwise we'll get "IO driver has terminated" errors.
static TOKIO_RT: LazyLock<Runtime> = LazyLock::new(|| {
runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start Tokio runtime")
});
TOKIO_RT.block_on(f)
TOKIO_RT.block_on(f)
} else {
sqlx_core::rt::missing_rt(f)
}
}
#[cfg(all(feature = "_rt-async-std", not(feature = "tokio")))]
{
async_std::task::block_on(f)
}
#[cfg(not(any(feature = "_rt-async-std", feature = "tokio")))]
sqlx_core::rt::missing_rt(f)
}

View File

@ -15,7 +15,9 @@ proc-macro = true
default = []
# for conditional compilation
_rt-async-global-executor = ["sqlx-macros-core/_rt-async-global-executor"]
_rt-async-std = ["sqlx-macros-core/_rt-async-std"]
_rt-smol = ["sqlx-macros-core/_rt-smol"]
_rt-tokio = ["sqlx-macros-core/_rt-tokio"]
_tls-native-tls = ["sqlx-macros-core/_tls-native-tls"]