From db0cc13267674c7afcb02c1a77e03631eb05ad98 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Wed, 6 Mar 2024 16:18:52 +0100 Subject: [PATCH] commit --- Cargo.lock | 11 +- examples/full/knob-pages/default.toml | 14 ++- handlers/home_assistant/Cargo.toml | 4 +- handlers/home_assistant/src/config.rs | 19 ++-- handlers/home_assistant/src/ha_client.rs | 48 +++++--- handlers/home_assistant/src/handler.rs | 136 ++++++++++++++++++++++- handlers/home_assistant/src/util.rs | 47 +++++--- 7 files changed, 230 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a5e5b4..471b542 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -896,9 +896,11 @@ dependencies = [ "futures-util", "log", "native-tls", + "parse-display 0.9.0", "reqwest", "serde", "serde_json", + "serde_with", "tokio", "tokio-stream", "tokio-tungstenite", @@ -2101,9 +2103,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.4.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" +checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270" dependencies = [ "base64", "chrono", @@ -2111,6 +2113,7 @@ dependencies = [ "indexmap 1.9.3", "indexmap 2.1.0", "serde", + "serde_derive", "serde_json", "serde_with_macros", "time", @@ -2118,9 +2121,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.4.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" +checksum = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d" dependencies = [ "darling", "proc-macro2", diff --git a/examples/full/knob-pages/default.toml b/examples/full/knob-pages/default.toml index 5db7e0c..6bf7a89 100644 --- a/examples/full/knob-pages/default.toml +++ b/examples/full/knob-pages/default.toml @@ -55,4 +55,16 @@ config.target.type = "application" config.target.predicates = [{ property = "application-name", value = "spotify" }] config.style.muted.indicators.bar.color = "#fc464690" -config.style.inactive.icon = "@apps/spotify[scale=1.2|grayscale|alpha=0.6]" \ No newline at end of file +config.style.inactive.icon = "@apps/spotify[scale=1.2|grayscale|alpha=0.6]" + +[knobs.right-bottom] +icon = "@apps/spotify[scale=1.2]" +indicators.bar.color = "#ffffff50" + +handler = "home_assistant" +config.mode = "brightness" +config.entity_id = "light.moritz_regal_lampe" +config.style.default.label = "{value}%" +config.style.100.icon = "@apps/discord[scale=1.2]" +config.style.100.label = "{value}%" +config.delta = 10 \ No newline at end of file diff --git a/handlers/home_assistant/Cargo.toml b/handlers/home_assistant/Cargo.toml index 2a1693b..dfc4224 100644 --- a/handlers/home_assistant/Cargo.toml +++ b/handlers/home_assistant/Cargo.toml @@ -17,4 +17,6 @@ url = { version = "2.5.0", features = ["serde"] } tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } tokio-stream = "0.1.14" futures-util = "0.3.30" -native-tls = "0.2.11" \ No newline at end of file +native-tls = "0.2.11" +parse-display = "0.9.0" +serde_with = "3.6.1" \ No newline at end of file diff --git a/handlers/home_assistant/src/config.rs b/handlers/home_assistant/src/config.rs index 865fdba..bdc0060 100644 --- a/handlers/home_assistant/src/config.rs +++ b/handlers/home_assistant/src/config.rs @@ -1,4 +1,5 @@ -use deckster_mode::shared::state::KeyStyleByStateMap; +use crate::ha_client::EntityId; +use deckster_mode::shared::state::{KeyStyleByStateMap, KnobStyleByStateMap}; use serde::Deserialize; use url::Url; @@ -15,18 +16,20 @@ pub struct KeyConfig { pub disconnected_state: Option>, #[serde(flatten)] pub mode: KeyMode, + #[serde(default)] pub style: KeyStyleByStateMap>, } #[derive(Debug, Clone, Deserialize)] #[serde(tag = "mode", rename_all = "kebab-case")] pub enum KeyMode { - Toggle { entity_id: Box }, - Button { state_entity_id: Box, button_entity_id: Box }, + Toggle { entity_id: EntityId }, + Button { state_entity_id: EntityId, button_entity_id: EntityId }, } impl KeyMode { - pub fn state_entity_id(&self) -> &Box { + #[inline] + pub fn state_entity_id(&self) -> &EntityId { match &self { KeyMode::Toggle { entity_id, .. } => entity_id, KeyMode::Button { state_entity_id, .. } => state_entity_id, @@ -36,16 +39,16 @@ impl KeyMode { #[derive(Debug, Clone, Deserialize)] pub struct KnobConfig { - pub(crate) entity_id: Box, + pub(crate) entity_id: EntityId, pub disconnected_state: Option>, #[serde(flatten)] pub mode: KnobMode, - pub style: KeyStyleByStateMap>, + #[serde(default)] + pub style: KnobStyleByStateMap>, } #[derive(Debug, Clone, Deserialize)] #[serde(tag = "mode", rename_all = "kebab-case")] pub enum KnobMode { - Select { states: Box<[Box]>, wrap_around: bool }, - Range, + Brightness { delta: Option }, } diff --git a/handlers/home_assistant/src/ha_client.rs b/handlers/home_assistant/src/ha_client.rs index 1dab348..b940fd7 100644 --- a/handlers/home_assistant/src/ha_client.rs +++ b/handlers/home_assistant/src/ha_client.rs @@ -1,7 +1,10 @@ use futures_util::SinkExt; use native_tls::TlsConnector; +use parse_display::{Display, FromStr}; use reqwest::header::{HeaderMap, HeaderValue}; use serde::{Deserialize, Serialize}; +use serde_json::json; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::cmp::min; use std::collections::HashMap; use std::sync::Arc; @@ -11,6 +14,13 @@ use tokio_stream::StreamExt; use tokio_tungstenite::{tungstenite, Connector}; use url::Url; +#[derive(Debug, Clone, FromStr, Display, SerializeDisplay, DeserializeFromStr, Eq, PartialEq, Hash)] +#[display("{domain}.{object_id}")] +pub struct EntityId { + pub domain: String, + pub object_id: String, +} + #[derive(Debug, Clone)] pub enum StateUpdate { Disconnected, @@ -19,9 +29,10 @@ pub enum StateUpdate { #[derive(Debug)] pub struct ActualStateUpdate { - pub entity_id: Box, + pub entity_id: EntityId, pub state: Box, pub timestamp: Box, + pub attributes: serde_json::Value, } #[derive(Debug, Clone)] @@ -32,7 +43,7 @@ pub struct HaClient { } impl HaClient { - pub async fn new(base_url: Url, token: Box, accept_invalid_certs: bool, subscribed_entity_ids: Vec>) -> Self { + pub async fn new(base_url: Url, token: Box, accept_invalid_certs: bool, subscribed_entity_ids: Vec) -> Self { let http_client = reqwest::ClientBuilder::new() .connect_timeout(Duration::from_secs(10)) .default_headers({ @@ -82,13 +93,11 @@ impl HaClient { self.state_updates_sender.subscribe() } - pub async fn toggle_entity(&self, entity_id: &str) { - let (domain, _) = entity_id.split_once('.').expect("entity IDs must contain exactly one dot"); - + pub async fn call_service(&self, domain: &str, name: &str, data: impl Into) { let result = self .http_client - .post(self.base_url.join(&format!("/api/services/{domain}/toggle")).unwrap()) - .body(format!("{{\"entity_id\":\"{entity_id}\"}}")) + .post(self.base_url.join(&format!("/api/services/{domain}/{name}")).unwrap()) + .body(data) .send() .await .and_then(|a| a.error_for_status()); @@ -100,6 +109,16 @@ impl HaClient { ) } } + + pub async fn toggle_entity(&self, entity_id: &EntityId) { + self.call_service(&entity_id.domain, "toggle", json!({ "entity_id": entity_id.to_string() }).to_string()) + .await + } + + pub async fn press_button_entity(&self, entity_id: &EntityId) { + self.call_service(&entity_id.domain, "press", json!({ "entity_id": entity_id.to_string() }).to_string()) + .await + } } async fn do_work( @@ -108,7 +127,7 @@ async fn do_work( tls_connector: TlsConnector, state_updates_sender: broadcast::Sender, http_client: reqwest::Client, - state_timestamp_by_entity_id: HashMap, Box>, + state_timestamp_by_entity_id: HashMap>, ) { let states_url = base_url.join("/api/states/").unwrap(); let websocket_url = { @@ -166,8 +185,6 @@ async fn do_work( id: 1, trigger: HaTrigger::State { entity_id: state_timestamp_by_entity_id.read().await.keys().cloned().collect(), - // Setting from to null prevents events being sent when only attributes have changed. - from: serde_json::Value::Null, }, }) .unwrap(); @@ -181,7 +198,7 @@ async fn do_work( for entity_id in state_timestamp_by_entity_id.read().await.keys() { tokio::spawn(request_entity_state( - states_url.join(entity_id).unwrap(), + states_url.join(&entity_id.to_string()).unwrap(), http_client.clone(), Arc::clone(&state_timestamp_by_entity_id), state_updates_sender.clone(), @@ -221,7 +238,7 @@ async fn do_work( async fn request_entity_state( url: Url, http_client: reqwest::Client, - state_timestamp_by_entity_id: Arc, Box>>>, + state_timestamp_by_entity_id: Arc>>>, state_updates_sender: broadcast::Sender, ) { match http_client.get(url).send().await.and_then(|a| a.error_for_status()) { @@ -260,8 +277,9 @@ fn extract_state_update_from_event(object: &serde_json::Value) -> Option Option { Some(ActualStateUpdate { state: object.get("state")?.as_str()?.to_owned().into_boxed_str(), - entity_id: object.get("entity_id")?.as_str()?.to_owned().into_boxed_str(), - timestamp: object.get("last_changed")?.as_str()?.to_owned().into_boxed_str(), + entity_id: object.get("entity_id")?.as_str()?.parse().ok()?, + timestamp: object.get("last_updated")?.as_str()?.to_owned().into_boxed_str(), + attributes: object.get("attributes")?.to_owned(), }) } @@ -285,5 +303,5 @@ pub enum HaOutgoingWsMessage { #[derive(Debug, Serialize)] #[serde(tag = "platform", rename_all = "snake_case")] pub enum HaTrigger { - State { entity_id: Box<[Box]>, from: serde_json::Value }, + State { entity_id: Box<[EntityId]> }, } diff --git a/handlers/home_assistant/src/handler.rs b/handlers/home_assistant/src/handler.rs index c018773..ccc65fe 100644 --- a/handlers/home_assistant/src/handler.rs +++ b/handlers/home_assistant/src/handler.rs @@ -1,9 +1,15 @@ use crate::config::{GlobalConfig, KeyConfig, KeyMode, KnobConfig, KnobMode}; use crate::ha_client::{HaClient, StateUpdate}; -use deckster_mode::shared::handler_communication::{HandlerCommand, HandlerEvent, HandlerInitializationError, InitialHandlerMessage, KeyEvent}; -use deckster_mode::shared::path::KeyPath; +use crate::util::{spawn_debouncer, spawn_throttler}; +use deckster_mode::shared::handler_communication::{ + HandlerCommand, HandlerEvent, HandlerInitializationError, InitialHandlerMessage, KeyEvent, KnobEvent, RotationDirection, +}; +use deckster_mode::shared::path::{KeyPath, KnobPath}; +use deckster_mode::shared::style::KnobStyle; use deckster_mode::{send_command, DecksterHandler}; +use serde_json::json; use std::thread; +use std::time::Duration; use tokio::select; use tokio::sync::broadcast; use tokio::task::LocalSet; @@ -46,6 +52,10 @@ impl Handler { task_set.spawn_local(manage_key(events_sender.subscribe(), ha_client.clone(), path, config)); } + for (path, config) in data.knob_configs { + task_set.spawn_local(manage_knob(events_sender.subscribe(), ha_client.clone(), path, config)); + } + runtime.block_on(task_set) } }); @@ -105,8 +115,8 @@ async fn manage_key(mut events: broadcast::Receiver, ha_client: Ha KeyMode::Toggle { entity_id } => { ha_client.toggle_entity(entity_id).await; } - KeyMode::Button { .. } => { - todo!() + KeyMode::Button { button_entity_id, .. } => { + ha_client.press_button_entity(button_entity_id).await; } } } @@ -114,3 +124,121 @@ async fn manage_key(mut events: broadcast::Receiver, ha_client: Ha } } } + +async fn manage_knob(mut events: broadcast::Receiver, ha_client: HaClient, path: KnobPath, config: KnobConfig) { + if let Some(state) = &config.disconnected_state { + send_command(HandlerCommand::SetKnobStyle { + path: path.clone(), + value: config.style.get(state).cloned(), + }); + } + + let mut state_updates = ha_client.subscribe_to_state_updates(); + + let mut current_client_brightness = 0u8; + let mut current_server_brightness = 0u8; + let (new_brightness_requests_sender, mut new_brightness_requests_receiver) = spawn_throttler::(Duration::from_millis(150)); + let (apply_server_debounce_sender, mut apply_server_debounce_receiver) = spawn_debouncer(Duration::from_secs(3)); + + loop { + select! { + Some(_) = apply_server_debounce_receiver.recv() => { + current_client_brightness = current_server_brightness; + + let percentage = (((current_server_brightness as f32 / 255_f32) * 100.0).round() as u8).to_string().into_boxed_str(); + send_command(HandlerCommand::SetKnobStyle { + path: path.clone(), + value: config.style.get(&percentage).or_else(|| config.style.get("default")).cloned().map(|s| KnobStyle { + // RustRover: false positive + label: s.label.map(|l| l.replace("{value}", &percentage)), + ..s + }) + }); + } + Some(new_brightness) = new_brightness_requests_receiver.recv() => { + ha_client.call_service("light", "turn_on", json!({ + "entity_id": config.entity_id.to_string(), + "brightness": new_brightness + }).to_string()).await + } + Ok(update) = state_updates.recv() => { + match update { + StateUpdate::Disconnected => { + if let Some(state) = &config.disconnected_state { + send_command(HandlerCommand::SetKnobStyle { + path: path.clone(), + value: config.style.get(state).cloned(), + }); + send_command(HandlerCommand::SetKnobValue { + path: path.clone(), + value: None + }); + } + } + StateUpdate::Actual(update) => { + if update.entity_id == config.entity_id { + current_server_brightness = update.attributes + .get("brightness") + .and_then(|b| b.as_number()) + .and_then(|b| b.as_u64()).unwrap_or(0) + .clamp(0, 255) as u8; + + if current_client_brightness == 0 { + current_client_brightness = current_server_brightness; + } + + let percentage = (((current_server_brightness as f32 / 255_f32) * 100.0).round() as u8).to_string().into_boxed_str(); + send_command(HandlerCommand::SetKnobStyle { + path: path.clone(), + value: config.style.get(&percentage).or_else(|| config.style.get("default")).cloned().map(|s| KnobStyle { + // RustRover: false positive + label: s.label.map(|l| l.replace("{value}", &percentage)), + ..s + }) + }); + } + } + } + } + Ok(HandlerEvent::Knob { path: p, event }) = events.recv() => { + if p != path { + continue + } + + match event { + KnobEvent::Press => { + match &config.mode { + KnobMode::Brightness { .. } => { + current_client_brightness = if current_client_brightness == 0 { + 255 + } else { + 0 + }; + + apply_server_debounce_sender.send(()).await.unwrap(); + new_brightness_requests_sender.send(current_client_brightness).await.unwrap(); + } + } + } + KnobEvent::Rotate { direction } => { + match &config.mode { + KnobMode::Brightness { delta } => { + let factor = match direction { + RotationDirection::Counterclockwise => -1_f32, + RotationDirection::Clockwise => 1_f32, + }; + + let delta = (delta.unwrap_or(1) as f32 / 100_f32) * 255_f32; + + current_client_brightness = (current_client_brightness as isize + (factor * delta).round() as isize).clamp(0, 255) as u8; + apply_server_debounce_sender.send(()).await.unwrap(); + new_brightness_requests_sender.send(current_client_brightness).await.unwrap(); + } + } + } + _ => {} + } + } + } + } +} diff --git a/handlers/home_assistant/src/util.rs b/handlers/home_assistant/src/util.rs index a94361d..c99fa64 100644 --- a/handlers/home_assistant/src/util.rs +++ b/handlers/home_assistant/src/util.rs @@ -1,8 +1,8 @@ -use std::time::{Duration, Instant}; +use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::time::timeout; +use tokio::time::{timeout, MissedTickBehavior}; /// Sends a message into the output channel after a message in the input channel was received, with a delay of `duration`. /// The delay is reset when a new message is reset. @@ -35,20 +35,35 @@ pub fn spawn_debouncer(duration: Duration) -> (Sender<()>, Receiver<()>) { (input_sender, output_receiver) } -pub fn format_duration(duration: Duration) -> String { - let full_seconds = duration.as_secs(); - let full_minutes = full_seconds / 60; - let hours = full_minutes / 60; - let minutes = full_minutes % 60; - let seconds = full_seconds % 60; +/// Sends messages from the input channel into the output channel, but only if the time since the last message is greater than duration. +/// The last message that was not sent yet will be sent after duration. +pub fn spawn_throttler(duration: Duration) -> (Sender, Receiver) { + let (input_sender, mut input_receiver) = mpsc::channel::(25); + let (output_sender, output_receiver) = mpsc::channel::(25); - if hours == 0 { - format!("{:0>2}:{:0>2}", minutes, seconds) - } else { - format!("{:0>2}:{:0>2}:{:0>2}", hours, minutes, seconds) - } -} + tokio::spawn(async move { + let mut pending_value: Option = None; + let mut interval = tokio::time::interval(duration); + interval.set_missed_tick_behavior(MissedTickBehavior::Delay); -pub fn get_far_future() -> Instant { - Instant::now() + Duration::from_secs(60 * 60 * 24 * 365 * 30) // 30 years + 'outer: loop { + tokio::select! { + value = input_receiver.recv() => { + match value { + None => break 'outer, + Some(value) => { + pending_value = Some(value); + } + }; + } + _ = interval.tick() => { + if let Some(value) = pending_value.take() { + output_sender.send(value).await.unwrap(); + } + } + } + } + }); + + (input_sender, output_receiver) }