runtime: merge multi & single threaded runtimes (#1716)

Simplify Tokio's runtime construct by combining both Runtime variants
into a single type. The execution style can be controlled by a
configuration setting on `Builder`.

The implication of this change is that there is no longer any way to
spawn `!Send` futures. This, however, is a temporary limitation. A
different strategy will be employed for supporting `!Send` futures.

Included in this patch is a rework of `task::JoinHandle` to support
using this type from both the thread-pool and current-thread executors.
This commit is contained in:
Carl Lerche 2019-11-01 13:18:52 -07:00 committed by GitHub
parent 742d89b0f3
commit d70c928d88
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1551 additions and 3613 deletions

View File

@ -98,7 +98,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
}
let result = match runtime {
RuntimeType::Multi => quote! {
RuntimeType::Multi | RuntimeType::Auto => quote! {
#(#attrs)*
fn #name(#inputs) #ret {
tokio::runtime::Runtime::new().unwrap().block_on(async { #body })
@ -107,14 +107,11 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
RuntimeType::Single => quote! {
#(#attrs)*
fn #name(#inputs) #ret {
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body })
}
},
RuntimeType::Auto => quote! {
#(#attrs)*
fn #name() #ret {
let mut rt = tokio::runtime::__main::Runtime::new().unwrap();
rt.block_on(async { #body })
tokio::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
.block_on(async { #body })
}
},
};
@ -211,7 +208,11 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
#[test]
#(#attrs)*
fn #name() #ret {
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body })
tokio::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
.block_on(async { #body })
}
},
};

View File

@ -26,15 +26,9 @@ pub mod task;
///
/// [runtime-block-on]: https://docs.rs/tokio/0.2.0-alpha.2/tokio/runtime/current_thread/struct.Runtime.html#method.block_on
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
use tokio::runtime;
let mut rt = runtime::Builder::new().current_thread().build().unwrap();
rt.block_on(future)
}
/*
#[doc(hidden)]
pub mod codegen {
pub mod futures {
pub use futures::*;
}
}
*/

View File

@ -47,7 +47,6 @@ net-full = ["tcp", "udp", "uds"]
net-driver = ["io-traits", "mio", "blocking", "lazy_static"]
rt-current-thread = [
"executor-core",
"crossbeam-channel",
"timer",
"sync",
"net-driver",
@ -95,7 +94,6 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann
# Everything else is optional...
bytes = { version = "0.4", optional = true }
crossbeam-channel = { version = "0.3.8", optional = true }
fnv = { version = "1.0.6", optional = true }
iovec = { version = "0.1", optional = true }
lazy_static = { version = "1.0.2", optional = true }

View File

@ -221,6 +221,7 @@ impl Pool {
}
}
#[derive(Debug)]
pub(crate) struct PoolWaiter(Arc<Pool>);
impl From<Pool> for PoolWaiter {

File diff suppressed because it is too large Load Diff

View File

@ -1,808 +0,0 @@
use crate::executor::current_thread::{Borrow, BorrowSpawner};
use crate::executor::park::Unpark;
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::{Arc, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::thread;
use std::usize;
/// A generic task-aware scheduler.
///
/// This is used both by `FuturesUnordered` and the current-thread executor.
pub(crate) struct Scheduler<U> {
inner: Arc<Inner<U>>,
nodes: List<U>,
}
// A linked-list of nodes
struct List<U> {
len: usize,
head: *const Node<U>,
tail: *const Node<U>,
}
// Scheduler is implemented using two linked lists. The first linked list tracks
// all items managed by a `Scheduler`. This list is stored on the `Scheduler`
// struct and is **not** thread safe. The second linked list is an
// implementation of the intrusive MPSC queue algorithm described by
// 1024cores.net and is stored on `Inner`. This linked list can push items to
// the back concurrently but only one consumer may pop from the front. To
// enforce this requirement, all popping will be performed via fns on
// `Scheduler` that take `&mut self`.
//
// When a item is submitted to the set a node is allocated and inserted in
// both linked lists. This means that all insertion operations **must** be
// originated from `Scheduler` with `&mut self` The next call to `tick` will
// (eventually) see this node and call `poll` on the item.
//
// Nodes are wrapped in `Arc` cells which manage the lifetime of the node.
// However, `Arc` handles are sometimes cast to `*const Node` pointers.
// Specifically, when a node is stored in at least one of the two lists
// described above, this represents a logical `Arc` handle. This is how
// `Scheduler` maintains its reference to all nodes it manages. Each
// `NotifyHandle` instance is an `Arc<Node>` as well.
//
// When `Scheduler` drops, it clears the linked list of all nodes that it
// manages. When doing so, it must attempt to decrement the reference count (by
// dropping an Arc handle). However, it can **only** decrement the reference
// count if the node is not currently stored in the mpsc channel. If the node
// **is** "queued" in the mpsc channel, then the arc reference count cannot be
// decremented. Once the node is popped from the mpsc channel, then the final
// arc reference count can be decremented, thus freeing the node.
struct Inner<U> {
// Thread unpark handle
unpark: U,
// Tick number
tick_num: AtomicUsize,
// Head/tail of the readiness queue
head_readiness: AtomicPtr<Node<U>>,
tail_readiness: UnsafeCell<*const Node<U>>,
// Used as part of the mpsc queue algorithm
stub: Arc<Node<U>>,
}
unsafe impl<U: Sync + Send> Send for Inner<U> {}
unsafe impl<U: Sync + Send> Sync for Inner<U> {}
struct Node<U> {
// The item
item: UnsafeCell<Option<Task>>,
// The tick at which this node was notified
notified_at: AtomicUsize,
// Next pointer for linked list tracking all active nodes
next_all: UnsafeCell<*const Node<U>>,
// Previous node in linked list tracking all active nodes
prev_all: UnsafeCell<*const Node<U>>,
// Next pointer in readiness queue
next_readiness: AtomicPtr<Node<U>>,
// Whether or not this node is currently in the mpsc queue.
queued: AtomicBool,
// Queue that we'll be enqueued to when notified
queue: Weak<Inner<U>>,
}
/// Returned by `Inner::dequeue`, representing either a dequeue success (with
/// the dequeued node), an empty list, or an inconsistent state.
///
/// The inconsistent state is described in more detail at [1024cores], but
/// roughly indicates that a node will be ready to dequeue sometime shortly in
/// the future and the caller should try again soon.
///
/// [1024cores]: http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
enum Dequeue<U> {
Data(*const Node<U>),
Empty,
Yield,
Inconsistent,
}
/// Wraps a spawned boxed future
struct Task(Pin<Box<dyn Future<Output = ()>>>);
/// A task that is scheduled. `turn` must be called
pub(crate) struct Scheduled<'a, U> {
task: &'a mut Task,
node: &'a Arc<Node<U>>,
done: &'a mut bool,
}
pub(super) struct TickArgs<'a> {
pub(super) id: u64,
pub(super) num_futures: &'a AtomicUsize,
#[cfg(feature = "blocking")]
pub(super) blocking: &'a crate::executor::blocking::PoolWaiter,
}
impl<U> Scheduler<U>
where
U: Unpark,
{
/// Constructs a new, empty `Scheduler`
///
/// The returned `Scheduler` does not contain any items and, in this
/// state, `Scheduler::poll` will return `Ok(Async::Ready(None))`.
pub(crate) fn new(unpark: U) -> Self {
let stub = Arc::new(Node {
item: UnsafeCell::new(None),
notified_at: AtomicUsize::new(0),
next_all: UnsafeCell::new(ptr::null()),
prev_all: UnsafeCell::new(ptr::null()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Weak::new(),
});
let stub_ptr = &*stub as *const Node<U>;
let inner = Arc::new(Inner {
unpark,
tick_num: AtomicUsize::new(0),
head_readiness: AtomicPtr::new(stub_ptr as *mut _),
tail_readiness: UnsafeCell::new(stub_ptr),
stub,
});
Scheduler {
inner,
nodes: List::new(),
}
}
pub(crate) fn waker(&self) -> Waker {
waker_inner(self.inner.clone())
}
pub(crate) fn schedule(&mut self, item: Pin<Box<dyn Future<Output = ()>>>) {
// Get the current scheduler tick
let tick_num = self.inner.tick_num.load(SeqCst);
let node = Arc::new(Node {
item: UnsafeCell::new(Some(Task::new(item))),
notified_at: AtomicUsize::new(tick_num),
next_all: UnsafeCell::new(ptr::null_mut()),
prev_all: UnsafeCell::new(ptr::null_mut()),
next_readiness: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
queue: Arc::downgrade(&self.inner),
});
// Right now our node has a strong reference count of 1. We transfer
// ownership of this reference count to our internal linked list
// and we'll reclaim ownership through the `unlink` function below.
let ptr = self.nodes.push_back(node);
// We'll need to get the item "into the system" to start tracking it,
// e.g. getting its unpark notifications going to us tracking which
// items are ready. To do that we unconditionally enqueue it for
// polling here.
self.inner.enqueue(ptr);
}
/// Returns `true` if there are currently any pending futures
pub(crate) fn has_pending_futures(&mut self) -> bool {
// See function definition for why the unsafe is needed and
// correctly used here
unsafe { self.inner.has_pending_futures() }
}
/// Advance the scheduler state, returning `true` if any futures were
/// processed.
///
/// This function should be called whenever the caller is notified via a
/// wakeup.
pub(super) fn tick(&mut self, args: TickArgs<'_>) -> bool {
let mut ret = false;
let tick = self.inner.tick_num.fetch_add(1, SeqCst).wrapping_add(1);
loop {
let node = match unsafe { self.inner.dequeue(Some(tick)) } {
Dequeue::Empty => {
return ret;
}
Dequeue::Yield => {
self.inner.unpark.unpark();
return ret;
}
Dequeue::Inconsistent => {
thread::yield_now();
continue;
}
Dequeue::Data(node) => node,
};
ret = true;
debug_assert!(node != self.inner.stub());
unsafe {
if (*(*node).item.get()).is_none() {
// The node has already been released. However, while it was
// being released, another thread notified it, which
// resulted in it getting pushed into the mpsc channel.
//
// In this case, we just decrement the ref count.
let node = ptr2arc(node);
assert!((*node.next_all.get()).is_null());
assert!((*node.prev_all.get()).is_null());
continue;
};
// We're going to need to be very careful if the `poll`
// function below panics. We need to (a) not leak memory and
// (b) ensure that we still don't have any use-after-frees. To
// manage this we do a few things:
//
// * This "bomb" here will call `release_node` if dropped
// abnormally. That way we'll be sure the memory management
// of the `node` is managed correctly.
//
// * We unlink the node from our internal queue to preemptively
// assume is is complete (will return Ready or panic), in
// which case we'll want to discard it regardless.
//
struct Bomb<'a, U: Unpark> {
borrow: &'a mut Borrow<'a, U>,
node: Option<Arc<Node<U>>>,
}
impl<U: Unpark> Drop for Bomb<'_, U> {
fn drop(&mut self) {
if let Some(node) = self.node.take() {
self.borrow.enter(|| release_node(node))
}
}
}
let node = self.nodes.remove(node);
let mut borrow = Borrow {
spawner: BorrowSpawner {
id: args.id,
scheduler: self,
num_futures: args.num_futures,
},
#[cfg(feature = "blocking")]
blocking: args.blocking,
};
let mut bomb = Bomb {
node: Some(node),
borrow: &mut borrow,
};
let mut done = false;
// Now that the bomb holds the node, create a new scope. This
// scope ensures that the borrow will go out of scope before we
// mutate the node pointer in `bomb` again
{
let node = bomb.node.as_ref().unwrap();
// Get a reference to the inner future. We already ensured
// that the item `is_some`.
let item = (*node.item.get()).as_mut().unwrap();
// Unset queued flag... this must be done before
// polling. This ensures that the item gets
// rescheduled if it is notified **during** a call
// to `poll`.
let prev = (*node).queued.swap(false, SeqCst);
assert!(prev);
// Poll the underlying item with the appropriate `notify`
// implementation. This is where a large bit of the unsafety
// starts to stem from internally. The `notify` instance itself
// is basically just our `Arc<Node>` and tracks the mpsc
// queue of ready items.
//
// Critically though `Node` won't actually access `Task`, the
// item, while it's floating around inside of `Task`
// instances. These structs will basically just use `T` to size
// the internal allocation, appropriately accessing fields and
// deallocating the node if need be.
let borrow = &mut *bomb.borrow;
let mut scheduled = Scheduled {
task: item,
node: bomb.node.as_ref().unwrap(),
done: &mut done,
};
if borrow.enter(|| scheduled.tick()) {
// we have a borrow of the Runtime, so we know it's not shut down
borrow.spawner.num_futures.fetch_sub(2, SeqCst);
}
}
if !done {
// The future is not done, push it back into the "all
// node" list.
let node = bomb.node.take().unwrap();
bomb.borrow.spawner.scheduler.nodes.push_back(node);
}
}
}
}
}
impl<U: Unpark> Scheduled<'_, U> {
/// Polls the task, returns `true` if the task has completed.
pub(crate) fn tick(&mut self) -> bool {
let waker = unsafe {
// Safety: we don't hold this waker ref longer than
// this `tick` function
waker_ref(self.node)
};
let mut cx = Context::from_waker(&waker);
let ret = match self.task.0.as_mut().poll(&mut cx) {
Poll::Ready(()) => true,
Poll::Pending => false,
};
*self.done = ret;
ret
}
}
impl Task {
pub(crate) fn new(future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> Self {
Task(future)
}
}
impl fmt::Debug for Task {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Task").finish()
}
}
fn release_node<U>(node: Arc<Node<U>>) {
// The item is done, try to reset the queued flag. This will prevent
// `notify` from doing any work in the item
let prev = node.queued.swap(true, SeqCst);
// Drop the item, even if it hasn't finished yet. This is safe
// because we're dropping the item on the thread that owns
// `Scheduler`, which correctly tracks T's lifetimes and such.
unsafe {
drop((*node.item.get()).take());
}
// If the queued flag was previously set then it means that this node
// is still in our internal mpsc queue. We then transfer ownership
// of our reference count to the mpsc queue, and it'll come along and
// free it later, noticing that the item is `None`.
//
// If, however, the queued flag was *not* set then we're safe to
// release our reference count on the internal node. The queued flag
// was set above so all item `enqueue` operations will not actually
// enqueue the node, so our node will never see the mpsc queue again.
// The node itself will be deallocated once all reference counts have
// been dropped by the various owning tasks elsewhere.
if prev {
mem::forget(node);
}
}
impl<U> Debug for Scheduler<U> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Scheduler {{ ... }}")
}
}
impl<U> Drop for Scheduler<U> {
fn drop(&mut self) {
// When a `Scheduler` is dropped we want to drop all items associated
// with it. At the same time though there may be tons of `Task` handles
// flying around which contain `Node` references inside them. We'll
// let those naturally get deallocated when the `Task` itself goes out
// of scope or gets notified.
while let Some(node) = self.nodes.pop_front() {
release_node(node);
}
// Note that at this point we could still have a bunch of nodes in the
// mpsc queue. None of those nodes, however, have items associated
// with them so they're safe to destroy on any thread. At this point
// the `Scheduler` struct, the owner of the one strong reference
// to `Inner` will drop the strong reference. At that point
// whichever thread releases the strong refcount last (be it this
// thread or some other thread as part of an `upgrade`) will clear out
// the mpsc queue and free all remaining nodes.
//
// While that freeing operation isn't guaranteed to happen here, it's
// guaranteed to happen "promptly" as no more "blocking work" will
// happen while there's a strong refcount held.
}
}
impl<U> Inner<U> {
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
fn enqueue(&self, node: *const Node<U>) {
unsafe {
debug_assert!((*node).queued.load(Relaxed));
// This action does not require any coordination
(*node).next_readiness.store(ptr::null_mut(), Relaxed);
// Note that these atomic orderings come from 1024cores
let node = node as *mut _;
let prev = self.head_readiness.swap(node, AcqRel);
(*prev).next_readiness.store(node, Release);
}
}
/// Returns `true` if there are currently any pending futures
///
/// See `dequeue` for an explanation why this function is unsafe.
unsafe fn has_pending_futures(&self) -> bool {
let tail = *self.tail_readiness.get();
let next = (*tail).next_readiness.load(Acquire);
if tail == self.stub() && next.is_null() {
return false;
}
true
}
/// The dequeue function from the 1024cores intrusive MPSC queue algorithm
///
/// Note that this unsafe as it required mutual exclusion (only one thread
/// can call this) to be guaranteed elsewhere.
unsafe fn dequeue(&self, tick: Option<usize>) -> Dequeue<U> {
let mut tail = *self.tail_readiness.get();
let mut next = (*tail).next_readiness.load(Acquire);
if tail == self.stub() {
if next.is_null() {
return Dequeue::Empty;
}
*self.tail_readiness.get() = next;
tail = next;
next = (*next).next_readiness.load(Acquire);
}
if let Some(tick) = tick {
let actual = (*tail).notified_at.load(SeqCst);
// Only dequeue if the node was not scheduled during the current
// tick.
if actual == tick {
// Only doing the check above **should** be enough in
// practice. However, technically there is a potential for
// deadlocking if there are `usize::MAX` ticks while the thread
// scheduling the task is frozen.
//
// If, for some reason, this is not enough, calling `unpark`
// here will resolve the issue.
return Dequeue::Yield;
}
}
if !next.is_null() {
*self.tail_readiness.get() = next;
debug_assert!(tail != self.stub());
return Dequeue::Data(tail);
}
if self.head_readiness.load(Acquire) as *const _ != tail {
return Dequeue::Inconsistent;
}
self.enqueue(self.stub());
next = (*tail).next_readiness.load(Acquire);
if !next.is_null() {
*self.tail_readiness.get() = next;
return Dequeue::Data(tail);
}
Dequeue::Inconsistent
}
fn stub(&self) -> *const Node<U> {
&*self.stub
}
}
impl<U> Drop for Inner<U> {
fn drop(&mut self) {
// Once we're in the destructor for `Inner` we need to clear out the
// mpsc queue of nodes if there's anything left in there.
//
// Note that each node has a strong reference count associated with it
// which is owned by the mpsc queue. All nodes should have had their
// items dropped already by the `Scheduler` destructor above,
// so we're just pulling out nodes and dropping their refcounts.
unsafe {
loop {
match self.dequeue(None) {
Dequeue::Empty => break,
Dequeue::Yield => unreachable!(),
Dequeue::Inconsistent => abort("inconsistent in drop"),
Dequeue::Data(ptr) => drop(ptr2arc(ptr)),
}
}
}
}
}
impl<U> List<U> {
fn new() -> Self {
List {
len: 0,
head: ptr::null_mut(),
tail: ptr::null_mut(),
}
}
/// Appends an element to the back of the list
fn push_back(&mut self, node: Arc<Node<U>>) -> *const Node<U> {
let ptr = arc2ptr(node);
unsafe {
// Point to the current last node in the list
*(*ptr).prev_all.get() = self.tail;
*(*ptr).next_all.get() = ptr::null_mut();
if !self.tail.is_null() {
*(*self.tail).next_all.get() = ptr;
self.tail = ptr;
} else {
// This is the first node
self.tail = ptr;
self.head = ptr;
}
}
self.len += 1;
ptr
}
/// Pop an element from the front of the list
fn pop_front(&mut self) -> Option<Arc<Node<U>>> {
if self.head.is_null() {
// The list is empty
return None;
}
self.len -= 1;
unsafe {
// Convert the ptr to Arc<_>
let node = ptr2arc(self.head);
// Update the head pointer
self.head = *node.next_all.get();
// If the pointer is null, then the list is empty
if self.head.is_null() {
self.tail = ptr::null_mut();
} else {
*(*self.head).prev_all.get() = ptr::null_mut();
}
Some(node)
}
}
/// Remove a specific node
unsafe fn remove(&mut self, node: *const Node<U>) -> Arc<Node<U>> {
let node = ptr2arc(node);
let next = *node.next_all.get();
let prev = *node.prev_all.get();
*node.next_all.get() = ptr::null_mut();
*node.prev_all.get() = ptr::null_mut();
if !next.is_null() {
*(*next).prev_all.get() = prev;
} else {
self.tail = prev;
}
if !prev.is_null() {
*(*prev).next_all.get() = next;
} else {
self.head = next;
}
self.len -= 1;
node
}
}
unsafe fn noop(_: *const ()) {}
// ===== Raw Waker Inner<U> ======
fn waker_inner<U: Unpark>(inner: Arc<Inner<U>>) -> Waker {
let ptr = Arc::into_raw(inner) as *const ();
let vtable = &RawWakerVTable::new(
clone_inner::<U>,
wake_inner::<U>,
wake_by_ref_inner::<U>,
drop_inner::<U>,
);
unsafe { Waker::from_raw(RawWaker::new(ptr, vtable)) }
}
unsafe fn clone_inner<U: Unpark>(data: *const ()) -> RawWaker {
let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>);
let clone = arc.clone();
// forget both Arcs so the refcounts don't get decremented
mem::forget(arc);
mem::forget(clone);
let vtable = &RawWakerVTable::new(
clone_inner::<U>,
wake_inner::<U>,
wake_by_ref_inner::<U>,
drop_inner::<U>,
);
RawWaker::new(data, vtable)
}
unsafe fn wake_inner<U: Unpark>(data: *const ()) {
let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>);
arc.unpark.unpark();
}
unsafe fn wake_by_ref_inner<U: Unpark>(data: *const ()) {
let arc: Arc<Inner<U>> = Arc::from_raw(data as *const Inner<U>);
arc.unpark.unpark();
// by_ref means we don't own the Node, so forget the Arc
mem::forget(arc);
}
unsafe fn drop_inner<U>(data: *const ()) {
drop(Arc::<Inner<U>>::from_raw(data as *const Inner<U>));
}
// ===== Raw Waker Node<U> ======
unsafe fn waker_ref<U: Unpark>(node: &Arc<Node<U>>) -> Waker {
let ptr = &*node as &Node<U> as *const Node<U> as *const ();
let vtable = &RawWakerVTable::new(
clone_node::<U>,
wake_unreachable,
wake_by_ref_node::<U>,
noop,
);
Waker::from_raw(RawWaker::new(ptr, vtable))
}
unsafe fn wake_unreachable(_data: *const ()) {
unreachable!("waker_ref::wake()");
}
unsafe fn clone_node<U: Unpark>(data: *const ()) -> RawWaker {
let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>);
let clone = arc.clone();
// forget both Arcs so the refcounts don't get decremented
mem::forget(arc);
mem::forget(clone);
let vtable = &RawWakerVTable::new(
clone_node::<U>,
wake_node::<U>,
wake_by_ref_node::<U>,
drop_node::<U>,
);
RawWaker::new(data, vtable)
}
unsafe fn wake_node<U: Unpark>(data: *const ()) {
let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>);
Node::<U>::notify(&arc);
}
unsafe fn wake_by_ref_node<U: Unpark>(data: *const ()) {
let arc: Arc<Node<U>> = Arc::from_raw(data as *const Node<U>);
Node::<U>::notify(&arc);
// by_ref means we don't own the Node, so forget the Arc
mem::forget(arc);
}
unsafe fn drop_node<U>(data: *const ()) {
drop(Arc::<Node<U>>::from_raw(data as *const Node<U>));
}
impl<U: Unpark> Node<U> {
fn notify(me: &Arc<Node<U>>) {
let inner = match me.queue.upgrade() {
Some(inner) => inner,
None => return,
};
// It's our job to notify the node that it's ready to get polled,
// meaning that we need to enqueue it into the readiness queue. To
// do this we flag that we're ready to be queued, and if successful
// we then do the literal queueing operation, ensuring that we're
// only queued once.
//
// Once the node is inserted we be sure to notify the parent task,
// as it'll want to come along and pick up our node now.
//
// Note that we don't change the reference count of the node here,
// we're just enqueueing the raw pointer. The `Scheduler`
// implementation guarantees that if we set the `queued` flag true that
// there's a reference count held by the main `Scheduler` queue
// still.
let prev = me.queued.swap(true, SeqCst);
if !prev {
// Get the current scheduler tick
let tick_num = inner.tick_num.load(SeqCst);
me.notified_at.store(tick_num, SeqCst);
inner.enqueue(&**me);
inner.unpark.unpark();
}
}
}
impl<U> Drop for Node<U> {
fn drop(&mut self) {
// Currently a `Node` is sent across all threads for any lifetime,
// regardless of `T`. This means that for memory safety we can't
// actually touch `T` at any time except when we have a reference to the
// `Scheduler` itself.
//
// Consequently it *should* be the case that we always drop items from
// the `Scheduler` instance, but this is a bomb in place to catch
// any bugs in that logic.
unsafe {
if (*self.item.get()).is_some() {
abort("item still here when dropping");
}
}
}
}
fn arc2ptr<T>(ptr: Arc<T>) -> *const T {
let addr = &*ptr as *const T;
mem::forget(ptr);
addr
}
unsafe fn ptr2arc<T>(ptr: *const T) -> Arc<T> {
let anchor = mem::transmute::<usize, Arc<T>>(0x10);
let addr = &*anchor as *const T;
mem::forget(anchor);
let offset = addr as isize - 0x10;
mem::transmute::<isize, Arc<T>>(ptr as isize - offset)
}
fn abort(s: &str) -> ! {
struct DoublePanic;
impl Drop for DoublePanic {
fn drop(&mut self) {
panic!("panicking twice to abort the program");
}
}
let _bomb = DoublePanic;
panic!("{}", s);
}

