util: use ReusableBoxFuture for PollSemaphore (#3463)

This commit is contained in:
Rob Ede 2021-01-30 09:57:04 +00:00 committed by GitHub
parent 891aba5f71
commit 06d6adf4b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 14 deletions

View File

@ -47,12 +47,12 @@ futures-util = { version = "0.3.0", optional = true }
log = "0.4" log = "0.4"
pin-project-lite = "0.2.0" pin-project-lite = "0.2.0"
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
async-stream = "0.3.0"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.0.0", features = ["full"] } tokio = { version = "1.0.0", features = ["full"] }
tokio-test = { version = "0.4.0" } tokio-test = { version = "0.4.0" }
async-stream = "0.3.0"
futures = "0.3.0" futures = "0.3.0"
futures-test = "0.3.5" futures-test = "0.3.5"

View File

@ -1,31 +1,28 @@
use futures_core::Stream; use futures_core::{ready, Stream};
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};
use super::ReusableBoxFuture;
/// A wrapper around [`Semaphore`] that provides a `poll_acquire` method. /// A wrapper around [`Semaphore`] that provides a `poll_acquire` method.
/// ///
/// [`Semaphore`]: tokio::sync::Semaphore /// [`Semaphore`]: tokio::sync::Semaphore
pub struct PollSemaphore { pub struct PollSemaphore {
semaphore: Arc<Semaphore>, semaphore: Arc<Semaphore>,
inner: Pin<Box<dyn Stream<Item = OwnedSemaphorePermit> + Send + Sync>>, permit_fut: ReusableBoxFuture<Result<OwnedSemaphorePermit, AcquireError>>,
} }
impl PollSemaphore { impl PollSemaphore {
/// Create a new `PollSemaphore`. /// Create a new `PollSemaphore`.
pub fn new(semaphore: Arc<Semaphore>) -> Self { pub fn new(semaphore: Arc<Semaphore>) -> Self {
let fut = Arc::clone(&semaphore).acquire_owned();
Self { Self {
semaphore: semaphore.clone(), semaphore,
inner: Box::pin(async_stream::stream! { permit_fut: ReusableBoxFuture::new(fut),
loop {
match semaphore.clone().acquire_owned().await {
Ok(permit) => yield permit,
Err(_closed) => break,
}
}
}),
} }
} }
@ -58,7 +55,14 @@ impl PollSemaphore {
/// the `Waker` from the `Context` passed to the most recent call is /// the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup. /// scheduled to receive a wakeup.
pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<Option<OwnedSemaphorePermit>> { pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<Option<OwnedSemaphorePermit>> {
self.inner.as_mut().poll_next(cx) match ready!(self.permit_fut.poll(cx)) {
Ok(permit) => {
let next_fut = Arc::clone(&self.semaphore).acquire_owned();
self.permit_fut.set(next_fut);
Poll::Ready(Some(permit))
}
Err(_closed) => Poll::Ready(None),
}
} }
} }