From 19a595c21b08156d007b2c771369577ce5b7c081 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Wed, 28 Feb 2024 16:48:01 +0100 Subject: [PATCH] commit --- .gitignore | 2 +- Cargo.lock | 41 +++ Cargo.toml | 2 + README.md | 9 +- crates/deckster_mode/src/lib.rs | 6 + .../src/handler_communication.rs | 1 + crates/pa_volume_interface/src/lib.rs | 6 +- examples/full/deckster.toml | 2 +- examples/full/key-pages/default.toml | 5 + examples/full/knob-pages/default.toml | 4 + examples/handler_host/deckster.toml | 7 + src/{runner => coordinator}/graphics.rs | 2 +- src/{runner => coordinator}/io_worker.rs | 8 +- src/{runner => coordinator}/mod.rs | 101 +++--- src/coordinator/mqtt.rs | 154 +++++++++ src/{runner => coordinator}/state.rs | 4 +- src/handler_host/mod.rs | 218 +++---------- src/handler_host/mqtt.rs | 138 +++++++++ src/handler_runner.rs | 221 +++++++++++++ src/icons/mod.rs | 2 +- src/main.rs | 31 +- .../{config.rs => coordinator_config.rs} | 16 +- src/model/handler_host_config.rs | 9 + src/model/mod.rs | 4 +- src/model/mqtt.rs | 29 ++ src/runner/mqtt.rs | 291 ------------------ 26 files changed, 754 insertions(+), 559 deletions(-) create mode 100644 examples/handler_host/deckster.toml rename src/{runner => coordinator}/graphics.rs (99%) rename src/{runner => coordinator}/io_worker.rs (98%) rename src/{runner => coordinator}/mod.rs (53%) create mode 100644 src/coordinator/mqtt.rs rename src/{runner => coordinator}/state.rs (97%) create mode 100644 src/handler_host/mqtt.rs create mode 100644 src/handler_runner.rs rename src/model/{config.rs => coordinator_config.rs} (91%) create mode 100644 src/model/handler_host_config.rs create mode 100644 src/model/mqtt.rs delete mode 100644 src/runner/mqtt.rs diff --git a/.gitignore b/.gitignore index 0084c97..bbdfa6b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target /.idea -/examples/full/handlers \ No newline at end of file +/examples/*/handlers \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index f9b6b38..23b4ad6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -381,11 +381,13 @@ dependencies = [ "enum-map", "enum-ordinalize", "env_logger", + "flume", "humantime-serde", "is_executable", "itertools", "log", "loupedeck_serial", + "nanoid", "once_cell", "parse-display 0.9.0", "regex", @@ -1034,6 +1036,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "nanoid" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8" +dependencies = [ + "rand", +] + [[package]] name = "nanorand" version = "0.7.0" @@ -1269,6 +1280,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.78" @@ -1287,11 +1304,35 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + [[package]] name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] [[package]] name = "rand_xoshiro" diff --git a/Cargo.toml b/Cargo.toml index 60ad6b5..42cfc11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ once_cell = "1.19.0" is_executable = "1.0.1" rumqttc = "0.23.0" itertools = "0.12.1" +flume = "0.11.0" +nanoid = "0.4.0" [workspace] members = [ diff --git a/README.md b/README.md index 7a1298b..ce47d08 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,9 @@ # Deckster -## Remote handler host handshake - -- The coordinator publishes the config to `PREFIX/config` which includes a nonce. - -- All hosts publish the nonce to `PREFIX/handler_hosts/HOST_ID/active_run_id` so that the coordinator knows they are alive. +## Terminology +- `handler runner`: Node that is running handlers. +- `handler host`: A `handler runner` that is not the `coordinator`. +- `coordinator`: Node to which the Loupedeck device is physically connected. Always a `handler runner`. ## Attribution [foxxyz’s `loupedeck` library for JavaScript](https://github.com/foxxyz/loupedeck) diff --git a/crates/deckster_mode/src/lib.rs b/crates/deckster_mode/src/lib.rs index 47fcacb..f30d055 100644 --- a/crates/deckster_mode/src/lib.rs +++ b/crates/deckster_mode/src/lib.rs @@ -48,8 +48,14 @@ pub fn run< description: e.to_string(), })?; + let should_stop = matches!(event, HandlerEvent::Stop); + h.handle(event); handler = Either::Left(h); + + if should_stop { + break; + } } Either::Right(init_handler) => { let initial_message = serde_json::from_str::>(&line); diff --git a/crates/deckster_shared/src/handler_communication.rs b/crates/deckster_shared/src/handler_communication.rs index f83313a..1f1f1cc 100644 --- a/crates/deckster_shared/src/handler_communication.rs +++ b/crates/deckster_shared/src/handler_communication.rs @@ -42,6 +42,7 @@ pub enum KeyEvent { #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "kebab-case")] pub enum HandlerEvent { + Stop, Knob { path: KnobPath, event: KnobEvent }, Key { path: KeyPath, event: KeyEvent }, } diff --git a/crates/pa_volume_interface/src/lib.rs b/crates/pa_volume_interface/src/lib.rs index 0360daf..24e27e8 100644 --- a/crates/pa_volume_interface/src/lib.rs +++ b/crates/pa_volume_interface/src/lib.rs @@ -374,7 +374,7 @@ impl PaThread { let current_state = Arc::clone(&self.current_state); let mainloop = Rc::clone(&self.mainloop); - loop { + 'outer: loop { self.run_single_mainloop_iteration(false); while let Ok(command) = self.commands_rx.try_recv() { @@ -403,11 +403,13 @@ impl PaThread { } } PaCommand::Terminate => { - mainloop.borrow_mut().quit(Retval(0)); + break 'outer; } } } } + + mainloop.borrow_mut().quit(Retval(0)); } fn unwrap_state(state: &Arc>>>) -> Arc { diff --git a/examples/full/deckster.toml b/examples/full/deckster.toml index afb9846..ea7da91 100644 --- a/examples/full/deckster.toml +++ b/examples/full/deckster.toml @@ -8,7 +8,7 @@ key_page = "default" knob_page = "default" [mqtt] -client_id = "deckster_host" +client_id = "deckster-coordinator" topic_prefix = "deckster" host = "localhost" port = 1883 diff --git a/examples/full/key-pages/default.toml b/examples/full/key-pages/default.toml index e1aa4d0..101bf41 100644 --- a/examples/full/key-pages/default.toml +++ b/examples/full/key-pages/default.toml @@ -1,6 +1,7 @@ [keys.1x2] icon = "@ph/skip-back" +host = "remote" handler = "playerctl" config.mode = "previous" config.style.inactive.icon = "@ph/skip-back[alpha=0.4]" @@ -8,6 +9,7 @@ config.style.inactive.icon = "@ph/skip-back[alpha=0.4]" [keys.2x2] icon = "@ph/play-pause[alpha=0.4]" +host = "remote" handler = "playerctl" config.mode = "play-pause" config.style.paused.icon = "@ph/play" @@ -16,6 +18,7 @@ config.style.playing.icon = "@ph/pause" [keys.3x2] icon = "@ph/skip-forward" +host = "remote" handler = "playerctl" config.mode = "next" config.style.inactive.icon = "@ph/skip-forward[alpha=0.4]" @@ -23,6 +26,7 @@ config.style.inactive.icon = "@ph/skip-forward[alpha=0.4]" [keys.1x3] icon = "@fad/shuffle[alpha=0.4]" +host = "remote" handler = "playerctl" config.mode = "shuffle" config.style.on.icon = "@fad/shuffle[color=#58fc11]" @@ -30,6 +34,7 @@ config.style.on.icon = "@fad/shuffle[color=#58fc11]" [keys.2x3] icon = "@fad/repeat[alpha=0.4]" +host = "remote" handler = "playerctl" config.mode = "loop" config.style.single.icon = "@fad/repeat-one[color=#58fc11]" diff --git a/examples/full/knob-pages/default.toml b/examples/full/knob-pages/default.toml index 0619ebf..5db7e0c 100644 --- a/examples/full/knob-pages/default.toml +++ b/examples/full/knob-pages/default.toml @@ -2,6 +2,7 @@ icon = "@ph/microphone-light[scale=0.9]" indicators.bar.color = "#ffffff50" +host = "remote" handler = "pa_volume" config.delta = 0.05 config.target.type = "input" @@ -19,6 +20,7 @@ config.style.inactive.icon = "@ph/microphone-slash-light[scale=0.9|alpha=0.8|col icon = "@apps/discord[scale=0.25]" indicators.bar.color = "#ffffff50" +host = "remote" handler = "pa_volume" config.delta = 0.05 config.target.type = "application" @@ -31,6 +33,7 @@ config.style.inactive.icon = "@apps/discord[scale=0.25|grayscale|alpha=0.8]" icon = "@apps/youtube[scale=1.3]" indicators.bar.color = "#ffffff50" +host = "remote" handler = "pa_volume" config.delta = 0.05 config.muted_turn_action = "unmute" @@ -44,6 +47,7 @@ config.style.inactive.icon = "@apps/youtube[scale=1.3|grayscale]" icon = "@apps/spotify[scale=1.2]" indicators.bar.color = "#ffffff50" +host = "remote" handler = "pa_volume" config.delta = 0.05 config.muted_turn_action = "unmute-at-zero" diff --git a/examples/handler_host/deckster.toml b/examples/handler_host/deckster.toml new file mode 100644 index 0000000..b9b5036 --- /dev/null +++ b/examples/handler_host/deckster.toml @@ -0,0 +1,7 @@ +host_id = "remote" + +[mqtt] +client_id = "deckster-remote" +topic_prefix = "deckster" +host = "localhost" +port = 1883 \ No newline at end of file diff --git a/src/runner/graphics.rs b/src/coordinator/graphics.rs similarity index 99% rename from src/runner/graphics.rs rename to src/coordinator/graphics.rs index 1a207fe..67d86be 100644 --- a/src/runner/graphics.rs +++ b/src/coordinator/graphics.rs @@ -9,8 +9,8 @@ use tiny_skia::{Color, IntSize, LineCap, LineJoin, Paint, Pixmap, PremultipliedC use deckster_shared::state::{Key, Knob}; use loupedeck_serial::util::Endianness; +use crate::coordinator::graphics::labels::LabelRenderer; use crate::icons::IconManager; -use crate::runner::graphics::labels::LabelRenderer; #[derive(Debug)] pub struct GraphicsContext { diff --git a/src/runner/io_worker.rs b/src/coordinator/io_worker.rs similarity index 98% rename from src/runner/io_worker.rs rename to src/coordinator/io_worker.rs index 8bf8cd9..67ad4fd 100644 --- a/src/runner/io_worker.rs +++ b/src/coordinator/io_worker.rs @@ -15,12 +15,12 @@ use loupedeck_serial::characteristics::{LoupedeckButton, LoupedeckDeviceKeyGridC use loupedeck_serial::device::LoupedeckDevice; use loupedeck_serial::events::{LoupedeckEvent, RotationDirection}; +use crate::coordinator::graphics::labels::LabelRenderer; +use crate::coordinator::graphics::{render_key, render_knob, GraphicsContext}; +use crate::coordinator::state::State; use crate::icons::IconManager; -use crate::model::config::Config; +use crate::model::coordinator_config::Config; use crate::model::position::ButtonPosition; -use crate::runner::graphics::labels::LabelRenderer; -use crate::runner::graphics::{render_key, render_knob, GraphicsContext}; -use crate::runner::state::State; enum IoWork { DeviceEvent(LoupedeckEvent), diff --git a/src/runner/mod.rs b/src/coordinator/mod.rs similarity index 53% rename from src/runner/mod.rs rename to src/coordinator/mod.rs index b79390d..df9db10 100644 --- a/src/runner/mod.rs +++ b/src/coordinator/mod.rs @@ -5,6 +5,7 @@ use std::thread; use color_eyre::eyre::{ContextCompat, WrapErr}; use color_eyre::Result; use log::info; +use nanoid::nanoid; use tokio::sync::broadcast; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; @@ -12,12 +13,13 @@ use deckster_shared::path::{KeyPath, KnobPath}; use loupedeck_serial::commands::VibrationPattern; use loupedeck_serial::device::LoupedeckDevice; -use crate::handler_host; -use crate::handler_host::KeyOrKnobHandlerConfig; -use crate::model::config::Config; +use crate::coordinator::io_worker::{do_io_work, IoWorkerContext}; +use crate::coordinator::mqtt::start_mqtt_client; +use crate::handler_runner; +use crate::handler_runner::KeyOrKnobHandlerConfig; +use crate::model::coordinator_config::Config; use crate::model::get_default_host_id; -use crate::runner::io_worker::{do_io_work, IoWorkerContext}; -use crate::runner::mqtt::start_coordinator_mqtt_client; +use crate::model::mqtt::HandlerHostsConfig; mod graphics; mod io_worker; @@ -42,62 +44,63 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { }) .unwrap(); - let key_configs = config - .key_pages_by_id - .iter() - .flat_map(|(page_id, p)| { - p.keys.iter().map(|(position, k)| { - ( - KeyPath { - page_id: page_id.clone(), - position: *position, - }, - KeyOrKnobHandlerConfig { - host_id: k.host.clone(), - name: k.handler.clone(), - config: Arc::clone(&k.config), - }, - ) + let handler_hosts_config = HandlerHostsConfig { + run_id: nanoid!().into_boxed_str(), + keys: config + .key_pages_by_id + .iter() + .flat_map(|(page_id, p)| { + p.keys.iter().map(|(position, k)| { + ( + KeyPath { + page_id: page_id.clone(), + position: *position, + }, + KeyOrKnobHandlerConfig { + host_id: k.host.clone(), + name: k.handler.clone(), + config: Arc::clone(&k.config), + }, + ) + }) }) - }) - .collect(); + .collect(), + knobs: config + .knob_pages_by_id + .iter() + .flat_map(|(page_id, p)| { + p.knobs.iter().filter_map(|(position, k)| { + if k.handler.is_empty() { + return None; + } - let knob_configs = config - .knob_pages_by_id - .iter() - .flat_map(|(page_id, p)| { - p.knobs.iter().filter_map(|(position, k)| { - if k.handler.is_empty() { - return None; - } - - Some(( - KnobPath { - page_id: page_id.clone(), - position, - }, - KeyOrKnobHandlerConfig { - host_id: k.host.clone(), - name: k.handler.clone(), - config: Arc::clone(&k.config), - }, - )) + Some(( + KnobPath { + page_id: page_id.clone(), + position, + }, + KeyOrKnobHandlerConfig { + host_id: k.host.clone(), + name: k.handler.clone(), + config: Arc::clone(&k.config), + }, + )) + }) }) - }) - .collect(); + .collect(), + }; if let Some(mqtt_config) = &config.mqtt { info!("Initializing MQTT client…"); - start_coordinator_mqtt_client(mqtt_config, &key_configs, &knob_configs, commands_sender.clone(), events_sender.subscribe()).await; + start_mqtt_client(mqtt_config, &handler_hosts_config, commands_sender.clone(), events_sender.subscribe()).await; } info!("Initializing handler processes…"); - handler_host::start( + handler_runner::start( get_default_host_id(), &config_directory.join("handlers"), - key_configs, - knob_configs, + handler_hosts_config, commands_sender.clone(), events_sender.subscribe(), ) diff --git a/src/coordinator/mqtt.rs b/src/coordinator/mqtt.rs new file mode 100644 index 0000000..b7e5a06 --- /dev/null +++ b/src/coordinator/mqtt.rs @@ -0,0 +1,154 @@ +use std::str::FromStr; +use std::time::Duration; + +use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS}; +use tokio::sync::broadcast; + +use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; +use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; +use deckster_shared::style::{KeyStyle, KnobStyle}; + +use crate::model::mqtt::{HandlerHostsConfig, MqttConfig}; + +pub async fn start_mqtt_client( + config: &MqttConfig, + handler_hosts_config: &HandlerHostsConfig, + commands_sender: flume::Sender, + mut events_receiver: broadcast::Receiver, +) { + let topic_prefix = config.topic_prefix.to_owned(); + let config_topic = format!("{topic_prefix}/config"); + + let mut options = MqttOptions::new(&config.client_id, &config.host, config.port); + options.set_keep_alive(Duration::from_secs(3)); + options.set_clean_session(true); + + options.set_last_will(LastWill::new( + config_topic.clone(), + serde_json::to_vec(&Option::::None).unwrap(), + QoS::ExactlyOnce, + true, + )); + + if let Some(credentials) = &config.credentials { + options.set_credentials(&credentials.username, &credentials.password); + } + + let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); + + client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap(); + + tokio::spawn({ + let topic_prefix = topic_prefix.clone(); + let client = client.clone(); + + async move { + while let Ok(event) = events_receiver.recv().await { + match event { + HandlerEvent::Stop => { + // TODO + } + HandlerEvent::Key { path, event } => { + client + .publish( + format!("{topic_prefix}/keys/{path}/events"), + QoS::ExactlyOnce, + false, + serde_json::to_string(&event).unwrap(), + ) + .await + .unwrap(); + } + HandlerEvent::Knob { path, event } => { + client + .publish( + format!("{topic_prefix}/knobs/{path}/events"), + QoS::ExactlyOnce, + false, + serde_json::to_string(&event).unwrap(), + ) + .await + .unwrap(); + } + } + } + } + }); + + tokio::spawn({ + let topic_prefix = topic_prefix.clone(); + + async move { + while let Ok(event) = event_loop.poll().await { + if let Event::Incoming(Incoming::Publish(event)) = event { + let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + let page_id = segments[1]; + let position = segments[2]; + let property = segments[3]; + + match segments[0] { + "keys" => { + if property == "style" { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send_async(HandlerCommand::SetKeyStyle { + path: KeyPath { + page_id: page_id.to_owned(), + position: KeyPosition::from_str(position).unwrap(), + }, + value, + }) + .await + .unwrap(); + } + } + "knobs" => { + let position = KnobPosition::from_str(position).unwrap(); + + match property { + "style" => { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send_async(HandlerCommand::SetKnobStyle { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + value, + }) + .await + .unwrap(); + } + "value" => { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send_async(HandlerCommand::SetKnobValue { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + value, + }) + .await + .unwrap(); + } + _ => {} + } + } + _ => {} + }; + } + } + } + }); + + client + .publish(config_topic, QoS::ExactlyOnce, true, serde_json::to_string(handler_hosts_config).unwrap()) + .await + .unwrap() +} diff --git a/src/runner/state.rs b/src/coordinator/state.rs similarity index 97% rename from src/runner/state.rs rename to src/coordinator/state.rs index 604f87f..fe96056 100644 --- a/src/runner/state.rs +++ b/src/coordinator/state.rs @@ -6,8 +6,8 @@ use log::error; use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; use deckster_shared::state::{Key, Knob}; +use crate::coordinator::state; use crate::model; -use crate::runner::state; #[derive(Debug)] pub struct State { @@ -19,7 +19,7 @@ pub struct State { } impl State { - pub fn create(config: &model::config::Config) -> Self { + pub fn create(config: &model::coordinator_config::Config) -> Self { let key_pages_by_id: HashMap<_, _> = config .key_pages_by_id .iter() diff --git a/src/handler_host/mod.rs b/src/handler_host/mod.rs index 389431b..ddd50f8 100644 --- a/src/handler_host/mod.rs +++ b/src/handler_host/mod.rs @@ -1,192 +1,58 @@ -use std::collections::HashMap; -use std::ffi::OsString; use std::path::Path; -use std::process::Stdio; -use std::sync::Arc; -use color_eyre::eyre::{eyre, WrapErr}; use color_eyre::Result; -use is_executable::IsExecutable; -use itertools::Itertools; -use log::warn; -use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; -use tokio::process::{ChildStdin, Command}; -use tokio::sync::mpsc; +use log::{debug, info, warn}; +use tokio::sync::{broadcast, mpsc}; -use deckster_shared::handler_communication::{ - HandlerCommand, HandlerEvent, HandlerInitializationError, HandlerInitializationResultMessage, InitialHandlerMessage, -}; -use deckster_shared::path::{KeyPath, KnobPath}; +use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; -#[derive(Debug, Serialize, Deserialize)] -pub struct KeyOrKnobHandlerConfig { - pub host_id: Box, - pub name: Box, - pub config: Arc, -} +use crate::handler_runner; +use crate::model::handler_host_config::Config; -pub async fn start( - host_id: Box, - handlers_directory: &Path, - key_configs: HashMap, - knob_configs: HashMap, - commands_sender: mpsc::Sender, - mut events_receiver: tokio::sync::broadcast::Receiver, -) -> Result<()> { - let handler_names: Vec = std::fs::read_dir(handlers_directory) - .wrap_err_with(|| format!("while reading the handlers directory: {}", handlers_directory.to_string_lossy()))? - .filter_map(|entry| { - if let Ok(entry) = entry { - let path = entry.path(); - if path.is_executable() { - return Some(path.file_name().unwrap().to_os_string()); - } +mod mqtt; + +pub async fn start(config_directory: &Path, config: Config) -> Result<()> { + let (commands_sender, commands_receiver) = flume::bounded::(5); + let (handler_hosts_config_sender, mut handler_hosts_config_receiver) = mpsc::channel(1); + let events_sender = broadcast::Sender::::new(5); + + info!("Initializing MQTT client…"); + mqtt::start_mqtt_client(&config.mqtt, handler_hosts_config_sender, commands_receiver, events_sender.clone()).await; + + info!("Waiting for initial configuration…"); + + let mut is_running = false; + + while let Some(handler_hosts_config) = handler_hosts_config_receiver.recv().await { + match handler_hosts_config { + None => { + info!("Coordinator was stopped. Sending stop event to handlers."); + events_sender.send(HandlerEvent::Stop).unwrap(); + is_running = false; } - - None - }) - .collect(); - - let mut handler_stdin_by_name: HashMap, ChildStdin> = HashMap::with_capacity(handler_names.len()); - - let mut handler_name_by_key_path: HashMap> = HashMap::new(); - let mut handler_config_by_key_path_by_handler_name: HashMap, HashMap>> = HashMap::new(); - - for (path, config) in key_configs { - if config.host_id != host_id { - continue; - } - - handler_name_by_key_path.insert(path.clone(), config.name.clone()); - handler_config_by_key_path_by_handler_name - .entry(config.name) - .or_default() - .insert(path, Arc::clone(&config.config)); - } - - let mut handler_name_by_knob_path: HashMap> = HashMap::new(); - let mut handler_config_by_knob_path_by_handler_name: HashMap, HashMap>> = HashMap::new(); - - for (path, config) in knob_configs { - if config.host_id != host_id { - continue; - } - - handler_name_by_knob_path.insert(path.clone(), config.name.clone()); - handler_config_by_knob_path_by_handler_name - .entry(config.name) - .or_default() - .insert(path, Arc::clone(&config.config)); - } - - for handler_name in handler_names { - let handler_name = handler_name - .into_string() - .map_err(|_| eyre!("Command names must be valid Unicode."))? - .into_boxed_str(); - - let (key_configs, knob_configs) = match ( - handler_config_by_key_path_by_handler_name.remove(&handler_name), - handler_config_by_knob_path_by_handler_name.remove(&handler_name), - ) { - (None, None) => { - warn!("Handler '{handler_name}' is not used by any key or knob."); - continue; - } - (a, b) => (a.unwrap_or_default(), b.unwrap_or_default()), - }; - - let mut command = Command::new(handlers_directory.join(handler_name.to_string())) - .arg("deckster-run") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn() - .wrap_err_with(|| format!("while spawning handler: {handler_name}"))?; - - let mut stdout_lines = BufReader::new(command.stdout.take().unwrap()).lines(); - let mut stdin = command.stdin.unwrap(); - - let initial_handler_message = InitialHandlerMessage { key_configs, knob_configs }; - - let serialized_message = serde_json::to_string(&initial_handler_message).unwrap().into_boxed_str().into_boxed_bytes(); - - stdin.write_all(&serialized_message).await.unwrap(); - stdin.write_u8(b'\n').await.unwrap(); - stdin.flush().await.unwrap(); - - let result_line = stdout_lines.next_line().await?.unwrap(); - let result: HandlerInitializationResultMessage = serde_json::from_str(&result_line)?; - - if let HandlerInitializationResultMessage::Error { error } = result { - #[rustfmt::skip] - if let HandlerInitializationError::InvalidConfig { supports_keys, supports_knobs, .. } = error { - if !supports_keys && !initial_handler_message.key_configs.is_empty() { - return Err(eyre!( - "The '{handler_name}' handler does not support keys, but these keys tried to use it: {}", - initial_handler_message.key_configs.keys().map(|k| k.to_string()).join(", ") - )); - } else if !supports_knobs && !initial_handler_message.knob_configs.is_empty() { - return Err(eyre!( - "The '{handler_name}' handler does not support knobs, but these knobs tried to use it: {}", - initial_handler_message.knob_configs.keys().map(|k| k.to_string()).join(", ") - )); + Some(handler_hosts_config) => { + if is_running { + warn!("A new configuration was received before the old one was cleared."); + events_sender.send(HandlerEvent::Stop).unwrap(); } - }; - return Err(eyre!("Starting the '{handler_name}' handler failed: {error}")); - } + is_running = true; - let commands_sender = commands_sender.clone(); + info!("Received new configuration. Starting handlers…"); + debug!("New configuration: {handler_hosts_config:#?}"); - tokio::spawn(async move { - while let Ok(Some(line)) = stdout_lines.next_line().await { - if line.starts_with('{') { - let command = serde_json::from_str::(&line).unwrap(); - - commands_sender.send(command).await.unwrap(); - } else { - println!("{}", line); - } + handler_runner::start( + config.host_id.clone(), + &config_directory.join("handlers"), + handler_hosts_config, + commands_sender.clone(), + events_sender.subscribe(), + ) + .await?; } - }); - - handler_stdin_by_name.insert(handler_name, stdin); - } - - if let Some((handler_name, config_by_key_path)) = handler_config_by_key_path_by_handler_name.drain().next() { - return Err(eyre!( - "There is no executable file named '{handler_name}' in the handlers directory but these keys have it set as their handler: {}", - config_by_key_path.keys().join(", ") - )); - } - - if let Some((handler_name, config_by_knob_path)) = handler_config_by_knob_path_by_handler_name.drain().next() { - return Err(eyre!( - "There is no executable file named '{handler_name}' in the handlers directory but these knobs have it set as their handler: {}", - config_by_knob_path.keys().join(", ") - )); - } - - tokio::spawn(async move { - while let Ok(event) = events_receiver.recv().await { - let handler_name = if let Some(n) = match &event { - HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path), - HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path), - } { - n - } else { - continue; - }; - - let handler_stdin = handler_stdin_by_name.get_mut(handler_name).expect("was already checked above"); - let serialized_event = serde_json::to_string(&event).unwrap().into_boxed_str().into_boxed_bytes(); - - handler_stdin.write_all(&serialized_event).await.unwrap(); - handler_stdin.write_u8(b'\n').await.unwrap(); - handler_stdin.flush().await.unwrap(); } - }); + } + dbg!("hey"); Ok(()) } diff --git a/src/handler_host/mqtt.rs b/src/handler_host/mqtt.rs new file mode 100644 index 0000000..d08e2a2 --- /dev/null +++ b/src/handler_host/mqtt.rs @@ -0,0 +1,138 @@ +use std::str::FromStr; +use std::time::Duration; + +use log::error; +use rumqttc::{Event, Incoming, MqttOptions, QoS}; +use tokio::sync::{broadcast, mpsc}; + +use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; +use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; + +use crate::model::mqtt::HandlerHostsConfig; +use crate::model::mqtt::MqttConfig; + +pub async fn start_mqtt_client( + config: &MqttConfig, + handler_hosts_config_sender: mpsc::Sender>, + commands_receiver: flume::Receiver, + events_sender: broadcast::Sender, +) { + let topic_prefix = config.topic_prefix.to_owned(); + + let mut options = MqttOptions::new(&config.client_id, &config.host, config.port); + options.set_keep_alive(Duration::from_secs(3)); + options.set_clean_session(true); + + if let Some(credentials) = &config.credentials { + options.set_credentials(&credentials.username, &credentials.password); + } + + let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); + + client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap(); + + tokio::spawn({ + let topic_prefix = topic_prefix.clone(); + + async move { + loop { + let poll_result = event_loop.poll().await; + + match poll_result { + Err(error) => { + error!("{error}") + } + Ok(event) => { + if let Event::Incoming(Incoming::Publish(event)) = event { + let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + + if topic_segments[0] == "config" { + if let Ok(config) = serde_json::from_slice(&event.payload) { + handler_hosts_config_sender.send(config).await.unwrap(); + } else { + log::error!("Could not deserialize the latest configuration from {}", event.topic); + handler_hosts_config_sender.send(None).await.unwrap(); + }; + } else { + let page_id = topic_segments[1]; + let position = topic_segments[2]; + let property = topic_segments[3]; + + match topic_segments[0] { + "keys" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + events_sender + .send(HandlerEvent::Key { + path: KeyPath { + page_id: page_id.to_owned(), + position: KeyPosition::from_str(position).unwrap(), + }, + event, + }) + .unwrap(); + } else { + log::error!("Could not deserialize the latest event from {}", event.topic); + }; + } + "knobs" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + events_sender + .send(HandlerEvent::Knob { + path: KnobPath { + page_id: page_id.to_owned(), + position: KnobPosition::from_str(position).unwrap(), + }, + event, + }) + .unwrap(); + } else { + log::error!("Could not deserialize the latest event from {}", event.topic); + }; + } + _ => {} + } + } + } + } + } + } + } + }); + + tokio::spawn(async move { + while let Ok(command) = commands_receiver.recv_async().await { + match command { + HandlerCommand::SetActivePages { .. } => log::warn!("HandlerCommand::SetActivePages is not supported for remote handlers."), + HandlerCommand::SetKeyStyle { path, value } => client + .publish( + format!("{topic_prefix}/keys/{path}/style"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&value).unwrap(), + ) + .await + .unwrap(), + HandlerCommand::SetKnobStyle { path, value } => client + .publish( + format!("{topic_prefix}/knobs/{path}/style"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&value).unwrap(), + ) + .await + .unwrap(), + HandlerCommand::SetKnobValue { path, value } => client + .publish( + format!("{topic_prefix}/knobs/{path}/value"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&value).unwrap(), + ) + .await + .unwrap(), + }; + } + }); +} diff --git a/src/handler_runner.rs b/src/handler_runner.rs new file mode 100644 index 0000000..49c81ba --- /dev/null +++ b/src/handler_runner.rs @@ -0,0 +1,221 @@ +use std::collections::HashMap; +use std::ffi::OsString; +use std::path::Path; +use std::process::Stdio; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use color_eyre::eyre::{eyre, WrapErr}; +use color_eyre::Result; +use is_executable::IsExecutable; +use itertools::Itertools; +use log::{debug, error, warn}; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{ChildStdin, Command}; + +use deckster_shared::handler_communication::{ + HandlerCommand, HandlerEvent, HandlerInitializationError, HandlerInitializationResultMessage, InitialHandlerMessage, +}; +use deckster_shared::path::{KeyPath, KnobPath}; + +use crate::model::mqtt::HandlerHostsConfig; + +pub async fn start( + host_id: Box, + handlers_directory: &Path, + handler_hosts_config: HandlerHostsConfig, + commands_sender: flume::Sender, + mut events_receiver: tokio::sync::broadcast::Receiver, +) -> Result<()> { + let should_stop = Arc::new(AtomicBool::new(false)); + + let handler_names: Vec = std::fs::read_dir(handlers_directory) + .wrap_err_with(|| format!("while reading the handlers directory: {}", handlers_directory.to_string_lossy()))? + .filter_map(|entry| { + if let Ok(entry) = entry { + let path = entry.path(); + if path.is_executable() { + return Some(path.file_name().unwrap().to_os_string()); + } + } + + None + }) + .collect(); + + let mut handler_stdin_by_name: HashMap, ChildStdin> = HashMap::with_capacity(handler_names.len()); + + let mut handler_name_by_key_path: HashMap> = HashMap::new(); + let mut handler_config_by_key_path_by_handler_name: HashMap, HashMap>> = HashMap::new(); + + for (path, config) in handler_hosts_config.keys { + if config.host_id != host_id { + continue; + } + + handler_name_by_key_path.insert(path.clone(), config.name.clone()); + handler_config_by_key_path_by_handler_name + .entry(config.name) + .or_default() + .insert(path, Arc::clone(&config.config)); + } + + let mut handler_name_by_knob_path: HashMap> = HashMap::new(); + let mut handler_config_by_knob_path_by_handler_name: HashMap, HashMap>> = HashMap::new(); + + for (path, config) in handler_hosts_config.knobs { + if config.host_id != host_id { + continue; + } + + handler_name_by_knob_path.insert(path.clone(), config.name.clone()); + handler_config_by_knob_path_by_handler_name + .entry(config.name) + .or_default() + .insert(path, Arc::clone(&config.config)); + } + + for handler_name in handler_names { + let handler_name = handler_name + .into_string() + .map_err(|_| eyre!("Command names must be valid Unicode."))? + .into_boxed_str(); + + let (key_configs, knob_configs) = match ( + handler_config_by_key_path_by_handler_name.remove(&handler_name), + handler_config_by_knob_path_by_handler_name.remove(&handler_name), + ) { + (None, None) => { + warn!("Handler '{handler_name}' is not used by any key or knob."); + continue; + } + (a, b) => (a.unwrap_or_default(), b.unwrap_or_default()), + }; + + let mut command = Command::new(handlers_directory.join(handler_name.to_string())) + .arg("deckster-run") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn() + .wrap_err_with(|| format!("while spawning handler: {handler_name}"))?; + + let mut stdout_lines = BufReader::new(command.stdout.take().unwrap()).lines(); + let mut stdin = command.stdin.take().unwrap(); + + let initial_handler_message = InitialHandlerMessage { key_configs, knob_configs }; + + let serialized_message = serde_json::to_string(&initial_handler_message).unwrap().into_boxed_str().into_boxed_bytes(); + + stdin.write_all(&serialized_message).await.unwrap(); + stdin.write_u8(b'\n').await.unwrap(); + stdin.flush().await.unwrap(); + + let result_line = stdout_lines.next_line().await?.unwrap(); + let result: HandlerInitializationResultMessage = serde_json::from_str(&result_line)?; + + if let HandlerInitializationResultMessage::Error { error } = result { + #[rustfmt::skip] + if let HandlerInitializationError::InvalidConfig { supports_keys, supports_knobs, .. } = error { + if !supports_keys && !initial_handler_message.key_configs.is_empty() { + return Err(eyre!( + "The '{handler_name}' handler does not support keys, but these keys tried to use it: {}", + initial_handler_message.key_configs.keys().map(|k| k.to_string()).join(", ") + )); + } else if !supports_knobs && !initial_handler_message.knob_configs.is_empty() { + return Err(eyre!( + "The '{handler_name}' handler does not support knobs, but these knobs tried to use it: {}", + initial_handler_message.knob_configs.keys().map(|k| k.to_string()).join(", ") + )); + } + }; + + return Err(eyre!("Starting the '{handler_name}' handler failed: {error}")); + } + + handler_stdin_by_name.insert(handler_name.clone(), stdin); + + let commands_sender = commands_sender.clone(); + tokio::spawn(async move { + while let Ok(Some(line)) = stdout_lines.next_line().await { + if line.starts_with('{') { + let command = serde_json::from_str::(&line).unwrap(); + + commands_sender.send_async(command).await.unwrap(); + } else { + println!("{}", line); + } + } + }); + + let should_stop = Arc::clone(&should_stop); + tokio::spawn(async move { + let exit_status = command.wait().await.unwrap(); + + if !should_stop.load(Ordering::Relaxed) { + match exit_status.code() { + None => error!("The '{handler_name}' handler was unexpectedly terminated by a signal."), + Some(code) => error!("The '{handler_name}' handler exited unexpectedly with status code {code}"), + } + } else { + debug!("The '{handler_name}' handler exited: {exit_status:#?}"); + } + }); + } + + if let Some((handler_name, config_by_key_path)) = handler_config_by_key_path_by_handler_name.drain().next() { + return Err(eyre!( + "There is no executable file named '{handler_name}' in the handlers directory but these keys have it set as their handler: {}", + config_by_key_path.keys().join(", ") + )); + } + + if let Some((handler_name, config_by_knob_path)) = handler_config_by_knob_path_by_handler_name.drain().next() { + return Err(eyre!( + "There is no executable file named '{handler_name}' in the handlers directory but these knobs have it set as their handler: {}", + config_by_knob_path.keys().join(", ") + )); + } + + tokio::spawn(async move { + while let Ok(event) = events_receiver.recv().await { + let handler_name = match &event { + HandlerEvent::Stop => { + should_stop.store(true, Ordering::Relaxed); + let serialized_event = serde_json::to_string(&event).unwrap().into_boxed_str().into_boxed_bytes(); + for handler_stdin in handler_stdin_by_name.values_mut() { + handler_stdin.write_all(&serialized_event).await.unwrap(); + handler_stdin.write_u8(b'\n').await.unwrap(); + handler_stdin.flush().await.unwrap(); + } + + break; + } + HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path), + HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path), + }; + + let handler_name = if let Some(n) = handler_name { + n + } else { + continue; + }; + + let handler_stdin = handler_stdin_by_name.get_mut(handler_name).expect("was already checked above"); + let serialized_event = serde_json::to_string(&event).unwrap().into_boxed_str().into_boxed_bytes(); + + handler_stdin.write_all(&serialized_event).await.unwrap(); + handler_stdin.write_u8(b'\n').await.unwrap(); + handler_stdin.flush().await.unwrap(); + } + }); + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct KeyOrKnobHandlerConfig { + pub host_id: Box, + pub name: Box, + pub config: Arc, +} diff --git a/src/icons/mod.rs b/src/icons/mod.rs index d3d39e2..cc2ba00 100644 --- a/src/icons/mod.rs +++ b/src/icons/mod.rs @@ -12,7 +12,7 @@ use tiny_skia::{BlendMode, FilterQuality, Pixmap, PixmapPaint, Transform}; use deckster_shared::icon_descriptor::{IconDescriptor, IconDescriptorSource}; use deckster_shared::image_filter::ImageFilter; -use crate::model::config::{IconFormat, IconPack}; +use crate::model::coordinator_config::{IconFormat, IconPack}; mod destructive_filter; diff --git a/src/main.rs b/src/main.rs index 506a856..8701858 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::fs; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::sync::Arc; use clap::{Parser, Subcommand}; @@ -9,12 +9,13 @@ use color_eyre::Result; use log::LevelFilter; use walkdir::WalkDir; -use crate::model::config::WithFallbackId; +use crate::model::coordinator_config::WithFallbackId; +mod coordinator; mod handler_host; +mod handler_runner; mod icons; mod model; -mod runner; #[derive(Debug, Parser)] #[command(name = "deckster", about = "Use Loupedeck devices under Linux.")] @@ -25,9 +26,13 @@ struct Cli { #[derive(Debug, Subcommand)] enum Command { - Run { - #[arg(long, required = true)] - config: PathBuf, + Coordinator { + #[arg(short, long, required = true, alias = "start")] + config: Box, + }, + HandlerHost { + #[arg(short, long, required = true)] + config: Box, }, } @@ -38,8 +43,8 @@ pub async fn main() -> Result<()> { let cli = Cli::parse(); match cli.command { - Command::Run { config: config_path } => { - let deckster_file = read_and_deserialize::(config_path.join("deckster.toml").as_path())?; + Command::Coordinator { config: config_path } => { + let deckster_file = read_and_deserialize::(config_path.join("deckster.toml").as_path())?; let config_path = config_path.canonicalize()?; let key_pages_by_id: HashMap = @@ -63,7 +68,7 @@ pub async fn main() -> Result<()> { .map(|p| (p.id.clone(), p)) .collect(); - let config = model::config::Config { + let config = model::coordinator_config::Config { active_button_color: deckster_file.active_button_color, inactive_button_color: deckster_file.inactive_button_color, label_font_family: deckster_file.label_font_family, @@ -76,7 +81,13 @@ pub async fn main() -> Result<()> { } .validate()?; - runner::start(&config_path, config).await? + coordinator::start(&config_path, config).await? + } + Command::HandlerHost { config: config_path } => { + let config = read_and_deserialize::(config_path.join("deckster.toml").as_path())?; + let config_path = config_path.canonicalize()?; + + handler_host::start(&config_path, config).await? } }; diff --git a/src/model/config.rs b/src/model/coordinator_config.rs similarity index 91% rename from src/model/config.rs rename to src/model/coordinator_config.rs index 9563563..473af1f 100644 --- a/src/model/config.rs +++ b/src/model/coordinator_config.rs @@ -11,6 +11,7 @@ use deckster_shared::image_filter::ImageFilter; use deckster_shared::rgb::RGB8Wrapper; use crate::model; +use crate::model::mqtt::MqttConfig; use crate::model::position::ButtonPosition; #[derive(Debug, Deserialize)] @@ -65,21 +66,6 @@ pub struct InitialConfig { pub knob_page: String, } -#[derive(Debug, Deserialize)] -pub struct MqttConfig { - pub client_id: String, - pub host: String, - pub port: u16, - pub credentials: Option, - pub topic_prefix: String, -} - -#[derive(Debug, Deserialize)] -pub struct MqttConfigCredentials { - pub username: String, - pub password: String, -} - #[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] pub enum IconFormat { diff --git a/src/model/handler_host_config.rs b/src/model/handler_host_config.rs new file mode 100644 index 0000000..7cdb6c7 --- /dev/null +++ b/src/model/handler_host_config.rs @@ -0,0 +1,9 @@ +use serde::Deserialize; + +use crate::model::mqtt::MqttConfig; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub host_id: Box, + pub mqtt: MqttConfig, +} diff --git a/src/model/mod.rs b/src/model/mod.rs index 340a471..72e2b1c 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -1,7 +1,9 @@ -pub mod config; +pub mod coordinator_config; pub mod geometry; +pub mod handler_host_config; pub mod key_page; pub mod knob_page; +pub mod mqtt; pub mod position; pub fn get_default_host_id() -> Box { diff --git a/src/model/mqtt.rs b/src/model/mqtt.rs new file mode 100644 index 0000000..b4ef52a --- /dev/null +++ b/src/model/mqtt.rs @@ -0,0 +1,29 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use deckster_shared::path::{KeyPath, KnobPath}; + +use crate::handler_runner::KeyOrKnobHandlerConfig; + +#[derive(Debug, Serialize, Deserialize)] +pub struct HandlerHostsConfig { + pub run_id: Box, + pub keys: HashMap, + pub knobs: HashMap, +} + +#[derive(Debug, Deserialize)] +pub struct MqttConfig { + pub client_id: String, + pub host: String, + pub port: u16, + pub credentials: Option, + pub topic_prefix: String, +} + +#[derive(Debug, Deserialize)] +pub struct MqttConfigCredentials { + pub username: String, + pub password: String, +} diff --git a/src/runner/mqtt.rs b/src/runner/mqtt.rs deleted file mode 100644 index bc5c86f..0000000 --- a/src/runner/mqtt.rs +++ /dev/null @@ -1,291 +0,0 @@ -use std::collections::HashMap; -use std::fmt::format; -use std::str::FromStr; -use std::time::Duration; - -use rumqttc::{Event, Incoming, MqttOptions, QoS}; -use serde::{Deserialize, Serialize}; -use tokio::sync::{broadcast, mpsc}; - -use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; -use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; -use deckster_shared::style::{KeyStyle, KnobStyle}; - -use crate::handler_host::KeyOrKnobHandlerConfig; -use crate::model::config::MqttConfig; - -#[derive(Debug, Serialize, Deserialize)] -pub struct HandlerHostsConfig { - run_id: Box, - keys: HashMap, - knobs: HashMap, -} - -pub async fn start_coordinator_mqtt_client( - config: &MqttConfig, - key_configs: &HashMap, - knob_configs: &HashMap, - commands_sender: mpsc::Sender, - mut events_receiver: broadcast::Receiver, -) { - let topic_prefix = config.topic_prefix.to_owned(); - - let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port); - options.set_keep_alive(Duration::from_secs(3)); - options.set_clean_session(false); - - if let Some(credentials) = &config.credentials { - options.set_credentials(&credentials.username, &credentials.password); - } - - let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); - - client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap(); - - tokio::spawn({ - let topic_prefix = topic_prefix.clone(); - let client = client.clone(); - - async move { - while let Ok(event) = events_receiver.recv().await { - match event { - HandlerEvent::Key { path, event } => { - client - .publish( - format!("{topic_prefix}/keys/{path}/events"), - QoS::ExactlyOnce, - false, - serde_json::to_string(&event).unwrap(), - ) - .await - .unwrap(); - } - HandlerEvent::Knob { path, event } => { - client - .publish( - format!("{topic_prefix}/knobs/{path}/events"), - QoS::ExactlyOnce, - false, - serde_json::to_string(&event).unwrap(), - ) - .await - .unwrap(); - } - } - } - } - }); - - tokio::spawn({ - let topic_prefix = topic_prefix.clone(); - - async move { - while let Ok(event) = event_loop.poll().await { - if let Event::Incoming(Incoming::Publish(event)) = event { - let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); - let page_id = segments[1]; - let position = segments[2]; - let property = segments[3]; - - match segments[0] { - "keys" => { - if property == "style" { - let value = serde_json::from_slice::>(&event.payload).unwrap(); - - commands_sender - .send(HandlerCommand::SetKeyStyle { - path: KeyPath { - page_id: page_id.to_owned(), - position: KeyPosition::from_str(position).unwrap(), - }, - value, - }) - .await - .unwrap(); - } - } - "knobs" => { - let position = KnobPosition::from_str(position).unwrap(); - - match property { - "style" => { - let value = serde_json::from_slice::>(&event.payload).unwrap(); - - commands_sender - .send(HandlerCommand::SetKnobStyle { - path: KnobPath { - page_id: page_id.to_owned(), - position, - }, - value, - }) - .await - .unwrap(); - } - "value" => { - let value = serde_json::from_slice::>(&event.payload).unwrap(); - - commands_sender - .send(HandlerCommand::SetKnobValue { - path: KnobPath { - page_id: page_id.to_owned(), - position, - }, - value, - }) - .await - .unwrap(); - } - _ => {} - } - } - _ => {} - }; - } - } - } - }); - - for (path, config) in key_configs { - client - .publish( - format!("{topic_prefix}/keys/{path}/config"), - QoS::AtLeastOnce, - true, - serde_json::to_string(config).unwrap(), - ) - .await - .unwrap() - } - - for (path, config) in knob_configs { - client - .publish( - format!("{topic_prefix}/knobs/{path}/config"), - QoS::AtLeastOnce, - true, - serde_json::to_string(config).unwrap(), - ) - .await - .unwrap() - } -} - -pub async fn start_handler_host_mqtt_client( - config: &MqttConfig, - handler_hosts_config_sender: mpsc::Sender>, - mut commands_receiver: mpsc::Receiver, - events_sender: broadcast::Sender, -) { - let topic_prefix = config.topic_prefix.to_owned(); - - let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port); - options.set_keep_alive(Duration::from_secs(3)); - options.set_clean_session(false); - - if let Some(credentials) = &config.credentials { - options.set_credentials(&credentials.username, &credentials.password); - } - - let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); - - client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap(); - - tokio::spawn({ - let topic_prefix = topic_prefix.clone(); - - async move { - while let Ok(event) = event_loop.poll().await { - if let Event::Incoming(Incoming::Publish(event)) = event { - let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); - - if topic_segments[0] == "config" { - if let Ok(config) = serde_json::from_slice(&event.payload) { - handler_hosts_config_sender.send(config).await.unwrap(); - } else { - log::error!("Could not deserialize the latest configuration from {}", event.topic); - handler_hosts_config_sender.send(None).await.unwrap(); - }; - } else { - let page_id = topic_segments[1]; - let position = topic_segments[2]; - let property = topic_segments[3]; - - match topic_segments[0] { - "keys" if property == "events" => { - if let Ok(event) = serde_json::from_slice(&event.payload) { - events_sender - .send(HandlerEvent::Key { - path: KeyPath { - page_id: page_id.to_owned(), - position: KeyPosition::from_str(position).unwrap(), - }, - event, - }) - .unwrap(); - } else { - log::error!("Could not deserialize the latest event from {}", event.topic); - }; - } - "knobs" if property == "events" => { - if let Ok(event) = serde_json::from_slice(&event.payload) { - events_sender - .send(HandlerEvent::Knob { - path: KnobPath { - page_id: page_id.to_owned(), - position: KnobPosition::from_str(position).unwrap(), - }, - event, - }) - .unwrap(); - } else { - log::error!("Could not deserialize the latest event from {}", event.topic); - }; - } - _ => {} - } - } - } - } - } - }); - - tokio::spawn(async move { - while let Some(command) = commands_receiver.recv().await { - match command { - HandlerCommand::SetActivePages { .. } => log::warn!("HandlerCommand::SetActivePages is not supported for remote handlers."), - HandlerCommand::SetKeyStyle { path, value } => client - .publish( - format!("{topic_prefix}/keys/{path}/style"), - QoS::ExactlyOnce, - false, - serde_json::to_vec(&value).unwrap(), - ) - .await - .unwrap(), - HandlerCommand::SetKnobStyle { path, value } => client - .publish( - format!("{topic_prefix}/knobs/{path}/style"), - QoS::ExactlyOnce, - false, - serde_json::to_vec(&value).unwrap(), - ) - .await - .unwrap(), - HandlerCommand::SetKnobValue { path, value } => client - .publish( - format!("{topic_prefix}/knobs/{path}/value"), - QoS::ExactlyOnce, - false, - serde_json::to_vec(&value).unwrap(), - ) - .await - .unwrap(), - }; - } - }); -}