Implement notifications

This commit is contained in:
Moritz Ruth 2023-03-05 18:01:34 +01:00
parent ed0291ffa5
commit d944132107
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
10 changed files with 1382 additions and 103 deletions

1016
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -10,17 +10,22 @@ dry_run = [] # will prevent some actions like shutting down
[dependencies]
anyhow = "1.0.69"
base64 = "0.21.0"
directories = "4.0.1"
env_logger = "0.10.0"
exitcode = "1.1.2"
image = "0.24.5"
json = "0.12.4"
lazy_static = "1.4.0"
log = "0.4.17"
mac_address = "1.1.4"
notify-rust = { version = "4.8.0", features = ["images"] }
rand = "0.8.5"
regex = "1.7.1"
rumqttc = "0.20.0"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.93"
tokio = { version = "1.25.0", features = ["full"] }
toml = "0.7.2"
validator = { version = "0.16.0", features = ["derive"] }
void = "1.0.2"

View file

@ -4,8 +4,8 @@
## Features
- [ ] Fallback MQTT broker address
- [x] Command buttons
- [ ] Notifications
- [ ] Actions
- [x] Notifications
- [x] Actions
- [ ] System stats
- [ ] CPU usage
- [ ] RAM usage
@ -14,5 +14,11 @@
- [ ] PipeWire
- [ ] File watcher
Ideas:
- Camera video stream
- Idle time
## License
Hassliebe is licensed under the [Blue Oak Model License 1.0.0](/LICENSE.md).

View file

