diff --git a/src/coordinator/mqtt.rs b/src/coordinator/mqtt.rs index b7e5a06..7c166d9 100644 --- a/src/coordinator/mqtt.rs +++ b/src/coordinator/mqtt.rs @@ -1,7 +1,8 @@ use std::str::FromStr; use std::time::Duration; -use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS}; +use log::{error, info}; +use rumqttc::{ConnectionError, Event, Incoming, LastWill, MqttOptions, QoS}; use tokio::sync::broadcast; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; @@ -19,6 +20,8 @@ pub async fn start_mqtt_client( let topic_prefix = config.topic_prefix.to_owned(); let config_topic = format!("{topic_prefix}/config"); + let handler_hosts_config_json = serde_json::to_string(handler_hosts_config).unwrap().into_bytes(); + let mut options = MqttOptions::new(&config.client_id, &config.host, config.port); options.set_keep_alive(Duration::from_secs(3)); options.set_clean_session(true); @@ -36,10 +39,6 @@ pub async fn start_mqtt_client( let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); - client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap(); - tokio::spawn({ let topic_prefix = topic_prefix.clone(); let client = client.clone(); @@ -79,76 +78,106 @@ pub async fn start_mqtt_client( tokio::spawn({ let topic_prefix = topic_prefix.clone(); + let client = client.clone(); async move { - while let Ok(event) = event_loop.poll().await { - if let Event::Incoming(Incoming::Publish(event)) = event { - let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); - let page_id = segments[1]; - let position = segments[2]; - let property = segments[3]; + let mut is_connected = false; + let mut is_first_try = true; - match segments[0] { - "keys" => { - if property == "style" { - let value = serde_json::from_slice::>(&event.payload).unwrap(); - - commands_sender - .send_async(HandlerCommand::SetKeyStyle { - path: KeyPath { - page_id: page_id.to_owned(), - position: KeyPosition::from_str(position).unwrap(), - }, - value, - }) - .await - .unwrap(); - } + loop { + match event_loop.poll().await { + Err(error) => { + if is_first_try { + error!("MQTT connection failed (retrying in the background): {error}"); + is_first_try = false; + } else if is_connected { + error!("MQTT connection lost: {error}"); } - "knobs" => { - let position = KnobPosition::from_str(position).unwrap(); - match property { - "style" => { - let value = serde_json::from_slice::>(&event.payload).unwrap(); + is_connected = false; + tokio::time::sleep(Duration::from_secs(2)).await; + } + Ok(Event::Incoming(event)) => match event { + Incoming::ConnAck(_) => { + info!("MQTT connection established."); + is_connected = true; + is_first_try = false; - commands_sender - .send_async(HandlerCommand::SetKnobStyle { - path: KnobPath { - page_id: page_id.to_owned(), - position, - }, - value, - }) - .await - .unwrap(); + client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap(); + + client + .publish(config_topic.clone(), QoS::ExactlyOnce, true, handler_hosts_config_json.clone()) + .await + .unwrap() + } + Incoming::Publish(event) => { + let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + let page_id = segments[1]; + let position = segments[2]; + let property = segments[3]; + + match segments[0] { + "keys" => { + if property == "style" { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send_async(HandlerCommand::SetKeyStyle { + path: KeyPath { + page_id: page_id.to_owned(), + position: KeyPosition::from_str(position).unwrap(), + }, + value, + }) + .await + .unwrap(); + } } - "value" => { - let value = serde_json::from_slice::>(&event.payload).unwrap(); + "knobs" => { + let position = KnobPosition::from_str(position).unwrap(); - commands_sender - .send_async(HandlerCommand::SetKnobValue { - path: KnobPath { - page_id: page_id.to_owned(), - position, - }, - value, - }) - .await - .unwrap(); + match property { + "style" => { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send_async(HandlerCommand::SetKnobStyle { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + value, + }) + .await + .unwrap(); + } + "value" => { + let value = serde_json::from_slice::>(&event.payload).unwrap(); + + commands_sender + .send_async(HandlerCommand::SetKnobValue { + path: KnobPath { + page_id: page_id.to_owned(), + position, + }, + value, + }) + .await + .unwrap(); + } + _ => {} + } } _ => {} - } + }; } _ => {} - }; + }, + _ => {} } } } }); - - client - .publish(config_topic, QoS::ExactlyOnce, true, serde_json::to_string(handler_hosts_config).unwrap()) - .await - .unwrap() } diff --git a/src/handler_host/mod.rs b/src/handler_host/mod.rs index ddd50f8..86b8a6a 100644 --- a/src/handler_host/mod.rs +++ b/src/handler_host/mod.rs @@ -1,7 +1,7 @@ use std::path::Path; use color_eyre::Result; -use log::{debug, info, warn}; +use log::{debug, info, trace, warn}; use tokio::sync::{broadcast, mpsc}; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; @@ -26,8 +26,11 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { while let Some(handler_hosts_config) = handler_hosts_config_receiver.recv().await { match handler_hosts_config { None => { - info!("Coordinator was stopped. Sending stop event to handlers."); - events_sender.send(HandlerEvent::Stop).unwrap(); + if is_running { + info!("Stopping handlers…"); + events_sender.send(HandlerEvent::Stop).unwrap(); + } + is_running = false; } Some(handler_hosts_config) => { @@ -39,7 +42,7 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { is_running = true; info!("Received new configuration. Starting handlers…"); - debug!("New configuration: {handler_hosts_config:#?}"); + trace!("New configuration: {handler_hosts_config:#?}"); handler_runner::start( config.host_id.clone(), @@ -52,7 +55,6 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> { } } } - dbg!("hey"); Ok(()) } diff --git a/src/handler_host/mqtt.rs b/src/handler_host/mqtt.rs index d08e2a2..5f5fca2 100644 --- a/src/handler_host/mqtt.rs +++ b/src/handler_host/mqtt.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use std::time::Duration; -use log::error; +use log::{error, info}; use rumqttc::{Event, Incoming, MqttOptions, QoS}; use tokio::sync::{broadcast, mpsc}; @@ -29,73 +29,93 @@ pub async fn start_mqtt_client( let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); - client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap(); - client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap(); - tokio::spawn({ let topic_prefix = topic_prefix.clone(); + let client = client.clone(); async move { + let mut is_connected = false; + let mut is_first_try = true; + loop { - let poll_result = event_loop.poll().await; - - match poll_result { + match event_loop.poll().await { Err(error) => { - error!("{error}") + if is_first_try { + error!("MQTT connection failed (retrying in the background): {error}"); + is_first_try = false; + } else if is_connected { + error!("MQTT connection lost: {error}"); + handler_hosts_config_sender.send(None).await.unwrap(); + } + + is_connected = false; + tokio::time::sleep(Duration::from_secs(2)).await; } - Ok(event) => { - if let Event::Incoming(Incoming::Publish(event)) = event { - let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + Ok(Event::Incoming(event)) => { + match event { + Incoming::ConnAck(_) => { + info!("MQTT connection established."); + is_connected = true; + is_first_try = false; - if topic_segments[0] == "config" { - if let Ok(config) = serde_json::from_slice(&event.payload) { - handler_hosts_config_sender.send(config).await.unwrap(); + client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap(); + client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap(); + } + Incoming::Publish(event) => { + let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::>(); + + if topic_segments[0] == "config" { + if let Ok(config) = serde_json::from_slice(&event.payload) { + handler_hosts_config_sender.send(config).await.unwrap(); + } else { + log::error!("Could not deserialize the latest configuration from {}", event.topic); + handler_hosts_config_sender.send(None).await.unwrap(); + }; } else { - log::error!("Could not deserialize the latest configuration from {}", event.topic); - handler_hosts_config_sender.send(None).await.unwrap(); - }; - } else { - let page_id = topic_segments[1]; - let position = topic_segments[2]; - let property = topic_segments[3]; + let page_id = topic_segments[1]; + let position = topic_segments[2]; + let property = topic_segments[3]; - match topic_segments[0] { - "keys" if property == "events" => { - if let Ok(event) = serde_json::from_slice(&event.payload) { - events_sender - .send(HandlerEvent::Key { - path: KeyPath { - page_id: page_id.to_owned(), - position: KeyPosition::from_str(position).unwrap(), - }, - event, - }) - .unwrap(); - } else { - log::error!("Could not deserialize the latest event from {}", event.topic); - }; + match topic_segments[0] { + "keys" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + events_sender + .send(HandlerEvent::Key { + path: KeyPath { + page_id: page_id.to_owned(), + position: KeyPosition::from_str(position).unwrap(), + }, + event, + }) + .ok(); // This can be Err when events are received before the configuration + } else { + log::error!("Could not deserialize the latest event from {}", event.topic); + }; + } + "knobs" if property == "events" => { + if let Ok(event) = serde_json::from_slice(&event.payload) { + events_sender + .send(HandlerEvent::Knob { + path: KnobPath { + page_id: page_id.to_owned(), + position: KnobPosition::from_str(position).unwrap(), + }, + event, + }) + .ok(); // This can be Err when events are received before the configuration + } else { + log::error!("Could not deserialize the latest event from {}", event.topic); + }; + } + _ => {} } - "knobs" if property == "events" => { - if let Ok(event) = serde_json::from_slice(&event.payload) { - events_sender - .send(HandlerEvent::Knob { - path: KnobPath { - page_id: page_id.to_owned(), - position: KnobPosition::from_str(position).unwrap(), - }, - event, - }) - .unwrap(); - } else { - log::error!("Could not deserialize the latest event from {}", event.topic); - }; - } - _ => {} } } + _ => {} } } + _ => {} } } }