From d1cd455225a33a1a0d8253d064d2cc173dadc400 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 22 Aug 2016 10:10:03 -0700 Subject: [PATCH] Implement Stream for Receiver That's... what it is! --- src/channel.rs | 32 ++++++++++++-------------------- 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index add055a34..cd5fef469 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -2,6 +2,7 @@ use std::io; use std::sync::mpsc::TryRecvError; use futures::{Future, Poll}; +use futures::stream::Stream; use futures_io::IoFuture; use mio::channel; @@ -20,10 +21,11 @@ pub struct Sender { /// The receiving half of a channel used for processing messages sent by a /// `Sender`. /// -/// A `Receiver` cannot be cloned and is not `Sync`, so only one thread can -/// receive messages at a time. +/// A `Receiver` cannot be cloned, so only one thread can receive messages at a +/// time. /// -/// This type is created by the `LoopHandle::channel` method. +/// This type is created by the `LoopHandle::channel` method and implements the +/// `Stream` trait to represent received messages. pub struct Receiver { rx: ReadinessStream>, } @@ -80,28 +82,18 @@ impl Clone for Sender { } } -impl Receiver { - /// Attempts to receive a message sent on this channel. - /// - /// This method will attempt to dequeue any messages sent on this channel - /// from any corresponding sender. If no message is available, but senders - /// are still detected, then `Poll::NotReady` is returned and the current - /// future task is scheduled to receive a notification when a message is - /// available. - /// - /// If an I/O error happens or if all senders have gone away (the channel is - /// disconnected) then `Poll::Err` will be returned. - pub fn recv(&self) -> Poll { +impl Stream for Receiver { + type Item = T; + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { match self.rx.get_ref().try_recv() { - Ok(t) => Poll::Ok(t), + Ok(t) => Poll::Ok(Some(t)), Err(TryRecvError::Empty) => { self.rx.need_read(); Poll::NotReady } - Err(TryRecvError::Disconnected) => { - Poll::Err(io::Error::new(io::ErrorKind::Other, - "channel has been disconnected")) - } + Err(TryRecvError::Disconnected) => Poll::Ok(None), } } }