View File

@ -1,3 +1,6 @@
#[cfg(feature = "rt-current-thread")]
use crate::executor::current_thread;
#[cfg(feature = "rt-full")]
use crate::executor::thread_pool::ThreadPool;
use crate::executor::{Executor, SpawnError};
@ -50,6 +53,11 @@ impl DefaultExecutor {
let mut thread_pool = unsafe { &*threadpool_ptr };
Some(f(&mut thread_pool))
}
#[cfg(feature = "rt-current-thread")]
State::CurrentThread(current_thread_ptr) => {
let mut current_thread = unsafe { &*current_thread_ptr };
Some(f(&mut current_thread))
}
State::Empty => None,
})
}
@ -64,6 +72,10 @@ enum State {
#[cfg(feature = "rt-full")]
ThreadPool(*const ThreadPool),
// Current-thread executor
#[cfg(feature = "rt-current-thread")]
CurrentThread(*const current_thread::Scheduler),
// default executor is set to a custom executor.
Ready(*mut dyn Executor),
}
@ -160,12 +172,41 @@ where
let thread_pool = unsafe { &*threadpool_ptr };
thread_pool.spawn_background(future);
}
#[cfg(feature = "rt-current-thread")]
State::CurrentThread(current_thread_ptr) => {
let current_thread = unsafe { &*current_thread_ptr };
// Safety: The `CurrentThread` value set the thread-local (same
// thread).
unsafe {
current_thread.spawn_background(future);
}
}
State::Empty => panic!("must be called from the context of Tokio runtime"),
})
}
#[cfg(feature = "rt-current-thread")]
pub(super) fn with_current_thread<F, R>(current_thread: &current_thread::Scheduler, f: F) -> R
where
F: FnOnce() -> R,
{
with_state(
State::CurrentThread(current_thread as *const current_thread::Scheduler),
f,
)
}
#[cfg(feature = "rt-current-thread")]
pub(super) fn current_thread_is_current(current_thread: &current_thread::Scheduler) -> bool {
EXECUTOR.with(|current_executor| match current_executor.get() {
State::CurrentThread(ptr) => ptr == current_thread as *const _,
_ => false,
})
}
#[cfg(feature = "rt-full")]
pub(crate) fn with_threadpool<F, R>(thread_pool: &ThreadPool, f: F) -> R
pub(super) fn with_threadpool<F, R>(thread_pool: &ThreadPool, f: F) -> R
where
F: FnOnce() -> R,
{

View File

@ -20,8 +20,10 @@ pub(crate) mod std {
}
}
#[cfg(feature = "rt-full")]
pub(crate) use self::std::rand;
pub(crate) use self::std::sync;
#[cfg(any(feature = "blocking", feature = "rt-full"))]
pub(crate) use self::std::thread;
#[cfg(feature = "rt-full")]
pub(crate) use self::std::{alloc, cell, rand, sys};
#[cfg(feature = "rt-current-thread")]
pub(crate) use self::std::{alloc, cell, sys};

View File

