Refactor MQTT and implement command buttons

This commit is contained in:
Moritz Ruth 2023-03-01 13:00:23 +01:00
parent b2cbd90d26
commit ed0291ffa5
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
6 changed files with 100 additions and 43 deletions

View file

@ -2,9 +2,8 @@
> Integrates any Linux machine into your Home Assistant ecosystem. > Integrates any Linux machine into your Home Assistant ecosystem.
## Features ## Features
- [ ] Triggers - [ ] Fallback MQTT broker address
- [ ] shutdown, reboot - [x] Command buttons
- [ ] Custom commands
- [ ] Notifications - [ ] Notifications
- [ ] Actions - [ ] Actions
- [ ] System stats - [ ] System stats

View file

@ -11,6 +11,8 @@ use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use validator::{Validate, ValidationError}; use validator::{Validate, ValidationError};
use crate::modules;
#[derive(Serialize, Deserialize, Validate)] #[derive(Serialize, Deserialize, Validate)]
pub struct Mqtt { pub struct Mqtt {
#[validate(length(min = 1))] #[validate(length(min = 1))]
@ -35,20 +37,26 @@ pub struct Internal {
} }
#[derive(Serialize, Deserialize, Validate)] #[derive(Serialize, Deserialize, Validate)]
pub struct Config { pub(crate) struct Config {
#[validate(custom = "validate_unique_id")] #[validate(custom = "validate_unique_id")]
pub unique_id: String, pub unique_id: String,
#[validate(length(min = 1))] #[validate(length(min = 1))]
pub display_name: String, pub display_name: String,
pub announce_mac_address: bool, pub announce_mac_address: bool,
#[validate] #[validate]
pub mqtt: Mqtt, pub mqtt: Mqtt,
#[serde(rename = "DO_NOT_CHANGE")] #[serde(rename = "DO_NOT_CHANGE")]
#[validate] #[validate]
pub internal: Internal, pub internal: Internal,
#[validate]
pub command_buttons: Option<modules::command_buttons::Config>,
} }
fn validate_unique_id(value: &str) -> Result<(), ValidationError> { pub(crate) fn validate_unique_id(value: &str) -> Result<(), ValidationError> {
if Regex::new(r"^[a-zA-Z0-9]+(_[a-zA-Z0-9]+)*$").unwrap().is_match(value) { if Regex::new(r"^[a-zA-Z0-9]+(_[a-zA-Z0-9]+)*$").unwrap().is_match(value) {
Ok(()) Ok(())
} else { } else {
@ -73,6 +81,10 @@ fn create_example_config() -> Config {
internal: Internal { internal: Internal {
stable_id: generate_unique_id(), stable_id: generate_unique_id(),
}, },
command_buttons: Some(modules::command_buttons::Config {
enabled: false,
buttons: Vec::new(),
})
} }
} }

View file

@ -71,9 +71,5 @@ async fn main() -> Result<()> {
modules::init_all(&mut module_context).await?; modules::init_all(&mut module_context).await?;
owned_topics_service mqtt::start_communication(&module_context, event_loop, owned_topics_service).await
.clear_old_and_save_new(module_context.mqtt.client, &module_context.mqtt.owned_topics)
.await?;
mqtt::start_communication(&module_context, event_loop).await
} }

View file

@ -1,24 +1,55 @@
use anyhow::Result; use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio::process::Command; use tokio::process::Command;
use validator::Validate;
use crate::config::validate_unique_id;
use super::ModuleContext; use super::ModuleContext;
const MODULE_ID: &str = "power"; const MODULE_ID: &str = "power";
const BUTTON_TRIGGER_TEXT: &str = "press"; const BUTTON_TRIGGER_TEXT: &str = "press";
pub(crate) async fn init(context: &mut ModuleContext<'_>) -> Result<()> { #[derive(Serialize, Deserialize, Validate, Clone)]
log::info!("Initializing…"); pub(crate) struct ButtonConfig {
#[validate(custom = "validate_unique_id")]
pub id: String,
init_command_button(context, "shutdown", "Shutdown", "shutdown -h now").await?; #[validate(length(min = 1))]
init_command_button(context, "reboot", "Reboot", "shutdown -r now").await?; pub name: String,
#[validate(length(min = 1))]
pub command: String,
#[serde(default)]
pub run_in_shell: bool,
}
#[derive(Serialize, Deserialize, Validate)]
pub(crate) struct Config {
#[serde(default)]
pub enabled: bool,
#[serde(default)]
pub buttons: Vec<ButtonConfig>,
}
pub(crate) async fn init(context: &mut ModuleContext<'_>) -> Result<()> {
let config = match &context.config.command_buttons {
Some(c) if c.enabled => c,
_ => return Ok(())
};
log::info!("Initializing…");
for button in config.buttons.iter() {
init_command_button(context, button.clone()).await?;
}
Ok(()) Ok(())
} }
async fn init_command_button(context: &mut ModuleContext<'_>, sub_id: &str, name: &str, command: impl Into<String>) -> Result<()> { async fn init_command_button(context: &mut ModuleContext<'_>, config: ButtonConfig) -> Result<()> {
let command = command.into(); let entity_id = context.get_entity_id(MODULE_ID, &config.id);
let entity_id = context.get_entity_id(MODULE_ID, sub_id);
let command_topic = context.mqtt.get_topic("button", &entity_id, "trigger"); let command_topic = context.mqtt.get_topic("button", &entity_id, "trigger");
context context
@ -30,7 +61,7 @@ async fn init_command_button(context: &mut ModuleContext<'_>, sub_id: &str, name
"command_topic": command_topic.as_str(), "command_topic": command_topic.as_str(),
"device": context.mqtt.discovery_device_object.clone(), "device": context.mqtt.discovery_device_object.clone(),
"icon": "mdi:power", "icon": "mdi:power",
"name": name, "name": config.name,
"payload_press": BUTTON_TRIGGER_TEXT, "payload_press": BUTTON_TRIGGER_TEXT,
"object_id": entity_id.as_str(), "object_id": entity_id.as_str(),
"unique_id": entity_id.as_str() "unique_id": entity_id.as_str()
@ -40,7 +71,9 @@ async fn init_command_button(context: &mut ModuleContext<'_>, sub_id: &str, name
context.mqtt.subscribe(command_topic, move |text| { context.mqtt.subscribe(command_topic, move |text| {
if text == BUTTON_TRIGGER_TEXT { if text == BUTTON_TRIGGER_TEXT {
run_command(command.clone()); run_command(config.command.clone(), config.run_in_shell);
} else {
log::warn!("Received invalid trigger text for button {}", config.id)
} }
Ok(()) Ok(())
@ -49,7 +82,7 @@ async fn init_command_button(context: &mut ModuleContext<'_>, sub_id: &str, name
Ok(()) Ok(())
} }
fn run_command(command: String) { fn run_command(command: String, in_shell: bool) {
tokio::spawn(async move { tokio::spawn(async move {
let is_dry_run = cfg!(feature = "dry_run"); let is_dry_run = cfg!(feature = "dry_run");
@ -57,8 +90,16 @@ fn run_command(command: String) {
log::info!("Executing command{}: {}", if is_dry_run { " (dry run)" } else { "" }, command); log::info!("Executing command{}: {}", if is_dry_run { " (dry run)" } else { "" }, command);
let mut command_parts = command.split(' ').collect::<Vec<_>>(); let mut command_parts = command.split(' ').collect::<Vec<_>>();
let mut actual_command = Command::new(command_parts[0]); let mut actual_command = if in_shell {
command_parts.remove(0); let mut c = Command::new("/bin/sh");
c.arg("-lc");
c
} else {
let c = Command::new(command_parts[0]);
command_parts.remove(0);
c
};
actual_command.args(command_parts); actual_command.args(command_parts);
if is_dry_run { if is_dry_run {

View file

@ -4,7 +4,7 @@ use anyhow::Result;
use json::JsonValue; use json::JsonValue;
use rumqttc::{AsyncClient as MqttClient, ClientError, QoS}; use rumqttc::{AsyncClient as MqttClient, ClientError, QoS};
pub mod power; pub mod command_buttons;
type MqttMessageHandler<'a> = dyn Fn(&str) -> Result<()> + 'a; type MqttMessageHandler<'a> = dyn Fn(&str) -> Result<()> + 'a;
@ -18,7 +18,7 @@ pub struct ModuleContextMqtt<'a> {
pub owned_topics: HashSet<String>, pub owned_topics: HashSet<String>,
} }
pub struct ModuleContext<'a> { pub(crate) struct ModuleContext<'a> {
pub config: &'a super::config::Config, pub config: &'a super::config::Config,
pub mqtt: ModuleContextMqtt<'a>, pub mqtt: ModuleContextMqtt<'a>,
} }
@ -47,7 +47,7 @@ impl<'a> ModuleContextMqtt<'a> {
} }
} }
pub async fn init_all(context: &mut ModuleContext<'_>) -> Result<()> { pub(crate) async fn init_all(context: &mut ModuleContext<'_>) -> Result<()> {
power::init(context).await?; command_buttons::init(context).await?;
Ok(()) Ok(())
} }

View file

@ -16,7 +16,7 @@ use crate::modules::ModuleContext;
use super::config; use super::config;
pub async fn create_client(config: &config::Config, availability_topic: &str) -> Result<(MqttClient, EventLoop)> { pub(crate) async fn create_client(config: &config::Config, availability_topic: &str) -> Result<(MqttClient, EventLoop)> {
let mut options = MqttOptions::new(&config.internal.stable_id, config.mqtt.host.to_owned(), config.mqtt.port); let mut options = MqttOptions::new(&config.internal.stable_id, config.mqtt.host.to_owned(), config.mqtt.port);
options.set_clean_session(true); options.set_clean_session(true);
options.set_keep_alive(Duration::from_secs(5)); options.set_keep_alive(Duration::from_secs(5));
@ -28,7 +28,7 @@ pub async fn create_client(config: &config::Config, availability_topic: &str) ->
Ok((mqtt_client, event_loop)) Ok((mqtt_client, event_loop))
} }
pub fn create_discovery_device_object(config: &config::Config) -> JsonValue { pub(crate) fn create_discovery_device_object(config: &config::Config) -> JsonValue {
json::object! { json::object! {
"connections": if config.announce_mac_address { "connections": if config.announce_mac_address {
mac_address::get_mac_address().unwrap_or(None).map(|a| json::array![["mac", a.to_string()]]).unwrap_or(json::array![]) mac_address::get_mac_address().unwrap_or(None).map(|a| json::array![["mac", a.to_string()]]).unwrap_or(json::array![])
@ -45,7 +45,7 @@ pub struct OwnedTopicsService {
} }
impl OwnedTopicsService { impl OwnedTopicsService {
pub async fn new(data_directory_path: &Path) -> Result<OwnedTopicsService> { pub(crate) async fn new(data_directory_path: &Path) -> Result<OwnedTopicsService> {
let path = data_directory_path.join("owned_topics"); let path = data_directory_path.join("owned_topics");
let mut file = OpenOptions::new().write(true).read(true).create(true).open(path).await?; let mut file = OpenOptions::new().write(true).read(true).create(true).open(path).await?;
@ -57,7 +57,7 @@ impl OwnedTopicsService {
Ok(OwnedTopicsService { file, old_topics }) Ok(OwnedTopicsService { file, old_topics })
} }
pub async fn clear_old_and_save_new(mut self, mqtt_client: &MqttClient, new_topics: &HashSet<String>) -> Result<()> { pub(crate) async fn clear_old_and_save_new(mut self, mqtt_client: &MqttClient, new_topics: &HashSet<String>) -> Result<()> {
let unused_topics = self.old_topics.difference(new_topics).map(|s| s.to_owned()).collect::<Vec<_>>(); let unused_topics = self.old_topics.difference(new_topics).map(|s| s.to_owned()).collect::<Vec<_>>();
log::info!( log::info!(
@ -100,22 +100,25 @@ const FAST_RETRYING_INTERVAL_MS: u64 = 500;
const FAST_RETRYING_LIMIT_SECONDS: u64 = 15; const FAST_RETRYING_LIMIT_SECONDS: u64 = 15;
const SLOW_RETRYING_INTERVAL_SECONDS: u64 = 5; const SLOW_RETRYING_INTERVAL_SECONDS: u64 = 5;
pub async fn start_communication(context: &ModuleContext<'_>, mut event_loop: EventLoop) -> Result<()> { pub(crate) async fn start_communication(context: &ModuleContext<'_>, mut event_loop: EventLoop, owned_topics_service: OwnedTopicsService) -> Result<()> {
log::info!("Connecting to MQTT broker at {}:{}", context.config.mqtt.host, context.config.mqtt.port); log::info!("Connecting to MQTT broker at {}:{}", context.config.mqtt.host, context.config.mqtt.port);
context if !context.mqtt.message_handler_by_topic.is_empty() {
.mqtt context
.client .mqtt
.subscribe_many( .client
context .subscribe_many(
.mqtt context
.message_handler_by_topic .mqtt
.keys() .message_handler_by_topic
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)), .keys()
) .map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
.await?; )
.await?;
}
let mut connection_state = ConnectionState::NotConnected; let mut connection_state = ConnectionState::NotConnected;
let mut owned_topics_service = Some(owned_topics_service);
loop { loop {
match connection_state { match connection_state {
@ -128,6 +131,7 @@ pub async fn start_communication(context: &ModuleContext<'_>, mut event_loop: Ev
FAST_RETRYING_LIMIT_SECONDS, FAST_RETRYING_LIMIT_SECONDS,
SLOW_RETRYING_INTERVAL_SECONDS SLOW_RETRYING_INTERVAL_SECONDS
); );
connection_state = ConnectionState::SlowRetrying; connection_state = ConnectionState::SlowRetrying;
} }
} }
@ -168,6 +172,11 @@ pub async fn start_communication(context: &ModuleContext<'_>, mut event_loop: Ev
log::info!("Connection restored") log::info!("Connection restored")
} }
if let Some(service) = owned_topics_service.take() {
service.clear_old_and_save_new(context.mqtt.client, &context.mqtt.owned_topics)
.await?;
}
connection_state = ConnectionState::Connected; connection_state = ConnectionState::Connected;
} }