Remove timers from Tokio.

In accordance with tokio-rs/tokio-rfcs#3, timers are being extracted
from Tokio and moved to a separate crate (probably futures-timer).

This PR removes timers from the code base.
This commit is contained in:
Carl Lerche 2017-10-24 17:20:46 -07:00 committed by Alex Crichton
parent b23a997cb8
commit 697851210c
19 changed files with 6 additions and 2242 deletions

View File

@ -1,130 +0,0 @@
//! In-memory evented channels.
//!
//! This module contains a `Sender` and `Receiver` pair types which can be used
//! to send messages between different future tasks.
#![deprecated(since = "0.1.1", note = "use `futures::sync::mpsc` instead")]
#![allow(deprecated)]
#![cfg(feature = "with-deprecated")]
use std::io;
use std::sync::mpsc::TryRecvError;
use futures::{Poll, Async, Sink, AsyncSink, StartSend, Stream};
use mio::channel;
use reactor::{Handle, PollEvented};
/// The transmission half of a channel used for sending messages to a receiver.
///
/// A `Sender` can be `clone`d to have multiple threads or instances sending
/// messages to one receiver.
///
/// This type is created by the [`channel`] function.
///
/// [`channel`]: fn.channel.html
#[must_use = "sinks do nothing unless polled"]
pub struct Sender<T> {
tx: channel::Sender<T>,
}
/// The receiving half of a channel used for processing messages sent by a
/// `Sender`.
///
/// A `Receiver` cannot be cloned, so only one thread can receive messages at a
/// time.
///
/// This type is created by the [`channel`] function and implements the
/// `Stream` trait to represent received messages.
///
/// [`channel`]: fn.channel.html
#[must_use = "streams do nothing unless polled"]
pub struct Receiver<T> {
rx: PollEvented<channel::Receiver<T>>,
}
/// Creates a new in-memory channel used for sending data across `Send +
/// 'static` boundaries, frequently threads.
///
/// This type can be used to conveniently send messages between futures.
/// Unlike the futures crate `channel` method and types, the returned tx/rx
/// pair is a multi-producer single-consumer (mpsc) channel *with no
/// backpressure*. Currently it's left up to the application to implement a
/// mechanism, if necessary, to avoid messages piling up.
///
/// The returned `Sender` can be used to send messages that are processed by
/// the returned `Receiver`. The `Sender` can be cloned to send messages
/// from multiple sources simultaneously.
pub fn channel<T>(handle: &Handle) -> io::Result<(Sender<T>, Receiver<T>)>
where T: Send + 'static,
{
let (tx, rx) = channel::channel();
let rx = try!(PollEvented::new(rx, handle));
Ok((Sender { tx: tx }, Receiver { rx: rx }))
}
impl<T> Sender<T> {
/// Sends a message to the corresponding receiver of this sender.
///
/// The message provided will be enqueued on the channel immediately, and
/// this function will return immediately. Keep in mind that the
/// underlying channel has infinite capacity, and this may not always be
/// desired.
///
/// If an I/O error happens while sending the message, or if the receiver
/// has gone away, then an error will be returned. Note that I/O errors here
/// are generally quite abnormal.
pub fn send(&self, t: T) -> io::Result<()> {
self.tx.send(t).map_err(|e| {
match e {
channel::SendError::Io(e) => e,
channel::SendError::Disconnected(_) => {
io::Error::new(io::ErrorKind::Other,
"channel has been disconnected")
}
}
})
}
}
impl<T> Sink for Sender<T> {
type SinkItem = T;
type SinkError = io::Error;
fn start_send(&mut self, t: T) -> StartSend<T, io::Error> {
Sender::send(self, t).map(|()| AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
fn close(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
Sender { tx: self.tx.clone() }
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<T>, io::Error> {
if let Async::NotReady = self.rx.poll_read() {
return Ok(Async::NotReady)
}
match self.rx.get_ref().try_recv() {
Ok(t) => Ok(Async::Ready(Some(t))),
Err(TryRecvError::Empty) => {
self.rx.need_read();
Ok(Async::NotReady)
}
Err(TryRecvError::Disconnected) => Ok(Async::Ready(None)),
}
}
}

View File

@ -1,305 +0,0 @@
//! A simple binary heap with support for removal of arbitrary elements
//!
//! This heap is used to manage timer state in the event loop. All timeouts go
//! into this heap and we also cancel timeouts from this heap. The crucial
//! feature of this heap over the standard library's `BinaryHeap` is the ability
//! to remove arbitrary elements. (e.g. when a timer is canceled)
//!
//! Note that this heap is not at all optimized right now, it should hopefully
//! just work.
use std::mem;
use slab::Slab;
pub struct Heap<T> {
// Binary heap of items, plus the slab index indicating what position in the
// list they're in.
items: Vec<(T, usize)>,
// A map from a slab index (assigned to an item above) to the actual index
// in the array the item appears at.
index: Slab<usize>,
}
pub struct Slot {
idx: usize,
}
impl<T: Ord> Heap<T> {
pub fn new() -> Heap<T> {
Heap {
items: Vec::new(),
index: Slab::with_capacity(128),
}
}
/// Pushes an element onto this heap, returning a slot token indicating
/// where it was pushed on to.
///
/// The slot can later get passed to `remove` to remove the element from the
/// heap, but only if the element was previously not removed from the heap.
pub fn push(&mut self, t: T) -> Slot {
self.assert_consistent();
let len = self.items.len();
if self.index.len() == self.index.capacity() {
self.index.reserve_exact(len);
}
let slot_idx = self.index.insert(len);
self.items.push((t, slot_idx));
self.percolate_up(len);
self.assert_consistent();
Slot { idx: slot_idx }
}
pub fn peek(&self) -> Option<&T> {
self.assert_consistent();
self.items.get(0).map(|i| &i.0)
}
pub fn pop(&mut self) -> Option<T> {
self.assert_consistent();
if self.items.len() == 0 {
return None
}
let slot = Slot { idx: self.items[0].1 };
Some(self.remove(slot))
}
pub fn remove(&mut self, slot: Slot) -> T {
self.assert_consistent();
let idx = self.index.remove(slot.idx);
let (item, slot_idx) = self.items.swap_remove(idx);
debug_assert_eq!(slot.idx, slot_idx);
if idx < self.items.len() {
self.index[self.items[idx].1] = idx;
if self.items[idx].0 < item {
self.percolate_up(idx);
} else {
self.percolate_down(idx);
}
}
self.assert_consistent();
return item
}
fn percolate_up(&mut self, mut idx: usize) -> usize {
while idx > 0 {
let parent = (idx - 1) / 2;
if self.items[idx].0 >= self.items[parent].0 {
break
}
let (a, b) = self.items.split_at_mut(idx);
mem::swap(&mut a[parent], &mut b[0]);
self.index[a[parent].1] = parent;
self.index[b[0].1] = idx;
idx = parent;
}
return idx
}
fn percolate_down(&mut self, mut idx: usize) -> usize {
loop {
let left = 2 * idx + 1;
let right = 2 * idx + 2;
let mut swap_left = true;
match (self.items.get(left), self.items.get(right)) {
(Some(left), None) => {
if left.0 >= self.items[idx].0 {
break
}
}
(Some(left), Some(right)) => {
if left.0 < self.items[idx].0 {
if right.0 < left.0 {
swap_left = false;
}
} else if right.0 < self.items[idx].0 {
swap_left = false;
} else {
break
}
}
(None, None) => break,
(None, Some(_right)) => panic!("not possible"),
}
let (a, b) = if swap_left {
self.items.split_at_mut(left)
} else {
self.items.split_at_mut(right)
};
mem::swap(&mut a[idx], &mut b[0]);
self.index[a[idx].1] = idx;
self.index[b[0].1] = a.len();
idx = a.len();
}
return idx
}
fn assert_consistent(&self) {
if !cfg!(assert_timer_heap_consistent) {
return
}
assert_eq!(self.items.len(), self.index.len());
for (i, &(_, j)) in self.items.iter().enumerate() {
if self.index[j] != i {
panic!("self.index[j] != i : i={} j={} self.index[j]={}",
i, j, self.index[j]);
}
}
for (i, &(ref item, _)) in self.items.iter().enumerate() {
if i > 0 {
assert!(*item >= self.items[(i - 1) / 2].0, "bad at index: {}", i);
}
if let Some(left) = self.items.get(2 * i + 1) {
assert!(*item <= left.0, "bad left at index: {}", i);
}
if let Some(right) = self.items.get(2 * i + 2) {
assert!(*item <= right.0, "bad right at index: {}", i);
}
}
}
}
#[cfg(test)]
mod tests {
use super::Heap;
#[test]
fn simple() {
let mut h = Heap::new();
h.push(1);
h.push(2);
h.push(8);
h.push(4);
assert_eq!(h.pop(), Some(1));
assert_eq!(h.pop(), Some(2));
assert_eq!(h.pop(), Some(4));
assert_eq!(h.pop(), Some(8));
assert_eq!(h.pop(), None);
assert_eq!(h.pop(), None);
}
#[test]
fn simple2() {
let mut h = Heap::new();
h.push(5);
h.push(4);
h.push(3);
h.push(2);
h.push(1);
assert_eq!(h.pop(), Some(1));
h.push(8);
assert_eq!(h.pop(), Some(2));
h.push(1);
assert_eq!(h.pop(), Some(1));
assert_eq!(h.pop(), Some(3));
assert_eq!(h.pop(), Some(4));
h.push(5);
assert_eq!(h.pop(), Some(5));
assert_eq!(h.pop(), Some(5));
assert_eq!(h.pop(), Some(8));
}
#[test]
fn remove() {
let mut h = Heap::new();
h.push(5);
h.push(4);
h.push(3);
let two = h.push(2);
h.push(1);
assert_eq!(h.pop(), Some(1));
assert_eq!(h.remove(two), 2);
h.push(1);
assert_eq!(h.pop(), Some(1));
assert_eq!(h.pop(), Some(3));
}
fn vec2heap<T: Ord>(v: Vec<T>) -> Heap<T> {
let mut h = Heap::new();
for t in v {
h.push(t);
}
return h
}
#[test]
fn test_peek_and_pop() {
let data = vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1];
let mut sorted = data.clone();
sorted.sort();
let mut heap = vec2heap(data);
while heap.peek().is_some() {
assert_eq!(heap.peek().unwrap(), sorted.first().unwrap());
assert_eq!(heap.pop().unwrap(), sorted.remove(0));
}
}
#[test]
fn test_push() {
let mut heap = Heap::new();
heap.push(-2);
heap.push(-4);
heap.push(-9);
assert!(*heap.peek().unwrap() == -9);
heap.push(-11);
assert!(*heap.peek().unwrap() == -11);
heap.push(-5);
assert!(*heap.peek().unwrap() == -11);
heap.push(-27);
assert!(*heap.peek().unwrap() == -27);
heap.push(-3);
assert!(*heap.peek().unwrap() == -27);
heap.push(-103);
assert!(*heap.peek().unwrap() == -103);
}
fn check_to_vec(mut data: Vec<i32>) {
let mut heap = Heap::new();
for data in data.iter() {
heap.push(*data);
}
data.sort();
let mut v = Vec::new();
while let Some(i) = heap.pop() {
v.push(i);
}
assert_eq!(v, data);
}
#[test]
fn test_to_vec() {
check_to_vec(vec![]);
check_to_vec(vec![5]);
check_to_vec(vec![3, 2]);
check_to_vec(vec![2, 3]);
check_to_vec(vec![5, 1, 2]);
check_to_vec(vec![1, 100, 2, 3]);
check_to_vec(vec![1, 3, 5, 7, 9, 2, 4, 6, 8, 0]);
check_to_vec(vec![2, 4, 6, 2, 1, 8, 10, 3, 5, 7, 0, 9, 1]);
check_to_vec(vec![9, 11, 9, 9, 9, 9, 11, 2, 3, 4, 11, 9, 0, 0, 0, 0]);
check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
check_to_vec(vec![10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]);
check_to_vec(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 1, 2]);
check_to_vec(vec![5, 4, 3, 2, 1, 5, 4, 3, 2, 1, 5, 4, 3, 2, 1]);
}
#[test]
fn test_empty_pop() {
let mut heap = Heap::<i32>::new();
assert!(heap.pop().is_none());
}
#[test]
fn test_empty_peek() {
let empty = Heap::<i32>::new();
assert!(empty.peek().is_none());
}
}

View File

@ -1,85 +0,0 @@
use std::io::{self, Read, Write};
use futures::{Future, Poll};
/// A future which will copy all data from a reader into a writer.
///
/// Created by the [`copy`] function, this future will resolve to the number of
/// bytes copied or an error if one happens.
///
/// [`copy`]: fn.copy.html
#[must_use = "futures do nothing unless polled"]
pub struct Copy<R, W> {
reader: R,
read_done: bool,
writer: W,
pos: usize,
cap: usize,
amt: u64,
buf: Box<[u8]>,
}
/// Creates a future which represents copying all the bytes from one object to
/// another.
///
/// The returned future will copy all the bytes read from `reader` into the
/// `writer` specified. This future will only complete once the `reader` has hit
/// EOF and all bytes have been written to and flushed from the `writer`
/// provided.
///
/// On success the number of bytes is returned and the `reader` and `writer` are
/// consumed. On error the error is returned and the I/O objects are consumed as
/// well.
pub fn copy<R, W>(reader: R, writer: W) -> Copy<R, W>
where R: Read,
W: Write,
{
Copy {
reader: reader,
read_done: false,
writer: writer,
amt: 0,
pos: 0,
cap: 0,
buf: Box::new([0; 2048]),
}
}
impl<R, W> Future for Copy<R, W>
where R: Read,
W: Write,
{
type Item = u64;
type Error = io::Error;
fn poll(&mut self) -> Poll<u64, io::Error> {
loop {
// If our buffer is empty, then we need to read some data to
// continue.
if self.pos == self.cap && !self.read_done {
let n = try_nb!(self.reader.read(&mut self.buf));
if n == 0 {
self.read_done = true;
} else {
self.pos = 0;
self.cap = n;
}
}
// If our buffer has some data, let's write it out!
while self.pos < self.cap {
let i = try_nb!(self.writer.write(&self.buf[self.pos..self.cap]));
self.pos += i;
self.amt += i as u64;
}
// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
// done with the entire transfer.
if self.pos == self.cap && self.read_done {
try_nb!(self.writer.flush());
return Ok(self.amt.into())
}
}
}
}

View File

@ -1,42 +0,0 @@
use std::io::{self, Write};
use futures::{Poll, Future, Async};
/// A future used to fully flush an I/O object.
///
/// Resolves to the underlying I/O object once the flush operation is complete.
///
/// Created by the [`flush`] function.
///
/// [`flush`]: fn.flush.html
#[must_use = "futures do nothing unless polled"]
pub struct Flush<A> {
a: Option<A>,
}
/// Creates a future which will entirely flush an I/O object and then yield the
/// object itself.
///
/// This function will consume the object provided if an error happens, and
/// otherwise it will repeatedly call `flush` until it sees `Ok(())`, scheduling
/// a retry if `WouldBlock` is seen along the way.
pub fn flush<A>(a: A) -> Flush<A>
where A: Write,
{
Flush {
a: Some(a),
}
}
impl<A> Future for Flush<A>
where A: Write,
{
type Item = A;
type Error = io::Error;
fn poll(&mut self) -> Poll<A, io::Error> {
try_nb!(self.a.as_mut().unwrap().flush());
Ok(Async::Ready(self.a.take().unwrap()))
}
}

View File

@ -1,588 +0,0 @@
use std::fmt;
use std::io;
use std::hash;
use std::mem;
use std::cmp;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use io::Io;
const INITIAL_CAPACITY: usize = 8 * 1024;
/// A reference counted buffer of bytes.
///
/// An `EasyBuf` is a representation of a byte buffer where sub-slices of it can
/// be handed out efficiently, each with a `'static` lifetime which keeps the
/// data alive. The buffer also supports mutation but may require bytes to be
/// copied to complete the operation.
#[derive(Clone, Eq)]
pub struct EasyBuf {
buf: Arc<Vec<u8>>,
start: usize,
end: usize,
}
/// An RAII object returned from `get_mut` which provides mutable access to the
/// underlying `Vec<u8>`.
pub struct EasyBufMut<'a> {
buf: &'a mut Vec<u8>,
end: &'a mut usize,
}
impl EasyBuf {
/// Creates a new EasyBuf with no data and the default capacity.
pub fn new() -> EasyBuf {
EasyBuf::with_capacity(INITIAL_CAPACITY)
}
/// Creates a new EasyBuf with `cap` capacity.
pub fn with_capacity(cap: usize) -> EasyBuf {
EasyBuf {
buf: Arc::new(Vec::with_capacity(cap)),
start: 0,
end: 0,
}
}
/// Changes the starting index of this window to the index specified.
///
/// Returns the windows back to chain multiple calls to this method.
///
/// # Panics
///
/// This method will panic if `start` is out of bounds for the underlying
/// slice or if it comes after the `end` configured in this window.
fn set_start(&mut self, start: usize) -> &mut EasyBuf {
assert!(start <= self.buf.as_ref().len());
assert!(start <= self.end);
self.start = start;
self
}
/// Changes the end index of this window to the index specified.
///
/// Returns the windows back to chain multiple calls to this method.
///
/// # Panics
///
/// This method will panic if `end` is out of bounds for the underlying
/// slice or if it comes after the `end` configured in this window.
fn set_end(&mut self, end: usize) -> &mut EasyBuf {
assert!(end <= self.buf.len());
assert!(self.start <= end);
self.end = end;
self
}
/// Returns the number of bytes contained in this `EasyBuf`.
pub fn len(&self) -> usize {
self.end - self.start
}
/// Returns the inner contents of this `EasyBuf` as a slice.
pub fn as_slice(&self) -> &[u8] {
self.as_ref()
}
/// Splits the buffer into two at the given index.
///
/// Afterwards `self` contains elements `[0, at)`, and the returned `EasyBuf`
/// contains elements `[at, len)`.
///
/// This is an O(1) operation that just increases the reference count and
/// sets a few indexes.
///
/// # Panics
///
/// Panics if `at > len`
pub fn split_off(&mut self, at: usize) -> EasyBuf {
let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
let idx = self.start + at;
other.set_start(idx);
self.set_end(idx);
return other
}
/// Splits the buffer into two at the given index.
///
/// Afterwards `self` contains elements `[at, len)`, and the returned `EasyBuf`
/// contains elements `[0, at)`.
///
/// This is an O(1) operation that just increases the reference count and
/// sets a few indexes.
///
/// # Panics
///
/// Panics if `at > len`
pub fn drain_to(&mut self, at: usize) -> EasyBuf {
let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
let idx = self.start + at;
other.set_end(idx);
self.set_start(idx);
return other
}
/// Returns a mutable reference to the underlying growable buffer of bytes.
///
/// If this `EasyBuf` is the only instance pointing at the underlying buffer
/// of bytes, a direct mutable reference will be returned. Otherwise the
/// contents of this `EasyBuf` will be reallocated in a fresh `Vec<u8>`
/// allocation with the same capacity as an `EasyBuf` created with `EasyBuf::new()`,
/// and that allocation will be returned.
///
/// This operation **is not O(1)** as it may clone the entire contents of
/// this buffer.
///
/// The returned `EasyBufMut` type implement `Deref` and `DerefMut` to
/// `Vec<u8>` can the byte buffer can be manipulated using the standard
/// `Vec<u8>` methods.
pub fn get_mut(&mut self) -> EasyBufMut {
// Fast path if we can get mutable access to our own current
// buffer.
//
// TODO: this should be a match or an if-let
if Arc::get_mut(&mut self.buf).is_some() {
let buf = Arc::get_mut(&mut self.buf).unwrap();
buf.drain(self.end..);
buf.drain(..self.start);
self.start = 0;
return EasyBufMut { buf: buf, end: &mut self.end }
}
// If we couldn't get access above then we give ourself a new buffer
// here.
let mut v = Vec::with_capacity(cmp::max(INITIAL_CAPACITY, self.as_ref().len()));
v.extend_from_slice(self.as_ref());
self.start = 0;
self.buf = Arc::new(v);
EasyBufMut {
buf: Arc::get_mut(&mut self.buf).unwrap(),
end: &mut self.end,
}
}
}
impl AsRef<[u8]> for EasyBuf {
fn as_ref(&self) -> &[u8] {
&self.buf[self.start..self.end]
}
}
impl<'a> Deref for EasyBufMut<'a> {
type Target = Vec<u8>;
fn deref(&self) -> &Vec<u8> {
self.buf
}
}
impl<'a> DerefMut for EasyBufMut<'a> {
fn deref_mut(&mut self) -> &mut Vec<u8> {
self.buf
}
}
impl From<Vec<u8>> for EasyBuf {
fn from(vec: Vec<u8>) -> EasyBuf {
let end = vec.len();
EasyBuf {
buf: Arc::new(vec),
start: 0,
end: end,
}
}
}
impl<T: AsRef<[u8]>> PartialEq<T> for EasyBuf {
fn eq(&self, other: &T) -> bool {
self.as_slice().eq(other.as_ref())
}
}
impl Ord for EasyBuf {
fn cmp(&self, other: &Self) -> cmp::Ordering {
self.as_slice().cmp(other.as_slice())
}
}
impl<T: AsRef<[u8]>> PartialOrd<T> for EasyBuf {
fn partial_cmp(&self, other: &T) -> Option<cmp::Ordering> {
self.as_slice().partial_cmp(other.as_ref())
}
}
impl hash::Hash for EasyBuf {
fn hash<H: hash::Hasher>(&self, state: &mut H) {
self.as_slice().hash(state)
}
}
impl<'a> Drop for EasyBufMut<'a> {
fn drop(&mut self) {
*self.end = self.buf.len();
}
}
impl fmt::Debug for EasyBuf {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
let bytes = self.as_ref();
let len = self.len();
if len < 10 {
write!(formatter, "EasyBuf{{len={}/{} {:?}}}", self.len(), self.buf.len(), bytes)
} else { // choose a more compact representation
write!(formatter, "EasyBuf{{len={}/{} [{}, {}, {}, {}, ..., {}, {}, {}, {}]}}", self.len(), self.buf.len(), bytes[0], bytes[1], bytes[2], bytes[3], bytes[len-4], bytes[len-3], bytes[len-2], bytes[len-1])
}
}
}
impl Into<Vec<u8>> for EasyBuf {
fn into(mut self) -> Vec<u8> {
mem::replace(self.get_mut().buf, vec![])
}
}
/// Encoding and decoding of frames via buffers.
///
/// This trait is used when constructing an instance of `Framed`. It provides
/// two types: `In`, for decoded input frames, and `Out`, for outgoing frames
/// that need to be encoded. It also provides methods to actually perform the
/// encoding and decoding, which work with corresponding buffer types.
///
/// The trait itself is implemented on a type that can track state for decoding
/// or encoding, which is particularly useful for streaming parsers. In many
/// cases, though, this type will simply be a unit struct (e.g. `struct
/// HttpCodec`).
pub trait Codec {
/// The type of decoded frames.
type In;
/// The type of frames to be encoded.
type Out;
/// Attempts to decode a frame from the provided buffer of bytes.
///
/// This method is called by `Framed` whenever bytes are ready to be parsed.
/// The provided buffer of bytes is what's been read so far, and this
/// instance of `Decode` can determine whether an entire frame is in the
/// buffer and is ready to be returned.
///
/// If an entire frame is available, then this instance will remove those
/// bytes from the buffer provided and return them as a decoded
/// frame. Note that removing bytes from the provided buffer doesn't always
/// necessarily copy the bytes, so this should be an efficient operation in
/// most circumstances.
///
/// If the bytes look valid, but a frame isn't fully available yet, then
/// `Ok(None)` is returned. This indicates to the `Framed` instance that
/// it needs to read some more bytes before calling this method again.
///
/// Finally, if the bytes in the buffer are malformed then an error is
/// returned indicating why. This informs `Framed` that the stream is now
/// corrupt and should be terminated.
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<Self::In>>;
/// A default method available to be called when there are no more bytes
/// available to be read from the underlying I/O.
///
/// This method defaults to calling `decode` and returns an error if
/// `Ok(None)` is returned. Typically this doesn't need to be implemented
/// unless the framing protocol differs near the end of the stream.
fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> {
match try!(self.decode(buf)) {
Some(frame) => Ok(frame),
None => Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream")),
}
}
/// Encodes a frame into the buffer provided.
///
/// This method will encode `msg` into the byte buffer provided by `buf`.
/// The `buf` provided is an internal buffer of the `Framed` instance and
/// will be written out when possible.
fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
}
/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
/// the `Codec` trait to encode and decode frames.
///
/// You can acquire a `Framed` instance by using the `Io::framed` adapter.
#[must_use = "streams do nothing unless polled"]
pub struct Framed<T, C> {
upstream: T,
codec: C,
eof: bool,
is_readable: bool,
rd: EasyBuf,
wr: Vec<u8>,
}
impl<T: Io, C: Codec> Stream for Framed<T, C> {
type Item = C::In;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
loop {
// If the read buffer has any pending data, then it could be
// possible that `decode` will return a new frame. We leave it to
// the decoder to optimize detecting that more data is required.
if self.is_readable {
if self.eof {
if self.rd.len() == 0 {
return Ok(None.into())
} else {
let frame = try!(self.codec.decode_eof(&mut self.rd));
return Ok(Async::Ready(Some(frame)))
}
}
trace!("attempting to decode a frame");
if let Some(frame) = try!(self.codec.decode(&mut self.rd)) {
trace!("frame decoded from buffer");
return Ok(Async::Ready(Some(frame)));
}
self.is_readable = false;
}
assert!(!self.eof);
// Otherwise, try to read more data and try again
//
// TODO: shouldn't read_to_end, that may read a lot
let before = self.rd.len();
let ret = self.upstream.read_to_end(&mut self.rd.get_mut());
match ret {
Ok(_n) => self.eof = true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.rd.len() == before {
return Ok(Async::NotReady)
}
}
Err(e) => return Err(e),
}
self.is_readable = true;
}
}
}
impl<T: Io, C: Codec> Sink for Framed<T, C> {
type SinkItem = C::Out;
type SinkError = io::Error;
fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
// If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's
// *still* over 8KiB, then apply backpressure (reject the send).
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
if self.wr.len() > BACKPRESSURE_BOUNDARY {
try!(self.poll_complete());
if self.wr.len() > BACKPRESSURE_BOUNDARY {
return Ok(AsyncSink::NotReady(item));
}
}
try!(self.codec.encode(item, &mut self.wr));
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
trace!("flushing framed transport");
while !self.wr.is_empty() {
trace!("writing; remaining={}", self.wr.len());
let n = try_nb!(self.upstream.write(&self.wr));
if n == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"failed to write frame to transport"));
}
self.wr.drain(..n);
}
// Try flushing the underlying IO
try_nb!(self.upstream.flush());
trace!("framed transport flushed");
return Ok(Async::Ready(()));
}
fn close(&mut self) -> Poll<(), io::Error> {
try_ready!(self.poll_complete());
Ok(().into())
}
}
pub fn framed<T, C>(io: T, codec: C) -> Framed<T, C> {
Framed {
upstream: io,
codec: codec,
eof: false,
is_readable: false,
rd: EasyBuf::new(),
wr: Vec::with_capacity(INITIAL_CAPACITY),
}
}
impl<T, C> Framed<T, C> {
/// Returns a reference to the underlying I/O stream wrapped by `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_ref(&self) -> &T {
&self.upstream
}
/// Returns a mutable reference to the underlying I/O stream wrapped by
/// `Framed`.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn get_mut(&mut self) -> &mut T {
&mut self.upstream
}
/// Consumes the `Framed`, returning its underlying I/O stream.
///
/// Note that care should be taken to not tamper with the underlying stream
/// of data coming in as it may corrupt the stream of frames otherwise being
/// worked with.
pub fn into_inner(self) -> T {
self.upstream
}
}
#[cfg(test)]
mod tests {
use super::{INITIAL_CAPACITY, EasyBuf};
use std::mem;
#[test]
fn debug_empty_easybuf() {
let buf: EasyBuf = vec![].into();
assert_eq!("EasyBuf{len=0/0 []}", format!("{:?}", buf));
}
#[test]
fn debug_small_easybuf() {
let buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into();
assert_eq!("EasyBuf{len=6/6 [1, 2, 3, 4, 5, 6]}", format!("{:?}", buf));
}
#[test]
fn debug_small_easybuf_split() {
let mut buf: EasyBuf = vec![1, 2, 3, 4, 5, 6].into();
let split = buf.split_off(4);
assert_eq!("EasyBuf{len=4/6 [1, 2, 3, 4]}", format!("{:?}", buf));
assert_eq!("EasyBuf{len=2/6 [5, 6]}", format!("{:?}", split));
}
#[test]
fn debug_large_easybuf() {
let vec: Vec<u8> = (0u8..255u8).collect();
let buf: EasyBuf = vec.into();
assert_eq!("EasyBuf{len=255/255 [0, 1, 2, 3, ..., 251, 252, 253, 254]}", format!("{:?}", buf));
}
#[test]
fn easybuf_get_mut_sliced() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
buf.split_off(9);
buf.drain_to(3);
assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]);
}
#[test]
fn easybuf_get_mut_sliced_allocating_at_least_initial_capacity() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
buf.split_off(9);
buf.drain_to(3);
// Clone to make shared
let clone = buf.clone();
assert_eq!(*buf.get_mut(), [3, 4, 5, 6, 7, 8]);
assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY);
mem::drop(clone); // prevent unused warning
}
#[test]
fn easybuf_get_mut_sliced_allocating_required_capacity() {
let vec: Vec<u8> = (0..INITIAL_CAPACITY * 2).map(|_|0u8).collect();
let mut buf: EasyBuf = vec.into();
buf.drain_to(INITIAL_CAPACITY / 2);
let clone = buf.clone();
assert_eq!(buf.get_mut().buf.capacity(), INITIAL_CAPACITY + INITIAL_CAPACITY / 2);
mem::drop(clone)
}
#[test]
fn easybuf_into_vec_simple() {
let vec: Vec<u8> = (0u8..10u8).collect();
let reference = vec.clone();
let buf: EasyBuf = vec.into();
let original_pointer = buf.buf.as_ref().as_ptr();
let result: Vec<u8> = buf.into();
assert_eq!(result, reference);
let new_pointer = result.as_ptr();
assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec");
}
#[test]
fn easybuf_into_vec_sliced() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
let original_pointer = buf.buf.as_ref().as_ptr();
buf.split_off(9);
buf.drain_to(3);
let result: Vec<u8> = buf.into();
let reference: Vec<u8> = (3u8..9u8).collect();
assert_eq!(result, reference);
let new_pointer = result.as_ptr();
assert_eq!(original_pointer, new_pointer, "Into<Vec<u8>> should reuse the exclusive Vec");
}
#[test]
fn easybuf_into_vec_sliced_allocating() {
let vec: Vec<u8> = (0u8..10u8).collect();
let mut buf: EasyBuf = vec.into();
let original_pointer = buf.buf.as_ref().as_ptr();
// Create a clone to create second reference to this EasyBuf and force allocation
let original = buf.clone();
buf.split_off(9);
buf.drain_to(3);
let result: Vec<u8> = buf.into();
let reference: Vec<u8> = (3u8..9u8).collect();
assert_eq!(result, reference);
let original_reference: EasyBuf =(0u8..10u8).collect::<Vec<u8>>().into();
assert_eq!(original.as_ref(), original_reference.as_ref());
let new_pointer = result.as_ptr();
assert_ne!(original_pointer, new_pointer, "A new vec should be allocated");
}
#[test]
fn easybuf_equality_same_underlying_vec() {
let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
assert_eq!(buf, buf);
let other = buf.drain_to(5);
assert_ne!(buf, other);
let buf: EasyBuf = (0u8..5).collect::<Vec<_>>().into();
assert_eq!(buf, other);
}
#[test]
fn easybuf_equality_different_underlying_vec() {
let mut buf: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
let mut other: EasyBuf = (0u8..10).collect::<Vec<_>>().into();
assert_eq!(buf, other);
buf = buf.drain_to(5);
assert_ne!(buf, other);
other = other.drain_to(5);
assert_eq!(buf, other);
}
}