@ -44,5 +44,6 @@ impl<T> CausalCell<T> {
impl CausalCheck {
pub(crate) fn check(self) {}
#[cfg(feature = "rt-full")]
pub(crate) fn join(&mut self, _other: CausalCheck) {}
}

View File

@ -1,10 +1,12 @@
// rt-full implies rt-current-thread
#[cfg(feature = "rt-full")]
mod atomic_u32;
mod atomic_usize;
#[cfg(feature = "rt-full")]
#[cfg(feature = "rt-current-thread")]
mod causal_cell;
#[cfg(feature = "rt-full")]
#[cfg(feature = "rt-current-thread")]
pub(crate) mod alloc {
#[derive(Debug)]
pub(crate) struct Track<T> {
@ -26,7 +28,7 @@ pub(crate) mod alloc {
}
}
#[cfg(feature = "rt-full")]
#[cfg(feature = "rt-current-thread")]
pub(crate) mod cell {
pub(crate) use super::causal_cell::{CausalCell, CausalCheck};
}
@ -62,15 +64,23 @@ pub(crate) mod sync {
pub(crate) use crate::executor::loom::std::atomic_usize::AtomicUsize;
#[cfg(feature = "rt-full")]
pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicPtr};
pub(crate) use std::sync::atomic::spin_loop_hint;
#[cfg(feature = "rt-current-thread")]
pub(crate) use std::sync::atomic::{fence, AtomicPtr};
}
}
#[cfg(feature = "rt-full")]
#[cfg(feature = "rt-current-thread")]
pub(crate) mod sys {
#[cfg(feature = "rt-full")]
pub(crate) fn num_cpus() -> usize {
usize::max(1, num_cpus::get_physical())
}
#[cfg(not(feature = "rt-full"))]
pub(crate) fn num_cpus() -> usize {
1
}
}
#[cfg(any(feature = "blocking", feature = "rt-full"))]

View File

@ -57,12 +57,14 @@ pub use self::executor::Executor;
mod global;
pub use self::global::{spawn, with_default, DefaultExecutor};
mod loom;
pub(crate) mod loom;
pub mod park;
#[cfg(feature = "rt-full")]
#[cfg(feature = "rt-current-thread")]
mod task;
#[cfg(feature = "rt-current-thread")]
pub use self::task::{JoinError, JoinHandle};
mod typed;
pub use self::typed::TypedExecutor;
@ -76,7 +78,7 @@ mod blocking;
pub mod blocking;
#[cfg(feature = "rt-current-thread")]
pub mod current_thread;
pub(crate) mod current_thread;
#[cfg(feature = "rt-full")]
pub mod thread_pool;

View File

