From bc1ced79a84ec212c62a23b0a151dfbe0bace823 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Thu, 22 Feb 2024 00:12:49 +0100 Subject: [PATCH] commit --- Cargo.lock | 1 - Cargo.toml | 1 - README.md | 6 + examples/full/key-pages/default.toml | 32 ++-- src/handler_host/mod.rs | 51 ++++-- src/model/key_page.rs | 4 +- src/model/knob_page.rs | 4 +- src/model/mod.rs | 4 + src/runner/mod.rs | 30 ++-- src/runner/mqtt.rs | 258 ++++++++++++++++++++++----- 10 files changed, 296 insertions(+), 95 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6778dd1..f9b6b38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -381,7 +381,6 @@ dependencies = [ "enum-map", "enum-ordinalize", "env_logger", - "flume", "humantime-serde", "is_executable", "itertools", diff --git a/Cargo.toml b/Cargo.toml index befbe7e..60ad6b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,6 @@ encode_unicode = "1.0.0" enum-map = "3.0.0-beta.2" enum-ordinalize = "4.3.0" env_logger = "0.11.0" -flume = "0.11.0" humantime-serde = "1.1.1" log = "0.4.20" loupedeck_serial = { path = "./crates/loupedeck_serial" } diff --git a/README.md b/README.md index acc7fb6..7a1298b 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ # Deckster +## Remote handler host handshake + +- The coordinator publishes the config to `PREFIX/config` which includes a nonce. + +- All hosts publish the nonce to `PREFIX/handler_hosts/HOST_ID/active_run_id` so that the coordinator knows they are alive. + ## Attribution [foxxyz’s `loupedeck` library for JavaScript](https://github.com/foxxyz/loupedeck) (licensed under the [MIT license](https://github.com/foxxyz/loupedeck/blob/e41e5d920130d9ef651e47173c68450b9c832b96/LICENSE)) diff --git a/examples/full/key-pages/default.toml b/examples/full/key-pages/default.toml index 8f9ef3e..e1aa4d0 100644 --- a/examples/full/key-pages/default.toml +++ b/examples/full/key-pages/default.toml @@ -35,19 +35,21 @@ config.mode = "loop" config.style.single.icon = "@fad/repeat-one[color=#58fc11]" config.style.all.icon = "@fad/repeat[color=#58fc11]" -#[keys.3x3] -#icon = "@ph/timer[color=#ff0000]" -# -#handler = "timer" -#config.durations = ["60s", "5m", "10m", "15m", "30m"] -#config.vibrate_when_finished = true -#config.needy = true +[keys.3x3] +icon = "@ph/timer[color=#ff0000]" -#[keys.4x3] -#icon = "@ph/computer-tower" -#label = "Gaming PC" -# -#handler = "home-assistant" -#config.mode = "switch" -#config.name = "switch.mwin" -#config.style.on.icon = "@ph/computer-tower[color=#58fc11]" \ No newline at end of file +host = "moira" +handler = "timer" +config.durations = ["60s", "5m", "10m", "15m", "30m"] +config.vibrate_when_finished = true +config.needy = true + +[keys.4x3] +icon = "@ph/computer-tower" +label = "Gaming PC" + +host = "moira" +handler = "home-assistant" +config.mode = "switch" +config.name = "switch.mwin" +config.style.on.icon = "@ph/computer-tower[color=#58fc11]" \ No newline at end of file diff --git a/src/handler_host/mod.rs b/src/handler_host/mod.rs index 0a529eb..389431b 100644 --- a/src/handler_host/mod.rs +++ b/src/handler_host/mod.rs @@ -9,24 +9,29 @@ use color_eyre::Result; use is_executable::IsExecutable; use itertools::Itertools; use log::warn; +use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{ChildStdin, Command}; +use tokio::sync::mpsc; use deckster_shared::handler_communication::{ HandlerCommand, HandlerEvent, HandlerInitializationError, HandlerInitializationResultMessage, InitialHandlerMessage, }; use deckster_shared::path::{KeyPath, KnobPath}; -pub struct KeyOrKnobConfig { - pub handler_name: Box, - pub handler_config: Arc, +#[derive(Debug, Serialize, Deserialize)] +pub struct KeyOrKnobHandlerConfig { + pub host_id: Box, + pub name: Box, + pub config: Arc, } pub async fn start( + host_id: Box, handlers_directory: &Path, - key_configs: HashMap, - knob_configs: HashMap, - commands_sender: flume::Sender, + key_configs: HashMap, + knob_configs: HashMap, + commands_sender: mpsc::Sender, mut events_receiver: tokio::sync::broadcast::Receiver, ) -> Result<()> { let handler_names: Vec = std::fs::read_dir(handlers_directory) @@ -49,22 +54,30 @@ pub async fn start( let mut handler_config_by_key_path_by_handler_name: HashMap, HashMap>> = HashMap::new(); for (path, config) in key_configs { - handler_name_by_key_path.insert(path.clone(), config.handler_name.clone()); + if config.host_id != host_id { + continue; + } + + handler_name_by_key_path.insert(path.clone(), config.name.clone()); handler_config_by_key_path_by_handler_name - .entry(config.handler_name) + .entry(config.name) .or_default() - .insert(path, Arc::clone(&config.handler_config)); + .insert(path, Arc::clone(&config.config)); } let mut handler_name_by_knob_path: HashMap> = HashMap::new(); let mut handler_config_by_knob_path_by_handler_name: HashMap, HashMap>> = HashMap::new(); for (path, config) in knob_configs { - handler_name_by_knob_path.insert(path.clone(), config.handler_name.clone()); + if config.host_id != host_id { + continue; + } + + handler_name_by_knob_path.insert(path.clone(), config.name.clone()); handler_config_by_knob_path_by_handler_name - .entry(config.handler_name) + .entry(config.name) .or_default() - .insert(path, Arc::clone(&config.handler_config)); + .insert(path, Arc::clone(&config.config)); } for handler_name in handler_names { @@ -89,7 +102,7 @@ pub async fn start( .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn() - .wrap_err_with(|| format!("while spawning handler: {}", handler_name))?; + .wrap_err_with(|| format!("while spawning handler: {handler_name}"))?; let mut stdout_lines = BufReader::new(command.stdout.take().unwrap()).lines(); let mut stdin = command.stdin.unwrap(); @@ -131,7 +144,7 @@ pub async fn start( if line.starts_with('{') { let command = serde_json::from_str::(&line).unwrap(); - commands_sender.send_async(command).await.unwrap(); + commands_sender.send(command).await.unwrap(); } else { println!("{}", line); } @@ -157,9 +170,13 @@ pub async fn start( tokio::spawn(async move { while let Ok(event) = events_receiver.recv().await { - let handler_name = match &event { - HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path).expect("every key must have a handler"), - HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path).expect("every knob must have a handler"), + let handler_name = if let Some(n) = match &event { + HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path), + HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path), + } { + n + } else { + continue; }; let handler_stdin = handler_stdin_by_name.get_mut(handler_name).expect("was already checked above"); diff --git a/src/model/key_page.rs b/src/model/key_page.rs index 87ed8e1..c50afc5 100644 --- a/src/model/key_page.rs +++ b/src/model/key_page.rs @@ -50,6 +50,8 @@ pub struct Key { #[serde(default, flatten)] pub base_style: KeyStyle, - pub handler: String, + #[serde(default = "super::get_default_host_id")] + pub host: Box, + pub handler: Box, pub config: Arc, } diff --git a/src/model/knob_page.rs b/src/model/knob_page.rs index c68a8de..ed385d3 100644 --- a/src/model/knob_page.rs +++ b/src/model/knob_page.rs @@ -24,6 +24,8 @@ pub struct Knob { #[serde(default, flatten)] pub base_style: KnobStyle, - pub handler: String, + #[serde(default = "super::get_default_host_id")] + pub host: Box, + pub handler: Box, pub config: Arc, } diff --git a/src/model/mod.rs b/src/model/mod.rs index 4e78e69..340a471 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -3,3 +3,7 @@ pub mod geometry; pub mod key_page; pub mod knob_page; pub mod position; + +pub fn get_default_host_id() -> Box { + "local".to_owned().into_boxed_str() +} diff --git a/src/runner/mod.rs b/src/runner/mod.rs index ac590d6..b79390d 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -13,10 +13,11 @@ use loupedeck_serial::commands::VibrationPattern; use loupedeck_serial::device::LoupedeckDevice; use crate::handler_host; -use crate::handler_host::KeyOrKnobConfig; +use crate::handler_host::KeyOrKnobHandlerConfig; use crate::model::config::Config; +use crate::model::get_default_host_id; use crate::runner::io_worker::{do_io_work, IoWorkerContext}; -use crate::runner::mqtt::start_mqtt_client; +use crate::runner::mqtt::start_coordinator_mqtt_client; mod graphics; mod io_worker; @@ -51,9 +52,10 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { page_id: page_id.clone(), position: *position, }, - KeyOrKnobConfig { - handler_name: k.handler.as_str().into(), - handler_config: Arc::clone(&k.config), + KeyOrKnobHandlerConfig { + host_id: k.host.clone(), + name: k.handler.clone(), + config: Arc::clone(&k.config), }, ) }) @@ -74,18 +76,25 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { page_id: page_id.clone(), position, }, - KeyOrKnobConfig { - handler_name: k.handler.as_str().into(), - handler_config: Arc::clone(&k.config), + KeyOrKnobHandlerConfig { + host_id: k.host.clone(), + name: k.handler.clone(), + config: Arc::clone(&k.config), }, )) }) }) .collect(); + if let Some(mqtt_config) = &config.mqtt { + info!("Initializing MQTT client…"); + start_coordinator_mqtt_client(mqtt_config, &key_configs, &knob_configs, commands_sender.clone(), events_sender.subscribe()).await; + } + info!("Initializing handler processes…"); handler_host::start( + get_default_host_id(), &config_directory.join("handlers"), key_configs, knob_configs, @@ -94,11 +103,6 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { ) .await?; - if let Some(mqtt_config) = &config.mqtt { - info!("Initializing MQTT client…"); - start_mqtt_client(mqtt_config, commands_sender.clone(), events_sender.subscribe()).await; - } - info!("Connecting to the device…"); let device = available_device.connect().wrap_err("Connecting to the device failed.")?; info!("Connected."); diff --git a/src/runner/mqtt.rs b/src/runner/mqtt.rs index 4b54988..bc5c86f 100644 --- a/src/runner/mqtt.rs +++ b/src/runner/mqtt.rs @@ -1,16 +1,33 @@ +use std::collections::HashMap; +use std::fmt::format; use std::str::FromStr; use std::time::Duration; use rumqttc::{Event, Incoming, MqttOptions, QoS}; -use tokio::sync::broadcast; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast, mpsc}; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; use deckster_shared::style::{KeyStyle, KnobStyle}; +use crate::handler_host::KeyOrKnobHandlerConfig; use crate::model::config::MqttConfig; -pub async fn start_mqtt_client(config: &MqttConfig, commands_sender: flume::Sender, mut events_receiver: broadcast::Receiver) { +#[derive(Debug, Serialize, Deserialize)] +pub struct HandlerHostsConfig { + run_id: Box, + keys: HashMap, + knobs: HashMap, +} + +pub async fn start_coordinator_mqtt_client( + config: &MqttConfig, + key_configs: &HashMap, + knob_configs: &HashMap, + commands_sender: mpsc::Sender, + mut events_receiver: broadcast::Receiver, +) { let topic_prefix = config.topic_prefix.to_owned(); let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port); @@ -29,6 +46,7 @@ pub async fn start_mqtt_client(config: &MqttConfig, commands_sender: flume::Send tokio::spawn({ let topic_prefix = topic_prefix.clone(); + let client = client.clone(); async move { while let Ok(event) = events_receiver.recv().await { @@ -60,66 +78,214 @@ pub async fn start_mqtt_client(config: &MqttConfig, commands_sender: flume::Send } }); - tokio::spawn(async move { - while let Ok(event) = event_loop.poll().await { - if let Event::Incoming(Incoming::Publish(event)) = event { - let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').collect::>(); - let page_id = segments[1]; - let position = segments[2]; - let property = segments[3]; + tokio::spawn({ + let topic_prefix = topic_prefix.clone(); - match segments[0] { - "keys" => { - if property == "style" { - let value = serde_json::from_slice::>(&event.payload).unwrap(); + async move { + while let Ok(event) = event_loop.poll().await { + if let Event::Incoming(Incoming::Publish(event)) = event { + let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + let page_id = segments[1]; + let position = segments[2]; + let property = segments[3]; - commands_sender - .send(HandlerCommand::SetKeyStyle { - path: KeyPath { - page_id: page_id.to_owned(), - position: KeyPosition::from_str(position).unwrap(), - }, - value, - }) - .unwrap(); - } - } - "knobs" => { - let position = KnobPosition::from_str(position).unwrap(); - - match property { - "style" => { - let value = serde_json::from_slice::>(&event.payload).unwrap(); + match segments[0] { + "keys" => { + if property == "style" { + let value = serde_json::from_slice::>(&event.payload).unwrap(); commands_sender - .send(HandlerCommand::SetKnobStyle { - path: KnobPath { + .send(HandlerCommand::SetKeyStyle { + path: KeyPath { page_id: page_id.to_owned(), - position, + position: KeyPosition::from_str(position).unwrap(), }, value, }) + .await .unwrap(); } - "value" => { - let value = serde_json::from_slice::>(&event.payload).unwrap(); + } + "knobs" => { + let position = KnobPosition::from_str(position).unwrap(); - commands_sender - .send(HandlerCommand::SetKnobValue { - path: KnobPath { - page_id: page_id.to_owned(), - position, - }, - value, - }) - .unwrap(); + match property { + "style" => { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send(HandlerCommand::SetKnobStyle { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + value, + }) + .await + .unwrap(); + } + "value" => { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send(HandlerCommand::SetKnobValue { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + value, + }) + .await + .unwrap(); + } + _ => {} + } + } + _ => {} + }; + } + } + } + }); + + for (path, config) in key_configs { + client + .publish( + format!("{topic_prefix}/keys/{path}/config"), + QoS::AtLeastOnce, + true, + serde_json::to_string(config).unwrap(), + ) + .await + .unwrap() + } + + for (path, config) in knob_configs { + client + .publish( + format!("{topic_prefix}/knobs/{path}/config"), + QoS::AtLeastOnce, + true, + serde_json::to_string(config).unwrap(), + ) + .await + .unwrap() + } +} + +pub async fn start_handler_host_mqtt_client( + config: &MqttConfig, + handler_hosts_config_sender: mpsc::Sender>, + mut commands_receiver: mpsc::Receiver, + events_sender: broadcast::Sender, +) { + let topic_prefix = config.topic_prefix.to_owned(); + + let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port); + options.set_keep_alive(Duration::from_secs(3)); + options.set_clean_session(false); + + if let Some(credentials) = &config.credentials { + options.set_credentials(&credentials.username, &credentials.password); + } + + let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); + + client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap(); + + tokio::spawn({ + let topic_prefix = topic_prefix.clone(); + + async move { + while let Ok(event) = event_loop.poll().await { + if let Event::Incoming(Incoming::Publish(event)) = event { + let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + + if topic_segments[0] == "config" { + if let Ok(config) = serde_json::from_slice(&event.payload) { + handler_hosts_config_sender.send(config).await.unwrap(); + } else { + log::error!("Could not deserialize the latest configuration from {}", event.topic); + handler_hosts_config_sender.send(None).await.unwrap(); + }; + } else { + let page_id = topic_segments[1]; + let position = topic_segments[2]; + let property = topic_segments[3]; + + match topic_segments[0] { + "keys" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + events_sender + .send(HandlerEvent::Key { + path: KeyPath { + page_id: page_id.to_owned(), + position: KeyPosition::from_str(position).unwrap(), + }, + event, + }) + .unwrap(); + } else { + log::error!("Could not deserialize the latest event from {}", event.topic); + }; + } + "knobs" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + events_sender + .send(HandlerEvent::Knob { + path: KnobPath { + page_id: page_id.to_owned(), + position: KnobPosition::from_str(position).unwrap(), + }, + event, + }) + .unwrap(); + } else { + log::error!("Could not deserialize the latest event from {}", event.topic); + }; } _ => {} } } - _ => {} - }; + } } } }); + + tokio::spawn(async move { + while let Some(command) = commands_receiver.recv().await { + match command { + HandlerCommand::SetActivePages { .. } => log::warn!("HandlerCommand::SetActivePages is not supported for remote handlers."), + HandlerCommand::SetKeyStyle { path, value } => client + .publish( + format!("{topic_prefix}/keys/{path}/style"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&value).unwrap(), + ) + .await + .unwrap(), + HandlerCommand::SetKnobStyle { path, value } => client + .publish( + format!("{topic_prefix}/knobs/{path}/style"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&value).unwrap(), + ) + .await + .unwrap(), + HandlerCommand::SetKnobValue { path, value } => client + .publish( + format!("{topic_prefix}/knobs/{path}/value"), + QoS::ExactlyOnce, + false, + serde_json::to_vec(&value).unwrap(), + ) + .await + .unwrap(), + }; + } + }); }