codec: update stream impl for Framed to return None after Err (#4166)

This commit is contained in:
Bhargav 2021-10-26 07:56:15 -07:00 committed by GitHub
parent 827694a9e3
commit 0c68b89452
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 26 deletions

View File

@ -106,6 +106,7 @@ where
eof: false, eof: false,
is_readable: false, is_readable: false,
buffer: BytesMut::with_capacity(capacity), buffer: BytesMut::with_capacity(capacity),
has_errored: false,
}, },
write: WriteFrame::default(), write: WriteFrame::default(),
}, },

View File

@ -27,10 +27,12 @@ pin_project! {
const INITIAL_CAPACITY: usize = 8 * 1024; const INITIAL_CAPACITY: usize = 8 * 1024;
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
#[derive(Debug)]
pub(crate) struct ReadFrame { pub(crate) struct ReadFrame {
pub(crate) eof: bool, pub(crate) eof: bool,
pub(crate) is_readable: bool, pub(crate) is_readable: bool,
pub(crate) buffer: BytesMut, pub(crate) buffer: BytesMut,
pub(crate) has_errored: bool,
} }
pub(crate) struct WriteFrame { pub(crate) struct WriteFrame {
@ -49,6 +51,7 @@ impl Default for ReadFrame {
eof: false, eof: false,
is_readable: false, is_readable: false,
buffer: BytesMut::with_capacity(INITIAL_CAPACITY), buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
has_errored: false,
} }
} }
} }
@ -72,6 +75,7 @@ impl From<BytesMut> for ReadFrame {
buffer, buffer,
is_readable: size > 0, is_readable: size > 0,
eof: false, eof: false,
has_errored: false,
} }
} }
} }
@ -126,30 +130,42 @@ where
// //
// The initial state is `reading`. // The initial state is `reading`.
// //
// | state | eof | is_readable | // | state | eof | is_readable | has_errored |
// |---------|-------|-------------| // |---------|-------|-------------|-------------|
// | reading | false | false | // | reading | false | false | false |
// | framing | false | true | // | framing | false | true | false |
// | pausing | true | true | // | pausing | true | true | false |
// | paused | true | false | // | paused | true | false | false |
// // | errored | <any> | <any> | true |
// `decode_eof` // `decode_eof` returns Err
// returns `Some` read 0 bytes // ┌────────────────────────────────────────────────────────┐
// │ │ │ │ // `decode_eof` returns │ │
// │ ▼ │ ▼ // `Ok(Some)` │ │
// ┌───────┐ `decode_eof` ┌──────┐ // ┌─────┐ │ `decode_eof` returns After returning │
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐ // Read 0 bytes ├─────▼──┴┐ `Ok(None)` ┌────────┐ ◄───┐ `None` ┌───▼─────┐
// │ └───────┘ └──────┘ │ // ┌────────────────►│ Pausing ├───────────────────────►│ Paused ├─┐ └───────────┤ Errored │
// pending read┐ │ ┌──────┐ │ ▲ │ // │ └─────────┘ └─┬──▲───┘ │ └───▲───▲─┘
// │ │ │ │ │ │ │ │ // Pending read │ │ │ │ │ │
// │ ▼ │ │ `decode` returns `Some`│ pending read // ┌──────┐ │ `decode` returns `Some` │ └─────┘ │ │
// │ ╔═══════╗ ┌───────┐◀─┘ │ // │ │ │ ┌──────┐ │ Pending │ │
// └──║reading║─read n>0 bytes─▶│framing│ │ // │ ┌────▼──┴─┐ Read n>0 bytes ┌┴──────▼─┐ read n>0 bytes │ read │ │
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘ // └─┤ Reading ├───────────────►│ Framing │◄────────────────────────┘ │ │
// ▲ │ // └──┬─▲────┘ └─────┬──┬┘ │ │
// │ │ // │ │ │ │ `decode` returns Err │ │
// └─`decode` returns `None`─┘ // │ └───decode` returns `None`──┘ └───────────────────────────────────────────────────────┘ │
// │ read returns Err │
// └────────────────────────────────────────────────────────────────────────────────────────────┘
loop { loop {
// Return `None` if we have encountered an error from the underlying decoder
// See: https://github.com/tokio-rs/tokio/issues/3976
if state.has_errored {
// preparing has_errored -> paused
trace!("Returning None and setting paused");
state.is_readable = false;
state.has_errored = false;
return Poll::Ready(None);
}
// Repeatedly call `decode` or `decode_eof` while the buffer is "readable", // Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
// i.e. it _might_ contain data consumable as a frame or closing frame. // i.e. it _might_ contain data consumable as a frame or closing frame.
// Both signal that there is no such data by returning `None`. // Both signal that there is no such data by returning `None`.
@ -165,7 +181,11 @@ where
// pausing or framing // pausing or framing
if state.eof { if state.eof {
// pausing // pausing
let frame = pinned.codec.decode_eof(&mut state.buffer)?; let frame = pinned.codec.decode_eof(&mut state.buffer).map_err(|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
})?;
if frame.is_none() { if frame.is_none() {
state.is_readable = false; // prepare pausing -> paused state.is_readable = false; // prepare pausing -> paused
} }
@ -176,7 +196,11 @@ where
// framing // framing
trace!("attempting to decode a frame"); trace!("attempting to decode a frame");
if let Some(frame) = pinned.codec.decode(&mut state.buffer)? { if let Some(frame) = pinned.codec.decode(&mut state.buffer).map_err(|op| {
trace!("Got an error, going to errored state");
state.has_errored = true;
op
})? {
trace!("frame decoded from buffer"); trace!("frame decoded from buffer");
// implicit framing -> framing // implicit framing -> framing
return Poll::Ready(Some(Ok(frame))); return Poll::Ready(Some(Ok(frame)));
@ -190,7 +214,13 @@ where
// Make sure we've got room for at least one byte to read to ensure // Make sure we've got room for at least one byte to read to ensure
// that we don't get a spurious 0 that looks like EOF. // that we don't get a spurious 0 that looks like EOF.
state.buffer.reserve(1); state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer).map_err(
|err| {
trace!("Got an error, going to errored state");
state.has_errored = true;
err
},
)? {
Poll::Ready(ct) => ct, Poll::Ready(ct) => ct,
// implicit reading -> reading or implicit paused -> paused // implicit reading -> reading or implicit paused -> paused
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,

View File

@ -51,6 +51,7 @@ where
eof: false, eof: false,
is_readable: false, is_readable: false,
buffer: BytesMut::with_capacity(capacity), buffer: BytesMut::with_capacity(capacity),
has_errored: false,
}, },
}, },
} }

View File

@ -0,0 +1,38 @@
use futures_core::stream::Stream;
use std::{io, pin::Pin};
use tokio_test::{assert_ready, io::Builder, task};
use tokio_util::codec::{BytesCodec, FramedRead};
macro_rules! pin {
($id:ident) => {
Pin::new(&mut $id)
};
}
macro_rules! assert_read {
($e:expr, $n:expr) => {{
let val = assert_ready!($e);
assert_eq!(val.unwrap().unwrap(), $n);
}};
}
#[tokio::test]
async fn return_none_after_error() {
let mut io = FramedRead::new(
Builder::new()
.read(b"abcdef")
.read_error(io::Error::new(io::ErrorKind::Other, "Resource errored out"))
.read(b"more data")
.build(),
BytesCodec::new(),
);
let mut task = task::spawn(());
task.enter(|cx, _| {
assert_read!(pin!(io).poll_next(cx), b"abcdef".to_vec());
assert!(assert_ready!(pin!(io).poll_next(cx)).unwrap().is_err());
assert!(assert_ready!(pin!(io).poll_next(cx)).is_none());
assert_read!(pin!(io).poll_next(cx), b"more data".to_vec());
})
}