@ -17,9 +17,9 @@ use std::task::{Context, Poll, Waker};
/// It is critical for `Header` to be the first field as the task structure will
/// be referenced by both *mut Cell and *mut Header.
#[repr(C)]
pub(super) struct Cell<T: Future, S: 'static> {
pub(super) struct Cell<T: Future> {
/// Hot task state data
pub(super) header: Header<S>,
pub(super) header: Header,
/// Either the future or output, depending on the execution stage.
pub(super) core: Core<T>,
@ -37,24 +37,24 @@ pub(super) struct Core<T: Future> {
/// Crate public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header<S: 'static> {
pub(crate) struct Header {
/// Task state
pub(super) state: State,
/// Pointer to the executor owned by the task
pub(super) executor: CausalCell<Option<NonNull<S>>>,
pub(super) executor: CausalCell<Option<NonNull<()>>>,
/// Pointer to next task, used for misc task linked lists.
pub(crate) queue_next: UnsafeCell<*const Header<S>>,
pub(crate) queue_next: UnsafeCell<*const Header>,
/// Pointer to the next task in the ownership list.
pub(crate) owned_next: UnsafeCell<Option<NonNull<Header<S>>>>,
pub(crate) owned_next: UnsafeCell<Option<NonNull<Header>>>,
/// Pointer to the previous task in the ownership list.
pub(crate) owned_prev: UnsafeCell<Option<NonNull<Header<S>>>>,
pub(crate) owned_prev: UnsafeCell<Option<NonNull<Header>>>,
/// Table of function pointers for executing actions on the task.
pub(super) vtable: &'static Vtable<S>,
pub(super) vtable: &'static Vtable,
/// Used by loom to track the causality of the future. Without loom, this is
/// unit.
@ -74,10 +74,13 @@ enum Stage<T: Future> {
Consumed,
}
impl<T: Future, S: Schedule> Cell<T, S> {
impl<T: Future> Cell<T> {
/// Allocate a new task cell, containing the header, trailer, and core
/// structures.
pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
pub(super) fn new<S>(future: T, state: State) -> Box<Cell<T>>
where
S: Schedule,
{
Box::new(Cell {
header: Header {
state,
@ -103,7 +106,7 @@ impl<T: Future> Core<T> {
self.stage = Stage::Consumed
}
pub(super) fn poll<S>(&mut self, header: &Header<S>) -> Poll<T::Output>
pub(super) fn poll<S>(&mut self, header: &Header) -> Poll<T::Output>
where
S: Schedule,
{
@ -146,8 +149,8 @@ impl<T: Future> Core<T> {
}
}
impl<S> Header<S> {
pub(super) fn executor(&self) -> Option<NonNull<S>> {
impl Header {
pub(super) fn executor(&self) -> Option<NonNull<()>> {
unsafe { self.executor.with(|ptr| *ptr) }
}
}

View File

@ -2,7 +2,7 @@ use std::any::Any;
use std::fmt;
/// Task failed to execute to completion.
pub struct Error {
pub struct JoinError {
repr: Repr,
}
@ -11,23 +11,23 @@ enum Repr {
Panic(Box<dyn Any + Send + 'static>),
}
impl Error {
impl JoinError {
/// Create a new `cancelled` error
pub fn cancelled() -> Error {
Error {
pub fn cancelled() -> JoinError {
JoinError {
repr: Repr::Cancelled,
}
}
/// Create a new `panic` error
pub fn panic(err: Box<dyn Any + Send + 'static>) -> Error {
Error {
pub fn panic(err: Box<dyn Any + Send + 'static>) -> JoinError {
JoinError {
repr: Repr::Panic(err),
}
}
}
impl fmt::Display for Error {
impl fmt::Display for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.repr {
Repr::Cancelled => write!(fmt, "cancelled"),
@ -36,13 +36,13 @@ impl fmt::Display for Error {
}
}
impl fmt::Debug for Error {
impl fmt::Debug for JoinError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.repr {
Repr::Cancelled => write!(fmt, "task::Error::Cancelled"),
Repr::Panic(_) => write!(fmt, "task::Error::Panic(...)"),
Repr::Cancelled => write!(fmt, "JoinError::Cancelled"),
Repr::Panic(_) => write!(fmt, "JoinError::Panic(...)"),
}
}
}
impl std::error::Error for Error {}
impl std::error::Error for JoinError {}

View File

@ -2,16 +2,18 @@ use crate::executor::loom::alloc::Track;
use crate::executor::loom::cell::CausalCheck;
use crate::executor::task::core::{Cell, Core, Header, Trailer};
use crate::executor::task::state::Snapshot;
use crate::executor::task::{Error, Schedule, Task};
use crate::executor::task::{JoinError, Schedule, Task};
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{ManuallyDrop, MaybeUninit};
use std::ptr::NonNull;
use std::task::{Poll, Waker};
/// Typed raw task handle
pub(super) struct Harness<T: Future, S: 'static> {
cell: NonNull<Cell<T, S>>,
cell: NonNull<Cell<T>>,
_p: PhantomData<S>,
}
impl<T, S> Harness<T, S>
@ -22,11 +24,13 @@ where
pub(super) unsafe fn from_raw(ptr: *mut ()) -> Harness<T, S> {
debug_assert!(!ptr.is_null());
let cell = NonNull::new_unchecked(ptr as *mut Cell<T, S>);
Harness { cell }
Harness {
cell: NonNull::new_unchecked(ptr as *mut Cell<T>),
_p: PhantomData,
}
}
fn header(&self) -> &Header<S> {
fn header(&self) -> &Header {
unsafe { &self.cell.as_ref().header }
}
@ -51,7 +55,11 @@ where
/// Panics raised while polling the future are handled.
///
/// Returns `true` if the task needs to be scheduled again
pub(super) fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool {
///
/// # Safety
///
/// The pointer returned by the `executor` fn must be castable to `*mut S`
pub(super) unsafe fn poll(mut self, executor: &mut dyn FnMut() -> Option<NonNull<()>>) -> bool {
use std::panic;
// Transition the task to the running state.
@ -67,7 +75,7 @@ where
debug_assert!(join_interest || !res.has_join_waker());
// Get the cell components
let cell = unsafe { &mut self.cell.as_mut() };
let cell = &mut self.cell.as_mut();
let header = &cell.header;
let core = &mut cell.core;
@ -76,15 +84,13 @@ where
// point, there are no outstanding wakers which might access the
// field concurrently.
if header.executor().is_none() {
unsafe {
// We don't want the destructor to run because we don't really
// own the task here.
let task = ManuallyDrop::new(Task::from_raw(header.into()));
// Call the scheduler's bind callback
let executor = executor().expect("first poll must happen from an executor");
executor.as_ref().bind(&task);
header.executor.with_mut(|ptr| *ptr = Some(executor));
}
// We don't want the destructor to run because we don't really
// own the task here.
let task = ManuallyDrop::new(Task::from_raw(header.into()));
// Call the scheduler's bind callback
let executor = executor().expect("first poll must happen from an executor");
executor.cast::<S>().as_ref().bind(&task);
header.executor.with_mut(|ptr| *ptr = Some(executor.cast()));
}
// The transition to `Running` done above ensures that a lock on the
@ -111,7 +117,7 @@ where
polled: false,
};
let res = guard.core.poll(header);
let res = guard.core.poll::<S>(header);
// prevent the guard from dropping the future
guard.polled = true;
@ -136,7 +142,7 @@ where
}
}
Err(err) => {
self.complete(executor, join_interest, Err(Error::panic(err)));
self.complete(executor, join_interest, Err(JoinError::panic(err)));
false
}
}
@ -186,7 +192,7 @@ where
state: Snapshot,
) {
if state.is_canceled() {
dst.write(Track::new(Err(Error::cancelled())));
dst.write(Track::new(Err(JoinError::cancelled())));
} else {
self.core().read_output(dst);
}
@ -306,7 +312,7 @@ where
None => panic!("executor should be set"),
};
S::schedule(executor.as_ref(), self.to_task());
S::schedule(executor.cast().as_ref(), self.to_task());
}
}
}
@ -382,7 +388,7 @@ where
let task = self.to_task();
if let Some(executor) = bound_executor {
executor.as_ref().release(task);
executor.cast::<S>().as_ref().release(task);
} else {
// Just drop the task. This will release / deallocate memory.
drop(task);
@ -394,7 +400,7 @@ where
fn complete(
mut self,
executor: &mut dyn FnMut() -> Option<NonNull<S>>,
executor: &mut dyn FnMut() -> Option<NonNull<()>>,
join_interest: bool,
output: super::Result<T::Output>,
) {
@ -412,7 +418,12 @@ where
unsafe {
// perform a local release
let task = ManuallyDrop::new(self.to_task());
executor.as_ref().unwrap().as_ref().release_local(&task);
executor
.as_ref()
.unwrap()
.cast::<S>()
.as_ref()
.release_local(&task);
if self.transition_to_released(join_interest).is_final_ref() {
self.dealloc();
@ -438,7 +449,7 @@ where
None => panic!("executor should be set"),
};
executor.as_ref().release(task);
executor.cast::<S>().as_ref().release(task);
}
}
}
@ -542,7 +553,7 @@ where
}
unsafe fn to_task(&self) -> Task<S> {
let ptr = self.cell.as_ptr() as *mut Header<S>;
let ptr = self.cell.as_ptr() as *mut Header;
Task::from_raw(NonNull::new_unchecked(ptr))
}
}

View File

@ -1,18 +1,20 @@
use crate::executor::loom::alloc::Track;
use crate::executor::task::raw::RawTask;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
pub(crate) struct JoinHandle<T, S: 'static> {
raw: Option<RawTask<S>>,
/// An owned permission to join on a task (await its termination).
pub struct JoinHandle<T> {
raw: Option<RawTask>,
_p: PhantomData<T>,
}
impl<T, S: 'static> JoinHandle<T, S> {
pub(super) fn new(raw: RawTask<S>) -> JoinHandle<T, S> {
impl<T> JoinHandle<T> {
pub(super) fn new(raw: RawTask) -> JoinHandle<T> {
JoinHandle {
raw: Some(raw),
_p: PhantomData,
@ -20,9 +22,9 @@ impl<T, S: 'static> JoinHandle<T, S> {
}
}
impl<T, S: 'static> Unpin for JoinHandle<T, S> {}
impl<T> Unpin for JoinHandle<T> {}
impl<T, S: 'static> Future for JoinHandle<T, S> {
impl<T> Future for JoinHandle<T> {
type Output = super::Result<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -61,7 +63,7 @@ impl<T, S: 'static> Future for JoinHandle<T, S> {
}
}
impl<T, S: 'static> Drop for JoinHandle<T, S> {
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(raw) = self.raw.take() {
if raw.header().state.drop_join_handle_fast() {
@ -72,3 +74,12 @@ impl<T, S: 'static> Drop for JoinHandle<T, S> {
}
}
}
impl<T> fmt::Debug for JoinHandle<T>
where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("JoinHandle").finish()
}
}

View File

@ -1,15 +1,20 @@
use crate::executor::task::{Header, Task};
use std::fmt;
use std::marker::PhantomData;
use std::ptr::NonNull;
pub(crate) struct OwnedList<T: 'static> {
head: Option<NonNull<Header<T>>>,
head: Option<NonNull<Header>>,
_p: PhantomData<T>,
}
impl<T: 'static> OwnedList<T> {
pub(crate) fn new() -> OwnedList<T> {
OwnedList { head: None }
OwnedList {
head: None,
_p: PhantomData,
}
}
pub(crate) fn insert(&mut self, task: &Task<T>) {

View File

@ -2,12 +2,15 @@ mod core;
pub(crate) use self::core::Header;
mod error;
pub use self::error::Error;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::error::JoinError;
mod harness;
mod join;
pub(crate) use self::join::JoinHandle;
#[cfg(any(feature = "rt-current-thread", feature = "rt-full"))]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;
mod list;
pub(crate) use self::list::OwnedList;
@ -27,18 +30,20 @@ mod tests;
use self::raw::RawTask;
use std::future::Future;
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::{fmt, mem};
/// An owned handle to the task, tracked by ref count
pub(crate) struct Task<S: 'static> {
raw: RawTask<S>,
raw: RawTask,
_p: PhantomData<S>,
}
unsafe impl<S: Send + Sync + 'static> Send for Task<S> {}
/// Task result sent back
pub(crate) type Result<T> = std::result::Result<T, Error>;
pub(crate) type Result<T> = std::result::Result<T, JoinError>;
pub(crate) trait Schedule: Send + Sync + Sized + 'static {
/// Bind a task to the executor.
@ -63,34 +68,43 @@ where
T: Future + Send + 'static,
S: Schedule,
{
let raw = RawTask::new_background(task);
Task { raw }
Task {
raw: RawTask::new_background::<_, S>(task),
_p: PhantomData,
}
}
/// Create a new task with an associated join handle
pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output, S>)
pub(crate) fn joinable<T, S>(task: T) -> (Task<S>, JoinHandle<T::Output>)
where
T: Future + Send + 'static,
S: Schedule,
{
let raw = RawTask::new_joinable(task);
let task = Task { raw };
let raw = RawTask::new_joinable::<_, S>(task);
let task = Task {
raw,
_p: PhantomData,
};
let join = JoinHandle::new(raw);
(task, join)
}
impl<S: 'static> Task<S> {
pub(crate) unsafe fn from_raw(ptr: NonNull<Header<S>>) -> Task<S> {
let raw = RawTask::from_raw(ptr);
Task { raw }
pub(crate) unsafe fn from_raw(ptr: NonNull<Header>) -> Task<S> {
Task {
raw: RawTask::from_raw(ptr),
_p: PhantomData,
}
}
pub(crate) fn header(&self) -> &Header<S> {
pub(crate) fn header(&self) -> &Header {
self.raw.header()
}
pub(crate) fn into_raw(self) -> NonNull<Header<S>> {
pub(crate) fn into_raw(self) -> NonNull<Header> {
let raw = self.raw.into_raw();
mem::forget(self);
raw
@ -99,8 +113,14 @@ impl<S: 'static> Task<S> {
impl<S: Schedule> Task<S> {
/// Returns `self` when the task needs to be immediately re-scheduled
pub(crate) fn run(self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> Option<Self> {
if unsafe { self.raw.poll(executor) } {
pub(crate) fn run<F>(self, mut executor: F) -> Option<Self>
where
F: FnMut() -> Option<NonNull<S>>,
{
if unsafe {
self.raw
.poll(&mut || executor().map(|ptr| ptr.cast::<()>()))
} {
Some(self)
} else {
// Cleaning up the `Task` instance is done from within the poll

View File

@ -9,13 +9,13 @@ use std::ptr::NonNull;
use std::task::Waker;
/// Raw task handle
pub(super) struct RawTask<S: 'static> {
ptr: NonNull<Header<S>>,
pub(super) struct RawTask {
ptr: NonNull<Header>,
}
pub(super) struct Vtable<S: 'static> {
pub(super) struct Vtable {
/// Poll the future
pub(super) poll: unsafe fn(*mut (), &mut dyn FnMut() -> Option<NonNull<S>>) -> bool,
pub(super) poll: unsafe fn(*mut (), &mut dyn FnMut() -> Option<NonNull<()>>) -> bool,
/// The task handle has been dropped and the join waker needs to be dropped
/// or the task struct needs to be deallocated
@ -42,7 +42,7 @@ pub(super) struct Vtable<S: 'static> {
}
/// Get the vtable for the requested `T` and `S` generics.
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable<S> {
pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
&Vtable {
poll: poll::<T, S>,
drop_task: drop_task::<T, S>,
@ -54,54 +54,54 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable<S> {
}
}
impl<S> RawTask<S> {
pub(super) fn new_background<T>(task: T) -> RawTask<S>
impl RawTask {
pub(super) fn new_background<T, S>(task: T) -> RawTask
where
T: Future + Send + 'static,
S: Schedule,
{
RawTask::new(task, State::new_background())
RawTask::new::<_, S>(task, State::new_background())
}
pub(super) fn new_joinable<T>(task: T) -> RawTask<S>
pub(super) fn new_joinable<T, S>(task: T) -> RawTask
where
T: Future + Send + 'static,
S: Schedule,
{
RawTask::new(task, State::new_joinable())
RawTask::new::<_, S>(task, State::new_joinable())
}
fn new<T>(task: T, state: State) -> RawTask<S>
fn new<T, S>(task: T, state: State) -> RawTask
where
T: Future + Send + 'static,
S: Schedule,
{
let ptr = Box::into_raw(Cell::<T, S>::new(task, state));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header<S>) };
let ptr = Box::into_raw(Cell::new::<S>(task, state));
let ptr = unsafe { NonNull::new_unchecked(ptr as *mut Header) };
RawTask { ptr }
}
pub(super) unsafe fn from_raw(ptr: NonNull<Header<S>>) -> RawTask<S> {
pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> RawTask {
RawTask { ptr }
}
/// Returns a reference to the task's meta structure.
///
/// Safe as `Header` is `Sync`.
pub(super) fn header(&self) -> &Header<S> {
pub(super) fn header(&self) -> &Header {
unsafe { self.ptr.as_ref() }
}
/// Returns a raw pointer to the task's meta structure.
pub(super) fn into_raw(self) -> NonNull<Header<S>> {
pub(super) fn into_raw(self) -> NonNull<Header> {
self.ptr
}
/// Safety: mutual exclusion is required to call this function.
///
/// Returns `true` if the task needs to be scheduled again.
pub(super) unsafe fn poll(self, executor: &mut dyn FnMut() -> Option<NonNull<S>>) -> bool {
pub(super) unsafe fn poll(self, executor: &mut dyn FnMut() -> Option<NonNull<()>>) -> bool {
// Get the vtable without holding a ref to the meta struct. This is done
// because a mutable reference to the task is passed into the poll fn.
let vtable = self.header().vtable;
@ -142,17 +142,17 @@ impl<S> RawTask<S> {
}
}
impl<S: 'static> Clone for RawTask<S> {
impl Clone for RawTask {
fn clone(&self) -> Self {
RawTask { ptr: self.ptr }
}
}
impl<S: 'static> Copy for RawTask<S> {}
impl Copy for RawTask {}
unsafe fn poll<T: Future, S: Schedule>(
ptr: *mut (),
executor: &mut dyn FnMut() -> Option<NonNull<S>>,
executor: &mut dyn FnMut() -> Option<NonNull<()>>,
) -> bool {
let harness = Harness::<T, S>::from_raw(ptr);
harness.poll(executor)

View File

@ -1,19 +1,22 @@
use crate::executor::loom::sync::atomic::AtomicPtr;
use crate::executor::task::{Header, Task};
use std::marker::PhantomData;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
/// Concurrent stack of tasks, used to pass ownership of a task from one worker
/// to another.
pub(crate) struct TransferStack<T: 'static> {
head: AtomicPtr<Header<T>>,
head: AtomicPtr<Header>,
_p: PhantomData<T>,
}
impl<T: 'static> TransferStack<T> {
pub(crate) fn new() -> TransferStack<T> {
TransferStack {
head: AtomicPtr::new(ptr::null_mut()),
_p: PhantomData,
}
}
@ -49,7 +52,7 @@ impl<T: 'static> TransferStack<T> {
}
pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> {
struct Iter<T: 'static>(*mut Header<T>);
struct Iter<T: 'static>(*mut Header, PhantomData<T>);
impl<T: 'static> Iterator for Iter<T> {
type Item = Task<T>;
@ -80,6 +83,6 @@ impl<T: 'static> TransferStack<T> {
}
let ptr = self.head.swap(ptr::null_mut(), Acquire);
Iter(ptr)
Iter(ptr, PhantomData)
}
}

View File

@ -14,7 +14,7 @@ use std::sync::mpsc;
fn header_lte_cache_line() {
use std::mem::size_of;
assert!(size_of::<Header<()>>() <= 8 * size_of::<*const ()>());
assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
}
#[test]
@ -53,7 +53,7 @@ fn create_yield_complete_drop() {
let task = task::background(task);
let mock = mock().bind(&task).release_local();
let mock = &mut || Some(From::from(&mock));
let mock = || Some(From::from(&mock));
// Task is returned
let task = assert_some!(task.run(mock));
@ -83,7 +83,7 @@ fn create_clone_yield_complete_drop() {
let task = task::background(task);
let mock = mock().bind(&task).release_local();
let mock = &mut || Some(From::from(&mock));
let mock = || Some(From::from(&mock));
// Task is returned
let task = assert_some!(task.run(mock));

View File

@ -8,12 +8,12 @@ use std::task::{RawWaker, RawWakerVTable, Waker};
pub(super) struct WakerRef<'a, S: 'static> {
waker: Waker,
_p: PhantomData<&'a Header<S>>,
_p: PhantomData<(&'a Header, S)>,
}
/// Returns a `WakerRef` which avoids having to pre-emptively increase the
/// refcount if there is no need to do so.
pub(super) fn waker_ref<T, S>(meta: &Header<S>) -> WakerRef<'_, S>
pub(super) fn waker_ref<T, S>(meta: &Header) -> WakerRef<'_, S>
where
T: Future,
S: Schedule,
@ -48,7 +48,7 @@ where
T: Future,
S: Schedule,
{
let meta = ptr as *const Header<S>;
let meta = ptr as *const Header;
(*meta).state.ref_inc();
let vtable = &RawWakerVTable::new(

View File

@ -24,7 +24,7 @@ unsafe impl Sync for Inner {}
#[derive(Debug, Eq, PartialEq)]
enum Call {
Bind(*const Header<Mock>),
Bind(*const Header),
Release,
ReleaseLocal,
Schedule,

View File

@ -1,42 +0,0 @@
use crate::executor::park::Unpark;
use crate::executor::task;
use crate::executor::thread_pool::Shared;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// An owned permission to join on a task (await its termination).
pub struct JoinHandle<T> {
task: task::JoinHandle<T, Shared<Box<dyn Unpark>>>,
}
impl<T> JoinHandle<T>
where
T: Send + 'static,
{
pub(super) fn new(task: task::JoinHandle<T, Shared<Box<dyn Unpark>>>) -> JoinHandle<T> {
JoinHandle { task }
}
}
impl<T> Future for JoinHandle<T>
where
T: Send + 'static,
{
type Output = task::Result<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.task).poll(cx)
}
}
impl<T> fmt::Debug for JoinHandle<T>
where
T: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("JoinHandle").finish()
}
}

View File

@ -8,9 +8,6 @@ mod current;
mod idle;
use self::idle::Idle;
mod join;
pub use self::join::JoinHandle;
mod owned;
use self::owned::Owned;
@ -40,9 +37,6 @@ mod tests;
#[cfg(feature = "blocking")]
pub use worker::blocking;
// Re-export `task::Error`
pub use crate::executor::task::Error;
// These exports are used in tests
#[cfg(test)]
#[allow(warnings)]

View File

@ -1,5 +1,6 @@
use crate::executor::blocking::PoolWaiter;
use crate::executor::thread_pool::{shutdown, Builder, JoinHandle, Spawner};
use crate::executor::task::JoinHandle;
use crate::executor::thread_pool::{shutdown, Builder, Spawner};
use crate::executor::Executor;
use std::fmt;

View File

@ -2,24 +2,27 @@ use crate::executor::loom::sync::atomic::AtomicUsize;
use crate::executor::loom::sync::Mutex;
use crate::executor::task::{Header, Task};
use std::marker::PhantomData;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{Acquire, Release};
use std::usize;
pub(super) struct Queue<T: 'static> {
/// Pointers to the head and tail of the queue
pointers: Mutex<Pointers<T>>,
pointers: Mutex<Pointers>,
/// Number of pending tasks in the queue. This helps prevent unnecessary
/// locking in the hot path.
///
/// The LSB is a flag tracking whether or not the queue is open or not.
len: AtomicUsize,
_p: PhantomData<T>,
}
struct Pointers<T: 'static> {
head: *const Header<T>,
tail: *const Header<T>,
struct Pointers {
head: *const Header,
tail: *const Header,
}
const CLOSED: usize = 1;
@ -33,6 +36,7 @@ impl<T: 'static> Queue<T> {
tail: ptr::null(),
}),
len: AtomicUsize::new(0),
_p: PhantomData,
}
}
@ -186,10 +190,10 @@ impl<T: 'static> Queue<T> {
}
}
unsafe fn get_next<T>(meta: NonNull<Header<T>>) -> *const Header<T> {
unsafe fn get_next(meta: NonNull<Header>) -> *const Header {
*meta.as_ref().queue_next.get()
}
unsafe fn set_next<T>(meta: NonNull<Header<T>>, val: *const Header<T>) {
unsafe fn set_next(meta: NonNull<Header>, val: *const Header) {
*meta.as_ref().queue_next.get() = val;
}

View File

@ -5,8 +5,8 @@
use crate::executor::loom::rand::seed;
use crate::executor::loom::sync::Arc;
use crate::executor::park::Unpark;
use crate::executor::task::{self, Task};
use crate::executor::thread_pool::{current, queue, BoxFuture, Idle, JoinHandle, Owned, Shared};
use crate::executor::task::{self, JoinHandle, Task};
use crate::executor::thread_pool::{current, queue, BoxFuture, Idle, Owned, Shared};
use crate::executor::util::{CachePadded, FastRand};
use crate::executor::{Executor, SpawnError};
@ -203,7 +203,7 @@ impl Set<Box<dyn Unpark>> {
{
let (task, handle) = task::joinable(future);
self.schedule(task);
JoinHandle::new(handle)
handle
}
}

View File

@ -1,6 +1,7 @@
use crate::executor::loom::sync::Arc;
use crate::executor::park::Unpark;
use crate::executor::thread_pool::{worker, JoinHandle};
use crate::executor::task::JoinHandle;
use crate::executor::thread_pool::worker;
use std::fmt;
use std::future::Future;

View File

@ -235,7 +235,7 @@ fn queues_2() -> (queue::Worker<Noop>, queue::Worker<Noop>) {
use std::cell::RefCell;
use std::collections::HashMap;
thread_local! {
static TASKS: RefCell<HashMap<u32, task::JoinHandle<u32, Noop>>> = RefCell::new(HashMap::new())
static TASKS: RefCell<HashMap<u32, task::JoinHandle<u32>>> = RefCell::new(HashMap::new())
}
fn val(num: u32) -> Task<Noop> {

View File

@ -0,0 +1,382 @@
use crate::executor::blocking::{Pool, PoolWaiter};
use crate::executor::current_thread::CurrentThread;
#[cfg(feature = "rt-full")]
use crate::executor::thread_pool;
use crate::net::driver::Reactor;
use crate::runtime::{Runtime, Kind};
use crate::timer::clock::Clock;
use crate::timer::timer::Timer;
use std::sync::Arc;
use std::{fmt, io};
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: #method.build
/// [`Builder::new`]: #method.new
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
/// use tokio::timer::clock::Clock;
///
/// fn main() {
/// // build Runtime
/// let runtime = Builder::new()
/// .clock(Clock::system())
/// .num_threads(4)
/// .thread_name("my-custom-name")
/// .thread_stack_size(3 * 1024 * 1024)
/// .build()
/// .unwrap();
///
/// // use runtime ...
/// }
/// ```
pub struct Builder {
/// When `true`, use the current-thread executor.
current_thread: bool,
/// The number of worker threads.
///
/// Only used when not using the current-thread executor.
num_threads: usize,
/// Name used for threads spawned by the runtime.
thread_name: String,
/// Stack size used for threads spawned by the runtime.
thread_stack_size: Option<usize>,
/// Callback to run after each thread starts.
after_start: Option<Arc<dyn Fn() + Send + Sync>>,
/// To run before each worker thread stops
before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
/// The clock to use
clock: Clock,
}
impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub fn new() -> Builder {
Builder {
// Use the thread-pool executor by default
current_thread: false,
// Default to use an equal number of threads to number of CPU cores
num_threads: crate::executor::loom::sys::num_cpus(),
// Default thread name
thread_name: "tokio-runtime-worker".into(),
// Do not set a stack size by default
thread_stack_size: None,
// No worker thread callbacks
after_start: None,
before_stop: None,
// Default clock
clock: Clock::new(),
}
}
/// Set the maximum number of worker threads for the `Runtime`'s thread pool.
///
/// This must be a number between 1 and 32,768 though it is advised to keep
/// this value on the smaller side.
///
/// The default value is the number of cores available to the system.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .num_threads(4)
/// .build()
/// .unwrap();
/// # }
/// ```
pub fn num_threads(&mut self, val: usize) -> &mut Self {
self.num_threads = val;
self
}
/// Use only the current thread for the runtime.
///
/// The network driver, timer, and executor will all be run on the current
/// thread during `block_on` calls.
pub fn current_thread(&mut self) -> &mut Self {
self.current_thread = true;
self
}
/// Set name of threads spawned by the `Runtime`'s thread pool.
///
/// The default name is "tokio-runtime-worker".
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .thread_name("my-pool")
/// .build();
/// # }
/// ```
pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_name = val.into();
self
}
/// Set the stack size (in bytes) for worker threads.
///
/// The actual stack size may be greater than this value if the platform
/// specifies minimal stack size.
///
/// The default stack size for spawned threads is 2 MiB, though this
/// particular stack size is subject to change in the future.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .thread_stack_size(32 * 1024)
/// .build();
/// # }
/// ```
pub fn thread_stack_size(&mut self, val: usize) -> &mut Self {
self.thread_stack_size = Some(val);
self
}
/// Execute function `f` after each thread is started but before it starts
/// doing work.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new()
/// .after_start(|| {
/// println!("thread started");
/// })
/// .build();
/// # }
/// ```
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self
}
/// Execute function `f` before each thread stops.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new()
/// .before_stop(|| {
/// println!("thread stopping");
/// })
/// .build();
/// # }
/// ```
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_stop = Some(Arc::new(f));
self
}
/// Set the `Clock` instance that will be used by the runtime.
pub fn clock(&mut self, clock: Clock) -> &mut Self {
self.clock = clock;
self
}
/// Create the configured `Runtime`.
///
/// The returned `ThreadPool` instance is ready to spawn tasks.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// let mut rt = Builder::new().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
if self.current_thread {
self.build_current_thread()
} else {
self.build_threadpool()
}
}
fn build_current_thread(&mut self) -> io::Result<Runtime> {
// Create network driver
let net = Reactor::new()?;
let net_handles = vec![net.handle()];
let timer = Timer::new_with_clock(net, self.clock.clone());
let timer_handles = vec![timer.handle()];
// And now put a single-threaded executor on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let executor = CurrentThread::new(timer);
// Blocking pool
let blocking_pool = PoolWaiter::from(Pool::default());
Ok(Runtime {
kind: Kind::CurrentThread(executor),
net_handles,
timer_handles,
blocking_pool,
})
}
// Without rt-full, the "threadpool" variant just uses current-thread
#[cfg(not(feature = "rt-full"))]
fn build_threadpool(&mut self) -> io::Result<Runtime> {
self.build_current_thread()
}
#[cfg(feature = "rt-full")]
fn build_threadpool(&mut self) -> io::Result<Runtime> {
use crate::net::driver;
use crate::timer::{clock, timer};
use std::sync::Mutex;
let mut net_handles = Vec::new();
let mut timer_handles = Vec::new();
let mut timers = Vec::new();
for _ in 0..self.num_threads {
// Create network driver
let net = Reactor::new()?;
net_handles.push(net.handle());
// Create a new timer.
let timer = Timer::new_with_clock(net, self.clock.clone());
timer_handles.push(timer.handle());
timers.push(Mutex::new(Some(timer)));
}
// Get a handle to the clock for the runtime.
let clock = self.clock.clone();
// Blocking pool
let blocking_pool = PoolWaiter::from(Pool::default());
let pool = {
let net_handles = net_handles.clone();
let timer_handles = timer_handles.clone();
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let mut builder = thread_pool::Builder::new();
builder.num_threads(self.num_threads);
builder.name(&self.thread_name);
if let Some(stack_size) = self.thread_stack_size {
builder.stack_size(stack_size);
}
builder
.around_worker(move |index, next| {
// Configure the network driver
let _net = driver::set_default(&net_handles[index]);
// Configure the clock
clock::with_default(&clock, || {
// Configure the timer
let _timer = timer::set_default(&timer_handles[index]);
// Call the start callback
if let Some(after_start) = after_start.as_ref() {
after_start();
}
// Run the worker
next();
// Call the after call back
if let Some(before_stop) = before_stop.as_ref() {
before_stop();
}
})
})
.build_with_park(move |index| timers[index].lock().unwrap().take().unwrap())
};
Ok(Runtime {
kind: Kind::ThreadPool(pool),
net_handles,
timer_handles,
blocking_pool,
})
}
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("current_thread", &self.current_thread)
.field("num_threads", &self.num_threads)
.field("thread_name", &self.thread_name)
.field("thread_stack_size", &self.thread_stack_size)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
.field("before_stop", &self.after_start.as_ref().map(|_| "..."))
.field("clock", &self.clock)
.finish()
}
}

View File

@ -1,91 +0,0 @@
use crate::executor::current_thread::CurrentThread;
use crate::net::driver::Reactor;
use crate::runtime::current_thread::Runtime;
use crate::timer::clock::Clock;
use crate::timer::timer::Timer;
use std::io;
/// Builds a Single-threaded runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: #method.build
/// [`Builder::new`]: #method.new
///
/// # Examples
///
/// ```
/// use tokio::runtime::current_thread::Builder;
/// use tokio::timer::clock::Clock;
///
/// # pub fn main() {
/// // build Runtime
/// let runtime = Builder::new()
/// .clock(Clock::new())
/// .build();
/// // ... call runtime.run(...)
/// # let _ = runtime;
/// # }
/// ```
#[derive(Debug)]
pub struct Builder {
/// The clock to use
clock: Clock,
}
impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub fn new() -> Builder {
Builder {
clock: Clock::new(),
}
}
/// Set the `Clock` instance that will be used by the runtime.
pub fn clock(&mut self, clock: Clock) -> &mut Self {
self.clock = clock;
self
}
/// Create the configured `Runtime`.
pub fn build(&mut self) -> io::Result<Runtime> {
// We need a reactor to receive events about IO objects from kernel
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();
// Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
// reactor pick up some new external events.
let timer = Timer::new_with_clock(reactor, self.clock.clone());
let timer_handle = timer.handle();
// And now put a single-threaded executor on top of the timer. When there are no futures ready
// to do something, it'll let the timer or the reactor to generate some new stimuli for the
// futures to continue in their life.
let executor = CurrentThread::new_with_park(timer);
let runtime = Runtime::new2(
reactor_handle,
timer_handle,
self.clock.clone(),
executor,
);
Ok(runtime)
}
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}

View File

@ -1,67 +0,0 @@
//! A runtime implementation that runs everything on the current thread.
//!
//! [`current_thread::Runtime`][rt] is similar to the primary
//! [`Runtime`][concurrent-rt] except that it runs all components on the current
//! thread instead of using a thread pool. This means that it is able to spawn
//! futures that do not implement `Send`.
//!
//! Same as the default [`Runtime`][concurrent-rt], the
//! [`current_thread::Runtime`][rt] includes:
//!
//! * A [reactor] to drive I/O resources.
//! * An [executor] to execute tasks that use these I/O resources.
//! * A [timer] for scheduling work to run after a set period of time.
//!
//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself
//! and cannot be safely moved to other threads.
//!
//! # Spawning from other threads
//!
//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot
//! safely be moved to other threads, it provides a `Handle` that can be sent
//! to other threads and allows to spawn new tasks from there.
//!
//! For example:
//!
//! ```
//! use tokio::runtime::current_thread::Runtime;
//! use std::thread;
//!
//! let runtime = Runtime::new().unwrap();
//! let handle = runtime.handle();
//!
//! thread::spawn(move || {
//! let _ = handle.spawn(async {
//! println!("hello world");
//! });
//! }).join().unwrap();
//! ```
//!
//! # Examples
//!
//! Creating a new `Runtime` and running a future `f` until its completion and
//! returning its result.
//!
//! ```
//! use tokio::runtime::current_thread::Runtime;
//!
//! let runtime = Runtime::new().unwrap();
//!
//! // Use the runtime...
//! // runtime.block_on(f); // where f is a future
//! ```
//!
//! [rt]: struct.Runtime.html
//! [concurrent-rt]: ../struct.Runtime.html
//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html
//! [reactor]: ../../reactor/struct.Reactor.html
//! [executor]: https://tokio.rs/docs/internals/runtime-model/#executors
//! [timer]: ../../timer/index.html
mod builder;
mod runtime;
pub use self::builder::Builder;
pub use self::runtime::{Handle, Runtime, RunError};
pub use crate::executor::current_thread::spawn;
pub use crate::executor::current_thread::TaskExecutor;

View File

@ -1,206 +0,0 @@
use crate::executor::current_thread::Handle as ExecutorHandle;
use crate::executor::current_thread::{self, CurrentThread};
use crate::net::driver::{self, Reactor};
use crate::runtime::current_thread::Builder;
use crate::timer::clock::{self, Clock};
use crate::timer::timer::{self, Timer};
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::io;
/// Single-threaded runtime provides a way to start reactor
/// and executor on the current thread.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
#[derive(Debug)]
pub struct Runtime {
reactor_handle: driver::Handle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Parker>,
}
pub(super) type Parker = Timer<Reactor>;
/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance
#[derive(Debug, Clone)]
pub struct Handle(ExecutorHandle);
impl Handle {
/// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), crate::executor::SpawnError>
where
F: Future<Output = ()> + Send + 'static,
{
self.0.spawn(future)
}
/// Provides a best effort **hint** to whether or not `spawn` will succeed.
///
/// This function may return both false positives **and** false negatives.
/// If `status` returns `Ok`, then a call to `spawn` will *probably*
/// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
/// *probably* fail, but may succeed.
///
/// This allows a caller to avoid creating the task if the call to `spawn`
/// has a high likelihood of failing.
pub fn status(&self) -> Result<(), crate::executor::SpawnError> {
self.0.status()
}
}
impl<T> crate::executor::TypedExecutor<T> for Handle
where
T: Future<Output = ()> + Send + 'static,
{
fn spawn(&mut self, future: T) -> Result<(), crate::executor::SpawnError> {
Handle::spawn(self, future)
}
}
/// Error returned by the `run` function.
#[derive(Debug)]
pub struct RunError {
inner: current_thread::RunError,
}
impl fmt::Display for RunError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "{}", self.inner)
}
}
impl Error for RunError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.inner.source()
}
}
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
Builder::new().build()
}
pub(super) fn new2(
reactor_handle: driver::Handle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Parker>,
) -> Runtime {
Runtime {
reactor_handle,
timer_handle,
clock,
executor,
}
}
/// Get a new handle to spawn futures on the single-threaded Tokio runtime
///
/// Different to the runtime itself, the handle can be sent to different
/// threads.
pub fn handle(&self) -> Handle {
Handle(self.executor.handle().clone())
}
/// Spawn a future onto the single-threaded Tokio runtime.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::current_thread::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Output = ()> + 'static,
{
self.executor.spawn(future);
self
}
/// Runs the provided future, blocking the current thread until the future
/// completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function will **also** execute any spawned futures on the
/// current thread, but will **not** block until these other spawned futures
/// have completed. Once the function returns, any uncompleted futures
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again.
///
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
pub fn block_on<F: Future>(&mut self, f: F) -> F::Output {
self.enter(|executor| {
// Run the provided future
executor.block_on(f)
})
}
/// Run the executor to completion, blocking the thread until **all**
/// spawned futures have completed.
pub fn run(&mut self) -> Result<(), RunError> {
self.enter(|executor| executor.run())
.map_err(|e| RunError { inner: e })
}
fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut current_thread::CurrentThread<Parker>) -> R,
{
let Runtime {
ref reactor_handle,
ref timer_handle,
ref clock,
ref mut executor,
..
} = *self;
// This will set the default handle and timer to use inside the closure
// and run the future.
let _reactor = driver::set_default(&reactor_handle);
clock::with_default(clock, || {
let _timer = timer::set_default(&timer_handle);
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
crate::executor::with_default(&mut default_executor, || f(executor))
})
}
}

View File

@ -83,7 +83,7 @@
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create the runtime
//! let rt = Runtime::new()?;
//! let mut rt = Runtime::new()?;
//!
//! // Spawn the root task
//! rt.block_on(async {
@ -128,27 +128,193 @@
//! [`tokio::spawn`]: ../executor/fn.spawn.html
//! [`tokio::main`]: ../../tokio_macros/attr.main.html
pub mod current_thread;
mod builder;
pub use self::builder::Builder;
mod spawner;
pub use self::spawner::Spawner;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use crate::executor::{JoinError, JoinHandle};
use crate::executor::blocking::{self, PoolWaiter};
use crate::executor::current_thread::CurrentThread;
#[cfg(feature = "rt-full")]
mod threadpool;
use crate::executor::thread_pool::ThreadPool;
use crate::net::{self, driver};
use crate::timer::timer;
#[cfg(feature = "rt-full")]
pub use self::threadpool::{
Builder,
JoinHandle,
Runtime,
Spawner,
};
use std::future::Future;
use std::io;
// Internal export, don't use.
// This exists to support "auto" runtime selection when using the
// #[tokio::main] attribute.
#[doc(hidden)]
pub mod __main {
#[cfg(feature = "rt-full")]
pub use super::Runtime;
/// The Tokio runtime, includes a reactor as well as an executor for running
/// tasks.
///
/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
/// most users will use the `#[tokio::main]` annotation on their entry point.
///
/// See [module level][mod] documentation for more details.
///
/// # Shutdown
///
/// Shutting down the runtime is done by dropping the value. The current thread
/// will block until the shut down operation has completed.
///
/// * Drain any scheduled work queues.
/// * Drop any futures that have not yet completed.
/// * Drop the reactor.
///
/// Once the reactor has dropped, any outstanding I/O resources bound to
/// that reactor will no longer function. Calling any method on them will
/// result in an error.
///
/// [mod]: index.html
/// [`new`]: #method.new
/// [`Builder`]: struct.Builder.html
/// [`tokio::run`]: fn.run.html
#[derive(Debug)]
pub struct Runtime {
/// Task executor
kind: Kind,
#[cfg(not(feature = "rt-full"))]
pub use super::current_thread::Runtime;
/// Handles to the network drivers
net_handles: Vec<net::driver::Handle>,
/// Timer handles
timer_handles: Vec<timer::Handle>,
/// Blocking pool handle
blocking_pool: PoolWaiter,
}
/// The runtime executor is either a thread-pool or a current-thread executor.
#[derive(Debug)]
enum Kind {
#[cfg(feature = "rt-full")]
ThreadPool(ThreadPool),
CurrentThread(CurrentThread<timer::Timer<net::driver::Reactor>>),
}
impl Runtime {
/// Create a new runtime instance with default configuration values.
///
/// This results in a reactor, thread pool, and timer being initialized. The
/// thread pool will not spawn any worker threads until it needs to, i.e.
/// tasks are scheduled to run.
///
/// Most users will not need to call this function directly, instead they
/// will use [`tokio::run`](fn.run.html).
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
/// ```
///
/// [mod]: index.html
pub fn new() -> io::Result<Self> {
Builder::new().build()
}
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(async {
/// println!("now running on a worker thread");
/// });
/// # }
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future<Output = ()> + Send + 'static,
{
match &self.kind {
#[cfg(feature = "rt-full")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::CurrentThread(exec) => exec.spawn(future),
}
}
/// Run a future to completion on the Tokio runtime. This is the runtime's
/// entry point.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on<F: Future>(&mut self, future: F) -> F::Output {
let _net = driver::set_default(&self.net_handles[0]);
let _timer = timer::set_default(&self.timer_handles[0]);
let kind = &mut self.kind;
blocking::with_pool(&self.blocking_pool, || {
match kind {
#[cfg(feature = "rt-full")]
Kind::ThreadPool(exec) => exec.block_on(future),
Kind::CurrentThread(exec) => exec.block_on(future),
}
})
}
/// Return a handle to the runtime's spawner.
///
/// The returned handle can be used to spawn tasks that run on this runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let spawner = rt.spawner();
///
/// spawner.spawn(async { println!("hello"); });
/// ```
pub fn spawner(&self) -> Spawner {
match &self.kind {
#[cfg(feature = "rt-full")]
Kind::ThreadPool(exec) => Spawner::thread_pool(exec.spawner().clone()),
Kind::CurrentThread(exec) => Spawner::current_thread(exec.spawner()),
}
}
}

View File

@ -1,3 +1,5 @@
use crate::executor::current_thread;
#[cfg(feature = "rt-full")]
use crate::executor::thread_pool;
use crate::runtime::JoinHandle;
@ -11,12 +13,24 @@ use std::future::Future;
/// For more details, see the [module level](index.html) documentation.
#[derive(Debug, Clone)]
pub struct Spawner {
inner: thread_pool::Spawner,
kind: Kind
}
#[derive(Debug, Clone)]
enum Kind {
#[cfg(feature = "rt-full")]
ThreadPool(thread_pool::Spawner),
CurrentThread(current_thread::Spawner),
}
impl Spawner {
pub(super) fn new(inner: thread_pool::Spawner) -> Spawner {
Spawner { inner }
#[cfg(feature = "rt-full")]
pub(super) fn thread_pool(spawner: thread_pool::Spawner) -> Spawner {
Spawner { kind: Kind::ThreadPool(spawner) }
}
pub(super) fn current_thread(spawner: current_thread::Spawner) -> Spawner {
Spawner { kind: Kind::CurrentThread(spawner) }
}
/// Spawn a future onto the Tokio runtime.
@ -54,6 +68,10 @@ impl Spawner {
where
F: Future<Output = ()> + Send + 'static,
{
self.inner.spawn(future)
match &self.kind {
#[cfg(feature = "rt-full")]
Kind::ThreadPool(spawner) => spawner.spawn(future),
Kind::CurrentThread(spawner) => spawner.spawn(future),
}
}
}

View File

@ -1,288 +0,0 @@
use crate::executor::thread_pool;
use crate::net::driver::{self, Reactor};
use crate::runtime::threadpool::{Inner, Runtime};
use crate::timer::clock::{self, Clock};
use crate::timer::timer::{self, Timer};
use std::sync::{Arc, Mutex};
use std::{fmt, io};
/// Builds Tokio Runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: #method.build
/// [`Builder::new`]: #method.new
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
/// use tokio::timer::clock::Clock;
///
/// fn main() {
/// // build Runtime
/// let runtime = Builder::new()
/// .clock(Clock::system())
/// .num_threads(4)
/// .name("my-custom-name")
/// .stack_size(3 * 1024 * 1024)
/// .build()
/// .unwrap();
///
/// // use runtime ...
/// }
/// ```
pub struct Builder {
/// Thread pool specific builder
thread_pool_builder: thread_pool::Builder,
/// The number of worker threads
num_threads: usize,
/// To run after each worker thread starts
after_start: Option<Arc<dyn Fn() + Send + Sync>>,
/// To run before each worker thread stops
before_stop: Option<Arc<dyn Fn() + Send + Sync>>,
/// The clock to use
clock: Clock,
}
impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub fn new() -> Builder {
let num_threads = num_cpus::get().max(1);
let mut thread_pool_builder = thread_pool::Builder::new();
thread_pool_builder
.name("tokio-runtime-worker")
.num_threads(num_threads);
Builder {
thread_pool_builder,
num_threads,
after_start: None,
before_stop: None,
clock: Clock::new(),
}
}
/// Set the `Clock` instance that will be used by the runtime.
pub fn clock(&mut self, clock: Clock) -> &mut Self {
self.clock = clock;
self
}
/// Set the maximum number of worker threads for the `Runtime`'s thread pool.
///
/// This must be a number between 1 and 32,768 though it is advised to keep
/// this value on the smaller side.
///
/// The default value is the number of cores available to the system.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .num_threads(4)
/// .build()
/// .unwrap();
/// # }
/// ```
pub fn num_threads(&mut self, val: usize) -> &mut Self {
self.num_threads = val;
self.thread_pool_builder.num_threads(val);
self
}
/// Set name of threads spawned by the `Runtime`'s thread pool.
///
/// The default name is "tokio-runtime-worker".
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .name("my-pool")
/// .build();
/// # }
/// ```
pub fn name(&mut self, val: impl Into<String>) -> &mut Self {
self.thread_pool_builder.name(val);
self
}
/// Set the stack size (in bytes) for worker threads.
///
/// The actual stack size may be greater than this value if the platform
/// specifies minimal stack size.
///
/// The default stack size for spawned threads is 2 MiB, though this
/// particular stack size is subject to change in the future.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new()
/// .stack_size(32 * 1024)
/// .build();
/// # }
/// ```
pub fn stack_size(&mut self, val: usize) -> &mut Self {
self.thread_pool_builder.stack_size(val);
self
}
/// Execute function `f` after each thread is started but before it starts
/// doing work.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let thread_pool = runtime::Builder::new()
/// .after_start(|| {
/// println!("thread started");
/// })
/// .build();
/// # }
/// ```
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self
}
/// Execute function `f` before each thread stops.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let thread_pool = runtime::Builder::new()
/// .before_stop(|| {
/// println!("thread stopping");
/// })
/// .build();
/// # }
/// ```
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_stop = Some(Arc::new(f));
self
}
/// Create the configured `Runtime`.
///
/// The returned `ThreadPool` instance is ready to spawn tasks.
///
/// # Examples
///
/// ```
/// # use tokio::runtime::Builder;
/// # pub fn main() {
/// let runtime = Builder::new().build().unwrap();
/// // ... call runtime.run(...)
/// # let _ = runtime;
/// # }
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
let mut reactor_handles = Vec::new();
let mut timer_handles = Vec::new();
let mut timers = Vec::new();
for _ in 0..self.num_threads {
// Create a new reactor.
let reactor = Reactor::new()?;
reactor_handles.push(reactor.handle());
// Create a new timer.
let timer = Timer::new_with_clock(reactor, self.clock.clone());
timer_handles.push(timer.handle());
timers.push(Mutex::new(Some(timer)));
}
// Get a handle to the clock for the runtime.
let clock = self.clock.clone();
let around_reactor_handles = reactor_handles.clone();
let around_timer_handles = timer_handles.clone();
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
let pool = self
.thread_pool_builder
.around_worker(move |index, next| {
let _reactor = driver::set_default(&around_reactor_handles[index]);
clock::with_default(&clock, || {
let _timer = timer::set_default(&around_timer_handles[index]);
if let Some(after_start) = after_start.as_ref() {
after_start();
}
next();
if let Some(before_stop) = before_stop.as_ref() {
before_stop();
}
})
})
.build_with_park(move |index| timers[index].lock().unwrap().take().unwrap());
Ok(Runtime {
inner: Some(Inner {
pool,
reactor_handles,
timer_handles,
}),
})
}
}
impl Default for Builder {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("thread_pool_builder", &self.thread_pool_builder)
.field("after_start", &self.after_start.as_ref().map(|_| "..."))
.finish()
}
}

View File

@ -1,202 +0,0 @@
mod builder;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::builder::Builder;
mod spawner;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::spawner::Spawner;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use crate::executor::thread_pool::JoinHandle;
use crate::net::driver;
use crate::timer::timer;
use crate::executor::thread_pool::ThreadPool;
use std::future::Future;
use std::io;
/// Handle to the Tokio runtime.
///
/// The Tokio runtime includes a reactor as well as an executor for running
/// tasks.
///
/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However,
/// most users will use [`tokio::run`], which uses a `Runtime` internally.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
/// [`new`]: #method.new
/// [`Builder`]: struct.Builder.html
/// [`tokio::run`]: fn.run.html
#[derive(Debug)]
pub struct Runtime {
inner: Option<Inner>,
}
#[derive(Debug)]
struct Inner {
/// Task execution pool.
pool: ThreadPool,
/// Reactor handles
reactor_handles: Vec<crate::net::driver::Handle>,
/// Timer handles
timer_handles: Vec<timer::Handle>,
}
// ===== impl Runtime =====
impl Runtime {
/// Create a new runtime instance with default configuration values.
///
/// This results in a reactor, thread pool, and timer being initialized. The
/// thread pool will not spawn any worker threads until it needs to, i.e.
/// tasks are scheduled to run.
///
/// Most users will not need to call this function directly, instead they
/// will use [`tokio::run`](fn.run.html).
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// Creating a new `Runtime` with default configuration values.
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
/// ```
///
/// [mod]: index.html
pub fn new() -> io::Result<Self> {
Builder::new().build()
}
/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
/// thread pool. The thread pool is then responsible for polling the future
/// until it completes.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// fn main() {
/// // Create the runtime
/// let rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(async {
/// println!("now running on a worker thread");
/// });
/// }
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&self, future: F) -> &Self
where
F: Future<Output = ()> + Send + 'static,
{
self.inner().pool.spawn(future);
self
}
/// Run a future to completion on the Tokio runtime.
///
/// This runs the given future on the runtime, blocking until it is
/// complete, and yielding its resolved result. Any tasks or timers which
/// the future spawns internally will be executed on the runtime.
///
/// This method should not be called from an asynchronous context.
///
/// # Panics
///
/// This function panics if the executor is at capacity, if the provided
/// future panics, or if called within an asynchronous execution context.
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
let _reactor = driver::set_default(&self.inner().reactor_handles[0]);
let _timer = timer::set_default(&self.inner().timer_handles[0]);
self.inner().pool.block_on(future)
}
/// Return a handle to the runtime's spawner.
///
/// The returned handle can be used to spawn tasks that run on this runtime.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// let spawner = rt.spawner();
///
/// spawner.spawn(async { println!("hello"); });
/// ```
pub fn spawner(&self) -> Spawner {
let inner = self.inner().pool.spawner().clone();
Spawner::new(inner)
}
/// Signals the runtime to shutdown immediately.
///
/// Blocks the current thread until the shutdown operation has completed.
/// This function will forcibly shutdown the runtime, causing any
/// in-progress work to become canceled.
///
/// The shutdown steps are:
///
/// * Drain any scheduled work queues.
/// * Drop any futures that have not yet completed.
/// * Drop the reactor.
///
/// Once the reactor has dropped, any outstanding I/O resources bound to
/// that reactor will no longer function. Calling any method on them will
/// result in an error.
///
/// See [module level][mod] documentation for more details.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
///
/// let rt = Runtime::new()
/// .unwrap();
///
/// // Use the runtime...
///
/// // Shutdown the runtime
/// rt.shutdown_now();
/// ```
///
/// [mod]: index.html
#[allow(warnings)]
pub fn shutdown_now(mut self) {
self.inner.unwrap().pool.shutdown_now();
}
fn inner(&self) -> &Inner {
self.inner.as_ref().unwrap()
}
}

