Reference count senders and receivers so that we don't close down early.

This commit is contained in:
Corey Schuhen 2025-03-09 13:17:39 +10:00
parent 7c49f482d7
commit 424d20727e
4 changed files with 199 additions and 69 deletions

View File

@ -17,7 +17,7 @@ use self::registers::{Registers, RxFifo};
pub use super::common::{BufferedCanReceiver, BufferedCanSender};
use super::frame::{Envelope, Frame};
use super::util;
use crate::can::enums::{BusError, TryReadError};
use crate::can::enums::{BusError, InternalOperation, TryReadError};
use crate::gpio::{AfType, OutputType, Pull, Speed};
use crate::interrupt::typelevel::Interrupt;
use crate::rcc::{self, RccPeripheral};
@ -685,22 +685,18 @@ impl<'d, const TX_BUF_SIZE: usize> BufferedCanTx<'d, TX_BUF_SIZE> {
/// Returns a sender that can be used for sending CAN frames.
pub fn writer(&self) -> BufferedCanSender {
(self.info.internal_operation)(InternalOperation::NotifySenderCreated);
BufferedCanSender {
tx_buf: self.tx_buf.sender().into(),
waker: self.info.tx_waker,
internal_operation: self.info.internal_operation,
}
}
}
impl<'d, const TX_BUF_SIZE: usize> Drop for BufferedCanTx<'d, TX_BUF_SIZE> {
fn drop(&mut self) {
critical_section::with(|_| {
let state = self.state as *const State;
unsafe {
let mut_state = state as *mut State;
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
});
(self.info.internal_operation)(InternalOperation::NotifySenderDestroyed);
}
}
@ -825,7 +821,11 @@ impl<'d, const RX_BUF_SIZE: usize> BufferedCanRx<'d, RX_BUF_SIZE> {
/// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub fn reader(&self) -> BufferedCanReceiver {
self.rx_buf.receiver().into()
(self.info.internal_operation)(InternalOperation::NotifyReceiverCreated);
BufferedCanReceiver {
rx_buf: self.rx_buf.receiver().into(),
internal_operation: self.info.internal_operation,
}
}
/// Accesses the filter banks owned by this CAN peripheral.
@ -839,13 +839,7 @@ impl<'d, const RX_BUF_SIZE: usize> BufferedCanRx<'d, RX_BUF_SIZE> {
impl<'d, const RX_BUF_SIZE: usize> Drop for BufferedCanRx<'d, RX_BUF_SIZE> {
fn drop(&mut self) {
critical_section::with(|_| {
let state = self.state as *const State;
unsafe {
let mut_state = state as *mut State;
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
});
(self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed);
}
}
@ -1048,6 +1042,8 @@ pub(crate) struct State {
pub(crate) rx_mode: RxMode,
pub(crate) tx_mode: TxMode,
pub err_waker: AtomicWaker,
receiver_instance_count: usize,
sender_instance_count: usize,
}
impl State {
@ -1056,6 +1052,8 @@ impl State {
rx_mode: RxMode::NonBuffered(AtomicWaker::new()),
tx_mode: TxMode::NonBuffered(AtomicWaker::new()),
err_waker: AtomicWaker::new(),
receiver_instance_count: 1,
sender_instance_count: 1,
}
}
}
@ -1067,6 +1065,7 @@ pub(crate) struct Info {
rx1_interrupt: crate::interrupt::Interrupt,
sce_interrupt: crate::interrupt::Interrupt,
tx_waker: fn(),
internal_operation: fn(InternalOperation),
/// The total number of filter banks available to the instance.
///
@ -1079,6 +1078,7 @@ trait SealedInstance {
fn regs() -> crate::pac::can::Can;
fn state() -> &'static State;
unsafe fn mut_state() -> &'static mut State;
fn internal_operation(val: InternalOperation);
}
/// CAN instance trait.
@ -1136,6 +1136,7 @@ foreach_peripheral!(
rx1_interrupt: crate::_generated::peripheral_interrupts::$inst::RX1::IRQ,
sce_interrupt: crate::_generated::peripheral_interrupts::$inst::SCE::IRQ,
tx_waker: crate::_generated::peripheral_interrupts::$inst::TX::pend,
internal_operation: peripherals::$inst::internal_operation,
num_filter_banks: peripherals::$inst::NUM_FILTER_BANKS,
};
&INFO
@ -1151,6 +1152,37 @@ foreach_peripheral!(
fn state() -> &'static State {
unsafe { peripherals::$inst::mut_state() }
}
fn internal_operation(val: InternalOperation) {
critical_section::with(|_| {
//let state = self.state as *const State;
unsafe {
//let mut_state = state as *mut State;
let mut_state = peripherals::$inst::mut_state();
match val {
InternalOperation::NotifySenderCreated => {
mut_state.sender_instance_count += 1;
}
InternalOperation::NotifySenderDestroyed => {
mut_state.sender_instance_count -= 1;
if ( 0 == mut_state.sender_instance_count) {
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
}
InternalOperation::NotifyReceiverCreated => {
mut_state.receiver_instance_count += 1;
}
InternalOperation::NotifyReceiverDestroyed => {
mut_state.receiver_instance_count -= 1;
if ( 0 == mut_state.receiver_instance_count) {
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
}
}
}
});
}
}
impl Instance for peripherals::$inst {

View File

@ -22,22 +22,22 @@ pub(crate) struct FdBufferedTxInner {
}
/// Sender that can be used for sending CAN frames.
#[derive(Copy, Clone)]
pub struct BufferedCanSender {
pub(crate) tx_buf: embassy_sync::channel::DynamicSender<'static, Frame>,
pub struct BufferedSender<'ch, FRAME> {
pub(crate) tx_buf: embassy_sync::channel::DynamicSender<'ch, FRAME>,
pub(crate) waker: fn(),
pub(crate) internal_operation: fn(InternalOperation),
}
impl BufferedCanSender {
impl<'ch, FRAME> BufferedSender<'ch, FRAME> {
/// Async write frame to TX buffer.
pub fn try_write(&mut self, frame: Frame) -> Result<(), embassy_sync::channel::TrySendError<Frame>> {
pub fn try_write(&mut self, frame: FRAME) -> Result<(), embassy_sync::channel::TrySendError<FRAME>> {
self.tx_buf.try_send(frame)?;
(self.waker)();
Ok(())
}
/// Async write frame to TX buffer.
pub async fn write(&mut self, frame: Frame) {
pub async fn write(&mut self, frame: FRAME) {
self.tx_buf.send(frame).await;
(self.waker)();
}
@ -48,5 +48,77 @@ impl BufferedCanSender {
}
}
impl<'ch, FRAME> Clone for BufferedSender<'ch, FRAME> {
fn clone(&self) -> Self {
(self.internal_operation)(InternalOperation::NotifySenderCreated);
Self {
tx_buf: self.tx_buf,
waker: self.waker,
internal_operation: self.internal_operation,
}
}
}
impl<'ch, FRAME> Drop for BufferedSender<'ch, FRAME> {
fn drop(&mut self) {
(self.internal_operation)(InternalOperation::NotifySenderDestroyed);
}
}
/// Sender that can be used for sending Classic CAN frames.
pub type BufferedCanSender = BufferedSender<'static, Frame>;
/// Receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub type BufferedCanReceiver = embassy_sync::channel::DynamicReceiver<'static, Result<Envelope, BusError>>;
pub struct BufferedReceiver<'ch, ENVELOPE> {
pub(crate) rx_buf: embassy_sync::channel::DynamicReceiver<'ch, Result<ENVELOPE, BusError>>,
pub(crate) internal_operation: fn(InternalOperation),
}
impl<'ch, ENVELOPE> BufferedReceiver<'ch, ENVELOPE> {
/// Receive the next frame.
///
/// See [`Channel::receive()`].
pub fn receive(&self) -> embassy_sync::channel::DynamicReceiveFuture<'_, Result<ENVELOPE, BusError>> {
self.rx_buf.receive()
}
/// Attempt to immediately receive the next frame.
///
/// See [`Channel::try_receive()`]
pub fn try_receive(&self) -> Result<Result<ENVELOPE, BusError>, embassy_sync::channel::TryReceiveError> {
self.rx_buf.try_receive()
}
/// Allows a poll_fn to poll until the channel is ready to receive
///
/// See [`Channel::poll_ready_to_receive()`]
pub fn poll_ready_to_receive(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<()> {
self.rx_buf.poll_ready_to_receive(cx)
}
/// Poll the channel for the next frame
///
/// See [`Channel::poll_receive()`]
pub fn poll_receive(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<Result<ENVELOPE, BusError>> {
self.rx_buf.poll_receive(cx)
}
}
impl<'ch, ENVELOPE> Clone for BufferedReceiver<'ch, ENVELOPE> {
fn clone(&self) -> Self {
(self.internal_operation)(InternalOperation::NotifyReceiverCreated);
Self {
rx_buf: self.rx_buf,
internal_operation: self.internal_operation,
}
}
}
impl<'ch, ENVELOPE> Drop for BufferedReceiver<'ch, ENVELOPE> {
fn drop(&mut self) {
(self.internal_operation)(InternalOperation::NotifyReceiverDestroyed);
}
}
/// A BufferedCanReceiver for Classic CAN frames.
pub type BufferedCanReceiver = BufferedReceiver<'static, Envelope>;

View File

@ -68,3 +68,17 @@ pub enum TryReadError {
/// Receive buffer is empty
Empty,
}
/// Internal Operation
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum InternalOperation {
/// Notify receiver created
NotifyReceiverCreated,
/// Notify receiver destroyed
NotifyReceiverDestroyed,
/// Notify sender created
NotifySenderCreated,
/// Notify sender destroyed
NotifySenderDestroyed,
}

View File

@ -6,7 +6,7 @@ use core::task::Poll;
use embassy_hal_internal::interrupt::InterruptExt;
use embassy_hal_internal::{into_ref, PeripheralRef};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender};
use embassy_sync::channel::Channel;
use embassy_sync::waitqueue::AtomicWaker;
use crate::can::fd::peripheral::Registers;
@ -478,28 +478,28 @@ impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> BufferedCan<'d,
/// Returns a sender that can be used for sending CAN frames.
pub fn writer(&self) -> BufferedCanSender {
(self.info.internal_operation)(InternalOperation::NotifySenderCreated);
BufferedCanSender {
tx_buf: self.tx_buf.sender().into(),
waker: self.info.tx_waker,
internal_operation: self.info.internal_operation,
}
}
/// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub fn reader(&self) -> BufferedCanReceiver {
self.rx_buf.receiver().into()
(self.info.internal_operation)(InternalOperation::NotifyReceiverCreated);
BufferedCanReceiver {
rx_buf: self.rx_buf.receiver().into(),
internal_operation: self.info.internal_operation,
}
}
}
impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> Drop for BufferedCan<'d, TX_BUF_SIZE, RX_BUF_SIZE> {
fn drop(&mut self) {
critical_section::with(|_| {
let state = self.state as *const State;
unsafe {
let mut_state = state as *mut State;
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
});
(self.info.internal_operation)(InternalOperation::NotifySenderDestroyed);
(self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed);
}
}
@ -509,35 +509,11 @@ pub type RxFdBuf<const BUF_SIZE: usize> = Channel<CriticalSectionRawMutex, Resul
/// User supplied buffer for TX buffering
pub type TxFdBuf<const BUF_SIZE: usize> = Channel<CriticalSectionRawMutex, FdFrame, BUF_SIZE>;
/// Sender that can be used for sending CAN frames.
#[derive(Copy, Clone)]
pub struct BufferedFdCanSender {
tx_buf: DynamicSender<'static, FdFrame>,
waker: fn(),
}
impl BufferedFdCanSender {
/// Async write frame to TX buffer.
pub fn try_write(&mut self, frame: FdFrame) -> Result<(), embassy_sync::channel::TrySendError<FdFrame>> {
self.tx_buf.try_send(frame)?;
(self.waker)();
Ok(())
}
/// Async write frame to TX buffer.
pub async fn write(&mut self, frame: FdFrame) {
self.tx_buf.send(frame).await;
(self.waker)();
}
/// Allows a poll_fn to poll until the channel is ready to write
pub fn poll_ready_to_send(&self, cx: &mut core::task::Context<'_>) -> core::task::Poll<()> {
self.tx_buf.poll_ready_to_send(cx)
}
}
/// Sender that can be used for sending Classic CAN frames.
pub type BufferedFdCanSender = super::common::BufferedSender<'static, FdFrame>;
/// Receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub type BufferedFdCanReceiver = DynamicReceiver<'static, Result<FdEnvelope, BusError>>;
pub type BufferedFdCanReceiver = super::common::BufferedReceiver<'static, FdEnvelope>;
/// Buffered FDCAN Instance
pub struct BufferedCanFd<'d, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> {
@ -608,28 +584,28 @@ impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> BufferedCanFd<'
/// Returns a sender that can be used for sending CAN frames.
pub fn writer(&self) -> BufferedFdCanSender {
(self.info.internal_operation)(InternalOperation::NotifySenderCreated);
BufferedFdCanSender {
tx_buf: self.tx_buf.sender().into(),
waker: self.info.tx_waker,
internal_operation: self.info.internal_operation,
}
}
/// Returns a receiver that can be used for receiving CAN frames. Note, each CAN frame will only be received by one receiver.
pub fn reader(&self) -> BufferedFdCanReceiver {
self.rx_buf.receiver().into()
(self.info.internal_operation)(InternalOperation::NotifyReceiverCreated);
BufferedFdCanReceiver {
rx_buf: self.rx_buf.receiver().into(),
internal_operation: self.info.internal_operation,
}
}
}
impl<'c, 'd, const TX_BUF_SIZE: usize, const RX_BUF_SIZE: usize> Drop for BufferedCanFd<'d, TX_BUF_SIZE, RX_BUF_SIZE> {
fn drop(&mut self) {
critical_section::with(|_| {
let state = self.state as *const State;
unsafe {
let mut_state = state as *mut State;
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
});
(self.info.internal_operation)(InternalOperation::NotifySenderDestroyed);
(self.info.internal_operation)(InternalOperation::NotifyReceiverDestroyed);
}
}
@ -922,6 +898,8 @@ struct State {
pub rx_mode: RxMode,
pub tx_mode: TxMode,
pub ns_per_timer_tick: u64,
receiver_instance_count: usize,
sender_instance_count: usize,
pub err_waker: AtomicWaker,
}
@ -933,6 +911,8 @@ impl State {
tx_mode: TxMode::NonBuffered(AtomicWaker::new()),
ns_per_timer_tick: 0,
err_waker: AtomicWaker::new(),
receiver_instance_count: 1,
sender_instance_count: 1,
}
}
}
@ -942,6 +922,7 @@ struct Info {
interrupt0: crate::interrupt::Interrupt,
_interrupt1: crate::interrupt::Interrupt,
tx_waker: fn(),
internal_operation: fn(InternalOperation),
}
impl Info {
@ -970,6 +951,7 @@ trait SealedInstance {
fn registers() -> crate::can::fd::peripheral::Registers;
fn state() -> &'static State;
unsafe fn mut_state() -> &'static mut State;
fn internal_operation(val: InternalOperation);
fn calc_timestamp(ns_per_timer_tick: u64, ts_val: u16) -> Timestamp;
}
@ -992,12 +974,42 @@ macro_rules! impl_fdcan {
impl SealedInstance for peripherals::$inst {
const MSG_RAM_OFFSET: usize = $msg_ram_offset;
fn internal_operation(val: InternalOperation) {
critical_section::with(|_| {
//let state = self.state as *const State;
unsafe {
//let mut_state = state as *mut State;
let mut_state = peripherals::$inst::mut_state();
match val {
InternalOperation::NotifySenderCreated => {
mut_state.sender_instance_count += 1;
}
InternalOperation::NotifySenderDestroyed => {
mut_state.sender_instance_count -= 1;
if ( 0 == mut_state.sender_instance_count) {
(*mut_state).tx_mode = TxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
}
InternalOperation::NotifyReceiverCreated => {
mut_state.receiver_instance_count += 1;
}
InternalOperation::NotifyReceiverDestroyed => {
mut_state.receiver_instance_count -= 1;
if ( 0 == mut_state.receiver_instance_count) {
(*mut_state).rx_mode = RxMode::NonBuffered(embassy_sync::waitqueue::AtomicWaker::new());
}
}
}
}
});
}
fn info() -> &'static Info {
static INFO: Info = Info {
regs: Registers{regs: crate::pac::$inst, msgram: crate::pac::$msg_ram_inst, msg_ram_offset: $msg_ram_offset},
interrupt0: crate::_generated::peripheral_interrupts::$inst::IT0::IRQ,
_interrupt1: crate::_generated::peripheral_interrupts::$inst::IT1::IRQ,
tx_waker: crate::_generated::peripheral_interrupts::$inst::IT0::pend,
internal_operation: peripherals::$inst::internal_operation,
};
&INFO
}