@ -5,13 +5,12 @@ use std::io::{ErrorKind, Read, Write};
use std::path::Path;
use anyhow::{bail, Context, Result};
use rand::distributions::Alphanumeric;
use rand::Rng;
use regex::Regex;
use serde::{Deserialize, Serialize};
use validator::{Validate, ValidationError};
use crate::modules;
use crate::util::generate_alphanumeric_id;
#[derive(Serialize, Deserialize, Validate)]
pub struct Mqtt {
@ -37,7 +36,16 @@ pub struct Internal {
}
#[derive(Serialize, Deserialize, Validate)]
pub(crate) struct Config {
pub struct Modules {
#[validate]
pub buttons: Option<modules::buttons::Config>,
#[validate]
pub notifications: Option<modules::notifications::Config>,
}
#[derive(Serialize, Deserialize, Validate)]
pub struct Config {
#[validate(custom = "validate_unique_id")]
pub unique_id: String,
@ -48,15 +56,15 @@ pub(crate) struct Config {
#[validate]
pub mqtt: Mqtt,
#[validate]
pub modules: Modules,
#[serde(rename = "DO_NOT_CHANGE")]
#[validate]
pub internal: Internal,
#[validate]
pub command_buttons: Option<modules::command_buttons::Config>,
}
pub(crate) fn validate_unique_id(value: &str) -> Result<(), ValidationError> {
pub fn validate_unique_id(value: &str) -> Result<(), ValidationError> {
if Regex::new(r"^[a-zA-Z0-9]+(_[a-zA-Z0-9]+)*$").unwrap().is_match(value) {
Ok(())
} else {
@ -64,10 +72,6 @@ pub(crate) fn validate_unique_id(value: &str) -> Result<(), ValidationError> {
}
}
fn generate_unique_id() -> String {
rand::thread_rng().sample_iter(&Alphanumeric).take(12).map(char::from).collect()
}
fn create_example_config() -> Config {
Config {
unique_id: "my_pc".to_owned(),
@ -79,16 +83,19 @@ fn create_example_config() -> Config {
credentials: None,
},
internal: Internal {
stable_id: generate_unique_id(),
stable_id: generate_alphanumeric_id(12),
},
modules: Modules {
buttons: Some(modules::buttons::Config {
enabled: false,
buttons: Vec::new(),
}),
notifications: Some(modules::notifications::Config { enabled: false }),
},
command_buttons: Some(modules::command_buttons::Config {
enabled: false,
buttons: Vec::new(),
})
}
}
pub(crate) fn load(config_file_path: &Path) -> Result<Option<Config>> {
pub fn load(config_file_path: &Path) -> Result<Option<Config>> {
match File::open(config_file_path) {
Ok(mut file) => {
log::info!("Reading config file: {}", config_file_path.to_string_lossy());
@ -108,7 +115,6 @@ pub(crate) fn load(config_file_path: &Path) -> Result<Option<Config>> {
}
let mut file = File::create(config_file_path).context("while creating the default config file")?;
let default = toml::to_string::<Config>(&create_example_config()).expect("create_example_config() should be valid");
file.write_all(default.as_bytes())?;

View file

@ -1,15 +1,17 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use crate::modules::{ModuleContext, ModuleContextMqtt};
use crate::modules::{InitializationContext, ModuleContext, ModuleContextMqtt};
use crate::mqtt::OwnedTopicsService;
mod config;
mod modules;
mod mqtt;
mod util;
struct Paths {
data_directory: Box<Path>,
@ -58,18 +60,21 @@ async fn main() -> Result<()> {
let owned_topics_service = OwnedTopicsService::new(&paths.data_directory).await?;
let mut module_context = ModuleContext {
config: &config,
mqtt: ModuleContextMqtt {
client: &mqtt_client,
availability_topic: availability_topic.as_str(),
discovery_device_object: &discovery_device_object,
message_handler_by_topic: HashMap::new(),
owned_topics: HashSet::new(),
},
let mut initialization_context = InitializationContext {
owned_mqtt_topics: HashSet::new(),
message_handler_by_mqtt_topic: HashMap::new(),
full: Arc::new(ModuleContext {
config,
mqtt: ModuleContextMqtt {
client: mqtt_client,
availability_topic,
discovery_device_object,
},
}),
};
modules::init_all(&mut module_context).await?;
modules::init_all(&mut initialization_context).await?;
mqtt::start_communication(&module_context, event_loop, owned_topics_service).await
mqtt::start_communication(&initialization_context, event_loop, owned_topics_service).await
}

View file

@ -4,14 +4,13 @@ use tokio::process::Command;
use validator::Validate;
use crate::config::validate_unique_id;
use crate::modules::InitializationContext;
use super::ModuleContext;
const MODULE_ID: &str = "power";
const MODULE_ID: &str = "buttons";
const BUTTON_TRIGGER_TEXT: &str = "press";
#[derive(Serialize, Deserialize, Validate, Clone)]
pub(crate) struct ButtonConfig {
pub struct ButtonConfig {
#[validate(custom = "validate_unique_id")]
pub id: String,
@ -26,7 +25,7 @@ pub(crate) struct ButtonConfig {
}
#[derive(Serialize, Deserialize, Validate)]
pub(crate) struct Config {
pub struct Config {
#[serde(default)]
pub enabled: bool,
@ -34,10 +33,11 @@ pub(crate) struct Config {
pub buttons: Vec<ButtonConfig>,
}
pub(crate) async fn init(context: &mut ModuleContext<'_>) -> Result<()> {
let config = match &context.config.command_buttons {
pub async fn init(context: &mut InitializationContext) -> Result<()> {
let full_context = context.get_full();
let config = match &full_context.config.modules.buttons {
Some(c) if c.enabled => c,
_ => return Ok(())
_ => return Ok(()),
};
log::info!("Initializing…");
@ -48,18 +48,17 @@ pub(crate) async fn init(context: &mut ModuleContext<'_>) -> Result<()> {
Ok(())
}
async fn init_command_button(context: &mut ModuleContext<'_>, config: ButtonConfig) -> Result<()> {
let entity_id = context.get_entity_id(MODULE_ID, &config.id);
let command_topic = context.mqtt.get_topic("button", &entity_id, "trigger");
async fn init_command_button(context: &mut InitializationContext, config: ButtonConfig) -> Result<()> {
let entity_id = context.full.get_entity_id(MODULE_ID, &config.id);
let command_topic = context.full.mqtt.get_homeassistant_topic("button", &entity_id, "trigger");
context
.mqtt
.send_retained_message(
context.mqtt.get_topic("button", &entity_id, "config"),
.send_retained_mqtt_message(
context.full.mqtt.get_homeassistant_topic("button", &entity_id, "config"),
json::stringify(json::object! {
"availability_topic": context.mqtt.availability_topic,
"availability_topic": context.full.mqtt.availability_topic.clone(),
"command_topic": command_topic.as_str(),
"device": context.mqtt.discovery_device_object.clone(),
"device": context.full.mqtt.discovery_device_object.clone(),
"icon": "mdi:power",
"name": config.name,
"payload_press": BUTTON_TRIGGER_TEXT,
@ -69,7 +68,7 @@ async fn init_command_button(context: &mut ModuleContext<'_>, config: ButtonConf
)
.await?;
context.mqtt.subscribe(command_topic, move |text| {
context.subscribe_mqtt_topic(command_topic, move |text| {
if text == BUTTON_TRIGGER_TEXT {
run_command(config.command.clone(), config.run_in_shell);
} else {

View file

@ -1,53 +1,66 @@
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::Result;
use json::JsonValue;
use rumqttc::{AsyncClient as MqttClient, ClientError, QoS};
pub mod command_buttons;
pub mod buttons;
pub mod notifications;
type MqttMessageHandler<'a> = dyn Fn(&str) -> Result<()> + 'a;
type MqttMessageHandler = dyn Fn(&str) -> Result<()>;
pub struct ModuleContextMqtt<'a> {
pub discovery_device_object: &'a JsonValue,
pub availability_topic: &'a str,
pub client: &'a MqttClient,
pub message_handler_by_topic: HashMap<String, Box<MqttMessageHandler<'a>>>,
pub struct ModuleContextMqtt {
pub discovery_device_object: JsonValue,
pub availability_topic: String,
pub client: MqttClient,
}
pub struct InitializationContext {
// Owned topics are topics which a retained message was sent into.
pub owned_topics: HashSet<String>,
pub owned_mqtt_topics: HashSet<String>,
pub message_handler_by_mqtt_topic: HashMap<String, Box<MqttMessageHandler>>,
pub full: Arc<ModuleContext>,
}
pub(crate) struct ModuleContext<'a> {
pub config: &'a super::config::Config,
pub mqtt: ModuleContextMqtt<'a>,
impl InitializationContext {
fn subscribe_mqtt_topic<F: Fn(&str) -> Result<()> + 'static>(&mut self, topic: impl Into<String>, handler: F) {
self.message_handler_by_mqtt_topic.insert(topic.into(), Box::new(handler));
}
async fn send_retained_mqtt_message(&mut self, topic: impl Into<String>, message: impl Into<String>) -> std::result::Result<(), ClientError> {
let topic = topic.into();
let message = message.into();
self.owned_mqtt_topics.insert(topic.to_owned());
self.full.mqtt.client.publish(topic, QoS::AtLeastOnce, true, message).await
}
fn get_full(&self) -> Arc<ModuleContext> {
self.full.clone()
}
}
impl<'a> ModuleContext<'a> {
pub struct ModuleContext {
pub config: super::config::Config,
pub mqtt: ModuleContextMqtt,
}
impl ModuleContext {
fn get_entity_id(&self, module_id: &str, sub_id: &str) -> String {
format!("{}_{}_{}", self.config.unique_id, module_id, sub_id)
}
}
impl<'a> ModuleContextMqtt<'a> {
fn get_topic(&self, component_type: &str, entity_id: &str, suffix: &str) -> String {
impl ModuleContextMqtt {
fn get_homeassistant_topic(&self, component_type: &str, entity_id: &str, suffix: &str) -> String {
format!("homeassistant/{}/{}/{}", component_type, entity_id, suffix)
}
fn subscribe<F: Fn(&str) -> Result<()> + 'a>(&mut self, topic: impl Into<String>, handler: F) {
self.message_handler_by_topic.insert(topic.into(), Box::new(handler));
}
async fn send_retained_message(&mut self, topic: impl Into<String>, message: impl Into<String>) -> std::result::Result<(), ClientError> {
let topic = topic.into();
let message = message.into();
self.owned_topics.insert(topic.to_owned());
self.client.publish(topic, QoS::AtLeastOnce, true, message).await
}
}
pub(crate) async fn init_all(context: &mut ModuleContext<'_>) -> Result<()> {
command_buttons::init(context).await?;
pub async fn init_all(context: &mut InitializationContext) -> Result<()> {
buttons::init(context).await?;
notifications::init(context).await?;
Ok(())
}

View file

@ -0,0 +1,221 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{Context, Result};
use base64::Engine;
use notify_rust::Hint;
use rumqttc::QoS;
use serde::{Deserialize, Serialize};
use tokio::task::spawn_blocking;
use validator::Validate;
use crate::modules::{InitializationContext, ModuleContext};
use crate::util::{generate_alphanumeric_id, hash_string_to_u32, spawn_nonessential};
const MODULE_ID: &str = "notifications";
#[derive(Serialize, Deserialize, Validate)]
pub struct Config {
#[serde(default)]
pub enabled: bool,
}
#[derive(Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum NotificationTimeout {
Never,
#[default]
Default,
Ms(u16),
}
#[derive(Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum NotificationUrgency {
Low,
#[default]
Normal,
Critical,
}
impl From<NotificationUrgency> for notify_rust::Urgency {
fn from(value: NotificationUrgency) -> Self {
match value {
NotificationUrgency::Low => notify_rust::Urgency::Low,
NotificationUrgency::Normal => notify_rust::Urgency::Normal,
NotificationUrgency::Critical => notify_rust::Urgency::Critical,
}
}
}
#[derive(Deserialize, Validate)]
pub struct NotificationMessage {
/// Using the ID of an existing notification will replace the old notification.
#[validate(length(min = 1))]
id: Option<String>,
#[serde(default)]
timeout: NotificationTimeout,
#[serde(default)]
urgency: NotificationUrgency,
/// Usually used as the title of the notification.
#[validate(length(min = 1))]
summary_text: String,
/// Basic HTML is usually supported.
#[validate(length(min = 1))]
long_content: Option<String>,
/// If the notification server supports persisting notifications across sessions, this will prevent it from doing so for this notification.
#[serde(default)]
transient: bool,
/// Padded base64-encoded image
encoded_image: Option<String>,
#[serde(default)]
actions: HashMap<String, String>,
}
pub async fn init(context: &mut InitializationContext) -> Result<()> {
let full_context = context.get_full();
let _config = match &full_context.config.modules.notifications {
Some(c) if c.enabled => c,
_ => return Ok(()),
};
log::info!("Initializing…");
let full_context = context.get_full();
context.subscribe_mqtt_topic(format!("{}/{}/simple", context.full.config.unique_id, MODULE_ID), move |text| {
let (summary_text, long_content) = text.split_once('\n').unwrap_or((text, ""));
tokio::spawn(handle_notification_message(
full_context.clone(),
NotificationMessage {
id: None,
transient: true,
summary_text: summary_text.to_owned(),
long_content: if long_content.is_empty() { None } else { Some(long_content.to_owned()) },
timeout: NotificationTimeout::default(),
urgency: NotificationUrgency::default(),
encoded_image: None,
actions: HashMap::new(),
},
));
Ok(())
});
let full_context = context.get_full();
context.subscribe_mqtt_topic(format!("{}/{}/json", context.full.config.unique_id, MODULE_ID), move |text| {
let message = serde_json::from_str::<NotificationMessage>(text);
match message {
Err(error) => log::error!("Could not deserialize message: {}", error),
Ok(message) => {
spawn_nonessential(handle_notification_message(full_context.clone(), message));
}
}
Ok(())
});
context.subscribe_mqtt_topic(format!("{}/{}/close", context.full.config.unique_id, MODULE_ID), move |id| {
spawn_nonessential(close_notification(id.to_owned()));
Ok(())
});
Ok(())
}
async fn handle_notification_message(context: Arc<ModuleContext>, message: NotificationMessage) -> Result<()> {
let mut notification = notify_rust::Notification::new();
let id = message.id.unwrap_or_else(|| generate_alphanumeric_id(10));
let internal_id = hash_string_to_u32(id.as_str());
notification.id(internal_id);
notification.summary(message.summary_text.as_str());
notification.urgency(message.urgency.into());
notification.hint(Hint::Transient(message.transient));
notification.timeout(match message.timeout {
NotificationTimeout::Default => -1,
NotificationTimeout::Never => 0,
NotificationTimeout::Ms(ms) => ms.into(),
});
for (action_id, label) in message.actions {
notification.action(&action_id, &label);
}
if let Some(encoded_image) = message.encoded_image {
let image_data = base64::engine::general_purpose::STANDARD_NO_PAD
.decode(encoded_image)
.context("while decoding the base64 image")?;
let image = image::load_from_memory(&image_data).context("while reading the image")?;
notification.image_data(image.try_into()?);
}
if let Some(long_content) = message.long_content {
notification.body(long_content.as_str());
}
log::debug!("Showing notification: {}", id);
let handle = notification.show_async().await?;
{
let context = context.clone();
spawn_blocking(move || {
handle.wait_for_action(|action_id| {
spawn_nonessential(send_notification_action_message(
context,
id.clone(),
if action_id == "__closed" { "closed".to_owned() } else { action_id.to_owned() },
));
});
});
}
Ok(())
}
async fn close_notification(id: String) -> Result<()> {
// We create a new notification because otherwise we would need to store old notification handles
// and notify-rust does not expose a method to close a notification by ID.
let internal_id = hash_string_to_u32(&id);
let mut notification = notify_rust::Notification::new();
notification.id(internal_id);
log::debug!("Closing notification: {}", id);
let handle = notification.show_async().await?;
handle.close();
Ok(())
}
async fn send_notification_action_message(context: Arc<ModuleContext>, notification_id: String, action_id: String) -> Result<()> {
context
.mqtt
.client
.publish(
format!("{}/{}/action/{}", context.config.unique_id, MODULE_ID, notification_id),
QoS::ExactlyOnce,
false,
action_id,
)
.await?;
Ok(())
}

View file

@ -12,23 +12,25 @@ use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::time::{Instant, sleep};
use crate::modules::ModuleContext;
use crate::modules::InitializationContext;
use super::config;
pub(crate) async fn create_client(config: &config::Config, availability_topic: &str) -> Result<(MqttClient, EventLoop)> {
pub async fn create_client(config: &config::Config, availability_topic: &str) -> Result<(MqttClient, EventLoop)> {
let mut options = MqttOptions::new(&config.internal.stable_id, config.mqtt.host.to_owned(), config.mqtt.port);
options.set_clean_session(true);
options.set_keep_alive(Duration::from_secs(5));
options.set_last_will(LastWill::new(availability_topic, "offline", QoS::AtLeastOnce, true));
options.set_max_packet_size(usize::MAX, usize::MAX);
let (mqtt_client, event_loop) = MqttClient::new(options, 10);
mqtt_client.publish(availability_topic, QoS::AtLeastOnce, true, "online").await?;
Ok((mqtt_client, event_loop))
}
pub(crate) fn create_discovery_device_object(config: &config::Config) -> JsonValue {
pub fn create_discovery_device_object(config: &config::Config) -> JsonValue {
json::object! {
"connections": if config.announce_mac_address {
mac_address::get_mac_address().unwrap_or(None).map(|a| json::array![["mac", a.to_string()]]).unwrap_or(json::array![])
@ -45,7 +47,7 @@ pub struct OwnedTopicsService {
}
impl OwnedTopicsService {
pub(crate) async fn new(data_directory_path: &Path) -> Result<OwnedTopicsService> {
pub async fn new(data_directory_path: &Path) -> Result<OwnedTopicsService> {
let path = data_directory_path.join("owned_topics");
let mut file = OpenOptions::new().write(true).read(true).create(true).open(path).await?;
@ -57,11 +59,11 @@ impl OwnedTopicsService {
Ok(OwnedTopicsService { file, old_topics })
}
pub(crate) async fn clear_old_and_save_new(mut self, mqtt_client: &MqttClient, new_topics: &HashSet<String>) -> Result<()> {
pub async fn clear_old_and_save_new(mut self, mqtt_client: &MqttClient, new_topics: &HashSet<String>) -> Result<()> {
let unused_topics = self.old_topics.difference(new_topics).map(|s| s.to_owned()).collect::<Vec<_>>();
log::info!(
"{} unused owned topics will be cleared. Now using {} owned topics.",
"{} unused owned topic(s) will be cleared. Now using {} owned topic(s).",
unused_topics.len(),
new_topics.len()
);
@ -100,17 +102,21 @@ const FAST_RETRYING_INTERVAL_MS: u64 = 500;
const FAST_RETRYING_LIMIT_SECONDS: u64 = 15;
const SLOW_RETRYING_INTERVAL_SECONDS: u64 = 5;
pub(crate) async fn start_communication(context: &ModuleContext<'_>, mut event_loop: EventLoop, owned_topics_service: OwnedTopicsService) -> Result<()> {
log::info!("Connecting to MQTT broker at {}:{}", context.config.mqtt.host, context.config.mqtt.port);
pub async fn start_communication(context: &InitializationContext, mut event_loop: EventLoop, owned_topics_service: OwnedTopicsService) -> Result<()> {
log::info!(
"Connecting to MQTT broker at {}:{}",
context.full.config.mqtt.host,
context.full.config.mqtt.port
);
if !context.mqtt.message_handler_by_topic.is_empty() {
if !context.message_handler_by_mqtt_topic.is_empty() {
context
.full
.mqtt
.client
.subscribe_many(
context
.mqtt
.message_handler_by_topic
.message_handler_by_mqtt_topic
.keys()
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
)
@ -173,8 +179,7 @@ pub(crate) async fn start_communication(context: &ModuleContext<'_>, mut event_l
}
if let Some(service) = owned_topics_service.take() {
service.clear_old_and_save_new(context.mqtt.client, &context.mqtt.owned_topics)
.await?;
service.clear_old_and_save_new(&context.full.mqtt.client, &context.owned_mqtt_topics).await?;
}
connection_state = ConnectionState::Connected;
@ -183,7 +188,7 @@ pub(crate) async fn start_communication(context: &ModuleContext<'_>, mut event_l
Incoming(Packet::Publish(message)) => {
let text = std::str::from_utf8(&message.payload)?;
if let Some(handler) = context.mqtt.message_handler_by_topic.get(message.topic.as_str()) {
if let Some(handler) = context.message_handler_by_mqtt_topic.get(message.topic.as_str()) {
handler(text)?;
}
}

27
src/util.rs Normal file
View file

@ -0,0 +1,27 @@
use std::collections::hash_map::DefaultHasher;
use std::future::Future;
use std::hash::{Hash, Hasher};
use anyhow::Result;
use rand::distributions::Alphanumeric;
use rand::Rng;
use tokio::task::JoinHandle;
#[inline]
pub fn spawn_nonessential(future: impl Future<Output=Result<()>> + Send + 'static) -> JoinHandle<()> {
tokio::spawn(async {
if let Err(error) = future.await {
log::error!("{:#}", error)
}
})
}
pub fn generate_alphanumeric_id(length: usize) -> String {
rand::thread_rng().sample_iter(&Alphanumeric).take(length).map(char::from).collect()
}
pub fn hash_string_to_u32(string: &str) -> u32 {
let mut hasher = DefaultHasher::new();
string.hash(&mut hasher);
(hasher.finish() / 2) as u32
}