diff --git a/src/mqtt.rs b/src/mqtt.rs index a8e155f..fc5808c 100644 --- a/src/mqtt.rs +++ b/src/mqtt.rs @@ -28,9 +28,7 @@ pub async fn create_client(config: &config::Config, machine_id: &str, availabili usize::MAX, ); - let (mqtt_client, event_loop) = MqttClient::new(options, 100); - mqtt_client.publish(availability_topic, QoS::AtLeastOnce, true, "online").await?; - + let (mqtt_client, event_loop) = MqttClient::new(options, 30); Ok((mqtt_client, event_loop)) } @@ -111,20 +109,6 @@ pub async fn start_communication(context: &InitializationContext, mut event_loop context.full.config.mqtt.port ); - if !context.message_handler_by_mqtt_topic.is_empty() { - context - .full - .mqtt - .client - .subscribe_many( - context - .message_handler_by_mqtt_topic - .keys() - .map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)), - ) - .await?; - } - let mut connection_state = ConnectionState::NotConnected; let mut owned_topics_service = Some(owned_topics_service); @@ -177,9 +161,30 @@ pub async fn start_communication(context: &InitializationContext, mut event_loop if connection_state == ConnectionState::NotConnected { log::info!("Connection established") } else { - log::info!("Connection restored") + log::info!("Connection restored"); } + if !context.message_handler_by_mqtt_topic.is_empty() { + context + .full + .mqtt + .client + .subscribe_many( + context + .message_handler_by_mqtt_topic + .keys() + .map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)), + ) + .await?; + } + + context + .full + .mqtt + .client + .publish(&context.full.mqtt.availability_topic, QoS::AtLeastOnce, true, "online") + .await?; + if let Some(service) = owned_topics_service.take() { service.clear_old_and_save_new(&context.full.mqtt.client, &context.owned_mqtt_topics).await?; }