View File

@ -1,52 +0,0 @@
use std::mem;
use futures::{Future, Poll};
enum State<R, T> {
Pending {
rd: R,
buf: T,
},
Empty,
}
/// Tries to read some bytes directly into the given `buf` in asynchronous
/// manner, returning a future type.
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
pub fn read<R, T>(rd: R, buf: T) -> Read<R, T>
where R: ::std::io::Read,
T: AsMut<[u8]>
{
Read { state: State::Pending { rd: rd, buf: buf } }
}
/// A future which can be used to easily read available number of bytes to fill
/// a buffer.
///
/// Created by the [`read`] function.
#[must_use = "futures do nothing unless polled"]
pub struct Read<R, T> {
state: State<R, T>,
}
impl<R, T> Future for Read<R, T>
where R: ::std::io::Read,
T: AsMut<[u8]>
{
type Item = (R, T, usize);
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<(R, T, usize), ::std::io::Error> {
let nread = match self.state {
State::Pending { ref mut rd, ref mut buf } => try_nb!(rd.read(&mut buf.as_mut()[..])),
State::Empty => panic!("poll a Read after it's done"),
};
match mem::replace(&mut self.state, State::Empty) {
State::Pending { rd, buf } => Ok((rd, buf, nread).into()),
State::Empty => panic!("invalid internal state"),
}
}
}

