This commit is contained in:
Moritz Ruth 2024-03-01 01:54:11 +01:00
parent f44283160a
commit b150e4c114
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
5 changed files with 70 additions and 18 deletions

View file

@ -25,6 +25,7 @@ use crate::model::position::ButtonPosition;
pub enum CoordinatorCommand {
SetActivePages { key_page_id: String, knob_page_id: String },
SetRemoteHostIsActive { host_id: Box<str>, is_active: bool },
SetAllRemoteHostsInactive,
}
enum IoWork {
@ -224,11 +225,15 @@ fn handle_coordinator_command(context: &mut IoWorkerContext, command: Coordinato
for button in context.device.characteristics().available_buttons {
context
.device
.set_button_color(button, get_correct_button_color(context, &context.state, ButtonPosition::of(&button)))
.set_button_color(button, get_correct_button_color(context, ButtonPosition::of(&button)))
.expect("the button is available for this device because that is literally what we are iterating over");
}
redraw_visible_page(&context);
redraw_visible_page(context);
}
CoordinatorCommand::SetAllRemoteHostsInactive => {
context.state.active_remote_handler_host_ids.clear();
redraw_visible_page(context);
}
CoordinatorCommand::SetRemoteHostIsActive { host_id, is_active } => {
if is_active {
@ -237,7 +242,7 @@ fn handle_coordinator_command(context: &mut IoWorkerContext, command: Coordinato
context.state.active_remote_handler_host_ids.remove(&host_id);
}
redraw_visible_page(&context);
redraw_visible_page(context);
}
}
}
@ -295,13 +300,13 @@ fn redraw_visible_page(context: &IoWorkerContext) {
// active -> config.active_button_color
// no actions defined -> #000000
// inactive -> config.inactive_button_color
fn get_correct_button_color(context: &IoWorkerContext, state: &State, button_position: ButtonPosition) -> RGB8 {
fn get_correct_button_color(context: &IoWorkerContext, button_position: ButtonPosition) -> RGB8 {
let button_config = &context.config.buttons[button_position];
if let Some(key_page) = &button_config.key_page {
if key_page == &state.active_key_page_id {
if key_page == &context.state.active_key_page_id {
if let Some(knob_page) = &button_config.knob_page {
if knob_page == &state.active_knob_page_id {
if knob_page == &context.state.active_knob_page_id {
return context.config.active_button_color.into();
}
}

View file

@ -91,7 +91,14 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
if let Some(mqtt_config) = &config.mqtt {
log::info!("Initializing MQTT client…");
start_mqtt_client(mqtt_config, &handler_hosts_config, handler_commands_sender.clone(), events_sender.subscribe()).await;
start_mqtt_client(
mqtt_config,
&handler_hosts_config,
coordinator_commands_sender.clone(),
handler_commands_sender.clone(),
events_sender.subscribe(),
)
.await;
}
log::info!("Initializing handler processes…");

View file

@ -4,6 +4,7 @@ use std::time::Duration;
use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS};
use tokio::sync::broadcast;
use crate::coordinator::io_worker::CoordinatorCommand;
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition};
use deckster_shared::style::{KeyStyle, KnobStyle};
@ -13,7 +14,8 @@ use crate::model::mqtt::{HandlerHostsConfig, MqttConfig};
pub async fn start_mqtt_client(
config: &MqttConfig,
handler_hosts_config: &HandlerHostsConfig,
commands_sender: flume::Sender<HandlerCommand>,
coordinator_commands_sender: flume::Sender<CoordinatorCommand>,
handler_commands_sender: flume::Sender<HandlerCommand>,
mut events_receiver: broadcast::Receiver<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
@ -46,7 +48,7 @@ pub async fn start_mqtt_client(
while let Ok(event) = events_receiver.recv().await {
match event {
HandlerEvent::Stop => {
// TODO
unreachable!("Not expected to ever be sent on the coordinator side")
}
HandlerEvent::Key { path, event } => {
client
@ -75,6 +77,8 @@ pub async fn start_mqtt_client(
}
});
let run_id = handler_hosts_config.run_id.clone().into_boxed_bytes();
tokio::spawn({
let topic_prefix = topic_prefix.clone();
let client = client.clone();
@ -91,6 +95,8 @@ pub async fn start_mqtt_client(
is_first_try = false;
} else if is_connected {
log::error!("MQTT connection lost: {error}");
coordinator_commands_sender.send(CoordinatorCommand::SetAllRemoteHostsInactive).unwrap();
}
is_connected = false;
@ -102,6 +108,7 @@ pub async fn start_mqtt_client(
is_connected = true;
is_first_try = false;
client.subscribe(format!("{topic_prefix}/remote_hosts/+"), QoS::AtLeastOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap();
@ -113,16 +120,27 @@ pub async fn start_mqtt_client(
}
Incoming::Publish(event) => {
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
match segments[0] {
"remote_hosts" => {
let host_id = segments[1].to_owned().into_boxed_str();
coordinator_commands_sender
.send(CoordinatorCommand::SetRemoteHostIsActive {
host_id,
is_active: event.payload == run_id.as_ref(),
})
.unwrap();
}
"keys" => {
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
if property == "style" {
if let Ok(position) = KeyPosition::from_str(position) {
if let Ok(value) = serde_json::from_slice::<Option<KeyStyle>>(&event.payload) {
commands_sender
handler_commands_sender
.send_async(HandlerCommand::SetKeyStyle {
path: KeyPath {
page_id: page_id.to_owned(),
@ -141,11 +159,15 @@ pub async fn start_mqtt_client(
}
}
"knobs" => {
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
if let Ok(position) = KnobPosition::from_str(position) {
match property {
"style" => {
if let Ok(value) = serde_json::from_slice::<Option<KnobStyle>>(&event.payload) {
commands_sender
handler_commands_sender
.send_async(HandlerCommand::SetKnobStyle {
path: KnobPath {
page_id: page_id.to_owned(),
@ -159,7 +181,7 @@ pub async fn start_mqtt_client(
}
"value" => {
if let Ok(value) = serde_json::from_slice::<Option<f32>>(&event.payload) {
commands_sender
handler_commands_sender
.send_async(HandlerCommand::SetKnobValue {
path: KnobPath {
page_id: page_id.to_owned(),

View file

@ -16,7 +16,14 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
let events_sender = broadcast::Sender::<HandlerEvent>::new(5);
log::info!("Initializing MQTT client…");
mqtt::start_mqtt_client(&config.mqtt, handler_hosts_config_sender, commands_receiver, events_sender.clone()).await;
mqtt::start_mqtt_client(
config.host_id.clone(),
&config.mqtt,
handler_hosts_config_sender,
commands_receiver,
events_sender.clone(),
)
.await;
log::info!("Waiting for initial configuration…");

View file

@ -1,7 +1,7 @@
use std::str::FromStr;
use std::time::Duration;
use rumqttc::{Event, Incoming, MqttOptions, QoS};
use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS};
use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
@ -11,17 +11,21 @@ use crate::model::mqtt::HandlerHostsConfig;
use crate::model::mqtt::MqttConfig;
pub async fn start_mqtt_client(
host_id: Box<str>,
config: &MqttConfig,
handler_hosts_config_sender: mpsc::Sender<Option<HandlerHostsConfig>>,
commands_receiver: flume::Receiver<HandlerCommand>,
events_sender: broadcast::Sender<HandlerEvent>,
) {
let topic_prefix = config.topic_prefix.to_owned();
let activeness_topic = format!("{topic_prefix}/remote_hosts/{host_id}");
let mut options = MqttOptions::new(&config.client_id, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(true);
options.set_last_will(LastWill::new(activeness_topic.clone(), "", QoS::AtLeastOnce, true));
if let Some(credentials) = &config.credentials {
options.set_credentials(&credentials.username, &credentials.password);
}
@ -69,7 +73,14 @@ pub async fn start_mqtt_client(
let topic_segments = topic_name.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
if topic_segments[0] == "config" {
if let Ok(config) = serde_json::from_slice(&event.payload) {
if let Ok(config) = serde_json::from_slice::<Option<HandlerHostsConfig>>(&event.payload) {
if let Some(c) = &config {
client
.publish(activeness_topic.clone(), QoS::AtLeastOnce, true, c.run_id.to_owned().into_boxed_bytes())
.await
.unwrap();
}
handler_hosts_config_sender.send(config).await.unwrap();
} else {
log::error!("Could not deserialize the latest configuration from {}", topic_name);