From 13f307d38758e2862c7b4a57bfd5e29b25b98e45 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Mon, 4 Mar 2024 12:29:44 +0100 Subject: [PATCH] commit --- Cargo.lock | 20 +- README.md | 2 + .../src/handler_communication.rs | 34 +++ examples/full/key-pages/default.toml | 18 +- handlers/timer/Cargo.toml | 14 + handlers/timer/src/handler.rs | 269 ++++++++++++++++++ handlers/timer/src/main.rs | 26 ++ handlers/timer/src/util.rs | 54 ++++ src/coordinator/io_worker.rs | 39 ++- src/coordinator/mqtt.rs | 9 +- src/coordinator/state.rs | 6 +- src/handler_host/mqtt.rs | 121 ++++---- 12 files changed, 543 insertions(+), 69 deletions(-) create mode 100644 handlers/timer/Cargo.toml create mode 100644 handlers/timer/src/handler.rs create mode 100644 handlers/timer/src/main.rs create mode 100644 handlers/timer/src/util.rs diff --git a/Cargo.lock b/Cargo.lock index 1fd760a..9756f22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -184,15 +184,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "5bc015644b92d5890fab7489e49d21f879d5c990186827d42ec511919404f38b" dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", "serde", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -2004,6 +2004,20 @@ dependencies = [ "time-core", ] +[[package]] +name = "timer" +version = "0.1.0" +dependencies = [ + "clap", + "color-eyre", + "deckster_mode", + "env_logger", + "humantime-serde", + "log", + "serde", + "tokio", +] + [[package]] name = "tiny-skia" version = "0.11.3" diff --git a/README.md b/README.md index 79e35b0..9d9bff9 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,8 @@ - Move loupedeck_serial and pa_volume_interface out of this repository. - Publish libraries to crates.io - Move handlers to their own repositories +- Update dependencies +- Make the CLI of handlers more useful ## Contributing ### Terminology diff --git a/crates/deckster_shared/src/handler_communication.rs b/crates/deckster_shared/src/handler_communication.rs index e8f267f..360e43d 100644 --- a/crates/deckster_shared/src/handler_communication.rs +++ b/crates/deckster_shared/src/handler_communication.rs @@ -47,9 +47,43 @@ pub enum HandlerEvent { Key { path: KeyPath, event: KeyEvent }, } +#[derive(Debug, PartialEq, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "kebab-case")] +pub enum VibrationPattern { + Short, + Medium, + Long, + Low, + ShortLow, + ShortLower, + Lower, + Lowest, + DescendSlow, + DescendMed, + DescendFast, + AscendSlow, + AscendMed, + AscendFast, + RevSlowest, + RevSlow, + RevMed, + RevFast, + RevFaster, + RevFastest, + RiseFall, + Buzz, + Rumble5, + Rumble4, + Rumble3, + Rumble2, + Rumble1, + VeryLong, +} + #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(tag = "command", rename_all = "kebab-case")] pub enum HandlerCommand { + Vibrate { pattern: VibrationPattern }, SetKeyStyle { path: KeyPath, value: Option }, SetKnobStyle { path: KnobPath, value: Option }, SetKnobValue { path: KnobPath, value: Option }, diff --git a/examples/full/key-pages/default.toml b/examples/full/key-pages/default.toml index 101bf41..069cb3b 100644 --- a/examples/full/key-pages/default.toml +++ b/examples/full/key-pages/default.toml @@ -1,7 +1,6 @@ [keys.1x2] icon = "@ph/skip-back" -host = "remote" handler = "playerctl" config.mode = "previous" config.style.inactive.icon = "@ph/skip-back[alpha=0.4]" @@ -9,7 +8,6 @@ config.style.inactive.icon = "@ph/skip-back[alpha=0.4]" [keys.2x2] icon = "@ph/play-pause[alpha=0.4]" -host = "remote" handler = "playerctl" config.mode = "play-pause" config.style.paused.icon = "@ph/play" @@ -18,7 +16,6 @@ config.style.playing.icon = "@ph/pause" [keys.3x2] icon = "@ph/skip-forward" -host = "remote" handler = "playerctl" config.mode = "next" config.style.inactive.icon = "@ph/skip-forward[alpha=0.4]" @@ -26,7 +23,6 @@ config.style.inactive.icon = "@ph/skip-forward[alpha=0.4]" [keys.1x3] icon = "@fad/shuffle[alpha=0.4]" -host = "remote" handler = "playerctl" config.mode = "shuffle" config.style.on.icon = "@fad/shuffle[color=#58fc11]" @@ -34,20 +30,24 @@ config.style.on.icon = "@fad/shuffle[color=#58fc11]" [keys.2x3] icon = "@fad/repeat[alpha=0.4]" -host = "remote" handler = "playerctl" config.mode = "loop" config.style.single.icon = "@fad/repeat-one[color=#58fc11]" config.style.all.icon = "@fad/repeat[color=#58fc11]" [keys.3x3] -icon = "@ph/timer[color=#ff0000]" +icon = "@ph/timer[scale=0.9]" -host = "moira" handler = "timer" -config.durations = ["60s", "5m", "10m", "15m", "30m"] +config.durations = ["5s"] +config.select_timeout = "1500ms" +config.alarm_timeout = "10s" config.vibrate_when_finished = true -config.needy = true +config.alarm_style_switch_interval = "200ms" +config.style.alarm1.border = "#ff0000ff" +config.style.alarm1.label = "00:00" +config.style.alarm2.border = "#ff00000f" +config.style.alarm2.label = "00:00" [keys.4x3] icon = "@ph/computer-tower" diff --git a/handlers/timer/Cargo.toml b/handlers/timer/Cargo.toml new file mode 100644 index 0000000..f576aea --- /dev/null +++ b/handlers/timer/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "timer" +version = "0.1.0" +edition = "2021" + +[dependencies] +deckster_mode = { path = "../../crates/deckster_mode" } +clap = { version = "4.4.18", features = ["derive"] } +color-eyre = "0.6.2" +env_logger = "0.11.1" +log = "0.4.20" +tokio = { version = "1.35.1", features = ["macros", "parking_lot", "rt", "sync", "time"] } +serde = { version = "1.0.196", features = ["derive"] } +humantime-serde = "1.1.1" \ No newline at end of file diff --git a/handlers/timer/src/handler.rs b/handlers/timer/src/handler.rs new file mode 100644 index 0000000..885e299 --- /dev/null +++ b/handlers/timer/src/handler.rs @@ -0,0 +1,269 @@ +use crate::util::{format_duration, get_far_future, spawn_debouncer}; +use deckster_mode::shared::handler_communication::{ + HandlerCommand, HandlerEvent, HandlerInitializationError, InitialHandlerMessage, KeyEvent, VibrationPattern, +}; +use deckster_mode::shared::path::KeyPath; +use deckster_mode::shared::state::KeyStyleByStateMap; +use deckster_mode::{send_command, DecksterHandler}; +use serde::Deserialize; +use std::thread; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast::Receiver; +use tokio::sync::{broadcast, mpsc}; +use tokio::task::LocalSet; +use tokio::time::{interval, interval_at, MissedTickBehavior}; + +#[derive(Debug, Clone, Deserialize)] +pub struct KeyConfig { + durations: Box<[humantime_serde::Serde]>, + #[serde(with = "humantime_serde")] + select_timeout: Duration, + #[serde(with = "humantime_serde")] + alarm_timeout: Duration, + vibrate_when_finished: bool, + #[serde(with = "humantime_serde")] + alarm_style_switch_interval: Duration, + #[serde(default)] + style: KeyStyleByStateMap, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Hash)] +#[serde(rename_all = "kebab-case")] +enum KeyStyleState { + Inactive, + Selection, + Running, + Paused, + Alarm1, + Alarm2, +} + +pub struct Handler { + events_sender: broadcast::Sender<(KeyPath, KeyEvent)>, +} + +impl Handler { + pub fn new(data: InitialHandlerMessage) -> Result { + let events_sender = broadcast::Sender::<(KeyPath, KeyEvent)>::new(5); + + thread::spawn({ + let events_sender = events_sender.clone(); + + move || { + let runtime = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap(); + let task_set = LocalSet::new(); + + let (vibration_active_sender, mut vibration_active_receiver) = mpsc::channel::(2); + task_set.spawn_local(async move { + let mut active_count = 0usize; + + let mut interval = tokio::time::interval(Duration::from_secs(2)); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + tokio::select! { + biased; // Important! Always first drain the channel before actually doing something. + + result = vibration_active_receiver.recv() => { + match result { + None => break, + Some(false) => active_count -= 1, + Some(true) => { + active_count += 1; + + if active_count == 1 { + interval.reset_immediately() + } + }, + } + } + + _ = interval.tick(), if active_count > 0 => { + send_command(HandlerCommand::Vibrate { pattern: VibrationPattern::Long }) + } + } + } + }); + + for (path, config) in data.key_configs { + task_set.spawn_local(manage_key(events_sender.subscribe(), vibration_active_sender.clone(), path, config)); + } + + runtime.block_on(task_set) + } + }); + + Ok(Handler { events_sender }) + } +} + +impl DecksterHandler for Handler { + fn handle(&mut self, event: HandlerEvent) { + if let HandlerEvent::Key { path, event } = event { + // No receivers being available can be ignored. + _ = self.events_sender.send((path, event)); + } + } +} + +#[derive(Debug, PartialEq)] +enum KeyStage { + Inactive, + Selection { selected_index: usize }, + Running { duration: Duration }, + Paused { duration: Duration }, + Alarm, +} + +#[derive(Debug)] +struct KeyState { + stage_change_time: Instant, + stage: KeyStage, +} + +async fn manage_key(mut events: Receiver<(KeyPath, KeyEvent)>, vibration_active_sender: mpsc::Sender, path: KeyPath, config: KeyConfig) { + let mut state = KeyState { + stage_change_time: Instant::now(), + stage: KeyStage::Inactive, + }; + + let (reset_timeout_sender, mut select_timeout_receiver) = spawn_debouncer(config.select_timeout); + + let mut display_interval = interval(Duration::from_millis(500)); + display_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut alarm1_interval = interval_at(tokio::time::Instant::now() + Duration::ZERO, config.alarm_style_switch_interval * 2); + let mut alarm2_interval = interval_at( + tokio::time::Instant::now() + config.alarm_style_switch_interval, + config.alarm_style_switch_interval * 2, + ); + + alarm1_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + alarm2_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut alarm_sleep = Box::pin(tokio::time::sleep_until(get_far_future().into())); + + loop { + tokio::select! { + _ = &mut alarm_sleep => { + if state.stage == KeyStage::Alarm { + if config.vibrate_when_finished { + vibration_active_sender.send(false).await.unwrap(); + } + + state.stage = KeyStage::Inactive; + state.stage_change_time = Instant::now(); + alarm_sleep.as_mut().reset(get_far_future().into()); + send_key_style(&path, &state, &config); + } else { + if config.vibrate_when_finished { + vibration_active_sender.send(true).await.unwrap(); + } + + state.stage = KeyStage::Alarm; + state.stage_change_time = Instant::now(); + alarm_sleep.as_mut().reset(tokio::time::Instant::now() + config.alarm_timeout); + } + } + _ = display_interval.tick(), if matches!(state.stage, KeyStage::Running { .. } | KeyStage::Paused { .. }) => { + send_key_style(&path, &state, &config); + } + _ = alarm1_interval.tick(), if matches!(state.stage, KeyStage::Alarm) => { + send_command(HandlerCommand::SetKeyStyle { path: path.clone(), value: config.style.get(&KeyStyleState::Alarm1).cloned() }); + } + _ = alarm2_interval.tick(), if matches!(state.stage, KeyStage::Alarm) => { + send_command(HandlerCommand::SetKeyStyle { path: path.clone(), value: config.style.get(&KeyStyleState::Alarm2).cloned() }); + } + _ = select_timeout_receiver.recv() => { + if let KeyStage::Selection { selected_index } = state.stage { + let duration = config.durations[selected_index].into_inner(); + alarm_sleep.as_mut().reset(tokio::time::Instant::now() + duration); + state.stage = KeyStage::Running { duration }; + state.stage_change_time = Instant::now(); + send_key_style(&path, &state, &config); + } + } + Ok((p, event)) = events.recv() => { + if p != path { + continue + } + + if event == KeyEvent::Press { + match &mut state.stage { + KeyStage::Inactive => { + state.stage = KeyStage::Selection { selected_index: 0 }; + _ = reset_timeout_sender.try_send(()); + } + KeyStage::Selection { selected_index } => { + *selected_index = selected_index.wrapping_add(1) % config.durations.len(); + _ = reset_timeout_sender.try_send(()); + } + KeyStage::Running { duration } => { + state.stage = KeyStage::Paused { duration: *duration - state.stage_change_time.elapsed() }; + } + KeyStage::Paused { duration } => { + alarm_sleep.as_mut().reset(tokio::time::Instant::now() + *duration); + state.stage = KeyStage::Running { duration: *duration }; + } + KeyStage::Alarm => { + if config.vibrate_when_finished { + vibration_active_sender.send(false).await.unwrap(); + } + + state.stage = KeyStage::Inactive; + state.stage_change_time = Instant::now(); + alarm_sleep.as_mut().reset(get_far_future().into()); + send_command(HandlerCommand::SetKeyStyle { path: path.clone(), value: config.style.get(&KeyStyleState::Inactive).cloned() }); + } + } + + state.stage_change_time = Instant::now(); + send_key_style(&path, &state, &config); + } + } + } + } +} + +fn send_key_style(path: &KeyPath, state: &KeyState, config: &KeyConfig) { + let elapsed = state.stage_change_time.elapsed(); + + let style = match state.stage { + KeyStage::Inactive => config.style.get(&KeyStyleState::Inactive).cloned(), + KeyStage::Selection { selected_index } => { + let duration = config.durations[selected_index].into_inner(); + let mut s = config.style.get(&KeyStyleState::Selection).cloned().unwrap_or_default(); + s.label = Some(format_duration(duration)); + Some(s) + } + KeyStage::Running { duration } => { + if elapsed > duration { + return; + } + + let mut s = config.style.get(&KeyStyleState::Running).cloned().unwrap_or_default(); + s.label = Some(format_duration(duration - elapsed)); + Some(s) + } + KeyStage::Paused { duration } => { + if elapsed > duration { + return; + } + + let mut s = config.style.get(&KeyStyleState::Paused).cloned().unwrap_or_default(); + s.label = Some(if state.stage_change_time.elapsed().as_millis() % 1000 < 500 { + format_duration(duration - state.stage_change_time.elapsed()) + } else { + "".to_owned() + }); + + Some(s) + } + KeyStage::Alarm => config.style.get(&KeyStyleState::Alarm1).cloned(), + }; + + send_command(HandlerCommand::SetKeyStyle { + path: path.clone(), + value: style, + }); +} diff --git a/handlers/timer/src/main.rs b/handlers/timer/src/main.rs new file mode 100644 index 0000000..7c537c0 --- /dev/null +++ b/handlers/timer/src/main.rs @@ -0,0 +1,26 @@ +use clap::Parser; +use color_eyre::Result; + +use crate::handler::Handler; + +mod handler; +mod util; + +#[derive(Debug, Parser)] +#[command(name = "timer")] +enum CliCommand { + #[command(name = "deckster-run", hide = true)] + Run, +} + +fn main() -> Result<()> { + let command = CliCommand::parse(); + + match command { + CliCommand::Run => { + deckster_mode::run(Handler::new)?; + } + } + + Ok(()) +} diff --git a/handlers/timer/src/util.rs b/handlers/timer/src/util.rs new file mode 100644 index 0000000..a94361d --- /dev/null +++ b/handlers/timer/src/util.rs @@ -0,0 +1,54 @@ +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::time::timeout; + +/// Sends a message into the output channel after a message in the input channel was received, with a delay of `duration`. +/// The delay is reset when a new message is reset. +pub fn spawn_debouncer(duration: Duration) -> (Sender<()>, Receiver<()>) { + let (input_sender, mut input_receiver) = mpsc::channel::<()>(1); + let (output_sender, output_receiver) = mpsc::channel::<()>(1); + + tokio::spawn(async move { + 'outer: loop { + if input_receiver.recv().await.is_none() { + break 'outer; + } + + 'inner: loop { + match timeout(duration, input_receiver.recv()).await { + Ok(None) => break 'outer, + Ok(Some(_)) => continue 'inner, + Err(_) => { + if let Err(TrySendError::Closed(_)) = output_sender.try_send(()) { + break 'outer; + } else { + break 'inner; + } + } + } + } + } + }); + + (input_sender, output_receiver) +} + +pub fn format_duration(duration: Duration) -> String { + let full_seconds = duration.as_secs(); + let full_minutes = full_seconds / 60; + let hours = full_minutes / 60; + let minutes = full_minutes % 60; + let seconds = full_seconds % 60; + + if hours == 0 { + format!("{:0>2}:{:0>2}", minutes, seconds) + } else { + format!("{:0>2}:{:0>2}:{:0>2}", hours, minutes, seconds) + } +} + +pub fn get_far_future() -> Instant { + Instant::now() + Duration::from_secs(60 * 60 * 24 * 365 * 30) // 30 years +} diff --git a/src/coordinator/io_worker.rs b/src/coordinator/io_worker.rs index 049aeff..ed3949b 100644 --- a/src/coordinator/io_worker.rs +++ b/src/coordinator/io_worker.rs @@ -7,10 +7,11 @@ use resvg::usvg::tiny_skia_path::IntSize; use rgb::RGB8; use tokio::sync::broadcast; -use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent, KeyEvent, KeyTouchEventKind, KnobEvent}; +use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent, KeyEvent, KeyTouchEventKind, KnobEvent, VibrationPattern}; use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; use deckster_shared::state::{Key, Knob}; use loupedeck_serial::characteristics::{LoupedeckDeviceKeyGridCharacteristics, LoupedeckDisplayRect, LoupedeckKnob}; +use loupedeck_serial::commands::VibrationPattern as LSVibrationPattern; use loupedeck_serial::device::LoupedeckDevice; use loupedeck_serial::events::{LoupedeckEvent, RotationDirection}; @@ -251,6 +252,9 @@ fn handle_handler_command(context: &mut IoWorkerContext, command: HandlerCommand log::trace!("Handling handler command: {:?}", &command); match command { + HandlerCommand::Vibrate { pattern } => { + context.device.vibrate(map_vibration_pattern(pattern)); + } HandlerCommand::SetKeyStyle { path, value } => { context.state.mutate_key_for_command("SetKeyStyle", &path, |k| { k.style = value; @@ -417,3 +421,36 @@ fn get_position_of_loupedeck_knob(value: LoupedeckKnob) -> KnobPosition { LoupedeckKnob::RightBottom => KnobPosition::RightBottom, } } + +fn map_vibration_pattern(value: VibrationPattern) -> LSVibrationPattern { + match value { + VibrationPattern::Short => LSVibrationPattern::Short, + VibrationPattern::Medium => LSVibrationPattern::Medium, + VibrationPattern::Long => LSVibrationPattern::Long, + VibrationPattern::Low => LSVibrationPattern::Low, + VibrationPattern::ShortLow => LSVibrationPattern::ShortLow, + VibrationPattern::ShortLower => LSVibrationPattern::ShortLower, + VibrationPattern::Lower => LSVibrationPattern::Lower, + VibrationPattern::Lowest => LSVibrationPattern::Lowest, + VibrationPattern::DescendSlow => LSVibrationPattern::DescendSlow, + VibrationPattern::DescendMed => LSVibrationPattern::DescendMed, + VibrationPattern::DescendFast => LSVibrationPattern::DescendFast, + VibrationPattern::AscendSlow => LSVibrationPattern::AscendSlow, + VibrationPattern::AscendMed => LSVibrationPattern::AscendMed, + VibrationPattern::AscendFast => LSVibrationPattern::AscendFast, + VibrationPattern::RevSlowest => LSVibrationPattern::RevSlowest, + VibrationPattern::RevSlow => LSVibrationPattern::RevSlow, + VibrationPattern::RevMed => LSVibrationPattern::RevMed, + VibrationPattern::RevFast => LSVibrationPattern::RevFast, + VibrationPattern::RevFaster => LSVibrationPattern::RevFaster, + VibrationPattern::RevFastest => LSVibrationPattern::RevFastest, + VibrationPattern::RiseFall => LSVibrationPattern::RiseFall, + VibrationPattern::Buzz => LSVibrationPattern::Buzz, + VibrationPattern::Rumble5 => LSVibrationPattern::Rumble5, + VibrationPattern::Rumble4 => LSVibrationPattern::Rumble4, + VibrationPattern::Rumble3 => LSVibrationPattern::Rumble3, + VibrationPattern::Rumble2 => LSVibrationPattern::Rumble2, + VibrationPattern::Rumble1 => LSVibrationPattern::Rumble1, + VibrationPattern::VeryLong => LSVibrationPattern::VeryLong, + } +} diff --git a/src/coordinator/mqtt.rs b/src/coordinator/mqtt.rs index e396114..ef18993 100644 --- a/src/coordinator/mqtt.rs +++ b/src/coordinator/mqtt.rs @@ -5,7 +5,7 @@ use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS}; use tokio::sync::broadcast; use crate::coordinator::io_worker::CoordinatorCommand; -use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; +use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent, VibrationPattern}; use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; use deckster_shared::style::{KeyStyle, KnobStyle}; @@ -132,6 +132,13 @@ pub async fn start_mqtt_client( }) .unwrap(); } + "vibrate" => { + if let Ok(pattern) = serde_json::from_slice::(&event.payload) { + handler_commands_sender.send_async(HandlerCommand::Vibrate { pattern }).await.unwrap(); + } else { + log::error!("Unknown vibration pattern from {}: {}", event.topic, String::from_utf8_lossy(&event.payload)); + } + } "keys" => { let page_id = segments[1]; let position = segments[2]; diff --git a/src/coordinator/state.rs b/src/coordinator/state.rs index b925c6d..4156c3b 100644 --- a/src/coordinator/state.rs +++ b/src/coordinator/state.rs @@ -70,7 +70,11 @@ impl State { active_key_page_id: config.initial.key_page.clone(), active_knob_page_id: config.initial.knob_page.clone(), active_touch_ids: HashSet::new(), - active_remote_handler_host_ids: HashSet::new(), + active_remote_handler_host_ids: { + let mut s = HashSet::new(); + s.insert("".to_owned().into_boxed_str()); + s + }, key_pages_by_id, knob_pages_by_id, } diff --git a/src/handler_host/mqtt.rs b/src/handler_host/mqtt.rs index 6a1f192..7684b08 100644 --- a/src/handler_host/mqtt.rs +++ b/src/handler_host/mqtt.rs @@ -65,6 +65,7 @@ pub async fn start_mqtt_client( is_first_try = false; client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/vibrate"), QoS::ExactlyOnce).await.unwrap(); client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap(); client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap(); } @@ -72,65 +73,68 @@ pub async fn start_mqtt_client( let topic_name = event.topic; let topic_segments = topic_name.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); - if topic_segments[0] == "config" { - if let Ok(config) = serde_json::from_slice::>(&event.payload) { - if let Some(c) = &config { - client - .publish(activeness_topic.clone(), QoS::AtLeastOnce, true, c.run_id.to_owned().into_boxed_bytes()) - .await - .unwrap(); - } + match topic_segments[0] { + "config" => { + if let Ok(config) = serde_json::from_slice::>(&event.payload) { + if let Some(c) = &config { + client + .publish(activeness_topic.clone(), QoS::AtLeastOnce, true, c.run_id.to_owned().into_boxed_bytes()) + .await + .unwrap(); + } - handler_hosts_config_sender.send(config).await.unwrap(); - } else { - log::error!("Could not deserialize the latest configuration from {}", topic_name); - handler_hosts_config_sender.send(None).await.unwrap(); - }; - } else { - let page_id = topic_segments[1]; - let position = topic_segments[2]; - let property = topic_segments[3]; + handler_hosts_config_sender.send(config).await.unwrap(); + } else { + log::error!("Could not deserialize the latest configuration from {}", topic_name); + handler_hosts_config_sender.send(None).await.unwrap(); + }; + } + _ => { + let page_id = topic_segments[1]; + let position = topic_segments[2]; + let property = topic_segments[3]; - match topic_segments[0] { - "keys" if property == "events" => { - if let Ok(event) = serde_json::from_slice(&event.payload) { - if let Ok(position) = KeyPosition::from_str(position) { - // This can be Err when events are received before the configuration - // but in that case we just ignore the event. - _ = events_sender.send(HandlerEvent::Key { - path: KeyPath { - page_id: page_id.to_owned(), - position, - }, - event, - }) + match topic_segments[0] { + "keys" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + if let Ok(position) = KeyPosition::from_str(position) { + // This can be Err when events are received before the configuration + // but in that case we just ignore the event. + _ = events_sender.send(HandlerEvent::Key { + path: KeyPath { + page_id: page_id.to_owned(), + position, + }, + event, + }) + } else { + log::warn!("Invalid key position in topic name: {topic_name}"); + } } else { - log::warn!("Invalid key position in topic name: {topic_name}"); - } - } else { - log::error!("Could not deserialize the latest event from {topic_name}"); - }; - } - "knobs" if property == "events" => { - if let Ok(event) = serde_json::from_slice(&event.payload) { - if let Ok(position) = KnobPosition::from_str(position) { - // This can be Err when events are received before the configuration - // but in that case we just ignore the event. - _ = events_sender.send(HandlerEvent::Knob { - path: KnobPath { - page_id: page_id.to_owned(), - position, - }, - event, - }); + log::error!("Could not deserialize the latest event from {topic_name}"); + }; + } + "knobs" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + if let Ok(position) = KnobPosition::from_str(position) { + // This can be Err when events are received before the configuration + // but in that case we just ignore the event. + _ = events_sender.send(HandlerEvent::Knob { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + event, + }); + } else { + log::warn!("Invalid knob position in topic name: {topic_name}"); + } } else { - log::warn!("Invalid knob position in topic name: {topic_name}"); - } - } else { - log::error!("Could not deserialize the latest event from {}", topic_name); - }; + log::error!("Could not deserialize the latest event from {}", topic_name); + }; + } + _ => {} } - _ => {} } } } @@ -146,6 +150,15 @@ pub async fn start_mqtt_client( tokio::spawn(async move { while let Ok(command) = commands_receiver.recv_async().await { match command { + HandlerCommand::Vibrate { pattern } => client + .publish( + format!("{topic_prefix}/vibrate"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&pattern).unwrap(), + ) + .await + .unwrap(), HandlerCommand::SetKeyStyle { path, value } => client .publish( format!("{topic_prefix}/keys/{path}/style"),