From b9dae23e3f943d9125a5add8513b551867b1131c Mon Sep 17 00:00:00 2001
From: Alex Crichton
Date: Sat, 20 Aug 2016 22:59:14 -0700
Subject: [PATCH 1/3] Don't store an Arc in ReadinessStream

This commit contains a few refactorings, but the major goal is to remove
the `Arc` that's stored inside each `ReadinessStream` and `Scheduled`
slot in the event loop.

The original purpose of this `Arc` was to share the I/O object between
the concrete handle itself and the event loop. The event loop would then
change how the socket was registered over time and deregister it when it
got a "shutdown request". Nowadays, however, once an I/O object is
registered with the event loop it's never updated. Additionally, we don't
actually need to call `deregister`; we can instead just close the I/O
object itself and let the kernel/event loop take care of the cleanup. All
we need to do when deregistering is free up the slab entry.

The major result of this commit is that I/O objects no longer need to be
`Sync` (as they're not stored in an `Arc`). Instead they just need to be
`Send + 'static`, as one might otherwise expect.

Along the way this also refactors a few pieces here and there to make
more sense in this new scheme. The `ReadinessStream` type now has a type
parameter for the I/O object it owns, which can be accessed via the
`get_ref` and `get_mut` methods. Additionally, I/O tokens on the event
loop are now a full-fledged `IoToken` type, which we can change in the
future if we need to.
---
 src/event_loop.rs       | 202 ++++++++++++++++++++--------------------
 src/lib.rs              |   2 +-
 src/readiness_stream.rs |  80 ++++++++++------
 src/tcp.rs              | 109 +++++++++-------------
 src/udp.rs              |  61 ++++++------
 5 files changed, 226 insertions(+), 228 deletions(-)

diff --git a/src/event_loop.rs b/src/event_loop.rs
index d04a3e4d2..3bd8ca2a0 100644
--- a/src/event_loop.rs
+++ b/src/event_loop.rs
@@ -80,7 +80,7 @@ pub struct LoopPin {
 }
 
 struct Scheduled {
-    source: IoSource,
+    readiness: Arc<AtomicUsize>,
     reader: Option<TaskHandle>,
     writer: Option<TaskHandle>,
 }
@@ -97,7 +97,6 @@ enum Direction {
 }
 
 enum Message {
-    AddSource(IoSource, Arc>>),
     DropSource(usize),
     Schedule(usize, TaskHandle, Direction),
     AddTimeout(Instant, Arc>>),
@@ -107,29 +106,6 @@
     Drop(DropBox),
 }
 
-/// Type of I/O objects inserted into the event loop, created by `Source::new`.
-pub struct Source<E> {
-    readiness: AtomicUsize,
-    io: E,
-}
-
-/// I/O objects inserted into the event loop
-pub type IoSource = Arc>;
-
-fn register(poll: &mio::Poll,
-            token: usize,
-            sched: &Scheduled) -> io::Result<()> {
-    poll.register(&sched.source.io,
-                  mio::Token(token),
-                  mio::EventSet::readable() | mio::EventSet::writable(),
-                  mio::PollOpt::edge())
-}
-
-fn deregister(poll: &mio::Poll, sched: &Scheduled) {
-    // TODO: handle error
-    poll.deregister(&sched.source.io).unwrap();
-}
-
 impl Loop {
     /// Creates a new event loop, returning any error that happened during the
     /// creation.
@@ -334,11 +310,11 @@ impl Loop {
                 if let Some(sched) = self.dispatch.borrow_mut().get_mut(token) {
                     if event.kind().is_readable() {
                         reader = sched.reader.take();
-                        sched.source.readiness.fetch_or(1, Ordering::Relaxed);
+                        sched.readiness.fetch_or(1, Ordering::Relaxed);
                     }
                     if event.kind().is_writable() {
                         writer = sched.writer.take();
-                        sched.source.readiness.fetch_or(2, Ordering::Relaxed);
+                        sched.readiness.fetch_or(2, Ordering::Relaxed);
                     }
                 } else {
                     debug!("notified on {} which no longer exists", token);
@@ -382,10 +358,10 @@ impl Loop {
         CURRENT_LOOP.set(&self, || handle.unpark());
     }
 
-    fn add_source(&self, source: IoSource) -> io::Result<usize> {
+    fn add_source(&self, source: &mio::Evented) -> io::Result<IoToken> {
         debug!("adding a new I/O source");
         let sched = Scheduled {
-            source: source,
+            readiness: Arc::new(AtomicUsize::new(0)),
            reader: None,
            writer: None,
        };
@@ -395,14 +371,20 @@
            dispatch.grow(amt);
        }
        let entry = dispatch.vacant_entry().unwrap();
-        try!(register(&self.io, entry.index(), &sched));
-        Ok(entry.insert(sched).index())
+        try!(self.io.register(source,
+                              mio::Token(entry.index()),
+                              mio::EventSet::readable() |
+                              mio::EventSet::writable(),
+                              mio::PollOpt::edge()));
+        Ok(IoToken {
+            readiness: sched.readiness.clone(),
+            token: entry.insert(sched).index(),
+        })
    }
 
    fn drop_source(&self, token: usize) {
        debug!("dropping I/O source: {}", token);
-        let sched = self.dispatch.borrow_mut().remove(token).unwrap();
-        deregister(&self.io, &sched);
+        self.dispatch.borrow_mut().remove(token).unwrap();
    }
 
    fn schedule(&self, token: usize, wake: TaskHandle, dir: Direction) {
@@ -414,10 +396,10 @@ impl Loop {
                Direction::Read => (&mut sched.reader, 1),
                Direction::Write => (&mut sched.writer, 2),
            };
-            let ready = sched.source.readiness.load(Ordering::SeqCst);
+            let ready = sched.readiness.load(Ordering::SeqCst);
            if ready & bit != 0 {
                *slot = None;
-                sched.source.readiness.store(ready & !bit, Ordering::SeqCst);
+                sched.readiness.store(ready & !bit, Ordering::SeqCst);
                Some(wake)
            } else {
                *slot = Some(wake);
@@ -472,11 +454,6 @@ impl Loop {
 
    fn notify(&self, msg: Message) {
        match msg {
-            Message::AddSource(source, slot) => {
-                // This unwrap() should always be ok as we're the only producer
-                slot.try_produce(self.add_source(source))
-                    .ok().expect("interference with try_produce");
-            }
            Message::DropSource(tok) => self.drop_source(tok),
            Message::Schedule(tok, wake, dir) => self.schedule(tok, wake, dir),
 
@@ -545,19 +522,20 @@ impl LoopHandle {
    ///
    /// When a new I/O object is created it needs to be communicated to the
    /// event loop to ensure that it's registered and ready to receive
-    /// notifications. The event loop with then respond with a unique token that
-    /// this handle can be identified with (the resolved value of the returned
-    /// future).
+    /// notifications. The event loop will then respond back with the I/O object
+    /// and a token which can be used to send more messages to the event loop.
    ///
-    /// This token is then passed in turn to each of the methods below to
-    /// interact with notifications on the I/O object itself.
+    /// The token returned is then passed in turn to each of the methods below
+    /// to interact with notifications on the I/O object itself.
    ///
    /// # Panics
    ///
    /// The returned future will panic if the event loop this handle is
    /// associated with has gone away, or if there is an error communicating
    /// with the event loop.
- pub fn add_source(&self, source: IoSource) -> AddSource { + pub fn add_source(&self, source: E) -> AddSource + where E: mio::Evented + Send + 'static, + { AddSource { inner: LoopFuture { loop_handle: self.clone(), @@ -567,15 +545,19 @@ impl LoopHandle { } } - /// Begin listening for read events on an event loop. + /// Schedule the current future task to receive a notification when the + /// corresponding I/O object is readable. /// /// Once an I/O object has been registered with the event loop through the /// `add_source` method, this method can be used with the assigned token to - /// begin awaiting read notifications. + /// notify the current future task when the next read notification comes in. /// - /// Currently the current task will be notified with *edge* semantics. This - /// means that whenever the underlying I/O object changes state, e.g. it was - /// not readable and now it is, then a notification will be sent. + /// The current task will only receive a notification **once** and to + /// receive further notifications it will need to call `schedule_read` + /// again. + /// + /// > **Note**: This method should generally not be used directly, but + /// > rather the `ReadinessStream` type should be used instead. /// /// # Panics /// @@ -585,19 +567,24 @@ impl LoopHandle { /// /// This function will also panic if there is not a currently running future /// task. - pub fn schedule_read(&self, tok: usize) { - self.send(Message::Schedule(tok, task::park(), Direction::Read)); + pub fn schedule_read(&self, tok: &IoToken) { + self.send(Message::Schedule(tok.token, task::park(), Direction::Read)); } - /// Begin listening for write events on an event loop. + /// Schedule the current future task to receive a notification when the + /// corresponding I/O object is writable. /// /// Once an I/O object has been registered with the event loop through the /// `add_source` method, this method can be used with the assigned token to - /// begin awaiting write notifications. + /// notify the current future task when the next write notification comes + /// in. /// - /// Currently the current task will be notified with *edge* semantics. This - /// means that whenever the underlying I/O object changes state, e.g. it was - /// not writable and now it is, then a notification will be sent. + /// The current task will only receive a notification **once** and to + /// receive further notifications it will need to call `schedule_write` + /// again. + /// + /// > **Note**: This method should generally not be used directly, but + /// > rather the `ReadinessStream` type should be used instead. /// /// # Panics /// @@ -607,8 +594,8 @@ impl LoopHandle { /// /// This function will also panic if there is not a currently running future /// task. - pub fn schedule_write(&self, tok: usize) { - self.send(Message::Schedule(tok, task::park(), Direction::Write)); + pub fn schedule_write(&self, tok: &IoToken) { + self.send(Message::Schedule(tok.token, task::park(), Direction::Write)); } /// Unregister all information associated with a token on an event loop, @@ -625,13 +612,16 @@ impl LoopHandle { /// ensure that the callbacks are **not** invoked, so pending scheduled /// callbacks cannot be relied upon to get called. /// + /// > **Note**: This method should generally not be used directly, but + /// > rather the `ReadinessStream` type should be used instead. 
+ /// /// # Panics /// /// This function will panic if the event loop this handle is associated /// with has gone away, or if there is an error communicating with the event /// loop. - pub fn drop_source(&self, tok: usize) { - self.send(Message::DropSource(tok)); + pub fn drop_source(&self, tok: &IoToken) { + self.send(Message::DropSource(tok.token)); } /// Adds a new timeout to get fired at the specified instant, notifying the @@ -731,16 +721,60 @@ impl LoopPin { /// /// Created through the `LoopHandle::add_source` method, this future can also /// resolve to an error if there's an issue communicating with the event loop. -pub struct AddSource { - inner: LoopFuture, +pub struct AddSource { + inner: LoopFuture<(E, IoToken), E>, } -impl Future for AddSource { - type Item = usize; +/// A token that identifies an active timeout. +pub struct IoToken { + token: usize, + // TODO: can we avoid this allocation? It's kind of a bummer... + readiness: Arc, +} + +impl IoToken { + /// Consumes the last readiness notification the token this source is for + /// registered. + /// + /// Currently sources receive readiness notifications on an edge-basis. That + /// is, once you receive a notification that an object can be read, you + /// won't receive any more notifications until all of that data has been + /// read. + /// + /// The event loop will fill in this information and then inform futures + /// that they're ready to go with the `schedule` method, and then the `poll` + /// method can use this to figure out what happened. + /// + /// > **Note**: This method should generally not be used directly, but + /// > rather the `ReadinessStream` type should be used instead. + // TODO: this should really return a proper newtype/enum, not a usize + pub fn take_readiness(&self) -> usize { + self.readiness.swap(0, Ordering::SeqCst) + } +} + +impl Future for AddSource + where E: mio::Evented + Send + 'static, +{ + type Item = (E, IoToken); type Error = io::Error; - fn poll(&mut self) -> Poll { - self.inner.poll(Loop::add_source, Message::AddSource) + fn poll(&mut self) -> Poll<(E, IoToken), io::Error> { + let handle = self.inner.loop_handle.clone(); + self.inner.poll(|lp, io| { + let token = try!(lp.add_source(&io)); + Ok((io, token)) + }, |io, slot| { + Message::Run(Box::new(move || { + let res = handle.with_loop(|lp| { + let lp = lp.unwrap(); + let token = try!(lp.add_source(&io)); + Ok((io, token)) + }); + slot.try_produce(res).ok() + .expect("add source try_produce intereference"); + })) + }) } } @@ -1114,38 +1148,6 @@ impl TimeoutState { } } -impl Source { - /// Creates a new `Source` wrapping the provided source of events. - pub fn new(e: E) -> Source { - Source { - readiness: AtomicUsize::new(0), - io: e, - } - } -} - -impl Source { - /// Consumes the last readiness notification that this source received. - /// - /// Currently sources receive readiness notifications on an edge-basis. That - /// is, once you receive a notification that an object can be read, you - /// won't receive any more notifications until all of that data has been - /// read. - /// - /// The event loop will fill in this information and then inform futures - /// that they're ready to go with the `schedule` method, and then the `poll` - /// method can use this to figure out what happened. - // TODO: shouldn't return a usize here, but rather some kind of newtype - pub fn take_readiness(&self) -> usize { - self.readiness.swap(0, Ordering::SeqCst) - } - - /// Gets access to the underlying I/O object. 
- pub fn io(&self) -> &E { - &self.io - } -} - impl Executor for MioSender { fn execute_boxed(&self, callback: Box) { self.inner.send(Message::Run(callback)) diff --git a/src/lib.rs b/src/lib.rs index a54897558..bc665cc7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,7 +30,7 @@ mod mpsc_queue; mod channel; pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; -pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoSource, Source}; +pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken}; pub use readiness_stream::ReadinessStream; pub use tcp::{TcpListener, TcpStream}; pub use timeout::Timeout; diff --git a/src/readiness_stream.rs b/src/readiness_stream.rs index 3b48f3131..c67d5cec7 100644 --- a/src/readiness_stream.rs +++ b/src/readiness_stream.rs @@ -2,8 +2,9 @@ use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use futures::{Future, Poll}; +use mio; -use event_loop::{IoSource, LoopHandle, AddSource}; +use event_loop::{IoToken, LoopHandle, AddSource}; /// A concrete implementation of a stream of readiness notifications for I/O /// objects that originates from an event loop. @@ -21,31 +22,30 @@ use event_loop::{IoSource, LoopHandle, AddSource}; /// It's the responsibility of the wrapper to inform the readiness stream when a /// "would block" I/O event is seen. The readiness stream will then take care of /// any scheduling necessary to get notified when the event is ready again. -pub struct ReadinessStream { - io_token: usize, - loop_handle: LoopHandle, - source: IoSource, +pub struct ReadinessStream { + token: IoToken, + handle: LoopHandle, readiness: AtomicUsize, + io: E, } -pub struct ReadinessStreamNew { - inner: AddSource, - handle: Option, - source: Option, +pub struct ReadinessStreamNew { + inner: AddSource, + handle: LoopHandle, } -impl ReadinessStream { +impl ReadinessStream + where E: mio::Evented + Send + 'static, +{ /// Creates a new readiness stream associated with the provided /// `loop_handle` and for the given `source`. /// /// This method returns a future which will resolve to the readiness stream /// when it's ready. - pub fn new(loop_handle: LoopHandle, source: IoSource) - -> ReadinessStreamNew { + pub fn new(loop_handle: LoopHandle, source: E) -> ReadinessStreamNew { ReadinessStreamNew { - inner: loop_handle.add_source(source.clone()), - source: Some(source), - handle: Some(loop_handle), + inner: loop_handle.add_source(source), + handle: loop_handle, } } @@ -60,11 +60,11 @@ impl ReadinessStream { if self.readiness.load(Ordering::SeqCst) & 1 != 0 { return Poll::Ok(()) } - self.readiness.fetch_or(self.source.take_readiness(), Ordering::SeqCst); + self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); if self.readiness.load(Ordering::SeqCst) & 1 != 0 { Poll::Ok(()) } else { - self.loop_handle.schedule_read(self.io_token); + self.handle.schedule_read(&self.token); Poll::NotReady } } @@ -80,11 +80,11 @@ impl ReadinessStream { if self.readiness.load(Ordering::SeqCst) & 2 != 0 { return Poll::Ok(()) } - self.readiness.fetch_or(self.source.take_readiness(), Ordering::SeqCst); + self.readiness.fetch_or(self.token.take_readiness(), Ordering::SeqCst); if self.readiness.load(Ordering::SeqCst) & 2 != 0 { Poll::Ok(()) } else { - self.loop_handle.schedule_write(self.io_token); + self.handle.schedule_write(&self.token); Poll::NotReady } } @@ -102,7 +102,7 @@ impl ReadinessStream { /// then again readable. 
pub fn need_read(&self) { self.readiness.fetch_and(!1, Ordering::SeqCst); - self.loop_handle.schedule_read(self.io_token); + self.handle.schedule_read(&self.token); } /// Indicates to this source of events that the corresponding I/O object is @@ -118,28 +118,48 @@ impl ReadinessStream { /// then again writable. pub fn need_write(&self) { self.readiness.fetch_and(!2, Ordering::SeqCst); - self.loop_handle.schedule_write(self.io_token); + self.handle.schedule_write(&self.token); + } + + /// Returns a reference to the event loop handle that this readiness stream + /// is associated with. + pub fn loop_handle(&self) -> &LoopHandle { + &self.handle + } + + /// Returns a shared reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_ref(&self) -> &E { + &self.io + } + + /// Returns a mutable reference to the underlying I/O object this readiness + /// stream is wrapping. + pub fn get_mut(&mut self) -> &mut E { + &mut self.io } } -impl Future for ReadinessStreamNew { - type Item = ReadinessStream; +impl Future for ReadinessStreamNew + where E: mio::Evented + Send + 'static, +{ + type Item = ReadinessStream; type Error = io::Error; - fn poll(&mut self) -> Poll { - self.inner.poll().map(|token| { + fn poll(&mut self) -> Poll, io::Error> { + self.inner.poll().map(|(io, token)| { ReadinessStream { - io_token: token, - source: self.source.take().unwrap(), - loop_handle: self.handle.take().unwrap(), + token: token, + handle: self.handle.clone(), + io: io, readiness: AtomicUsize::new(0), } }) } } -impl Drop for ReadinessStream { +impl Drop for ReadinessStream { fn drop(&mut self) { - self.loop_handle.drop_source(self.io_token) + self.handle.drop_source(&self.token) } } diff --git a/src/tcp.rs b/src/tcp.rs index 5993ef7f0..c0aca32b8 100644 --- a/src/tcp.rs +++ b/src/tcp.rs @@ -2,7 +2,6 @@ use std::fmt; use std::io::{self, ErrorKind, Read, Write}; use std::mem; use std::net::{self, SocketAddr, Shutdown}; -use std::sync::Arc; use futures::stream::Stream; use futures::{Future, IntoFuture, failed, Poll}; @@ -10,27 +9,21 @@ use futures_io::{IoFuture, IoStream}; use mio; use {ReadinessStream, LoopHandle}; -use event_loop::Source; /// An I/O object representing a TCP socket listening for incoming connections. /// /// This object can be converted into a stream of incoming connections for /// various forms of processing. pub struct TcpListener { - loop_handle: LoopHandle, - ready: ReadinessStream, - listener: Arc>, + io: ReadinessStream, } impl TcpListener { fn new(listener: mio::tcp::TcpListener, handle: LoopHandle) -> IoFuture { - let listener = Arc::new(Source::new(listener)); - ReadinessStream::new(handle.clone(), listener.clone()).map(|r| { + ReadinessStream::new(handle, listener).map(|io| { TcpListener { - loop_handle: handle, - ready: r, - listener: listener, + io: io, } }).boxed() } @@ -73,7 +66,7 @@ impl TcpListener { /// Test whether this socket is ready to be read or not. pub fn poll_read(&self) -> Poll<(), io::Error> { - self.ready.poll_read() + self.io.poll_read() } /// Returns the local address that this listener is bound to. @@ -81,7 +74,7 @@ impl TcpListener { /// This can be useful, for example, when binding to port 0 to figure out /// which port was actually bound. 
pub fn local_addr(&self) -> io::Result { - self.listener.io().local_addr() + self.io.get_ref().local_addr() } /// Consumes this listener, returning a stream of the sockets this listener @@ -99,14 +92,14 @@ impl TcpListener { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - match self.inner.listener.io().accept() { + match self.inner.io.get_ref().accept() { Ok(Some(pair)) => { debug!("accepted a socket"); Poll::Ok(Some(pair)) } Ok(None) => { debug!("waiting to accept another socket"); - self.inner.ready.need_read(); + self.inner.io.need_read(); Poll::NotReady } Err(e) => Poll::Err(e), @@ -114,17 +107,11 @@ impl TcpListener { } } - let loop_handle = self.loop_handle.clone(); + let loop_handle = self.io.loop_handle().clone(); Incoming { inner: self } .and_then(move |(tcp, addr)| { - let tcp = Arc::new(Source::new(tcp)); - ReadinessStream::new(loop_handle.clone(), - tcp.clone()).map(move |ready| { - let stream = TcpStream { - source: tcp, - ready: ready, - }; - (stream, addr) + ReadinessStream::new(loop_handle.clone(), tcp).map(move |io| { + (TcpStream { io: io }, addr) }) }).boxed() } @@ -132,7 +119,7 @@ impl TcpListener { impl fmt::Debug for TcpListener { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.listener.io().fmt(f) + self.io.get_ref().fmt(f) } } @@ -143,8 +130,7 @@ impl fmt::Debug for TcpListener { /// raw underlying I/O object as well as streams for the read/write /// notifications on the stream itself. pub struct TcpStream { - source: Arc>, - ready: ReadinessStream, + io: ReadinessStream, } enum TcpStreamNew { @@ -184,18 +170,8 @@ impl TcpStream { fn new(connected_stream: mio::tcp::TcpStream, handle: LoopHandle) -> IoFuture { - // Once we've connected, wait for the stream to be writable as that's - // when the actual connection has been initiated. Once we're writable we - // check for `take_socket_error` to see if the connect actually hit an - // error or not. - // - // If all that succeeded then we ship everything on up. - let connected_stream = Arc::new(Source::new(connected_stream)); - ReadinessStream::new(handle, connected_stream.clone()).and_then(|ready| { - TcpStreamNew::Waiting(TcpStream { - source: connected_stream, - ready: ready, - }) + ReadinessStream::new(handle, connected_stream).and_then(|io| { + TcpStreamNew::Waiting(TcpStream { io: io }) }).boxed() } @@ -233,7 +209,7 @@ impl TcpStream { /// is only suitable for calling in a `Future::poll` method and will /// automatically handle ensuring a retry once the socket is readable again. pub fn poll_read(&self) -> Poll<(), io::Error> { - self.ready.poll_read() + self.io.poll_read() } /// Test whether this socket is writey to be written to or not. @@ -243,17 +219,17 @@ impl TcpStream { /// is only suitable for calling in a `Future::poll` method and will /// automatically handle ensuring a retry once the socket is writable again. pub fn poll_write(&self) -> Poll<(), io::Error> { - self.ready.poll_write() + self.io.poll_write() } /// Returns the local address that this stream is bound to. pub fn local_addr(&self) -> io::Result { - self.source.io().local_addr() + self.io.get_ref().local_addr() } /// Returns the remote address that this stream is connected to. pub fn peer_addr(&self) -> io::Result { - self.source.io().peer_addr() + self.io.get_ref().peer_addr() } /// Shuts down the read, write, or both halves of this connection. @@ -262,7 +238,7 @@ impl TcpStream { /// portions to return immediately with an appropriate value (see the /// documentation of `Shutdown`). 
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.source.io().shutdown(how) + self.io.get_ref().shutdown(how) } /// Sets the value of the `TCP_NODELAY` option on this socket. @@ -273,12 +249,12 @@ impl TcpStream { /// sufficient amount to send out, thereby avoiding the frequent sending of /// small packets. pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { - self.source.io().set_nodelay(nodelay) + self.io.get_ref().set_nodelay(nodelay) } /// Sets the keepalive time in seconds for this socket. pub fn set_keepalive_s(&self, seconds: Option) -> io::Result<()> { - self.source.io().set_keepalive(seconds) + self.io.get_ref().set_keepalive(seconds) } } @@ -291,9 +267,16 @@ impl Future for TcpStreamNew { TcpStreamNew::Waiting(s) => s, TcpStreamNew::Empty => panic!("can't poll TCP stream twice"), }; - match stream.ready.poll_write() { + + // Once we've connected, wait for the stream to be writable as that's + // when the actual connection has been initiated. Once we're writable we + // check for `take_socket_error` to see if the connect actually hit an + // error or not. + // + // If all that succeeded then we ship everything on up. + match stream.io.poll_write() { Poll::Ok(()) => { - match stream.source.io().take_socket_error() { + match stream.io.get_ref().take_socket_error() { Ok(()) => return Poll::Ok(stream), Err(ref e) if e.kind() == ErrorKind::WouldBlock => {} Err(e) => return Poll::Err(e), @@ -309,9 +292,9 @@ impl Future for TcpStreamNew { impl Read for TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let r = self.source.io().read(buf); + let r = self.io.get_ref().read(buf); if is_wouldblock(&r) { - self.ready.need_read(); + self.io.need_read(); } return r } @@ -319,16 +302,16 @@ impl Read for TcpStream { impl Write for TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { - let r = self.source.io().write(buf); + let r = self.io.get_ref().write(buf); if is_wouldblock(&r) { - self.ready.need_write(); + self.io.need_write(); } return r } fn flush(&mut self) -> io::Result<()> { - let r = self.source.io().flush(); + let r = self.io.get_ref().flush(); if is_wouldblock(&r) { - self.ready.need_write(); + self.io.need_write(); } return r } @@ -336,9 +319,9 @@ impl Write for TcpStream { impl<'a> Read for &'a TcpStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let r = self.source.io().read(buf); + let r = self.io.get_ref().read(buf); if is_wouldblock(&r) { - self.ready.need_read(); + self.io.need_read(); } return r } @@ -346,17 +329,17 @@ impl<'a> Read for &'a TcpStream { impl<'a> Write for &'a TcpStream { fn write(&mut self, buf: &[u8]) -> io::Result { - let r = self.source.io().write(buf); + let r = self.io.get_ref().write(buf); if is_wouldblock(&r) { - self.ready.need_write(); + self.io.need_write(); } return r } fn flush(&mut self) -> io::Result<()> { - let r = self.source.io().flush(); + let r = self.io.get_ref().flush(); if is_wouldblock(&r) { - self.ready.need_write(); + self.io.need_write(); } return r } @@ -371,7 +354,7 @@ fn is_wouldblock(r: &io::Result) -> bool { impl fmt::Debug for TcpStream { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.source.io().fmt(f) + self.io.get_ref().fmt(f) } } @@ -382,13 +365,13 @@ mod sys { impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { - self.source.io().as_raw_fd() + self.io.get_ref().as_raw_fd() } } impl AsRawFd for TcpListener { fn as_raw_fd(&self) -> RawFd { - self.listener.io().as_raw_fd() + self.io.get_ref().as_raw_fd() } } } @@ -402,7 +385,7 @@ mod sys { // // 
impl AsRawHandle for TcpStream { // fn as_raw_handle(&self) -> RawHandle { - // self.source.io().as_raw_handle() + // self.io.get_ref().as_raw_handle() // } // } // diff --git a/src/udp.rs b/src/udp.rs index 30c15317d..5a0de2b58 100644 --- a/src/udp.rs +++ b/src/udp.rs @@ -1,6 +1,5 @@ use std::io; use std::net::{self, SocketAddr, Ipv4Addr, Ipv6Addr}; -use std::sync::Arc; use std::fmt; use futures::{Future, failed, Poll}; @@ -8,12 +7,10 @@ use futures_io::IoFuture; use mio; use {ReadinessStream, LoopHandle}; -use event_loop::Source; /// An I/O object representing a UDP socket. pub struct UdpSocket { - source: Arc>, - ready: ReadinessStream, + io: ReadinessStream, } impl LoopHandle { @@ -34,12 +31,8 @@ impl LoopHandle { impl UdpSocket { fn new(socket: mio::udp::UdpSocket, handle: LoopHandle) -> IoFuture { - let socket = Arc::new(Source::new(socket)); - ReadinessStream::new(handle, socket.clone()).map(|ready| { - UdpSocket { - source: socket, - ready: ready, - } + ReadinessStream::new(handle, socket).map(|io| { + UdpSocket { io: io } }).boxed() } @@ -62,7 +55,7 @@ impl UdpSocket { /// Returns the local address that this stream is bound to. pub fn local_addr(&self) -> io::Result { - self.source.io().local_addr() + self.io.get_ref().local_addr() } /// Test whether this socket is ready to be read or not. @@ -72,7 +65,7 @@ impl UdpSocket { /// is only suitable for calling in a `Future::poll` method and will /// automatically handle ensuring a retry once the socket is readable again. pub fn poll_read(&self) -> Poll<(), io::Error> { - self.ready.poll_read() + self.io.poll_read() } /// Test whether this socket is writey to be written to or not. @@ -82,7 +75,7 @@ impl UdpSocket { /// is only suitable for calling in a `Future::poll` method and will /// automatically handle ensuring a retry once the socket is writable again. pub fn poll_write(&self) -> Poll<(), io::Error> { - self.ready.poll_write() + self.io.poll_write() } /// Sends data on the socket to the given address. On success, returns the @@ -91,10 +84,10 @@ impl UdpSocket { /// Address type can be any implementor of `ToSocketAddrs` trait. See its /// documentation for concrete examples. pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result { - match self.source.io().send_to(buf, target) { + match self.io.get_ref().send_to(buf, target) { Ok(Some(n)) => Ok(n), Ok(None) => { - self.ready.need_write(); + self.io.need_write(); Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) } Err(e) => Err(e), @@ -104,10 +97,10 @@ impl UdpSocket { /// Receives data from the socket. On success, returns the number of bytes /// read and the address from whence the data came. pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - match self.source.io().recv_from(buf) { + match self.io.get_ref().recv_from(buf) { Ok(Some(n)) => Ok(n), Ok(None) => { - self.ready.need_read(); + self.io.need_read(); Err(io::Error::new(io::ErrorKind::WouldBlock, "would block")) } Err(e) => Err(e), @@ -121,7 +114,7 @@ impl UdpSocket { /// /// [link]: #method.set_broadcast pub fn broadcast(&self) -> io::Result { - self.source.io().broadcast() + self.io.get_ref().broadcast() } /// Sets the value of the `SO_BROADCAST` option for this socket. @@ -129,7 +122,7 @@ impl UdpSocket { /// When enabled, this socket is allowed to send packets to a broadcast /// address. 
pub fn set_broadcast(&self, on: bool) -> io::Result<()> { - self.source.io().set_broadcast(on) + self.io.get_ref().set_broadcast(on) } /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -139,7 +132,7 @@ impl UdpSocket { /// /// [link]: #method.set_multicast_loop_v4 pub fn multicast_loop_v4(&self) -> io::Result { - self.source.io().multicast_loop_v4() + self.io.get_ref().multicast_loop_v4() } /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. @@ -147,7 +140,7 @@ impl UdpSocket { /// If enabled, multicast packets will be looped back to the local socket. /// Note that this may not have any affect on IPv6 sockets. pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { - self.source.io().set_multicast_loop_v4(on) + self.io.get_ref().set_multicast_loop_v4(on) } /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -157,7 +150,7 @@ impl UdpSocket { /// /// [link]: #method.set_multicast_ttl_v4 pub fn multicast_ttl_v4(&self) -> io::Result { - self.source.io().multicast_ttl_v4() + self.io.get_ref().multicast_ttl_v4() } /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. @@ -168,7 +161,7 @@ impl UdpSocket { /// /// Note that this may not have any affect on IPv6 sockets. pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { - self.source.io().set_multicast_ttl_v4(ttl) + self.io.get_ref().set_multicast_ttl_v4(ttl) } /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -178,7 +171,7 @@ impl UdpSocket { /// /// [link]: #method.set_multicast_loop_v6 pub fn multicast_loop_v6(&self) -> io::Result { - self.source.io().multicast_loop_v6() + self.io.get_ref().multicast_loop_v6() } /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. @@ -186,7 +179,7 @@ impl UdpSocket { /// Controls whether this socket sees the multicast packets it sends itself. /// Note that this may not have any affect on IPv4 sockets. pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { - self.source.io().set_multicast_loop_v6(on) + self.io.get_ref().set_multicast_loop_v6(on) } /// Gets the value of the `IP_TTL` option for this socket. @@ -195,7 +188,7 @@ impl UdpSocket { /// /// [link]: #method.set_ttl pub fn ttl(&self) -> io::Result { - self.source.io().ttl() + self.io.get_ref().ttl() } /// Sets the value for the `IP_TTL` option on this socket. @@ -203,7 +196,7 @@ impl UdpSocket { /// This value sets the time-to-live field that is used in every packet sent /// from this socket. pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { - self.source.io().set_ttl(ttl) + self.io.get_ref().set_ttl(ttl) } /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. @@ -216,7 +209,7 @@ impl UdpSocket { pub fn join_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.source.io().join_multicast_v4(multiaddr, interface) + self.io.get_ref().join_multicast_v4(multiaddr, interface) } /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. @@ -227,7 +220,7 @@ impl UdpSocket { pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.source.io().join_multicast_v6(multiaddr, interface) + self.io.get_ref().join_multicast_v6(multiaddr, interface) } /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. 
@@ -239,7 +232,7 @@ impl UdpSocket { pub fn leave_multicast_v4(&self, multiaddr: &Ipv4Addr, interface: &Ipv4Addr) -> io::Result<()> { - self.source.io().leave_multicast_v4(multiaddr, interface) + self.io.get_ref().leave_multicast_v4(multiaddr, interface) } /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. @@ -251,13 +244,13 @@ impl UdpSocket { pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { - self.source.io().leave_multicast_v6(multiaddr, interface) + self.io.get_ref().leave_multicast_v6(multiaddr, interface) } } impl fmt::Debug for UdpSocket { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - self.source.io().fmt(f) + self.io.get_ref().fmt(f) } } @@ -268,7 +261,7 @@ mod sys { impl AsRawFd for UdpSocket { fn as_raw_fd(&self) -> RawFd { - self.source.io().as_raw_fd() + self.io.get_ref().as_raw_fd() } } } @@ -282,7 +275,7 @@ mod sys { // // impl AsRawHandle for UdpSocket { // fn as_raw_handle(&self) -> RawHandle { - // self.source.io().as_raw_handle() + // self.io.get_ref().as_raw_handle() // } // } } From 2bd616df519ef1442a5a7ed1033aa76eeecb71a0 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 20 Aug 2016 23:23:16 -0700 Subject: [PATCH 2/3] Reorganize the `event_loop` module Split it up into a number of targeted modules for each purpose, for example loop data, I/O sources, timeouts, and channels. No actual change is intended to be part of this commit. --- src/event_loop.rs | 1156 ------------------------------- src/{ => event_loop}/channel.rs | 0 src/event_loop/loop_data.rs | 347 ++++++++++ src/event_loop/mod.rs | 596 ++++++++++++++++ src/event_loop/source.rs | 183 +++++ src/event_loop/timeout.rs | 81 +++ src/lib.rs | 1 - 7 files changed, 1207 insertions(+), 1157 deletions(-) delete mode 100644 src/event_loop.rs rename src/{ => event_loop}/channel.rs (100%) create mode 100644 src/event_loop/loop_data.rs create mode 100644 src/event_loop/mod.rs create mode 100644 src/event_loop/source.rs create mode 100644 src/event_loop/timeout.rs diff --git a/src/event_loop.rs b/src/event_loop.rs deleted file mode 100644 index 3bd8ca2a0..000000000 --- a/src/event_loop.rs +++ /dev/null @@ -1,1156 +0,0 @@ -use std::cell::RefCell; -use std::io::{self, ErrorKind}; -use std::marker; -use std::mem; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; -use std::time::{Instant, Duration}; - -use futures::{Future, Poll}; -use futures::task::{self, Task, Notify, TaskHandle}; -use futures::executor::{ExecuteCallback, Executor}; -use mio; -use slab::Slab; - -use channel::{Sender, Receiver, channel}; -use event_loop::dropbox::DropBox; -use slot::{self, Slot}; -use timer_wheel::{TimerWheel, Timeout}; - -static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT; -scoped_thread_local!(static CURRENT_LOOP: Loop); - -const SLAB_CAPACITY: usize = 1024 * 64; - -/// An event loop. -/// -/// The event loop is the main source of blocking in an application which drives -/// all other I/O events and notifications happening. Each event loop can have -/// multiple handles pointing to it, each of which can then be used to create -/// various I/O objects to interact with the event loop in interesting ways. -// TODO: expand this -pub struct Loop { - id: usize, - io: mio::Poll, - events: mio::Events, - tx: Arc, - rx: Receiver, - dispatch: RefCell>, - _future_registration: mio::Registration, - future_readiness: Arc, - - // Timer wheel keeping track of all timeouts. 
The `usize` stored in the - // timer wheel is an index into the slab below. - // - // The slab below keeps track of the timeouts themselves as well as the - // state of the timeout itself. The `TimeoutToken` type is an index into the - // `timeouts` slab. - timer_wheel: RefCell>, - timeouts: RefCell>, - - // A `Loop` cannot be sent to other threads as it's used as a proxy for data - // that belongs to the thread the loop was running on at some point. In - // other words, the safety of `DropBox` below relies on loops not crossing - // threads. - _marker: marker::PhantomData>, -} - -struct MioSender { - inner: Sender, -} - -/// Handle to an event loop, used to construct I/O objects, send messages, and -/// otherwise interact indirectly with the event loop itself. -/// -/// Handles can be cloned, and when cloned they will still refer to the -/// same underlying event loop. -#[derive(Clone)] -pub struct LoopHandle { - id: usize, - tx: Arc, -} - -/// A non-sendable handle to an event loop, useful for manufacturing instances -/// of `LoopData`. -#[derive(Clone)] -pub struct LoopPin { - handle: LoopHandle, - _marker: marker::PhantomData>, -} - -struct Scheduled { - readiness: Arc, - reader: Option, - writer: Option, -} - -enum TimeoutState { - NotFired, - Fired, - Waiting(TaskHandle), -} - -enum Direction { - Read, - Write, -} - -enum Message { - DropSource(usize), - Schedule(usize, TaskHandle, Direction), - AddTimeout(Instant, Arc>>), - UpdateTimeout(TimeoutToken, TaskHandle), - CancelTimeout(TimeoutToken), - Run(Box), - Drop(DropBox), -} - -impl Loop { - /// Creates a new event loop, returning any error that happened during the - /// creation. - pub fn new() -> io::Result { - let (tx, rx) = channel(); - let io = try!(mio::Poll::new()); - try!(io.register(&rx, - mio::Token(0), - mio::EventSet::readable(), - mio::PollOpt::edge())); - let pair = mio::Registration::new(&io, - mio::Token(1), - mio::EventSet::readable(), - mio::PollOpt::level()); - let (registration, readiness) = pair; - Ok(Loop { - id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed), - io: io, - events: mio::Events::new(), - tx: Arc::new(MioSender { inner: tx }), - rx: rx, - _future_registration: registration, - future_readiness: Arc::new(readiness), - dispatch: RefCell::new(Slab::new_starting_at(2, SLAB_CAPACITY)), - timeouts: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)), - timer_wheel: RefCell::new(TimerWheel::new()), - _marker: marker::PhantomData, - }) - } - - /// Generates a handle to this event loop used to construct I/O objects and - /// send messages. - /// - /// Handles to an event loop are cloneable as well and clones will always - /// refer to the same event loop. - pub fn handle(&self) -> LoopHandle { - LoopHandle { - id: self.id, - tx: self.tx.clone(), - } - } - - /// Returns a "pin" of this event loop which cannot be sent across threads - /// but can be used as a proxy to the event loop itself. - /// - /// Currently the primary use for this is to use as a handle to add data - /// to the event loop directly. The `LoopPin::add_loop_data` method can - /// be used to immediately create instances of `LoopData` structures. - pub fn pin(&self) -> LoopPin { - LoopPin { - handle: self.handle(), - _marker: marker::PhantomData, - } - } - - /// Creates a new `LoopData` handle by associating data to be directly - /// stored by this event loop. - /// - /// This function is useful for when storing non-`Send` data inside of a - /// future. The `LoopData` handle is itself `Send + 'static` regardless - /// of the underlying `A`. 
That is, for example, you can create a handle to - /// some data that contains an `Rc`, for example. - pub fn add_loop_data(&self, a: A) -> LoopData - where A: 'static, - { - self.pin().add_loop_data(a) - } - - /// Runs a future until completion, driving the event loop while we're - /// otherwise waiting for the future to complete. - /// - /// This function will begin executing the event loop and will finish once - /// the provided future is resolve. Note that the future argument here - /// crucially does not require the `'static` nor `Send` bounds. As a result - /// the future will be "pinned" to not only this thread but also this stack - /// frame. - /// - /// This function will returns the value that the future resolves to once - /// the future has finished. If the future never resolves then this function - /// will never return. - /// - /// # Panics - /// - /// This method will **not** catch panics from polling the future `f`. If - /// the future panics then it's the responsibility of the caller to catch - /// that panic and handle it as appropriate. - /// - /// Similarly, becuase the provided future will be pinned not only to this - /// thread but also to this task, any attempt to poll the future on a - /// separate thread will result in a panic. That is, calls to - /// `task::poll_on` must be avoided. - pub fn run(&mut self, mut f: F) -> Result - where F: Future, - { - struct MyNotify(Arc); - - impl Notify for MyNotify { - fn notify(&self) { - self.0.set_readiness(mio::EventSet::readable()) - .expect("failed to set readiness"); - } - } - - // First up, create the task that will drive this future. The task here - // isn't a "normal task" but rather one where we define what to do when - // a readiness notification comes in. - // - // We translate readiness notifications to a `set_readiness` of our - // `future_readiness` structure we have stored internally. - let mut task = Task::new_notify(MyNotify(self.future_readiness.clone())); - let ready = self.future_readiness.clone(); - - // Next, move all that data into a dynamically dispatched closure to cut - // down on monomorphization costs. Inside this closure we unset the - // readiness of the future (as we're about to poll it) and then we check - // to see if it's done. If it's not then the event loop will turn again. - let mut res = None; - self._run(&mut || { - ready.set_readiness(mio::EventSet::none()) - .expect("failed to set readiness"); - assert!(res.is_none()); - match task.enter(|| f.poll()) { - Poll::NotReady => {} - Poll::Ok(e) => res = Some(Ok(e)), - Poll::Err(e) => res = Some(Err(e)), - } - res.is_some() - }); - res.expect("run should not return until future is done") - } - - fn _run(&mut self, done: &mut FnMut() -> bool) { - // Check to see if we're done immediately, if so we shouldn't do any - // work. - if CURRENT_LOOP.set(self, || done()) { - return - } - - loop { - let amt; - // On Linux, Poll::poll is epoll_wait, which may return EINTR if a - // ptracer attaches. This retry loop prevents crashing when - // attaching strace, or similar. 
- let start = Instant::now(); - loop { - let timeout = self.timer_wheel.borrow().next_timeout().map(|t| { - if t < start { - Duration::new(0, 0) - } else { - t - start - } - }); - match self.io.poll(&mut self.events, timeout) { - Ok(a) => { - amt = a; - break; - } - Err(ref e) if e.kind() == ErrorKind::Interrupted => {} - err @ Err(_) => { - err.unwrap(); - } - } - } - debug!("loop poll - {:?}", start.elapsed()); - debug!("loop time - {:?}", Instant::now()); - - // First up, process all timeouts that may have just occurred. - let start = Instant::now(); - self.consume_timeouts(start); - - // Next, process all the events that came in. - for i in 0..self.events.len() { - let event = self.events.get(i).unwrap(); - let token = usize::from(event.token()); - - // Token 0 == our incoming message queue, so this means we - // process the whole queue of messages. - // - // Token 1 == we should poll the future, we'll do that right - // after we get through the rest of this tick of the event loop. - if token == 0 { - debug!("consuming notification queue"); - CURRENT_LOOP.set(&self, || { - self.consume_queue(); - }); - continue - } else if token == 1 { - if CURRENT_LOOP.set(self, || done()) { - return - } - continue - } - - trace!("event {:?} {:?}", event.kind(), event.token()); - - // For any other token we look at `dispatch` to see what we're - // supposed to do. If there's a waiter we get ready to notify - // it, and we also or-in atomically any events that have - // happened (currently read/write events). - let mut reader = None; - let mut writer = None; - if let Some(sched) = self.dispatch.borrow_mut().get_mut(token) { - if event.kind().is_readable() { - reader = sched.reader.take(); - sched.readiness.fetch_or(1, Ordering::Relaxed); - } - if event.kind().is_writable() { - writer = sched.writer.take(); - sched.readiness.fetch_or(2, Ordering::Relaxed); - } - } else { - debug!("notified on {} which no longer exists", token); - } - - // If we actually got a waiter, then notify! - // - // TODO: don't notify the same task twice - if let Some(reader) = reader { - self.notify_handle(reader); - } - if let Some(writer) = writer { - self.notify_handle(writer); - } - } - - debug!("loop process - {} events, {:?}", amt, start.elapsed()); - } - } - - fn consume_timeouts(&mut self, now: Instant) { - loop { - let idx = match self.timer_wheel.borrow_mut().poll(now) { - Some(idx) => idx, - None => break, - }; - trace!("firing timeout: {}", idx); - let handle = self.timeouts.borrow_mut()[idx].1.fire(); - if let Some(handle) = handle { - self.notify_handle(handle); - } - } - } - - /// Method used to notify a task handle. - /// - /// Note that this should be used instead fo `handle.unpark()` to ensure - /// that the `CURRENT_LOOP` variable is set appropriately. 
- fn notify_handle(&self, handle: TaskHandle) { - debug!("notifying a task handle"); - CURRENT_LOOP.set(&self, || handle.unpark()); - } - - fn add_source(&self, source: &mio::Evented) -> io::Result { - debug!("adding a new I/O source"); - let sched = Scheduled { - readiness: Arc::new(AtomicUsize::new(0)), - reader: None, - writer: None, - }; - let mut dispatch = self.dispatch.borrow_mut(); - if dispatch.vacant_entry().is_none() { - let amt = dispatch.count(); - dispatch.grow(amt); - } - let entry = dispatch.vacant_entry().unwrap(); - try!(self.io.register(source, - mio::Token(entry.index()), - mio::EventSet::readable() | - mio::EventSet::writable(), - mio::PollOpt::edge())); - Ok(IoToken { - readiness: sched.readiness.clone(), - token: entry.insert(sched).index(), - }) - } - - fn drop_source(&self, token: usize) { - debug!("dropping I/O source: {}", token); - self.dispatch.borrow_mut().remove(token).unwrap(); - } - - fn schedule(&self, token: usize, wake: TaskHandle, dir: Direction) { - debug!("scheduling direction for: {}", token); - let to_call = { - let mut dispatch = self.dispatch.borrow_mut(); - let sched = dispatch.get_mut(token).unwrap(); - let (slot, bit) = match dir { - Direction::Read => (&mut sched.reader, 1), - Direction::Write => (&mut sched.writer, 2), - }; - let ready = sched.readiness.load(Ordering::SeqCst); - if ready & bit != 0 { - *slot = None; - sched.readiness.store(ready & !bit, Ordering::SeqCst); - Some(wake) - } else { - *slot = Some(wake); - None - } - }; - if let Some(to_call) = to_call { - debug!("schedule immediately done"); - self.notify_handle(to_call); - } - } - - fn add_timeout(&self, at: Instant) -> io::Result { - let mut timeouts = self.timeouts.borrow_mut(); - if timeouts.vacant_entry().is_none() { - let len = timeouts.count(); - timeouts.grow(len); - } - let entry = timeouts.vacant_entry().unwrap(); - let timeout = self.timer_wheel.borrow_mut().insert(at, entry.index()); - let when = *timeout.when(); - let entry = entry.insert((timeout, TimeoutState::NotFired)); - debug!("added a timeout: {}", entry.index()); - Ok(TimeoutToken { - token: entry.index(), - when: when, - }) - } - - fn update_timeout(&self, token: &TimeoutToken, handle: TaskHandle) { - debug!("updating a timeout: {}", token.token); - let to_wake = self.timeouts.borrow_mut()[token.token].1.block(handle); - if let Some(to_wake) = to_wake { - self.notify_handle(to_wake); - } - } - - fn cancel_timeout(&self, token: &TimeoutToken) { - debug!("cancel a timeout: {}", token.token); - let pair = self.timeouts.borrow_mut().remove(token.token); - if let Some((timeout, _state)) = pair { - self.timer_wheel.borrow_mut().cancel(&timeout); - } - } - - fn consume_queue(&self) { - // TODO: can we do better than `.unwrap()` here? 
- while let Some(msg) = self.rx.recv().unwrap() { - self.notify(msg); - } - } - - fn notify(&self, msg: Message) { - match msg { - Message::DropSource(tok) => self.drop_source(tok), - Message::Schedule(tok, wake, dir) => self.schedule(tok, wake, dir), - - Message::AddTimeout(at, slot) => { - slot.try_produce(self.add_timeout(at)) - .ok().expect("interference with try_produce on timeout"); - } - Message::UpdateTimeout(t, handle) => self.update_timeout(&t, handle), - Message::CancelTimeout(t) => self.cancel_timeout(&t), - Message::Run(f) => { - debug!("running a closure"); - f.call() - } - Message::Drop(data) => { - debug!("dropping some data"); - drop(data); - } - } - } -} - -impl LoopHandle { - fn send(&self, msg: Message) { - self.with_loop(|lp| { - match lp { - Some(lp) => { - // Need to execute all existing requests first, to ensure - // that our message is processed "in order" - lp.consume_queue(); - lp.notify(msg); - } - None => { - match self.tx.inner.send(msg) { - Ok(()) => {} - - // This should only happen when there was an error - // writing to the pipe to wake up the event loop, - // hopefully that never happens - Err(e) => { - panic!("error sending message to event loop: {}", e) - } - } - } - } - }) - } - - fn with_loop(&self, f: F) -> R - where F: FnOnce(Option<&Loop>) -> R - { - if CURRENT_LOOP.is_set() { - CURRENT_LOOP.with(|lp| { - if lp.id == self.id { - f(Some(lp)) - } else { - f(None) - } - }) - } else { - f(None) - } - } - - /// Add a new source to an event loop, returning a future which will resolve - /// to the token that can be used to identify this source. - /// - /// When a new I/O object is created it needs to be communicated to the - /// event loop to ensure that it's registered and ready to receive - /// notifications. The event loop with then respond back with the I/O object - /// and a token which can be used to send more messages to the event loop. - /// - /// The token returned is then passed in turn to each of the methods below - /// to interact with notifications on the I/O object itself. - /// - /// # Panics - /// - /// The returned future will panic if the event loop this handle is - /// associated with has gone away, or if there is an error communicating - /// with the event loop. - pub fn add_source(&self, source: E) -> AddSource - where E: mio::Evented + Send + 'static, - { - AddSource { - inner: LoopFuture { - loop_handle: self.clone(), - data: Some(source), - result: None, - } - } - } - - /// Schedule the current future task to receive a notification when the - /// corresponding I/O object is readable. - /// - /// Once an I/O object has been registered with the event loop through the - /// `add_source` method, this method can be used with the assigned token to - /// notify the current future task when the next read notification comes in. - /// - /// The current task will only receive a notification **once** and to - /// receive further notifications it will need to call `schedule_read` - /// again. - /// - /// > **Note**: This method should generally not be used directly, but - /// > rather the `ReadinessStream` type should be used instead. - /// - /// # Panics - /// - /// This function will panic if the event loop this handle is associated - /// with has gone away, or if there is an error communicating with the event - /// loop. - /// - /// This function will also panic if there is not a currently running future - /// task. 
- pub fn schedule_read(&self, tok: &IoToken) { - self.send(Message::Schedule(tok.token, task::park(), Direction::Read)); - } - - /// Schedule the current future task to receive a notification when the - /// corresponding I/O object is writable. - /// - /// Once an I/O object has been registered with the event loop through the - /// `add_source` method, this method can be used with the assigned token to - /// notify the current future task when the next write notification comes - /// in. - /// - /// The current task will only receive a notification **once** and to - /// receive further notifications it will need to call `schedule_write` - /// again. - /// - /// > **Note**: This method should generally not be used directly, but - /// > rather the `ReadinessStream` type should be used instead. - /// - /// # Panics - /// - /// This function will panic if the event loop this handle is associated - /// with has gone away, or if there is an error communicating with the event - /// loop. - /// - /// This function will also panic if there is not a currently running future - /// task. - pub fn schedule_write(&self, tok: &IoToken) { - self.send(Message::Schedule(tok.token, task::park(), Direction::Write)); - } - - /// Unregister all information associated with a token on an event loop, - /// deallocating all internal resources assigned to the given token. - /// - /// This method should be called whenever a source of events is being - /// destroyed. This will ensure that the event loop can reuse `tok` for - /// another I/O object if necessary and also remove it from any poll - /// notifications and callbacks. - /// - /// Note that wake callbacks may still be invoked after this method is - /// called as it may take some time for the message to drop a source to - /// reach the event loop. Despite this fact, this method will attempt to - /// ensure that the callbacks are **not** invoked, so pending scheduled - /// callbacks cannot be relied upon to get called. - /// - /// > **Note**: This method should generally not be used directly, but - /// > rather the `ReadinessStream` type should be used instead. - /// - /// # Panics - /// - /// This function will panic if the event loop this handle is associated - /// with has gone away, or if there is an error communicating with the event - /// loop. - pub fn drop_source(&self, tok: &IoToken) { - self.send(Message::DropSource(tok.token)); - } - - /// Adds a new timeout to get fired at the specified instant, notifying the - /// specified task. - pub fn add_timeout(&self, at: Instant) -> AddTimeout { - AddTimeout { - inner: LoopFuture { - loop_handle: self.clone(), - data: Some(at), - result: None, - }, - } - } - - /// Updates a previously added timeout to notify a new task instead. - /// - /// # Panics - /// - /// This method will panic if the timeout specified was not created by this - /// loop handle's `add_timeout` method. - pub fn update_timeout(&self, timeout: &TimeoutToken) { - let timeout = TimeoutToken { token: timeout.token, when: timeout.when }; - self.send(Message::UpdateTimeout(timeout, task::park())) - } - - /// Cancel a previously added timeout. - /// - /// # Panics - /// - /// This method will panic if the timeout specified was not created by this - /// loop handle's `add_timeout` method. 
- pub fn cancel_timeout(&self, timeout: &TimeoutToken) { - debug!("cancel timeout {}", timeout.token); - let timeout = TimeoutToken { token: timeout.token, when: timeout.when }; - self.send(Message::CancelTimeout(timeout)) - } - - /// Schedules a closure to add some data to event loop thread itself. - /// - /// This function is useful for when storing non-`Send` data inside of a - /// future. This returns a future which will resolve to a `LoopData` - /// handle, which is itself `Send + 'static` regardless of the underlying - /// `A`. That is, for example, you can create a handle to some data that - /// contains an `Rc`, for example. - /// - /// This function takes a closure which may be sent to the event loop to - /// generate an instance of type `A`. The closure itself is required to be - /// `Send + 'static`, but the data it produces is only required to adhere to - /// `'static`. - /// - /// If the returned future is polled on the event loop thread itself it will - /// very cheaply resolve to a handle to the data, but if it's not polled on - /// the event loop then it will send a message to the event loop to run the - /// closure `f`, generate a handle, and then the future will yield it back. - // TODO: more with examples - pub fn add_loop_data(&self, f: F) -> AddLoopData - where F: FnOnce() -> A + Send + 'static, - A: 'static, - { - AddLoopData { - inner: LoopFuture { - loop_handle: self.clone(), - data: Some(f), - result: None, - }, - } - } -} - -impl LoopPin { - /// Adds some data to the event loop this pin is associated with. - /// - /// This method will return a handle to the data, `LoopData`, which can be - /// used to access the underlying data whenever it's on the correct event - /// loop thread. - pub fn add_loop_data(&self, a: A) -> LoopData - where A: 'static, - { - LoopData { - data: DropBox::new_on(a, self), - handle: self.handle.clone(), - } - } - - /// Returns a reference to the underlying handle to the event loop. - pub fn handle(&self) -> &LoopHandle { - &self.handle - } - - /// TODO: dox - pub fn executor(&self) -> Arc { - self.handle.tx.clone() - } -} - -/// A future which will resolve a unique `tok` token for an I/O object. -/// -/// Created through the `LoopHandle::add_source` method, this future can also -/// resolve to an error if there's an issue communicating with the event loop. -pub struct AddSource { - inner: LoopFuture<(E, IoToken), E>, -} - -/// A token that identifies an active timeout. -pub struct IoToken { - token: usize, - // TODO: can we avoid this allocation? It's kind of a bummer... - readiness: Arc, -} - -impl IoToken { - /// Consumes the last readiness notification the token this source is for - /// registered. - /// - /// Currently sources receive readiness notifications on an edge-basis. That - /// is, once you receive a notification that an object can be read, you - /// won't receive any more notifications until all of that data has been - /// read. - /// - /// The event loop will fill in this information and then inform futures - /// that they're ready to go with the `schedule` method, and then the `poll` - /// method can use this to figure out what happened. - /// - /// > **Note**: This method should generally not be used directly, but - /// > rather the `ReadinessStream` type should be used instead. 
- // TODO: this should really return a proper newtype/enum, not a usize - pub fn take_readiness(&self) -> usize { - self.readiness.swap(0, Ordering::SeqCst) - } -} - -impl Future for AddSource - where E: mio::Evented + Send + 'static, -{ - type Item = (E, IoToken); - type Error = io::Error; - - fn poll(&mut self) -> Poll<(E, IoToken), io::Error> { - let handle = self.inner.loop_handle.clone(); - self.inner.poll(|lp, io| { - let token = try!(lp.add_source(&io)); - Ok((io, token)) - }, |io, slot| { - Message::Run(Box::new(move || { - let res = handle.with_loop(|lp| { - let lp = lp.unwrap(); - let token = try!(lp.add_source(&io)); - Ok((io, token)) - }); - slot.try_produce(res).ok() - .expect("add source try_produce intereference"); - })) - }) - } -} - -/// Return value from the `LoopHandle::add_timeout` method, a future that will -/// resolve to a `TimeoutToken` to configure the behavior of that timeout. -pub struct AddTimeout { - inner: LoopFuture, -} - -/// A token that identifies an active timeout. -pub struct TimeoutToken { - token: usize, - when: Instant, -} - -impl Future for AddTimeout { - type Item = TimeoutToken; - type Error = io::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll(Loop::add_timeout, Message::AddTimeout) - } -} - -impl TimeoutToken { - /// Returns the instant in time when this timeout token will "fire". - /// - /// Note that this instant may *not* be the instant that was passed in when - /// the timeout was created. The event loop does not support high resolution - /// timers, so the exact resolution of when a timeout may fire may be - /// slightly fudged. - pub fn when(&self) -> &Instant { - &self.when - } -} - -/// A handle to data that is owned by an event loop thread, and is only -/// accessible on that thread itself. -/// -/// This structure is created by the `LoopHandle::add_loop_data` method which -/// will return a future resolving to one of these references. A `LoopData` -/// handle is `Send` regardless of what `A` is, but the internal data can only -/// be accessed on the event loop thread itself. -/// -/// Internally this reference also stores a handle to the event loop that the -/// data originated on, so it knows how to go back to the event loop to access -/// the data itself. -// TODO: write more once it's implemented -pub struct LoopData { - data: DropBox, - handle: LoopHandle, -} - -/// Future returned from the `LoopHandle::add_loop_data` method. -/// -/// This future will resolve to a `LoopData` reference when completed, which -/// represents a handle to data that is "owned" by the event loop thread but can -/// migrate among threads temporarily so travel with a future itself. -pub struct AddLoopData { - inner: LoopFuture, F>, -} - -fn _assert() { - fn _assert_send() {} - _assert_send::>(); -} - -impl Future for AddLoopData - where F: FnOnce() -> A + Send + 'static, - A: 'static, -{ - type Item = LoopData; - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - let ret = self.inner.poll(|_lp, f| { - Ok(DropBox::new(f())) - }, |f, slot| { - Message::Run(Box::new(move || { - slot.try_produce(Ok(DropBox::new(f()))).ok() - .expect("add loop data try_produce intereference"); - })) - }); - - ret.map(|data| { - LoopData { - data: data, - handle: self.inner.loop_handle.clone(), - } - }) - } -} - -impl LoopData { - /// Gets a shared reference to the underlying data in this handle. 
- /// - /// Returns `None` if it is not called from the event loop thread that this - /// `LoopData` is associated with, or `Some` with a reference to the data - /// if we are indeed on the event loop thread. - pub fn get(&self) -> Option<&A> { - self.data.get() - } - - /// Gets a mutable reference to the underlying data in this handle. - /// - /// Returns `None` if it is not called from the event loop thread that this - /// `LoopData` is associated with, or `Some` with a reference to the data - /// if we are indeed on the event loop thread. - pub fn get_mut(&mut self) -> Option<&mut A> { - self.data.get_mut() - } - - /// Acquire the executor associated with the thread that owns this - /// `LoopData`'s data. - /// - /// If the `get` and `get_mut` functions above return `None`, then this data - /// is being polled on the wrong thread to access the data, and to make - /// progress a future may need to migrate to the actual thread which owns - /// the relevant data. - /// - /// This executor can in turn be passed to `Task::poll_on`, which will then - /// move the entire future to be polled on the right thread. - pub fn executor(&self) -> Arc { - self.handle.tx.clone() - } - - /// Returns a reference to the handle that this data is bound to. - pub fn loop_handle(&self) -> &LoopHandle { - &self.handle - } -} - -impl Future for LoopData { - type Item = A::Item; - type Error = A::Error; - - fn poll(&mut self) -> Poll { - // If we're on the right thread, then we can proceed. Otherwise we need - // to go and get polled on the right thread. - if let Some(inner) = self.get_mut() { - return inner.poll() - } - task::poll_on(self.executor()); - Poll::NotReady - } -} - -impl Drop for LoopData { - fn drop(&mut self) { - // The `DropBox` we store internally will cause a memory leak if it's - // dropped on the wrong thread. While necessary for safety, we don't - // actually want a memory leak, so for all normal circumstances we take - // out the `DropBox` as a `DropBox` and then we send it off - // to the event loop. - // - // TODO: possible optimization is to do none of this if we're on the - // event loop thread itself - if let Some(data) = self.data.take() { - self.handle.send(Message::Drop(data)); - } - } -} - -/// A curious inner module with one `unsafe` keyword, yet quite an important -/// one! -/// -/// The purpose of this module is to define a type, `DropBox`, which is able -/// to be sent across thread event when the underlying data `A` is itself not -/// sendable across threads. This is then in turn used to build up the -/// `LoopData` abstraction above. -/// -/// A `DropBox` currently contains two major components, an identification of -/// the thread that it originated from as well as the data itself. Right now the -/// data is stored in a `Box` as we'll transition between it and `Box`, -/// but this is perhaps optimizable. -/// -/// The `DropBox` itself only provides a few safe methods, all of which are -/// safe to call from any thread. Access to the underlying data is only granted -/// if we're on the right thread, and otherwise the methods don't access the -/// data itself. -/// -/// Finally, one crucial piece, if the data is dropped it may run code that -/// assumes it's on the original thread. For this reason we have to be sure that -/// the data is only dropped on the originating thread itself. 
It's currently -/// the job of the outer `LoopData` to ensure that a `DropBox` is dropped on the -/// right thread, so we don't attempt to perform any communication in this -/// `Drop` implementation. Instead, if a `DropBox` is dropped on the wrong -/// thread, it simply leaks its contents. -/// -/// All that's really just a lot of words in an attempt to justify the `unsafe` -/// impl of `Send` below. The idea is that the data is only ever accessed on the -/// originating thread, even during `Drop`. -/// -/// Note that this is a private module to have a visibility boundary around the -/// unsafe internals. Although there's not any unsafe blocks here, the code -/// itself is quite unsafe as it has to make sure that the data is dropped in -/// the right place, if ever. -mod dropbox { - use std::mem; - use super::{CURRENT_LOOP, LoopPin}; - - pub struct DropBox { - id: usize, - inner: Option>, - } - - // We can be sent across threads due to the comment above - unsafe impl Send for DropBox {} - - // We can also be shared across threads just fine as we'll only ever get a - // reference on at most one thread, regardless of `A`. - unsafe impl Sync for DropBox {} - - pub trait MyDrop {} - impl MyDrop for T {} - - impl DropBox { - /// Creates a new `DropBox` pinned to the current threads. - /// - /// Will panic if `CURRENT_LOOP` isn't set. - pub fn new(a: A) -> DropBox { - DropBox { - id: CURRENT_LOOP.with(|lp| lp.id), - inner: Some(Box::new(a)), - } - } - - /// Creates a new `DropBox` pinned to the thread of `LoopPin`. - pub fn new_on(a: A, lp: &LoopPin) -> DropBox { - DropBox { - id: lp.handle.id, - inner: Some(Box::new(a)), - } - } - - /// Consumes the contents of this `DropBox`, returning a new - /// `DropBox`. - /// - /// This is just intended to be a simple and cheap conversion, should - /// almost always return `Some`. - pub fn take<'a>(&mut self) -> Option> - where A: 'a - { - self.inner.take().map(|d| { - DropBox { id: self.id, inner: Some(d as Box) } - }) - } - } - - impl DropBox { - /// Returns a shared reference to the data if we're on the right - /// thread. - pub fn get(&self) -> Option<&A> { - if CURRENT_LOOP.is_set() { - CURRENT_LOOP.with(|lp| { - if lp.id == self.id { - self.inner.as_ref().map(|b| &**b) - } else { - None - } - }) - } else { - None - } - } - - /// Returns a mutable reference to the data if we're on the right - /// thread. - pub fn get_mut(&mut self) -> Option<&mut A> { - if CURRENT_LOOP.is_set() { - CURRENT_LOOP.with(move |lp| { - if lp.id == self.id { - self.inner.as_mut().map(|b| &mut **b) - } else { - None - } - }) - } else { - None - } - } - } - - impl Drop for DropBox { - fn drop(&mut self) { - // Try our safe accessor first, and if it works then we know that - // we're on the right thread. In that case we can simply drop as - // usual. - if let Some(a) = self.get_mut().take() { - return drop(a) - } - - // If we're on the wrong thread but we actually have some data, then - // something in theory horrible has gone awry. Prevent memory safety - // issues by forgetting the data and then also warn about this odd - // event. 
- if let Some(data) = self.inner.take() { - mem::forget(data); - warn!("forgetting some data on an event loop"); - } - } - } -} - -struct LoopFuture { - loop_handle: LoopHandle, - data: Option, - result: Option<(Arc>>, slot::Token)>, -} - -impl LoopFuture - where T: 'static, -{ - fn poll(&mut self, f: F, g: G) -> Poll - where F: FnOnce(&Loop, U) -> io::Result, - G: FnOnce(U, Arc>>) -> Message, - { - match self.result { - Some((ref result, ref mut token)) => { - result.cancel(*token); - match result.try_consume() { - Ok(t) => return t.into(), - Err(_) => {} - } - let task = task::park(); - *token = result.on_full(move |_| { - task.unpark(); - }); - return Poll::NotReady - } - None => { - let data = &mut self.data; - let ret = self.loop_handle.with_loop(|lp| { - lp.map(|lp| f(lp, data.take().unwrap())) - }); - if let Some(ret) = ret { - debug!("loop future done immediately on event loop"); - return ret.into() - } - debug!("loop future needs to send info to event loop"); - - let task = task::park(); - let result = Arc::new(Slot::new(None)); - let token = result.on_full(move |_| { - task.unpark(); - }); - self.result = Some((result.clone(), token)); - self.loop_handle.send(g(data.take().unwrap(), result)); - Poll::NotReady - } - } - } -} - -impl TimeoutState { - fn block(&mut self, handle: TaskHandle) -> Option { - match *self { - TimeoutState::Fired => return Some(handle), - _ => {} - } - *self = TimeoutState::Waiting(handle); - None - } - - fn fire(&mut self) -> Option { - match mem::replace(self, TimeoutState::Fired) { - TimeoutState::NotFired => None, - TimeoutState::Fired => panic!("fired twice?"), - TimeoutState::Waiting(handle) => Some(handle), - } - } -} - -impl Executor for MioSender { - fn execute_boxed(&self, callback: Box) { - self.inner.send(Message::Run(callback)) - .expect("error sending a message to the event loop") - } -} diff --git a/src/channel.rs b/src/event_loop/channel.rs similarity index 100% rename from src/channel.rs rename to src/event_loop/channel.rs diff --git a/src/event_loop/loop_data.rs b/src/event_loop/loop_data.rs new file mode 100644 index 000000000..28086d4fd --- /dev/null +++ b/src/event_loop/loop_data.rs @@ -0,0 +1,347 @@ +use std::sync::Arc; +use std::io; + +use futures::{Future, Poll}; +use futures::task; +use futures::executor::Executor; + +use event_loop::{Message, Loop, LoopPin, LoopHandle, LoopFuture}; +use self::dropbox::DropBox; + +/// A handle to data that is owned by an event loop thread, and is only +/// accessible on that thread itself. +/// +/// This structure is created by the `LoopHandle::add_loop_data` method which +/// will return a future resolving to one of these references. A `LoopData` +/// handle is `Send` regardless of what `A` is, but the internal data can only +/// be accessed on the event loop thread itself. +/// +/// Internally this reference also stores a handle to the event loop that the +/// data originated on, so it knows how to go back to the event loop to access +/// the data itself. +// TODO: write more once it's implemented +pub struct LoopData { + data: DropBox, + handle: LoopHandle, +} + +pub struct Opaque { + _inner: DropBox, +} + +/// Future returned from the `LoopHandle::add_loop_data` method. +/// +/// This future will resolve to a `LoopData` reference when completed, which +/// represents a handle to data that is "owned" by the event loop thread but can +/// migrate among threads temporarily so travel with a future itself. 
+pub struct AddLoopData { + inner: LoopFuture, F>, +} + +fn _assert() { + fn _assert_send() {} + _assert_send::>(); +} + +impl Loop { + /// Creates a new `LoopData` handle by associating data to be directly + /// stored by this event loop. + /// + /// This function is useful for when storing non-`Send` data inside of a + /// future. The `LoopData` handle is itself `Send + 'static` regardless + /// of the underlying `A`. That is, for example, you can create a handle to + /// some data that contains an `Rc`, for example. + pub fn add_loop_data(&self, a: A) -> LoopData + where A: 'static, + { + self.pin().add_loop_data(a) + } +} + +impl LoopPin { + /// Adds some data to the event loop this pin is associated with. + /// + /// This method will return a handle to the data, `LoopData`, which can be + /// used to access the underlying data whenever it's on the correct event + /// loop thread. + pub fn add_loop_data(&self, a: A) -> LoopData + where A: 'static, + { + LoopData { + data: DropBox::new_on(a, self), + handle: self.handle.clone(), + } + } +} + +impl LoopHandle { + + /// Schedules a closure to add some data to event loop thread itself. + /// + /// This function is useful for when storing non-`Send` data inside of a + /// future. This returns a future which will resolve to a `LoopData` + /// handle, which is itself `Send + 'static` regardless of the underlying + /// `A`. That is, for example, you can create a handle to some data that + /// contains an `Rc`, for example. + /// + /// This function takes a closure which may be sent to the event loop to + /// generate an instance of type `A`. The closure itself is required to be + /// `Send + 'static`, but the data it produces is only required to adhere to + /// `'static`. + /// + /// If the returned future is polled on the event loop thread itself it will + /// very cheaply resolve to a handle to the data, but if it's not polled on + /// the event loop then it will send a message to the event loop to run the + /// closure `f`, generate a handle, and then the future will yield it back. + // TODO: more with examples + pub fn add_loop_data(&self, f: F) -> AddLoopData + where F: FnOnce() -> A + Send + 'static, + A: 'static, + { + AddLoopData { + inner: LoopFuture { + loop_handle: self.clone(), + data: Some(f), + result: None, + }, + } + } +} + +impl Future for AddLoopData + where F: FnOnce() -> A + Send + 'static, + A: 'static, +{ + type Item = LoopData; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + let ret = self.inner.poll(|_lp, f| { + Ok(DropBox::new(f())) + }, |f, slot| { + Message::Run(Box::new(move || { + slot.try_produce(Ok(DropBox::new(f()))).ok() + .expect("add loop data try_produce intereference"); + })) + }); + + ret.map(|data| { + LoopData { + data: data, + handle: self.inner.loop_handle.clone(), + } + }) + } +} + +impl LoopData { + /// Gets a shared reference to the underlying data in this handle. + /// + /// Returns `None` if it is not called from the event loop thread that this + /// `LoopData` is associated with, or `Some` with a reference to the data + /// if we are indeed on the event loop thread. + pub fn get(&self) -> Option<&A> { + self.data.get() + } + + /// Gets a mutable reference to the underlying data in this handle. + /// + /// Returns `None` if it is not called from the event loop thread that this + /// `LoopData` is associated with, or `Some` with a reference to the data + /// if we are indeed on the event loop thread. 
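// A minimal illustrative sketch, not from the patch itself, of the intended
// `add_loop_data` flow; it assumes `lp` is a `Loop` owned by the current
// thread, and the function name and the `Rc<u32>` payload are placeholders
// for whatever non-`Send` state a future might need.
use std::rc::Rc;
use futures::Future;

fn example(lp: &mut Loop) {
    let handle = lp.handle();

    // The closure is executed on the event loop thread, so it may build
    // non-`Send` data; the `LoopData` handle it resolves to is itself `Send`.
    let fut = handle.add_loop_data(|| Rc::new(42u32)).map(|data| {
        // This closure runs while the loop is polling, i.e. on the loop
        // thread, so `get()` returns `Some` here.
        **data.get().expect("polled on the event loop thread")
    });

    // Driving the future on the loop itself resolves it cheaply.
    assert_eq!(lp.run(fut).unwrap(), 42);
}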
+ pub fn get_mut(&mut self) -> Option<&mut A> { + self.data.get_mut() + } + + /// Acquire the executor associated with the thread that owns this + /// `LoopData`'s data. + /// + /// If the `get` and `get_mut` functions above return `None`, then this data + /// is being polled on the wrong thread to access the data, and to make + /// progress a future may need to migrate to the actual thread which owns + /// the relevant data. + /// + /// This executor can in turn be passed to `Task::poll_on`, which will then + /// move the entire future to be polled on the right thread. + pub fn executor(&self) -> Arc { + self.handle.tx.clone() + } + + /// Returns a reference to the handle that this data is bound to. + pub fn loop_handle(&self) -> &LoopHandle { + &self.handle + } +} + +impl Future for LoopData { + type Item = A::Item; + type Error = A::Error; + + fn poll(&mut self) -> Poll { + // If we're on the right thread, then we can proceed. Otherwise we need + // to go and get polled on the right thread. + if let Some(inner) = self.get_mut() { + return inner.poll() + } + task::poll_on(self.executor()); + Poll::NotReady + } +} + +impl Drop for LoopData { + fn drop(&mut self) { + // The `DropBox` we store internally will cause a memory leak if it's + // dropped on the wrong thread. While necessary for safety, we don't + // actually want a memory leak, so for all normal circumstances we take + // out the `DropBox` as a `DropBox` and then we send it off + // to the event loop. + // + // TODO: possible optimization is to do none of this if we're on the + // event loop thread itself + if let Some(data) = self.data.take() { + self.handle.send(Message::Drop(Opaque { _inner: data })); + } + } +} + +/// A curious inner module with one `unsafe` keyword, yet quite an important +/// one! +/// +/// The purpose of this module is to define a type, `DropBox`, which is able +/// to be sent across thread event when the underlying data `A` is itself not +/// sendable across threads. This is then in turn used to build up the +/// `LoopData` abstraction above. +/// +/// A `DropBox` currently contains two major components, an identification of +/// the thread that it originated from as well as the data itself. Right now the +/// data is stored in a `Box` as we'll transition between it and `Box`, +/// but this is perhaps optimizable. +/// +/// The `DropBox` itself only provides a few safe methods, all of which are +/// safe to call from any thread. Access to the underlying data is only granted +/// if we're on the right thread, and otherwise the methods don't access the +/// data itself. +/// +/// Finally, one crucial piece, if the data is dropped it may run code that +/// assumes it's on the original thread. For this reason we have to be sure that +/// the data is only dropped on the originating thread itself. It's currently +/// the job of the outer `LoopData` to ensure that a `DropBox` is dropped on the +/// right thread, so we don't attempt to perform any communication in this +/// `Drop` implementation. Instead, if a `DropBox` is dropped on the wrong +/// thread, it simply leaks its contents. +/// +/// All that's really just a lot of words in an attempt to justify the `unsafe` +/// impl of `Send` below. The idea is that the data is only ever accessed on the +/// originating thread, even during `Drop`. +/// +/// Note that this is a private module to have a visibility boundary around the +/// unsafe internals. 
Although there's not any unsafe blocks here, the code +/// itself is quite unsafe as it has to make sure that the data is dropped in +/// the right place, if ever. +mod dropbox { + use std::mem; + use event_loop::{CURRENT_LOOP, LoopPin}; + + pub struct DropBox { + id: usize, + inner: Option>, + } + + // We can be sent across threads due to the comment above + unsafe impl Send for DropBox {} + + // We can also be shared across threads just fine as we'll only ever get a + // reference on at most one thread, regardless of `A`. + unsafe impl Sync for DropBox {} + + pub trait MyDrop {} + impl MyDrop for T {} + + impl DropBox { + /// Creates a new `DropBox` pinned to the current threads. + /// + /// Will panic if `CURRENT_LOOP` isn't set. + pub fn new(a: A) -> DropBox { + DropBox { + id: CURRENT_LOOP.with(|lp| lp.id), + inner: Some(Box::new(a)), + } + } + + /// Creates a new `DropBox` pinned to the thread of `LoopPin`. + pub fn new_on(a: A, lp: &LoopPin) -> DropBox { + DropBox { + id: lp.handle.id, + inner: Some(Box::new(a)), + } + } + + /// Consumes the contents of this `DropBox`, returning a new + /// `DropBox`. + /// + /// This is just intended to be a simple and cheap conversion, should + /// almost always return `Some`. + pub fn take<'a>(&mut self) -> Option> + where A: 'a + { + self.inner.take().map(|d| { + DropBox { id: self.id, inner: Some(d as Box) } + }) + } + } + + impl DropBox { + /// Returns a shared reference to the data if we're on the right + /// thread. + pub fn get(&self) -> Option<&A> { + if CURRENT_LOOP.is_set() { + CURRENT_LOOP.with(|lp| { + if lp.id == self.id { + self.inner.as_ref().map(|b| &**b) + } else { + None + } + }) + } else { + None + } + } + + /// Returns a mutable reference to the data if we're on the right + /// thread. + pub fn get_mut(&mut self) -> Option<&mut A> { + if CURRENT_LOOP.is_set() { + CURRENT_LOOP.with(move |lp| { + if lp.id == self.id { + self.inner.as_mut().map(|b| &mut **b) + } else { + None + } + }) + } else { + None + } + } + } + + impl Drop for DropBox { + fn drop(&mut self) { + // Try our safe accessor first, and if it works then we know that + // we're on the right thread. In that case we can simply drop as + // usual. + if let Some(a) = self.get_mut().take() { + return drop(a) + } + + // If we're on the wrong thread but we actually have some data, then + // something in theory horrible has gone awry. Prevent memory safety + // issues by forgetting the data and then also warn about this odd + // event. 
+ if let Some(data) = self.inner.take() { + mem::forget(data); + warn!("forgetting some data on an event loop"); + } + } + } +} + diff --git a/src/event_loop/mod.rs b/src/event_loop/mod.rs new file mode 100644 index 000000000..f8dbc95c3 --- /dev/null +++ b/src/event_loop/mod.rs @@ -0,0 +1,596 @@ +use std::cell::RefCell; +use std::io::{self, ErrorKind}; +use std::marker; +use std::mem; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; +use std::time::{Instant, Duration}; + +use futures::{Future, Poll}; +use futures::task::{self, Task, Notify, TaskHandle}; +use futures::executor::{ExecuteCallback, Executor}; +use mio; +use slab::Slab; + +use slot::{self, Slot}; +use timer_wheel::{TimerWheel, Timeout}; + +mod channel; +mod loop_data; +mod source; +mod timeout; +pub use self::loop_data::{LoopData, AddLoopData}; +pub use self::source::{AddSource, IoToken}; +pub use self::timeout::{AddTimeout, TimeoutToken}; +use self::channel::{Sender, Receiver, channel}; + +static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT; +scoped_thread_local!(static CURRENT_LOOP: Loop); + +const SLAB_CAPACITY: usize = 1024 * 64; + +/// An event loop. +/// +/// The event loop is the main source of blocking in an application which drives +/// all other I/O events and notifications happening. Each event loop can have +/// multiple handles pointing to it, each of which can then be used to create +/// various I/O objects to interact with the event loop in interesting ways. +// TODO: expand this +pub struct Loop { + id: usize, + io: mio::Poll, + events: mio::Events, + tx: Arc, + rx: Receiver, + dispatch: RefCell>, + _future_registration: mio::Registration, + future_readiness: Arc, + + // Timer wheel keeping track of all timeouts. The `usize` stored in the + // timer wheel is an index into the slab below. + // + // The slab below keeps track of the timeouts themselves as well as the + // state of the timeout itself. The `TimeoutToken` type is an index into the + // `timeouts` slab. + timer_wheel: RefCell>, + timeouts: RefCell>, + + // A `Loop` cannot be sent to other threads as it's used as a proxy for data + // that belongs to the thread the loop was running on at some point. In + // other words, the safety of `DropBox` below relies on loops not crossing + // threads. + _marker: marker::PhantomData>, +} + +struct MioSender { + inner: Sender, +} + +/// Handle to an event loop, used to construct I/O objects, send messages, and +/// otherwise interact indirectly with the event loop itself. +/// +/// Handles can be cloned, and when cloned they will still refer to the +/// same underlying event loop. +#[derive(Clone)] +pub struct LoopHandle { + id: usize, + tx: Arc, +} + +/// A non-sendable handle to an event loop, useful for manufacturing instances +/// of `LoopData`. +#[derive(Clone)] +pub struct LoopPin { + handle: LoopHandle, + _marker: marker::PhantomData>, +} + +struct Scheduled { + readiness: Arc, + reader: Option, + writer: Option, +} + +enum TimeoutState { + NotFired, + Fired, + Waiting(TaskHandle), +} + +enum Direction { + Read, + Write, +} + +enum Message { + DropSource(usize), + Schedule(usize, TaskHandle, Direction), + AddTimeout(Instant, Arc>>), + UpdateTimeout(usize, TaskHandle), + CancelTimeout(usize), + Run(Box), + Drop(loop_data::Opaque), +} + +impl Loop { + /// Creates a new event loop, returning any error that happened during the + /// creation. 
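// A rough usage sketch, not part of the patch, for the `Loop` type described
// above; it leans on the `futures` crate's `finished` constructor and the
// `handle`/`run` methods introduced just below, and the function name is
// illustrative.
use std::io;
use futures::finished;

fn example() -> io::Result<()> {
    let mut lp = try!(Loop::new());

    // Handles are cheap to clone and can be handed to other threads to create
    // I/O objects, timeouts, or loop data tied to this particular loop.
    let handle = lp.handle();
    let _another = handle.clone();

    // `run` blocks the current thread, turning the event loop until the given
    // future resolves; the future needs neither `Send` nor `'static`.
    let answer = try!(lp.run(finished::<u32, io::Error>(42)));
    assert_eq!(answer, 42);
    Ok(())
}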
+    pub fn new() -> io::Result<Loop> {
+        let (tx, rx) = channel();
+        let io = try!(mio::Poll::new());
+        try!(io.register(&rx,
+                         mio::Token(0),
+                         mio::EventSet::readable(),
+                         mio::PollOpt::edge()));
+        let pair = mio::Registration::new(&io,
+                                          mio::Token(1),
+                                          mio::EventSet::readable(),
+                                          mio::PollOpt::level());
+        let (registration, readiness) = pair;
+        Ok(Loop {
+            id: NEXT_LOOP_ID.fetch_add(1, Ordering::Relaxed),
+            io: io,
+            events: mio::Events::new(),
+            tx: Arc::new(MioSender { inner: tx }),
+            rx: rx,
+            _future_registration: registration,
+            future_readiness: Arc::new(readiness),
+            dispatch: RefCell::new(Slab::new_starting_at(2, SLAB_CAPACITY)),
+            timeouts: RefCell::new(Slab::new_starting_at(0, SLAB_CAPACITY)),
+            timer_wheel: RefCell::new(TimerWheel::new()),
+            _marker: marker::PhantomData,
+        })
+    }
+
+    /// Generates a handle to this event loop used to construct I/O objects and
+    /// send messages.
+    ///
+    /// Handles to an event loop are cloneable as well and clones will always
+    /// refer to the same event loop.
+    pub fn handle(&self) -> LoopHandle {
+        LoopHandle {
+            id: self.id,
+            tx: self.tx.clone(),
+        }
+    }
+
+    /// Returns a "pin" of this event loop which cannot be sent across threads
+    /// but can be used as a proxy to the event loop itself.
+    ///
+    /// Currently the primary use for this is to use as a handle to add data
+    /// to the event loop directly. The `LoopPin::add_loop_data` method can
+    /// be used to immediately create instances of `LoopData` structures.
+    pub fn pin(&self) -> LoopPin {
+        LoopPin {
+            handle: self.handle(),
+            _marker: marker::PhantomData,
+        }
+    }
+
+    /// Runs a future until completion, driving the event loop while we're
+    /// otherwise waiting for the future to complete.
+    ///
+    /// This function will begin executing the event loop and will finish once
+    /// the provided future is resolved. Note that the future argument here
+    /// crucially does not require the `'static` nor `Send` bounds. As a result
+    /// the future will be "pinned" to not only this thread but also this stack
+    /// frame.
+    ///
+    /// This function will return the value that the future resolves to once
+    /// the future has finished. If the future never resolves then this function
+    /// will never return.
+    ///
+    /// # Panics
+    ///
+    /// This method will **not** catch panics from polling the future `f`. If
+    /// the future panics then it's the responsibility of the caller to catch
+    /// that panic and handle it as appropriate.
+    ///
+    /// Similarly, because the provided future will be pinned not only to this
+    /// thread but also to this task, any attempt to poll the future on a
+    /// separate thread will result in a panic. That is, calls to
+    /// `task::poll_on` must be avoided.
+    pub fn run<F>(&mut self, mut f: F) -> Result<F::Item, F::Error>
+        where F: Future,
+    {
+        struct MyNotify(Arc<mio::SetReadiness>);
+
+        impl Notify for MyNotify {
+            fn notify(&self) {
+                self.0.set_readiness(mio::EventSet::readable())
+                    .expect("failed to set readiness");
+            }
+        }
+
+        // First up, create the task that will drive this future. The task here
+        // isn't a "normal task" but rather one where we define what to do when
+        // a readiness notification comes in.
+        //
+        // We translate readiness notifications to a `set_readiness` of our
+        // `future_readiness` structure we have stored internally.
+        let mut task = Task::new_notify(MyNotify(self.future_readiness.clone()));
+        let ready = self.future_readiness.clone();
+
+        // Next, move all that data into a dynamically dispatched closure to cut
+        // down on monomorphization costs.
Inside this closure we unset the + // readiness of the future (as we're about to poll it) and then we check + // to see if it's done. If it's not then the event loop will turn again. + let mut res = None; + self._run(&mut || { + ready.set_readiness(mio::EventSet::none()) + .expect("failed to set readiness"); + assert!(res.is_none()); + match task.enter(|| f.poll()) { + Poll::NotReady => {} + Poll::Ok(e) => res = Some(Ok(e)), + Poll::Err(e) => res = Some(Err(e)), + } + res.is_some() + }); + res.expect("run should not return until future is done") + } + + fn _run(&mut self, done: &mut FnMut() -> bool) { + // Check to see if we're done immediately, if so we shouldn't do any + // work. + if CURRENT_LOOP.set(self, || done()) { + return + } + + loop { + let amt; + // On Linux, Poll::poll is epoll_wait, which may return EINTR if a + // ptracer attaches. This retry loop prevents crashing when + // attaching strace, or similar. + let start = Instant::now(); + loop { + let timeout = self.timer_wheel.borrow().next_timeout().map(|t| { + if t < start { + Duration::new(0, 0) + } else { + t - start + } + }); + match self.io.poll(&mut self.events, timeout) { + Ok(a) => { + amt = a; + break; + } + Err(ref e) if e.kind() == ErrorKind::Interrupted => {} + err @ Err(_) => { + err.unwrap(); + } + } + } + debug!("loop poll - {:?}", start.elapsed()); + debug!("loop time - {:?}", Instant::now()); + + // First up, process all timeouts that may have just occurred. + let start = Instant::now(); + self.consume_timeouts(start); + + // Next, process all the events that came in. + for i in 0..self.events.len() { + let event = self.events.get(i).unwrap(); + let token = usize::from(event.token()); + + // Token 0 == our incoming message queue, so this means we + // process the whole queue of messages. + // + // Token 1 == we should poll the future, we'll do that right + // after we get through the rest of this tick of the event loop. + if token == 0 { + debug!("consuming notification queue"); + CURRENT_LOOP.set(&self, || { + self.consume_queue(); + }); + continue + } else if token == 1 { + if CURRENT_LOOP.set(self, || done()) { + return + } + continue + } + + trace!("event {:?} {:?}", event.kind(), event.token()); + + // For any other token we look at `dispatch` to see what we're + // supposed to do. If there's a waiter we get ready to notify + // it, and we also or-in atomically any events that have + // happened (currently read/write events). + let mut reader = None; + let mut writer = None; + if let Some(sched) = self.dispatch.borrow_mut().get_mut(token) { + if event.kind().is_readable() { + reader = sched.reader.take(); + sched.readiness.fetch_or(1, Ordering::Relaxed); + } + if event.kind().is_writable() { + writer = sched.writer.take(); + sched.readiness.fetch_or(2, Ordering::Relaxed); + } + } else { + debug!("notified on {} which no longer exists", token); + } + + // If we actually got a waiter, then notify! 
+ // + // TODO: don't notify the same task twice + if let Some(reader) = reader { + self.notify_handle(reader); + } + if let Some(writer) = writer { + self.notify_handle(writer); + } + } + + debug!("loop process - {} events, {:?}", amt, start.elapsed()); + } + } + + fn consume_timeouts(&mut self, now: Instant) { + loop { + let idx = match self.timer_wheel.borrow_mut().poll(now) { + Some(idx) => idx, + None => break, + }; + trace!("firing timeout: {}", idx); + let handle = self.timeouts.borrow_mut()[idx].1.fire(); + if let Some(handle) = handle { + self.notify_handle(handle); + } + } + } + + /// Method used to notify a task handle. + /// + /// Note that this should be used instead fo `handle.unpark()` to ensure + /// that the `CURRENT_LOOP` variable is set appropriately. + fn notify_handle(&self, handle: TaskHandle) { + debug!("notifying a task handle"); + CURRENT_LOOP.set(&self, || handle.unpark()); + } + + fn add_source(&self, source: &mio::Evented) + -> io::Result<(Arc, usize)> { + debug!("adding a new I/O source"); + let sched = Scheduled { + readiness: Arc::new(AtomicUsize::new(0)), + reader: None, + writer: None, + }; + let mut dispatch = self.dispatch.borrow_mut(); + if dispatch.vacant_entry().is_none() { + let amt = dispatch.count(); + dispatch.grow(amt); + } + let entry = dispatch.vacant_entry().unwrap(); + try!(self.io.register(source, + mio::Token(entry.index()), + mio::EventSet::readable() | + mio::EventSet::writable(), + mio::PollOpt::edge())); + Ok((sched.readiness.clone(), entry.insert(sched).index())) + } + + fn drop_source(&self, token: usize) { + debug!("dropping I/O source: {}", token); + self.dispatch.borrow_mut().remove(token).unwrap(); + } + + fn schedule(&self, token: usize, wake: TaskHandle, dir: Direction) { + debug!("scheduling direction for: {}", token); + let to_call = { + let mut dispatch = self.dispatch.borrow_mut(); + let sched = dispatch.get_mut(token).unwrap(); + let (slot, bit) = match dir { + Direction::Read => (&mut sched.reader, 1), + Direction::Write => (&mut sched.writer, 2), + }; + let ready = sched.readiness.load(Ordering::SeqCst); + if ready & bit != 0 { + *slot = None; + sched.readiness.store(ready & !bit, Ordering::SeqCst); + Some(wake) + } else { + *slot = Some(wake); + None + } + }; + if let Some(to_call) = to_call { + debug!("schedule immediately done"); + self.notify_handle(to_call); + } + } + + fn add_timeout(&self, at: Instant) -> io::Result<(usize, Instant)> { + let mut timeouts = self.timeouts.borrow_mut(); + if timeouts.vacant_entry().is_none() { + let len = timeouts.count(); + timeouts.grow(len); + } + let entry = timeouts.vacant_entry().unwrap(); + let timeout = self.timer_wheel.borrow_mut().insert(at, entry.index()); + let when = *timeout.when(); + let entry = entry.insert((timeout, TimeoutState::NotFired)); + debug!("added a timeout: {}", entry.index()); + Ok((entry.index(), when)) + } + + fn update_timeout(&self, token: usize, handle: TaskHandle) { + debug!("updating a timeout: {}", token); + let to_wake = self.timeouts.borrow_mut()[token].1.block(handle); + if let Some(to_wake) = to_wake { + self.notify_handle(to_wake); + } + } + + fn cancel_timeout(&self, token: usize) { + debug!("cancel a timeout: {}", token); + let pair = self.timeouts.borrow_mut().remove(token); + if let Some((timeout, _state)) = pair { + self.timer_wheel.borrow_mut().cancel(&timeout); + } + } + + fn consume_queue(&self) { + // TODO: can we do better than `.unwrap()` here? 
+ while let Some(msg) = self.rx.recv().unwrap() { + self.notify(msg); + } + } + + fn notify(&self, msg: Message) { + match msg { + Message::DropSource(tok) => self.drop_source(tok), + Message::Schedule(tok, wake, dir) => self.schedule(tok, wake, dir), + + Message::AddTimeout(at, slot) => { + slot.try_produce(self.add_timeout(at)) + .ok().expect("interference with try_produce on timeout"); + } + Message::UpdateTimeout(t, handle) => self.update_timeout(t, handle), + Message::CancelTimeout(t) => self.cancel_timeout(t), + Message::Run(f) => { + debug!("running a closure"); + f.call() + } + Message::Drop(data) => { + debug!("dropping some data"); + drop(data); + } + } + } +} + +impl LoopHandle { + fn send(&self, msg: Message) { + self.with_loop(|lp| { + match lp { + Some(lp) => { + // Need to execute all existing requests first, to ensure + // that our message is processed "in order" + lp.consume_queue(); + lp.notify(msg); + } + None => { + match self.tx.inner.send(msg) { + Ok(()) => {} + + // This should only happen when there was an error + // writing to the pipe to wake up the event loop, + // hopefully that never happens + Err(e) => { + panic!("error sending message to event loop: {}", e) + } + } + } + } + }) + } + + fn with_loop(&self, f: F) -> R + where F: FnOnce(Option<&Loop>) -> R + { + if CURRENT_LOOP.is_set() { + CURRENT_LOOP.with(|lp| { + if lp.id == self.id { + f(Some(lp)) + } else { + f(None) + } + }) + } else { + f(None) + } + } +} + +impl LoopPin { + /// Returns a reference to the underlying handle to the event loop. + pub fn handle(&self) -> &LoopHandle { + &self.handle + } + + /// TODO: dox + pub fn executor(&self) -> Arc { + self.handle.tx.clone() + } +} + +struct LoopFuture { + loop_handle: LoopHandle, + data: Option, + result: Option<(Arc>>, slot::Token)>, +} + +impl LoopFuture + where T: 'static, +{ + fn poll(&mut self, f: F, g: G) -> Poll + where F: FnOnce(&Loop, U) -> io::Result, + G: FnOnce(U, Arc>>) -> Message, + { + match self.result { + Some((ref result, ref mut token)) => { + result.cancel(*token); + match result.try_consume() { + Ok(t) => return t.into(), + Err(_) => {} + } + let task = task::park(); + *token = result.on_full(move |_| { + task.unpark(); + }); + return Poll::NotReady + } + None => { + let data = &mut self.data; + let ret = self.loop_handle.with_loop(|lp| { + lp.map(|lp| f(lp, data.take().unwrap())) + }); + if let Some(ret) = ret { + debug!("loop future done immediately on event loop"); + return ret.into() + } + debug!("loop future needs to send info to event loop"); + + let task = task::park(); + let result = Arc::new(Slot::new(None)); + let token = result.on_full(move |_| { + task.unpark(); + }); + self.result = Some((result.clone(), token)); + self.loop_handle.send(g(data.take().unwrap(), result)); + Poll::NotReady + } + } + } +} + +impl TimeoutState { + fn block(&mut self, handle: TaskHandle) -> Option { + match *self { + TimeoutState::Fired => return Some(handle), + _ => {} + } + *self = TimeoutState::Waiting(handle); + None + } + + fn fire(&mut self) -> Option { + match mem::replace(self, TimeoutState::Fired) { + TimeoutState::NotFired => None, + TimeoutState::Fired => panic!("fired twice?"), + TimeoutState::Waiting(handle) => Some(handle), + } + } +} + +impl Executor for MioSender { + fn execute_boxed(&self, callback: Box) { + self.inner.send(Message::Run(callback)) + .expect("error sending a message to the event loop") + } +} diff --git a/src/event_loop/source.rs b/src/event_loop/source.rs new file mode 100644 index 000000000..92e795b8f --- 
/dev/null
+++ b/src/event_loop/source.rs
@@ -0,0 +1,183 @@
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::io;
+
+use futures::{Future, Poll};
+use futures::task;
+use mio;
+
+use event_loop::{Message, LoopHandle, LoopFuture, Direction};
+
+/// A future which will resolve to a unique `IoToken` for an I/O object.
+///
+/// Created through the `LoopHandle::add_source` method, this future can also
+/// resolve to an error if there's an issue communicating with the event loop.
+pub struct AddSource<E> {
+    inner: LoopFuture<(E, (Arc<AtomicUsize>, usize)), E>,
+}
+
+/// A token that identifies an I/O object registered with an event loop.
+pub struct IoToken {
+    token: usize,
+    // TODO: can we avoid this allocation? It's kind of a bummer...
+    readiness: Arc<AtomicUsize>,
+}
+
+impl LoopHandle {
+    /// Add a new source to an event loop, returning a future which will resolve
+    /// to the token that can be used to identify this source.
+    ///
+    /// When a new I/O object is created it needs to be communicated to the
+    /// event loop to ensure that it's registered and ready to receive
+    /// notifications. The event loop will then respond back with the I/O object
+    /// and a token which can be used to send more messages to the event loop.
+    ///
+    /// The token returned is then passed in turn to each of the methods below
+    /// to interact with notifications on the I/O object itself.
+    ///
+    /// # Panics
+    ///
+    /// The returned future will panic if the event loop this handle is
+    /// associated with has gone away, or if there is an error communicating
+    /// with the event loop.
+    pub fn add_source<E>(&self, source: E) -> AddSource<E>
+        where E: mio::Evented + Send + 'static,
+    {
+        AddSource {
+            inner: LoopFuture {
+                loop_handle: self.clone(),
+                data: Some(source),
+                result: None,
+            }
+        }
+    }
+
+    /// Schedule the current future task to receive a notification when the
+    /// corresponding I/O object is readable.
+    ///
+    /// Once an I/O object has been registered with the event loop through the
+    /// `add_source` method, this method can be used with the assigned token to
+    /// notify the current future task when the next read notification comes in.
+    ///
+    /// The current task will only receive a notification **once** and to
+    /// receive further notifications it will need to call `schedule_read`
+    /// again.
+    ///
+    /// > **Note**: This method should generally not be used directly, but
+    /// > rather the `ReadinessStream` type should be used instead.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if the event loop this handle is associated
+    /// with has gone away, or if there is an error communicating with the event
+    /// loop.
+    ///
+    /// This function will also panic if there is not a currently running future
+    /// task.
+    pub fn schedule_read(&self, tok: &IoToken) {
+        self.send(Message::Schedule(tok.token, task::park(), Direction::Read));
+    }
+
+    /// Schedule the current future task to receive a notification when the
+    /// corresponding I/O object is writable.
+    ///
+    /// Once an I/O object has been registered with the event loop through the
+    /// `add_source` method, this method can be used with the assigned token to
+    /// notify the current future task when the next write notification comes
+    /// in.
+    ///
+    /// The current task will only receive a notification **once** and to
+    /// receive further notifications it will need to call `schedule_write`
+    /// again.
+    ///
+    /// > **Note**: This method should generally not be used directly, but
+    /// > rather the `ReadinessStream` type should be used instead.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if the event loop this handle is associated
+    /// with has gone away, or if there is an error communicating with the event
+    /// loop.
+    ///
+    /// This function will also panic if there is not a currently running future
+    /// task.
+    pub fn schedule_write(&self, tok: &IoToken) {
+        self.send(Message::Schedule(tok.token, task::park(), Direction::Write));
+    }
+
+    /// Unregister all information associated with a token on an event loop,
+    /// deallocating all internal resources assigned to the given token.
+    ///
+    /// This method should be called whenever a source of events is being
+    /// destroyed. This will ensure that the event loop can reuse `tok` for
+    /// another I/O object if necessary and also remove it from any poll
+    /// notifications and callbacks.
+    ///
+    /// Note that wake callbacks may still be invoked after this method is
+    /// called, as it may take some time for the message to drop a source to
+    /// reach the event loop. Despite this fact, this method will attempt to
+    /// ensure that the callbacks are **not** invoked, so pending scheduled
+    /// callbacks cannot be relied upon to get called.
+    ///
+    /// > **Note**: This method should generally not be used directly, but
+    /// > rather the `ReadinessStream` type should be used instead.
+    ///
+    /// # Panics
+    ///
+    /// This function will panic if the event loop this handle is associated
+    /// with has gone away, or if there is an error communicating with the event
+    /// loop.
+    pub fn drop_source(&self, tok: &IoToken) {
+        self.send(Message::DropSource(tok.token));
+    }
+}
+
+impl IoToken {
+    /// Consumes the last readiness notification registered for the source that
+    /// this token corresponds to.
+    ///
+    /// Currently sources receive readiness notifications on an edge-basis. That
+    /// is, once you receive a notification that an object can be read, you
+    /// won't receive any more notifications until all of that data has been
+    /// read.
+    ///
+    /// The event loop will fill in this information and then inform futures
+    /// that they're ready to go with the `schedule` method, and then the `poll`
+    /// method can use this to figure out what happened.
+    ///
+    /// > **Note**: This method should generally not be used directly, but
+    /// > rather the `ReadinessStream` type should be used instead.
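// A condensed sketch, not from the patch, of how the token methods above fit
// together inside a custom `poll`; it assumes `token` came from `add_source`
// and that this is called from a running future task, and the function name
// is illustrative.
fn poll_readable(handle: &LoopHandle, token: &IoToken) -> bool {
    // Bit 1 records read readiness and bit 2 write readiness; `take_readiness`
    // clears both, matching the edge-triggered registration.
    let bits = token.take_readiness();
    if bits & 1 != 0 {
        true
    } else {
        // Not currently readable: ask the event loop to wake the current task
        // on the next read notification, then report "not ready".
        handle.schedule_read(token);
        false
    }
}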
+ // TODO: this should really return a proper newtype/enum, not a usize + pub fn take_readiness(&self) -> usize { + self.readiness.swap(0, Ordering::SeqCst) + } +} + +impl Future for AddSource + where E: mio::Evented + Send + 'static, +{ + type Item = (E, IoToken); + type Error = io::Error; + + fn poll(&mut self) -> Poll<(E, IoToken), io::Error> { + let handle = self.inner.loop_handle.clone(); + let res = self.inner.poll(|lp, io| { + let pair = try!(lp.add_source(&io)); + Ok((io, pair)) + }, |io, slot| { + Message::Run(Box::new(move || { + let res = handle.with_loop(|lp| { + let lp = lp.unwrap(); + let pair = try!(lp.add_source(&io)); + Ok((io, pair)) + }); + slot.try_produce(res).ok() + .expect("add source try_produce intereference"); + })) + }); + + res.map(|(io, (ready, token))| { + (io, IoToken { token: token, readiness: ready }) + }) + } +} diff --git a/src/event_loop/timeout.rs b/src/event_loop/timeout.rs new file mode 100644 index 000000000..a51c26036 --- /dev/null +++ b/src/event_loop/timeout.rs @@ -0,0 +1,81 @@ +use std::io; +use std::time::Instant; + +use futures::{Future, Poll}; +use futures::task; + +use event_loop::{Message, Loop, LoopHandle, LoopFuture}; + +impl LoopHandle { + /// Adds a new timeout to get fired at the specified instant, notifying the + /// specified task. + pub fn add_timeout(&self, at: Instant) -> AddTimeout { + AddTimeout { + inner: LoopFuture { + loop_handle: self.clone(), + data: Some(at), + result: None, + }, + } + } + + /// Updates a previously added timeout to notify a new task instead. + /// + /// # Panics + /// + /// This method will panic if the timeout specified was not created by this + /// loop handle's `add_timeout` method. + pub fn update_timeout(&self, timeout: &TimeoutToken) { + self.send(Message::UpdateTimeout(timeout.token, task::park())) + } + + /// Cancel a previously added timeout. + /// + /// # Panics + /// + /// This method will panic if the timeout specified was not created by this + /// loop handle's `add_timeout` method. + pub fn cancel_timeout(&self, timeout: &TimeoutToken) { + debug!("cancel timeout {}", timeout.token); + self.send(Message::CancelTimeout(timeout.token)) + } +} + +/// Return value from the `LoopHandle::add_timeout` method, a future that will +/// resolve to a `TimeoutToken` to configure the behavior of that timeout. +pub struct AddTimeout { + inner: LoopFuture<(usize, Instant), Instant>, +} + +/// A token that identifies an active timeout. +pub struct TimeoutToken { + token: usize, + when: Instant, +} + +impl Future for AddTimeout { + type Item = TimeoutToken; + type Error = io::Error; + + fn poll(&mut self) -> Poll { + self.inner.poll(Loop::add_timeout, Message::AddTimeout).map(|(t, i)| { + TimeoutToken { + token: t, + when: i, + } + }) + } +} + +impl TimeoutToken { + /// Returns the instant in time when this timeout token will "fire". + /// + /// Note that this instant may *not* be the instant that was passed in when + /// the timeout was created. The event loop does not support high resolution + /// timers, so the exact resolution of when a timeout may fire may be + /// slightly fudged. 
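// A small sketch, not part of the patch, of how a timeout future would lean on
// `TimeoutToken`; it assumes `token` was obtained by resolving `add_timeout`
// and that this runs inside a future task, and the function name is
// illustrative.
use std::time::Instant;
use futures::Poll;

fn poll_elapsed(handle: &LoopHandle, token: &TimeoutToken) -> Poll<(), std::io::Error> {
    if *token.when() <= Instant::now() {
        Poll::Ok(())
    } else {
        // Not yet elapsed: re-arm the notification so the current task is
        // woken when the timeout fires (panics outside of a running task).
        handle.update_timeout(token);
        Poll::NotReady
    }
}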
+ pub fn when(&self) -> &Instant { + &self.when + } +} + diff --git a/src/lib.rs b/src/lib.rs index bc665cc7a..77b381cba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,6 @@ mod slot; #[path = "../../src/lock.rs"] mod lock; mod mpsc_queue; -mod channel; pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken}; From df9730fcbebc42e42a92592a71a6959de0f2dde3 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 20 Aug 2016 23:41:19 -0700 Subject: [PATCH 3/3] Add a channel to the futures-mio crate --- src/channel.rs | 107 ++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 17 ++++--- src/readiness_stream.rs | 2 + 3 files changed, 119 insertions(+), 7 deletions(-) create mode 100644 src/channel.rs diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 000000000..add055a34 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,107 @@ +use std::io; +use std::sync::mpsc::TryRecvError; + +use futures::{Future, Poll}; +use futures_io::IoFuture; +use mio::channel; + +use {ReadinessStream, LoopHandle}; + +/// The transmission half of a channel used for sending messages to a receiver. +/// +/// A `Sender` can be `clone`d to have multiple threads or instances sending +/// messages to one receiver. +/// +/// This type is created by the `LoopHandle::channel` method. +pub struct Sender { + tx: channel::Sender, +} + +/// The receiving half of a channel used for processing messages sent by a +/// `Sender`. +/// +/// A `Receiver` cannot be cloned and is not `Sync`, so only one thread can +/// receive messages at a time. +/// +/// This type is created by the `LoopHandle::channel` method. +pub struct Receiver { + rx: ReadinessStream>, +} + +impl LoopHandle { + /// Creates a new in-memory channel used for sending data across `Send + + /// 'static` boundaries, frequently threads. + /// + /// This type can be used to conveniently send messages between futures. + /// Unlike the futures crate `channel` method and types, the returned tx/rx + /// pair is a multi-producer single-consumer (mpsc) channel *with no + /// backpressure*. Currently it's left up to the application to implement a + /// mechanism, if necessary, to avoid messages piling up. + /// + /// The returned `Sender` can be used to send messages that are processed by + /// the returned `Receiver`. The `Sender` can be cloned to send messages + /// from multiple sources simultaneously. + pub fn channel(self) -> (Sender, IoFuture>) + where T: Send + 'static, + { + let (tx, rx) = channel::channel(); + let rx = ReadinessStream::new(self, rx).map(|rx| Receiver { rx: rx }); + (Sender { tx: tx }, rx.boxed()) + } +} + +impl Sender { + /// Sends a message to the corresponding receiver of this sender. + /// + /// The message provided will be enqueued on the channel immediately, and + /// this function will return immediately. Keep in mind that the + /// underlying channel has infinite capacity, and this may not always be + /// desired. + /// + /// If an I/O error happens while sending the message, or if the receiver + /// has gone away, then an error will be returned. Note that I/O errors here + /// are generally quite abnormal. 
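// A brief sketch, not from the patch, of the sending half described above; it
// assumes `handle` is a `LoopHandle` whose loop is running elsewhere, and the
// function name and message values are illustrative.
use std::thread;

fn spawn_producers(handle: LoopHandle) {
    // `channel` hands back the sender immediately plus a future for the
    // receiver, since registering with the event loop happens asynchronously.
    let (tx, _rx_future) = handle.channel::<u32>();

    // Senders are `Clone`, so several threads can feed the one receiver.
    for i in 0..4 {
        let tx = tx.clone();
        thread::spawn(move || {
            // Sends never block: the channel is unbounded, so any backpressure
            // has to be applied by the application itself.
            tx.send(i).expect("receiver and event loop have gone away");
        });
    }
}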
+ pub fn send(&self, t: T) -> io::Result<()> { + self.tx.send(t).map_err(|e| { + match e { + channel::SendError::Io(e) => e, + channel::SendError::Disconnected(_) => { + io::Error::new(io::ErrorKind::Other, + "channel has been disconnected") + } + } + }) + } +} + +impl Clone for Sender { + fn clone(&self) -> Sender { + Sender { tx: self.tx.clone() } + } +} + +impl Receiver { + /// Attempts to receive a message sent on this channel. + /// + /// This method will attempt to dequeue any messages sent on this channel + /// from any corresponding sender. If no message is available, but senders + /// are still detected, then `Poll::NotReady` is returned and the current + /// future task is scheduled to receive a notification when a message is + /// available. + /// + /// If an I/O error happens or if all senders have gone away (the channel is + /// disconnected) then `Poll::Err` will be returned. + pub fn recv(&self) -> Poll { + match self.rx.get_ref().try_recv() { + Ok(t) => Poll::Ok(t), + Err(TryRecvError::Empty) => { + self.rx.need_read(); + Poll::NotReady + } + Err(TryRecvError::Disconnected) => { + Poll::Err(io::Error::new(io::ErrorKind::Other, + "channel has been disconnected")) + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 77b381cba..449dc6004 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,18 +16,21 @@ extern crate scoped_tls; #[macro_use] extern crate log; -mod readiness_stream; -mod event_loop; -mod tcp; -mod udp; -mod timeout; -mod timer_wheel; #[path = "../../src/slot.rs"] mod slot; #[path = "../../src/lock.rs"] mod lock; -mod mpsc_queue; +mod channel; +mod event_loop; +mod mpsc_queue; +mod readiness_stream; +mod tcp; +mod timeout; +mod timer_wheel; +mod udp; + +pub use channel::{Sender, Receiver}; pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken}; pub use readiness_stream::ReadinessStream; diff --git a/src/readiness_stream.rs b/src/readiness_stream.rs index c67d5cec7..a2e44e7a7 100644 --- a/src/readiness_stream.rs +++ b/src/readiness_stream.rs @@ -48,7 +48,9 @@ impl ReadinessStream handle: loop_handle, } } +} +impl ReadinessStream { /// Tests to see if this source is ready to be read from or not. /// /// If this stream is not ready for a read then `NotReady` will be returned
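// A sketch, not part of the patch, of draining the receiving half from inside
// a future's `poll`; it assumes `rx` is the `Receiver` resolved from the
// future returned by `LoopHandle::channel`, and the function name is
// illustrative.
use futures::Poll;

fn drain(rx: &Receiver<u32>) -> Poll<(), std::io::Error> {
    loop {
        match rx.recv() {
            // A message was ready; keep draining until the queue is empty.
            Poll::Ok(msg) => println!("got {}", msg),
            // Empty but senders remain: `recv` has already scheduled the
            // current task to be notified, so just report not-ready.
            Poll::NotReady => return Poll::NotReady,
            // All senders are gone, or registration hit an I/O error.
            Poll::Err(e) => return Poll::Err(e),
        }
    }
}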