Skip to content

Commit

Permalink
watched_tasks: let spawn_task return Result instead of panicking
Browse files Browse the repository at this point in the history
Signed-off-by: Leonard Göhrs <l.goehrs@pengutronix.de>
  • Loading branch information
hnez committed Oct 6, 2023
1 parent 83ab432 commit c4ef644
Show file tree
Hide file tree
Showing 32 changed files with 230 additions and 187 deletions.
2 changes: 1 addition & 1 deletion src/adc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl Adc {

time.set(Timestamp::now());
}
});
})?;

Ok(adc)
}
Expand Down
2 changes: 1 addition & 1 deletion src/backlight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Backlight {
}

Ok(())
});
})?;

Ok(Self { brightness })
}
Expand Down
7 changes: 5 additions & 2 deletions src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// with this program; if not, write to the Free Software Foundation, Inc.,
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

use anyhow::Result;
use async_std::sync::Arc;
use serde::{de::DeserializeOwned, Serialize};

Expand Down Expand Up @@ -115,11 +116,13 @@ impl BrokerBuilder {
/// Finish building the broker
///
/// This consumes the builder so that no new topics can be registered
pub fn build(self, wtb: &mut WatchedTasksBuilder, server: &mut tide::Server<()>) {
pub fn build(self, wtb: &mut WatchedTasksBuilder, server: &mut tide::Server<()>) -> Result<()> {
let topics = Arc::new(self.topics);

persistence::register(wtb, topics.clone());
persistence::register(wtb, topics.clone())?;
rest::register(server, topics.clone());
mqtt_conn::register(server, topics);

Ok(())
}
}
4 changes: 2 additions & 2 deletions src/broker/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn save_on_change(
Ok(())
}

pub fn register(wtb: &mut WatchedTasksBuilder, topics: Arc<Vec<Arc<dyn AnyTopic>>>) {
pub fn register(wtb: &mut WatchedTasksBuilder, topics: Arc<Vec<Arc<dyn AnyTopic>>>) -> Result<()> {
load(&topics).unwrap();

let (tx, rx) = unbounded();
Expand All @@ -157,5 +157,5 @@ pub fn register(wtb: &mut WatchedTasksBuilder, topics: Arc<Vec<Arc<dyn AnyTopic>
topic.subscribe_as_bytes(tx.clone(), false);
}

wtb.spawn_task("persistence-save", save_on_change(topics, rx));
wtb.spawn_task("persistence-save", save_on_change(topics, rx))
}
21 changes: 9 additions & 12 deletions src/dbus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::watched_tasks::WatchedTasksBuilder;

#[cfg(feature = "demo_mode")]
mod zb {
pub type Result<T> = std::result::Result<T, ()>;
pub use anyhow::Result;

pub struct Connection;
pub struct ConnectionBuilder;
Expand Down Expand Up @@ -78,20 +78,17 @@ impl DbusSession {
wtb: &mut WatchedTasksBuilder,
led_dut: Arc<Topic<BlinkPattern>>,
led_uplink: Arc<Topic<BlinkPattern>>,
) -> Self {
) -> anyhow::Result<Self> {
let tacd = Tacd::new();

let conn_builder = ConnectionBuilder::system()
.unwrap()
.name("de.pengutronix.tacd")
.unwrap();
let conn_builder = ConnectionBuilder::system()?.name("de.pengutronix.tacd")?;

let conn = Arc::new(tacd.serve(conn_builder).build().await.unwrap());
let conn = Arc::new(tacd.serve(conn_builder).build().await?);

Self {
network: Network::new(bb, wtb, &conn, led_dut, led_uplink),
rauc: Rauc::new(bb, wtb, &conn),
systemd: Systemd::new(bb, wtb, &conn).await,
}
Ok(Self {
network: Network::new(bb, wtb, &conn, led_dut, led_uplink)?,
rauc: Rauc::new(bb, wtb, &conn)?,
systemd: Systemd::new(bb, wtb, &conn).await?,
})
}
}
19 changes: 10 additions & 9 deletions src/dbus/networkmanager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// with this program; if not, write to the Free Software Foundation, Inc.,
// 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

use anyhow::Result;
use async_std;
use async_std::sync::Arc;

Expand All @@ -31,7 +32,7 @@ mod hostname;
// Put them inside a mod so we do not have to decorate each one with
// a #[cfg(not(feature = "demo_mode"))].
mod optional_includes {
pub use anyhow::{anyhow, Result};
pub use anyhow::anyhow;
pub use async_std::stream::StreamExt;
pub use async_std::task::sleep;
pub use futures::{future::FutureExt, pin_mut, select};
Expand Down Expand Up @@ -261,7 +262,7 @@ impl Network {
_conn: C,
_led_dut: Arc<Topic<BlinkPattern>>,
_led_uplink: Arc<Topic<BlinkPattern>>,
) -> Self {
) -> Result<Self> {
let this = Self::setup_topics(bb);

this.hostname.set("lxatac".to_string());
Expand All @@ -275,7 +276,7 @@ impl Network {
carrier: true,
});

this
Ok(this)
}

#[cfg(not(feature = "demo_mode"))]
Expand All @@ -285,7 +286,7 @@ impl Network {
conn: &Arc<Connection>,
led_dut: Arc<Topic<BlinkPattern>>,
led_uplink: Arc<Topic<BlinkPattern>>,
) -> Self {
) -> Result<Self> {
let this = Self::setup_topics(bb);

{
Expand All @@ -307,7 +308,7 @@ impl Network {
}

Ok(())
});
})?;
}

