test: Make Mock both Send and Sync (#3594)

Co-authored-by: Jake Ham <hamjacob@amazon.com>
This commit is contained in:
Jake Ham 2021-03-10 11:29:22 -08:00 committed by GitHub
parent bcb95db4e2
commit 6919f7cede
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 8 additions and 12 deletions

View File

@ -30,11 +30,11 @@ signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.2.0", features = ["sync"] }
tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.6.3", optional = true }
[dev-dependencies]
tokio = { version = "1.2.0", features = ["full", "test-util"] }
tokio = { version = "1.2.0", path = "../tokio", features = ["full", "test-util"] }
async-stream = "0.3"
tokio-test = { path = "../tokio-test" }
futures = { version = "0.3", default-features = false }

View File

@ -19,7 +19,7 @@ Testing utilities for Tokio- and futures-based code
categories = ["asynchronous", "testing"]
[dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["rt", "sync", "time", "test-util"] }
tokio = { version = "1.2.0", path = "../tokio", features = ["rt", "sync", "time", "test-util"] }
tokio-stream = { version = "0.1", path = "../tokio-stream" }
async-stream = "0.3"
@ -27,7 +27,7 @@ bytes = "1.0.0"
futures-core = "0.3.0"
[dev-dependencies]
tokio = { version = "1.0.0", path = "../tokio", features = ["full"] }
tokio = { version = "1.2.0", path = "../tokio", features = ["full"] }
futures-util = "0.3.0"
[package.metadata.docs.rs]

View File

@ -21,6 +21,7 @@
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{self, Duration, Instant, Sleep};
use tokio_stream::wrappers::UnboundedReceiverStream;
use futures_core::{ready, Stream};
use std::collections::VecDeque;
@ -69,8 +70,7 @@ struct Inner {
waiting: Option<Instant>,
sleep: Option<Pin<Box<Sleep>>>,
read_wait: Option<Waker>,
// rx: mpsc::UnboundedReceiver<Action>,
rx: Pin<Box<dyn Stream<Item = Action> + Send>>,
rx: UnboundedReceiverStream<Action>,
}
impl Builder {
@ -185,13 +185,9 @@ impl Handle {
impl Inner {
fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
let (tx, mut rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();
let rx = Box::pin(async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
});
let rx = UnboundedReceiverStream::new(rx);
let inner = Inner {
actions,