tokio: remove wildcard in match patterns (#5970)

This commit is contained in:
Weijia Jiang 2023-09-24 04:05:44 +08:00 committed by GitHub
parent b161633b5f
commit 707fb4d0df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 180 additions and 198 deletions

View File

@ -126,22 +126,22 @@ impl Configuration {
}
fn build(&self) -> Result<FinalConfig, syn::Error> {
let flavor = self.flavor.unwrap_or(self.default_flavor);
use RuntimeFlavor::*;
use RuntimeFlavor as F;
let flavor = self.flavor.unwrap_or(self.default_flavor);
let worker_threads = match (flavor, self.worker_threads) {
(CurrentThread, Some((_, worker_threads_span))) => {
(F::CurrentThread, Some((_, worker_threads_span))) => {
let msg = format!(
"The `worker_threads` option requires the `multi_thread` runtime flavor. Use `#[{}(flavor = \"multi_thread\")]`",
self.macro_name(),
);
return Err(syn::Error::new(worker_threads_span, msg));
}
(CurrentThread, None) => None,
(Threaded, worker_threads) if self.rt_multi_thread_available => {
(F::CurrentThread, None) => None,
(F::Threaded, worker_threads) if self.rt_multi_thread_available => {
worker_threads.map(|(val, _span)| val)
}
(Threaded, _) => {
(F::Threaded, _) => {
let msg = if self.flavor.is_none() {
"The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled."
} else {
@ -152,14 +152,14 @@ impl Configuration {
};
let start_paused = match (flavor, self.start_paused) {
(Threaded, Some((_, start_paused_span))) => {
(F::Threaded, Some((_, start_paused_span))) => {
let msg = format!(
"The `start_paused` option requires the `current_thread` runtime flavor. Use `#[{}(flavor = \"current_thread\")]`",
self.macro_name(),
);
return Err(syn::Error::new(start_paused_span, msg));
}
(CurrentThread, Some((start_paused, _))) => Some(start_paused),
(F::CurrentThread, Some((start_paused, _))) => Some(start_paused),
(_, None) => None,
};

View File

@ -66,25 +66,23 @@ where
T: Stream,
U: Stream<Item = T::Item>,
{
use Poll::*;
let mut done = true;
match first.poll_next(cx) {
Ready(Some(val)) => return Ready(Some(val)),
Ready(None) => {}
Pending => done = false,
Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
Poll::Ready(None) => {}
Poll::Pending => done = false,
}
match second.poll_next(cx) {
Ready(Some(val)) => return Ready(Some(val)),
Ready(None) => {}
Pending => done = false,
Poll::Ready(Some(val)) => return Poll::Ready(Some(val)),
Poll::Ready(None) => {}
Poll::Pending => done = false,
}
if done {
Ready(None)
Poll::Ready(None)
} else {
Pending
Poll::Pending
}
}

View File

@ -518,8 +518,6 @@ where
{
/// Polls the next value, includes the vec entry index
fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
use Poll::*;
let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
let mut idx = start;
@ -527,8 +525,8 @@ where
let (_, stream) = &mut self.entries[idx];
match Pin::new(stream).poll_next(cx) {
Ready(Some(val)) => return Ready(Some((idx, val))),
Ready(None) => {
Poll::Ready(Some(val)) => return Poll::Ready(Some((idx, val))),
Poll::Ready(None) => {
// Remove the entry
self.entries.swap_remove(idx);
@ -542,7 +540,7 @@ where
idx = idx.wrapping_add(1) % self.entries.len();
}
}
Pending => {
Poll::Pending => {
idx = idx.wrapping_add(1) % self.entries.len();
}
}
@ -550,9 +548,9 @@ where
// If the map is empty, then the stream is complete.
if self.entries.is_empty() {
Ready(None)
Poll::Ready(None)
} else {
Pending
Poll::Pending
}
}
}

View File

@ -22,17 +22,17 @@
#[macro_export]
macro_rules! assert_ready {
($e:expr) => {{
use core::task::Poll::*;
use core::task::Poll;
match $e {
Ready(v) => v,
Pending => panic!("pending"),
Poll::Ready(v) => v,
Poll::Pending => panic!("pending"),
}
}};
($e:expr, $($msg:tt)+) => {{
use core::task::Poll::*;
use core::task::Poll;
match $e {
Ready(v) => v,
Pending => {
Poll::Ready(v) => v,
Poll::Pending => {
panic!("pending; {}", format_args!($($msg)+))
}
}
@ -127,17 +127,17 @@ macro_rules! assert_ready_err {
#[macro_export]
macro_rules! assert_pending {
($e:expr) => {{
use core::task::Poll::*;
use core::task::Poll;
match $e {
Pending => {}
Ready(v) => panic!("ready; value = {:?}", v),
Poll::Pending => {}
Poll::Ready(v) => panic!("ready; value = {:?}", v),
}
}};
($e:expr, $($msg:tt)+) => {{
use core::task::Poll::*;
use core::task::Poll;
match $e {
Pending => {}
Ready(v) => {
Poll::Pending => {}
Poll::Ready(v) => {
panic!("ready; value = {:?}; {}", v, format_args!($($msg)+))
}
}

View File

@ -12,7 +12,6 @@ use futures::{pin_mut, Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
use std::task::Poll::*;
use std::task::{Context, Poll};
macro_rules! mock {
@ -39,10 +38,10 @@ macro_rules! assert_next_eq {
macro_rules! assert_next_pending {
($io:ident) => {{
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Ready(Some(Err(e))) => panic!("error = {:?}", e),
Ready(None) => panic!("done"),
Pending => {}
Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e),
Poll::Ready(None) => panic!("done"),
Poll::Pending => {}
});
}};
}
@ -50,10 +49,10 @@ macro_rules! assert_next_pending {
macro_rules! assert_next_err {
($io:ident) => {{
task::spawn(()).enter(|cx, _| match $io.as_mut().poll_next(cx) {
Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Ready(Some(Err(_))) => {}
Ready(None) => panic!("done"),
Pending => panic!("pending"),
Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
Poll::Ready(Some(Err(_))) => {}
Poll::Ready(None) => panic!("done"),
Poll::Pending => panic!("pending"),
});
}};
}
@ -186,11 +185,11 @@ fn read_single_frame_multi_packet_wait() {
let io = FramedRead::new(
mock! {
data(b"\x00\x00"),
Pending,
Poll::Pending,
data(b"\x00\x09abc"),
Pending,
Poll::Pending,
data(b"defghi"),
Pending,
Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@ -208,15 +207,15 @@ fn read_multi_frame_multi_packet_wait() {
let io = FramedRead::new(
mock! {
data(b"\x00\x00"),
Pending,
Poll::Pending,
data(b"\x00\x09abc"),
Pending,
Poll::Pending,
data(b"defghi"),
Pending,
Poll::Pending,
data(b"\x00\x00\x00\x0312"),
Pending,
Poll::Pending,
data(b"3\x00\x00\x00\x0bhello world"),
Pending,
Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@ -250,9 +249,9 @@ fn read_incomplete_head() {
fn read_incomplete_head_multi() {
let io = FramedRead::new(
mock! {
Pending,
Poll::Pending,
data(b"\x00"),
Pending,
Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@ -268,9 +267,9 @@ fn read_incomplete_payload() {
let io = FramedRead::new(
mock! {
data(b"\x00\x00\x00\x09ab"),
Pending,
Poll::Pending,
data(b"cd"),
Pending,
Poll::Pending,
},
LengthDelimitedCodec::new(),
);
@ -310,7 +309,7 @@ fn read_update_max_frame_len_at_rest() {
fn read_update_max_frame_len_in_flight() {
let io = length_delimited::Builder::new().new_read(mock! {
data(b"\x00\x00\x00\x09abcd"),
Pending,
Poll::Pending,
data(b"efghi"),
data(b"\x00\x00\x00\x09abcdefghi"),
});
@ -533,9 +532,9 @@ fn write_single_multi_frame_multi_packet() {
fn write_single_frame_would_block() {
let io = FramedWrite::new(
mock! {
Pending,
Poll::Pending,
data(b"\x00\x00"),
Pending,
Poll::Pending,
data(b"\x00\x09"),
data(b"abcdefghi"),
flush(),
@ -640,7 +639,7 @@ fn write_update_max_frame_len_in_flight() {
let io = length_delimited::Builder::new().new_write(mock! {
data(b"\x00\x00\x00\x06"),
data(b"ab"),
Pending,
Poll::Pending,
data(b"cdef"),
flush(),
});
@ -701,8 +700,6 @@ enum Op {
Flush,
}
use self::Op::*;
impl AsyncRead for Mock {
fn poll_read(
mut self: Pin<&mut Self>,
@ -710,15 +707,15 @@ impl AsyncRead for Mock {
dst: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
match self.calls.pop_front() {
Some(Ready(Ok(Op::Data(data)))) => {
Some(Poll::Ready(Ok(Op::Data(data)))) => {
debug_assert!(dst.remaining() >= data.len());
dst.put_slice(&data);
Ready(Ok(()))
Poll::Ready(Ok(()))
}
Some(Ready(Ok(_))) => panic!(),
Some(Ready(Err(e))) => Ready(Err(e)),
Some(Pending) => Pending,
None => Ready(Ok(())),
Some(Poll::Ready(Ok(_))) => panic!(),
Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
Some(Poll::Pending) => Poll::Pending,
None => Poll::Ready(Ok(())),
}
}
}
@ -730,31 +727,31 @@ impl AsyncWrite for Mock {
src: &[u8],
) -> Poll<Result<usize, io::Error>> {
match self.calls.pop_front() {
Some(Ready(Ok(Op::Data(data)))) => {
Some(Poll::Ready(Ok(Op::Data(data)))) => {
let len = data.len();
assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
assert_eq!(&data[..], &src[..len]);
Ready(Ok(len))
Poll::Ready(Ok(len))
}
Some(Ready(Ok(_))) => panic!(),
Some(Ready(Err(e))) => Ready(Err(e)),
Some(Pending) => Pending,
None => Ready(Ok(0)),
Some(Poll::Ready(Ok(_))) => panic!(),
Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
Some(Poll::Pending) => Poll::Pending,
None => Poll::Ready(Ok(0)),
}
}
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match self.calls.pop_front() {
Some(Ready(Ok(Op::Flush))) => Ready(Ok(())),
Some(Ready(Ok(_))) => panic!(),
Some(Ready(Err(e))) => Ready(Err(e)),
Some(Pending) => Pending,
None => Ready(Ok(())),
Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())),
Some(Poll::Ready(Ok(_))) => panic!(),
Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
Some(Poll::Pending) => Poll::Pending,
None => Poll::Ready(Ok(())),
}
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Ready(Ok(()))
Poll::Ready(Ok(()))
}
}
@ -771,9 +768,9 @@ impl From<Vec<u8>> for Op {
}
fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
Ready(Ok(bytes.into()))
Poll::Ready(Ok(bytes.into()))
}
fn flush() -> Poll<io::Result<Op>> {
Ready(Ok(Flush))
Poll::Ready(Ok(Op::Flush))
}

View File

@ -2,7 +2,6 @@
//!
//! [`File`]: File
use self::State::*;
use crate::fs::{asyncify, OpenOptions};
use crate::io::blocking::Buf;
use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
@ -17,7 +16,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::task::Poll::*;
#[cfg(test)]
use super::mocks::JoinHandle;
@ -351,7 +349,7 @@ impl File {
inner.complete_inflight().await;
let mut buf = match inner.state {
Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
_ => unreachable!(),
};
@ -363,7 +361,7 @@ impl File {
let std = self.std.clone();
inner.state = Busy(spawn_blocking(move || {
inner.state = State::Busy(spawn_blocking(move || {
let res = if let Some(seek) = seek {
(&*std).seek(seek).and_then(|_| std.set_len(size))
} else {
@ -376,11 +374,11 @@ impl File {
}));
let (op, buf) = match inner.state {
Idle(_) => unreachable!(),
Busy(ref mut rx) => rx.await?,
State::Idle(_) => unreachable!(),
State::Busy(ref mut rx) => rx.await?,
};
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
match op {
Operation::Seek(res) => res.map(|pos| {
@ -532,51 +530,51 @@ impl AsyncRead for File {
loop {
match inner.state {
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
return Poll::Ready(Ok(()));
}
buf.ensure_capacity_for(dst);
let std = me.std.clone();
inner.state = Busy(spawn_blocking(move || {
inner.state = State::Busy(spawn_blocking(move || {
let res = buf.read_from(&mut &*std);
(Operation::Read(res), buf)
}));
}
Busy(ref mut rx) => {
State::Busy(ref mut rx) => {
let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
match op {
Operation::Read(Ok(_)) => {
buf.copy_to(dst);
inner.state = Idle(Some(buf));
return Ready(Ok(()));
inner.state = State::Idle(Some(buf));
return Poll::Ready(Ok(()));
}
Operation::Read(Err(e)) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
return Ready(Err(e));
inner.state = State::Idle(Some(buf));
return Poll::Ready(Err(e));
}
Operation::Write(Ok(_)) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
continue;
}
Operation::Write(Err(e)) => {
assert!(inner.last_write_err.is_none());
inner.last_write_err = Some(e.kind());
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
}
Operation::Seek(result) => {
assert!(buf.is_empty());
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
if let Ok(pos) = result {
inner.pos = pos;
}
@ -595,11 +593,11 @@ impl AsyncSeek for File {
let inner = me.inner.get_mut();
match inner.state {
Busy(_) => Err(io::Error::new(
State::Busy(_) => Err(io::Error::new(
io::ErrorKind::Other,
"other file operation is pending, call poll_complete before start_seek",
)),
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
// Factor in any unread data from the buf
@ -613,7 +611,7 @@ impl AsyncSeek for File {
let std = me.std.clone();
inner.state = Busy(spawn_blocking(move || {
inner.state = State::Busy(spawn_blocking(move || {
let res = (&*std).seek(pos);
(Operation::Seek(res), buf)
}));
@ -628,10 +626,10 @@ impl AsyncSeek for File {
loop {
match inner.state {
Idle(_) => return Poll::Ready(Ok(inner.pos)),
Busy(ref mut rx) => {
State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
match op {
Operation::Read(_) => {}
@ -644,7 +642,7 @@ impl AsyncSeek for File {
if let Ok(pos) = res {
inner.pos = pos;
}
return Ready(res);
return Poll::Ready(res);
}
}
}
@ -664,12 +662,12 @@ impl AsyncWrite for File {
let inner = me.inner.get_mut();
if let Some(e) = inner.last_write_err.take() {
return Ready(Err(e.into()));
return Poll::Ready(Err(e.into()));
}
loop {
match inner.state {
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let seek = if !buf.is_empty() {
@ -694,13 +692,13 @@ impl AsyncWrite for File {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;
inner.state = Busy(blocking_task_join_handle);
inner.state = State::Busy(blocking_task_join_handle);
return Ready(Ok(n));
return Poll::Ready(Ok(n));
}
Busy(ref mut rx) => {
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
match op {
Operation::Read(_) => {
@ -735,12 +733,12 @@ impl AsyncWrite for File {
let inner = me.inner.get_mut();
if let Some(e) = inner.last_write_err.take() {
return Ready(Err(e.into()));
return Poll::Ready(Err(e.into()));
}
loop {
match inner.state {
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
let seek = if !buf.is_empty() {
@ -765,13 +763,13 @@ impl AsyncWrite for File {
io::Error::new(io::ErrorKind::Other, "background task failed")
})?;
inner.state = Busy(blocking_task_join_handle);
inner.state = State::Busy(blocking_task_join_handle);
return Ready(Ok(n));
return Poll::Ready(Ok(n));
}
Busy(ref mut rx) => {
State::Busy(ref mut rx) => {
let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
inner.state = Idle(Some(buf));
inner.state = State::Idle(Some(buf));
match op {
Operation::Read(_) => {
@ -896,21 +894,21 @@ impl Inner {
fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
if let Some(e) = self.last_write_err.take() {
return Ready(Err(e.into()));
return Poll::Ready(Err(e.into()));
}
let (op, buf) = match self.state {
Idle(_) => return Ready(Ok(())),
Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
State::Idle(_) => return Poll::Ready(Ok(())),
State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
};
// The buffer is not used here
self.state = Idle(Some(buf));
self.state = State::Idle(Some(buf));
match op {
Operation::Read(_) => Ready(Ok(())),
Operation::Write(res) => Ready(res),
Operation::Seek(_) => Ready(Ok(())),
Operation::Read(_) => Poll::Ready(Ok(())),
Operation::Write(res) => Poll::Ready(res),
Operation::Seek(_) => Poll::Ready(Ok(())),
}
}
}

View File

@ -124,12 +124,12 @@ impl<T> Future for JoinHandle<T> {
type Output = Result<T, io::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::task::Poll::*;
use std::task::Poll;
match Pin::new(&mut self.rx).poll(cx) {
Ready(Ok(v)) => Ready(Ok(v)),
Ready(Err(e)) => panic!("error = {:?}", e),
Pending => Pending,
Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)),
Poll::Ready(Err(e)) => panic!("error = {:?}", e),
Poll::Pending => Poll::Pending,
}
}
}

View File

@ -6,11 +6,8 @@ use std::future::Future;
use std::io;
use std::io::prelude::*;
use std::pin::Pin;
use std::task::Poll::*;
use std::task::{Context, Poll};
use self::State::*;
/// `T` should not implement _both_ Read and Write.
#[derive(Debug)]
pub(crate) struct Blocking<T> {
@ -58,38 +55,38 @@ where
) -> Poll<io::Result<()>> {
loop {
match self.state {
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
if !buf.is_empty() {
buf.copy_to(dst);
*buf_cell = Some(buf);
return Ready(Ok(()));
return Poll::Ready(Ok(()));
}
buf.ensure_capacity_for(dst);
let mut inner = self.inner.take().unwrap();
self.state = Busy(sys::run(move || {
self.state = State::Busy(sys::run(move || {
let res = buf.read_from(&mut inner);
(res, buf, inner)
}));
}
Busy(ref mut rx) => {
State::Busy(ref mut rx) => {
let (res, mut buf, inner) = ready!(Pin::new(rx).poll(cx))?;
self.inner = Some(inner);
match res {
Ok(_) => {
buf.copy_to(dst);
self.state = Idle(Some(buf));
return Ready(Ok(()));
self.state = State::Idle(Some(buf));
return Poll::Ready(Ok(()));
}
Err(e) => {
assert!(buf.is_empty());
self.state = Idle(Some(buf));
return Ready(Err(e));
self.state = State::Idle(Some(buf));
return Poll::Ready(Err(e));
}
}
}
@ -109,7 +106,7 @@ where
) -> Poll<io::Result<usize>> {
loop {
match self.state {
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
let mut buf = buf_cell.take().unwrap();
assert!(buf.is_empty());
@ -117,7 +114,7 @@ where
let n = buf.copy_from(src);
let mut inner = self.inner.take().unwrap();
self.state = Busy(sys::run(move || {
self.state = State::Busy(sys::run(move || {
let n = buf.len();
let res = buf.write_to(&mut inner).map(|_| n);
@ -125,11 +122,11 @@ where
}));
self.need_flush = true;
return Ready(Ok(n));
return Poll::Ready(Ok(n));
}
Busy(ref mut rx) => {
State::Busy(ref mut rx) => {
let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
self.state = State::Idle(Some(buf));
self.inner = Some(inner);
// If error, return
@ -144,24 +141,24 @@ where
let need_flush = self.need_flush;
match self.state {
// The buffer is not used here
Idle(ref mut buf_cell) => {
State::Idle(ref mut buf_cell) => {
if need_flush {
let buf = buf_cell.take().unwrap();
let mut inner = self.inner.take().unwrap();
self.state = Busy(sys::run(move || {
self.state = State::Busy(sys::run(move || {
let res = inner.flush().map(|_| 0);
(res, buf, inner)
}));
self.need_flush = false;
} else {
return Ready(Ok(()));
return Poll::Ready(Ok(()));
}
}
Busy(ref mut rx) => {
State::Busy(ref mut rx) => {
let (res, buf, inner) = ready!(Pin::new(rx).poll(cx))?;
self.state = Idle(Some(buf));
self.state = State::Idle(Some(buf));
self.inner = Some(inner);
// If error, return

View File

@ -604,20 +604,19 @@ enum TryCurrentErrorKind {
impl fmt::Debug for TryCurrentErrorKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TryCurrentErrorKind::*;
match self {
NoContext => f.write_str("NoContext"),
ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
TryCurrentErrorKind::NoContext => f.write_str("NoContext"),
TryCurrentErrorKind::ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"),
}
}
}
impl fmt::Display for TryCurrentError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use TryCurrentErrorKind::*;
use TryCurrentErrorKind as E;
match self.kind {
NoContext => f.write_str(CONTEXT_MISSING_ERROR),
ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
E::NoContext => f.write_str(CONTEXT_MISSING_ERROR),
E::ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
}
}
}

View File

@ -28,7 +28,6 @@ use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::*;
use std::task::Poll::*;
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt};
@ -391,7 +390,7 @@ impl Semaphore {
let mut waiters = loop {
// Has the semaphore closed?
if curr & Self::CLOSED > 0 {
return Ready(Err(AcquireError::closed()));
return Poll::Ready(Err(AcquireError::closed()));
}
let mut remaining = 0;
@ -436,7 +435,7 @@ impl Semaphore {
)
});
return Ready(Ok(()));
return Poll::Ready(Ok(()));
} else if lock.is_none() {
break self.waiters.lock();
}
@ -448,7 +447,7 @@ impl Semaphore {
};
if waiters.closed {
return Ready(Err(AcquireError::closed()));
return Poll::Ready(Err(AcquireError::closed()));
}
#[cfg(all(tokio_unstable, feature = "tracing"))]
@ -462,7 +461,7 @@ impl Semaphore {
if node.assign_permits(&mut acquired) {
self.add_permits_locked(acquired, waiters);
return Ready(Ok(()));
return Poll::Ready(Ok(()));
}
assert_eq!(acquired, 0);
@ -494,7 +493,7 @@ impl Semaphore {
drop(waiters);
drop(old_waker);
Pending
Poll::Pending
}
}
@ -572,15 +571,15 @@ impl Future for Acquire<'_> {
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
Pending => {
Poll::Pending => {
*queued = true;
Pending
Poll::Pending
}
Ready(r) => {
Poll::Ready(r) => {
coop.made_progress();
r?;
*queued = false;
Ready(Ok(()))
Poll::Ready(Ok(()))
}
};

View File

@ -241,7 +241,7 @@ impl<T, S: Semaphore> Rx<T, S> {
/// Receive the next value
pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
use super::block::Read::*;
use super::block::Read;
ready!(crate::trace::trace_leaf(cx));
@ -254,12 +254,12 @@ impl<T, S: Semaphore> Rx<T, S> {
macro_rules! try_recv {
() => {
match rx_fields.list.pop(&self.inner.tx) {
Some(Value(value)) => {
Some(Read::Value(value)) => {
self.inner.semaphore.add_permit();
coop.made_progress();
return Ready(Some(value));
}
Some(Closed) => {
Some(Read::Closed) => {
// TODO: This check may not be required as it most
// likely can only return `true` at this point. A
// channel is closed when all tx handles are

View File

@ -888,13 +888,11 @@ impl Notified<'_> {
}
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
use State::*;
let (notify, state, notify_waiters_calls, waiter) = self.project();
'outer_loop: loop {
match *state {
Init => {
State::Init => {
let curr = notify.state.load(SeqCst);
// Optimistically try acquiring a pending notification
@ -907,7 +905,7 @@ impl Notified<'_> {
if res.is_ok() {
// Acquired the notification
*state = Done;
*state = State::Done;
continue 'outer_loop;
}
@ -925,7 +923,7 @@ impl Notified<'_> {
// if notify_waiters has been called after the future
// was created, then we are done
if get_num_notify_waiters_calls(curr) != *notify_waiters_calls {
*state = Done;
*state = State::Done;
continue 'outer_loop;
}
@ -961,7 +959,7 @@ impl Notified<'_> {
match res {
Ok(_) => {
// Acquired the notification
*state = Done;
*state = State::Done;
continue 'outer_loop;
}
Err(actual) => {
@ -989,14 +987,14 @@ impl Notified<'_> {
// Insert the waiter into the linked list
waiters.push_front(NonNull::from(waiter));
*state = Waiting;
*state = State::Waiting;
drop(waiters);
drop(old_waker);
return Poll::Pending;
}
Waiting => {
State::Waiting => {
#[cfg(tokio_taskdump)]
if let Some(waker) = waker {
let mut ctx = Context::from_waker(waker);
@ -1009,7 +1007,7 @@ impl Notified<'_> {
drop(unsafe { waiter.waker.with_mut(|waker| (*waker).take()) });
waiter.notification.clear();
*state = Done;
*state = State::Done;
return Poll::Ready(());
}
@ -1034,7 +1032,7 @@ impl Notified<'_> {
drop(waiters);
drop(old_waker);
*state = Done;
*state = State::Done;
return Poll::Ready(());
}
@ -1056,7 +1054,7 @@ impl Notified<'_> {
// The list is used in `notify_waiters`, so it must be guarded.
unsafe { waiters.remove(NonNull::from(waiter)) };
*state = Done;
*state = State::Done;
} else {
// Safety: we hold the lock, so we can modify the waker.
unsafe {
@ -1090,7 +1088,7 @@ impl Notified<'_> {
// Drop the old waker after releasing the lock.
drop(old_waker);
}
Done => {
State::Done => {
#[cfg(tokio_taskdump)]
if let Some(waker) = waker {
let mut ctx = Context::from_waker(waker);
@ -1113,15 +1111,13 @@ impl Future for Notified<'_> {
impl Drop for Notified<'_> {
fn drop(&mut self) {
use State::*;
// Safety: The type only transitions to a "Waiting" state when pinned.
let (notify, state, _, waiter) = unsafe { Pin::new_unchecked(self).project() };
// This is where we ensure safety. The `Notified` value is being
// dropped, which means we must ensure that the waiter entry is no
// longer stored in the linked list.
if matches!(*state, Waiting) {
if matches!(*state, State::Waiting) {
let mut waiters = notify.waiters.lock();
let mut notify_state = notify.state.load(SeqCst);

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
#[test]
fn smoke() {
use crate::sync::mpsc::block::Read::*;
use crate::sync::mpsc::block::Read;
const NUM_TX: usize = 2;
const NUM_MSG: usize = 2;
@ -28,7 +28,7 @@ fn smoke() {
loop {
match rx.pop(&tx) {
Some(Value((th, v))) => {
Some(Read::Value((th, v))) => {
assert_eq!(v, next[th]);
next[th] += 1;
@ -36,7 +36,7 @@ fn smoke() {
break;
}
}
Some(Closed) => {
Some(Read::Closed) => {
panic!();
}
None => {

View File

@ -1,6 +1,5 @@
//! Time error types.
use self::Kind::*;
use std::error;
use std::fmt;
@ -57,7 +56,7 @@ pub(crate) enum InsertError {
impl Error {
/// Creates an error representing a shutdown timer.
pub fn shutdown() -> Error {
Error(Shutdown)
Error(Kind::Shutdown)
}
/// Returns `true` if the error was caused by the timer being shutdown.
@ -67,7 +66,7 @@ impl Error {
/// Creates an error representing a timer at capacity.
pub fn at_capacity() -> Error {
Error(AtCapacity)
Error(Kind::AtCapacity)
}
/// Returns `true` if the error was caused by the timer being at capacity.
@ -77,7 +76,7 @@ impl Error {
/// Creates an error representing a misconfigured timer.
pub fn invalid() -> Error {
Error(Invalid)
Error(Kind::Invalid)
}
/// Returns `true` if the error was caused by the timer being misconfigured.
@ -90,11 +89,12 @@ impl error::Error for Error {}
impl fmt::Display for Error {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
use self::Kind::*;
let descr = match self.0 {
Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime",
AtCapacity => "timer is at capacity and cannot create a new entry",
Invalid => "timer duration exceeds maximum duration",
Kind::Shutdown => {
"the timer is shutdown, must be called from the context of Tokio runtime"
}
Kind::AtCapacity => "timer is at capacity and cannot create a new entry",
Kind::Invalid => "timer duration exceeds maximum duration",
};
write!(fmt, "{}", descr)
}