Start adding a global event loop

This commit starts to add support for a global event loop by adding a
`Handle::default` method and implementing it. Currently the support is quite
rudimentary and doesn't support features such as shutdown, overriding the return
value of `Handle::default`, etc. Those will come as future commits.
This commit is contained in:
Alex Crichton 2017-12-05 09:47:29 -08:00 committed by Carl Lerche
parent 23a0e990d2
commit 32f2750c2d
5 changed files with 193 additions and 9 deletions

43
src/reactor/global.rs Normal file
View File

@ -0,0 +1,43 @@
use std::io;
use std::thread;
use reactor::{Reactor, Handle};
pub struct HelperThread {
thread: Option<thread::JoinHandle<()>>,
reactor: Handle,
}
impl HelperThread {
pub fn new() -> io::Result<HelperThread> {
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle().clone();
let thread = thread::Builder::new().spawn(move || run(reactor))?;
Ok(HelperThread {
thread: Some(thread),
reactor: reactor_handle,
})
}
pub fn handle(&self) -> &Handle {
&self.reactor
}
pub fn forget(mut self) {
drop(self.thread.take());
}
}
impl Drop for HelperThread {
fn drop(&mut self) {
// TODO: kill the reactor thread and wait for it to exit, needs
// `Handle::wakeup` to be implemented in a future PR
}
}
fn run(mut reactor: Reactor) {
loop {
reactor.turn(None);
}
}

View File

@ -29,7 +29,7 @@ impl IoToken {
/// associated with has gone away, or if there is an error communicating
/// with the event loop.
pub fn new(source: &Evented, handle: &Handle) -> io::Result<IoToken> {
match handle.inner.upgrade() {
match handle.inner() {
Some(inner) => {
let token = try!(inner.add_source(source));
let handle = handle.clone();
@ -61,7 +61,7 @@ impl IoToken {
/// > rather the `ReadinessStream` type should be used instead.
// TODO: this should really return a proper newtype/enum, not a usize
pub fn take_readiness(&self) -> usize {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return 0,
};
@ -93,7 +93,7 @@ impl IoToken {
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self) -> io::Result<()> {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
@ -126,7 +126,7 @@ impl IoToken {
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self) -> io::Result<()> {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
@ -158,7 +158,7 @@ impl IoToken {
/// with has gone away, or if there is an error communicating with the event
/// loop.
pub fn drop_source(&self) {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};

View File

@ -22,8 +22,10 @@
use std::fmt;
use std::io::{self, ErrorKind};
use std::mem;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Weak, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration};
use futures::{Future, Async};
@ -34,6 +36,7 @@ use mio::event::Evented;
use slab::Slab;
mod io_token;
mod global;
mod poll_evented;
pub use self::poll_evented::PollEvented;
@ -214,7 +217,7 @@ impl Reactor {
let io_dispatch = self.inner.io_dispatch.read().unwrap();
if let Some(io) = io_dispatch.get(token) {
io.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed);
io.readiness.fetch_or(ready2usize(ready), Relaxed);
if ready.is_writable() {
io.writer.notify();
}
@ -291,12 +294,113 @@ impl Inner {
task.register();
if sched.readiness.load(Ordering::SeqCst) & ready2usize(ready) != 0 {
if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 {
task.notify();
}
}
}
static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT;
/// Error returned from `Handle::set_fallback`.
#[derive(Clone, Debug)]
pub struct SetDefaultError(());
impl Handle {
/// Configures the fallback handle to be returned from `Handle::default`.
///
/// The `Handle::default()` function will by default lazily spin up a global
/// thread and run a reactor on this global thread. This behavior is not
/// always desirable in all applications, however, and sometimes a different
/// fallback reactor is desired.
///
/// This function will attempt to globally alter the return value of
/// `Handle::default()` to return the `handle` specified rather than a
/// lazily initialized global thread. If successful then all future calls to
/// `Handle::default()` which would otherwise fall back to the global thread
/// will instead return a clone of the handle specified.
///
/// # Errors
///
/// This function may not always succeed in configuring the fallback handle.
/// If this function was previously called (or perhaps concurrently called
/// on many threads) only the *first* invocation of this function will
/// succeed. All other invocations will return an error.
///
/// Additionally if the global reactor thread has already been initialized
/// then this function will also return an error. (aka if `Handle::default`
/// has been called previously in this program).
pub fn set_fallback(handle: Handle) -> Result<(), SetDefaultError> {
unsafe {
let val = handle.into_usize();
match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
Ok(_) => Ok(()),
Err(_) => {
drop(Handle::from_usize(val));
Err(SetDefaultError(()))
}
}
}
}
fn into_usize(self) -> usize {
unsafe {
mem::transmute::<Weak<Inner>, usize>(self.inner)
}
}
unsafe fn from_usize(val: usize) -> Handle {
let inner = mem::transmute::<usize, Weak<Inner>>(val);;
Handle { inner }
}
fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
}
impl Default for Handle {
fn default() -> Handle {
let mut fallback = HANDLE_FALLBACK.load(SeqCst);
// If the fallback hasn't been previously initialized then let's spin
// up a helper thread and try to initialize with that. If we can't
// actually create a helper thread then we'll just return a "defunkt"
// handle which will return errors when I/O objects are attempted to be
// associated.
if fallback == 0 {
let helper = match global::HelperThread::new() {
Ok(helper) => helper,
Err(_) => return Handle { inner: Weak::new() },
};
// If we successfully set ourselves as the actual fallback then we
// want to `forget` the helper thread to ensure that it persists
// globally. If we fail to set ourselves as the fallback that means
// that someone was racing with this call to `Handle::default`.
// They ended up winning so we'll destroy our helper thread (which
// shuts down the thread) and reload the fallback.
if Handle::set_fallback(helper.handle().clone()).is_ok() {
let ret = helper.handle().clone();
helper.forget();
return ret
}
fallback = HANDLE_FALLBACK.load(SeqCst);
}
// At this point our fallback handle global was configured so we use
// its value to reify a handle, clone it, and then forget our reified
// handle as we don't actually have an owning reference to it.
assert!(fallback != 0);
unsafe {
let handle = Handle::from_usize(fallback);
let ret = handle.clone();
drop(handle.into_usize());
return ret
}
}
}
impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Handle")

View File

@ -281,7 +281,7 @@ impl<E> PollEvented<E> {
pub fn deregister(&self) -> io::Result<()>
where E: Evented,
{
let inner = match self.handle().inner.upgrade() {
let inner = match self.handle().inner() {
Some(inner) => inner,
None => return Ok(()),
};

37
tests/global.rs Normal file
View File

@ -0,0 +1,37 @@
extern crate futures;
extern crate tokio;
use std::thread;
use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
use tokio::reactor::Handle;
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}
#[test]
fn hammer() {
let threads = (0..10).map(|_| {
thread::spawn(|| {
let handle = Handle::default();
let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle));
let addr = t!(srv.local_addr());
let mine = TcpStream::connect(&addr, &handle);
let theirs = srv.incoming().into_future()
.map(|(s, _)| s.unwrap().0)
.map_err(|(s, _)| s);
let (mine, theirs) = t!(mine.join(theirs).wait());
assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
})
}).collect::<Vec<_>>();
for thread in threads {
thread.join().unwrap();
}
}