Improve orchestrate_tasks example with shared state and better documentation

Add mutex-protected shared system state
Improve task coordination and signaling
Add more documentation
This commit is contained in:
1-rafael-1 2024-12-31 15:54:42 +01:00
parent 667dfa34b5
commit e6001e66f8

View File

@ -1,20 +1,18 @@
//! This example demonstrates some approaches to communicate between tasks in order to orchestrate the state of the system. //! This example demonstrates some approaches to communicate between tasks in order to orchestrate the state of the system.
//! //!
//! The system consists of several tasks:
//! - Three tasks that generate random numbers at different intervals (simulating i.e. sensor readings)
//! - A task that monitors USB power connection (hardware event handling)
//! - A task that reads system voltage (ADC sampling)
//! - A consumer task that processes all this information
//!
//! The system maintains state in a single place, wrapped in a Mutex.
//!
//! We demonstrate how to: //! We demonstrate how to:
//! - use a channel to send messages between tasks, in this case here in order to have one task control the state of the system. //! - use a mutex to maintain shared state between tasks
//! - use a signal to terminate a task. //! - use a channel to send events between tasks
//! - use command channels to send commands to another task. //! - use an orchestrator task to coordinate tasks and handle state transitions
//! - use different ways to receive messages, from a straightforwar awaiting on one channel to a more complex awaiting on multiple futures. //! - use signals to notify about state changes and terminate tasks
//!
//! There are more patterns to orchestrate tasks, this is just one example.
//!
//! We will use these tasks to generate example "state information":
//! - a task that generates random numbers in intervals of 60s
//! - a task that generates random numbers in intervals of 30s
//! - a task that generates random numbers in intervals of 90s
//! - a task that notifies about being attached/disattached from usb power
//! - a task that measures vsys voltage in intervals of 30s
//! - a task that consumes the state information and reacts to it
#![no_std] #![no_std]
#![no_main] #![no_main]
@ -28,15 +26,12 @@ use embassy_rp::clocks::RoscRng;
use embassy_rp::gpio::{Input, Pull}; use embassy_rp::gpio::{Input, Pull};
use embassy_rp::{bind_interrupts, peripherals}; use embassy_rp::{bind_interrupts, peripherals};
use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
use embassy_sync::{channel, signal}; use embassy_sync::{channel, mutex::Mutex, signal};
use embassy_time::{Duration, Timer}; use embassy_time::{Duration, Timer};
use rand::RngCore; use rand::RngCore;
use {defmt_rtt as _, panic_probe as _}; use {defmt_rtt as _, panic_probe as _};
// This is just some preparation, see example `assign_resources.rs` for more information on this. We prep the rresources that we will be using in different tasks. // Hardware resource assignment. See other examples for different ways of doing this.
// **Note**: This will not work with a board that has a wifi chip, because the wifi chip uses pins 24 and 29 for its own purposes. A way around this in software
// is not trivial, at least if you intend to use wifi, too. Workaround is to wire from vsys and vbus pins to appropriate pins on the board through a voltage divider. Then use those pins.
// For this example it will not matter much, the concept of what we are showing remains valid.
assign_resources! { assign_resources! {
vsys: Vsys { vsys: Vsys {
adc: ADC, adc: ADC,
@ -47,228 +42,233 @@ assign_resources! {
}, },
} }
// Interrupt binding - required for hardware peripherals like ADC
bind_interrupts!(struct Irqs { bind_interrupts!(struct Irqs {
ADC_IRQ_FIFO => InterruptHandler; ADC_IRQ_FIFO => InterruptHandler;
}); });
/// This is the type of Events that we will send from the worker tasks to the orchestrating task. /// Events that worker tasks send to the orchestrator
enum Events { enum Events {
UsbPowered(bool), UsbPowered(bool), // USB connection state changed
VsysVoltage(f32), VsysVoltage(f32), // New voltage reading
FirstRandomSeed(u32), FirstRandomSeed(u32), // Random number from 30s timer
SecondRandomSeed(u32), SecondRandomSeed(u32), // Random number from 60s timer
ThirdRandomSeed(u32), ThirdRandomSeed(u32), // Random number from 90s timer
ResetFirstRandomSeed, ResetFirstRandomSeed, // Signal to reset the first counter
} }
/// This is the type of Commands that we will send from the orchestrating task to the worker tasks. /// Commands that can control task behavior.
/// Note that we are lazy here and only have one command, you might want to have more. /// Currently only used to stop tasks, but could be extended for other controls.
enum Commands { enum Commands {
/// This command will stop the appropriate worker task /// Signals a task to stop execution
Stop, Stop,
} }
/// This is the state of the system, we will use this to orchestrate the system. This is a simple example, in a real world application this would be more complex. /// The central state of our system, shared between tasks.
#[derive(Default, Debug, Clone, Format)] #[derive(Clone, Format)]
struct State { struct State {
usb_powered: bool, usb_powered: bool,
vsys_voltage: f32, vsys_voltage: f32,
first_random_seed: u32, first_random_seed: u32,
second_random_seed: u32, second_random_seed: u32,
third_random_seed: u32, third_random_seed: u32,
first_random_seed_task_running: bool,
times_we_got_first_random_seed: u8, times_we_got_first_random_seed: u8,
maximum_times_we_want_first_random_seed: u8, maximum_times_we_want_first_random_seed: u8,
} }
/// A formatted view of the system status, used for logging. Used for the below `get_system_summary` fn.
#[derive(Format)]
struct SystemStatus {
power_source: &'static str,
voltage: f32,
}
impl State { impl State {
fn new() -> Self { const fn new() -> Self {
Self { Self {
usb_powered: false, usb_powered: false,
vsys_voltage: 0.0, vsys_voltage: 0.0,
first_random_seed: 0, first_random_seed: 0,
second_random_seed: 0, second_random_seed: 0,
third_random_seed: 0, third_random_seed: 0,
first_random_seed_task_running: false,
times_we_got_first_random_seed: 0, times_we_got_first_random_seed: 0,
maximum_times_we_want_first_random_seed: 3, maximum_times_we_want_first_random_seed: 3,
} }
} }
/// Returns a formatted summary of power state and voltage.
/// Shows how to create methods that work with shared state.
fn get_system_summary(&self) -> SystemStatus {
SystemStatus {
power_source: if self.usb_powered {
"USB powered"
} else {
"Battery powered"
},
voltage: self.vsys_voltage,
}
}
} }
/// Channel for the events that we want the orchestrator to react to, all state events are of the type Enum Events. /// The shared state protected by a mutex
/// We use a channel with an arbitrary size of 10, the precise size of the queue depends on your use case. This depends on how many events we static SYSTEM_STATE: Mutex<CriticalSectionRawMutex, State> = Mutex::new(State::new());
/// expect to be generated in a given time frame and how fast the orchestrator can react to them. And then if we rather want the senders to wait for
/// new slots in the queue or if we want the orchestrator to have a backlog of events to process. In this case here we expect to always be enough slots /// Channel for events from worker tasks to the orchestrator
/// in the queue, so the worker tasks can in all nominal cases send their events and continue with their work without waiting.
/// For the events we - in this case here - do not want to loose any events, so a channel is a good choice. See embassy_sync docs for other options.
static EVENT_CHANNEL: channel::Channel<CriticalSectionRawMutex, Events, 10> = channel::Channel::new(); static EVENT_CHANNEL: channel::Channel<CriticalSectionRawMutex, Events, 10> = channel::Channel::new();
/// Signal for stopping the first random signal task. We use a signal here, because we need no queue. It is suffiient to have one signal active. /// Signal used to stop the first random number task
static STOP_FIRST_RANDOM_SIGNAL: signal::Signal<CriticalSectionRawMutex, Commands> = signal::Signal::new(); static STOP_FIRST_RANDOM_SIGNAL: signal::Signal<CriticalSectionRawMutex, Commands> = signal::Signal::new();
/// Channel for the state that we want the consumer task to react to. We use a channel here, because we want to have a queue of state changes, although /// Signal for notifying about state changes
/// we want the queue to be of size 1, because we want to finish rwacting to the state change before the next one comes in. This is just a design choice static STATE_CHANGED: signal::Signal<CriticalSectionRawMutex, ()> = signal::Signal::new();
/// and depends on your use case.
static CONSUMER_CHANNEL: channel::Channel<CriticalSectionRawMutex, State, 1> = channel::Channel::new();
// And now we can put all this into use
/// This is the main task, that will not do very much besides spawning the other tasks. This is a design choice, you could do the
/// orchestrating here. This is to show that we do not need a main loop here, the system will run indefinitely as long as at least one task is running.
#[embassy_executor::main] #[embassy_executor::main]
async fn main(spawner: Spawner) { async fn main(spawner: Spawner) {
// initialize the peripherals
let p = embassy_rp::init(Default::default()); let p = embassy_rp::init(Default::default());
// split the resources, for convenience - see above
let r = split_resources! {p}; let r = split_resources! {p};
// spawn the tasks
spawner.spawn(orchestrate(spawner)).unwrap(); spawner.spawn(orchestrate(spawner)).unwrap();
spawner.spawn(random_60s(spawner)).unwrap(); spawner.spawn(random_60s(spawner)).unwrap();
spawner.spawn(random_90s(spawner)).unwrap(); spawner.spawn(random_90s(spawner)).unwrap();
// `random_30s` is not spawned here, butin the orchestrate task depending on state
spawner.spawn(usb_power(spawner, r.vbus)).unwrap(); spawner.spawn(usb_power(spawner, r.vbus)).unwrap();
spawner.spawn(vsys_voltage(spawner, r.vsys)).unwrap(); spawner.spawn(vsys_voltage(spawner, r.vsys)).unwrap();
spawner.spawn(consumer(spawner)).unwrap(); spawner.spawn(consumer(spawner)).unwrap();
} }
/// This is the task handling the system state and orchestrating the other tasks. WEe can regard this as the "main loop" of the system. /// Main task that processes all events and updates system state.
#[embassy_executor::task] #[embassy_executor::task]
async fn orchestrate(_spawner: Spawner) { async fn orchestrate(spawner: Spawner) {
let mut state = State::new();
// we need to have a receiver for the events
let receiver = EVENT_CHANNEL.receiver(); let receiver = EVENT_CHANNEL.receiver();
// and we need a sender for the consumer task
let state_sender = CONSUMER_CHANNEL.sender();
loop { loop {
// we await on the receiver, this will block until a new event is available // Do nothing until we receive any event
// as an alternative to this, we could also await on multiple channels, this would block until at least one of the channels has an event
// see the embassy_futures docs: https://docs.embassy.dev/embassy-futures/git/default/select/index.html
// The task random_30s does a select, if you want to have a look at that.
// Another reason to use select may also be that we want to have a timeout, so we can react to the absence of events within a time frame.
// We keep it simple here.
let event = receiver.receive().await; let event = receiver.receive().await;
// react to the events // Scope in which we want to lock the system state. As an alternative we could also call `drop` on the state
match event { {
Events::UsbPowered(usb_powered) => { let mut state = SYSTEM_STATE.lock().await;
// update the state and/or react to the event here
state.usb_powered = usb_powered;
info!("Usb powered: {}", usb_powered);
}
Events::VsysVoltage(voltage) => {
// update the state and/or react to the event here
state.vsys_voltage = voltage;
info!("Vsys voltage: {}", voltage);
}
Events::FirstRandomSeed(seed) => {
// update the state and/or react to the event here
state.first_random_seed = seed;
// here we change some meta state, we count how many times we got the first random seed
state.times_we_got_first_random_seed += 1;
info!(
"First random seed: {}, and that was iteration {} of receiving this.",
seed, &state.times_we_got_first_random_seed
);
}
Events::SecondRandomSeed(seed) => {
// update the state and/or react to the event here
state.second_random_seed = seed;
info!("Second random seed: {}", seed);
}
Events::ThirdRandomSeed(seed) => {
// update the state and/or react to the event here
state.third_random_seed = seed;
info!("Third random seed: {}", seed);
}
Events::ResetFirstRandomSeed => {
// update the state and/or react to the event here
state.times_we_got_first_random_seed = 0;
state.first_random_seed = 0;
info!("Resetting the first random seed counter");
}
}
// we now have an altered state
// there is a crate for detecting field changes on crates.io (https://crates.io/crates/fieldset) that might be useful here
// for now we just keep it simple
// we send the state to the consumer task match event {
// since the channel has a size of 1, this will block until the consumer task has received the state, which is what we want here in this example Events::UsbPowered(usb_powered) => {
// **Note:** It is bad design to send too much data between tasks, with no clear definition of what "too much" is. In this example we send the state.usb_powered = usb_powered;
// whole state, in a real world application you might want to send only the data, that is relevant to the consumer task AND only when it has changed. info!("Usb powered: {}", usb_powered);
// We keep it simple here. info!("System summary: {}", state.get_system_summary());
state_sender.send(state.clone()).await; }
} Events::VsysVoltage(voltage) => {
} state.vsys_voltage = voltage;
info!("Vsys voltage: {}", voltage);
/// This task will consume the state information and react to it. This is a simple example, in a real world application this would be more complex }
/// and we could have multiple consumer tasks, each reacting to different parts of the state. Events::FirstRandomSeed(seed) => {
#[embassy_executor::task] state.first_random_seed = seed;
async fn consumer(spawner: Spawner) { state.times_we_got_first_random_seed += 1;
// we need to have a receiver for the state info!(
let receiver = CONSUMER_CHANNEL.receiver(); "First random seed: {}, and that was iteration {} of receiving this.",
let sender = EVENT_CHANNEL.sender(); seed, &state.times_we_got_first_random_seed
loop { );
// we await on the receiver, this will block until a new state is available }
let state = receiver.receive().await; Events::SecondRandomSeed(seed) => {
// react to the state, in this case here we just log it state.second_random_seed = seed;
info!("The consumer has reveived this state: {:?}", &state); info!("Second random seed: {}", seed);
}
// here we react to the state, in this case here we want to start or stop the first random signal task depending on the state of the system Events::ThirdRandomSeed(seed) => {
match state.times_we_got_first_random_seed { state.third_random_seed = seed;
max if max == state.maximum_times_we_want_first_random_seed => { info!("Third random seed: {}", seed);
info!("Stopping the first random signal task"); }
// we send a command to the task Events::ResetFirstRandomSeed => {
STOP_FIRST_RANDOM_SIGNAL.signal(Commands::Stop); state.times_we_got_first_random_seed = 0;
// we notify the orchestrator that we have sent the command state.first_random_seed = 0;
sender.send(Events::ResetFirstRandomSeed).await; info!("Resetting the first random seed counter");
}
0 => {
// we start the task, which presents us with an interesting problem, because we may return here before the task has started
// here we just try and log if the task has started, in a real world application you might want to handle this more gracefully
info!("Starting the first random signal task");
match spawner.spawn(random_30s(spawner)) {
Ok(_) => info!("Successfully spawned random_30s task"),
Err(e) => info!("Failed to spawn random_30s task: {:?}", e),
} }
} }
_ => {}
// Handle task orchestration based on state
// Just placed as an example here, could be hooked into the event system, puton a timer, ...
match state.times_we_got_first_random_seed {
max if max == state.maximum_times_we_want_first_random_seed => {
info!("Stopping the first random signal task");
STOP_FIRST_RANDOM_SIGNAL.signal(Commands::Stop);
EVENT_CHANNEL.sender().send(Events::ResetFirstRandomSeed).await;
}
0 => {
let respawn_first_random_seed_task = !state.first_random_seed_task_running;
// Deliberately dropping the Mutex lock here to release it before a lengthy operation
drop(state);
if respawn_first_random_seed_task {
info!("(Re)-Starting the first random signal task");
spawner.spawn(random_30s(spawner)).unwrap();
}
}
_ => {}
}
} }
STATE_CHANGED.signal(());
} }
} }
/// This task will generate random numbers in intervals of 30s /// Task that monitors state changes and logs system status.
/// The task will terminate after it has received a command signal to stop, see the orchestrate task for that. #[embassy_executor::task]
/// Note that we are not spawning this task from main, as we will show how such a task can be spawned and closed dynamically. async fn consumer(_spawner: Spawner) {
loop {
// Wait for state change notification
STATE_CHANGED.wait().await;
let state = SYSTEM_STATE.lock().await;
info!(
"State update - {} | Seeds - First: {} (count: {}/{}, running: {}), Second: {}, Third: {}",
state.get_system_summary(),
state.first_random_seed,
state.times_we_got_first_random_seed,
state.maximum_times_we_want_first_random_seed,
state.first_random_seed_task_running,
state.second_random_seed,
state.third_random_seed
);
}
}
/// Task that generates random numbers every 30 seconds until stopped.
/// Shows how to handle both timer events and stop signals.
/// As an example of some routine we want to be on or off depending on other needs.
#[embassy_executor::task] #[embassy_executor::task]
async fn random_30s(_spawner: Spawner) { async fn random_30s(_spawner: Spawner) {
{
let mut state = SYSTEM_STATE.lock().await;
state.first_random_seed_task_running = true;
}
let mut rng = RoscRng; let mut rng = RoscRng;
let sender = EVENT_CHANNEL.sender(); let sender = EVENT_CHANNEL.sender();
loop { loop {
// we either await on the timer or the signal, whichever comes first. // Wait for either 30s timer or stop signal (like select() in Go)
let futures = select(Timer::after(Duration::from_secs(30)), STOP_FIRST_RANDOM_SIGNAL.wait()).await; match select(Timer::after(Duration::from_secs(30)), STOP_FIRST_RANDOM_SIGNAL.wait()).await {
match futures {
Either::First(_) => { Either::First(_) => {
// we received are operating on the timer
info!("30s are up, generating random number"); info!("30s are up, generating random number");
let random_number = rng.next_u32(); let random_number = rng.next_u32();
sender.send(Events::FirstRandomSeed(random_number)).await; sender.send(Events::FirstRandomSeed(random_number)).await;
} }
Either::Second(_) => { Either::Second(_) => {
// we received the signal to stop
info!("Received signal to stop, goodbye!"); info!("Received signal to stop, goodbye!");
let mut state = SYSTEM_STATE.lock().await;
state.first_random_seed_task_running = false;
break; break;
} }
} }
} }
} }
/// This task will generate random numbers in intervals of 60s /// Task that generates random numbers every 60 seconds. As an example of some routine.
#[embassy_executor::task] #[embassy_executor::task]
async fn random_60s(_spawner: Spawner) { async fn random_60s(_spawner: Spawner) {
let mut rng = RoscRng; let mut rng = RoscRng;
let sender = EVENT_CHANNEL.sender(); let sender = EVENT_CHANNEL.sender();
loop { loop {
Timer::after(Duration::from_secs(60)).await; Timer::after(Duration::from_secs(60)).await;
let random_number = rng.next_u32(); let random_number = rng.next_u32();
@ -276,11 +276,12 @@ async fn random_60s(_spawner: Spawner) {
} }
} }
/// This task will generate random numbers in intervals of 90s /// Task that generates random numbers every 90 seconds. . As an example of some routine.
#[embassy_executor::task] #[embassy_executor::task]
async fn random_90s(_spawner: Spawner) { async fn random_90s(_spawner: Spawner) {
let mut rng = RoscRng; let mut rng = RoscRng;
let sender = EVENT_CHANNEL.sender(); let sender = EVENT_CHANNEL.sender();
loop { loop {
Timer::after(Duration::from_secs(90)).await; Timer::after(Duration::from_secs(90)).await;
let random_number = rng.next_u32(); let random_number = rng.next_u32();
@ -288,31 +289,30 @@ async fn random_90s(_spawner: Spawner) {
} }
} }
/// This task will notify if we are connected to usb power /// Task that monitors USB power connection. As an example of some Interrupt somewhere.
#[embassy_executor::task] #[embassy_executor::task]
pub async fn usb_power(_spawner: Spawner, r: Vbus) { pub async fn usb_power(_spawner: Spawner, r: Vbus) {
let mut vbus_in = Input::new(r.pin_24, Pull::None); let mut vbus_in = Input::new(r.pin_24, Pull::None);
let sender = EVENT_CHANNEL.sender(); let sender = EVENT_CHANNEL.sender();
loop { loop {
sender.send(Events::UsbPowered(vbus_in.is_high())).await; sender.send(Events::UsbPowered(vbus_in.is_high())).await;
vbus_in.wait_for_any_edge().await; vbus_in.wait_for_any_edge().await;
} }
} }
/// This task will measure the vsys voltage in intervals of 30s /// Task that reads system voltage through ADC. As an example of some continuous sensor reading.
#[embassy_executor::task] #[embassy_executor::task]
pub async fn vsys_voltage(_spawner: Spawner, r: Vsys) { pub async fn vsys_voltage(_spawner: Spawner, r: Vsys) {
let mut adc = Adc::new(r.adc, Irqs, Config::default()); let mut adc = Adc::new(r.adc, Irqs, Config::default());
let vsys_in = r.pin_29; let vsys_in = r.pin_29;
let mut channel = Channel::new_pin(vsys_in, Pull::None); let mut channel = Channel::new_pin(vsys_in, Pull::None);
let sender = EVENT_CHANNEL.sender(); let sender = EVENT_CHANNEL.sender();
loop { loop {
// read the adc value Timer::after(Duration::from_secs(30)).await;
let adc_value = adc.read(&mut channel).await.unwrap(); let adc_value = adc.read(&mut channel).await.unwrap();
// convert the adc value to voltage.
// 3.3 is the reference voltage, 3.0 is the factor for the inbuilt voltage divider and 4096 is the resolution of the adc
let voltage = (adc_value as f32) * 3.3 * 3.0 / 4096.0; let voltage = (adc_value as f32) * 3.3 * 3.0 / 4096.0;
sender.send(Events::VsysVoltage(voltage)).await; sender.send(Events::VsysVoltage(voltage)).await;
Timer::after(Duration::from_secs(30)).await;
} }
} }