{
Expand Down Expand Up @@ -337,7 +338,7 @@ impl Network {
}

Ok(())
});
})?;
}

{
Expand All @@ -364,7 +365,7 @@ impl Network {
}

Ok(())
});
})?;
}

{
Expand All @@ -386,9 +387,9 @@ impl Network {
}

Ok(())
});
})?;
}

this
Ok(this)
}
}
20 changes: 10 additions & 10 deletions src/dbus/rauc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl Rauc {
bb: &mut BrokerBuilder,
wtb: &mut WatchedTasksBuilder,
_conn: &Arc<Connection>,
) -> Self {
) -> Result<Self> {
let inst = Self::setup_topics(bb);

inst.operation.set("idle".to_string());
Expand All @@ -300,17 +300,17 @@ impl Rauc {
inst.channels.clone(),
inst.slot_status.clone(),
),
);
)?;

inst
Ok(inst)
}

#[cfg(not(feature = "demo_mode"))]
pub fn new(
bb: &mut BrokerBuilder,
wtb: &mut WatchedTasksBuilder,
conn: &Arc<Connection>,
) -> Self {
) -> Result<Self> {
let inst = Self::setup_topics(bb);

let conn_task = conn.clone();
Expand Down Expand Up @@ -409,7 +409,7 @@ impl Rauc {
break Ok(());
}
}
});
})?;

let conn_task = conn.clone();
let progress = inst.progress.clone();
Expand All @@ -431,7 +431,7 @@ impl Rauc {
}

Ok(())
});
})?;

let conn_task = conn.clone();
let last_error = inst.last_error.clone();
Expand All @@ -453,7 +453,7 @@ impl Rauc {
}

Ok(())
});
})?;

let conn_task = conn.clone();
let (mut install_stream, _) = inst.install.clone().subscribe_unbounded();
Expand All @@ -473,7 +473,7 @@ impl Rauc {
}

Ok(())
});
})?;

// Reload the channel list on request
let (reload_stream, _) = inst.reload.clone().subscribe_unbounded();
Expand All @@ -485,8 +485,8 @@ impl Rauc {
inst.channels.clone(),
inst.slot_status.clone(),
),
);
)?;

inst
Ok(inst)
}
}
36 changes: 21 additions & 15 deletions src/dbus/systemd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ impl Service {
_wtb: &mut WatchedTasksBuilder,
_conn: Arc<Connection>,
_unit_name: &str,
) {
) -> anyhow::Result<()> {
self.status.set(ServiceStatus::get().await.unwrap());

Ok(())
}

#[cfg(not(feature = "demo_mode"))]
Expand All @@ -110,7 +112,7 @@ impl Service {
wtb: &mut WatchedTasksBuilder,
conn: Arc<Connection>,
unit_name: &'static str,
) {
) -> anyhow::Result<()> {
let unit_path = {
let manager = manager::ManagerProxy::new(&conn).await.unwrap();
manager.get_unit(unit_name).await.unwrap()
Expand Down Expand Up @@ -150,7 +152,7 @@ impl Service {
.await
.unwrap();
}
});
})?;

let (mut action_reqs, _) = self.action.clone().subscribe_unbounded();

Expand All @@ -171,7 +173,9 @@ impl Service {
}

Ok(())
});
})?;

Ok(())
}
}

Expand All @@ -181,7 +185,7 @@ impl Systemd {
wtb: &mut WatchedTasksBuilder,
reboot: Arc<Topic<bool>>,
_conn: Arc<Connection>,
) {
) -> anyhow::Result<()> {
let (mut reboot_reqs, _) = reboot.subscribe_unbounded();

wtb.spawn_task("systemd-reboot", async move {
Expand All @@ -192,15 +196,15 @@ impl Systemd {
}

Ok(())
});
})
}

#[cfg(not(feature = "demo_mode"))]
pub fn handle_reboot(
wtb: &mut WatchedTasksBuilder,
reboot: Arc<Topic<bool>>,
conn: Arc<Connection>,
) {
) -> anyhow::Result<()> {
let (mut reboot_reqs, _) = reboot.subscribe_unbounded();

wtb.spawn_task("systemd-reboot", async move {
Expand All @@ -215,35 +219,37 @@ impl Systemd {
}

Ok(())
});
})
}

pub async fn new(
bb: &mut BrokerBuilder,
wtb: &mut WatchedTasksBuilder,
conn: &Arc<Connection>,
) -> Self {
) -> anyhow::Result<Self> {
let reboot = bb.topic_rw("/v1/tac/reboot", Some(false));

Self::handle_reboot(wtb, reboot.clone(), conn.clone());
Self::handle_reboot(wtb, reboot.clone(), conn.clone())?;

let networkmanager = Service::new(bb, "network-manager");
let labgrid = Service::new(bb, "labgrid-exporter");
let iobus = Service::new(bb, "lxa-iobus");

networkmanager
.connect(wtb, conn.clone(), "NetworkManager.service")
.await;
.await?;
labgrid
.connect(wtb, conn.clone(), "labgrid-exporter.service")
.await;
iobus.connect(wtb, conn.clone(), "lxa-iobus.service").await;
.await?;
iobus
.connect(wtb, conn.clone(), "lxa-iobus.service")
.await?;

Self {
Ok(Self {
reboot,
networkmanager,
labgrid,
iobus,
}
})
}
}
Loading

0 comments on commit c4ef644

Please sign in to comment.