Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watched_tasks: maintain a list of spawned async tasks and propagate errors #47

Merged
merged 8 commits into from
Jan 19, 2024
13 changes: 7 additions & 6 deletions src/adc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ use std::time::Duration;

use anyhow::Result;
use async_std::sync::Arc;
use async_std::task::{sleep, spawn};
use async_std::task::sleep;

use crate::broker::{BrokerBuilder, Topic};
use crate::measurement::{Measurement, Timestamp};
use crate::watched_tasks::WatchedTasksBuilder;

const HISTORY_LENGTH: usize = 200;
const SLOW_INTERVAL: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -77,9 +78,9 @@ pub struct Adc {
}

impl Adc {
pub async fn new(bb: &mut BrokerBuilder) -> Result<Self> {
let stm32_thread = IioThread::new_stm32().await?;
let powerboard_thread = IioThread::new_powerboard().await?;
pub async fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
let stm32_thread = IioThread::new_stm32(wtb).await?;
let powerboard_thread = IioThread::new_powerboard(wtb).await?;

let adc = Self {
usb_host_curr: AdcChannel {
Expand Down Expand Up @@ -212,7 +213,7 @@ impl Adc {

// Spawn an async task to transfer values from the Atomic value based
// "fast" interface to the broker based "slow" interface.
spawn(async move {
wtb.spawn_task("adc-update", async move {
loop {
sleep(SLOW_INTERVAL).await;

Expand All @@ -226,7 +227,7 @@ impl Adc {

time.set(Timestamp::now());
}
});
})?;

Ok(adc)
}
Expand Down
4 changes: 2 additions & 2 deletions src/adc/iio/demo_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ pub struct IioThread {
}

impl IioThread {
pub async fn new_stm32() -> Result<Arc<Self>> {
pub async fn new_stm32<W>(_wtb: &W) -> Result<Arc<Self>> {
let mut demo_magic = block_on(DEMO_MAGIC_STM32.lock());

// Only ever set up a single demo_mode "IioThread" per ADC
Expand Down Expand Up @@ -195,7 +195,7 @@ impl IioThread {
Ok(this)
}

pub async fn new_powerboard() -> Result<Arc<Self>> {
pub async fn new_powerboard<W>(_wtb: &W) -> Result<Arc<Self>> {
let mut demo_magic = block_on(DEMO_MAGIC_POWERBOARD.lock());

// Only ever set up a single demo_mode "IioThread" per ADC
Expand Down
180 changes: 93 additions & 87 deletions src/adc/iio/hardware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ use std::fs::create_dir;
use std::io::Read;
use std::path::Path;
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
use std::sync::Mutex;
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context, Error, Result};
Expand All @@ -35,6 +32,7 @@ use log::{debug, error, warn};
use thread_priority::*;

use crate::measurement::{Measurement, Timestamp};
use crate::watched_tasks::WatchedTasksBuilder;

struct ChannelDesc {
kernel_name: &'static str,
Expand Down Expand Up @@ -255,7 +253,6 @@ pub struct IioThread {
ref_instant: Instant,
timestamp: AtomicU64,
values: Vec<AtomicU16>,
join: Mutex<Option<JoinHandle<()>>>,
channel_descs: &'static [ChannelDesc],
}

Expand Down Expand Up @@ -325,7 +322,8 @@ impl IioThread {
}

async fn new(
thread_name: &str,
wtb: &mut WatchedTasksBuilder,
thread_name: &'static str,
adc_name: &'static str,
trigger_name: &'static str,
sample_rate: i64,
Expand All @@ -342,102 +340,101 @@ impl IioThread {
let (thread_res_tx, thread_res_rx) = bounded(1);

// Spawn a high priority thread that updates the atomic values in `thread`.
let join = thread::Builder::new()
.name(format!("tacd {thread_name} iio"))
.spawn(move || {
let adc_setup_res = Self::adc_setup(
adc_name,
trigger_name,
sample_rate,
channel_descs,
buffer_len,
);
let (thread, channels, mut buf) = match adc_setup_res {
Ok((channels, buf)) => {
let thread = Arc::new(Self {
ref_instant: Instant::now(),
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
join: Mutex::new(None),
channel_descs,
});

(thread, channels, buf)
}
Err(e) => {
// Can not fail in practice as the queue is known to be empty
// at this point.
thread_res_tx.try_send(Err(e)).unwrap();
return;
}
};

let thread_weak = Arc::downgrade(&thread);
let mut signal_ready = Some((thread, thread_res_tx));

// Stop running as soon as the last reference to this Arc<IioThread>
// is dropped (e.g. the weak reference can no longer be upgraded).
while let Some(thread) = thread_weak.upgrade() {
if let Err(e) = buf.refill() {
thread.timestamp.store(TIMESTAMP_ERROR, Ordering::Relaxed);

error!("Failed to refill {} ADC buffer: {}", adc_name, e);

// If the ADC has not yet produced any values we still have the
// queue at hand that signals readiness to the main thread.
// This gives us a chance to return an Err from new().
// If the queue was already used just print an error instead.
if let Some((_, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Err(Error::new(e))).unwrap();
}

break;
}

let values = channels.iter().map(|ch| {
let buf_sum: u32 = buf.channel_iter::<u16>(ch).map(|v| v as u32).sum();
(buf_sum / (buf.capacity() as u32)) as u16
wtb.spawn_thread(thread_name, move || {
let adc_setup_res = Self::adc_setup(
adc_name,
trigger_name,
sample_rate,
channel_descs,
buffer_len,
);
let (thread, channels, mut buf) = match adc_setup_res {
Ok((channels, buf)) => {
let thread = Arc::new(Self {
ref_instant: Instant::now(),
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
channel_descs,
});

for (d, s) in thread.values.iter().zip(values) {
d.store(s, Ordering::Relaxed)
}
(thread, channels, buf)
}
Err(e) => {
// Can not fail in practice as the queue is known to be empty
// at this point.
thread_res_tx
.try_send(Err(e))
.expect("Failed to signal ADC setup error due to full queue");
return Ok(());
}
};

let thread_weak = Arc::downgrade(&thread);
let mut signal_ready = Some((thread, thread_res_tx));

// These should only fail if
// a) The monotonic time started running backward
// b) The tacd has been running for more than 2**64ns (584 years).
let ts: u64 = Instant::now()
.checked_duration_since(thread.ref_instant)
.and_then(|d| d.as_nanos().try_into().ok())
.unwrap_or(TIMESTAMP_ERROR);
// Stop running as soon as the last reference to this Arc<IioThread>
// is dropped (e.g. the weak reference can no longer be upgraded).
while let Some(thread) = thread_weak.upgrade() {
if let Err(e) = buf.refill() {
thread.timestamp.store(TIMESTAMP_ERROR, Ordering::Relaxed);

thread.timestamp.store(ts, Ordering::Release);
error!("Failed to refill {} ADC buffer: {}", adc_name, e);

// Now that we know that the ADC actually works and we have
// initial values: return a handle to it.
if let Some((content, tx)) = signal_ready.take() {
// If the ADC has not yet produced any values we still have the
// queue at hand that signals readiness to the main thread.
// This gives us a chance to return an Err from new().
// If the queue was already used just print an error instead.
if let Some((_, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Ok(content)).unwrap();
tx.try_send(Err(Error::new(e)))
.expect("Failed to signal ADC setup error due to full queue");
}

break;
}
})?;

let thread = thread_res_rx.recv().await??;
let values = channels.iter().map(|ch| {
let buf_sum: u32 = buf.channel_iter::<u16>(ch).map(|v| v as u32).sum();
(buf_sum / (buf.capacity() as u32)) as u16
});

// Locking the Mutex could only fail if the Mutex was poisoned by
// a thread that held the lock and panicked.
// At this point the Mutex has not yet been locked in another thread.
*thread.join.lock().unwrap() = Some(join);
for (d, s) in thread.values.iter().zip(values) {
d.store(s, Ordering::Relaxed)
}

// These should only fail if
// a) The monotonic time started running backward
// b) The tacd has been running for more than 2**64ns (584 years).
let ts: u64 = Instant::now()
.checked_duration_since(thread.ref_instant)
.and_then(|d| d.as_nanos().try_into().ok())
.unwrap_or(TIMESTAMP_ERROR);

thread.timestamp.store(ts, Ordering::Release);

// Now that we know that the ADC actually works and we have
// initial values: return a handle to it.
if let Some((content, tx)) = signal_ready.take() {
// Can not fail in practice as the queue is only .take()n
// once and thus known to be empty.
tx.try_send(Ok(content))
.expect("Failed to signal ADC setup completion due to full queue");
}
}

Ok(())
})?;

let thread = thread_res_rx.recv().await??;

Ok(thread)
}

pub async fn new_stm32() -> Result<Arc<Self>> {
pub async fn new_stm32(wtb: &mut WatchedTasksBuilder) -> Result<Arc<Self>> {
Self::new(
"stm32",
wtb,
"adc-stm32",
"48003000.adc:adc@0",
"tim4_trgo",
80,
Expand All @@ -447,14 +444,23 @@ impl IioThread {
.await
}

pub async fn new_powerboard() -> Result<Arc<Self>> {
pub async fn new_powerboard(wtb: &mut WatchedTasksBuilder) -> Result<Arc<Self>> {
let hr_trigger_path = Path::new(TRIGGER_HR_PWR_DIR);

if !hr_trigger_path.is_dir() {
create_dir(hr_trigger_path)?;
}

Self::new("powerboard", "lmp92064", "tacd-pwr", 20, CHANNELS_PWR, 1).await
Self::new(
wtb,
"adc-powerboard",
"lmp92064",
"tacd-pwr",
20,
CHANNELS_PWR,
1,
)
.await
}

/// Use the channel names defined at the top of the file to get a reference
Expand Down
4 changes: 2 additions & 2 deletions src/adc/iio/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub struct IioThread {
}

impl IioThread {
pub async fn new_stm32() -> Result<Arc<Self>> {
pub async fn new_stm32<W>(_wtb: &W) -> Result<Arc<Self>> {
let mut channels = Vec::new();

for name in CHANNELS_STM32 {
Expand All @@ -117,7 +117,7 @@ impl IioThread {
Ok(Arc::new(Self { channels }))
}

pub async fn new_powerboard() -> Result<Arc<Self>> {
pub async fn new_powerboard<W>(_wtb: &W) -> Result<Arc<Self>> {
let mut channels = Vec::new();

for name in CHANNELS_PWR {
Expand Down
10 changes: 6 additions & 4 deletions src/backlight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use anyhow::Result;
use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::task::spawn;
use log::warn;

mod demo_mode;
Expand All @@ -30,21 +29,22 @@ use demo_mode::{Backlight as SysBacklight, Brightness, SysClass};
use sysfs_class::{Backlight as SysBacklight, Brightness, SysClass};

use crate::broker::{BrokerBuilder, Topic};
use crate::watched_tasks::WatchedTasksBuilder;

pub struct Backlight {
pub brightness: Arc<Topic<f32>>,
}

impl Backlight {
pub fn new(bb: &mut BrokerBuilder) -> Result<Self> {
pub fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
let brightness = bb.topic_rw("/v1/tac/display/backlight/brightness", Some(1.0));

let (mut rx, _) = brightness.clone().subscribe_unbounded();

let backlight = SysBacklight::new("backlight")?;
let max_brightness = backlight.max_brightness()?;

spawn(async move {
wtb.spawn_task("backlight-dimmer", async move {
while let Some(fraction) = rx.next().await {
let brightness = (max_brightness as f32) * fraction;
let mut brightness = brightness.clamp(0.0, max_brightness as f32) as u64;
Expand All @@ -60,7 +60,9 @@ impl Backlight {
warn!("Failed to set LED pattern: {}", e);
}
}
});

Ok(())
})?;

Ok(Self { brightness })
}
Expand Down
9 changes: 7 additions & 2 deletions src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
// with this program; if not, write to the Free Software Foundation, Inc.,
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

use anyhow::Result;
use async_std::sync::Arc;
use serde::{de::DeserializeOwned, Serialize};

use crate::watched_tasks::WatchedTasksBuilder;

mod mqtt_conn;
mod persistence;
mod rest;
Expand Down Expand Up @@ -113,11 +116,13 @@ impl BrokerBuilder {
/// Finish building the broker
///
/// This consumes the builder so that no new topics can be registered
pub fn build(self, server: &mut tide::Server<()>) {
pub fn build(self, wtb: &mut WatchedTasksBuilder, server: &mut tide::Server<()>) -> Result<()> {
let topics = Arc::new(self.topics);

persistence::register(topics.clone());
persistence::register(wtb, topics.clone())?;
rest::register(server, topics.clone());
mqtt_conn::register(server, topics);

Ok(())
}
}
Loading
Loading