This commit is contained in:
Moritz Ruth 2024-02-28 16:48:01 +01:00
parent bc1ced79a8
commit 19a595c21b
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
26 changed files with 754 additions and 559 deletions

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
/target
/.idea
/examples/full/handlers
/examples/*/handlers

41
Cargo.lock generated
View file

@ -381,11 +381,13 @@ dependencies = [
"enum-map",
"enum-ordinalize",
"env_logger",
"flume",
"humantime-serde",
"is_executable",
"itertools",
"log",
"loupedeck_serial",
"nanoid",
"once_cell",
"parse-display 0.9.0",
"regex",
@ -1034,6 +1036,15 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "nanoid"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8"
dependencies = [
"rand",
]
[[package]]
name = "nanorand"
version = "0.7.0"
@ -1269,6 +1280,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro2"
version = "1.0.78"
@ -1287,11 +1304,35 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_xoshiro"

View file

@ -34,6 +34,8 @@ once_cell = "1.19.0"
is_executable = "1.0.1"
rumqttc = "0.23.0"
itertools = "0.12.1"
flume = "0.11.0"
nanoid = "0.4.0"
[workspace]
members = [

View file

@ -1,10 +1,9 @@
# Deckster
## Remote handler host handshake
- The coordinator publishes the config to `PREFIX/config` which includes a nonce.
- All hosts publish the nonce to `PREFIX/handler_hosts/HOST_ID/active_run_id` so that the coordinator knows they are alive.
## Terminology
- `handler runner`: Node that is running handlers.
- `handler host`: A `handler runner` that is not the `coordinator`.
- `coordinator`: Node to which the Loupedeck device is physically connected. Always a `handler runner`.
## Attribution
[foxxyzs `loupedeck` library for JavaScript](https://github.com/foxxyz/loupedeck)

View file

@ -48,8 +48,14 @@ pub fn run<
description: e.to_string(),
})?;
let should_stop = matches!(event, HandlerEvent::Stop);
h.handle(event);
handler = Either::Left(h);
if should_stop {
break;
}
}
Either::Right(init_handler) => {
let initial_message = serde_json::from_str::<InitialHandlerMessage<KeyConfig, KnobConfig>>(&line);

View file

@ -42,6 +42,7 @@ pub enum KeyEvent {
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum HandlerEvent {
Stop,
Knob { path: KnobPath, event: KnobEvent },
Key { path: KeyPath, event: KeyEvent },
}

View file

@ -374,7 +374,7 @@ impl PaThread {
let current_state = Arc::clone(&self.current_state);
let mainloop = Rc::clone(&self.mainloop);
loop {
'outer: loop {
self.run_single_mainloop_iteration(false);
while let Ok(command) = self.commands_rx.try_recv() {
@ -403,11 +403,13 @@ impl PaThread {
}
}
PaCommand::Terminate => {
mainloop.borrow_mut().quit(Retval(0));
break 'outer;
}
}
}
}
mainloop.borrow_mut().quit(Retval(0));
}
fn unwrap_state(state: &Arc<RwLock<Option<Arc<PaVolumeState>>>>) -> Arc<PaVolumeState> {

View file

@ -8,7 +8,7 @@ key_page = "default"
knob_page = "default"
[mqtt]
client_id = "deckster_host"
client_id = "deckster-coordinator"
topic_prefix = "deckster"
host = "localhost"
port = 1883

View file

@ -1,6 +1,7 @@
[keys.1x2]
icon = "@ph/skip-back"
host = "remote"
handler = "playerctl"
config.mode = "previous"
config.style.inactive.icon = "@ph/skip-back[alpha=0.4]"
@ -8,6 +9,7 @@ config.style.inactive.icon = "@ph/skip-back[alpha=0.4]"
[keys.2x2]
icon = "@ph/play-pause[alpha=0.4]"
host = "remote"
handler = "playerctl"
config.mode = "play-pause"
config.style.paused.icon = "@ph/play"
@ -16,6 +18,7 @@ config.style.playing.icon = "@ph/pause"
[keys.3x2]
icon = "@ph/skip-forward"
host = "remote"
handler = "playerctl"
config.mode = "next"
config.style.inactive.icon = "@ph/skip-forward[alpha=0.4]"
@ -23,6 +26,7 @@ config.style.inactive.icon = "@ph/skip-forward[alpha=0.4]"
[keys.1x3]
icon = "@fad/shuffle[alpha=0.4]"
host = "remote"
handler = "playerctl"
config.mode = "shuffle"
config.style.on.icon = "@fad/shuffle[color=#58fc11]"
@ -30,6 +34,7 @@ config.style.on.icon = "@fad/shuffle[color=#58fc11]"
[keys.2x3]
icon = "@fad/repeat[alpha=0.4]"
host = "remote"
handler = "playerctl"
config.mode = "loop"
config.style.single.icon = "@fad/repeat-one[color=#58fc11]"

View file

@ -2,6 +2,7 @@
icon = "@ph/microphone-light[scale=0.9]"
indicators.bar.color = "#ffffff50"
host = "remote"
handler = "pa_volume"
config.delta = 0.05
config.target.type = "input"
@ -19,6 +20,7 @@ config.style.inactive.icon = "@ph/microphone-slash-light[scale=0.9|alpha=0.8|col
icon = "@apps/discord[scale=0.25]"
indicators.bar.color = "#ffffff50"
host = "remote"
handler = "pa_volume"
config.delta = 0.05
config.target.type = "application"
@ -31,6 +33,7 @@ config.style.inactive.icon = "@apps/discord[scale=0.25|grayscale|alpha=0.8]"
icon = "@apps/youtube[scale=1.3]"
indicators.bar.color = "#ffffff50"
host = "remote"
handler = "pa_volume"
config.delta = 0.05
config.muted_turn_action = "unmute"
@ -44,6 +47,7 @@ config.style.inactive.icon = "@apps/youtube[scale=1.3|grayscale]"
icon = "@apps/spotify[scale=1.2]"
indicators.bar.color = "#ffffff50"
host = "remote"
handler = "pa_volume"
config.delta = 0.05
config.muted_turn_action = "unmute-at-zero"

View file

@ -0,0 +1,7 @@
host_id = "remote"
[mqtt]
client_id = "deckster-remote"
topic_prefix = "deckster"
host = "localhost"
port = 1883

View file

@ -9,8 +9,8 @@ use tiny_skia::{Color, IntSize, LineCap, LineJoin, Paint, Pixmap, PremultipliedC
use deckster_shared::state::{Key, Knob};
use loupedeck_serial::util::Endianness;
use crate::coordinator::graphics::labels::LabelRenderer;
use crate::icons::IconManager;
use crate::runner::graphics::labels::LabelRenderer;
#[derive(Debug)]
pub struct GraphicsContext {

View file

@ -15,12 +15,12 @@ use loupedeck_serial::characteristics::{LoupedeckButton, LoupedeckDeviceKeyGridC
use loupedeck_serial::device::LoupedeckDevice;
use loupedeck_serial::events::{LoupedeckEvent, RotationDirection};
use crate::coordinator::graphics::labels::LabelRenderer;
use crate::coordinator::graphics::{render_key, render_knob, GraphicsContext};
use crate::coordinator::state::State;
use crate::icons::IconManager;
use crate::model::config::Config;
use crate::model::coordinator_config::Config;
use crate::model::position::ButtonPosition;
use crate::runner::graphics::labels::LabelRenderer;
use crate::runner::graphics::{render_key, render_knob, GraphicsContext};
use crate::runner::state::State;
enum IoWork {
DeviceEvent(LoupedeckEvent),

View file

@ -5,6 +5,7 @@ use std::thread;
use color_eyre::eyre::{ContextCompat, WrapErr};
use color_eyre::Result;
use log::info;
use nanoid::nanoid;
use tokio::sync::broadcast;
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
@ -12,12 +13,13 @@ use deckster_shared::path::{KeyPath, KnobPath};
use loupedeck_serial::commands::VibrationPattern;
use loupedeck_serial::device::LoupedeckDevice;
use crate::handler_host;
use crate::handler_host::KeyOrKnobHandlerConfig;
use crate::model::config::Config;
use crate::coordinator::io_worker::{do_io_work, IoWorkerContext};
use crate::coordinator::mqtt::start_mqtt_client;
use crate::handler_runner;
use crate::handler_runner::KeyOrKnobHandlerConfig;
use crate::model::coordinator_config::Config;
use crate::model::get_default_host_id;
use crate::runner::io_worker::{do_io_work, IoWorkerContext};
use crate::runner::mqtt::start_coordinator_mqtt_client;
use crate::model::mqtt::HandlerHostsConfig;
mod graphics;
mod io_worker;
@ -42,62 +44,63 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
})
.unwrap();
let key_configs = config
.key_pages_by_id
.iter()
.flat_map(|(page_id, p)| {
p.keys.iter().map(|(position, k)| {
(
KeyPath {
page_id: page_id.clone(),
position: *position,
},
KeyOrKnobHandlerConfig {
host_id: k.host.clone(),
name: k.handler.clone(),
config: Arc::clone(&k.config),
},
)
let handler_hosts_config = HandlerHostsConfig {
run_id: nanoid!().into_boxed_str(),
keys: config
.key_pages_by_id
.iter()
.flat_map(|(page_id, p)| {
p.keys.iter().map(|(position, k)| {
(
KeyPath {
page_id: page_id.clone(),
position: *position,
},
KeyOrKnobHandlerConfig {
host_id: k.host.clone(),
name: k.handler.clone(),
config: Arc::clone(&k.config),
},
)
})
})
})
.collect();
.collect(),
knobs: config
.knob_pages_by_id
.iter()
.flat_map(|(page_id, p)| {
p.knobs.iter().filter_map(|(position, k)| {
if k.handler.is_empty() {
return None;
}
let knob_configs = config
.knob_pages_by_id
.iter()
.flat_map(|(page_id, p)| {
p.knobs.iter().filter_map(|(position, k)| {
if k.handler.is_empty() {
return None;
}
Some((
KnobPath {
page_id: page_id.clone(),
position,
},
KeyOrKnobHandlerConfig {
host_id: k.host.clone(),
name: k.handler.clone(),
config: Arc::clone(&k.config),
},
))
Some((
KnobPath {
page_id: page_id.clone(),
position,
},
KeyOrKnobHandlerConfig {
host_id: k.host.clone(),
name: k.handler.clone(),
config: Arc::clone(&k.config),
},
))
})
})
})
.collect();
.collect(),
};
if let Some(mqtt_config) = &config.mqtt {
info!("Initializing MQTT client…");
start_coordinator_mqtt_client(mqtt_config, &key_configs, &knob_configs, commands_sender.clone(), events_sender.subscribe()).await;
start_mqtt_client(mqtt_config, &handler_hosts_config, commands_sender.clone(), events_sender.subscribe()).await;
}
info!("Initializing handler processes…");
handler_host::start(
handler_runner::start(
get_default_host_id(),
&config_directory.join("handlers"),
key_configs,
knob_configs,
handler_hosts_config,
commands_sender.clone(),
events_sender.subscribe(),
)

154
src/coordinator/mqtt.rs Normal file
View file

@ -0,0 +1,154 @@
use std::str::FromStr;
use std::time::Duration;
use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS};
use tokio::sync::broadcast;
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition};
use deckster_shared::style::{KeyStyle, KnobStyle};
use crate::model::mqtt::{HandlerHostsConfig, MqttConfig};
pub async fn start_mqtt_client(
config: &MqttConfig,
handler_hosts_config: &HandlerHostsConfig,
commands_sender: flume::Sender<HandlerCommand>,
mut events_receiver: broadcast::Receiver<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let config_topic = format!("{topic_prefix}/config");
let mut options = MqttOptions::new(&config.client_id, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(true);
options.set_last_will(LastWill::new(
config_topic.clone(),
serde_json::to_vec(&Option::<HandlerHostsConfig>::None).unwrap(),
QoS::ExactlyOnce,
true,
));
if let Some(credentials) = &config.credentials {
options.set_credentials(&credentials.username, &credentials.password);
}
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({
let topic_prefix = topic_prefix.clone();
let client = client.clone();
async move {
while let Ok(event) = events_receiver.recv().await {
match event {
HandlerEvent::Stop => {
// TODO
}
HandlerEvent::Key { path, event } => {
client
.publish(
format!("{topic_prefix}/keys/{path}/events"),
QoS::ExactlyOnce,
false,
serde_json::to_string(&event).unwrap(),
)
.await
.unwrap();
}
HandlerEvent::Knob { path, event } => {
client
.publish(
format!("{topic_prefix}/knobs/{path}/events"),
QoS::ExactlyOnce,
false,
serde_json::to_string(&event).unwrap(),
)
.await
.unwrap();
}
}
}
}
});
tokio::spawn({
let topic_prefix = topic_prefix.clone();
async move {
while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(event)) = event {
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
match segments[0] {
"keys" => {
if property == "style" {
let value = serde_json::from_slice::<Option<KeyStyle>>(&event.payload).unwrap();
commands_sender
.send_async(HandlerCommand::SetKeyStyle {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
value,
})
.await
.unwrap();
}
}
"knobs" => {
let position = KnobPosition::from_str(position).unwrap();
match property {
"style" => {
let value = serde_json::from_slice::<Option<KnobStyle>>(&event.payload).unwrap();
commands_sender
.send_async(HandlerCommand::SetKnobStyle {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
"value" => {
let value = serde_json::from_slice::<Option<f32>>(&event.payload).unwrap();
commands_sender
.send_async(HandlerCommand::SetKnobValue {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
_ => {}
}
}
_ => {}
};
}
}
}
});
client
.publish(config_topic, QoS::ExactlyOnce, true, serde_json::to_string(handler_hosts_config).unwrap())
.await
.unwrap()
}

View file

@ -6,8 +6,8 @@ use log::error;
use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition};
use deckster_shared::state::{Key, Knob};
use crate::coordinator::state;
use crate::model;
use crate::runner::state;
#[derive(Debug)]
pub struct State {
@ -19,7 +19,7 @@ pub struct State {
}
impl State {
pub fn create(config: &model::config::Config) -> Self {
pub fn create(config: &model::coordinator_config::Config) -> Self {
let key_pages_by_id: HashMap<_, _> = config
.key_pages_by_id
.iter()

View file

@ -1,192 +1,58 @@
use std::collections::HashMap;
use std::ffi::OsString;
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
use color_eyre::eyre::{eyre, WrapErr};
use color_eyre::Result;
use is_executable::IsExecutable;
use itertools::Itertools;
use log::warn;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdin, Command};
use tokio::sync::mpsc;
use log::{debug, info, warn};
use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{
HandlerCommand, HandlerEvent, HandlerInitializationError, HandlerInitializationResultMessage, InitialHandlerMessage,
};
use deckster_shared::path::{KeyPath, KnobPath};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
#[derive(Debug, Serialize, Deserialize)]
pub struct KeyOrKnobHandlerConfig {
pub host_id: Box<str>,
pub name: Box<str>,
pub config: Arc<toml::Table>,
}
use crate::handler_runner;
use crate::model::handler_host_config::Config;
pub async fn start(
host_id: Box<str>,
handlers_directory: &Path,
key_configs: HashMap<KeyPath, KeyOrKnobHandlerConfig>,
knob_configs: HashMap<KnobPath, KeyOrKnobHandlerConfig>,
commands_sender: mpsc::Sender<HandlerCommand>,
mut events_receiver: tokio::sync::broadcast::Receiver<HandlerEvent>,
) -> Result<()> {
let handler_names: Vec<OsString> = std::fs::read_dir(handlers_directory)
.wrap_err_with(|| format!("while reading the handlers directory: {}", handlers_directory.to_string_lossy()))?
.filter_map(|entry| {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_executable() {
return Some(path.file_name().unwrap().to_os_string());
}
mod mqtt;
pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
let (commands_sender, commands_receiver) = flume::bounded::<HandlerCommand>(5);
let (handler_hosts_config_sender, mut handler_hosts_config_receiver) = mpsc::channel(1);
let events_sender = broadcast::Sender::<HandlerEvent>::new(5);
info!("Initializing MQTT client…");
mqtt::start_mqtt_client(&config.mqtt, handler_hosts_config_sender, commands_receiver, events_sender.clone()).await;
info!("Waiting for initial configuration…");
let mut is_running = false;
while let Some(handler_hosts_config) = handler_hosts_config_receiver.recv().await {
match handler_hosts_config {
None => {
info!("Coordinator was stopped. Sending stop event to handlers.");
events_sender.send(HandlerEvent::Stop).unwrap();
is_running = false;
}
None
})
.collect();
let mut handler_stdin_by_name: HashMap<Box<str>, ChildStdin> = HashMap::with_capacity(handler_names.len());
let mut handler_name_by_key_path: HashMap<KeyPath, Box<str>> = HashMap::new();
let mut handler_config_by_key_path_by_handler_name: HashMap<Box<str>, HashMap<KeyPath, Arc<toml::Table>>> = HashMap::new();
for (path, config) in key_configs {
if config.host_id != host_id {
continue;
}
handler_name_by_key_path.insert(path.clone(), config.name.clone());
handler_config_by_key_path_by_handler_name
.entry(config.name)
.or_default()
.insert(path, Arc::clone(&config.config));
}
let mut handler_name_by_knob_path: HashMap<KnobPath, Box<str>> = HashMap::new();
let mut handler_config_by_knob_path_by_handler_name: HashMap<Box<str>, HashMap<KnobPath, Arc<toml::Table>>> = HashMap::new();
for (path, config) in knob_configs {
if config.host_id != host_id {
continue;
}
handler_name_by_knob_path.insert(path.clone(), config.name.clone());
handler_config_by_knob_path_by_handler_name
.entry(config.name)
.or_default()
.insert(path, Arc::clone(&config.config));
}
for handler_name in handler_names {
let handler_name = handler_name
.into_string()
.map_err(|_| eyre!("Command names must be valid Unicode."))?
.into_boxed_str();
let (key_configs, knob_configs) = match (
handler_config_by_key_path_by_handler_name.remove(&handler_name),
handler_config_by_knob_path_by_handler_name.remove(&handler_name),
) {
(None, None) => {
warn!("Handler '{handler_name}' is not used by any key or knob.");
continue;
}
(a, b) => (a.unwrap_or_default(), b.unwrap_or_default()),
};
let mut command = Command::new(handlers_directory.join(handler_name.to_string()))
.arg("deckster-run")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.wrap_err_with(|| format!("while spawning handler: {handler_name}"))?;
let mut stdout_lines = BufReader::new(command.stdout.take().unwrap()).lines();
let mut stdin = command.stdin.unwrap();
let initial_handler_message = InitialHandlerMessage { key_configs, knob_configs };
let serialized_message = serde_json::to_string(&initial_handler_message).unwrap().into_boxed_str().into_boxed_bytes();
stdin.write_all(&serialized_message).await.unwrap();
stdin.write_u8(b'\n').await.unwrap();
stdin.flush().await.unwrap();
let result_line = stdout_lines.next_line().await?.unwrap();
let result: HandlerInitializationResultMessage = serde_json::from_str(&result_line)?;
if let HandlerInitializationResultMessage::Error { error } = result {
#[rustfmt::skip]
if let HandlerInitializationError::InvalidConfig { supports_keys, supports_knobs, .. } = error {
if !supports_keys && !initial_handler_message.key_configs.is_empty() {
return Err(eyre!(
"The '{handler_name}' handler does not support keys, but these keys tried to use it: {}",
initial_handler_message.key_configs.keys().map(|k| k.to_string()).join(", ")
));
} else if !supports_knobs && !initial_handler_message.knob_configs.is_empty() {
return Err(eyre!(
"The '{handler_name}' handler does not support knobs, but these knobs tried to use it: {}",
initial_handler_message.knob_configs.keys().map(|k| k.to_string()).join(", ")
));
Some(handler_hosts_config) => {
if is_running {
warn!("A new configuration was received before the old one was cleared.");
events_sender.send(HandlerEvent::Stop).unwrap();
}
};
return Err(eyre!("Starting the '{handler_name}' handler failed: {error}"));
}
is_running = true;
let commands_sender = commands_sender.clone();
info!("Received new configuration. Starting handlers…");
debug!("New configuration: {handler_hosts_config:#?}");
tokio::spawn(async move {
while let Ok(Some(line)) = stdout_lines.next_line().await {
if line.starts_with('{') {
let command = serde_json::from_str::<HandlerCommand>(&line).unwrap();
commands_sender.send(command).await.unwrap();
} else {
println!("{}", line);
}
handler_runner::start(
config.host_id.clone(),
&config_directory.join("handlers"),
handler_hosts_config,
commands_sender.clone(),
events_sender.subscribe(),
)
.await?;
}
});
handler_stdin_by_name.insert(handler_name, stdin);
}
if let Some((handler_name, config_by_key_path)) = handler_config_by_key_path_by_handler_name.drain().next() {
return Err(eyre!(
"There is no executable file named '{handler_name}' in the handlers directory but these keys have it set as their handler: {}",
config_by_key_path.keys().join(", ")
));
}
if let Some((handler_name, config_by_knob_path)) = handler_config_by_knob_path_by_handler_name.drain().next() {
return Err(eyre!(
"There is no executable file named '{handler_name}' in the handlers directory but these knobs have it set as their handler: {}",
config_by_knob_path.keys().join(", ")
));
}
tokio::spawn(async move {
while let Ok(event) = events_receiver.recv().await {
let handler_name = if let Some(n) = match &event {
HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path),
HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path),
} {
n
} else {
continue;
};
let handler_stdin = handler_stdin_by_name.get_mut(handler_name).expect("was already checked above");
let serialized_event = serde_json::to_string(&event).unwrap().into_boxed_str().into_boxed_bytes();
handler_stdin.write_all(&serialized_event).await.unwrap();
handler_stdin.write_u8(b'\n').await.unwrap();
handler_stdin.flush().await.unwrap();
}
});
}
dbg!("hey");
Ok(())
}

138
src/handler_host/mqtt.rs Normal file
View file

@ -0,0 +1,138 @@
use std::str::FromStr;
use std::time::Duration;
use log::error;
use rumqttc::{Event, Incoming, MqttOptions, QoS};
use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition};
use crate::model::mqtt::HandlerHostsConfig;
use crate::model::mqtt::MqttConfig;
pub async fn start_mqtt_client(
config: &MqttConfig,
handler_hosts_config_sender: mpsc::Sender<Option<HandlerHostsConfig>>,
commands_receiver: flume::Receiver<HandlerCommand>,
events_sender: broadcast::Sender<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let mut options = MqttOptions::new(&config.client_id, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(true);
if let Some(credentials) = &config.credentials {
options.set_credentials(&credentials.username, &credentials.password);
}
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({
let topic_prefix = topic_prefix.clone();
async move {
loop {
let poll_result = event_loop.poll().await;
match poll_result {
Err(error) => {
error!("{error}")
}
Ok(event) => {
if let Event::Incoming(Incoming::Publish(event)) = event {
let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
if topic_segments[0] == "config" {
if let Ok(config) = serde_json::from_slice(&event.payload) {
handler_hosts_config_sender.send(config).await.unwrap();
} else {
log::error!("Could not deserialize the latest configuration from {}", event.topic);
handler_hosts_config_sender.send(None).await.unwrap();
};
} else {
let page_id = topic_segments[1];
let position = topic_segments[2];
let property = topic_segments[3];
match topic_segments[0] {
"keys" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Key {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
"knobs" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Knob {
path: KnobPath {
page_id: page_id.to_owned(),
position: KnobPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
_ => {}
}
}
}
}
}
}
}
});
tokio::spawn(async move {
while let Ok(command) = commands_receiver.recv_async().await {
match command {
HandlerCommand::SetActivePages { .. } => log::warn!("HandlerCommand::SetActivePages is not supported for remote handlers."),
HandlerCommand::SetKeyStyle { path, value } => client
.publish(
format!("{topic_prefix}/keys/{path}/style"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
HandlerCommand::SetKnobStyle { path, value } => client
.publish(
format!("{topic_prefix}/knobs/{path}/style"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
HandlerCommand::SetKnobValue { path, value } => client
.publish(
format!("{topic_prefix}/knobs/{path}/value"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
};
}
});
}

221
src/handler_runner.rs Normal file
View file

@ -0,0 +1,221 @@
use std::collections::HashMap;
use std::ffi::OsString;
use std::path::Path;
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use color_eyre::eyre::{eyre, WrapErr};
use color_eyre::Result;
use is_executable::IsExecutable;
use itertools::Itertools;
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{ChildStdin, Command};
use deckster_shared::handler_communication::{
HandlerCommand, HandlerEvent, HandlerInitializationError, HandlerInitializationResultMessage, InitialHandlerMessage,
};
use deckster_shared::path::{KeyPath, KnobPath};
use crate::model::mqtt::HandlerHostsConfig;
pub async fn start(
host_id: Box<str>,
handlers_directory: &Path,
handler_hosts_config: HandlerHostsConfig,
commands_sender: flume::Sender<HandlerCommand>,
mut events_receiver: tokio::sync::broadcast::Receiver<HandlerEvent>,
) -> Result<()> {
let should_stop = Arc::new(AtomicBool::new(false));
let handler_names: Vec<OsString> = std::fs::read_dir(handlers_directory)
.wrap_err_with(|| format!("while reading the handlers directory: {}", handlers_directory.to_string_lossy()))?
.filter_map(|entry| {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_executable() {
return Some(path.file_name().unwrap().to_os_string());
}
}
None
})
.collect();
let mut handler_stdin_by_name: HashMap<Box<str>, ChildStdin> = HashMap::with_capacity(handler_names.len());
let mut handler_name_by_key_path: HashMap<KeyPath, Box<str>> = HashMap::new();
let mut handler_config_by_key_path_by_handler_name: HashMap<Box<str>, HashMap<KeyPath, Arc<toml::Table>>> = HashMap::new();
for (path, config) in handler_hosts_config.keys {
if config.host_id != host_id {
continue;
}
handler_name_by_key_path.insert(path.clone(), config.name.clone());
handler_config_by_key_path_by_handler_name
.entry(config.name)
.or_default()
.insert(path, Arc::clone(&config.config));
}
let mut handler_name_by_knob_path: HashMap<KnobPath, Box<str>> = HashMap::new();
let mut handler_config_by_knob_path_by_handler_name: HashMap<Box<str>, HashMap<KnobPath, Arc<toml::Table>>> = HashMap::new();
for (path, config) in handler_hosts_config.knobs {
if config.host_id != host_id {
continue;
}
handler_name_by_knob_path.insert(path.clone(), config.name.clone());
handler_config_by_knob_path_by_handler_name
.entry(config.name)
.or_default()
.insert(path, Arc::clone(&config.config));
}
for handler_name in handler_names {
let handler_name = handler_name
.into_string()
.map_err(|_| eyre!("Command names must be valid Unicode."))?
.into_boxed_str();
let (key_configs, knob_configs) = match (
handler_config_by_key_path_by_handler_name.remove(&handler_name),
handler_config_by_knob_path_by_handler_name.remove(&handler_name),
) {
(None, None) => {
warn!("Handler '{handler_name}' is not used by any key or knob.");
continue;
}
(a, b) => (a.unwrap_or_default(), b.unwrap_or_default()),
};
let mut command = Command::new(handlers_directory.join(handler_name.to_string()))
.arg("deckster-run")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()
.wrap_err_with(|| format!("while spawning handler: {handler_name}"))?;
let mut stdout_lines = BufReader::new(command.stdout.take().unwrap()).lines();
let mut stdin = command.stdin.take().unwrap();
let initial_handler_message = InitialHandlerMessage { key_configs, knob_configs };
let serialized_message = serde_json::to_string(&initial_handler_message).unwrap().into_boxed_str().into_boxed_bytes();
stdin.write_all(&serialized_message).await.unwrap();
stdin.write_u8(b'\n').await.unwrap();
stdin.flush().await.unwrap();
let result_line = stdout_lines.next_line().await?.unwrap();
let result: HandlerInitializationResultMessage = serde_json::from_str(&result_line)?;
if let HandlerInitializationResultMessage::Error { error } = result {
#[rustfmt::skip]
if let HandlerInitializationError::InvalidConfig { supports_keys, supports_knobs, .. } = error {
if !supports_keys && !initial_handler_message.key_configs.is_empty() {
return Err(eyre!(
"The '{handler_name}' handler does not support keys, but these keys tried to use it: {}",
initial_handler_message.key_configs.keys().map(|k| k.to_string()).join(", ")
));
} else if !supports_knobs && !initial_handler_message.knob_configs.is_empty() {
return Err(eyre!(
"The '{handler_name}' handler does not support knobs, but these knobs tried to use it: {}",
initial_handler_message.knob_configs.keys().map(|k| k.to_string()).join(", ")
));
}
};
return Err(eyre!("Starting the '{handler_name}' handler failed: {error}"));
}
handler_stdin_by_name.insert(handler_name.clone(), stdin);
let commands_sender = commands_sender.clone();
tokio::spawn(async move {
while let Ok(Some(line)) = stdout_lines.next_line().await {
if line.starts_with('{') {
let command = serde_json::from_str::<HandlerCommand>(&line).unwrap();
commands_sender.send_async(command).await.unwrap();
} else {
println!("{}", line);
}
}
});
let should_stop = Arc::clone(&should_stop);
tokio::spawn(async move {
let exit_status = command.wait().await.unwrap();
if !should_stop.load(Ordering::Relaxed) {
match exit_status.code() {
None => error!("The '{handler_name}' handler was unexpectedly terminated by a signal."),
Some(code) => error!("The '{handler_name}' handler exited unexpectedly with status code {code}"),
}
} else {
debug!("The '{handler_name}' handler exited: {exit_status:#?}");
}
});
}
if let Some((handler_name, config_by_key_path)) = handler_config_by_key_path_by_handler_name.drain().next() {
return Err(eyre!(
"There is no executable file named '{handler_name}' in the handlers directory but these keys have it set as their handler: {}",
config_by_key_path.keys().join(", ")
));
}
if let Some((handler_name, config_by_knob_path)) = handler_config_by_knob_path_by_handler_name.drain().next() {
return Err(eyre!(
"There is no executable file named '{handler_name}' in the handlers directory but these knobs have it set as their handler: {}",
config_by_knob_path.keys().join(", ")
));
}
tokio::spawn(async move {
while let Ok(event) = events_receiver.recv().await {
let handler_name = match &event {
HandlerEvent::Stop => {
should_stop.store(true, Ordering::Relaxed);
let serialized_event = serde_json::to_string(&event).unwrap().into_boxed_str().into_boxed_bytes();
for handler_stdin in handler_stdin_by_name.values_mut() {
handler_stdin.write_all(&serialized_event).await.unwrap();
handler_stdin.write_u8(b'\n').await.unwrap();
handler_stdin.flush().await.unwrap();
}
break;
}
HandlerEvent::Key { path, .. } => handler_name_by_key_path.get(path),
HandlerEvent::Knob { path, .. } => handler_name_by_knob_path.get(path),
};
let handler_name = if let Some(n) = handler_name {
n
} else {
continue;
};
let handler_stdin = handler_stdin_by_name.get_mut(handler_name).expect("was already checked above");
let serialized_event = serde_json::to_string(&event).unwrap().into_boxed_str().into_boxed_bytes();
handler_stdin.write_all(&serialized_event).await.unwrap();
handler_stdin.write_u8(b'\n').await.unwrap();
handler_stdin.flush().await.unwrap();
}
});
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
pub struct KeyOrKnobHandlerConfig {
pub host_id: Box<str>,
pub name: Box<str>,
pub config: Arc<toml::Table>,
}

View file

@ -12,7 +12,7 @@ use tiny_skia::{BlendMode, FilterQuality, Pixmap, PixmapPaint, Transform};
use deckster_shared::icon_descriptor::{IconDescriptor, IconDescriptorSource};
use deckster_shared::image_filter::ImageFilter;
use crate::model::config::{IconFormat, IconPack};
use crate::model::coordinator_config::{IconFormat, IconPack};
mod destructive_filter;

View file

@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;
use clap::{Parser, Subcommand};
@ -9,12 +9,13 @@ use color_eyre::Result;
use log::LevelFilter;
use walkdir::WalkDir;
use crate::model::config::WithFallbackId;
use crate::model::coordinator_config::WithFallbackId;
mod coordinator;
mod handler_host;
mod handler_runner;
mod icons;
mod model;
mod runner;
#[derive(Debug, Parser)]
#[command(name = "deckster", about = "Use Loupedeck devices under Linux.")]
@ -25,9 +26,13 @@ struct Cli {
#[derive(Debug, Subcommand)]
enum Command {
Run {
#[arg(long, required = true)]
config: PathBuf,
Coordinator {
#[arg(short, long, required = true, alias = "start")]
config: Box<Path>,
},
HandlerHost {
#[arg(short, long, required = true)]
config: Box<Path>,
},
}
@ -38,8 +43,8 @@ pub async fn main() -> Result<()> {
let cli = Cli::parse();
match cli.command {
Command::Run { config: config_path } => {
let deckster_file = read_and_deserialize::<model::config::File>(config_path.join("deckster.toml").as_path())?;
Command::Coordinator { config: config_path } => {
let deckster_file = read_and_deserialize::<model::coordinator_config::File>(config_path.join("deckster.toml").as_path())?;
let config_path = config_path.canonicalize()?;
let key_pages_by_id: HashMap<String, model::key_page::Page> =
@ -63,7 +68,7 @@ pub async fn main() -> Result<()> {
.map(|p| (p.id.clone(), p))
.collect();
let config = model::config::Config {
let config = model::coordinator_config::Config {
active_button_color: deckster_file.active_button_color,
inactive_button_color: deckster_file.inactive_button_color,
label_font_family: deckster_file.label_font_family,
@ -76,7 +81,13 @@ pub async fn main() -> Result<()> {
}
.validate()?;
runner::start(&config_path, config).await?
coordinator::start(&config_path, config).await?
}
Command::HandlerHost { config: config_path } => {
let config = read_and_deserialize::<model::handler_host_config::Config>(config_path.join("deckster.toml").as_path())?;
let config_path = config_path.canonicalize()?;
handler_host::start(&config_path, config).await?
}
};

View file

@ -11,6 +11,7 @@ use deckster_shared::image_filter::ImageFilter;
use deckster_shared::rgb::RGB8Wrapper;
use crate::model;
use crate::model::mqtt::MqttConfig;
use crate::model::position::ButtonPosition;
#[derive(Debug, Deserialize)]
@ -65,21 +66,6 @@ pub struct InitialConfig {
pub knob_page: String,
}
#[derive(Debug, Deserialize)]
pub struct MqttConfig {
pub client_id: String,
pub host: String,
pub port: u16,
pub credentials: Option<MqttConfigCredentials>,
pub topic_prefix: String,
}
#[derive(Debug, Deserialize)]
pub struct MqttConfigCredentials {
pub username: String,
pub password: String,
}
#[derive(Debug, Eq, PartialEq, Hash, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum IconFormat {

View file

@ -0,0 +1,9 @@
use serde::Deserialize;
use crate::model::mqtt::MqttConfig;
#[derive(Debug, Deserialize)]
pub struct Config {
pub host_id: Box<str>,
pub mqtt: MqttConfig,
}

View file

@ -1,7 +1,9 @@
pub mod config;
pub mod coordinator_config;
pub mod geometry;
pub mod handler_host_config;
pub mod key_page;
pub mod knob_page;
pub mod mqtt;
pub mod position;
pub fn get_default_host_id() -> Box<str> {

29
src/model/mqtt.rs Normal file
View file

@ -0,0 +1,29 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use deckster_shared::path::{KeyPath, KnobPath};
use crate::handler_runner::KeyOrKnobHandlerConfig;
#[derive(Debug, Serialize, Deserialize)]
pub struct HandlerHostsConfig {
pub run_id: Box<str>,
pub keys: HashMap<KeyPath, KeyOrKnobHandlerConfig>,
pub knobs: HashMap<KnobPath, KeyOrKnobHandlerConfig>,
}
#[derive(Debug, Deserialize)]
pub struct MqttConfig {
pub client_id: String,
pub host: String,
pub port: u16,
pub credentials: Option<MqttConfigCredentials>,
pub topic_prefix: String,
}
#[derive(Debug, Deserialize)]
pub struct MqttConfigCredentials {
pub username: String,
pub password: String,
}

View file

@ -1,291 +0,0 @@
use std::collections::HashMap;
use std::fmt::format;
use std::str::FromStr;
use std::time::Duration;
use rumqttc::{Event, Incoming, MqttOptions, QoS};
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition};
use deckster_shared::style::{KeyStyle, KnobStyle};
use crate::handler_host::KeyOrKnobHandlerConfig;
use crate::model::config::MqttConfig;
#[derive(Debug, Serialize, Deserialize)]
pub struct HandlerHostsConfig {
run_id: Box<str>,
keys: HashMap<KeyPath, KeyOrKnobHandlerConfig>,
knobs: HashMap<KnobPath, KeyOrKnobHandlerConfig>,
}
pub async fn start_coordinator_mqtt_client(
config: &MqttConfig,
key_configs: &HashMap<KeyPath, KeyOrKnobHandlerConfig>,
knob_configs: &HashMap<KnobPath, KeyOrKnobHandlerConfig>,
commands_sender: mpsc::Sender<HandlerCommand>,
mut events_receiver: broadcast::Receiver<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(false);
if let Some(credentials) = &config.credentials {
options.set_credentials(&credentials.username, &credentials.password);
}
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({
let topic_prefix = topic_prefix.clone();
let client = client.clone();
async move {
while let Ok(event) = events_receiver.recv().await {
match event {
HandlerEvent::Key { path, event } => {
client
.publish(
format!("{topic_prefix}/keys/{path}/events"),
QoS::ExactlyOnce,
false,
serde_json::to_string(&event).unwrap(),
)
.await
.unwrap();
}
HandlerEvent::Knob { path, event } => {
client
.publish(
format!("{topic_prefix}/knobs/{path}/events"),
QoS::ExactlyOnce,
false,
serde_json::to_string(&event).unwrap(),
)
.await
.unwrap();
}
}
}
}
});
tokio::spawn({
let topic_prefix = topic_prefix.clone();
async move {
while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(event)) = event {
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
match segments[0] {
"keys" => {
if property == "style" {
let value = serde_json::from_slice::<Option<KeyStyle>>(&event.payload).unwrap();
commands_sender
.send(HandlerCommand::SetKeyStyle {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
value,
})
.await
.unwrap();
}
}
"knobs" => {
let position = KnobPosition::from_str(position).unwrap();
match property {
"style" => {
let value = serde_json::from_slice::<Option<KnobStyle>>(&event.payload).unwrap();
commands_sender
.send(HandlerCommand::SetKnobStyle {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
"value" => {
let value = serde_json::from_slice::<Option<f32>>(&event.payload).unwrap();
commands_sender
.send(HandlerCommand::SetKnobValue {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
_ => {}
}
}
_ => {}
};
}
}
}
});
for (path, config) in key_configs {
client
.publish(
format!("{topic_prefix}/keys/{path}/config"),
QoS::AtLeastOnce,
true,
serde_json::to_string(config).unwrap(),
)
.await
.unwrap()
}
for (path, config) in knob_configs {
client
.publish(
format!("{topic_prefix}/knobs/{path}/config"),
QoS::AtLeastOnce,
true,
serde_json::to_string(config).unwrap(),
)
.await
.unwrap()
}
}
pub async fn start_handler_host_mqtt_client(
config: &MqttConfig,
handler_hosts_config_sender: mpsc::Sender<Option<HandlerHostsConfig>>,
mut commands_receiver: mpsc::Receiver<HandlerCommand>,
events_sender: broadcast::Sender<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let mut options = MqttOptions::new(&config.topic_prefix, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(false);
if let Some(credentials) = &config.credentials {
options.set_credentials(&credentials.username, &credentials.password);
}
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({
let topic_prefix = topic_prefix.clone();
async move {
while let Ok(event) = event_loop.poll().await {
if let Event::Incoming(Incoming::Publish(event)) = event {
let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
if topic_segments[0] == "config" {
if let Ok(config) = serde_json::from_slice(&event.payload) {
handler_hosts_config_sender.send(config).await.unwrap();
} else {
log::error!("Could not deserialize the latest configuration from {}", event.topic);
handler_hosts_config_sender.send(None).await.unwrap();
};
} else {
let page_id = topic_segments[1];
let position = topic_segments[2];
let property = topic_segments[3];
match topic_segments[0] {
"keys" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Key {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
"knobs" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Knob {
path: KnobPath {
page_id: page_id.to_owned(),
position: KnobPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
_ => {}
}
}
}
}
}
});
tokio::spawn(async move {
while let Some(command) = commands_receiver.recv().await {
match command {
HandlerCommand::SetActivePages { .. } => log::warn!("HandlerCommand::SetActivePages is not supported for remote handlers."),
HandlerCommand::SetKeyStyle { path, value } => client
.publish(
format!("{topic_prefix}/keys/{path}/style"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
HandlerCommand::SetKnobStyle { path, value } => client
.publish(
format!("{topic_prefix}/knobs/{path}/style"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
HandlerCommand::SetKnobValue { path, value } => client
.publish(
format!("{topic_prefix}/knobs/{path}/value"),
QoS::ExactlyOnce,
false,
serde_json::to_vec(&value).unwrap(),
)
.await
.unwrap(),
};
}
});
}