And again

This commit is contained in:
Moritz Ruth 2023-02-24 01:30:51 +01:00
parent b87c19df84
commit aa941fd195
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
8 changed files with 638 additions and 60 deletions

View file

@ -10,7 +10,7 @@
<option name="buildTarget" value="REMOTE" />
<option name="backtrace" value="SHORT" />
<envs>
<env name="RUST_LOG" value="hassliebe" />
<env name="RUST_LOG" value="hassliebe=debug" />
</envs>
<option name="isRedirectInput" value="false" />
<option name="redirectInputPath" value="" />

359
Cargo.lock generated
View file

@ -75,6 +75,26 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]]
name = "directories"
version = "4.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f51c5d4ddabd36886dd3e1438cb358cdcb0d7c499cb99cb4ac2e38e18b5cb210"
dependencies = [
"dirs-sys",
]
[[package]]
name = "dirs-sys"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
dependencies = [
"libc",
"redox_users",
"winapi",
]
[[package]]
name = "env_logger"
version = "0.10.0"
@ -109,6 +129,12 @@ dependencies = [
"libc",
]
[[package]]
name = "exitcode"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de853764b47027c2e862a995c34978ffa63c1501f2e15f987ba11bd4f9bba193"
[[package]]
name = "flume"
version = "0.10.14"
@ -122,6 +148,15 @@ dependencies = [
"spin 0.9.5",
]
[[package]]
name = "form_urlencoded"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.26"
@ -224,17 +259,31 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hassliebe"
version = "0.1.0"
dependencies = [
"anyhow",
"directories",
"env_logger",
"exitcode",
"json",
"lazy_static",
"log",
"mac_address",
"rand",
"regex",
"rumqttc",
"serde",
"tokio",
"toml",
"validator",
]
[[package]]
@ -258,6 +307,43 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "idna"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
dependencies = [
"matches",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "idna"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "if_chain"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed"
[[package]]
name = "indexmap"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "io-lifetimes"
version = "1.0.5"
@ -280,6 +366,12 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "itoa"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fad582f4b9e86b6caa621cabeb0963332d92eea04729ab12892c2533951e6440"
[[package]]
name = "js-sys"
version = "0.3.61"
@ -295,6 +387,12 @@ version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.139"
@ -336,6 +434,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "matches"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5"
[[package]]
name = "memchr"
version = "2.5.0"
@ -385,6 +489,15 @@ dependencies = [
"memoffset",
]
[[package]]
name = "nom8"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae01545c9c7fc4486ab7debaf2aad7003ac19431791868fb2e8066df97fad2f8"
dependencies = [
"memchr",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
@ -430,6 +543,12 @@ dependencies = [
"windows-sys 0.45.0",
]
[[package]]
name = "percent-encoding"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pin-project"
version = "1.0.12"
@ -468,6 +587,36 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5da3b0203fd7ee5720aa0b5e790b591aa5d3f41c3ed2c34a3a393382198af2f7"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.51"
@ -486,6 +635,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@ -495,6 +674,17 @@ dependencies = [
"bitflags",
]
[[package]]
name = "redox_users"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
dependencies = [
"getrandom",
"redox_syscall",
"thiserror",
]
[[package]]
name = "regex"
version = "1.7.1"
@ -592,6 +782,12 @@ dependencies = [
"base64",
]
[[package]]
name = "ryu"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde"
[[package]]
name = "schannel"
version = "0.1.21"
@ -640,6 +836,46 @@ dependencies = [
"libc",
]
[[package]]
name = "serde"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "serde_json"
version = "1.0.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cad406b69c91885b5107daf2c29572f6c8cdb3c66826821e286c533490c0bc76"
dependencies = [
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_spanned"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0efd8caf556a6cebd3b285caf480045fcc1ac04f6bd786b09a6f11af30c4fcf4"
dependencies = [
"serde",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@ -729,6 +965,21 @@ dependencies = [
"syn",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.25.0"
@ -771,18 +1022,126 @@ dependencies = [
"webpki",
]
[[package]]
name = "toml"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7afcae9e3f0fe2c370fd4657108972cbb2fa9db1b9f84849cefd80741b01cb6"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ab8ed2edee10b50132aed5f331333428b011c99402b5a534154ed15746f9622"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e6a7712b49e1775fb9a7b998de6635b299237f48b404dde71704f2e0e7f37e5"
dependencies = [
"indexmap",
"nom8",
"serde",
"serde_spanned",
"toml_datetime",
]
[[package]]
name = "unicode-bidi"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d54675592c1dbefd78cbd98db9bacd89886e1ca50692a0692baefffdeb92dd58"
[[package]]
name = "unicode-ident"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84a22b9f218b40614adcb3f4ff08b703773ad44fa9423e4e0d346d5db86e4ebc"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
dependencies = [
"form_urlencoded",
"idna 0.3.0",
"percent-encoding",
]
[[package]]
name = "validator"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ad5bf234c7d3ad1042e5252b7eddb2c4669ee23f32c7dd0e9b7705f07ef591"
dependencies = [
"idna 0.2.3",
"lazy_static",
"regex",
"serde",
"serde_derive",
"serde_json",
"url",
"validator_derive",
]
[[package]]
name = "validator_derive"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc44ca3088bb3ba384d9aecf40c6a23a676ce23e09bdaca2073d99c207f864af"
dependencies = [
"if_chain",
"lazy_static",
"proc-macro-error",
"proc-macro2",
"quote",
"regex",
"syn",
"validator_types",
]
[[package]]
name = "validator_types"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "111abfe30072511849c5910134e8baf8dc05de4c0e5903d681cbd5c9c4d611e3"
dependencies = [
"proc-macro2",
"syn",
]
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"

View file

@ -10,9 +10,17 @@ dry_run = [] # will prevent some actions like shutting down
[dependencies]
anyhow = "1.0.69"
directories = "4.0.1"
env_logger = "0.10.0"
exitcode = "1.1.2"
json = "0.12.4"
lazy_static = "1.4.0"
log = "0.4.17"
mac_address = "1.1.4"
rand = "0.8.5"
regex = "1.7.1"
rumqttc = "0.20.0"
serde = { version = "1.0.152", features = ["derive"] }
tokio = { version = "1.25.0", features = ["full"] }
toml = "0.7.2"
validator = { version = "0.16.0", features = ["derive"] }

1
rustfmt.toml Normal file
View file

@ -0,0 +1 @@
max_width = 160

116
src/config.rs Normal file
View file

@ -0,0 +1,116 @@
use std::borrow::ToOwned;
use std::fs;
use std::fs::File;
use std::io::{ErrorKind, Read, Write};
use anyhow::{anyhow, bail, Context, Result};
use rand::distributions::Alphanumeric;
use rand::Rng;
use regex::Regex;
use serde::{Deserialize, Serialize};
use validator::{Validate, ValidationError};
#[derive(Serialize, Deserialize, Validate)]
pub struct Mqtt {
#[validate(length(min = 1))]
pub host: String,
pub port: u16,
#[serde(default)]
#[validate]
pub credentials: Option<MqttCredentials>,
}
#[derive(Serialize, Deserialize, Validate)]
pub struct MqttCredentials {
pub user: String,
pub password: String,
}
#[derive(Serialize, Deserialize, Validate)]
pub struct Internal {
#[validate(length(min = 2))]
pub stable_id: String,
}
#[derive(Serialize, Deserialize, Validate)]
pub struct Config {
#[validate(custom = "validate_unique_id")]
pub unique_id: String,
#[validate(length(min = 1))]
pub display_name: String,
pub announce_mac_address: bool,
#[validate]
pub mqtt: Mqtt,
#[serde(rename = "DO_NOT_CHANGE")]
#[validate]
pub internal: Internal,
}
fn validate_unique_id(value: &str) -> Result<(), ValidationError> {
if Regex::new(r"^[a-zA-Z0-9]+(_[a-zA-Z0-9])*$").unwrap().is_match(value) {
Ok(())
} else {
Err(ValidationError::new("invalid_unique_id"))
}
}
fn generate_unique_id() -> String {
rand::thread_rng().sample_iter(&Alphanumeric).take(12).map(char::from).collect()
}
fn create_example_config() -> Config {
Config {
unique_id: "my_pc".to_owned(),
display_name: "My PC".to_owned(),
announce_mac_address: true,
mqtt: Mqtt {
host: "".to_owned(),
port: 1883,
credentials: None,
},
internal: Internal {
stable_id: generate_unique_id(),
},
}
}
pub fn load() -> Result<Option<Config>> {
let dirs = directories::ProjectDirs::from("", "", "Hassliebe")
.ok_or_else(|| anyhow!("Could not determine a valid home directory path. Please specify a custom config file path via HASS_CONFIG"))?;
let mut path = dirs.config_dir().to_owned();
path.push("config.toml");
match File::open(&path) {
Ok(mut file) => {
log::info!("Reading config file: {}", path.to_string_lossy());
let mut string_content = String::new();
file.read_to_string(&mut string_content).context("while reading the config file")?;
let parsed = toml::from_str::<Config>(string_content.as_str()).context("while parsing the config file")?;
Ok(Some(parsed))
}
Err(error) if error.kind() == ErrorKind::NotFound => {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let mut file = File::create(&path).context("while creating the default config file")?;
let default = toml::to_string::<Config>(&create_example_config()).expect("create_example_config() should be valid");
file.write_all(default.as_bytes())?;
log::warn!("Created an example config file at {}", path.to_string_lossy());
log::warn!("Make sure to edit the file before running again.");
Ok(None)
}
Err(error) => bail!("Config file could not be opened: {}", error),
}
}

View file

@ -1,47 +1,28 @@
use crate::modules::ModuleContext;
use anyhow::Result;
use rumqttc::Event::Incoming;
use rumqttc::QoS;
use rumqttc::{AsyncClient as MqttClient, LastWill, MqttOptions, Packet, SubscribeFilter};
use std::collections::HashMap;
use std::time::Duration;
use std::process::exit;
use anyhow::Result;
use crate::modules::ModuleContext;
mod modules;
const DEVICE_ID: &str = "linux-agent-m6";
mod config;
mod mqtt;
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
start_mqtt_communication().await
}
async fn start_mqtt_communication() -> Result<()> {
let availability_topic = DEVICE_ID.to_owned() + "/availability";
let mac_address = mac_address::get_mac_address().unwrap_or(None);
let discovery_device_object = json::object! {
"connections": mac_address.map(|a| json::array![["mac", a.to_string()]]).unwrap_or(json::array![]),
"name": "m6"
let Some(config) = config::load()? else {
exit(exitcode::CONFIG);
};
let mut mqtt_options = MqttOptions::new(DEVICE_ID, "192.168.188.48", 1883);
mqtt_options.set_clean_session(true);
mqtt_options.set_keep_alive(Duration::from_secs(5));
mqtt_options.set_last_will(LastWill::new(
&availability_topic,
"offline",
QoS::AtLeastOnce,
true,
));
let (mqtt_client, mut event_loop) = MqttClient::new(mqtt_options, 10);
mqtt_client
.publish(&availability_topic, QoS::AtLeastOnce, true, "online")
.await?;
let availability_topic = config.unique_id.to_owned() + "/availability";
let (mqtt_client, event_loop) = mqtt::create_client(&config, &availability_topic).await?;
let discovery_device_object = mqtt::create_discovery_device_object(&config);
let mut module_context = ModuleContext {
device_id: DEVICE_ID,
config: &config,
mqtt_client: &mqtt_client,
mqtt_availability_topic: availability_topic.as_str(),
mqtt_discovery_device_object: &discovery_device_object,
@ -50,27 +31,6 @@ async fn start_mqtt_communication() -> Result<()> {
modules::init_all(&mut module_context).await?;
mqtt_client
.subscribe_many(
module_context
.mqtt_message_handler_by_topic
.keys()
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
)
.await?;
loop {
let notification = event_loop.poll().await?;
if let Incoming(Packet::Publish(message)) = notification {
let text = std::str::from_utf8(&message.payload)?;
if let Some(handler) = module_context
.mqtt_message_handler_by_topic
.get(message.topic.as_str())
{
handler(text)?;
}
}
}
mqtt::start_communication(&module_context, event_loop).await
}

View file

@ -1,14 +1,15 @@
pub mod power;
use std::collections::HashMap;
use anyhow::Result;
use json::JsonValue;
use rumqttc::AsyncClient as MqttClient;
use std::collections::HashMap;
pub mod power;
type MqttMessageHandler<'a> = dyn Fn(&str) -> Result<()> + 'a;
pub struct ModuleContext<'a> {
pub device_id: &'a str,
pub config: &'a super::config::Config,
pub mqtt_discovery_device_object: &'a JsonValue,
pub mqtt_availability_topic: &'a str,
pub mqtt_client: &'a MqttClient,
@ -21,7 +22,7 @@ impl<'a> ModuleContext<'a> {
}
fn get_entity_id(&self, module_id: &str, sub_id: &str) -> String {
format!("{}__{}__{}", self.device_id, module_id, sub_id)
format!("{}_{}_{}", self.config.unique_id, module_id, sub_id)
}
fn subscribe_mqtt_topic<F: Fn(&str) -> Result<()> + 'a>(

133
src/mqtt.rs Normal file
View file

@ -0,0 +1,133 @@
use std::time::Duration;
use anyhow::Result;
use json::JsonValue;
use rumqttc::{AsyncClient as MqttClient, AsyncClient, EventLoop, LastWill, MqttOptions, Packet, SubscribeFilter};
use rumqttc::Event::Incoming;
use rumqttc::QoS;
use tokio::time::{Instant, sleep};
use crate::modules::ModuleContext;
use super::config;
pub async fn create_client(config: &config::Config, availability_topic: &str) -> Result<(AsyncClient, EventLoop)> {
let mut options = MqttOptions::new(&config.internal.stable_id, config.mqtt.host.to_owned(), config.mqtt.port);
options.set_clean_session(true);
options.set_keep_alive(Duration::from_secs(5));
options.set_last_will(LastWill::new(availability_topic, "offline", QoS::AtLeastOnce, true));
let (mqtt_client, event_loop) = MqttClient::new(options, 10);
mqtt_client.publish(availability_topic, QoS::AtLeastOnce, true, "online").await?;
Ok((mqtt_client, event_loop))
}
pub fn create_discovery_device_object(config: &config::Config) -> JsonValue {
json::object! {
"connections": if config.announce_mac_address {
mac_address::get_mac_address().unwrap_or(None).map(|a| json::array![["mac", a.to_string()]]).unwrap_or(json::array![])
} else {
json::array![]
},
"name": config.display_name.as_str()
}
}
#[derive(Debug, PartialEq, Eq)]
enum ConnectionState {
NotConnected,
FastRetrying { since: Instant },
SlowRetrying,
Connected,
}
const FAST_RETRYING_INTERVAL_MS: u64 = 500;
const FAST_RETRYING_LIMIT_SECONDS: u64 = 15;
const SLOW_RETRYING_INTERVAL_SECONDS: u64 = 5;
pub async fn start_communication(context: &ModuleContext<'_>, mut event_loop: EventLoop) -> Result<()> {
log::info!("Connecting to MQTT broker at {}:{}", context.config.mqtt.host, context.config.mqtt.port);
context
.mqtt_client
.subscribe_many(
context
.mqtt_message_handler_by_topic
.keys()
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
)
.await?;
let mut connection_state = ConnectionState::NotConnected;
loop {
match connection_state {
ConnectionState::FastRetrying { since } => {
sleep(Duration::from_millis(FAST_RETRYING_INTERVAL_MS)).await;
if since.elapsed().as_secs() >= FAST_RETRYING_LIMIT_SECONDS {
log::warn!(
"Could not fast-reconnect within {}s. Now retrying every {}s.",
FAST_RETRYING_LIMIT_SECONDS,
SLOW_RETRYING_INTERVAL_SECONDS
);
connection_state = ConnectionState::SlowRetrying;
}
}
ConnectionState::SlowRetrying => {
sleep(Duration::from_secs(SLOW_RETRYING_INTERVAL_SECONDS)).await;
}
_ => {}
}
let notification = event_loop.poll().await;
match notification {
Err(error) => {
log::debug!("Connection error: {}", error);
match connection_state {
ConnectionState::Connected => {
log::warn!("Connection lost. Trying to fast-reconnect…");
connection_state = ConnectionState::FastRetrying { since: Instant::now() };
}
ConnectionState::NotConnected => {
log::warn!("Connection failed. Retrying every {}s.", SLOW_RETRYING_INTERVAL_SECONDS);
connection_state = ConnectionState::SlowRetrying;
}
_ => {}
}
}
Ok(event) => match event {
Incoming(Packet::ConnAck(_)) => {
if connection_state == ConnectionState::NotConnected {
log::info!("Connection established")
} else {
log::info!("Connection restored")
}
connection_state = ConnectionState::Connected;
}
Incoming(Packet::Publish(message)) => {
let text = std::str::from_utf8(&message.payload)?;
if let Some(handler) = context.mqtt_message_handler_by_topic.get(message.topic.as_str()) {
handler(text)?;
}
}
Incoming(packet) => {
log::trace!("Unhandled packet received: {:?}", packet)
}
_ => {}
},
}
}
}