View File

@ -1,80 +0,0 @@
use std::io::{self, Read};
use std::mem;
use futures::{Poll, Future};
/// A future which can be used to easily read exactly enough bytes to fill
/// a buffer.
///
/// Created by the [`read_exact`] function.
///
/// [`read_exact`]: fn.read_exact.html
#[must_use = "futures do nothing unless polled"]
pub struct ReadExact<A, T> {
state: State<A, T>,
}
enum State<A, T> {
Reading {
a: A,
buf: T,
pos: usize,
},
Empty,
}
/// Creates a future which will read exactly enough bytes to fill `buf`,
/// returning an error if EOF is hit sooner.
///
/// The returned future will resolve to both the I/O stream as well as the
/// buffer once the read operation is completed.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded. In the case of success the object will be destroyed and
/// the buffer will be returned, with all data read from the stream appended to
/// the buffer.
pub fn read_exact<A, T>(a: A, buf: T) -> ReadExact<A, T>
where A: Read,
T: AsMut<[u8]>,
{
ReadExact {
state: State::Reading {
a: a,
buf: buf,
pos: 0,
},
}
}
fn eof() -> io::Error {
io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
}
impl<A, T> Future for ReadExact<A, T>
where A: Read,
T: AsMut<[u8]>,
{
type Item = (A, T);
type Error = io::Error;
fn poll(&mut self) -> Poll<(A, T), io::Error> {
match self.state {
State::Reading { ref mut a, ref mut buf, ref mut pos } => {
let buf = buf.as_mut();
while *pos < buf.len() {
let n = try_nb!(a.read(&mut buf[*pos..]));
*pos += n;
if n == 0 {
return Err(eof())
}
}
}
State::Empty => panic!("poll a ReadExact after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf, .. } => Ok((a, buf).into()),
State::Empty => panic!(),
}
}
}

View File

@ -1,65 +0,0 @@
use std::io::{self, Read};
use std::mem;
use futures::{Poll, Future};
/// A future which can be used to easily read the entire contents of a stream
/// into a vector.
///
/// Created by the [`read_to_end`] function.
///
/// [`read_to_end`]: fn.read_to_end.html
#[must_use = "futures do nothing unless polled"]
pub struct ReadToEnd<A> {
state: State<A>,
}
enum State<A> {
Reading {
a: A,
buf: Vec<u8>,
},
Empty,
}
/// Creates a future which will read all the bytes associated with the I/O
/// object `A` into the buffer provided.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded. In the case of success the object will be destroyed and
/// the buffer will be returned, with all data read from the stream appended to
/// the buffer.
pub fn read_to_end<A>(a: A, buf: Vec<u8>) -> ReadToEnd<A>
where A: Read,
{
ReadToEnd {
state: State::Reading {
a: a,
buf: buf,
}
}
}
impl<A> Future for ReadToEnd<A>
where A: Read,
{
type Item = (A, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
match self.state {
State::Reading { ref mut a, ref mut buf } => {
// If we get `Ok`, then we know the stream hit EOF and we're done. If we
// hit "would block" then all the read data so far is in our buffer, and
// otherwise we propagate errors
try_nb!(a.read_to_end(buf));
},
State::Empty => panic!("poll ReadToEnd after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, buf } => Ok((a, buf).into()),
State::Empty => unreachable!(),
}
}
}

View File

@ -1,71 +0,0 @@
use std::io::{self, Read, BufRead};
use std::mem;
use futures::{Poll, Future};
/// A future which can be used to easily read the contents of a stream into a
/// vector until the delimiter is reached.
///
/// Created by the [`read_until`] function.
///
/// [`read_until`]: fn.read_until.html
#[must_use = "futures do nothing unless polled"]
pub struct ReadUntil<A> {
state: State<A>,
}
enum State<A> {
Reading {
a: A,
byte: u8,
buf: Vec<u8>,
},
Empty,
}
/// Creates a future which will read all the bytes associated with the I/O
/// object `A` into the buffer provided until the delimiter `byte` is reached.
/// This method is the async equivalent to [`BufRead::read_until`].
///
/// In case of an error the buffer and the object will be discarded, with
/// the error yielded. In the case of success the object will be destroyed and
/// the buffer will be returned, with all bytes up to, and including, the delimiter
/// (if found).
///
/// [`BufRead::read_until`]: https://doc.rust-lang.org/std/io/trait.BufRead.html#method.read_until
pub fn read_until<A>(a: A, byte: u8, buf: Vec<u8>) -> ReadUntil<A>
where A: BufRead
{
ReadUntil {
state: State::Reading {
a: a,
byte: byte,
buf: buf,
}
}
}
impl<A> Future for ReadUntil<A>
where A: Read + BufRead
{
type Item = (A, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Poll<(A, Vec<u8>), io::Error> {
match self.state {
State::Reading { ref mut a, byte, ref mut buf } => {
// If we get `Ok(n)`, then we know the stream hit EOF or the delimiter.
// and just return it, as we are finished.
// If we hit "would block" then all the read data so far
// is in our buffer, and otherwise we propagate errors.
try_nb!(a.read_until(byte, buf));
},
State::Empty => panic!("poll ReadUntil after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Reading { a, byte: _, buf } => Ok((a, buf).into()),
State::Empty => unreachable!(),
}
}
}

View File

@ -1,83 +0,0 @@
use std::io::{self, Write};
use std::mem;
use futures::{Poll, Future};
/// A future used to write the entire contents of some data to a stream.
///
/// This is created by the [`write_all`] top-level method.
///
/// [`write_all`]: fn.write_all.html
#[must_use = "futures do nothing unless polled"]
pub struct WriteAll<A, T> {
state: State<A, T>,
}
enum State<A, T> {
Writing {
a: A,
buf: T,
pos: usize,
},
Empty,
}
/// Creates a future that will write the entire contents of the buffer `buf` to
/// the stream `a` provided.
///
/// The returned future will not return until all the data has been written, and
/// the future will resolve to the stream as well as the buffer (for reuse if
/// needed).
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
///
/// The `buf` parameter here only requires the `AsRef<[u8]>` trait, which should
/// be broadly applicable to accepting data which can be converted to a slice.
/// The `Window` struct is also available in this crate to provide a different
/// window into a slice if necessary.
pub fn write_all<A, T>(a: A, buf: T) -> WriteAll<A, T>
where A: Write,
T: AsRef<[u8]>,
{
WriteAll {
state: State::Writing {
a: a,
buf: buf,
pos: 0,
},
}
}
fn zero_write() -> io::Error {
io::Error::new(io::ErrorKind::WriteZero, "zero-length write")
}
impl<A, T> Future for WriteAll<A, T>
where A: Write,
T: AsRef<[u8]>,
{
type Item = (A, T);
type Error = io::Error;
fn poll(&mut self) -> Poll<(A, T), io::Error> {
match self.state {
State::Writing { ref mut a, ref buf, ref mut pos } => {
let buf = buf.as_ref();
while *pos < buf.len() {
let n = try_nb!(a.write(&buf[*pos..]));
*pos += n;
if n == 0 {
return Err(zero_write())
}
}
}
State::Empty => panic!("poll a WriteAll after it's done"),
}
match mem::replace(&mut self.state, State::Empty) {
State::Writing { a, buf, .. } => Ok((a, buf).into()),
State::Empty => panic!(),
}
}
}

View File

@ -1,9 +1,8 @@
//! `Future`-powered I/O at the core of Tokio
//!
//! This crate uses the `futures` crate to provide an event loop ("reactor
//! core") which can be used to drive I/O like TCP and UDP, spawned future
//! tasks, and other events like channels/timeouts. All asynchronous I/O is
//! powered by the `mio` crate.
//! core") which can be used to drive I/O like TCP and UDP. All asynchronous I/O
//! is powered by the `mio` crate.
//!
//! The concrete types provided in this crate are relatively bare bones but are
//! intended to be the essential foundation for further projects needing an
@ -11,7 +10,6 @@
//!
//! * TCP, both streams and listeners
//! * UDP sockets
//! * Timeouts
//! * An event loop to run futures
//!
//! More functionality is likely to be added over time, but otherwise the crate
@ -21,15 +19,6 @@
//!
//! Some other important tasks covered by this crate are:
//!
//! * The ability to spawn futures into an event loop. The `Handle` and `Remote`
//! types have a `spawn` method which allows executing a future on an event
//! loop. The `Handle::spawn` method crucially does not require the future
//! itself to be `Send`.
//!
//! * The `Io` trait serves as an abstraction for future crates to build on top
//! of. This packages up `Read` and `Write` functionality as well as the
//! ability to poll for readiness on both ends.
//!
//! * All I/O is futures-aware. If any action in this crate returns "not ready"
//! or "would block", then the current future task is scheduled to receive a
//! notification when it would otherwise make progress.
@ -92,7 +81,6 @@
#![doc(html_root_url = "https://docs.rs/tokio-core/0.1")]
#![deny(missing_docs)]
#![deny(warnings)]
#![cfg_attr(test, allow(deprecated))]
extern crate bytes;
#[macro_use]
@ -109,6 +97,5 @@ extern crate scoped_tls;
#[macro_use]
extern crate log;
mod heap;
pub mod net;
pub mod reactor;

View File

@ -1,200 +0,0 @@
//! Support for creating futures that represent intervals.
//!
//! This module contains the `Interval` type which is a stream that will
//! resolve at a fixed intervals in future
use std::io;
use std::time::{Duration, Instant};
use futures::{Poll, Async};
use futures::stream::{Stream};
use reactor::{Remote, Handle};
use reactor::timeout_token::TimeoutToken;
/// A stream representing notifications at fixed interval
///
/// Intervals are created through the `Interval::new` or
/// `Interval::new_at` methods indicating when a first notification
/// should be triggered and when it will be repeated.
///
/// Note that timeouts are not intended for high resolution timers, but rather
/// they will likely fire some granularity after the exact instant that they're
/// otherwise indicated to fire at.
#[must_use = "streams do nothing unless polled"]
pub struct Interval {
token: TimeoutToken,
next: Instant,
interval: Duration,
handle: Remote,
}
impl Interval {
/// Creates a new interval which will fire at `dur` time into the future,
/// and will repeat every `dur` interval after
///
/// This function will return a future that will resolve to the actual
/// interval object. The interval object itself is then a stream which will
/// be set to fire at the specified intervals
pub fn new(dur: Duration, handle: &Handle) -> io::Result<Interval> {
Interval::new_at(Instant::now() + dur, dur, handle)
}
/// Creates a new interval which will fire at the time specified by `at`,
/// and then will repeat every `dur` interval after
///
/// This function will return a future that will resolve to the actual
/// timeout object. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn new_at(at: Instant, dur: Duration, handle: &Handle)
-> io::Result<Interval>
{
Ok(Interval {
token: try!(TimeoutToken::new(at, &handle)),
next: at,
interval: dur,
handle: handle.remote().clone(),
})
}
/// Polls this `Interval` instance to see if it's elapsed, assuming the
/// current time is specified by `now`.
///
/// The `Future::poll` implementation for `Interval` will call `Instant::now`
/// each time it's invoked, but in some contexts this can be a costly
/// operation. This method is provided to amortize the cost by avoiding
/// usage of `Instant::now`, assuming that it's been called elsewhere.
///
/// This function takes the assumed current time as the first parameter and
/// otherwise functions as this future's `poll` function. This will block a
/// task if one isn't already blocked or update a previous one if already
/// blocked.
fn poll_at(&mut self, now: Instant) -> Poll<Option<()>, io::Error> {
if self.next <= now {
self.next = next_interval(self.next, now, self.interval);
self.token.reset_timeout(self.next, &self.handle);
Ok(Async::Ready(Some(())))
} else {
self.token.update_timeout(&self.handle);
Ok(Async::NotReady)
}
}
}
impl Stream for Interval {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<()>, io::Error> {
// TODO: is this fast enough?
self.poll_at(Instant::now())
}
}
impl Drop for Interval {
fn drop(&mut self) {
self.token.cancel_timeout(&self.handle);
}
}
/// Converts Duration object to raw nanoseconds if possible
///
/// This is useful to divide intervals.
///
/// While technically for large duration it's impossible to represent any
/// duration as nanoseconds, the largest duration we can represent is about
/// 427_000 years. Large enough for any interval we would use or calculate in
/// tokio.
fn duration_to_nanos(dur: Duration) -> Option<u64> {
dur.as_secs()
.checked_mul(1_000_000_000)
.and_then(|v| v.checked_add(dur.subsec_nanos() as u64))
}
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
let new = prev + interval;
if new > now {
return new;
} else {
let spent_ns = duration_to_nanos(now.duration_since(prev))
.expect("interval should be expired");
let interval_ns = duration_to_nanos(interval)
.expect("interval is less that 427 thousand years");
let mult = spent_ns/interval_ns + 1;
assert!(mult < (1 << 32),
"can't skip more than 4 billion intervals of {:?} \
(trying to skip {})", interval, mult);
return prev + interval * (mult as u32);
}
}
#[cfg(test)]
mod test {
use std::time::{Instant, Duration};
use super::next_interval;
struct Timeline(Instant);
impl Timeline {
fn new() -> Timeline {
Timeline(Instant::now())
}
fn at(&self, millis: u64) -> Instant {
self.0 + Duration::from_millis(millis)
}
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
self.0 + Duration::new(sec, nanos)
}
}
fn dur(millis: u64) -> Duration {
Duration::from_millis(millis)
}
// The math around Instant/Duration isn't 100% precise due to rounding
// errors, see #249 for more info
fn almost_eq(a: Instant, b: Instant) -> bool {
if a == b {
true
} else if a > b {
a - b < Duration::from_millis(1)
} else {
b - a < Duration::from_millis(1)
}
}
#[test]
fn norm_next() {
let tm = Timeline::new();
assert!(almost_eq(next_interval(tm.at(1), tm.at(2), dur(10)),
tm.at(11)));
assert!(almost_eq(next_interval(tm.at(7777), tm.at(7788), dur(100)),
tm.at(7877)));
assert!(almost_eq(next_interval(tm.at(1), tm.at(1000), dur(2100)),
tm.at(2101)));
}
#[test]
fn fast_forward() {
let tm = Timeline::new();
assert!(almost_eq(next_interval(tm.at(1), tm.at(1000), dur(10)),
tm.at(1001)));
assert!(almost_eq(next_interval(tm.at(7777), tm.at(8888), dur(100)),
tm.at(8977)));
assert!(almost_eq(next_interval(tm.at(1), tm.at(10000), dur(2100)),
tm.at(10501)));
}
/// TODO: this test actually should be successful, but since we can't
/// multiply Duration on anything larger than u32 easily we decided
/// to allow it to fail for now
#[test]
#[should_panic(expected = "can't skip more than 4 billion intervals")]
fn large_skip() {
let tm = Timeline::new();
assert_eq!(next_interval(
tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
tm.at_ns(25, 1));
}
}

View File

@ -7,7 +7,7 @@ use mio::event::Evented;
use reactor::{Message, Remote, Handle, Direction};
/// A token that identifies an active timeout.
/// A token that identifies an active I/O resource.
pub struct IoToken {
token: usize,
// TODO: can we avoid this allocation? It's kind of a bummer...

View File

@ -1,14 +1,12 @@
//! The core reactor driving all I/O
//!
//! This module contains the `Core` type which is the reactor for all I/O
//! happening in `tokio-core`. This reactor (or event loop) is used to run
//! futures, schedule tasks, issue I/O requests, etc.
//! happening in `tokio-core`. This reactor (or event loop) is used to drive I/O
//! resources.
use std::cell::RefCell;
use std::cmp;
use std::fmt;
use std::io::{self, ErrorKind};
use std::mem;
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
@ -23,17 +21,10 @@ use mio;
use mio::event::Evented;
use slab::Slab;
use heap::{Heap, Slot};
mod io_token;
mod timeout_token;
mod poll_evented;
mod timeout;
mod interval;
pub use self::poll_evented::PollEvented;
pub use self::timeout::Timeout;
pub use self::interval::Interval;
static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
scoped_thread_local!(static CURRENT_LOOP: Core);
@ -68,15 +59,6 @@ struct Inner {
// Dispatch slabs for I/O and futures events
io_dispatch: Slab<ScheduledIo>,
task_dispatch: Slab<ScheduledTask>,
// Timer wheel keeping track of all timeouts. The `usize` stored in the
// timer wheel is an index into the slab below.
//
// The slab below keeps track of the timeouts themselves as well as the
// state of the timeout itself. The `TimeoutToken` type is an index into the
// `timeouts` slab.
timer_heap: Heap<(Instant, usize)>,
timeouts: Slab<(Option<Slot>, TimeoutState)>,
}
/// An unique ID for a Core
@ -119,12 +101,6 @@ struct ScheduledTask {
wake: Option<Arc<MySetReadiness>>,
}
enum TimeoutState {
NotFired,
Fired,
Waiting(Task),
}
enum Direction {
Read,
Write,
@ -133,9 +109,6 @@ enum Direction {
enum Message {
DropSource(usize),
Schedule(usize, Task, Direction),
UpdateTimeout(usize, Task),
ResetTimeout(usize, Instant),
CancelTimeout(usize),
Run(Box<FnBox>),
}
@ -177,8 +150,6 @@ impl Core {
io: io,
io_dispatch: Slab::with_capacity(1),
task_dispatch: Slab::with_capacity(1),
timeouts: Slab::with_capacity(1),
timer_heap: Heap::new(),
})),
})
}
@ -255,25 +226,11 @@ impl Core {
}
fn poll(&mut self, max_wait: Option<Duration>) -> bool {
// Given the `max_wait` variable specified, figure out the actual
// timeout that we're going to pass to `poll`. This involves taking a
// look at active timers on our heap as well.
let start = Instant::now();
let timeout = self.inner.borrow_mut().timer_heap.peek().map(|t| {
if t.0 < start {
Duration::new(0, 0)
} else {
t.0 - start
}
});
let timeout = match (max_wait, timeout) {
(Some(d1), Some(d2)) => Some(cmp::min(d1, d2)),
(max_wait, timeout) => max_wait.or(timeout),
};
// Block waiting for an event to happen, peeling out how many events
// happened.
let amt = match self.inner.borrow_mut().io.poll(&mut self.events, timeout) {
let amt = match self.inner.borrow_mut().io.poll(&mut self.events, max_wait) {
Ok(a) => a,
Err(ref e) if e.kind() == ErrorKind::Interrupted => return false,
Err(e) => panic!("error in poll: {}", e),
@ -283,10 +240,6 @@ impl Core {
debug!("loop poll - {:?}", after_poll - start);
debug!("loop time - {:?}", after_poll);
// Process all timeouts that may have just occurred, updating the
// current time since
self.consume_timeouts(after_poll);
// Process all the events that came in, dispatching appropriately
let mut fired = false;
for i in 0..self.events.len() {
@ -371,26 +324,6 @@ impl Core {
drop(inner);
}
fn consume_timeouts(&mut self, now: Instant) {
loop {
let mut inner = self.inner.borrow_mut();
match inner.timer_heap.peek() {
Some(head) if head.0 <= now => {}
Some(_) => break,
None => break,
};
let (_, slab_idx) = inner.timer_heap.pop().unwrap();
trace!("firing timeout: {}", slab_idx);
inner.timeouts[slab_idx].0.take().unwrap();
let handle = inner.timeouts[slab_idx].1.fire();
drop(inner);
if let Some(handle) = handle {
self.notify_handle(handle);
}
}
}
/// Method used to notify a task handle.
///
/// Note that this should be used instead of `handle.notify()` to ensure
@ -422,18 +355,6 @@ impl Core {
self.notify_handle(task);
}
}
Message::UpdateTimeout(t, handle) => {
let task = self.inner.borrow_mut().update_timeout(t, handle);
if let Some(task) = task {
self.notify_handle(task);
}
}
Message::ResetTimeout(t, at) => {
self.inner.borrow_mut().reset_timeout(t, at);
}
Message::CancelTimeout(t) => {
self.inner.borrow_mut().cancel_timeout(t)
}
Message::Run(r) => r.call_box(self),
}
}
@ -513,45 +434,6 @@ impl Inner {
}
}
fn add_timeout(&mut self, at: Instant) -> usize {
if self.timeouts.len() == self.timeouts.capacity() {
let len = self.timeouts.len();
self.timeouts.reserve_exact(len);
}
let entry = self.timeouts.vacant_entry();
let key = entry.key();
let slot = self.timer_heap.push((at, key));
entry.insert((Some(slot), TimeoutState::NotFired));
debug!("added a timeout: {}", key);
return key;
}
fn update_timeout(&mut self, token: usize, handle: Task) -> Option<Task> {
debug!("updating a timeout: {}", token);
self.timeouts[token].1.block(handle)
}
fn reset_timeout(&mut self, token: usize, at: Instant) {
let pair = &mut self.timeouts[token];
// TODO: avoid remove + push and instead just do one sift of the heap?
// In theory we could update it in place and then do the percolation
// as necessary
if let Some(slot) = pair.0.take() {
self.timer_heap.remove(slot);
}
let slot = self.timer_heap.push((at, token));
*pair = (Some(slot), TimeoutState::NotFired);
debug!("set a timeout: {}", token);
}
fn cancel_timeout(&mut self, token: usize) {
debug!("cancel a timeout: {}", token);
let pair = self.timeouts.remove(token);
if let (Some(slot), _state) = pair {
self.timer_heap.remove(slot);
}
}
fn spawn(&mut self, future: Box<Future<Item=(), Error=()>>) {
if self.task_dispatch.len() == self.task_dispatch.capacity() {
let len = self.task_dispatch.len();
@ -769,25 +651,6 @@ impl fmt::Debug for Handle {
}
}
impl TimeoutState {
fn block(&mut self, handle: Task) -> Option<Task> {
match *self {
TimeoutState::Fired => return Some(handle),
_ => {}
}
*self = TimeoutState::Waiting(handle);
None
}
fn fire(&mut self) -> Option<Task> {
match mem::replace(self, TimeoutState::Fired) {
TimeoutState::NotFired => None,
TimeoutState::Fired => panic!("fired twice?"),
TimeoutState::Waiting(handle) => Some(handle),
}
}
}
struct MySetReadiness(mio::SetReadiness);
impl Notify for MySetReadiness {

View File

@ -1,106 +0,0 @@
//! Support for creating futures that represent timeouts.
//!
//! This module contains the `Timeout` type which is a future that will resolve
//! at a particular point in the future.
use std::io;
use std::time::{Duration, Instant};
use futures::{Future, Poll, Async};
use reactor::{Remote, Handle};
use reactor::timeout_token::TimeoutToken;
/// A future representing the notification that a timeout has occurred.
///
/// Timeouts are created through the `Timeout::new` or
/// `Timeout::new_at` methods indicating when a timeout should fire at.
/// Note that timeouts are not intended for high resolution timers, but rather
/// they will likely fire some granularity after the exact instant that they're
/// otherwise indicated to fire at.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Timeout {
token: TimeoutToken,
when: Instant,
handle: Remote,
}
impl Timeout {
/// Creates a new timeout which will fire at `dur` time into the future.
///
/// This function will return a Result with the actual timeout object or an
/// error. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn new(dur: Duration, handle: &Handle) -> io::Result<Timeout> {
Timeout::new_at(Instant::now() + dur, handle)
}
/// Creates a new timeout which will fire at the time specified by `at`.
///
/// This function will return a Result with the actual timeout object or an
/// error. The timeout object itself is then a future which will be
/// set to fire at the specified point in the future.
pub fn new_at(at: Instant, handle: &Handle) -> io::Result<Timeout> {
Ok(Timeout {
token: try!(TimeoutToken::new(at, &handle)),
when: at,
handle: handle.remote().clone(),
})
}
/// Resets this timeout to an new timeout which will fire at the time
/// specified by `at`.
///
/// This method is usable even of this instance of `Timeout` has "already
/// fired". That is, if this future has resolved, calling this method means
/// that the future will still re-resolve at the specified instant.
///
/// If `at` is in the past then this future will immediately be resolved
/// (when `poll` is called).
///
/// Note that if any task is currently blocked on this future then that task
/// will be dropped. It is required to call `poll` again after this method
/// has been called to ensure that a task is blocked on this future.
pub fn reset(&mut self, at: Instant) {
self.when = at;
self.token.reset_timeout(self.when, &self.handle);
}
/// Polls this `Timeout` instance to see if it's elapsed, assuming the
/// current time is specified by `now`.
///
/// The `Future::poll` implementation for `Timeout` will call `Instant::now`
/// each time it's invoked, but in some contexts this can be a costly
/// operation. This method is provided to amortize the cost by avoiding
/// usage of `Instant::now`, assuming that it's been called elsewhere.
///
/// This function takes the assumed current time as the first parameter and
/// otherwise functions as this future's `poll` function. This will block a
/// task if one isn't already blocked or update a previous one if already
/// blocked.
fn poll_at(&mut self, now: Instant) -> Poll<(), io::Error> {
if self.when <= now {
Ok(Async::Ready(()))
} else {
self.token.update_timeout(&self.handle);
Ok(Async::NotReady)
}
}
}
impl Future for Timeout {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
// TODO: is this fast enough?
self.poll_at(Instant::now())
}
}
impl Drop for Timeout {
fn drop(&mut self) {
self.token.cancel_timeout(&self.handle);
}
}

View File

@ -1,57 +0,0 @@
use std::io;
use std::time::Instant;
use futures::task;
use reactor::{Message, Handle, Remote};
/// A token that identifies an active timeout.
#[derive(Debug)]
pub struct TimeoutToken {
token: usize,
}
impl TimeoutToken {
/// Adds a new timeout to get fired at the specified instant, notifying the
/// specified task.
pub fn new(at: Instant, handle: &Handle) -> io::Result<TimeoutToken> {
match handle.inner.upgrade() {
Some(inner) => {
let token = inner.borrow_mut().add_timeout(at);
Ok(TimeoutToken { token: token })
}
None => Err(io::Error::new(io::ErrorKind::Other, "event loop gone")),
}
}
/// Updates a previously added timeout to notify a new task instead.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn update_timeout(&self, handle: &Remote) {
handle.send(Message::UpdateTimeout(self.token, task::current()))
}
/// Resets previously added (or fired) timeout to an new timeout
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method.
pub fn reset_timeout(&mut self, at: Instant, handle: &Remote) {
handle.send(Message::ResetTimeout(self.token, at));
}
/// Cancel a previously added timeout.
///
/// # Panics
///
/// This method will panic if the timeout specified was not created by this
/// loop handle's `add_timeout` method or if called multiple times.
pub fn cancel_timeout(&self, handle: &Remote) {
debug!("cancel timeout {}", self.token);
handle.send(Message::CancelTimeout(self.token))
}
}

