This commit is contained in:
Moritz Ruth 2024-02-22 00:12:49 +01:00
parent fa8c988705
commit bc1ced79a8
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
10 changed files with 296 additions and 95 deletions

1
Cargo.lock generated
View file

@ -381,7 +381,6 @@ dependencies = [
"enum-map",
"enum-ordinalize",
"env_logger",
"flume",
"humantime-serde",
"is_executable",
"itertools",

View file

@ -14,7 +14,6 @@ encode_unicode = "1.0.0"
enum-map = "3.0.0-beta.2"
enum-ordinalize = "4.3.0"
env_logger = "0.11.0"
flume = "0.11.0"
humantime-serde = "1.1.1"
log = "0.4.20"
loupedeck_serial = { path = "./crates/loupedeck_serial" }

View file

@ -1,5 +1,11 @@
# Deckster
## Remote handler host handshake
- The coordinator publishes the config to `PREFIX/config` which includes a nonce.
- All hosts publish the nonce to `PREFIX/handler_hosts/HOST_ID/active_run_id` so that the coordinator knows they are alive.
## Attribution
[foxxyzs `loupedeck` library for JavaScript](https://github.com/foxxyz/loupedeck)
(licensed under the [MIT license](https://github.com/foxxyz/loupedeck/blob/e41e5d920130d9ef651e47173c68450b9c832b96/LICENSE))

View file

@ -35,19 +35,21 @@ config.mode = "loop"
config.style.single.icon = "@fad/repeat-one[color=#58fc11]"
config.style.all.icon = "@fad/repeat[color=#58fc11]"
#[keys.3x3]
#icon = "@ph/timer[color=#ff0000]"
#
#handler = "timer"
#config.durations = ["60s", "5m", "10m", "15m", "30m"]
#config.vibrate_when_finished = true
#config.needy = true
[keys.3x3]
icon = "@ph/timer[color=#ff0000]"
#[keys.4x3]
#icon = "@ph/computer-tower"
#label = "Gaming PC"
#
#handler = "home-assistant"
#config.mode = "switch"
#config.name = "switch.mwin"
#config.style.on.icon = "@ph/computer-tower[color=#58fc11]"
host = "moira"
handler = "timer"
config.durations = ["60s", "5m", "10m", "15m", "30m"]
config.vibrate_when_finished = true
config.needy = true
[keys.4x3]
icon = "@ph/computer-tower"
label = "Gaming PC"
host = "moira"
handler = "home-assistant"
config.mode = "switch"
config.name = "switch.mwin"
config.style.on.icon = "@ph/computer-tower[color=#58fc11]"

View file

@ -9,24 +9,29 @@ use color_eyre::Result;
use is_executable::IsExecutable;
use itertools::Itertools;
use log::warn;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdin, Command};
use tokio::sync::mpsc;
use deckster_shared::handler_communication::{
HandlerCommand, HandlerEvent, HandlerInitializationError, HandlerInitializationResultMessage, InitialHandlerMessage,
};
use deckster_shared::path::{KeyPath, KnobPath};
pub struct KeyOrKnobConfig {
pub handler_name: Box<str>,
pub handler_config: Arc<toml::Table>,
#[derive(Debug, Serialize, Deserialize)]
pub struct KeyOrKnobHandlerConfig {
pub host_id: Box<str>,
pub name: Box<str>,
pub config: Arc<toml::Table>,
}
pub async fn start(
host_id: Box<str>,
handlers_directory: &Path,
key_configs: HashMap<KeyPath, KeyOrKnobConfig>,
knob_configs: HashMap<KnobPath, KeyOrKnobConfig>,
commands_sender: flume::Sender<HandlerCommand>,
key_configs: HashMap<KeyPath, KeyOrKnobHandlerConfig>,
knob_configs: HashMap<KnobPath, KeyOrKnobHandlerConfig>,
commands_sender: mpsc::Sender<HandlerCommand>,
mut events_receiver: tokio::sync::broadcast::Receiver<HandlerEvent>,
) -> Result<()> {
let handler_names: Vec<OsString> = std::fs::read_dir(handlers_directory)
@ -49,22 +54,30 @@ pub async fn start(
let mut handler_config_by_key_path_by_handler_name: HashMap<Box<str>, HashMap<KeyPath, Arc<toml::Table>>> = HashMap::new();
for (path, config) in key_configs {
handler_name_by_key_path.insert(path.clone(), config.handler_name.clone());
if config.host_id != host_id {
continue;
}
handler_name_by_key_path.insert(path.clone(), config.name.clone());
handler_config_by_key_path_by_handler_name
.entry(config.handler_name)
.entry(config.name)
.or_default()
.insert(path, Arc::clone(&config.handler_config));
.insert(path, Arc::clone(&config.config));
}
let mut handler_name_by_knob_path: HashMap<KnobPath, Box<str>> = HashMap::new();
let mut handler_config_by_knob_path_by_handler_name: HashMap<Box<str>, HashMap<KnobPath, Arc<toml::Table>>> = HashMap::new();
for (path, config) in knob_configs {
handler_name_by_knob_path.insert(path.clone(), config.handler_name.clone());
if config.host_id != host_id {
continue;
}
handler_name_by_knob_path.insert(path.clone(), config.name.clone());
handler_config_by_knob_path_by_handler_name
.entry(config.handler_name)
.entry(config.name)
.or_default()
.insert(path, Arc::clone(&config.handler_config));
.insert(path, Arc::clone(&config.config));
}
for handler_name in handler_names {
@ -89,7 +102,7 @@ pub async fn start(
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.wrap_err_with(|| format!("while spawning handler: {}", handler_name))?;
.wrap_err_with(|| format!("while spawning handler: {handler_name}"))?;
let mut stdout_lines = BufReader::new(command.stdout.take().unwrap()).lines();
let mut stdin = command.stdin.unwrap();
@ -131,7 +144,7 @@ pub async fn start(
if line.starts_with('{') {
let command = serde_json::from_str::<HandlerCommand>(&line).unwrap();
commands_sender.send_async(command).await.unwrap();
commands_sender.send(command).await.unwrap();
} else {
println!("{}", line);
}
@ -157,9 +170,13 @@ pub async fn start(
tokio::spawn(async move {
while let Ok(event) = events_receiver.recv().await {
let handler_name = match &event {
HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path).expect("every key must have a handler"),
HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path).expect("every knob must have a handler"),
let handler_name = if let Some(n) = match &event {
HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path),
HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path),
} {
n
} else {
continue;
};
let handler_stdin = handler_stdin_by_name.get_mut(handler_name).expect("was already checked above");

View file

@ -50,6 +50,8 @@ pub struct Key {
#[serde(default, flatten)]
pub base_style: KeyStyle,
pub handler: String,
#[serde(default = "super::get_default_host_id")]
pub host: Box<str>,
pub handler: Box<str>,
pub config: Arc<toml::Table>,
}

View file

@ -24,6 +24,8 @@ pub struct Knob {
#[serde(default, flatten)]
pub base_style: KnobStyle,
pub handler: String,
#[serde(default = "super::get_default_host_id")]
pub host: Box<str>,
pub handler: Box<str>,
pub config: Arc<toml::Table>,
}

View file

@ -3,3 +3,7 @@ pub mod geometry;
pub mod key_page;
pub mod knob_page;
pub mod position;
pub fn get_default_host_id() -> Box<str> {
"local".to_owned().into_boxed_str()
}

View file

@ -13,10 +13,11 @@ use loupedeck_serial::commands::VibrationPattern;
use loupedeck_serial::device::LoupedeckDevice;
use crate::handler_host;
use crate::handler_host::KeyOrKnobConfig;
use crate::handler_host::KeyOrKnobHandlerConfig;
use crate::model::config::Config;
use crate::model::get_default_host_id;
use crate::runner::io_worker::{do_io_work, IoWorkerContext};
use crate::runner::mqtt::start_mqtt_client;
use crate::runner::mqtt::start_coordinator_mqtt_client;
mod graphics;
mod io_worker;
@ -51,9 +52,10 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
page_id: page_id.clone(),
position: *position,
},
KeyOrKnobConfig {
handler_name: k.handler.as_str().into(),
handler_config: Arc::clone(&k.config),
KeyOrKnobHandlerConfig {
host_id: k.host.clone(),
name: k.handler.clone(),
config: Arc::clone(&k.config),
},
)
})
@ -74,18 +76,25 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
page_id: page_id.clone(),
position,
},
KeyOrKnobConfig {
handler_name: k.handler.as_str().into(),
handler_config: Arc::clone(&k.config),
KeyOrKnobHandlerConfig {
host_id: k.host.clone(),
name: k.handler.clone(),
config: Arc::clone(&k.config),
},
))
})
})
.collect();
if let Some(mqtt_config) = &config.mqtt {
info!("Initializing MQTT client…");
start_coordinator_mqtt_client(mqtt_config, &key_configs, &knob_configs, commands_sender.clone(), events_sender.subscribe()).await;
}
info!("Initializing handler processes…");
handler_host::start(
get_default_host_id(),
&config_directory.join("handlers"),
key_configs,
knob_configs,
@ -94,11 +103,6 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
)
.await?;
if let Some(mqtt_config) = &config.mqtt {
info!("Initializing MQTT client…");
start_mqtt_client(mqtt_config, commands_sender.clone(), events_sender.subscribe()).await;
}
info!("Connecting to the device…");
let device = available_device.connect().wrap_err("Connecting to the device failed.")?;
info!("Connected.");

View file

@ -1,16 +1,33 @@
use std::collections::HashMap;
use std::fmt::format;
use std::str::FromStr;
use std::time::Duration;
use rumqttc::{Event, Incoming, MqttOptions, QoS};
use tokio::sync::broadcast;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition};
use deckster_shared::style::{KeyStyle, KnobStyle};
use crate::handler_host::KeyOrKnobHandlerConfig;
use crate::model::config::MqttConfig;
pub async fn start_mqtt_client(config: &MqttConfig, commands_sender: flume::Sender<HandlerCommand>, mut events_receiver: broadcast::Receiver<HandlerEvent>) {
#[derive(Debug, Serialize, Deserialize)]
pub struct HandlerHostsConfig {
run_id: Box<str>,
keys: HashMap<KeyPath, KeyOrKnobHandlerConfig>,
knobs: HashMap<KnobPath, KeyOrKnobHandlerConfig>,
}
pub async fn start_coordinator_mqtt_client(
config: &MqttConfig,
key_configs: &HashMap<KeyPath, KeyOrKnobHandlerConfig>,
knob_configs: &HashMap<KnobPath, KeyOrKnobHandlerConfig>,
commands_sender: mpsc::Sender<HandlerCommand>,
mut events_receiver: broadcast::Receiver<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port);
@ -29,6 +46,7 @@ pub async fn start_mqtt_client(config: &MqttConfig, commands_sender: flume::Send
tokio::spawn({
let topic_prefix = topic_prefix.clone();
let client = client.clone();
async move {
while let Ok(event) = events_receiver.recv().await {
@ -60,66 +78,214 @@ pub async fn start_mqtt_client(config: &MqttConfig, commands_sender: flume::Send
}
});
tokio::spawn(async move {
while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(event)) = event {
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
tokio::spawn({
let topic_prefix = topic_prefix.clone();
match segments[0] {
"keys" => {
if property == "style" {
let value = serde_json::from_slice::<Option<KeyStyle>>(&event.payload).unwrap();
async move {
while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(event)) = event {
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
commands_sender
.send(HandlerCommand::SetKeyStyle {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
value,
})
.unwrap();
}
}
"knobs" => {
let position = KnobPosition::from_str(position).unwrap();
match property {
"style" => {
let value = serde_json::from_slice::<Option<KnobStyle>>(&event.payload).unwrap();
match segments[0] {
"keys" => {
if property == "style" {
let value = serde_json::from_slice::<Option<KeyStyle>>(&event.payload).unwrap();
commands_sender
.send(HandlerCommand::SetKnobStyle {
path: KnobPath {
.send(HandlerCommand::SetKeyStyle {
path: KeyPath {
page_id: page_id.to_owned(),
position,
position: KeyPosition::from_str(position).unwrap(),
},
value,
})
.await
.unwrap();
}
"value" => {
let value = serde_json::from_slice::<Option<f32>>(&event.payload).unwrap();
}
"knobs" => {
let position = KnobPosition::from_str(position).unwrap();
commands_sender
.send(HandlerCommand::SetKnobValue {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.unwrap();
match property {
"style" => {
let value = serde_json::from_slice::<Option<KnobStyle>>(&event.payload).unwrap();
commands_sender
.send(HandlerCommand::SetKnobStyle {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
"value" => {
let value = serde_json::from_slice::<Option<f32>>(&event.payload).unwrap();
commands_sender
.send(HandlerCommand::SetKnobValue {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
_ => {}
}
}
_ => {}
};
}
}
}
});
for (path, config) in key_configs {
client
.publish(
format!("{topic_prefix}/keys/{path}/config"),
QoS::AtLeastOnce,
true,
serde_json::to_string(config).unwrap(),
)
.await
.unwrap()
}
for (path, config) in knob_configs {
client
.publish(
format!("{topic_prefix}/knobs/{path}/config"),
QoS::AtLeastOnce,
true,
serde_json::to_string(config).unwrap(),
)
.await
.unwrap()
}
}
pub async fn start_handler_host_mqtt_client(
config: &MqttConfig,
handler_hosts_config_sender: mpsc::Sender<Option<HandlerHostsConfig>>,
mut commands_receiver: mpsc::Receiver<HandlerCommand>,
events_sender: broadcast::Sender<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(false);
if let Some(credentials) = &config.credentials {
options.set_credentials(&credentials.username, &credentials.password);
}
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({
let topic_prefix = topic_prefix.clone();
async move {
while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(event)) = event {
let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
if topic_segments[0] == "config" {
if let Ok(config) = serde_json::from_slice(&event.payload) {
handler_hosts_config_sender.send(config).await.unwrap();
} else {
log::error!("Could not deserialize the latest configuration from {}", event.topic);
handler_hosts_config_sender.send(None).await.unwrap();
};
} else {
let page_id = topic_segments[1];
let position = topic_segments[2];
let property = topic_segments[3];
match topic_segments[0] {
"keys" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Key {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
"knobs" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Knob {
path: KnobPath {
page_id: page_id.to_owned(),
position: KnobPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
_ => {}
}
}
_ => {}
};
}
}
}
});
tokio::spawn(async move {
while let Some(command) = commands_receiver.recv().await {
match command {
HandlerCommand::SetActivePages { .. } => log::warn!("HandlerCommand::SetActivePages is not supported for remote handlers."),
HandlerCommand::SetKeyStyle { path, value } => client
.publish(
format!("{topic_prefix}/keys/{path}/style"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
HandlerCommand::SetKnobStyle { path, value } => client
.publish(
format!("{topic_prefix}/knobs/{path}/style"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
HandlerCommand::SetKnobValue { path, value } => client
.publish(
format!("{topic_prefix}/knobs/{path}/value"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
};
}
});
}