use crate::{ backend::Backend, describe::Describe, error::Error, executor::Executor, params::IntoQueryParameters, pool::{Live, SharedPool}, row::FromRow, row::Row, }; use futures_core::{future::BoxFuture, stream::BoxStream}; use futures_util::stream::StreamExt; use std::{sync::Arc, time::Instant}; pub struct Connection where DB: Backend, { live: Live, pool: Option>>, } impl Connection where DB: Backend, { pub(crate) fn new(live: Live, pool: Option>>) -> Self { Self { live, pool } } pub async fn open(url: &str) -> crate::Result { let raw = DB::open(url).await?; Ok(Self::new(Live::unpooled(raw), None)) } /// Verifies a connection to the database is still alive. pub async fn ping(&mut self) -> crate::Result<()> { self.live.ping().await } /// Analyze the SQL statement and report the inferred bind parameter types and returned /// columns. /// /// Mainly intended for use by sqlx-macros. pub async fn describe(&mut self, statement: &str) -> crate::Result> { self.live.describe(statement).await } } impl Executor for Connection where DB: Backend, { type Backend = DB; fn execute<'c, 'q: 'c, I: 'c>( &'c mut self, query: &'q str, params: I, ) -> BoxFuture<'c, Result> where I: IntoQueryParameters + Send, { Box::pin(async move { self.live.execute(query, params.into_params()).await }) } fn fetch<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( &'c mut self, query: &'q str, params: I, ) -> BoxStream<'c, Result> where I: IntoQueryParameters + Send, T: FromRow + Send + Unpin, { Box::pin(async_stream::try_stream! { let mut s = self.live.fetch(query, params.into_params()); while let Some(row) = s.next().await.transpose()? { yield T::from_row(Row(row)); } }) } fn fetch_optional<'c, 'q: 'c, I: 'c, O: 'c, T: 'c>( &'c mut self, query: &'q str, params: I, ) -> BoxFuture<'c, Result, Error>> where I: IntoQueryParameters + Send, T: FromRow, { Box::pin(async move { let row = self .live .fetch_optional(query, params.into_params()) .await?; Ok(row.map(Row).map(T::from_row)) }) } }