deckster/handlers/home_assistant/src/util.rs
2024-03-06 16:18:52 +01:00

69 lines
2.5 KiB
Rust

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{timeout, MissedTickBehavior};
/// Spawns a background task that debounces unit notifications.
///
/// Returns the sender for incoming triggers and the receiver for debounced notifications.
/// After a trigger arrives, the task waits until no further trigger has been received for
/// `duration`; every trigger received in the meantime restarts that wait. Once the quiet
/// period elapses, a single `()` is pushed to the output channel.
///
/// The output channel has a capacity of one and is written with `try_send`, so a notification
/// is skipped when the previous one has not been consumed yet.
///
/// The task ends when the input channel is closed or when the output channel is found closed
/// while trying to send.
pub fn spawn_debouncer(duration: Duration) -> (Sender<()>, Receiver<()>) {
    let (trigger_tx, mut trigger_rx) = mpsc::channel::<()>(1);
    let (settled_tx, settled_rx) = mpsc::channel::<()>(1);

    tokio::spawn(async move {
        // Every pass of this loop handles one burst of triggers, started by its first message.
        while trigger_rx.recv().await.is_some() {
            // Keep restarting the timeout until a full `duration` passes without a new trigger.
            loop {
                match timeout(duration, trigger_rx.recv()).await {
                    // Another trigger arrived in time: wait again from scratch.
                    Ok(Some(())) => {}
                    // All input senders are gone: nothing left to debounce.
                    Ok(None) => return,
                    // The quiet period elapsed.
                    Err(_) => break,
                }
            }

            // A full output channel is fine (a notification is already waiting to be read);
            // only a closed one means nobody is listening anymore.
            if matches!(settled_tx.try_send(()), Err(TrySendError::Closed(_))) {
                return;
            }
        }
    });

    (trigger_tx, settled_rx)
}
/// Spawns a background task that throttles messages to at most one per `duration`.
///
/// Returns the sender for incoming messages and the receiver for throttled messages.
/// The task remembers only the most recently received message. Each time its interval
/// (period `duration`) ticks, that message, if any, is forwarded to the output channel;
/// messages that were replaced by a newer one before the tick are dropped.
///
/// The task ends when the input channel is closed (a message still pending at that moment
/// is discarded) or when forwarding fails because the output receiver has been dropped.
///
/// NOTE: `tokio::time::interval` is documented to panic for a zero period, so `duration`
/// should be non-zero.
pub fn spawn_throttler<T: Send + 'static>(duration: Duration) -> (Sender<T>, Receiver<T>) {
    let (input_sender, mut input_receiver) = mpsc::channel::<T>(25);
    let (output_sender, output_receiver) = mpsc::channel::<T>(25);

    tokio::spawn(async move {
        // Latest message received since the last tick; overwritten by newer messages.
        let mut pending_value: Option<T> = None;
        let mut interval = tokio::time::interval(duration);
        // After a late tick, schedule the next one relative to it instead of bursting
        // to catch up on the missed ones.
        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        'outer: loop {
            tokio::select! {
                value = input_receiver.recv() => {
                    match value {
                        // All input senders are gone: stop the task.
                        None => break 'outer,
                        Some(value) => pending_value = Some(value),
                    }
                }
                _ = interval.tick() => {
                    if let Some(value) = pending_value.take() {
                        // A send error means the output receiver was dropped; stop quietly
                        // instead of panicking inside the spawned task.
                        if output_sender.send(value).await.is_err() {
                            break 'outer;
                        }
                    }
                }
            }
        }
    });

    (input_sender, output_receiver)
}