Implement Stream for Receiver

That's... what it is!
This commit is contained in:
Alex Crichton 2016-08-22 10:10:03 -07:00
parent cff73b7a9d
commit d1cd455225

View File

@ -2,6 +2,7 @@ use std::io;
use std::sync::mpsc::TryRecvError;
use futures::{Future, Poll};
use futures::stream::Stream;
use futures_io::IoFuture;
use mio::channel;
@ -20,10 +21,11 @@ pub struct Sender<T> {
/// The receiving half of a channel used for processing messages sent by a
/// `Sender`.
///
/// A `Receiver` cannot be cloned and is not `Sync`, so only one thread can
/// receive messages at a time.
/// A `Receiver` cannot be cloned, so only one thread can receive messages at a
/// time.
///
/// This type is created by the `LoopHandle::channel` method.
/// This type is created by the `LoopHandle::channel` method and implements the
/// `Stream` trait to represent received messages.
pub struct Receiver<T> {
rx: ReadinessStream<channel::Receiver<T>>,
}
@ -80,28 +82,18 @@ impl<T> Clone for Sender<T> {
}
}
impl<T> Receiver<T> {
/// Attempts to receive a message sent on this channel.
///
/// This method will attempt to dequeue any messages sent on this channel
/// from any corresponding sender. If no message is available, but senders
/// are still detected, then `Poll::NotReady` is returned and the current
/// future task is scheduled to receive a notification when a message is
/// available.
///
/// If an I/O error happens or if all senders have gone away (the channel is
/// disconnected) then `Poll::Err` will be returned.
pub fn recv(&self) -> Poll<T, io::Error> {
impl<T> Stream for Receiver<T> {
type Item = T;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<T>, io::Error> {
match self.rx.get_ref().try_recv() {
Ok(t) => Poll::Ok(t),
Ok(t) => Poll::Ok(Some(t)),
Err(TryRecvError::Empty) => {
self.rx.need_read();
Poll::NotReady
}
Err(TryRecvError::Disconnected) => {
Poll::Err(io::Error::new(io::ErrorKind::Other,
"channel has been disconnected"))
}
Err(TryRecvError::Disconnected) => Poll::Ok(None),
}
}
}