Hassliebe/src/mqtt.rs
2023-02-28 23:25:58 +01:00

186 lines
6.3 KiB
Rust

use std::collections::HashSet;
use std::io::SeekFrom;
use std::path::Path;
use std::time::Duration;
use anyhow::Result;
use json::JsonValue;
use rumqttc::{AsyncClient as MqttClient, EventLoop, LastWill, MqttOptions, Packet, SubscribeFilter};
use rumqttc::Event::Incoming;
use rumqttc::QoS;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::time::{Instant, sleep};
use crate::modules::ModuleContext;
use super::config;
/// Builds the MQTT client and its event loop from the application configuration.
///
/// The client identifies itself with `config.internal.stable_id`, uses a clean session and a
/// 5 second keep-alive, and registers a retained `offline` last will on `availability_topic`.
/// A retained `online` message for the same topic is requested before returning.
///
/// The returned [`EventLoop`] has not been polled yet; the caller is expected to drive it.
///
/// NOTE(review): `online` is only published here, once. After a connection loss the broker
/// presumably publishes the `offline` last will, and nothing visible in this file publishes
/// `online` again after a reconnect — confirm whether that is handled elsewhere.
///
/// # Errors
///
/// Returns an error if the `online` publish request cannot be handed to the client.
pub async fn create_client(config: &config::Config, availability_topic: &str) -> Result<(MqttClient, EventLoop)> {
    // Capacity of the request channel between the client handle and the event loop.
    const REQUEST_CHANNEL_CAPACITY: usize = 10;

    let last_will = LastWill::new(availability_topic, "offline", QoS::AtLeastOnce, true);

    let mut mqtt_options = MqttOptions::new(&config.internal.stable_id, config.mqtt.host.to_owned(), config.mqtt.port);
    mqtt_options
        .set_clean_session(true)
        .set_keep_alive(Duration::from_secs(5))
        .set_last_will(last_will);

    let (client, event_loop) = MqttClient::new(mqtt_options, REQUEST_CHANNEL_CAPACITY);
    client
        .publish(availability_topic, QoS::AtLeastOnce, true, "online")
        .await?;

    Ok((client, event_loop))
}
pub fn create_discovery_device_object(config: &config::Config) -> JsonValue {
json::object! {
"connections": if config.announce_mac_address {
mac_address::get_mac_address().unwrap_or(None).map(|a| json::array![["mac", a.to_string()]]).unwrap_or(json::array![])
} else {
json::array![]
},
"name": config.display_name.as_str()
}
}
/// Keeps track of the MQTT topics "owned" by this program across runs.
///
/// The list of owned topics is persisted in the `owned_topics` file of the data directory.
/// On each run, topics that were owned previously but are no longer used get their retained
/// message cleared, and the file is rewritten with the current set of topics.
pub struct OwnedTopicsService {
    /// The `owned_topics` file, opened for both reading and writing.
    file: File,
    /// Topics that were listed in the file when the service was created.
    old_topics: HashSet<String>,
}

impl OwnedTopicsService {
    /// Opens the `owned_topics` file inside `data_directory_path` (creating it if it does not
    /// exist) and loads the topics recorded by the previous run.
    ///
    /// The first line of the file is the "do not edit" header and is skipped; every following
    /// line is one topic.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened, created, or read as UTF-8 text.
    pub async fn new(data_directory_path: &Path) -> Result<OwnedTopicsService> {
        let path = data_directory_path.join("owned_topics");
        let mut file = OpenOptions::new().write(true).read(true).create(true).open(path).await?;

        let mut content = String::new();
        file.read_to_string(&mut content).await?;

        let old_topics = content
            .split_terminator('\n')
            .skip(1)
            .map(|s| s.to_owned())
            .collect::<HashSet<_>>();

        Ok(OwnedTopicsService { file, old_topics })
    }