View File

@ -1,38 +0,0 @@
extern crate env_logger;
extern crate futures;
extern crate tokio;
use std::time::{Instant, Duration};
use futures::stream::{Stream};
use tokio::reactor::{Core, Interval};
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}
#[test]
fn single() {
drop(env_logger::init());
let mut l = t!(Core::new());
let dur = Duration::from_millis(10);
let start = Instant::now();
let interval = t!(Interval::new(dur, &l.handle()));
t!(l.run(interval.take(1).collect()));
assert!(start.elapsed() >= dur);
}
#[test]
fn two_times() {
drop(env_logger::init());
let mut l = t!(Core::new());
let dur = Duration::from_millis(10);
let start = Instant::now();
let interval = t!(Interval::new(dur, &l.handle()));
let result = t!(l.run(interval.take(2).collect()));
assert!(start.elapsed() >= dur*2);
assert_eq!(result, vec![(), ()]);
}

View File

@ -1,147 +0,0 @@
extern crate tokio;
extern crate env_logger;
extern crate futures;
use std::any::Any;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use futures::{Future, Poll};
use futures::future;
use futures::sync::oneshot;
use tokio::reactor::{Core, Timeout};
#[test]
fn simple() {
drop(env_logger::init());
let mut lp = Core::new().unwrap();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
lp.handle().spawn(future::lazy(|| {
tx1.send(1).unwrap();
Ok(())
}));
lp.remote().spawn(|_| {
future::lazy(|| {
tx2.send(2).unwrap();
Ok(())
})
});
assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2));
}
#[test]
fn simple_core_poll() {
drop(env_logger::init());
let mut lp = Core::new().unwrap();
let (tx, rx) = mpsc::channel();
let (tx1, tx2) = (tx.clone(), tx.clone());
lp.turn(Some(Duration::new(0, 0)));
lp.handle().spawn(future::lazy(move || {
tx1.send(1).unwrap();
Ok(())
}));
lp.turn(Some(Duration::new(0, 0)));
lp.handle().spawn(future::lazy(move || {
tx2.send(2).unwrap();
Ok(())
}));
assert_eq!(rx.try_recv().unwrap(), 1);
assert!(rx.try_recv().is_err());
lp.turn(Some(Duration::new(0, 0)));
assert_eq!(rx.try_recv().unwrap(), 2);
}
#[test]
fn spawn_in_poll() {
drop(env_logger::init());
let mut lp = Core::new().unwrap();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let remote = lp.remote();
lp.handle().spawn(future::lazy(move || {
tx1.send(1).unwrap();
remote.spawn(|_| {
future::lazy(|| {
tx2.send(2).unwrap();
Ok(())
})
});
Ok(())
}));
assert_eq!(lp.run(rx1.join(rx2)).unwrap(), (1, 2));
}
#[test]
fn drop_timeout_in_spawn() {
drop(env_logger::init());
let mut lp = Core::new().unwrap();
let (tx, rx) = oneshot::channel();
let remote = lp.remote();
thread::spawn(move || {
remote.spawn(|handle| {
drop(Timeout::new(Duration::new(1, 0), handle));
tx.send(()).unwrap();
Ok(())
});
});
lp.run(rx).unwrap();
}
#[test]
fn spawn_in_drop() {
drop(env_logger::init());
let mut lp = Core::new().unwrap();
let (tx, rx) = oneshot::channel();
let remote = lp.remote();
struct OnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for OnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}
struct MyFuture {
_data: Box<Any>,
}
impl Future for MyFuture {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
Ok(().into())
}
}
thread::spawn(move || {
let mut tx = Some(tx);
remote.spawn(|handle| {
let handle = handle.clone();
MyFuture {
_data: Box::new(OnDrop(move || {
let mut tx = tx.take();
handle.spawn_fn(move || {
tx.take().unwrap().send(()).unwrap();
Ok(())
});
})),
}
});
});
lp.run(rx).unwrap();
}

View File

@ -1,37 +0,0 @@
extern crate env_logger;
extern crate futures;
extern crate tokio;
use std::time::{Instant, Duration};
use tokio::reactor::{Core, Timeout};
macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}
#[test]
fn smoke() {
drop(env_logger::init());
let mut l = t!(Core::new());
let dur = Duration::from_millis(10);
let start = Instant::now();
let timeout = t!(Timeout::new(dur, &l.handle()));
t!(l.run(timeout));
assert!(start.elapsed() >= (dur / 2));
}
#[test]
fn two() {
drop(env_logger::init());
let mut l = t!(Core::new());
let dur = Duration::from_millis(10);
let timeout = t!(Timeout::new(dur, &l.handle()));
t!(l.run(timeout));
let timeout = t!(Timeout::new(dur, &l.handle()));
t!(l.run(timeout));
}