This commit is contained in:
Moritz Ruth 2024-02-29 19:24:55 +01:00
parent 19a595c21b
commit 0340dddcab
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
3 changed files with 170 additions and 119 deletions

View file

@ -1,7 +1,8 @@
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use rumqttc::{Event, Incoming, LastWill, MqttOptions, QoS}; use log::{error, info};
use rumqttc::{ConnectionError, Event, Incoming, LastWill, MqttOptions, QoS};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
@ -19,6 +20,8 @@ pub async fn start_mqtt_client(
let topic_prefix = config.topic_prefix.to_owned(); let topic_prefix = config.topic_prefix.to_owned();
let config_topic = format!("{topic_prefix}/config"); let config_topic = format!("{topic_prefix}/config");
let handler_hosts_config_json = serde_json::to_string(handler_hosts_config).unwrap().into_bytes();
let mut options = MqttOptions::new(&config.client_id, &config.host, config.port); let mut options = MqttOptions::new(&config.client_id, &config.host, config.port);
options.set_keep_alive(Duration::from_secs(3)); options.set_keep_alive(Duration::from_secs(3));
options.set_clean_session(true); options.set_clean_session(true);
@ -36,10 +39,6 @@ pub async fn start_mqtt_client(
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({ tokio::spawn({
let topic_prefix = topic_prefix.clone(); let topic_prefix = topic_prefix.clone();
let client = client.clone(); let client = client.clone();
@ -79,76 +78,106 @@ pub async fn start_mqtt_client(
tokio::spawn({ tokio::spawn({
let topic_prefix = topic_prefix.clone(); let topic_prefix = topic_prefix.clone();
let client = client.clone();
async move { async move {
while let Ok(event) = event_loop.poll().await { let mut is_connected = false;
if let Event::Incoming(Incoming::Publish(event)) = event { let mut is_first_try = true;
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
match segments[0] { loop {
"keys" => { match event_loop.poll().await {
if property == "style" { Err(error) => {
let value = serde_json::from_slice::<Option<KeyStyle>>(&event.payload).unwrap(); if is_first_try {
error!("MQTT connection failed (retrying in the background): {error}");
commands_sender is_first_try = false;
.send_async(HandlerCommand::SetKeyStyle { } else if is_connected {
path: KeyPath { error!("MQTT connection lost: {error}");
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
value,
})
.await
.unwrap();
}
} }
"knobs" => {
let position = KnobPosition::from_str(position).unwrap();
match property { is_connected = false;
"style" => { tokio::time::sleep(Duration::from_secs(2)).await;
let value = serde_json::from_slice::<Option<KnobStyle>>(&event.payload).unwrap(); }
Ok(Event::Incoming(event)) => match event {
Incoming::ConnAck(_) => {
info!("MQTT connection established.");
is_connected = true;
is_first_try = false;
commands_sender client.subscribe(format!("{topic_prefix}/keys/+/+/style"), QoS::ExactlyOnce).await.unwrap();
.send_async(HandlerCommand::SetKnobStyle { client.subscribe(format!("{topic_prefix}/knobs/+/+/style"), QoS::ExactlyOnce).await.unwrap();
path: KnobPath { client.subscribe(format!("{topic_prefix}/knobs/+/+/value"), QoS::ExactlyOnce).await.unwrap();
page_id: page_id.to_owned(),
position, client
}, .publish(config_topic.clone(), QoS::ExactlyOnce, true, handler_hosts_config_json.clone())
value, .await
}) .unwrap()
.await }
.unwrap(); Incoming::Publish(event) => {
let segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
let page_id = segments[1];
let position = segments[2];
let property = segments[3];
match segments[0] {
"keys" => {
if property == "style" {
let value = serde_json::from_slice::<Option<KeyStyle>>(&event.payload).unwrap();
commands_sender
.send_async(HandlerCommand::SetKeyStyle {
path: KeyPath {
page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(),
},
value,
})
.await
.unwrap();
}
} }
"value" => { "knobs" => {
let value = serde_json::from_slice::<Option<f32>>(&event.payload).unwrap(); let position = KnobPosition::from_str(position).unwrap();
commands_sender match property {
.send_async(HandlerCommand::SetKnobValue { "style" => {
path: KnobPath { let value = serde_json::from_slice::<Option<KnobStyle>>(&event.payload).unwrap();
page_id: page_id.to_owned(),
position, commands_sender
}, .send_async(HandlerCommand::SetKnobStyle {
value, path: KnobPath {
}) page_id: page_id.to_owned(),
.await position,
.unwrap(); },
value,
})
.await
.unwrap();
}
"value" => {
let value = serde_json::from_slice::<Option<f32>>(&event.payload).unwrap();
commands_sender
.send_async(HandlerCommand::SetKnobValue {
path: KnobPath {
page_id: page_id.to_owned(),
position,
},
value,
})
.await
.unwrap();
}
_ => {}
}
} }
_ => {} _ => {}
} };
} }
_ => {} _ => {}
}; },
_ => {}
} }
} }
} }
}); });
client
.publish(config_topic, QoS::ExactlyOnce, true, serde_json::to_string(handler_hosts_config).unwrap())
.await
.unwrap()
} }

View file

@ -1,7 +1,7 @@
use std::path::Path; use std::path::Path;
use color_eyre::Result; use color_eyre::Result;
use log::{debug, info, warn}; use log::{debug, info, trace, warn};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent}; use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
@ -26,8 +26,11 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
while let Some(handler_hosts_config) = handler_hosts_config_receiver.recv().await { while let Some(handler_hosts_config) = handler_hosts_config_receiver.recv().await {
match handler_hosts_config { match handler_hosts_config {
None => { None => {
info!("Coordinator was stopped. Sending stop event to handlers."); if is_running {
events_sender.send(HandlerEvent::Stop).unwrap(); info!("Stopping handlers…");
events_sender.send(HandlerEvent::Stop).unwrap();
}
is_running = false; is_running = false;
} }
Some(handler_hosts_config) => { Some(handler_hosts_config) => {
@ -39,7 +42,7 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
is_running = true; is_running = true;
info!("Received new configuration. Starting handlers…"); info!("Received new configuration. Starting handlers…");
debug!("New configuration: {handler_hosts_config:#?}"); trace!("New configuration: {handler_hosts_config:#?}");
handler_runner::start( handler_runner::start(
config.host_id.clone(), config.host_id.clone(),
@ -52,7 +55,6 @@ pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
} }
} }
} }
dbg!("hey");
Ok(()) Ok(())
} }

View file

@ -1,7 +1,7 @@
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use log::error; use log::{error, info};
use rumqttc::{Event, Incoming, MqttOptions, QoS}; use rumqttc::{Event, Incoming, MqttOptions, QoS};
use tokio::sync::{broadcast, mpsc}; use tokio::sync::{broadcast, mpsc};
@ -29,73 +29,93 @@ pub async fn start_mqtt_client(
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32); let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 32);
client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap();
client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap();
tokio::spawn({ tokio::spawn({
let topic_prefix = topic_prefix.clone(); let topic_prefix = topic_prefix.clone();
let client = client.clone();
async move { async move {
let mut is_connected = false;
let mut is_first_try = true;
loop { loop {
let poll_result = event_loop.poll().await; match event_loop.poll().await {
match poll_result {
Err(error) => { Err(error) => {
error!("{error}") if is_first_try {
error!("MQTT connection failed (retrying in the background): {error}");
is_first_try = false;
} else if is_connected {
error!("MQTT connection lost: {error}");
handler_hosts_config_sender.send(None).await.unwrap();
}
is_connected = false;
tokio::time::sleep(Duration::from_secs(2)).await;
} }
Ok(event) => { Ok(Event::Incoming(event)) => {
if let Event::Incoming(Incoming::Publish(event)) = event { match event {
let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>(); Incoming::ConnAck(_) => {
info!("MQTT connection established.");
is_connected = true;
is_first_try = false;
if topic_segments[0] == "config" { client.subscribe(format!("{topic_prefix}/config"), QoS::ExactlyOnce).await.unwrap();
if let Ok(config) = serde_json::from_slice(&event.payload) { client.subscribe(format!("{topic_prefix}/keys/+/+/events"), QoS::ExactlyOnce).await.unwrap();
handler_hosts_config_sender.send(config).await.unwrap(); client.subscribe(format!("{topic_prefix}/knobs/+/+/events"), QoS::ExactlyOnce).await.unwrap();
}
Incoming::Publish(event) => {
let topic_segments = event.topic.strip_prefix(&topic_prefix).unwrap().split('/').skip(1).collect::<Vec<&str>>();
if topic_segments[0] == "config" {
if let Ok(config) = serde_json::from_slice(&event.payload) {
handler_hosts_config_sender.send(config).await.unwrap();
} else {
log::error!("Could not deserialize the latest configuration from {}", event.topic);
handler_hosts_config_sender.send(None).await.unwrap();
};
} else { } else {
log::error!("Could not deserialize the latest configuration from {}", event.topic); let page_id = topic_segments[1];
handler_hosts_config_sender.send(None).await.unwrap(); let position = topic_segments[2];
}; let property = topic_segments[3];
} else {
let page_id = topic_segments[1];
let position = topic_segments[2];
let property = topic_segments[3];
match topic_segments[0] { match topic_segments[0] {
"keys" if property == "events" => { "keys" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) { if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender events_sender
.send(HandlerEvent::Key { .send(HandlerEvent::Key {
path: KeyPath { path: KeyPath {
page_id: page_id.to_owned(), page_id: page_id.to_owned(),
position: KeyPosition::from_str(position).unwrap(), position: KeyPosition::from_str(position).unwrap(),
}, },
event, event,
}) })
.unwrap(); .ok(); // This can be Err when events are received before the configuration
} else { } else {
log::error!("Could not deserialize the latest event from {}", event.topic); log::error!("Could not deserialize the latest event from {}", event.topic);
}; };
}
"knobs" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Knob {
path: KnobPath {
page_id: page_id.to_owned(),
position: KnobPosition::from_str(position).unwrap(),
},
event,
})
.ok(); // This can be Err when events are received before the configuration
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
_ => {}
} }
"knobs" if property == "events" => {
if let Ok(event) = serde_json::from_slice(&event.payload) {
events_sender
.send(HandlerEvent::Knob {
path: KnobPath {
page_id: page_id.to_owned(),
position: KnobPosition::from_str(position).unwrap(),
},
event,
})
.unwrap();
} else {
log::error!("Could not deserialize the latest event from {}", event.topic);
};
}
_ => {}
} }
} }
_ => {}
} }
} }
_ => {}
} }
} }
} }