This commit is contained in:
Moritz Ruth 2024-03-06 16:18:52 +01:00
parent 3e129bd3d6
commit db0cc13267
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
7 changed files with 230 additions and 49 deletions

11
Cargo.lock generated
View file

@ -896,9 +896,11 @@ dependencies = [
"futures-util", "futures-util",
"log", "log",
"native-tls", "native-tls",
"parse-display 0.9.0",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"serde_with",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-tungstenite", "tokio-tungstenite",
@ -2101,9 +2103,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_with" name = "serde_with"
version = "3.4.0" version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64cd236ccc1b7a29e7e2739f27c0b2dd199804abc4290e32f59f3b68d6405c23" checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270"
dependencies = [ dependencies = [
"base64", "base64",
"chrono", "chrono",
@ -2111,6 +2113,7 @@ dependencies = [
"indexmap 1.9.3", "indexmap 1.9.3",
"indexmap 2.1.0", "indexmap 2.1.0",
"serde", "serde",
"serde_derive",
"serde_json", "serde_json",
"serde_with_macros", "serde_with_macros",
"time", "time",
@ -2118,9 +2121,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_with_macros" name = "serde_with_macros"
version = "3.4.0" version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" checksum = "865f9743393e638991566a8b7a479043c2c8da94a33e0a31f18214c9cae0a64d"
dependencies = [ dependencies = [
"darling", "darling",
"proc-macro2", "proc-macro2",

View file

@ -55,4 +55,16 @@ config.target.type = "application"
config.target.predicates = [{ property = "application-name", value = "spotify" }] config.target.predicates = [{ property = "application-name", value = "spotify" }]
config.style.muted.indicators.bar.color = "#fc464690" config.style.muted.indicators.bar.color = "#fc464690"
config.style.inactive.icon = "@apps/spotify[scale=1.2|grayscale|alpha=0.6]" config.style.inactive.icon = "@apps/spotify[scale=1.2|grayscale|alpha=0.6]"
[knobs.right-bottom]
icon = "@apps/spotify[scale=1.2]"
indicators.bar.color = "#ffffff50"
handler = "home_assistant"
config.mode = "brightness"
config.entity_id = "light.moritz_regal_lampe"
config.style.default.label = "{value}%"
config.style.100.icon = "@apps/discord[scale=1.2]"
config.style.100.label = "{value}%"
config.delta = 10

View file

@ -17,4 +17,6 @@ url = { version = "2.5.0", features = ["serde"] }
tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] } tokio-tungstenite = { version = "0.21.0", features = ["native-tls"] }
tokio-stream = "0.1.14" tokio-stream = "0.1.14"
futures-util = "0.3.30" futures-util = "0.3.30"
native-tls = "0.2.11" native-tls = "0.2.11"
parse-display = "0.9.0"
serde_with = "3.6.1"

View file

@ -1,4 +1,5 @@
use deckster_mode::shared::state::KeyStyleByStateMap; use crate::ha_client::EntityId;
use deckster_mode::shared::state::{KeyStyleByStateMap, KnobStyleByStateMap};
use serde::Deserialize; use serde::Deserialize;
use url::Url; use url::Url;
@ -15,18 +16,20 @@ pub struct KeyConfig {
pub disconnected_state: Option<Box<str>>, pub disconnected_state: Option<Box<str>>,
#[serde(flatten)] #[serde(flatten)]
pub mode: KeyMode, pub mode: KeyMode,
#[serde(default)]
pub style: KeyStyleByStateMap<Box<str>>, pub style: KeyStyleByStateMap<Box<str>>,
} }
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")] #[serde(tag = "mode", rename_all = "kebab-case")]
pub enum KeyMode { pub enum KeyMode {
Toggle { entity_id: Box<str> }, Toggle { entity_id: EntityId },
Button { state_entity_id: Box<str>, button_entity_id: Box<str> }, Button { state_entity_id: EntityId, button_entity_id: EntityId },
} }
impl KeyMode { impl KeyMode {
pub fn state_entity_id(&self) -> &Box<str> { #[inline]
pub fn state_entity_id(&self) -> &EntityId {
match &self { match &self {
KeyMode::Toggle { entity_id, .. } => entity_id, KeyMode::Toggle { entity_id, .. } => entity_id,
KeyMode::Button { state_entity_id, .. } => state_entity_id, KeyMode::Button { state_entity_id, .. } => state_entity_id,
@ -36,16 +39,16 @@ impl KeyMode {
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct KnobConfig { pub struct KnobConfig {
pub(crate) entity_id: Box<str>, pub(crate) entity_id: EntityId,
pub disconnected_state: Option<Box<str>>, pub disconnected_state: Option<Box<str>>,
#[serde(flatten)] #[serde(flatten)]
pub mode: KnobMode, pub mode: KnobMode,
pub style: KeyStyleByStateMap<Box<str>>, #[serde(default)]
pub style: KnobStyleByStateMap<Box<str>>,
} }
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")] #[serde(tag = "mode", rename_all = "kebab-case")]
pub enum KnobMode { pub enum KnobMode {
Select { states: Box<[Box<str>]>, wrap_around: bool }, Brightness { delta: Option<u8> },
Range,
} }

View file

@ -1,7 +1,10 @@
use futures_util::SinkExt; use futures_util::SinkExt;
use native_tls::TlsConnector; use native_tls::TlsConnector;
use parse_display::{Display, FromStr};
use reqwest::header::{HeaderMap, HeaderValue}; use reqwest::header::{HeaderMap, HeaderValue};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json;
use serde_with::{DeserializeFromStr, SerializeDisplay};
use std::cmp::min; use std::cmp::min;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -11,6 +14,13 @@ use tokio_stream::StreamExt;
use tokio_tungstenite::{tungstenite, Connector}; use tokio_tungstenite::{tungstenite, Connector};
use url::Url; use url::Url;
#[derive(Debug, Clone, FromStr, Display, SerializeDisplay, DeserializeFromStr, Eq, PartialEq, Hash)]
#[display("{domain}.{object_id}")]
pub struct EntityId {
pub domain: String,
pub object_id: String,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum StateUpdate { pub enum StateUpdate {
Disconnected, Disconnected,
@ -19,9 +29,10 @@ pub enum StateUpdate {
#[derive(Debug)] #[derive(Debug)]
pub struct ActualStateUpdate { pub struct ActualStateUpdate {
pub entity_id: Box<str>, pub entity_id: EntityId,
pub state: Box<str>, pub state: Box<str>,
pub timestamp: Box<str>, pub timestamp: Box<str>,
pub attributes: serde_json::Value,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -32,7 +43,7 @@ pub struct HaClient {
} }
impl HaClient { impl HaClient {
pub async fn new(base_url: Url, token: Box<str>, accept_invalid_certs: bool, subscribed_entity_ids: Vec<Box<str>>) -> Self { pub async fn new(base_url: Url, token: Box<str>, accept_invalid_certs: bool, subscribed_entity_ids: Vec<EntityId>) -> Self {
let http_client = reqwest::ClientBuilder::new() let http_client = reqwest::ClientBuilder::new()
.connect_timeout(Duration::from_secs(10)) .connect_timeout(Duration::from_secs(10))
.default_headers({ .default_headers({
@ -82,13 +93,11 @@ impl HaClient {
self.state_updates_sender.subscribe() self.state_updates_sender.subscribe()
} }
pub async fn toggle_entity(&self, entity_id: &str) { pub async fn call_service(&self, domain: &str, name: &str, data: impl Into<reqwest::Body>) {
let (domain, _) = entity_id.split_once('.').expect("entity IDs must contain exactly one dot");
let result = self let result = self
.http_client .http_client
.post(self.base_url.join(&format!("/api/services/{domain}/toggle")).unwrap()) .post(self.base_url.join(&format!("/api/services/{domain}/{name}")).unwrap())
.body(format!("{{\"entity_id\":\"{entity_id}\"}}")) .body(data)
.send() .send()
.await .await
.and_then(|a| a.error_for_status()); .and_then(|a| a.error_for_status());
@ -100,6 +109,16 @@ impl HaClient {
) )
} }
} }
pub async fn toggle_entity(&self, entity_id: &EntityId) {
self.call_service(&entity_id.domain, "toggle", json!({ "entity_id": entity_id.to_string() }).to_string())
.await
}
pub async fn press_button_entity(&self, entity_id: &EntityId) {
self.call_service(&entity_id.domain, "press", json!({ "entity_id": entity_id.to_string() }).to_string())
.await
}
} }
async fn do_work( async fn do_work(
@ -108,7 +127,7 @@ async fn do_work(
tls_connector: TlsConnector, tls_connector: TlsConnector,
state_updates_sender: broadcast::Sender<StateUpdate>, state_updates_sender: broadcast::Sender<StateUpdate>,
http_client: reqwest::Client, http_client: reqwest::Client,
state_timestamp_by_entity_id: HashMap<Box<str>, Box<str>>, state_timestamp_by_entity_id: HashMap<EntityId, Box<str>>,
) { ) {
let states_url = base_url.join("/api/states/").unwrap(); let states_url = base_url.join("/api/states/").unwrap();
let websocket_url = { let websocket_url = {
@ -166,8 +185,6 @@ async fn do_work(
id: 1, id: 1,
trigger: HaTrigger::State { trigger: HaTrigger::State {
entity_id: state_timestamp_by_entity_id.read().await.keys().cloned().collect(), entity_id: state_timestamp_by_entity_id.read().await.keys().cloned().collect(),
// Setting from to null prevents events being sent when only attributes have changed.
from: serde_json::Value::Null,
}, },
}) })
.unwrap(); .unwrap();
@ -181,7 +198,7 @@ async fn do_work(
for entity_id in state_timestamp_by_entity_id.read().await.keys() { for entity_id in state_timestamp_by_entity_id.read().await.keys() {
tokio::spawn(request_entity_state( tokio::spawn(request_entity_state(
states_url.join(entity_id).unwrap(), states_url.join(&entity_id.to_string()).unwrap(),
http_client.clone(), http_client.clone(),
Arc::clone(&state_timestamp_by_entity_id), Arc::clone(&state_timestamp_by_entity_id),
state_updates_sender.clone(), state_updates_sender.clone(),
@ -221,7 +238,7 @@ async fn do_work(
async fn request_entity_state( async fn request_entity_state(
url: Url, url: Url,
http_client: reqwest::Client, http_client: reqwest::Client,
state_timestamp_by_entity_id: Arc<RwLock<HashMap<Box<str>, Box<str>>>>, state_timestamp_by_entity_id: Arc<RwLock<HashMap<EntityId, Box<str>>>>,
state_updates_sender: broadcast::Sender<StateUpdate>, state_updates_sender: broadcast::Sender<StateUpdate>,
) { ) {
match http_client.get(url).send().await.and_then(|a| a.error_for_status()) { match http_client.get(url).send().await.and_then(|a| a.error_for_status()) {
@ -260,8 +277,9 @@ fn extract_state_update_from_event(object: &serde_json::Value) -> Option<ActualS
fn extract_state_update_from_state(object: &serde_json::Value) -> Option<ActualStateUpdate> { fn extract_state_update_from_state(object: &serde_json::Value) -> Option<ActualStateUpdate> {
Some(ActualStateUpdate { Some(ActualStateUpdate {
state: object.get("state")?.as_str()?.to_owned().into_boxed_str(), state: object.get("state")?.as_str()?.to_owned().into_boxed_str(),
entity_id: object.get("entity_id")?.as_str()?.to_owned().into_boxed_str(), entity_id: object.get("entity_id")?.as_str()?.parse().ok()?,
timestamp: object.get("last_changed")?.as_str()?.to_owned().into_boxed_str(), timestamp: object.get("last_updated")?.as_str()?.to_owned().into_boxed_str(),
attributes: object.get("attributes")?.to_owned(),
}) })
} }
@ -285,5 +303,5 @@ pub enum HaOutgoingWsMessage {
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
#[serde(tag = "platform", rename_all = "snake_case")] #[serde(tag = "platform", rename_all = "snake_case")]
pub enum HaTrigger { pub enum HaTrigger {
State { entity_id: Box<[Box<str>]>, from: serde_json::Value }, State { entity_id: Box<[EntityId]> },
} }

View file

@ -1,9 +1,15 @@
use crate::config::{GlobalConfig, KeyConfig, KeyMode, KnobConfig, KnobMode}; use crate::config::{GlobalConfig, KeyConfig, KeyMode, KnobConfig, KnobMode};
use crate::ha_client::{HaClient, StateUpdate}; use crate::ha_client::{HaClient, StateUpdate};
use deckster_mode::shared::handler_communication::{HandlerCommand, HandlerEvent, HandlerInitializationError, InitialHandlerMessage, KeyEvent}; use crate::util::{spawn_debouncer, spawn_throttler};
use deckster_mode::shared::path::KeyPath; use deckster_mode::shared::handler_communication::{
HandlerCommand, HandlerEvent, HandlerInitializationError, InitialHandlerMessage, KeyEvent, KnobEvent, RotationDirection,
};
use deckster_mode::shared::path::{KeyPath, KnobPath};
use deckster_mode::shared::style::KnobStyle;
use deckster_mode::{send_command, DecksterHandler}; use deckster_mode::{send_command, DecksterHandler};
use serde_json::json;
use std::thread; use std::thread;
use std::time::Duration;
use tokio::select; use tokio::select;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::task::LocalSet; use tokio::task::LocalSet;
@ -46,6 +52,10 @@ impl Handler {
task_set.spawn_local(manage_key(events_sender.subscribe(), ha_client.clone(), path, config)); task_set.spawn_local(manage_key(events_sender.subscribe(), ha_client.clone(), path, config));
} }
for (path, config) in data.knob_configs {
task_set.spawn_local(manage_knob(events_sender.subscribe(), ha_client.clone(), path, config));
}
runtime.block_on(task_set) runtime.block_on(task_set)
} }
}); });
@ -105,8 +115,8 @@ async fn manage_key(mut events: broadcast::Receiver<HandlerEvent>, ha_client: Ha
KeyMode::Toggle { entity_id } => { KeyMode::Toggle { entity_id } => {
ha_client.toggle_entity(entity_id).await; ha_client.toggle_entity(entity_id).await;
} }
KeyMode::Button { .. } => { KeyMode::Button { button_entity_id, .. } => {
todo!() ha_client.press_button_entity(button_entity_id).await;
} }
} }
} }
@ -114,3 +124,121 @@ async fn manage_key(mut events: broadcast::Receiver<HandlerEvent>, ha_client: Ha
} }
} }
} }
async fn manage_knob(mut events: broadcast::Receiver<HandlerEvent>, ha_client: HaClient, path: KnobPath, config: KnobConfig) {
if let Some(state) = &config.disconnected_state {
send_command(HandlerCommand::SetKnobStyle {
path: path.clone(),
value: config.style.get(state).cloned(),
});
}
let mut state_updates = ha_client.subscribe_to_state_updates();
let mut current_client_brightness = 0u8;
let mut current_server_brightness = 0u8;
let (new_brightness_requests_sender, mut new_brightness_requests_receiver) = spawn_throttler::<u8>(Duration::from_millis(150));
let (apply_server_debounce_sender, mut apply_server_debounce_receiver) = spawn_debouncer(Duration::from_secs(3));
loop {
select! {
Some(_) = apply_server_debounce_receiver.recv() => {
current_client_brightness = current_server_brightness;
let percentage = (((current_server_brightness as f32 / 255_f32) * 100.0).round() as u8).to_string().into_boxed_str();
send_command(HandlerCommand::SetKnobStyle {
path: path.clone(),
value: config.style.get(&percentage).or_else(|| config.style.get("default")).cloned().map(|s| KnobStyle {
// RustRover: false positive
label: s.label.map(|l| l.replace("{value}", &percentage)),
..s
})
});
}
Some(new_brightness) = new_brightness_requests_receiver.recv() => {
ha_client.call_service("light", "turn_on", json!({
"entity_id": config.entity_id.to_string(),
"brightness": new_brightness
}).to_string()).await
}
Ok(update) = state_updates.recv() => {
match update {
StateUpdate::Disconnected => {
if let Some(state) = &config.disconnected_state {
send_command(HandlerCommand::SetKnobStyle {
path: path.clone(),
value: config.style.get(state).cloned(),
});
send_command(HandlerCommand::SetKnobValue {
path: path.clone(),
value: None
});
}
}
StateUpdate::Actual(update) => {
if update.entity_id == config.entity_id {
current_server_brightness = update.attributes
.get("brightness")
.and_then(|b| b.as_number())
.and_then(|b| b.as_u64()).unwrap_or(0)
.clamp(0, 255) as u8;
if current_client_brightness == 0 {
current_client_brightness = current_server_brightness;
}
let percentage = (((current_server_brightness as f32 / 255_f32) * 100.0).round() as u8).to_string().into_boxed_str();
send_command(HandlerCommand::SetKnobStyle {
path: path.clone(),
value: config.style.get(&percentage).or_else(|| config.style.get("default")).cloned().map(|s| KnobStyle {
// RustRover: false positive
label: s.label.map(|l| l.replace("{value}", &percentage)),
..s
})
});
}
}
}
}
Ok(HandlerEvent::Knob { path: p, event }) = events.recv() => {
if p != path {
continue
}
match event {
KnobEvent::Press => {
match &config.mode {
KnobMode::Brightness { .. } => {
current_client_brightness = if current_client_brightness == 0 {
255
} else {
0
};
apply_server_debounce_sender.send(()).await.unwrap();
new_brightness_requests_sender.send(current_client_brightness).await.unwrap();
}
}
}
KnobEvent::Rotate { direction } => {
match &config.mode {
KnobMode::Brightness { delta } => {
let factor = match direction {
RotationDirection::Counterclockwise => -1_f32,
RotationDirection::Clockwise => 1_f32,
};
let delta = (delta.unwrap_or(1) as f32 / 100_f32) * 255_f32;
current_client_brightness = (current_client_brightness as isize + (factor * delta).round() as isize).clamp(0, 255) as u8;
apply_server_debounce_sender.send(()).await.unwrap();
new_brightness_requests_sender.send(current_client_brightness).await.unwrap();
}
}
}
_ => {}
}
}
}
}
}

View file

@ -1,8 +1,8 @@
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::timeout; use tokio::time::{timeout, MissedTickBehavior};
/// Sends a message into the output channel after a message in the input channel was received, with a delay of `duration`. /// Sends a message into the output channel after a message in the input channel was received, with a delay of `duration`.
/// The delay is reset when a new message is reset. /// The delay is reset when a new message is reset.
@ -35,20 +35,35 @@ pub fn spawn_debouncer(duration: Duration) -> (Sender<()>, Receiver<()>) {
(input_sender, output_receiver) (input_sender, output_receiver)
} }
pub fn format_duration(duration: Duration) -> String { /// Sends messages from the input channel into the output channel, but only if the time since the last message is greater than duration.
let full_seconds = duration.as_secs(); /// The last message that was not sent yet will be sent after duration.
let full_minutes = full_seconds / 60; pub fn spawn_throttler<T: Send + 'static>(duration: Duration) -> (Sender<T>, Receiver<T>) {
let hours = full_minutes / 60; let (input_sender, mut input_receiver) = mpsc::channel::<T>(25);
let minutes = full_minutes % 60; let (output_sender, output_receiver) = mpsc::channel::<T>(25);
let seconds = full_seconds % 60;
if hours == 0 { tokio::spawn(async move {
format!("{:0>2}:{:0>2}", minutes, seconds) let mut pending_value: Option<T> = None;
} else { let mut interval = tokio::time::interval(duration);
format!("{:0>2}:{:0>2}:{:0>2}", hours, minutes, seconds) interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
}
}
pub fn get_far_future() -> Instant { 'outer: loop {
Instant::now() + Duration::from_secs(60 * 60 * 24 * 365 * 30) // 30 years tokio::select! {
value = input_receiver.recv() => {
match value {
None => break 'outer,
Some(value) => {
pending_value = Some(value);
}
};
}
_ = interval.tick() => {
if let Some(value) = pending_value.take() {
output_sender.send(value).await.unwrap();
}
}
}
}
});
(input_sender, output_receiver)
} }