diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index d89b8b6dc..8a344f90d 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -253,6 +253,16 @@ impl Framed { &mut self.inner.state.write.buffer } + /// Returns backpressure boundary + pub fn backpressure_boundary(&self) -> usize { + self.inner.state.write.backpressure_boundary + } + + /// Updates backpressure boundary + pub fn set_backpressure_boundary(&mut self, boundary: usize) { + self.inner.state.write.backpressure_boundary = boundary; + } + /// Consumes the `Framed`, returning its underlying I/O stream. /// /// Note that care should be taken to not tamper with the underlying stream diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index ce1a6db87..8f3fa49b0 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -25,7 +25,6 @@ pin_project! { } const INITIAL_CAPACITY: usize = 8 * 1024; -const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY; #[derive(Debug)] pub(crate) struct ReadFrame { @@ -37,6 +36,7 @@ pub(crate) struct ReadFrame { pub(crate) struct WriteFrame { pub(crate) buffer: BytesMut, + pub(crate) backpressure_boundary: usize, } #[derive(Default)] @@ -60,6 +60,7 @@ impl Default for WriteFrame { fn default() -> Self { Self { buffer: BytesMut::with_capacity(INITIAL_CAPACITY), + backpressure_boundary: INITIAL_CAPACITY, } } } @@ -87,7 +88,10 @@ impl From for WriteFrame { buffer.reserve(INITIAL_CAPACITY - size); } - Self { buffer } + Self { + buffer, + backpressure_boundary: INITIAL_CAPACITY, + } } } @@ -256,7 +260,7 @@ where type Error = U::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.state.borrow().buffer.len() >= BACKPRESSURE_BOUNDARY { + if self.state.borrow().buffer.len() >= self.state.borrow().backpressure_boundary { self.as_mut().poll_flush(cx) } else { Poll::Ready(Ok(())) @@ -277,7 +281,7 @@ where let mut pinned = self.project(); while !pinned.state.borrow_mut().buffer.is_empty() { - let WriteFrame { buffer } = pinned.state.borrow_mut(); + let WriteFrame { buffer, .. } = pinned.state.borrow_mut(); trace!(remaining = buffer.len(), "writing;"); let n = ready!(poll_write_buf(pinned.inner.as_mut(), cx, buffer))?; diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index aa4cec982..3f0a34081 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -123,6 +123,16 @@ impl FramedWrite { pub fn write_buffer_mut(&mut self) -> &mut BytesMut { &mut self.inner.state.buffer } + + /// Returns backpressure boundary + pub fn backpressure_boundary(&self) -> usize { + self.inner.state.backpressure_boundary + } + + /// Updates backpressure boundary + pub fn set_backpressure_boundary(&mut self, boundary: usize) { + self.inner.state.backpressure_boundary = boundary; + } } // This impl just defers to the underlying FramedImpl diff --git a/tokio-util/tests/framed_write.rs b/tokio-util/tests/framed_write.rs index 01d71b2b8..39091c0b1 100644 --- a/tokio-util/tests/framed_write.rs +++ b/tokio-util/tests/framed_write.rs @@ -109,12 +109,12 @@ fn write_hits_backpressure() { const ITER: usize = 2 * 1024; let mut mock = mock! { - // Block the `ITER`th write + // Block the `ITER*2`th write Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")), Ok(b"".to_vec()), }; - for i in 0..=ITER { + for i in 0..=ITER * 2 { let mut b = BytesMut::with_capacity(4); b.put_u32(i as u32); @@ -133,14 +133,15 @@ fn write_hits_backpressure() { // Push a new chunk mock.calls.push_back(Ok(b[..].to_vec())); } - // 1 'wouldblock', 4 * 2KB buffers, 1 b-byte buffer - assert_eq!(mock.calls.len(), 6); + // 1 'wouldblock', 8 * 2KB buffers, 1 b-byte buffer + assert_eq!(mock.calls.len(), 10); let mut task = task::spawn(()); let mut framed = FramedWrite::new(mock, U32Encoder); + framed.set_backpressure_boundary(ITER * 8); task.enter(|cx, _| { - // Send 8KB. This fills up FramedWrite2 buffer - for i in 0..ITER { + // Send 16KB. This fills up FramedWrite buffer + for i in 0..ITER * 2 { assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok()); assert!(pin!(framed).start_send(i as u32).is_ok()); } @@ -150,11 +151,11 @@ fn write_hits_backpressure() { assert!(pin!(framed).poll_ready(cx).is_pending()); // We poll again, forcing another flush, which this time succeeds - // The whole 8KB buffer is flushed + // The whole 16KB buffer is flushed assert!(assert_ready!(pin!(framed).poll_ready(cx)).is_ok()); // Send more data. This matches the final message expected by the mock - assert!(pin!(framed).start_send(ITER as u32).is_ok()); + assert!(pin!(framed).start_send((ITER * 2) as u32).is_ok()); // Flush the rest of the buffer assert!(assert_ready!(pin!(framed).poll_flush(cx)).is_ok());