examples: add futures executor threadpool (#3198)

This commit is contained in:
Carlos B 2020-12-22 20:08:43 +01:00 committed by GitHub
parent 0b83b3b8cc
commit 7d28e4cdbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 1 deletions

View File

@ -15,13 +15,15 @@ async-stream = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
bytes = { git = "https://github.com/tokio-rs/bytes" }
futures = "0.3.0"
futures = { version = "0.3.0", features = ["thread-pool"]}
http = "0.2"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
httparse = "1.0"
time = "0.1"
once_cell = "1.5.2"
[[example]]
name = "chat"
@ -66,3 +68,12 @@ path = "udp-codec.rs"
[[example]]
name = "tinyhttp"
path = "tinyhttp.rs"
[[example]]
name = "custom-executor"
path = "custom-executor.rs"
[[example]]
name = "custom-executor-tokio-context"
path = "custom-executor-tokio-context.rs"

View File

@ -0,0 +1,32 @@
// This example shows how to use the tokio runtime with any other executor
//
//It takes advantage from RuntimeExt which provides the extension to customize your
//runtime.
use tokio::net::TcpListener;
use tokio::runtime::Builder;
use tokio::sync::oneshot;
use tokio_util::context::RuntimeExt;
fn main() {
let (tx, rx) = oneshot::channel();
let rt1 = Builder::new_multi_thread()
.worker_threads(1)
// no timer!
.build()
.unwrap();
let rt2 = Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(rt2.wrap(async move {
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
println!("addr: {:?}", listener.local_addr());
tx.send(()).unwrap();
}));
futures::executor::block_on(rx).unwrap();
}

View File

@ -0,0 +1,56 @@
// This example shows how to use the tokio runtime with any other executor
//
// The main components are a spawn fn that will wrap futures in a special future
// that will always enter the tokio context on poll. This only spawns one extra thread
// to manage and run the tokio drivers in the background.
use tokio::net::TcpListener;
use tokio::sync::oneshot;
fn main() {
let (tx, rx) = oneshot::channel();
my_custom_runtime::spawn(async move {
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
println!("addr: {:?}", listener.local_addr());
tx.send(()).unwrap();
});
futures::executor::block_on(rx).unwrap();
}
mod my_custom_runtime {
use once_cell::sync::Lazy;
use std::future::Future;
use tokio_util::context::TokioContext;
pub fn spawn(f: impl Future<Output = ()> + Send + 'static) {
EXECUTOR.spawn(f);
}
struct ThreadPool {
inner: futures::executor::ThreadPool,
rt: tokio::runtime::Runtime,
}
static EXECUTOR: Lazy<ThreadPool> = Lazy::new(|| {
// Spawn tokio runtime on a single background thread
// enabling IO and timers.
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
let inner = futures::executor::ThreadPool::builder().create().unwrap();
ThreadPool { inner, rt }
});
impl ThreadPool {
fn spawn(&self, f: impl Future<Output = ()> + Send + 'static) {
let handle = self.rt.handle().clone();
self.inner.spawn_ok(TokioContext::new(f, handle));
}
}
}