Fix what happens when the connection is restored
This commit is contained in:
parent
03fc1b1ec8
commit
19c682219c
1 changed files with 23 additions and 18 deletions
41
src/mqtt.rs
41
src/mqtt.rs
|
@ -28,9 +28,7 @@ pub async fn create_client(config: &config::Config, machine_id: &str, availabili
|
||||||
usize::MAX,
|
usize::MAX,
|
||||||
);
|
);
|
||||||
|
|
||||||
let (mqtt_client, event_loop) = MqttClient::new(options, 100);
|
let (mqtt_client, event_loop) = MqttClient::new(options, 30);
|
||||||
mqtt_client.publish(availability_topic, QoS::AtLeastOnce, true, "online").await?;
|
|
||||||
|
|
||||||
Ok((mqtt_client, event_loop))
|
Ok((mqtt_client, event_loop))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,20 +109,6 @@ pub async fn start_communication(context: &InitializationContext, mut event_loop
|
||||||
context.full.config.mqtt.port
|
context.full.config.mqtt.port
|
||||||
);
|
);
|
||||||
|
|
||||||
if !context.message_handler_by_mqtt_topic.is_empty() {
|
|
||||||
context
|
|
||||||
.full
|
|
||||||
.mqtt
|
|
||||||
.client
|
|
||||||
.subscribe_many(
|
|
||||||
context
|
|
||||||
.message_handler_by_mqtt_topic
|
|
||||||
.keys()
|
|
||||||
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut connection_state = ConnectionState::NotConnected;
|
let mut connection_state = ConnectionState::NotConnected;
|
||||||
let mut owned_topics_service = Some(owned_topics_service);
|
let mut owned_topics_service = Some(owned_topics_service);
|
||||||
|
|
||||||
|
@ -177,9 +161,30 @@ pub async fn start_communication(context: &InitializationContext, mut event_loop
|
||||||
if connection_state == ConnectionState::NotConnected {
|
if connection_state == ConnectionState::NotConnected {
|
||||||
log::info!("Connection established")
|
log::info!("Connection established")
|
||||||
} else {
|
} else {
|
||||||
log::info!("Connection restored")
|
log::info!("Connection restored");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !context.message_handler_by_mqtt_topic.is_empty() {
|
||||||
|
context
|
||||||
|
.full
|
||||||
|
.mqtt
|
||||||
|
.client
|
||||||
|
.subscribe_many(
|
||||||
|
context
|
||||||
|
.message_handler_by_mqtt_topic
|
||||||
|
.keys()
|
||||||
|
.map(|k| SubscribeFilter::new(k.to_owned(), QoS::AtLeastOnce)),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
context
|
||||||
|
.full
|
||||||
|
.mqtt
|
||||||
|
.client
|
||||||
|
.publish(&context.full.mqtt.availability_topic, QoS::AtLeastOnce, true, "online")
|
||||||
|
.await?;
|
||||||
|
|
||||||
if let Some(service) = owned_topics_service.take() {
|
if let Some(service) = owned_topics_service.take() {
|
||||||
service.clear_old_and_save_new(&context.full.mqtt.client, &context.owned_mqtt_topics).await?;
|
service.clear_old_and_save_new(&context.full.mqtt.client, &context.owned_mqtt_topics).await?;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue