From 6c8d68e9d0923f13f6bd9bd3cf112040edbbcee0 Mon Sep 17 00:00:00 2001 From: Ryan Leckey Date: Sun, 10 Jan 2021 19:23:04 -0800 Subject: [PATCH] refactor: marker traits for non-blocking vs - enforce compile-time errors if you try to block on an async-only runtime or await a blocking runtime - remove viral HRTB for Streams - support UNIX streams --- Cargo.lock | 1 + sqlx-core/Cargo.toml | 1 + sqlx-core/src/blocking.rs | 67 +-------- sqlx-core/src/blocking/close.rs | 6 +- sqlx-core/src/blocking/connect.rs | 5 +- sqlx-core/src/blocking/connection.rs | 6 +- sqlx-core/src/blocking/io.rs | 15 -- sqlx-core/src/blocking/options.rs | 5 +- sqlx-core/src/blocking/runtime.rs | 147 ++++++++++++++++++-- sqlx-core/src/close.rs | 3 +- sqlx-core/src/connect.rs | 3 +- sqlx-core/src/connection.rs | 3 +- sqlx-core/src/io/buf_stream.rs | 8 +- sqlx-core/src/io/stream.rs | 66 ++++++++- sqlx-core/src/lib.rs | 10 +- sqlx-core/src/mock.rs | 71 ++++++---- sqlx-core/src/net.rs | 3 + sqlx-core/src/net/stream.rs | 196 +++++++++++++++++++++++++++ sqlx-core/src/options.rs | 3 +- sqlx-core/src/runtime.rs | 79 ++++++++--- sqlx-core/src/runtime/actix.rs | 94 ++++++++++++- sqlx-core/src/runtime/async_std.rs | 97 ++++++++++--- sqlx-core/src/runtime/tokio.rs | 94 ++++++++++++- sqlx-mysql/src/connection.rs | 19 +-- sqlx-mysql/src/connection/close.rs | 2 - sqlx-mysql/src/connection/connect.rs | 10 +- sqlx-mysql/src/connection/ping.rs | 3 +- sqlx-mysql/src/connection/stream.rs | 3 +- sqlx-mysql/src/options.rs | 4 +- 29 files changed, 797 insertions(+), 227 deletions(-) delete mode 100644 sqlx-core/src/blocking/io.rs create mode 100644 sqlx-core/src/net.rs create mode 100644 sqlx-core/src/net/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 4ae975a9..ce8acb6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1105,6 +1105,7 @@ dependencies = [ "bytestring", "conquer-once", "crossbeam", + "either", "futures-core", "futures-io", "futures-util", diff --git a/sqlx-core/Cargo.toml b/sqlx-core/Cargo.toml index 53e8273b..246a217b 100644 --- a/sqlx-core/Cargo.toml +++ b/sqlx-core/Cargo.toml @@ -38,6 +38,7 @@ _mock = ["conquer-once", "parking_lot", "crossbeam"] [dependencies] log = "0.4.11" +either = "1.6.1" actix-rt = { version = "2.0.0-beta.2", optional = true } _async-std = { version = "1.8", optional = true, package = "async-std" } futures-util = { version = "0.3", optional = true, features = ["io"] } diff --git a/sqlx-core/src/blocking.rs b/sqlx-core/src/blocking.rs index 07aca56e..19b20cb2 100644 --- a/sqlx-core/src/blocking.rs +++ b/sqlx-core/src/blocking.rs @@ -2,18 +2,12 @@ //! for **blocking** operations. //! -use std::io::{Read, Result as IoResult, Write}; -use std::net::TcpStream; - mod acquire; mod close; mod connect; mod connection; mod options; -mod runtime; - -#[doc(hidden)] -pub mod io; +pub(crate) mod runtime; pub use acquire::Acquire; pub use close::Close; @@ -39,62 +33,3 @@ pub mod prelude { #[doc(no_inline)] pub use crate::Database as _; } - -pub(super) mod rt { - /// Uses the `std::net` primitives to implement a blocking runtime for SQLx. - #[derive(Debug)] - pub struct Blocking; -} - -impl crate::Runtime for rt::Blocking { - #[doc(hidden)] - type TcpStream = TcpStream; -} - -impl Runtime for rt::Blocking { - #[doc(hidden)] - fn connect_tcp(host: &str, port: u16) -> IoResult { - TcpStream::connect((host, port)) - } -} - -// 's: stream -impl<'s> crate::io::Stream<'s, rt::Blocking> for TcpStream { - #[doc(hidden)] - #[cfg(feature = "async")] - type ReadFuture = futures_util::future::BoxFuture<'s, IoResult>; - - #[doc(hidden)] - #[cfg(feature = "async")] - type WriteFuture = futures_util::future::BoxFuture<'s, IoResult>; - - #[doc(hidden)] - #[cfg(feature = "async")] - fn read_async(&'s mut self, _buf: &'s mut [u8]) -> Self::ReadFuture { - // UNREACHABLE: [`Blocking`] does not implement the [`Async`] marker - unreachable!() - } - - #[doc(hidden)] - #[cfg(feature = "async")] - fn write_async(&'s mut self, _buf: &'s [u8]) -> Self::WriteFuture { - // UNREACHABLE: [`Blocking`] does not implement the [`Async`] marker - unreachable!() - } -} - -// 's: stream -impl<'s> io::Stream<'s, rt::Blocking> for TcpStream { - #[doc(hidden)] - fn read(&'s mut self, buf: &'s mut [u8]) -> IoResult { - Read::read(self, buf) - } - - #[doc(hidden)] - fn write(&'s mut self, buf: &'s [u8]) -> IoResult { - let size = buf.len(); - self.write_all(buf)?; - - Ok(size) - } -} diff --git a/sqlx-core/src/blocking/close.rs b/sqlx-core/src/blocking/close.rs index 8944b3aa..e0d9b22f 100644 --- a/sqlx-core/src/blocking/close.rs +++ b/sqlx-core/src/blocking/close.rs @@ -1,12 +1,10 @@ -use super::{io::Stream, Runtime}; +use super::Runtime; pub trait Close: crate::Close where Rt: Runtime, { - fn close(self) -> crate::Result<()> - where - for<'s> ::TcpStream: Stream<'s, Rt>; + fn close(self) -> crate::Result<()>; } // TODO: impl Close for Pool { ... } diff --git a/sqlx-core/src/blocking/connect.rs b/sqlx-core/src/blocking/connect.rs index f9c2fe8b..14814e4f 100644 --- a/sqlx-core/src/blocking/connect.rs +++ b/sqlx-core/src/blocking/connect.rs @@ -1,4 +1,4 @@ -use super::{io::Stream, Runtime}; +use super::Runtime; pub trait Connect: crate::Connect where @@ -6,8 +6,7 @@ where { fn connect(url: &str) -> crate::Result where - Self: Sized, - for<'s> ::TcpStream: Stream<'s, Rt>; + Self: Sized; } // TODO: impl Connect for Pool { ... } diff --git a/sqlx-core/src/blocking/connection.rs b/sqlx-core/src/blocking/connection.rs index 7420f1fb..774984db 100644 --- a/sqlx-core/src/blocking/connection.rs +++ b/sqlx-core/src/blocking/connection.rs @@ -1,4 +1,4 @@ -use super::{io::Stream, Close, Connect, ConnectOptions, Runtime}; +use super::{Close, Connect, ConnectOptions, Runtime}; /// A unique connection (session) with a specific database. /// @@ -15,7 +15,5 @@ where /// For detailed information, refer to the asynchronous version of /// this: [`ping()`][crate::Connection::ping]. /// - fn ping(&mut self) -> crate::Result<()> - where - for<'s> ::TcpStream: Stream<'s, Rt>; + fn ping(&mut self) -> crate::Result<()>; } diff --git a/sqlx-core/src/blocking/io.rs b/sqlx-core/src/blocking/io.rs deleted file mode 100644 index f7e93f49..00000000 --- a/sqlx-core/src/blocking/io.rs +++ /dev/null @@ -1,15 +0,0 @@ -use std::io; - -use crate::Runtime; - -// 's: stream -pub trait Stream<'s, Rt>: crate::io::Stream<'s, Rt> -where - Rt: Runtime, -{ - #[doc(hidden)] - fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result; - - #[doc(hidden)] - fn write(&'s mut self, buf: &'s [u8]) -> io::Result; -} diff --git a/sqlx-core/src/blocking/options.rs b/sqlx-core/src/blocking/options.rs index 96e316f1..d900a5b7 100644 --- a/sqlx-core/src/blocking/options.rs +++ b/sqlx-core/src/blocking/options.rs @@ -1,4 +1,4 @@ -use super::{io::Stream, Connection, Runtime}; +use super::{Connection, Runtime}; /// Options which can be used to configure how a SQL connection is opened. /// @@ -18,6 +18,5 @@ where /// fn connect(&self) -> crate::Result where - Self::Connection: Sized, - for<'s> ::TcpStream: Stream<'s, Rt>; + Self::Connection: Sized; } diff --git a/sqlx-core/src/blocking/runtime.rs b/sqlx-core/src/blocking/runtime.rs index e60d9e5e..e0bf6fe9 100644 --- a/sqlx-core/src/blocking/runtime.rs +++ b/sqlx-core/src/blocking/runtime.rs @@ -1,12 +1,139 @@ -use std::io; +use std::io::{self, Read, Write}; +use std::net::TcpStream; +#[cfg(unix)] +use std::os::unix::net::UnixStream; +#[cfg(unix)] +use std::path::Path; -/// Describes a set of types and functions used to open and manage -/// resources within SQLx. -/// -/// For detailed information, refer to the asynchronous version of -/// this: [`Runtime`][crate::Runtime]. -/// -pub trait Runtime: crate::Runtime { - /// Opens a TCP connection to a remote host at the specified port. - fn connect_tcp(host: &str, port: u16) -> io::Result; +#[cfg(feature = "async")] +use futures_util::future::BoxFuture; + +use crate::io::Stream as IoStream; + +/// Marks a [`Runtime`][crate::Runtime] as being capable of executing blocking operations. +pub trait Runtime: crate::Runtime {} + +/// Uses the `std::net` primitives to implement a blocking runtime for SQLx. +#[derive(Debug)] +pub struct Blocking; + +impl crate::Runtime for Blocking { + #[doc(hidden)] + type TcpStream = TcpStream; + + #[doc(hidden)] + #[cfg(unix)] + type UnixStream = UnixStream; + + #[doc(hidden)] + fn connect_tcp(host: &str, port: u16) -> io::Result { + TcpStream::connect((host, port)) + } + + #[doc(hidden)] + #[cfg(all(unix, feature = "blocking"))] + fn connect_unix(path: &Path) -> io::Result { + UnixStream::connect(path) + } + + #[doc(hidden)] + #[cfg(feature = "async")] + #[allow(unused_variables)] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { + // UNREACHABLE: where Self: Async + unreachable!() + } + + #[doc(hidden)] + #[cfg(all(unix, feature = "async"))] + #[allow(unused_variables)] + fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result> { + // UNREACHABLE: where Self: Async + unreachable!() + } +} + +impl Runtime for Blocking {} + +// 's: stream +impl<'s> IoStream<'s, Blocking> for TcpStream { + #[doc(hidden)] + #[cfg(feature = "async")] + type ReadFuture = BoxFuture<'s, io::Result>; + + #[doc(hidden)] + #[cfg(feature = "async")] + type WriteFuture = BoxFuture<'s, io::Result>; + + #[inline] + #[doc(hidden)] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result { + Read::read(self, buf) + } + + #[inline] + #[doc(hidden)] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result { + let size = buf.len(); + Write::write_all(self, buf)?; + + Ok(size) + } + + #[doc(hidden)] + #[cfg(feature = "async")] + fn read_async(&'s mut self, _buf: &'s mut [u8]) -> Self::ReadFuture { + // UNREACHABLE: where Self: Async + unreachable!() + } + + #[doc(hidden)] + #[cfg(feature = "async")] + fn write_async(&'s mut self, _buf: &'s [u8]) -> Self::WriteFuture { + // UNREACHABLE: where Self: Async + unreachable!() + } +} + +// 's: stream +#[cfg(unix)] +impl<'s> IoStream<'s, Blocking> for UnixStream { + #[doc(hidden)] + #[cfg(feature = "async")] + type ReadFuture = BoxFuture<'s, io::Result>; + + #[doc(hidden)] + #[cfg(feature = "async")] + type WriteFuture = BoxFuture<'s, io::Result>; + + #[inline] + #[doc(hidden)] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result { + Read::read(self, buf) + } + + #[inline] + #[doc(hidden)] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result { + let size = buf.len(); + Write::write_all(self, buf)?; + + Ok(size) + } + + #[doc(hidden)] + #[cfg(feature = "async")] + #[allow(unused_variables)] + fn read_async(&'s mut self, _buf: &'s mut [u8]) -> Self::ReadFuture { + // UNREACHABLE: where Self: Async + unreachable!() + } + + #[doc(hidden)] + #[cfg(feature = "async")] + #[allow(unused_variables)] + fn write_async(&'s mut self, _buf: &'s [u8]) -> Self::WriteFuture { + // UNREACHABLE: where Self: Async + unreachable!() + } } diff --git a/sqlx-core/src/close.rs b/sqlx-core/src/close.rs index 2dfeba02..f8c84074 100644 --- a/sqlx-core/src/close.rs +++ b/sqlx-core/src/close.rs @@ -8,8 +8,7 @@ where #[cfg(feature = "async")] fn close(self) -> futures_util::future::BoxFuture<'static, crate::Result<()>> where - Rt: crate::Async, - for<'s> ::TcpStream: crate::io::Stream<'s, Rt>; + Rt: crate::Async; } // TODO: impl Close for Pool { ... } diff --git a/sqlx-core/src/connect.rs b/sqlx-core/src/connect.rs index e206b348..7252c19d 100644 --- a/sqlx-core/src/connect.rs +++ b/sqlx-core/src/connect.rs @@ -10,8 +10,7 @@ where fn connect(url: &str) -> futures_util::future::BoxFuture<'_, crate::Result> where Self: Sized, - Rt: crate::Async, - for<'s> ::TcpStream: crate::io::Stream<'s, Rt>; + Rt: crate::Async; } // TODO: impl Connect for Pool { ... } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 31490c8b..c99b9c50 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -28,6 +28,5 @@ where #[cfg(feature = "async")] fn ping(&mut self) -> BoxFuture<'_, crate::Result<()>> where - Rt: crate::Async, - for<'s> ::TcpStream: crate::io::Stream<'s, Rt>; + Rt: crate::Async; } diff --git a/sqlx-core/src/io/buf_stream.rs b/sqlx-core/src/io/buf_stream.rs index 4600cf71..d00cfc4f 100644 --- a/sqlx-core/src/io/buf_stream.rs +++ b/sqlx-core/src/io/buf_stream.rs @@ -2,6 +2,8 @@ use std::marker::PhantomData; use bytes::{Bytes, BytesMut}; +use super::Stream; + /// Wraps a stream and buffers input and output to and from it. /// /// It can be excessively inefficient to work directly with a `Read` or `Write`. For example, @@ -47,7 +49,7 @@ impl BufStream { } pub fn consume(&mut self, n: usize) { - let _ = self.take(n); + let _rem = self.take(n); } pub fn reserve(&mut self, additional: usize) { @@ -119,7 +121,7 @@ macro_rules! read { } #[cfg(feature = "async")] -impl crate::io::Stream<'s, Rt>> BufStream { +impl Stream<'s, Rt>> BufStream { pub async fn flush_async(&mut self) -> crate::Result<()> { // write as much as we can each time and move the cursor as we write from the buffer // if _this_ future drops, offset will have a record of how much of the wbuf has @@ -141,7 +143,7 @@ impl crate::io::Stream<'s, Rt>> BufStream } #[cfg(feature = "blocking")] -impl crate::blocking::io::Stream<'s, Rt>> BufStream { +impl Stream<'s, Rt>> BufStream { pub fn flush(&mut self) -> crate::Result<()> { self.stream.write(&self.wbuf)?; self.wbuf.clear(); diff --git a/sqlx-core/src/io/stream.rs b/sqlx-core/src/io/stream.rs index 37cb6fb2..878663c7 100644 --- a/sqlx-core/src/io/stream.rs +++ b/sqlx-core/src/io/stream.rs @@ -1,18 +1,74 @@ +#[cfg(feature = "async")] +use std::future::Future; +use std::io; + use crate::Runtime; // 's: stream -pub trait Stream<'s, Rt: Runtime>: Send + Sync + Unpin { +pub trait Stream<'s, Rt>: Send + Sync + Unpin +where + Rt: Runtime, +{ #[cfg(feature = "async")] - type ReadFuture: 's + std::future::Future> + Send; + type ReadFuture: 's + Future> + Send; #[cfg(feature = "async")] - type WriteFuture: 's + std::future::Future> + Send; + type WriteFuture: 's + Future> + Send; #[cfg(feature = "async")] #[doc(hidden)] - fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture; + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture + where + Rt: crate::Async; #[cfg(feature = "async")] #[doc(hidden)] - fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture; + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture + where + Rt: crate::Async; + + #[cfg(feature = "blocking")] + #[doc(hidden)] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result + where + Rt: crate::blocking::Runtime; + + #[cfg(feature = "blocking")] + #[doc(hidden)] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result + where + Rt: crate::blocking::Runtime; +} + +#[cfg(not(any( + feature = "async-std", + feature = "actix", + feature = "tokio", + feature = "blocking" +)))] +impl<'s, Rt> Stream<'s, Rt> for () +where + Rt: Runtime, +{ + #[cfg(feature = "async")] + type ReadFuture = futures_util::future::BoxFuture<'s, io::Result>; + + #[cfg(feature = "async")] + type WriteFuture = futures_util::future::BoxFuture<'s, io::Result>; + + #[cfg(feature = "async")] + #[doc(hidden)] + #[allow(unused_variables)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + // UNREACHABLE: where Self: Async + unreachable!() + } + + #[cfg(feature = "async")] + #[doc(hidden)] + #[allow(unused_variables)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + // UNREACHABLE: where Self: Async + unreachable!() + } } diff --git a/sqlx-core/src/lib.rs b/sqlx-core/src/lib.rs index 47455f94..ee221cda 100644 --- a/sqlx-core/src/lib.rs +++ b/sqlx-core/src/lib.rs @@ -2,6 +2,7 @@ //! database driver (`sqlx-postgres`, `sqlx-mysql`, etc.). //! #![cfg_attr(doc_cfg, feature(doc_cfg))] +#![cfg_attr(not(any(feature = "async", feature = "blocking")), allow(unused))] #![deny(unsafe_code)] #![warn(rust_2018_idioms)] #![warn(future_incompatible)] @@ -30,6 +31,9 @@ mod runtime; #[doc(hidden)] pub mod io; +#[doc(hidden)] +pub mod net; + #[doc(hidden)] #[cfg(feature = "_mock")] pub mod mock; @@ -39,7 +43,7 @@ pub mod blocking; pub use acquire::Acquire; #[cfg(feature = "blocking")] -pub use blocking::rt::Blocking; +pub use blocking::runtime::Blocking; pub use close::Close; pub use connect::Connect; pub use connection::Connection; @@ -56,6 +60,10 @@ pub use runtime::Tokio; pub use runtime::{Async, DefaultRuntime, Runtime}; /// Convenience re-export of common traits for non-blocking operations. +#[cfg(any( + any(feature = "async-std", feature = "tokio", feature = "actix"), + not(feature = "blocking") +))] pub mod prelude { #[doc(no_inline)] pub use super::Acquire as _; diff --git a/sqlx-core/src/mock.rs b/sqlx-core/src/mock.rs index b6862292..41615514 100644 --- a/sqlx-core/src/mock.rs +++ b/sqlx-core/src/mock.rs @@ -1,18 +1,18 @@ use std::collections::HashMap; use std::io; -#[cfg(feature = "async")] -use std::pin::Pin; +#[cfg(unix)] +use std::path::Path; use std::sync::atomic::AtomicU16; use std::sync::atomic::Ordering; use bytes::BytesMut; use conquer_once::Lazy; use crossbeam::channel; +#[cfg(feature = "async")] +use futures_util::future::{self, BoxFuture}; use parking_lot::RwLock; -#[cfg(feature = "blocking")] -use crate::blocking; -use crate::{io::Stream, Runtime}; +use crate::{io::Stream as IoStream, Runtime}; #[derive(Debug)] pub struct Mock; @@ -32,23 +32,39 @@ static MOCK_STREAMS: Lazy>> = Lazy::new(RwLock:: impl Runtime for Mock { type TcpStream = MockStream; -} -#[cfg(feature = "async")] -impl crate::Async for Mock { - fn connect_tcp_async( - _host: &str, - port: u16, - ) -> futures_util::future::BoxFuture<'_, io::Result> { - Box::pin(futures_util::future::ready(Self::get_stream(port))) - } -} + #[cfg(unix)] + type UnixStream = MockStream; -#[cfg(feature = "blocking")] -impl crate::blocking::Runtime for Mock { + #[doc(hidden)] + #[cfg(feature = "blocking")] fn connect_tcp(_host: &str, port: u16) -> io::Result { Self::get_stream(port) } + + #[doc(hidden)] + #[cfg(all(unix, feature = "blocking"))] + fn connect_unix(_path: &Path) -> io::Result { + Err(io::Error::new( + io::ErrorKind::Other, + "Unix streams are not supported in the Mock runtime", + )) + } + + #[doc(hidden)] + #[cfg(feature = "async")] + fn connect_tcp_async(_host: &str, port: u16) -> BoxFuture<'_, io::Result> { + Box::pin(future::ready(Self::get_stream(port))) + } + + #[doc(hidden)] + #[cfg(all(unix, feature = "async"))] + fn connect_unix_async(_path: &Path) -> BoxFuture<'_, io::Result> { + Box::pin(future::err(io::Error::new( + io::ErrorKind::Other, + "Unix streams are not supported in the Mock runtime", + ))) + } } impl Mock { @@ -82,12 +98,12 @@ impl MockStream { } } -impl<'s> Stream<'s, Mock> for MockStream { +impl<'s> IoStream<'s, Mock> for MockStream { #[cfg(feature = "async")] - type ReadFuture = Pin> + 's + Send>>; + type ReadFuture = BoxFuture<'s, io::Result>; #[cfg(feature = "async")] - type WriteFuture = Pin> + 's + Send>>; + type WriteFuture = BoxFuture<'s, io::Result>; #[cfg(feature = "async")] fn read_async(&'s mut self, mut buf: &'s mut [u8]) -> Self::ReadFuture { @@ -100,7 +116,7 @@ impl<'s> Stream<'s, Mock> for MockStream { let written = buf.write(&self.rbuf)?; // remove the bytes that we were able to write - let _ = self.rbuf.split_to(written); + let _rem = self.rbuf.split_to(written); // return how many bytes we wrote return Ok(written); @@ -124,15 +140,13 @@ impl<'s> Stream<'s, Mock> for MockStream { #[cfg(feature = "async")] fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { // send it all, right away - let _ = self.write.send(buf.to_vec()); + let _res = self.write.send(buf.to_vec()); // that was easy - Box::pin(futures_util::future::ok(buf.len())) + Box::pin(future::ok(buf.len())) } -} -#[cfg(feature = "blocking")] -impl<'s> blocking::io::Stream<'s, Mock> for MockStream { + #[cfg(feature = "blocking")] fn read(&'s mut self, mut buf: &'s mut [u8]) -> io::Result { use io::Write; @@ -142,7 +156,7 @@ impl<'s> blocking::io::Stream<'s, Mock> for MockStream { let written = buf.write(&self.rbuf)?; // remove the bytes that we were able to write - let _ = self.rbuf.split_to(written); + let _rem = self.rbuf.split_to(written); // return how many bytes we wrote return Ok(written); @@ -157,9 +171,10 @@ impl<'s> blocking::io::Stream<'s, Mock> for MockStream { } } + #[cfg(feature = "blocking")] fn write(&'s mut self, buf: &'s [u8]) -> io::Result { // send it all, right away - let _ = self.write.send(buf.to_vec()); + let _res = self.write.send(buf.to_vec()); // that was easy Ok(buf.len()) diff --git a/sqlx-core/src/net.rs b/sqlx-core/src/net.rs new file mode 100644 index 00000000..b1f01cc1 --- /dev/null +++ b/sqlx-core/src/net.rs @@ -0,0 +1,3 @@ +mod stream; + +pub use stream::Stream; diff --git a/sqlx-core/src/net/stream.rs b/sqlx-core/src/net/stream.rs new file mode 100644 index 00000000..b39ab2f7 --- /dev/null +++ b/sqlx-core/src/net/stream.rs @@ -0,0 +1,196 @@ +use std::io; +use std::path::PathBuf; + +use either::Either; +#[cfg(feature = "async")] +use futures_util::future::{self, FutureExt}; + +use crate::io::Stream as IoStream; +use crate::Runtime; + +#[derive(Debug)] +pub enum Stream +where + Rt: Runtime, +{ + Tcp(Rt::TcpStream), + + #[cfg(unix)] + Unix(Rt::UnixStream), +} + +impl Stream +where + Rt: Runtime, +{ + #[cfg(feature = "async")] + pub async fn connect_async(address: Either<&(String, u16), &PathBuf>) -> io::Result + where + Rt: crate::Async, + { + match address { + Either::Left((host, port)) => Rt::connect_tcp_async(host, *port).await.map(Self::Tcp), + + #[cfg(unix)] + Either::Right(socket) => Rt::connect_unix_async(socket).await.map(Self::Unix), + + #[cfg(not(unix))] + Either(_socket) => Err(io::Error::new( + io::ErrorKind::Other, + "Unix streams are not supported outside Unix platforms", + )), + } + } + #[cfg(feature = "blocking")] + pub fn connect(address: Either<&(String, u16), &PathBuf>) -> io::Result + where + Rt: crate::blocking::Runtime, + { + match address { + Either::Left((host, port)) => Rt::connect_tcp(host, *port).map(Self::Tcp), + + #[cfg(unix)] + Either::Right(socket) => Rt::connect_unix(socket).map(Self::Unix), + + #[cfg(not(unix))] + Either(_socket) => Err(io::Error::new( + io::ErrorKind::Other, + "Unix streams are not supported outside Unix platforms", + )), + } + } +} + +#[cfg(unix)] +impl<'s, Rt> IoStream<'s, Rt> for Stream +where + Rt: Runtime, +{ + #[doc(hidden)] + #[cfg(feature = "async")] + type ReadFuture = future::Either< + >::ReadFuture, + >::ReadFuture, + >; + + #[doc(hidden)] + #[cfg(feature = "async")] + type WriteFuture = future::Either< + >::WriteFuture, + >::WriteFuture, + >; + + #[inline] + #[doc(hidden)] + #[cfg(feature = "async")] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture + where + Rt: crate::Async, + { + match self { + Self::Tcp(stream) => stream.read_async(buf).left_future(), + Self::Unix(stream) => stream.read_async(buf).right_future(), + } + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "async")] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture + where + Rt: crate::Async, + { + match self { + Self::Tcp(stream) => stream.write_async(buf).left_future(), + Self::Unix(stream) => stream.write_async(buf).right_future(), + } + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result + where + Rt: crate::blocking::Runtime, + { + match self { + Self::Tcp(stream) => stream.read(buf), + Self::Unix(stream) => stream.read(buf), + } + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result + where + Rt: crate::blocking::Runtime, + { + match self { + Self::Tcp(stream) => stream.write(buf), + Self::Unix(stream) => stream.write(buf), + } + } +} + +#[cfg(not(unix))] +impl<'s, Rt> IoStream<'s, Rt> for Stream +where + Rt: Runtime, +{ + #[doc(hidden)] + #[cfg(feature = "async")] + type ReadFuture = >::ReadFuture; + + #[doc(hidden)] + #[cfg(feature = "async")] + type WriteFuture = >::WriteFuture; + + #[inline] + #[doc(hidden)] + #[cfg(feature = "async")] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture + where + Rt: crate::Async, + { + match self { + Self::Tcp(stream) => stream.read_async(buf), + } + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "async")] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture + where + Rt: crate::Async, + { + match self { + Self::Tcp(stream) => stream.write_async(buf), + } + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result + where + Rt: crate::blocking::Runtime, + { + match self { + Self::Tcp(stream) => stream.read(buf), + } + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result + where + Rt: crate::blocking::Runtime, + { + match self { + Self::Tcp(stream) => stream.write(buf), + } + } +} diff --git a/sqlx-core/src/options.rs b/sqlx-core/src/options.rs index bbac8ae0..8b7f3522 100644 --- a/sqlx-core/src/options.rs +++ b/sqlx-core/src/options.rs @@ -17,6 +17,5 @@ where fn connect(&self) -> futures_util::future::BoxFuture<'_, crate::Result> where Self::Connection: Sized, - Rt: crate::Async, - for<'s> Rt::TcpStream: crate::io::Stream<'s, Rt>; + Rt: crate::Async; } diff --git a/sqlx-core/src/runtime.rs b/sqlx-core/src/runtime.rs index 8190b8a9..bd48d006 100644 --- a/sqlx-core/src/runtime.rs +++ b/sqlx-core/src/runtime.rs @@ -1,4 +1,13 @@ -use crate::io::Stream; +use std::io; +#[cfg(unix)] +use std::path::Path; + +#[cfg(feature = "async")] +use futures_util::future::BoxFuture; + +#[cfg(feature = "blocking")] +use crate::blocking; +use crate::io::Stream as IoStream; #[cfg(feature = "async-std")] #[path = "runtime/async_std.rs"] @@ -43,28 +52,41 @@ pub use tokio_::Tokio; /// pub trait Runtime: 'static + Send + Sync + Sized { #[doc(hidden)] - type TcpStream: 'static + Send + Sync; + type TcpStream: for<'s> IoStream<'s, Self>; + + #[doc(hidden)] + #[cfg(unix)] + type UnixStream: for<'s> IoStream<'s, Self>; + + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn connect_tcp(host: &str, port: u16) -> io::Result + where + Self: blocking::Runtime; + + #[doc(hidden)] + #[cfg(all(unix, feature = "blocking"))] + fn connect_unix(path: &Path) -> io::Result + where + Self: blocking::Runtime; + + #[doc(hidden)] + #[cfg(feature = "async")] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> + where + Self: Async; + + #[doc(hidden)] + #[cfg(all(unix, feature = "async"))] + fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result> + where + Self: Async; } /// Marks a [`Runtime`] as being capable of handling asynchronous execution. // Provided so that attempting to use the asynchronous methods with the // Blocking runtime will error at compile-time as opposed to runtime. -pub trait Async: Runtime -where - // NOTE: This requires a **pervasive** bound for everything that needs - // to bound on . Remove if you can think of something else that - // allows us to both allow polymorphic read/write on streams across - // runtimes _and_ not depend on . When GATs are stabilized, - // we should be able to switch to that and remove this HRTB. - Self::TcpStream: for<'s> Stream<'s, Self>, -{ - #[cfg(feature = "async")] - #[doc(hidden)] - fn connect_tcp_async( - host: &str, - port: u16, - ) -> futures_util::future::BoxFuture<'_, std::io::Result>; -} +pub trait Async: Runtime {} // when no runtime is available // we implement `()` for it to allow the lib to still compile @@ -75,7 +97,28 @@ where feature = "blocking" )))] impl Runtime for () { + #[doc(hidden)] type TcpStream = (); + + #[doc(hidden)] + #[cfg(unix)] + type UnixStream = (); + + #[doc(hidden)] + #[cfg(feature = "async")] + #[allow(unused_variables)] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { + // UNREACHABLE: where Self: Async + unreachable!() + } + + #[doc(hidden)] + #[cfg(all(unix, feature = "async"))] + #[allow(unused_variables)] + fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result> { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } } // pick a default runtime diff --git a/sqlx-core/src/runtime/actix.rs b/sqlx-core/src/runtime/actix.rs index f107d568..71bbd9c5 100644 --- a/sqlx-core/src/runtime/actix.rs +++ b/sqlx-core/src/runtime/actix.rs @@ -1,6 +1,10 @@ use std::io; +#[cfg(unix)] +use std::path::Path; use actix_rt::net::TcpStream; +#[cfg(unix)] +use actix_rt::net::UnixStream; use async_compat::Compat; use futures_util::io::{Read, Write}; use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt}; @@ -17,20 +21,44 @@ use crate::{io::Stream, Async, Runtime}; #[derive(Debug)] pub struct Actix; +// NOTE: Compat<_> is used for IO streams to avoid requiring a Box per read/write call +// https://github.com/tokio-rs/tokio/issues/2723 impl Runtime for Actix { - // NOTE: Compat<_> is used to avoid requiring a Box per read/write call - // https://github.com/tokio-rs/tokio/issues/2723 #[doc(hidden)] type TcpStream = Compat; -} -impl Async for Actix { + #[doc(hidden)] + #[cfg(unix)] + type UnixStream = Compat; + + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn connect_tcp(_host: &str, _port: u16) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + + #[doc(hidden)] + #[cfg(all(unix, feature = "blocking"))] + fn connect_unix(_path: &Path) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + #[doc(hidden)] fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { TcpStream::connect((host, port)).map_ok(Compat::new).boxed() } + + #[doc(hidden)] + #[cfg(unix)] + fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result> { + UnixStream::connect(path).map_ok(Compat::new).boxed() + } } +impl Async for Actix {} + // 's: stream impl<'s> Stream<'s, Actix> for Compat { #[doc(hidden)] @@ -42,12 +70,66 @@ impl<'s> Stream<'s, Actix> for Compat { #[inline] #[doc(hidden)] fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { - self.read(buf) + AsyncReadExt::read(self, buf) } #[inline] #[doc(hidden)] fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { - self.write(buf) + AsyncWriteExt::write(self, buf) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, _buf: &'s mut [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, _buf: &'s [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } +} + +// 's: stream +#[cfg(unix)] +impl<'s> Stream<'s, Actix> for Compat { + #[doc(hidden)] + type ReadFuture = Read<'s, Self>; + + #[doc(hidden)] + type WriteFuture = Write<'s, Self>; + + #[inline] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + AsyncReadExt::read(self, buf) + } + + #[inline] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + AsyncWriteExt::write(self, buf) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, _buf: &'s mut [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, _buf: &'s [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() } } diff --git a/sqlx-core/src/runtime/async_std.rs b/sqlx-core/src/runtime/async_std.rs index fd131893..0c22346e 100644 --- a/sqlx-core/src/runtime/async_std.rs +++ b/sqlx-core/src/runtime/async_std.rs @@ -1,4 +1,10 @@ +use std::io; +#[cfg(unix)] +use std::path::Path; + use _async_std::net::TcpStream; +#[cfg(unix)] +use _async_std::os::unix::net::UnixStream; #[cfg(feature = "blocking")] use _async_std::task; use futures_util::io::{Read, Write}; @@ -21,23 +27,42 @@ pub struct AsyncStd; impl Runtime for AsyncStd { #[doc(hidden)] type TcpStream = TcpStream; -} -impl Async for AsyncStd { #[doc(hidden)] - fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, std::io::Result> { - TcpStream::connect((host, port)).boxed() - } -} + #[cfg(unix)] + type UnixStream = UnixStream; -#[cfg(feature = "blocking")] -impl blocking::Runtime for AsyncStd { #[doc(hidden)] - fn connect_tcp(host: &str, port: u16) -> std::io::Result { + #[cfg(feature = "blocking")] + fn connect_tcp(host: &str, port: u16) -> io::Result { task::block_on(Self::connect_tcp_async(host, port)) } + + #[doc(hidden)] + #[cfg(all(unix, feature = "blocking"))] + fn connect_unix(path: &Path) -> io::Result { + task::block_on(Self::connect_unix_async(path)) + } + + #[doc(hidden)] + fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { + TcpStream::connect((host, port)).boxed() + } + + #[doc(hidden)] + #[cfg(unix)] + fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result> { + UnixStream::connect(path).boxed() + } } +impl Async for AsyncStd {} + +// blocking operations provided by trivially wrapping async counterparts +// with `task::block_on` +#[cfg(feature = "blocking")] +impl blocking::Runtime for AsyncStd {} + // 's: stream impl<'s> Stream<'s, AsyncStd> for TcpStream { #[doc(hidden)] @@ -49,26 +74,62 @@ impl<'s> Stream<'s, AsyncStd> for TcpStream { #[inline] #[doc(hidden)] fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { - self.read(buf) + AsyncReadExt::read(self, buf) } #[inline] #[doc(hidden)] fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { - self.write(buf) + AsyncWriteExt::write(self, buf) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result { + task::block_on(self.read_async(buf)) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result { + task::block_on(self.write_async(buf)) } } // 's: stream -#[cfg(feature = "blocking")] -impl<'s> blocking::io::Stream<'s, AsyncStd> for TcpStream { +#[cfg(unix)] +impl<'s> Stream<'s, AsyncStd> for UnixStream { #[doc(hidden)] - fn read(&'s mut self, buf: &'s mut [u8]) -> std::io::Result { - _async_std::task::block_on(self.read_async(buf)) - } + type ReadFuture = Read<'s, Self>; #[doc(hidden)] - fn write(&'s mut self, buf: &'s [u8]) -> std::io::Result { - _async_std::task::block_on(self.write_async(buf)) + type WriteFuture = Write<'s, Self>; + + #[inline] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + AsyncReadExt::read(self, buf) + } + + #[inline] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + AsyncWriteExt::write(self, buf) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, buf: &'s mut [u8]) -> io::Result { + task::block_on(self.read_async(buf)) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, buf: &'s [u8]) -> io::Result { + task::block_on(self.write_async(buf)) } } diff --git a/sqlx-core/src/runtime/tokio.rs b/sqlx-core/src/runtime/tokio.rs index 8a03be10..1368b4e6 100644 --- a/sqlx-core/src/runtime/tokio.rs +++ b/sqlx-core/src/runtime/tokio.rs @@ -1,6 +1,10 @@ use std::io; +#[cfg(unix)] +use std::path::Path; use _tokio::net::TcpStream; +#[cfg(unix)] +use _tokio::net::UnixStream; use async_compat::Compat; use futures_util::io::{Read, Write}; use futures_util::{future::BoxFuture, AsyncReadExt, AsyncWriteExt, FutureExt, TryFutureExt}; @@ -15,20 +19,44 @@ use crate::{io::Stream, Async, Runtime}; #[derive(Debug)] pub struct Tokio; +// NOTE: Compat<_> is used for IO streams to avoid requiring a Box per read/write call +// https://github.com/tokio-rs/tokio/issues/2723 impl Runtime for Tokio { - // NOTE: Compat<_> is used to avoid requiring a Box per read/write call - // https://github.com/tokio-rs/tokio/issues/2723 #[doc(hidden)] type TcpStream = Compat; -} -impl Async for Tokio { + #[doc(hidden)] + #[cfg(unix)] + type UnixStream = Compat; + + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn connect_tcp(_host: &str, _port: u16) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + + #[doc(hidden)] + #[cfg(all(unix, feature = "blocking"))] + fn connect_unix(_path: &Path) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + #[doc(hidden)] fn connect_tcp_async(host: &str, port: u16) -> BoxFuture<'_, io::Result> { TcpStream::connect((host, port)).map_ok(Compat::new).boxed() } + + #[doc(hidden)] + #[cfg(unix)] + fn connect_unix_async(path: &Path) -> BoxFuture<'_, io::Result> { + UnixStream::connect(path).map_ok(Compat::new).boxed() + } } +impl Async for Tokio {} + // 's: stream impl<'s> Stream<'s, Tokio> for Compat { #[doc(hidden)] @@ -40,12 +68,66 @@ impl<'s> Stream<'s, Tokio> for Compat { #[inline] #[doc(hidden)] fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { - self.read(buf) + AsyncReadExt::read(self, buf) } #[inline] #[doc(hidden)] fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { - self.write(buf) + AsyncWriteExt::write(self, buf) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, _buf: &'s mut [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, _buf: &'s [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } +} + +// 's: stream +#[cfg(unix)] +impl<'s> Stream<'s, Tokio> for Compat { + #[doc(hidden)] + type ReadFuture = Read<'s, Self>; + + #[doc(hidden)] + type WriteFuture = Write<'s, Self>; + + #[inline] + #[doc(hidden)] + fn read_async(&'s mut self, buf: &'s mut [u8]) -> Self::ReadFuture { + AsyncReadExt::read(self, buf) + } + + #[inline] + #[doc(hidden)] + fn write_async(&'s mut self, buf: &'s [u8]) -> Self::WriteFuture { + AsyncWriteExt::write(self, buf) + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn read(&'s mut self, _buf: &'s mut [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() + } + + #[inline] + #[doc(hidden)] + #[cfg(feature = "blocking")] + fn write(&'s mut self, _buf: &'s [u8]) -> io::Result { + // UNREACHABLE: where Self: blocking::Runtime + unreachable!() } } diff --git a/sqlx-mysql/src/connection.rs b/sqlx-mysql/src/connection.rs index c6c09dd5..36cba9a7 100644 --- a/sqlx-mysql/src/connection.rs +++ b/sqlx-mysql/src/connection.rs @@ -1,6 +1,7 @@ use std::fmt::{self, Debug, Formatter}; use sqlx_core::io::BufStream; +use sqlx_core::net::Stream as NetStream; use sqlx_core::{Close, Connect, Connection, DefaultRuntime, Runtime}; use crate::protocol::Capabilities; @@ -16,7 +17,7 @@ pub struct MySqlConnection where Rt: Runtime, { - stream: BufStream, + stream: BufStream>, connection_id: u32, // the capability flags are used by the client and server to indicate which @@ -32,7 +33,7 @@ impl MySqlConnection where Rt: Runtime, { - pub(crate) fn new(stream: Rt::TcpStream) -> Self { + pub(crate) fn new(stream: NetStream) -> Self { Self { stream: BufStream::with_capacity(stream, 4096, 1024), connection_id: 0, @@ -73,7 +74,6 @@ where fn ping(&mut self) -> futures_util::future::BoxFuture<'_, sqlx_core::Result<()>> where Rt: sqlx_core::Async, - for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { Box::pin(self.ping_async()) } @@ -87,7 +87,6 @@ impl Connect for MySqlConnection { where Self: Sized, Rt: sqlx_core::Async, - for<'s> ::TcpStream: sqlx_core::io::Stream<'s, Rt>, { use sqlx_core::ConnectOptions; @@ -101,7 +100,6 @@ impl Close for MySqlConnection { fn close(self) -> futures_util::future::BoxFuture<'static, sqlx_core::Result<()>> where Rt: sqlx_core::Async, - for<'s> ::TcpStream: sqlx_core::io::Stream<'s, Rt>, { Box::pin(self.close_async()) } @@ -112,10 +110,7 @@ impl sqlx_core::blocking::Connection for MySqlConnection where Rt: sqlx_core::blocking::Runtime, { - fn ping(&mut self) -> sqlx_core::Result<()> - where - for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, - { + fn ping(&mut self) -> sqlx_core::Result<()> { self.ping() } } @@ -128,7 +123,6 @@ where fn connect(url: &str) -> sqlx_core::Result where Self: Sized, - for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { Self::connect(&url.parse::>()?) } @@ -139,10 +133,7 @@ impl sqlx_core::blocking::Close for MySqlConnection where Rt: sqlx_core::blocking::Runtime, { - fn close(self) -> sqlx_core::Result<()> - where - for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, - { + fn close(self) -> sqlx_core::Result<()> { self.close() } } diff --git a/sqlx-mysql/src/connection/close.rs b/sqlx-mysql/src/connection/close.rs index 9eb3566a..4c351e5f 100644 --- a/sqlx-mysql/src/connection/close.rs +++ b/sqlx-mysql/src/connection/close.rs @@ -10,7 +10,6 @@ where pub(crate) async fn close_async(mut self) -> Result<()> where Rt: sqlx_core::Async, - for<'s> ::TcpStream: sqlx_core::io::Stream<'s, Rt>, { self.write_packet(&Quit)?; self.stream.flush_async().await?; @@ -22,7 +21,6 @@ where pub(crate) fn close(mut self) -> Result<()> where Rt: sqlx_core::blocking::Runtime, - for<'s> ::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { self.write_packet(&Quit)?; self.stream.flush()?; diff --git a/sqlx-mysql/src/connection/connect.rs b/sqlx-mysql/src/connection/connect.rs index eafe5df6..c76be1e0 100644 --- a/sqlx-mysql/src/connection/connect.rs +++ b/sqlx-mysql/src/connection/connect.rs @@ -11,6 +11,7 @@ //! //! https://dev.mysql.com/doc/internals/en/connection-phase.html //! +use sqlx_core::net::Stream as NetStream; use sqlx_core::Result; use crate::protocol::{Auth, AuthResponse, Handshake, HandshakeResponse}; @@ -18,11 +19,11 @@ use crate::{MySqlConnectOptions, MySqlConnection}; macro_rules! connect { (@blocking @tcp $options:ident) => { - Rt::connect_tcp($options.get_host(), $options.get_port())?; + NetStream::connect($options.address.as_ref())?; }; (@tcp $options:ident) => { - Rt::connect_tcp_async($options.get_host(), $options.get_port()).await?; + NetStream::connect_async($options.address.as_ref()).await?; }; (@blocking @packet $self:ident) => { @@ -132,10 +133,7 @@ impl MySqlConnection where Rt: sqlx_core::blocking::Runtime, { - pub(crate) fn connect(options: &MySqlConnectOptions) -> Result - where - for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, - { + pub(crate) fn connect(options: &MySqlConnectOptions) -> Result { connect!(@blocking options) } } diff --git a/sqlx-mysql/src/connection/ping.rs b/sqlx-mysql/src/connection/ping.rs index 7e202702..4cef6821 100644 --- a/sqlx-mysql/src/connection/ping.rs +++ b/sqlx-mysql/src/connection/ping.rs @@ -14,7 +14,6 @@ where pub(crate) async fn ping_async(&mut self) -> Result<()> where Rt: sqlx_core::Async, - for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { self.write_packet(&Ping)?; @@ -26,7 +25,7 @@ where #[cfg(feature = "blocking")] pub(crate) fn ping(&mut self) -> Result<()> where - for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, + Rt: sqlx_core::blocking::Runtime, { self.write_packet(&Ping)?; diff --git a/sqlx-mysql/src/connection/stream.rs b/sqlx-mysql/src/connection/stream.rs index 2061f4fd..2ffc3ff1 100644 --- a/sqlx-mysql/src/connection/stream.rs +++ b/sqlx-mysql/src/connection/stream.rs @@ -134,7 +134,6 @@ where where T: Deserialize<'de, Capabilities>, Rt: sqlx_core::Async, - for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { read_packet!(self) } @@ -148,7 +147,7 @@ where pub(super) fn read_packet<'de, T>(&'de mut self) -> Result where T: Deserialize<'de, Capabilities>, - for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, + Rt: sqlx_core::blocking::Runtime, { read_packet!(@blocking self) } diff --git a/sqlx-mysql/src/options.rs b/sqlx-mysql/src/options.rs index be1d1d9d..67d1bf31 100644 --- a/sqlx-mysql/src/options.rs +++ b/sqlx-mysql/src/options.rs @@ -32,7 +32,7 @@ where Rt: Runtime, { runtime: PhantomData, - address: Either<(String, u16), PathBuf>, + pub(crate) address: Either<(String, u16), PathBuf>, username: Option, password: Option, database: Option, @@ -144,7 +144,6 @@ where where Self::Connection: Sized, Rt: sqlx_core::Async, - for<'s> Rt::TcpStream: sqlx_core::io::Stream<'s, Rt>, { Box::pin(MySqlConnection::connect_async(self)) } @@ -158,7 +157,6 @@ where fn connect(&self) -> sqlx_core::Result where Self::Connection: Sized, - for<'s> Rt::TcpStream: sqlx_core::blocking::io::Stream<'s, Rt>, { >::connect(self) }