From b150e4c114fbdd852eb8bbde9fa80f22689c610a Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Fri, 1 Mar 2024 01:54:11 +0100 Subject: [PATCH] commit --- src/coordinator/io_worker.rs | 17 ++++++++++------ src/coordinator/mod.rs | 9 ++++++++- src/coordinator/mqtt.rs | 38 ++++++++++++++++++++++++++++-------- src/handler_host/mod.rs | 9 ++++++++- src/handler_host/mqtt.rs | 15 ++++++++++++-- 5 files changed, 70 insertions(+), 18 deletions(-) diff --git a/src/coordinator/io_worker.rs b/src/coordinator/io_worker.rs index 95ca760..049aeff 100644 --- a/src/coordinator/io_worker.rs +++ b/src/coordinator/io_worker.rs @@ -25,6 +25,7 @@ use crate::model::position::ButtonPosition; pub enum CoordinatorCommand { SetActivePages { key_page_id: String, knob_page_id: String }, SetRemoteHostIsActive { host_id: Box, is_active: bool }, + SetAllRemoteHostsInactive, } enum IoWork { @@ -224,11 +225,15 @@ fn handle_coordinator_command(context: &mut IoWorkerContext, command: Coordinato for button in context.device.characteristics().available_buttons { context .device - .set_button_color(button, get_correct_button_color(context, &context.state, ButtonPosition::of(&button))) + .set_button_color(button, get_correct_button_color(context, ButtonPosition::of(&button))) .expect("the button is available for this device because that is literally what we are iterating over"); } - redraw_visible_page(&context); + redraw_visible_page(context); + } + CoordinatorCommand::SetAllRemoteHostsInactive => { + context.state.active_remote_handler_host_ids.clear(); + redraw_visible_page(context); } CoordinatorCommand::SetRemoteHostIsActive { host_id, is_active } => { if is_active { @@ -237,7 +242,7 @@ fn handle_coordinator_command(context: &mut IoWorkerContext, command: Coordinato context.state.active_remote_handler_host_ids.remove(&host_id); } - redraw_visible_page(&context); + redraw_visible_page(context); } } } @@ -295,13 +300,13 @@ fn redraw_visible_page(context: &IoWorkerContext) { // active -> config.active_button_color // no actions defined -> #000000 // inactive -> config.inactive_button_color -fn get_correct_button_color(context: &IoWorkerContext, state: &State, button_position: ButtonPosition) -> RGB8 { +fn get_correct_button_color(context: &IoWorkerContext, button_position: ButtonPosition) -> RGB8 { let button_config = &context.config.buttons[button_position]; if let Some(key_page) = &button_config.key_page { - if key_page == &state.active_key_page_id { + if key_page == &context.state.active_key_page_id { if let Some(knob_page) = &button_config.knob_page { - if knob_page == &state.active_knob_page_id { + if knob_page == &context.state.active_knob_page_id { return context.config.active_button_color.into(); } } diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs index 147b2e4..161a889 100644 --- a/src/coordinator/mod.rs +++ b/src/coordinator/mod.rs @@ -91,7 +91,14 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { if let Some(mqtt_config) = &config.mqtt { log::info!("Initializing MQTT client…"); - start_mqtt_client(mqtt_config, &handler_hosts_config, handler_commands_sender.clone(), events_sender.subscribe()).await; + start_mqtt_client( + mqtt_config, + &handler_hosts_config, + coordinator_commands_sender.clone(), + handler_commands_sender.clone(), + events_sender.subscribe(), + ) + .await; } log::info!("Initializing handler processes…"); diff --git a/src/coordinator/mqtt.rs b/src/coordinator/mqtt.rs index 8ddc994..e396114 100644 --- a/src/coordinator/mqtt.rs +++ b/src/coordinator/mqtt.rs @@ -4,6 +4,7 @@ use std::time::Duration; use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS}; use tokio::sync::broadcast; +use crate::coordinator::io_worker::CoordinatorCommand; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; use deckster_shared::path::{KeyPath, KeyPosition, KnobPath, KnobPosition}; use deckster_shared::style::{KeyStyle, KnobStyle}; @@ -13,7 +14,8 @@ use crate::model::mqtt::{HandlerHostsConfig, MqttConfig}; pub async fn start_mqtt_client( config: &MqttConfig, handler_hosts_config: &HandlerHostsConfig, - commands_sender: flume::Sender, + coordinator_commands_sender: flume::Sender, + handler_commands_sender: flume::Sender, mut events_receiver: broadcast::Receiver, ) { let topic_prefix = config.topic_prefix.to_owned(); @@ -46,7 +48,7 @@ pub async fn start_mqtt_client( while let Ok(event) = events_receiver.recv().await { match event { HandlerEvent::Stop => { - // TODO + unreachable!("Not expected to ever be sent on the coordinator side") } HandlerEvent::Key { path, event } => { client @@ -75,6 +77,8 @@ pub async fn start_mqtt_client( } }); + let run_id = handler_hosts_config.run_id.clone().into_boxed_bytes(); + tokio::spawn({ let topic_prefix = topic_prefix.clone(); let client = client.clone(); @@ -91,6 +95,8 @@ pub async fn start_mqtt_client( is_first_try = false; } else if is_connected { log::error!("MQTT connection lost: {error}"); + + coordinator_commands_sender.send(CoordinatorCommand::SetAllRemoteHostsInactive).unwrap(); } is_connected = false; @@ -102,6 +108,7 @@ pub async fn start_mqtt_client( is_connected = true; is_first_try = false; + client.subscribe(format!("{topic_prefix}/remote_hosts/+"), QoS::AtLeastOnce).await.unwrap(); client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap(); client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap(); client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap(); @@ -113,16 +120,27 @@ pub async fn start_mqtt_client( } Incoming::Publish(event) => { let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); - let page_id = segments[1]; - let position = segments[2]; - let property = segments[3]; match segments[0] { + "remote_hosts" => { + let host_id = segments[1].to_owned().into_boxed_str(); + + coordinator_commands_sender + .send(CoordinatorCommand::SetRemoteHostIsActive { + host_id, + is_active: event.payload == run_id.as_ref(), + }) + .unwrap(); + } "keys" => { + let page_id = segments[1]; + let position = segments[2]; + let property = segments[3]; + if property == "style" { if let Ok(position) = KeyPosition::from_str(position) { if let Ok(value) = serde_json::from_slice::>(&event.payload) { - commands_sender + handler_commands_sender .send_async(HandlerCommand::SetKeyStyle { path: KeyPath { page_id: page_id.to_owned(), @@ -141,11 +159,15 @@ pub async fn start_mqtt_client( } } "knobs" => { + let page_id = segments[1]; + let position = segments[2]; + let property = segments[3]; + if let Ok(position) = KnobPosition::from_str(position) { match property { "style" => { if let Ok(value) = serde_json::from_slice::>(&event.payload) { - commands_sender + handler_commands_sender .send_async(HandlerCommand::SetKnobStyle { path: KnobPath { page_id: page_id.to_owned(), @@ -159,7 +181,7 @@ pub async fn start_mqtt_client( } "value" => { if let Ok(value) = serde_json::from_slice::>(&event.payload) { - commands_sender + handler_commands_sender .send_async(HandlerCommand::SetKnobValue { path: KnobPath { page_id: page_id.to_owned(), diff --git a/src/handler_host/mod.rs b/src/handler_host/mod.rs index 5cf0035..1ce2012 100644 --- a/src/handler_host/mod.rs +++ b/src/handler_host/mod.rs @@ -16,7 +16,14 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { let events_sender = broadcast::Sender::::new(5); log::info!("Initializing MQTT client…"); - mqtt::start_mqtt_client(&config.mqtt, handler_hosts_config_sender, commands_receiver, events_sender.clone()).await; + mqtt::start_mqtt_client( + config.host_id.clone(), + &config.mqtt, + handler_hosts_config_sender, + commands_receiver, + events_sender.clone(), + ) + .await; log::info!("Waiting for initial configuration…"); diff --git a/src/handler_host/mqtt.rs b/src/handler_host/mqtt.rs index 5f1a47e..6a1f192 100644 --- a/src/handler_host/mqtt.rs +++ b/src/handler_host/mqtt.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use std::time::Duration; -use rumqttc::{Event, Incoming, MqttOptions, QoS}; +use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS}; use tokio::sync::{broadcast, mpsc}; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; @@ -11,17 +11,21 @@ use crate::model::mqtt::HandlerHostsConfig; use crate::model::mqtt::MqttConfig; pub async fn start_mqtt_client( + host_id: Box, config: &MqttConfig, handler_hosts_config_sender: mpsc::Sender>, commands_receiver: flume::Receiver, events_sender: broadcast::Sender, ) { let topic_prefix = config.topic_prefix.to_owned(); + let activeness_topic = format!("{topic_prefix}/remote_hosts/{host_id}"); let mut options = MqttOptions::new(&config.client_id, &config.host, config.port); options.set_keep_alive(Duration::from_secs(3)); options.set_clean_session(true); + options.set_last_will(LastWill::new(activeness_topic.clone(), "", QoS::AtLeastOnce, true)); + if let Some(credentials) = &config.credentials { options.set_credentials(&credentials.username, &credentials.password); } @@ -69,7 +73,14 @@ pub async fn start_mqtt_client( let topic_segments = topic_name.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); if topic_segments[0] == "config" { - if let Ok(config) = serde_json::from_slice(&event.payload) { + if let Ok(config) = serde_json::from_slice::>(&event.payload) { + if let Some(c) = &config { + client + .publish(activeness_topic.clone(), QoS::AtLeastOnce, true, c.run_id.to_owned().into_boxed_bytes()) + .await + .unwrap(); + } + handler_hosts_config_sender.send(config).await.unwrap(); } else { log::error!("Could not deserialize the latest configuration from {}", topic_name);