View File

@ -178,13 +178,13 @@ where
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::runtime::current_thread::Runtime;
use crate::runtime::{self, Runtime};
use crate::sync::{mpsc, oneshot};
use futures::{future, StreamExt};
#[test]
fn smoke() {
let mut rt = Runtime::new().unwrap();
let mut rt = rt();
rt.block_on(async move {
let registry = Registry::new(vec![
EventInfo::default(),
@ -307,4 +307,8 @@ mod tests {
drop(second_rx);
}
fn rt() -> Runtime {
runtime::Builder::new().current_thread().build().unwrap()
}
}

View File

@ -174,13 +174,13 @@ impl Stream for CtrlBreak {
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::runtime::current_thread::Runtime;
use crate::runtime::Runtime;
use futures_util::stream::StreamExt;
#[test]
fn ctrl_c() {
let mut rt = Runtime::new().unwrap();
let mut rt = rt();
rt.block_on(async {
let ctrl_c = crate::signal::ctrl_c().expect("failed to create CtrlC");
@ -198,7 +198,7 @@ mod tests {
#[test]
fn ctrl_break() {
let mut rt = Runtime::new().unwrap();
let mut rt = rt();
rt.block_on(async {
let ctrl_break = super::ctrl_break().expect("failed to create CtrlC");
@ -213,4 +213,11 @@ mod tests {
let _ = ctrl_break.into_future().await;
});
}
fn rt() -> Runtime {
crate::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
}
}

View File

@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]
use tokio::runtime::{self, current_thread};
use tokio::runtime;
use tokio::timer::clock::Clock;
use tokio::timer::*;
@ -20,14 +20,16 @@ fn clock_and_timer_concurrent() {
let when = Instant::now() + Duration::from_millis(5_000);
let clock = Clock::new_with_now(MockNow(when));
let rt = runtime::Builder::new().clock(clock).build().unwrap();
let mut rt = runtime::Builder::new().clock(clock).build().unwrap();
let (tx, rx) = mpsc::channel();
rt.spawn(async move {
delay(when).await;
assert!(Instant::now() < when);
tx.send(()).unwrap();
rt.block_on(async move {
tokio::spawn(async move {
delay(when).await;
assert!(Instant::now() < when);
tx.send(()).unwrap();
})
});
rx.recv().unwrap();
@ -38,7 +40,11 @@ fn clock_and_timer_single_threaded() {
let when = Instant::now() + Duration::from_millis(5_000);
let clock = Clock::new_with_now(MockNow(when));
let mut rt = current_thread::Builder::new().clock(clock).build().unwrap();
let mut rt = runtime::Builder::new()
.current_thread()
.clock(clock)
.build()
.unwrap();
rt.block_on(async move {
delay(when).await;

View File

@ -1,781 +0,0 @@
#![warn(rust_2018_idioms)]
#![cfg(not(miri))]
use tokio::executor::current_thread::{self, block_on_all, CurrentThread, TaskExecutor};
use tokio::executor::TypedExecutor;
use tokio::sync::oneshot;
use std::any::Any;
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
mod from_block_on_all {
use super::*;
fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static>(spawn: F) {
let cnt = Rc::new(Cell::new(0));
let c = cnt.clone();
let msg = block_on_all(async move {
c.set(1 + c.get());
// Spawn!
spawn(Box::pin(async move {
c.set(1 + c.get());
}));
"hello"
});
assert_eq!(2, cnt.get());
assert_eq!(msg, "hello");
}
#[test]
fn spawn() {
test(current_thread::spawn)
}
#[test]
fn execute() {
test(|f| {
TaskExecutor::current().spawn(f).unwrap();
});
}
}
#[test]
fn block_waits() {
let (tx, rx) = oneshot::channel();
thread::spawn(|| {
thread::sleep(Duration::from_millis(1000));
tx.send(()).unwrap();
});
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
block_on_all(async move {
rx.await.unwrap();
cnt.set(1 + cnt.get());
});
assert_eq!(1, cnt2.get());
}
#[test]
fn spawn_many() {
const ITER: usize = 200;
let cnt = Rc::new(Cell::new(0));
let mut tokio_current_thread = CurrentThread::new();
for _ in 0..ITER {
let cnt = cnt.clone();
tokio_current_thread.spawn(async move {
cnt.set(1 + cnt.get());
});
}
tokio_current_thread.run().unwrap();
assert_eq!(cnt.get(), ITER);
}
mod does_not_set_global_executor_by_default {
use super::*;
fn test<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) -> Result<(), E> + 'static, E>(
spawn: F,
) {
block_on_all(async {
spawn(Box::pin(async {})).unwrap_err();
});
}
#[test]
fn spawn() {
test(|f| tokio::executor::DefaultExecutor::current().spawn(f))
}
}
mod from_block_on_future {
use super::*;
fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>)>(spawn: F) {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let mut tokio_current_thread = CurrentThread::new();
tokio_current_thread.block_on(async move {
let cnt3 = cnt2.clone();
spawn(Box::pin(async move {
cnt3.set(1 + cnt3.get());
}));
});
tokio_current_thread.run().unwrap();
assert_eq!(1, cnt.get());
}
#[test]
fn spawn() {
test(current_thread::spawn);
}
#[test]
fn execute() {
test(|f| {
current_thread::TaskExecutor::current().spawn(f).unwrap();
});
}
}
mod outstanding_tasks_are_dropped_when_executor_is_dropped {
use super::*;
#[allow(unreachable_code)] // TODO: remove this when https://github.com/rust-lang/rust/issues/64636 fixed.
async fn never(_rc: Rc<()>) {
loop {
yield_once().await;
}
}
fn test<F, G>(spawn: F, dotspawn: G)
where
F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>),
{
let mut rc = Rc::new(());
let mut tokio_current_thread = CurrentThread::new();
dotspawn(&mut tokio_current_thread, Box::pin(never(rc.clone())));
drop(tokio_current_thread);
// Ensure the daemon is dropped
assert!(Rc::get_mut(&mut rc).is_some());
// Using the global spawn fn
let mut rc = Rc::new(());
let rc2 = rc.clone();
let mut tokio_current_thread = CurrentThread::new();
tokio_current_thread.block_on(async move {
spawn(Box::pin(never(rc2)));
});
drop(tokio_current_thread);
// Ensure the daemon is dropped
assert!(Rc::get_mut(&mut rc).is_some());
}
#[test]
fn spawn() {
test(current_thread::spawn, |rt, f| {
rt.spawn(f);
})
}
#[test]
fn execute() {
test(
|f| {
current_thread::TaskExecutor::current().spawn(f).unwrap();
},
// Note: `CurrentThread` doesn't currently implement
// `futures::Executor`, so we'll call `.spawn(...)` rather than
// `.execute(...)` for now. If `CurrentThread` is changed to
// implement Executor, change this to `.execute(...).unwrap()`.
|rt, f| {
rt.spawn(f);
},
);
}
}
#[test]
#[should_panic]
fn nesting_run() {
block_on_all(async {
block_on_all(async {});
});
}
mod run_in_future {
use super::*;
#[test]
#[should_panic]
fn spawn() {
block_on_all(async {
current_thread::spawn(async {
block_on_all(async {});
});
});
}
#[test]
#[should_panic]
fn execute() {
block_on_all(async {
current_thread::TaskExecutor::current()
.spawn(async {
block_on_all(async {});
})
.unwrap();
});
}
}
#[test]
fn tick_on_infini_future() {
let num = Rc::new(Cell::new(0));
#[allow(unreachable_code)] // TODO: remove this when https://github.com/rust-lang/rust/issues/64636 fixed.
async fn infini(num: Rc<Cell<usize>>) {
loop {
num.set(1 + num.get());
yield_once().await
}
}
CurrentThread::new()
.spawn(infini(num.clone()))
.turn(None)
.unwrap();
assert_eq!(1, num.get());
}
mod tasks_are_scheduled_fairly {
use super::*;
#[allow(unreachable_code)] // TODO: remove this when https://github.com/rust-lang/rust/issues/64636 fixed.
async fn spin(state: Rc<RefCell<[i32; 2]>>, idx: usize) {
loop {
// borrow_mut scope
{
let mut state = state.borrow_mut();
if idx == 0 {
let diff = state[0] - state[1];
assert!(diff.abs() <= 1);
if state[0] >= 50 {
return;
}
}
state[idx] += 1;
if state[idx] >= 100 {
return;
}
}
yield_once().await;
}
}
fn test<F: Fn(Pin<Box<dyn Future<Output = ()>>>)>(spawn: F) {
let state = Rc::new(RefCell::new([0, 0]));
block_on_all(async move {
spawn(Box::pin(spin(state.clone(), 0)));
spawn(Box::pin(spin(state, 1)));
});
}
#[test]
fn spawn() {
test(current_thread::spawn)
}
#[test]
fn execute() {
test(|f| {
current_thread::TaskExecutor::current().spawn(f).unwrap();
})
}
}
mod and_turn {
use super::*;
fn test<F, G>(spawn: F, dotspawn: G)
where
F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>),
{
let cnt = Rc::new(Cell::new(0));
let c = cnt.clone();
let mut tokio_current_thread = CurrentThread::new();
// Spawn a basic task to get the executor to turn
dotspawn(&mut tokio_current_thread, Box::pin(async {}));
// Turn once...
tokio_current_thread.turn(None).unwrap();
dotspawn(
&mut tokio_current_thread,
Box::pin(async move {
c.set(1 + c.get());
// Spawn!
spawn(Box::pin(async move {
c.set(1 + c.get());
}));
}),
);
// This does not run the newly spawned thread
tokio_current_thread.turn(None).unwrap();
assert_eq!(1, cnt.get());
// This runs the newly spawned thread
tokio_current_thread.turn(None).unwrap();
assert_eq!(2, cnt.get());
}
#[test]
fn spawn() {
test(current_thread::spawn, |rt, f| {
rt.spawn(f);
})
}
#[test]
fn execute() {
test(
|f| {
current_thread::TaskExecutor::current().spawn(f).unwrap();
},
// Note: `CurrentThread` doesn't currently implement
// `futures::Executor`, so we'll call `.spawn(...)` rather than
// `.execute(...)` for now. If `CurrentThread` is changed to
// implement Executor, change this to `.execute(...).unwrap()`.
|rt, f| {
rt.spawn(f);
},
);
}
}
mod in_drop {
use super::*;
struct OnDrop<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for OnDrop<F> {
fn drop(&mut self) {
(self.0.take().unwrap())();
}
}
async fn noop(_data: Box<dyn Any>) {}
fn test<F, G>(spawn: F, dotspawn: G)
where
F: Fn(Pin<Box<dyn Future<Output = ()>>>) + 'static,
G: Fn(&mut CurrentThread, Pin<Box<dyn Future<Output = ()>>>),
{
let mut tokio_current_thread = CurrentThread::new();
let (tx, rx) = oneshot::channel();
dotspawn(
&mut tokio_current_thread,
Box::pin(noop(Box::new(OnDrop(Some(move || {
spawn(Box::pin(async move {
tx.send(()).unwrap();
}));
}))))),
);
tokio_current_thread.block_on(rx).unwrap();
tokio_current_thread.run().unwrap();
}
#[test]
fn spawn() {
test(current_thread::spawn, |rt, f| {
rt.spawn(f);
})
}
#[test]
fn execute() {
test(
|f| {
current_thread::TaskExecutor::current().spawn(f).unwrap();
},
// Note: `CurrentThread` doesn't currently implement
// `futures::Executor`, so we'll call `.spawn(...)` rather than
// `.execute(...)` for now. If `CurrentThread` is changed to
// implement Executor, change this to `.execute(...).unwrap()`.
|rt, f| {
rt.spawn(f);
},
);
}
}
/*
#[test]
fn hammer_turn() {
use futures::sync::mpsc;
const ITER: usize = 100;
const N: usize = 100;
const THREADS: usize = 4;
for _ in 0..ITER {
let mut ths = vec![];
// Add some jitter
for _ in 0..THREADS {
let th = thread::spawn(|| {
let mut tokio_current_thread = CurrentThread::new();
let (tx, rx) = mpsc::unbounded();
tokio_current_thread.spawn({
let cnt = Rc::new(Cell::new(0));
let c = cnt.clone();
rx.for_each(move |_| {
c.set(1 + c.get());
Ok(())
})
.map_err(|e| panic!("err={:?}", e))
.map(move |v| {
assert_eq!(N, cnt.get());
v
})
});
thread::spawn(move || {
for _ in 0..N {
tx.unbounded_send(()).unwrap();
thread::yield_now();
}
});
while !tokio_current_thread.is_idle() {
tokio_current_thread.turn(None).unwrap();
}
});
ths.push(th);
}
for th in ths {
th.join().unwrap();
}
}
}
*/
#[test]
fn turn_has_polled() {
let mut tokio_current_thread = CurrentThread::new();
// Spawn oneshot receiver
let (sender, receiver) = oneshot::channel::<()>();
tokio_current_thread.spawn(async move {
let _ = receiver.await;
});
// Turn once...
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// Should've polled the receiver once, but considered it not ready
assert!(res.has_polled());
// Turn another time
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// Should've polled nothing, the receiver is not ready yet
assert!(!res.has_polled());
// Make the receiver ready
sender.send(()).unwrap();
// Turn another time
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// Should've polled the receiver, it's ready now
assert!(res.has_polled());
// Now the executor should be empty
assert!(tokio_current_thread.is_idle());
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
// So should've polled nothing
assert!(!res.has_polled());
}
// Our own mock Park that is never really waiting and the only
// thing it does is to send, on request, something (once) to a oneshot
// channel
struct MyPark {
sender: Option<oneshot::Sender<()>>,
send_now: Rc<Cell<bool>>,
}
struct MyUnpark;
impl tokio::executor::park::Park for MyPark {
type Unpark = MyUnpark;
type Error = ();
fn unpark(&self) -> Self::Unpark {
MyUnpark
}
fn park(&mut self) -> Result<(), Self::Error> {
// If called twice with send_now, this will intentionally panic
if self.send_now.get() {
self.sender.take().unwrap().send(()).unwrap();
}
Ok(())
}
fn park_timeout(&mut self, _duration: Duration) -> Result<(), Self::Error> {
self.park()
}
}
impl tokio::executor::park::Unpark for MyUnpark {
fn unpark(&self) {}
}
#[test]
fn turn_fair() {
let send_now = Rc::new(Cell::new(false));
let (sender, receiver) = oneshot::channel::<()>();
let (sender_2, receiver_2) = oneshot::channel::<()>();
let (sender_3, receiver_3) = oneshot::channel::<()>();
let my_park = MyPark {
sender: Some(sender_3),
send_now: send_now.clone(),
};
let mut tokio_current_thread = CurrentThread::new_with_park(my_park);
let receiver_1_done = Rc::new(Cell::new(false));
let receiver_1_done_clone = receiver_1_done.clone();
// Once an item is received on the oneshot channel, it will immediately
// immediately make the second oneshot channel ready
tokio_current_thread.spawn(async move {
receiver.await.unwrap();
sender_2.send(()).unwrap();
receiver_1_done_clone.set(true);
});
let receiver_2_done = Rc::new(Cell::new(false));
let receiver_2_done_clone = receiver_2_done.clone();
tokio_current_thread.spawn(async move {
receiver_2.await.unwrap();
receiver_2_done_clone.set(true);
});
// The third receiver is only woken up from our Park implementation, it simulates
// e.g. a socket that first has to be polled to know if it is ready now
let receiver_3_done = Rc::new(Cell::new(false));
let receiver_3_done_clone = receiver_3_done.clone();
tokio_current_thread.spawn(async move {
receiver_3.await.unwrap();
receiver_3_done_clone.set(true);
});
// First turn should've polled both and considered them not ready
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
assert!(res.has_polled());
// Next turn should've polled nothing
let res = tokio_current_thread
.turn(Some(Duration::from_millis(0)))
.unwrap();
assert!(!res.has_polled());
assert!(!receiver_1_done.get());
assert!(!receiver_2_done.get());
assert!(!receiver_3_done.get());
// After this the receiver future will wake up the second receiver future,
// so there are pending futures again
sender.send(()).unwrap();
// Now the first receiver should be done, the second receiver should be ready
// to be polled again and the socket not yet
let res = tokio_current_thread.turn(None).unwrap();
assert!(res.has_polled());
assert!(receiver_1_done.get());
assert!(!receiver_2_done.get());
assert!(!receiver_3_done.get());
// Now let our park implementation know that it should send something to sender 3
send_now.set(true);
// This should resolve the second receiver directly, but also poll the socket
// and read the packet from it. If it didn't do both here, we would handle
// futures that are woken up from the reactor and directly unfairly and would
// favour the ones that are woken up directly.
let res = tokio_current_thread.turn(None).unwrap();
assert!(res.has_polled());
assert!(receiver_1_done.get());
assert!(receiver_2_done.get());
assert!(receiver_3_done.get());
// Don't send again
send_now.set(false);
// Now we should be idle and turning should not poll anything
assert!(tokio_current_thread.is_idle());
let res = tokio_current_thread.turn(None).unwrap();
assert!(!res.has_polled());
}
#[test]
fn spawn_from_other_thread() {
let mut current_thread = CurrentThread::new();
let handle = current_thread.handle();
let (sender, receiver) = oneshot::channel::<()>();
thread::spawn(move || {
handle
.spawn(async move {
sender.send(()).unwrap();
})
.unwrap();
});
let _ = current_thread.block_on(receiver).unwrap();
}
#[test]
fn spawn_from_other_thread_unpark() {
use std::sync::mpsc::channel as mpsc_channel;
let mut current_thread = CurrentThread::new();
let handle = current_thread.handle();
let (sender_1, receiver_1) = oneshot::channel::<()>();
let (sender_2, receiver_2) = mpsc_channel::<()>();
thread::spawn(move || {
let _ = receiver_2.recv().unwrap();
handle
.spawn(async move {
sender_1.send(()).unwrap();
})
.unwrap();
});
// Ensure that unparking the executor works correctly. It will first
// check if there are new futures (there are none), then execute the
// lazy future below which will cause the future to be spawned from
// the other thread. Then the executor will park but should be woken
// up because *now* we have a new future to schedule
let _ = current_thread.block_on(async move {
// inlined 'lazy'
async move {
sender_2.send(()).unwrap();
}
.await;
receiver_1.await.unwrap();
});
}
#[test]
fn spawn_from_executor_with_handle() {
let mut current_thread = CurrentThread::new();
let handle = current_thread.handle();
let (tx, rx) = oneshot::channel();
current_thread.spawn(async move {
handle
.spawn(async move {
tx.send(()).unwrap();
})
.unwrap();
});
current_thread.block_on(rx).unwrap();
}
#[test]
fn handle_status() {
let current_thread = CurrentThread::new();
let handle = current_thread.handle();
assert!(handle.status().is_ok());
drop(current_thread);
assert!(handle.spawn(async { () }).is_err());
assert!(handle.status().is_err());
}
#[test]
fn handle_is_sync() {
let current_thread = CurrentThread::new();
let handle = current_thread.handle();
let _box: Box<dyn Sync> = Box::new(handle);
}
async fn yield_once() {
YieldOnce(false).await
}
struct YieldOnce(bool);
impl Future for YieldOnce {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.0 {
Poll::Ready(())
} else {
self.0 = true;
// Push to the back of the executor's queue
cx.waker().wake_by_ref();
Poll::Pending
}
}
}

View File

@ -3,7 +3,7 @@
#![warn(rust_2018_idioms)]
use tokio::process::Command;
use tokio::runtime::current_thread;
use tokio::runtime;
use futures_util::future::FutureExt;
use futures_util::stream::FuturesOrdered;
@ -18,7 +18,8 @@ fn run_test() {
let finished_clone = finished.clone();
thread::spawn(move || {
let mut rt = current_thread::Runtime::new().expect("failed to get runtime");
let mut rt = runtime::Builder::new().current_thread().build().unwrap();
let mut futures = FuturesOrdered::new();
rt.block_on(async {
for i in 0..2 {

View File

@ -0,0 +1,339 @@
#![warn(rust_2018_idioms)]
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::timer;
use tokio_test::{assert_err, assert_ok};
use futures_util::future::poll_fn;
use std::sync::{mpsc, Arc};
use std::task::Poll;
use std::thread;
use std::time::{Duration, Instant};
#[test]
fn block_on_sync() {
let mut rt = rt();
let mut win = false;
rt.block_on(async {
win = true;
});
assert!(win);
}
#[test]
fn block_on_async() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_one() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_two() {
let mut rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move {
assert_ok!(tx1.send("ZOMG"));
});
tokio::spawn(async move {
let msg = assert_ok!(rx1.await);
assert_ok!(tx2.send(msg));
});
assert_ok!(rx2.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_many() {
use tokio::sync::mpsc;
const ITER: usize = 10;
let mut rt = rt();
let out = rt.block_on(async {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
let mut done_tx = done_tx.clone();
tokio::spawn(async move {
let msg = assert_ok!(rx.await);
assert_eq!(i, msg);
assert_ok!(done_tx.try_send(msg));
});
tx
})
.collect::<Vec<_>>();
drop(done_tx);
thread::spawn(move || {
for (i, tx) in txs.drain(..).enumerate() {
assert_ok!(tx.send(i));
}
});
let mut out = vec![];
while let Some(i) = done_rx.recv().await {
out.push(i);
}
out.sort();
out
});
assert_eq!(ITER, out.len());
for i in 0..ITER {
assert_eq!(i, out[i]);
}
}
#[test]
fn outstanding_tasks_dropped() {
let mut rt = rt();
let cnt = Arc::new(());
rt.block_on(async {
let cnt = cnt.clone();
tokio::spawn(poll_fn(move |_| {
assert_eq!(2, Arc::strong_count(&cnt));
Poll::Pending
}));
});
assert_eq!(2, Arc::strong_count(&cnt));
drop(rt);
assert_eq!(1, Arc::strong_count(&cnt));
}
#[test]
#[should_panic]
fn nested_rt() {
let mut rt1 = rt();
let mut rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
let mut rt1 = rt();
let mut rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
}
#[test]
fn complete_block_on_under_load() {
let mut rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
}
#[test]
fn complete_task_under_load() {
let mut rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx1.send(()));
});
tokio::spawn(async move {
assert_ok!(rx1.await);
assert_ok!(tx2.send(()));
});
assert_ok!(rx2.await);
});
}
#[test]
fn spawn_from_other_thread() {
let mut rt = rt();
let sp = rt.spawner();
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
sp.spawn(async move {
assert_ok!(tx.send(()));
});
});
rt.block_on(async move {
assert_ok!(rx.await);
});
}
#[test]
fn delay_at_root() {
let mut rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
timer::delay_for(dur).await;
});
assert!(now.elapsed() >= dur);
}
#[test]
fn delay_in_spawn() {
let mut rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
timer::delay_for(dur).await;
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
assert!(now.elapsed() >= dur);
}
#[test]
fn client_server_block_on() {
let _ = env_logger::try_init();
let mut rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
}
async fn yield_once() {
let mut yielded = false;
poll_fn(|cx| {
if yielded {
Poll::Ready(())
} else {
yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
}
async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
}

View File

@ -35,14 +35,23 @@ async fn client_server(tx: mpsc::Sender<()>) {
tx.send(()).unwrap();
}
#[test]
fn send_sync_bound() {
fn is_send<T: Send + Sync>() {}
is_send::<Runtime>();
}
#[test]
fn spawn_shutdown() {
let _ = env_logger::try_init();
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
let (tx, rx) = mpsc::channel();
rt.spawn(client_server(tx.clone()));
rt.block_on(async {
tokio::spawn(client_server(tx.clone()));
});
// Use spawner
rt.spawner().spawn(client_server(tx));
@ -50,13 +59,13 @@ fn spawn_shutdown() {
assert_ok!(rx.recv());
assert_ok!(rx.recv());
rt.shutdown_now();
drop(rt);
assert_err!(rx.try_recv());
}
#[test]
fn block_on_timer() {
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
let v = rt.block_on(async move {
delay(Instant::now() + Duration::from_millis(100)).await;
@ -68,7 +77,7 @@ fn block_on_timer() {
#[test]
fn block_on_socket() {
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
@ -98,7 +107,7 @@ fn block_waits() {
a_tx.send(()).unwrap();
});
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
rt.block_on(async move {
a_rx.await.unwrap();
b_tx.send(()).unwrap();
@ -111,7 +120,7 @@ fn block_waits() {
fn spawn_many() {
const ITER: usize = 200;
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
let cnt = Arc::new(Mutex::new(0));
let (tx, rx) = mpsc::channel();
@ -141,12 +150,12 @@ fn spawn_many() {
fn nested_enter() {
use std::panic;
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
rt.block_on(async {
assert_err!(tokio::executor::enter());
let res = panic::catch_unwind(move || {
let rt = Runtime::new().unwrap();
let mut rt = Runtime::new().unwrap();
rt.block_on(async {});
});
@ -165,7 +174,7 @@ fn after_start_and_before_stop_is_called() {
let after_inner = after_start.clone();
let before_inner = before_stop.clone();
let rt = tokio::runtime::Builder::new()
let mut rt = tokio::runtime::Builder::new()
.after_start(move || {
after_inner.clone().fetch_add(1, Ordering::Relaxed);
})

View File

@ -1,137 +0,0 @@
#![warn(rust_2018_idioms)]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::runtime::current_thread::Runtime;
use tokio::sync::oneshot;
use tokio_test::{assert_err, assert_ok};
use env_logger;
use std::sync::mpsc;
use std::time::{Duration, Instant};
use tokio::timer::delay;
async fn client_server(tx: mpsc::Sender<()>) {
let mut server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
#[test]
fn spawn_run_spawn_root() {
let _ = env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone();
rt.spawn(async move {
delay(Instant::now() + Duration::from_millis(1000)).await;
tx2.send(()).unwrap();
});
rt.spawn(client_server(tx));
rt.run().unwrap();
assert_ok!(rx.try_recv());
assert_ok!(rx.try_recv());
}
#[test]
fn spawn_run_nested_spawn() {
let _ = env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone();
rt.spawn(async move {
tokio::spawn(async move {
delay(Instant::now() + Duration::from_millis(1000)).await;
tx2.send(()).unwrap();
});
});
rt.spawn(client_server(tx));
rt.run().unwrap();
assert_ok!(rx.try_recv());
assert_ok!(rx.try_recv());
}
#[test]
fn block_on() {
let _ = env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone();
rt.spawn(async move {
delay(Instant::now() + Duration::from_millis(1000)).await;
tx2.send(()).unwrap();
});
rt.block_on(client_server(tx));
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
}
#[test]
fn racy() {
use std::sync::mpsc;
use std::thread;
let (trigger, exit) = oneshot::channel();
let (handle_tx, handle_rx) = mpsc::channel();
let jh = thread::spawn(move || {
let mut rt = Runtime::new().unwrap();
handle_tx.send(rt.handle()).unwrap();
// don't exit until we are told to
rt.block_on(async {
exit.await.unwrap();
});
// run until all spawned futures (incl. the "exit" signal future) have completed.
rt.run().unwrap();
});
let (tx, rx) = oneshot::channel();
let handle = handle_rx.recv().unwrap();
handle
.spawn(async {
tx.send(()).unwrap();
})
.unwrap();
// signal runtime thread to exit
trigger.send(()).unwrap();
// wait for runtime thread to exit
jh.join().unwrap();
let mut e = tokio::executor::enter().unwrap();
e.block_on(rx).unwrap();
}

View File

@ -7,18 +7,18 @@ mod support {
use support::signal::send_signal;
use tokio::prelude::*;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use tokio::signal::unix::{signal, SignalKind};
#[test]
fn dropping_loops_does_not_cause_starvation() {
let kind = SignalKind::user_defined1();
let mut first_rt = Runtime::new().expect("failed to init first runtime");
let mut first_rt = rt();
let mut first_signal =
first_rt.block_on(async { signal(kind).expect("failed to register first signal") });
let mut second_rt = Runtime::new().expect("failed to init second runtime");
let mut second_rt = rt();
let mut second_signal =
second_rt.block_on(async { signal(kind).expect("failed to register second signal") });
@ -35,3 +35,10 @@ fn dropping_loops_does_not_cause_starvation() {
second_rt.block_on(second_signal.next());
}
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
}

View File

@ -7,7 +7,7 @@ mod support {
use support::signal::send_signal;
use tokio::prelude::*;
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;
use tokio::signal::unix::{signal, SignalKind};
use std::sync::mpsc::channel;
@ -24,7 +24,7 @@ fn multi_loop() {
.map(|_| {
let sender = sender.clone();
thread::spawn(move || {
let mut rt = Runtime::new().unwrap();
let mut rt = rt();
let _ = rt.block_on(async {
let signal = signal(SignalKind::hangup()).unwrap();
sender.send(()).unwrap();
@ -45,3 +45,10 @@ fn multi_loop() {
}
}
}
fn rt() -> Runtime {
tokio::runtime::Builder::new()
.current_thread()
.build()
.unwrap()
}

View File

@ -1,7 +1,8 @@
#![warn(rust_2018_idioms)]
#![cfg(broken)]
use tokio::executor::current_thread::CurrentThread;
use tokio::executor::park::{Park, Unpark, UnparkThread};
use tokio::runtime;
use tokio::timer::{Delay, Timer};
use rand::Rng;
@ -44,7 +45,7 @@ fn hammer_complete() {
let done = done.clone();
thread::spawn(move || {
let mut exec = CurrentThread::new();
let mut exec = rt();
let mut rng = rand::thread_rng();
barrier.wait();
@ -101,7 +102,7 @@ fn hammer_cancel() {
let done = done.clone();
thread::spawn(move || {
let mut exec = CurrentThread::new();
let mut exec = rt();
let mut rng = rand::thread_rng();
barrier.wait();
@ -162,7 +163,7 @@ fn hammer_reset() {
let done = done.clone();
thread::spawn(move || {
let mut exec = CurrentThread::new();
let mut exec = rt();
let mut rng = rand::thread_rng();
barrier.wait();
@ -239,3 +240,7 @@ fn hammer_reset() {
}
}
}
fn rt() -> runtime::Runtime {
runtime::Builder::new().current_thread().build().unwrap()
}

View File

@ -27,12 +27,12 @@ fn timer_with_threaded_runtime() {
#[test]
fn timer_with_current_thread_runtime() {
use tokio::runtime::current_thread::Runtime;
use tokio::runtime::Builder;
let mut rt = Runtime::new().unwrap();
let mut rt = Builder::new().current_thread().build().unwrap();
let (tx, rx) = mpsc::channel();
rt.spawn(async move {
rt.block_on(async move {
let when = Instant::now() + Duration::from_millis(100);
tokio::timer::delay(when).await;
@ -41,7 +41,6 @@ fn timer_with_current_thread_runtime() {
tx.send(()).unwrap();
});
rt.run().unwrap();
rx.recv().unwrap();
}