    /// Clears every previously owned topic that is not part of `new_topics`, then replaces the
    /// content of the `owned_topics` file with `new_topics`.
    ///
    /// A topic is cleared by publishing an empty retained message to it through `mqtt_client`.
    ///
    /// # Errors
    ///
    /// Returns an error if a publish request is rejected by the client or if the file cannot be
    /// truncated, written or flushed.
    pub async fn clear_old_and_save_new(mut self, mqtt_client: &MqttClient, new_topics: &HashSet<String>) -> Result<()> {
        let unused_topics = self.old_topics.difference(new_topics).collect::<Vec<_>>();
        log::info!(
            "{} unused owned topics will be cleared. Now using {} owned topics.",
            unused_topics.len(),
            new_topics.len()
        );
        for topic in unused_topics {
            log::trace!("Clearing owned topic: {}", topic);
            // Deletes the retained message
            mqtt_client.publish(topic.as_str(), QoS::AtLeastOnce, true, Vec::new()).await?;
        }

        const HEADER: &str = "# DO NOT EDIT THIS FILE. It is automatically generated at each run of Hassliebe.\n";
        let capacity = HEADER.len() + new_topics.iter().map(|topic| topic.len() + 1).sum::<usize>();
        let mut new_content = String::with_capacity(capacity);
        new_content.push_str(HEADER);
        for topic in new_topics {
            new_content.push_str(topic);
            new_content.push('\n');
        }

        self.file.set_len(0).await?;
        self.file.seek(SeekFrom::Start(0)).await?;
        self.file.write_all(new_content.as_bytes()).await?;
        // tokio performs file writes on a background task: `write_all` can return before the
        // data has been handed to the OS. Flushing waits for the write to complete and surfaces
        // its error, which matters because `self` (and the file) is dropped right after.
        self.file.flush().await?;
        Ok(())
    }
}
/// Connection state tracked by the polling loop of `start_communication`.
#[derive(Debug, PartialEq, Eq)]
enum ConnectionState {
/// Initial state: no connection has been acknowledged and no error has been seen yet.
NotConnected,
/// An established connection was lost at `since`; reconnection is attempted at the fast interval.
FastRetrying { since: Instant },
/// Reconnection is attempted at the slow interval.
SlowRetrying,
/// The broker acknowledged the connection.
Connected,
}
/// Pause, in milliseconds, before each poll while in `ConnectionState::FastRetrying`.
const FAST_RETRYING_INTERVAL_MS: u64 = 500;
/// Time, in seconds, after which fast retrying gives up and switches to slow retrying.
const FAST_RETRYING_LIMIT_SECONDS: u64 = 15;
/// Pause, in seconds, before each poll while in `ConnectionState::SlowRetrying`.
const SLOW_RETRYING_INTERVAL_SECONDS: u64 = 5;
pub async fn start_communication(context: &ModuleContext<'_>, mut event_loop: EventLoop) -> Result<()> {
log::info!("Connecting to MQTT broker at {}:{}", context.config.mqtt.host, context.config.mqtt.port);
context
.mqtt
.client
.subscribe_many(
context
.mqtt
.message_handler_by_topic
.keys()
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
)
.await?;
let mut connection_state = ConnectionState::NotConnected;
loop {
match connection_state {
ConnectionState::FastRetrying { since } => {
sleep(Duration::from_millis(FAST_RETRYING_INTERVAL_MS)).await;
if since.elapsed().as_secs() >= FAST_RETRYING_LIMIT_SECONDS {
log::warn!(
"Could not fast-reconnect within {}s. Now retrying every {}s.",
FAST_RETRYING_LIMIT_SECONDS,
SLOW_RETRYING_INTERVAL_SECONDS
);
connection_state = ConnectionState::SlowRetrying;
}
}
ConnectionState::SlowRetrying => {
sleep(Duration::from_secs(SLOW_RETRYING_INTERVAL_SECONDS)).await;
}
_ => {}
}
let notification = event_loop.poll().await;
match notification {
Err(error) => {
log::debug!("Connection error: {}", error);
match connection_state {
ConnectionState::Connected => {
log::warn!("Connection lost. Trying to fast-reconnect…");
connection_state = ConnectionState::FastRetrying { since: Instant::now() };
}
ConnectionState::NotConnected => {
log::warn!("Connection failed. Retrying every {}s.", SLOW_RETRYING_INTERVAL_SECONDS);
connection_state = ConnectionState::SlowRetrying;
}
_ => {}
}
}
Ok(event) => match event {
Incoming(Packet::ConnAck(_)) => {
if connection_state == ConnectionState::NotConnected {
log::info!("Connection established")
} else {
log::info!("Connection restored")
}
connection_state = ConnectionState::Connected;
}
Incoming(Packet::Publish(message)) => {
let text = std::str::from_utf8(&message.payload)?;
if let Some(handler) = context.mqtt.message_handler_by_topic.get(message.topic.as_str()) {
handler(text)?;
}
}
_ => {}
},
}
}
}