mirror of
https://github.com/tokio-rs/tokio.git
synced 2025-10-01 12:20:39 +00:00
stream: remove bytes from public API (#2908)
This commit is contained in:
parent
c23c1ecbcb
commit
aa171f2aa9
@ -1,6 +1,5 @@
|
||||
use crate::stream::Stream;
|
||||
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use core::future::Future;
|
||||
use core::marker::PhantomPinned;
|
||||
use core::mem;
|
||||
@ -205,44 +204,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Buf> FromStream<T> for Bytes {}
|
||||
|
||||
impl<T: Buf> sealed::FromStreamPriv<T> for Bytes {
|
||||
type InternalCollection = BytesMut;
|
||||
|
||||
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BytesMut {
|
||||
BytesMut::new()
|
||||
}
|
||||
|
||||
fn extend(_: sealed::Internal, collection: &mut BytesMut, item: T) -> bool {
|
||||
collection.put(item);
|
||||
true
|
||||
}
|
||||
|
||||
fn finalize(_: sealed::Internal, collection: &mut BytesMut) -> Bytes {
|
||||
mem::replace(collection, BytesMut::new()).freeze()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Buf> FromStream<T> for BytesMut {}
|
||||
|
||||
impl<T: Buf> sealed::FromStreamPriv<T> for BytesMut {
|
||||
type InternalCollection = BytesMut;
|
||||
|
||||
fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BytesMut {
|
||||
BytesMut::new()
|
||||
}
|
||||
|
||||
fn extend(_: sealed::Internal, collection: &mut BytesMut, item: T) -> bool {
|
||||
collection.put(item);
|
||||
true
|
||||
}
|
||||
|
||||
fn finalize(_: sealed::Internal, collection: &mut BytesMut) -> BytesMut {
|
||||
mem::replace(collection, BytesMut::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) mod sealed {
|
||||
#[doc(hidden)]
|
||||
pub trait FromStreamPriv<T> {
|
||||
|
@ -2,8 +2,6 @@ use tokio::stream::{self, StreamExt};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
|
||||
|
||||
use bytes::{Bytes, BytesMut};
|
||||
|
||||
#[allow(clippy::let_unit_value)]
|
||||
#[tokio::test]
|
||||
async fn empty_unit() {
|
||||
@ -25,18 +23,6 @@ async fn empty_box_slice() {
|
||||
assert!(coll.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_bytes() {
|
||||
let coll: Bytes = stream::empty::<&[u8]>().collect().await;
|
||||
assert!(coll.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_bytes_mut() {
|
||||
let coll: BytesMut = stream::empty::<&[u8]>().collect().await;
|
||||
assert!(coll.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_string() {
|
||||
let coll: String = stream::empty::<&str>().collect().await;
|
||||
@ -112,27 +98,6 @@ async fn collect_str_items() {
|
||||
assert_eq!("hello world", coll);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn collect_bytes() {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let mut fut = task::spawn(rx.collect::<Bytes>());
|
||||
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
tx.send(&b"hello "[..]).unwrap();
|
||||
assert!(fut.is_woken());
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
tx.send(&b"world"[..]).unwrap();
|
||||
assert!(fut.is_woken());
|
||||
assert_pending!(fut.poll());
|
||||
|
||||
drop(tx);
|
||||
assert!(fut.is_woken());
|
||||
let coll = assert_ready!(fut.poll());
|
||||
assert_eq!(&b"hello world"[..], coll);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn collect_results_ok() {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
|
Loading…
x
Reference in New Issue
Block a user