deckster/src/handler_host/mod.rs

60 lines
2.1 KiB
Rust

use std::path::Path;
use color_eyre::Result;
use log::{info, trace, warn};
use tokio::sync::{broadcast, mpsc};
use deckster_shared::handler_communication::{HandlerCommand, HandlerEvent};
use crate::handler_runner;
use crate::model::handler_host_config::Config;
mod mqtt;
/// Runs the handler host: connects to MQTT and (re)starts handlers whenever a
/// configuration update arrives.
///
/// The MQTT client is given the sending side of the configuration channel, the
/// receiving side of the command channel, and a clone of the event broadcaster.
/// Each message on the configuration channel is either:
///
/// * `Some(configuration)` — start handlers from `<config_directory>/handlers`
///   with that configuration, first broadcasting [`HandlerEvent::Stop`] if a
///   previous configuration is still considered running;
/// * `None` — broadcast [`HandlerEvent::Stop`] if handlers are considered running.
///
/// The loop ends once the configuration channel yields no more messages.
///
/// # Errors
///
/// Propagates any error returned by `handler_runner::start`.
pub async fn start(config_directory: &Path, config: Config) -> Result<()> {
    let (commands_sender, commands_receiver) = flume::bounded::<HandlerCommand>(5);
    let (config_updates_sender, mut config_updates_receiver) = mpsc::channel(1);
    let events_sender = broadcast::Sender::<HandlerEvent>::new(5);

    info!("Initializing MQTT client…");
    mqtt::start_mqtt_client(
        &config.mqtt,
        config_updates_sender,
        commands_receiver,
        events_sender.clone(),
    )
    .await;

    info!("Waiting for initial configuration…");
    let mut handlers_active = false;

    while let Some(config_update) = config_updates_receiver.recv().await {
        // Snapshot the previous state so each arm can update the flag up front.
        let was_active = handlers_active;

        match config_update {
            Some(handler_hosts_config) => {
                handlers_active = true;

                if was_active {
                    warn!("A new configuration was received before the old one was cleared.");
                    // Sending only fails when all handlers are already stopped.
                    let _ = events_sender.send(HandlerEvent::Stop);
                }

                info!("Received new configuration. Starting handlers…");
                trace!("New configuration: {handler_hosts_config:#?}");

                let handlers_directory = config_directory.join("handlers");
                // `subscribe()` is evaluated after the Stop broadcast above, keeping
                // the original ordering between stopping old handlers and creating
                // the receiver for the new ones.
                handler_runner::start(
                    config.host_id.clone(),
                    &handlers_directory,
                    handler_hosts_config,
                    commands_sender.clone(),
                    events_sender.subscribe(),
                )
                .await?;
            }
            None => {
                handlers_active = false;

                if was_active {
                    info!("Stopping handlers…");
                    // Sending only fails when all handlers are already stopped.
                    let _ = events_sender.send(HandlerEvent::Stop);
                }
            }
        }
    }

    Ok(())
}