diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 000000000..add055a34 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,107 @@ +use std::io; +use std::sync::mpsc::TryRecvError; + +use futures::{Future, Poll}; +use futures_io::IoFuture; +use mio::channel; + +use {ReadinessStream, LoopHandle}; + +/// The transmission half of a channel used for sending messages to a receiver. +/// +/// A `Sender` can be `clone`d to have multiple threads or instances sending +/// messages to one receiver. +/// +/// This type is created by the `LoopHandle::channel` method. +pub struct Sender { + tx: channel::Sender, +} + +/// The receiving half of a channel used for processing messages sent by a +/// `Sender`. +/// +/// A `Receiver` cannot be cloned and is not `Sync`, so only one thread can +/// receive messages at a time. +/// +/// This type is created by the `LoopHandle::channel` method. +pub struct Receiver { + rx: ReadinessStream>, +} + +impl LoopHandle { + /// Creates a new in-memory channel used for sending data across `Send + + /// 'static` boundaries, frequently threads. + /// + /// This type can be used to conveniently send messages between futures. + /// Unlike the futures crate `channel` method and types, the returned tx/rx + /// pair is a multi-producer single-consumer (mpsc) channel *with no + /// backpressure*. Currently it's left up to the application to implement a + /// mechanism, if necessary, to avoid messages piling up. + /// + /// The returned `Sender` can be used to send messages that are processed by + /// the returned `Receiver`. The `Sender` can be cloned to send messages + /// from multiple sources simultaneously. + pub fn channel(self) -> (Sender, IoFuture>) + where T: Send + 'static, + { + let (tx, rx) = channel::channel(); + let rx = ReadinessStream::new(self, rx).map(|rx| Receiver { rx: rx }); + (Sender { tx: tx }, rx.boxed()) + } +} + +impl Sender { + /// Sends a message to the corresponding receiver of this sender. + /// + /// The message provided will be enqueued on the channel immediately, and + /// this function will return immediately. Keep in mind that the + /// underlying channel has infinite capacity, and this may not always be + /// desired. + /// + /// If an I/O error happens while sending the message, or if the receiver + /// has gone away, then an error will be returned. Note that I/O errors here + /// are generally quite abnormal. + pub fn send(&self, t: T) -> io::Result<()> { + self.tx.send(t).map_err(|e| { + match e { + channel::SendError::Io(e) => e, + channel::SendError::Disconnected(_) => { + io::Error::new(io::ErrorKind::Other, + "channel has been disconnected") + } + } + }) + } +} + +impl Clone for Sender { + fn clone(&self) -> Sender { + Sender { tx: self.tx.clone() } + } +} + +impl Receiver { + /// Attempts to receive a message sent on this channel. + /// + /// This method will attempt to dequeue any messages sent on this channel + /// from any corresponding sender. If no message is available, but senders + /// are still detected, then `Poll::NotReady` is returned and the current + /// future task is scheduled to receive a notification when a message is + /// available. + /// + /// If an I/O error happens or if all senders have gone away (the channel is + /// disconnected) then `Poll::Err` will be returned. + pub fn recv(&self) -> Poll { + match self.rx.get_ref().try_recv() { + Ok(t) => Poll::Ok(t), + Err(TryRecvError::Empty) => { + self.rx.need_read(); + Poll::NotReady + } + Err(TryRecvError::Disconnected) => { + Poll::Err(io::Error::new(io::ErrorKind::Other, + "channel has been disconnected")) + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 77b381cba..449dc6004 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,18 +16,21 @@ extern crate scoped_tls; #[macro_use] extern crate log; -mod readiness_stream; -mod event_loop; -mod tcp; -mod udp; -mod timeout; -mod timer_wheel; #[path = "../../src/slot.rs"] mod slot; #[path = "../../src/lock.rs"] mod lock; -mod mpsc_queue; +mod channel; +mod event_loop; +mod mpsc_queue; +mod readiness_stream; +mod tcp; +mod timeout; +mod timer_wheel; +mod udp; + +pub use channel::{Sender, Receiver}; pub use event_loop::{Loop, LoopPin, LoopHandle, AddSource, AddTimeout}; pub use event_loop::{LoopData, AddLoopData, TimeoutToken, IoToken}; pub use readiness_stream::ReadinessStream; diff --git a/src/readiness_stream.rs b/src/readiness_stream.rs index c67d5cec7..a2e44e7a7 100644 --- a/src/readiness_stream.rs +++ b/src/readiness_stream.rs @@ -48,7 +48,9 @@ impl ReadinessStream handle: loop_handle, } } +} +impl ReadinessStream { /// Tests to see if this source is ready to be read from or not. /// /// If this stream is not ready for a read then `NotReady` will be returned