diff --git a/src/channel.rs b/src/channel.rs index add055a34..cd5fef469 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -2,6 +2,7 @@ use std::io; use std::sync::mpsc::TryRecvError; use futures::{Future, Poll}; +use futures::stream::Stream; use futures_io::IoFuture; use mio::channel; @@ -20,10 +21,11 @@ pub struct Sender { /// The receiving half of a channel used for processing messages sent by a /// `Sender`. /// -/// A `Receiver` cannot be cloned and is not `Sync`, so only one thread can -/// receive messages at a time. +/// A `Receiver` cannot be cloned, so only one thread can receive messages at a +/// time. /// -/// This type is created by the `LoopHandle::channel` method. +/// This type is created by the `LoopHandle::channel` method and implements the +/// `Stream` trait to represent received messages. pub struct Receiver { rx: ReadinessStream>, } @@ -80,28 +82,18 @@ impl Clone for Sender { } } -impl Receiver { - /// Attempts to receive a message sent on this channel. - /// - /// This method will attempt to dequeue any messages sent on this channel - /// from any corresponding sender. If no message is available, but senders - /// are still detected, then `Poll::NotReady` is returned and the current - /// future task is scheduled to receive a notification when a message is - /// available. - /// - /// If an I/O error happens or if all senders have gone away (the channel is - /// disconnected) then `Poll::Err` will be returned. - pub fn recv(&self) -> Poll { +impl Stream for Receiver { + type Item = T; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { match self.rx.get_ref().try_recv() { - Ok(t) => Poll::Ok(t), + Ok(t) => Poll::Ok(Some(t)), Err(TryRecvError::Empty) => { self.rx.need_read(); Poll::NotReady } - Err(TryRecvError::Disconnected) => { - Poll::Err(io::Error::new(io::ErrorKind::Other, - "channel has been disconnected")) - } + Err(TryRecvError::Disconnected) => Poll::Ok(None), } } }