query_as: don't stop stream after decoding error (#1887)

* query_as: don't stop stream after decoding error

Fixes https://github.com/launchbadge/sqlx/issues/1884

When a single row cannot be converted to the target type of query_as,
it should not prevent the library user from accessing the other rows

Otherwise, the user cannot access all query results in query_as.

* use union in tests to maximize db compatibility
This commit is contained in:
Ophir LOJKINE 2022-06-02 04:18:18 +02:00 committed by GitHub
parent 24baac779f
commit 20d61f4271
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 12 deletions

View File

@ -110,18 +110,14 @@ where
O: 'e,
A: 'e,
{
Box::pin(try_stream! {
let mut s = executor.fetch_many(self.inner);
while let Some(v) = s.try_next().await? {
r#yield!(match v {
Either::Left(v) => Either::Left(v),
Either::Right(row) => Either::Right(O::from_row(&row)?),
});
}
Ok(())
})
executor
.fetch_many(self.inner)
.map(|v| match v {
Ok(Either::Right(row)) => O::from_row(&row).map(Either::Right),
Ok(Either::Left(v)) => Ok(Either::Left(v)),
Err(e) => Err(e),
})
.boxed()
}
/// Execute the query and return all the generated results, collected into a [`Vec`].

View File

@ -38,6 +38,35 @@ async fn it_executes_with_pool() -> anyhow::Result<()> {
Ok(())
}
#[sqlx_macros::test]
async fn it_does_not_stop_stream_after_decoding_error() -> anyhow::Result<()> {
use futures::stream::StreamExt;
// see https://github.com/launchbadge/sqlx/issues/1884
let pool = sqlx_test::pool::<Any>().await?;
#[derive(Debug, PartialEq)]
struct MyType;
impl<'a> sqlx::FromRow<'a, AnyRow> for MyType {
fn from_row(row: &'a AnyRow) -> sqlx::Result<Self> {
let n = row.try_get::<i32, _>(0)?;
if n == 1 {
Err(sqlx::Error::RowNotFound)
} else {
Ok(MyType)
}
}
}
let rows = sqlx::query_as("SELECT 0 UNION ALL SELECT 1 UNION ALL SELECT 2")
.fetch(&pool)
.map(|r| r.ok())
.collect::<Vec<_>>()
.await;
assert_eq!(rows, vec![Some(MyType), None, Some(MyType)]);
Ok(())
}
#[sqlx_macros::test]
async fn it_gets_by_name() -> anyhow::Result<()> {
let mut conn = new::<Any>().await?;