diff --git a/embassy-stm32/src/can/bxcan/mod.rs b/embassy-stm32/src/can/bxcan/mod.rs index 8165364f5..c0b3c730b 100644 --- a/embassy-stm32/src/can/bxcan/mod.rs +++ b/embassy-stm32/src/can/bxcan/mod.rs @@ -17,7 +17,7 @@ use self::registers::{Registers, RxFifo}; pub use super::common::{BufferedCanReceiver, BufferedCanSender}; use super::frame::{Envelope, Frame}; use super::util; -use crate::can::enums::{BusError, TryReadError}; +use crate::can::enums::{BusError, InternalOperation, TryReadError}; use crate::gpio::{AfType, OutputType, Pull, Speed}; use crate::interrupt::typelevel::Interrupt; use crate::rcc::{self, RccPeripheral}; @@ -685,22 +685,18 @@ impl<'d, const TX_BUF_SIZE: usize> BufferedCanTx<'d, TX_BUF_SIZE> { /// Returns a sender that can be used for sending CAN frames. pub fn writer(&self) -> BufferedCanSender { + (self.info.internal_operation)(InternalOperation::NotifySenderCreated); BufferedCanSender { tx_buf: self.tx_buf.sender().into(), waker: self.info.tx_waker, + internal_operation: self.info.internal_operation, } } } impl<'d, const TX_BUF_SIZE: usize> Drop for BufferedCanTx<'d, TX_BUF_SIZE> { fn drop(&mut self) { - critical_section::with(|_| { - let state = self.state as *const State; - unsafe { - let mut_state = state as *mut State; - (*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); - } - }); + (self.info.internal_operation)(InternalOperation::NotifySenderDestroyed); } } @@ -825,7 +821,11 @@ impl<'d, const RX_BUF_SIZE: usize> BufferedCanRx<'d, RX_BUF_SIZE> { /// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver. pub fn reader(&self) -> BufferedCanReceiver { - self.rx_buf.receiver().into() + (self.info.internal_operation)(InternalOperation::NotifyReceiverCreated); + BufferedCanReceiver { + rx_buf: self.rx_buf.receiver().into(), + internal_operation: self.info.internal_operation, + } } /// Accesses the filter banks owned by this CAN peripheral. @@ -839,13 +839,7 @@ impl<'d, const RX_BUF_SIZE: usize> BufferedCanRx<'d, RX_BUF_SIZE> { impl<'d, const RX_BUF_SIZE: usize> Drop for BufferedCanRx<'d, RX_BUF_SIZE> { fn drop(&mut self) { - critical_section::with(|_| { - let state = self.state as *const State; - unsafe { - let mut_state = state as *mut State; - (*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); - } - }); + (self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed); } } @@ -1048,6 +1042,8 @@ pub(crate) struct State { pub(crate) rx_mode: RxMode, pub(crate) tx_mode: TxMode, pub err_waker: AtomicWaker, + receiver_instance_count: usize, + sender_instance_count: usize, } impl State { @@ -1056,6 +1052,8 @@ impl State { rx_mode: RxMode::NonBuffered(AtomicWaker::new()), tx_mode: TxMode::NonBuffered(AtomicWaker::new()), err_waker: AtomicWaker::new(), + receiver_instance_count: 1, + sender_instance_count: 1, } } } @@ -1067,6 +1065,7 @@ pub(crate) struct Info { rx1_interrupt: crate::interrupt::Interrupt, sce_interrupt: crate::interrupt::Interrupt, tx_waker: fn(), + internal_operation: fn(InternalOperation), /// The total number of filter banks available to the instance. /// @@ -1079,6 +1078,7 @@ trait SealedInstance { fn regs() -> crate::pac::can::Can; fn state() -> &'static State; unsafe fn mut_state() -> &'static mut State; + fn internal_operation(val: InternalOperation); } /// CAN instance trait. @@ -1136,6 +1136,7 @@ foreach_peripheral!( rx1_interrupt: crate::_generated::peripheral_interrupts::$inst::RX1::IRQ, sce_interrupt: crate::_generated::peripheral_interrupts::$inst::SCE::IRQ, tx_waker: crate::_generated::peripheral_interrupts::$inst::TX::pend, + internal_operation: peripherals::$inst::internal_operation, num_filter_banks: peripherals::$inst::NUM_FILTER_BANKS, }; &INFO @@ -1151,6 +1152,37 @@ foreach_peripheral!( fn state() -> &'static State { unsafe { peripherals::$inst::mut_state() } } + + + fn internal_operation(val: InternalOperation) { + critical_section::with(|_| { + //let state = self.state as *const State; + unsafe { + //let mut_state = state as *mut State; + let mut_state = peripherals::$inst::mut_state(); + match val { + InternalOperation::NotifySenderCreated => { + mut_state.sender_instance_count += 1; + } + InternalOperation::NotifySenderDestroyed => { + mut_state.sender_instance_count -= 1; + if ( 0 == mut_state.sender_instance_count) { + (*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); + } + } + InternalOperation::NotifyReceiverCreated => { + mut_state.receiver_instance_count += 1; + } + InternalOperation::NotifyReceiverDestroyed => { + mut_state.receiver_instance_count -= 1; + if ( 0 == mut_state.receiver_instance_count) { + (*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); + } + } + } + } + }); + } } impl Instance for peripherals::$inst { diff --git a/embassy-stm32/src/can/common.rs b/embassy-stm32/src/can/common.rs index a54b54f6e..e12111910 100644 --- a/embassy-stm32/src/can/common.rs +++ b/embassy-stm32/src/can/common.rs @@ -22,22 +22,22 @@ pub(crate) struct FdBufferedTxInner { } /// Sender that can be used for sending CAN frames. -#[derive(Copy, Clone)] -pub struct BufferedCanSender { - pub(crate) tx_buf: embassy_sync::channel::DynamicSender<'static, Frame>, +pub struct BufferedSender<'ch, FRAME> { + pub(crate) tx_buf: embassy_sync::channel::DynamicSender<'ch, FRAME>, pub(crate) waker: fn(), + pub(crate) internal_operation: fn(InternalOperation), } -impl BufferedCanSender { +impl<'ch, FRAME> BufferedSender<'ch, FRAME> { /// Async write frame to TX buffer. - pub fn try_write(&mut self, frame: Frame) -> Result<(), embassy_sync::channel::TrySendError> { + pub fn try_write(&mut self, frame: FRAME) -> Result<(), embassy_sync::channel::TrySendError> { self.tx_buf.try_send(frame)?; (self.waker)(); Ok(()) } /// Async write frame to TX buffer. - pub async fn write(&mut self, frame: Frame) { + pub async fn write(&mut self, frame: FRAME) { self.tx_buf.send(frame).await; (self.waker)(); } @@ -48,5 +48,77 @@ impl BufferedCanSender { } } +impl<'ch, FRAME> Clone for BufferedSender<'ch, FRAME> { + fn clone(&self) -> Self { + (self.internal_operation)(InternalOperation::NotifySenderCreated); + Self { + tx_buf: self.tx_buf, + waker: self.waker, + internal_operation: self.internal_operation, + } + } +} + +impl<'ch, FRAME> Drop for BufferedSender<'ch, FRAME> { + fn drop(&mut self) { + (self.internal_operation)(InternalOperation::NotifySenderDestroyed); + } +} + +/// Sender that can be used for sending Classic CAN frames. +pub type BufferedCanSender = BufferedSender<'static, Frame>; + /// Receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver. -pub type BufferedCanReceiver = embassy_sync::channel::DynamicReceiver<'static, Result>; +pub struct BufferedReceiver<'ch, ENVELOPE> { + pub(crate) rx_buf: embassy_sync::channel::DynamicReceiver<'ch, Result>, + pub(crate) internal_operation: fn(InternalOperation), +} + +impl<'ch, ENVELOPE> BufferedReceiver<'ch, ENVELOPE> { + /// Receive the next frame. + /// + /// See [`Channel::receive()`]. + pub fn receive(&self) -> embassy_sync::channel::DynamicReceiveFuture<'_, Result> { + self.rx_buf.receive() + } + + /// Attempt to immediately receive the next frame. + /// + /// See [`Channel::try_receive()`] + pub fn try_receive(&self) -> Result, embassy_sync::channel::TryReceiveError> { + self.rx_buf.try_receive() + } + + /// Allows a poll_fn to poll until the channel is ready to receive + /// + /// See [`Channel::poll_ready_to_receive()`] + pub fn poll_ready_to_receive(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<()> { + self.rx_buf.poll_ready_to_receive(cx) + } + + /// Poll the channel for the next frame + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll> { + self.rx_buf.poll_receive(cx) + } +} + +impl<'ch, ENVELOPE> Clone for BufferedReceiver<'ch, ENVELOPE> { + fn clone(&self) -> Self { + (self.internal_operation)(InternalOperation::NotifyReceiverCreated); + Self { + rx_buf: self.rx_buf, + internal_operation: self.internal_operation, + } + } +} + +impl<'ch, ENVELOPE> Drop for BufferedReceiver<'ch, ENVELOPE> { + fn drop(&mut self) { + (self.internal_operation)(InternalOperation::NotifyReceiverDestroyed); + } +} + +/// A BufferedCanReceiver for Classic CAN frames. +pub type BufferedCanReceiver = BufferedReceiver<'static, Envelope>; diff --git a/embassy-stm32/src/can/enums.rs b/embassy-stm32/src/can/enums.rs index a5cca424d..97cb47640 100644 --- a/embassy-stm32/src/can/enums.rs +++ b/embassy-stm32/src/can/enums.rs @@ -68,3 +68,17 @@ pub enum TryReadError { /// Receive buffer is empty Empty, } + +/// Internal Operation +#[derive(Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum InternalOperation { + /// Notify receiver created + NotifyReceiverCreated, + /// Notify receiver destroyed + NotifyReceiverDestroyed, + /// Notify sender created + NotifySenderCreated, + /// Notify sender destroyed + NotifySenderDestroyed, +} diff --git a/embassy-stm32/src/can/fdcan.rs b/embassy-stm32/src/can/fdcan.rs index c549313f3..f950b6f99 100644 --- a/embassy-stm32/src/can/fdcan.rs +++ b/embassy-stm32/src/can/fdcan.rs @@ -6,7 +6,7 @@ use core::task::Poll; use embassy_hal_internal::interrupt::InterruptExt; use embassy_hal_internal::{into_ref, PeripheralRef}; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; -use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; +use embassy_sync::channel::Channel; use embassy_sync::waitqueue::AtomicWaker; use crate::can::fd::peripheral::Registers; @@ -478,28 +478,28 @@ impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> BufferedCan<'d, /// Returns a sender that can be used for sending CAN frames. pub fn writer(&self) -> BufferedCanSender { + (self.info.internal_operation)(InternalOperation::NotifySenderCreated); BufferedCanSender { tx_buf: self.tx_buf.sender().into(), waker: self.info.tx_waker, + internal_operation: self.info.internal_operation, } } /// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver. pub fn reader(&self) -> BufferedCanReceiver { - self.rx_buf.receiver().into() + (self.info.internal_operation)(InternalOperation::NotifyReceiverCreated); + BufferedCanReceiver { + rx_buf: self.rx_buf.receiver().into(), + internal_operation: self.info.internal_operation, + } } } impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> Drop for BufferedCan<'d, TX_BUF_SIZE, RX_BUF_SIZE> { fn drop(&mut self) { - critical_section::with(|_| { - let state = self.state as *const State; - unsafe { - let mut_state = state as *mut State; - (*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); - (*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); - } - }); + (self.info.internal_operation)(InternalOperation::NotifySenderDestroyed); + (self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed); } } @@ -509,35 +509,11 @@ pub type RxFdBuf = Channel = Channel; -/// Sender that can be used for sending CAN frames. -#[derive(Copy, Clone)] -pub struct BufferedFdCanSender { - tx_buf: DynamicSender<'static, FdFrame>, - waker: fn(), -} - -impl BufferedFdCanSender { - /// Async write frame to TX buffer. - pub fn try_write(&mut self, frame: FdFrame) -> Result<(), embassy_sync::channel::TrySendError> { - self.tx_buf.try_send(frame)?; - (self.waker)(); - Ok(()) - } - - /// Async write frame to TX buffer. - pub async fn write(&mut self, frame: FdFrame) { - self.tx_buf.send(frame).await; - (self.waker)(); - } - - /// Allows a poll_fn to poll until the channel is ready to write - pub fn poll_ready_to_send(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<()> { - self.tx_buf.poll_ready_to_send(cx) - } -} +/// Sender that can be used for sending Classic CAN frames. +pub type BufferedFdCanSender = super::common::BufferedSender<'static, FdFrame>; /// Receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver. -pub type BufferedFdCanReceiver = DynamicReceiver<'static, Result>; +pub type BufferedFdCanReceiver = super::common::BufferedReceiver<'static, FdEnvelope>; /// Buffered FDCAN Instance pub struct BufferedCanFd<'d, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> { @@ -608,28 +584,28 @@ impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> BufferedCanFd<' /// Returns a sender that can be used for sending CAN frames. pub fn writer(&self) -> BufferedFdCanSender { + (self.info.internal_operation)(InternalOperation::NotifySenderCreated); BufferedFdCanSender { tx_buf: self.tx_buf.sender().into(), waker: self.info.tx_waker, + internal_operation: self.info.internal_operation, } } /// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver. pub fn reader(&self) -> BufferedFdCanReceiver { - self.rx_buf.receiver().into() + (self.info.internal_operation)(InternalOperation::NotifyReceiverCreated); + BufferedFdCanReceiver { + rx_buf: self.rx_buf.receiver().into(), + internal_operation: self.info.internal_operation, + } } } impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> Drop for BufferedCanFd<'d, TX_BUF_SIZE, RX_BUF_SIZE> { fn drop(&mut self) { - critical_section::with(|_| { - let state = self.state as *const State; - unsafe { - let mut_state = state as *mut State; - (*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); - (*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); - } - }); + (self.info.internal_operation)(InternalOperation::NotifySenderDestroyed); + (self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed); } } @@ -922,6 +898,8 @@ struct State { pub rx_mode: RxMode, pub tx_mode: TxMode, pub ns_per_timer_tick: u64, + receiver_instance_count: usize, + sender_instance_count: usize, pub err_waker: AtomicWaker, } @@ -933,6 +911,8 @@ impl State { tx_mode: TxMode::NonBuffered(AtomicWaker::new()), ns_per_timer_tick: 0, err_waker: AtomicWaker::new(), + receiver_instance_count: 1, + sender_instance_count: 1, } } } @@ -942,6 +922,7 @@ struct Info { interrupt0: crate::interrupt::Interrupt, _interrupt1: crate::interrupt::Interrupt, tx_waker: fn(), + internal_operation: fn(InternalOperation), } impl Info { @@ -970,6 +951,7 @@ trait SealedInstance { fn registers() -> crate::can::fd::peripheral::Registers; fn state() -> &'static State; unsafe fn mut_state() -> &'static mut State; + fn internal_operation(val: InternalOperation); fn calc_timestamp(ns_per_timer_tick: u64, ts_val: u16) -> Timestamp; } @@ -992,12 +974,42 @@ macro_rules! impl_fdcan { impl SealedInstance for peripherals::$inst { const MSG_RAM_OFFSET: usize = $msg_ram_offset; + fn internal_operation(val: InternalOperation) { + critical_section::with(|_| { + //let state = self.state as *const State; + unsafe { + //let mut_state = state as *mut State; + let mut_state = peripherals::$inst::mut_state(); + match val { + InternalOperation::NotifySenderCreated => { + mut_state.sender_instance_count += 1; + } + InternalOperation::NotifySenderDestroyed => { + mut_state.sender_instance_count -= 1; + if ( 0 == mut_state.sender_instance_count) { + (*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); + } + } + InternalOperation::NotifyReceiverCreated => { + mut_state.receiver_instance_count += 1; + } + InternalOperation::NotifyReceiverDestroyed => { + mut_state.receiver_instance_count -= 1; + if ( 0 == mut_state.receiver_instance_count) { + (*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new()); + } + } + } + } + }); + } fn info() -> &'static Info { static INFO: Info = Info { regs: Registers{regs: crate::pac::$inst, msgram: crate::pac::$msg_ram_inst, msg_ram_offset: $msg_ram_offset}, interrupt0: crate::_generated::peripheral_interrupts::$inst::IT0::IRQ, _interrupt1: crate::_generated::peripheral_interrupts::$inst::IT1::IRQ, tx_waker: crate::_generated::peripheral_interrupts::$inst::IT0::pend, + internal_operation: peripherals::$inst::internal_operation, }; &INFO }