diff --git a/Cargo.lock b/Cargo.lock index 489aa1f..382eabd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -89,6 +89,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "arrayref" version = "0.3.7" @@ -879,8 +885,6 @@ dependencies = [ "log", "parse-display 0.9.0", "reqwest", - "rustls 0.22.2", - "rustls-native-certs 0.7.0", "serde", "serde_json", "serde_with", @@ -1216,9 +1220,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "loupedeck_serial" @@ -1401,9 +1405,11 @@ dependencies = [ name = "pa_volume_interface" version = "0.1.0" dependencies = [ + "arc-swap", "flume", "im", "libpulse-binding", + "log", "tokio", ] diff --git a/README.md b/README.md index 9d9bff9..7d4df8f 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ - Move handlers to their own repositories - Update dependencies - Make the CLI of handlers more useful +- Make the `playerctl` handler independent of… playerctl. Use the [`mpris` crate](https://lib.rs/crates/mpris) directly instead. ## Contributing ### Terminology @@ -14,7 +15,7 @@ ### The different types of `unwrap` - `expect("")`: The author thinks that unwrapping will never fail because of ``. -- `unwrap()`: The author assumes that unwrapping will never fail but explaining why is either obvious or too complicated. +- `unwrap()`: The author assumes that unwrapping will never fail ~~but explaining why is either obvious or too complicated~~ and it’s obvious why. - `unwrap_todo()`: The author has not yet thought about how to handle this value being `None` or `Err`. They will replace this unwrapping with `expect("")`, `unwrap()`, or proper error handling later. diff --git a/crates/pa_volume_interface/Cargo.toml b/crates/pa_volume_interface/Cargo.toml index f90db8c..6d01f2d 100644 --- a/crates/pa_volume_interface/Cargo.toml +++ b/crates/pa_volume_interface/Cargo.toml @@ -7,4 +7,6 @@ edition = "2021" flume = "0.11.0" im = "15.1.0" tokio = { version = "1.35.1", default-features = false, features = ["sync"] } -libpulse-binding = "2.28.1" \ No newline at end of file +libpulse-binding = "2.28.1" +log = "0.4.21" +arc-swap = "1.7.1" \ No newline at end of file diff --git a/crates/pa_volume_interface/src/lib.rs b/crates/pa_volume_interface/src/lib.rs index 24e27e8..2cf95c7 100644 --- a/crates/pa_volume_interface/src/lib.rs +++ b/crates/pa_volume_interface/src/lib.rs @@ -1,5 +1,4 @@ -use std::cell::RefCell; -use std::rc::Rc; +use arc_swap::ArcSwap; use std::sync::{Arc, RwLock}; use std::thread; use std::time::Instant; @@ -8,11 +7,12 @@ use im::HashMap; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::introspect::{Introspector, SinkInfo, SinkInputInfo, SourceInfo}; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; -use libpulse_binding::context::{subscribe, Context, FlagSet}; +use libpulse_binding::context::{subscribe, Context, FlagSet, State}; use libpulse_binding::def::Retval; -use libpulse_binding::mainloop::standard::{IterateResult, Mainloop}; -use libpulse_binding::operation::Operation; +use libpulse_binding::mainloop::api::Mainloop as _; +use libpulse_binding::mainloop::threaded::Mainloop; use libpulse_binding::volume::{ChannelVolumes, Volume}; +use log::debug; use tokio::sync::broadcast; use tokio::sync::broadcast::Receiver; @@ -129,8 +129,6 @@ impl From<&SinkInputInfo<'_>> for PaEntityState { pub struct PaVolumeState { timestamp: Instant, entities_by_id: HashMap>, - default_sink_id: PaEntityId, - default_source_id: PaEntityId, } impl PaVolumeState { @@ -141,14 +139,6 @@ impl PaVolumeState { pub fn entities_by_id(&self) -> &HashMap> { &self.entities_by_id } - - pub fn default_sink_id(&self) -> &PaEntityId { - &self.default_sink_id - } - - pub fn default_source_id(&self) -> &PaEntityId { - &self.default_source_id - } } impl Default for PaVolumeState { @@ -156,44 +146,56 @@ impl Default for PaVolumeState { PaVolumeState { timestamp: Instant::now(), entities_by_id: HashMap::new(), - default_sink_id: 0, - default_source_id: 0, } } } #[derive(Debug)] -enum PaCommand { +enum PaThreadMessage { + // by the user SetIsMuted { id: PaEntityId, value: bool }, SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> }, Terminate, + + // internal + LoadSinkInput { entity_id: PaEntityId }, + LoadSink { entity_id: PaEntityId }, + LoadSource { entity_id: PaEntityId }, + UpsertEntity { entity_state: Arc }, + RemoveEntity { entity_id: PaEntityId }, } struct PaThread { - mainloop: Rc>, - context: Rc>, - introspector: Rc>, - commands_rx: flume::Receiver, + mainloop: Mainloop, + context: Context, + introspector: Introspector, + commands_tx: flume::Sender, + commands_rx: flume::Receiver, state_tx: broadcast::Sender>, - current_state: Arc>>>, + current_state: Arc>, } impl PaThread { fn spawn( client_name: String, - commands_rx: flume::Receiver, + commands_tx: flume::Sender, + commands_rx: flume::Receiver, state_tx: broadcast::Sender>, - current_state: Arc>>>, + current_state: Arc>, ) { thread::spawn(move || { - let mainloop = Mainloop::new().unwrap(); + let mut mainloop = Mainloop::new().unwrap(); let context = Context::new(&mainloop, &client_name).unwrap(); let introspector = context.introspect(); + debug!("Starting the mainloop thread…"); + mainloop.start().expect("starting the mainloop never fails"); + let mut t = PaThread { - mainloop: Rc::new(RefCell::new(mainloop)), - context: Rc::new(RefCell::new(context)), - introspector: Rc::new(RefCell::new(introspector)), + mainloop, + context, + introspector, + commands_tx, commands_rx, state_tx, current_state, @@ -205,273 +207,208 @@ impl PaThread { } fn init(&mut self) { - self.context.borrow_mut().connect(None, FlagSet::NOFLAGS, None).unwrap(); - self.wait_for(|s| s.context.borrow().get_state() == libpulse_binding::context::State::Ready); - - let introspector = Rc::clone(&self.introspector); + debug!("Initializing…"); + self.mainloop.lock(); + self.context.connect(None, FlagSet::NOFLAGS, None).unwrap(); { - let entities_by_id = Rc::new(RefCell::new(HashMap::>::new())); - let entities_by_id_c = Rc::clone(&entities_by_id); + let (context_state_change_tx, context_state_change_rx) = flume::bounded(1); + self.context.set_state_callback(Some(Box::new(move || { + context_state_change_tx.send(()).unwrap(); + }))); - self.wait_for_operation(introspector.borrow().get_sink_info_list(move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_info_list failed"), - ListResult::End => {} - ListResult::Item(sink) => { - entities_by_id_c.borrow_mut().insert(sink.index as PaEntityId, Arc::new(sink.into())); + self.mainloop.unlock(); + loop { + context_state_change_rx.recv().unwrap(); + + self.mainloop.lock(); + if self.context.get_state() == State::Ready { + break; } - })) - .unwrap(); + self.mainloop.unlock(); + } + } - let entities_by_id_c = Rc::clone(&entities_by_id); - self.wait_for_operation(introspector.borrow().get_source_info_list(move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_source_info_list failed"), - ListResult::End => {} - ListResult::Item(source) => { - entities_by_id_c.borrow_mut().insert(source.index as PaEntityId, Arc::new(source.into())); - } - })) - .unwrap(); + // Mainloop is still locked - let entities_by_id_c = Rc::clone(&entities_by_id); - self.wait_for_operation(introspector.borrow().get_sink_input_info_list(move |list_result| match list_result { + { + let commands_tx = self.commands_tx.clone(); + self.introspector.get_sink_input_info_list(move |list_result| match list_result { ListResult::Error => panic!("Introspector.get_sink_input_info_list failed"), ListResult::End => {} - ListResult::Item(sink_input) => { - let id = sink_input.index as PaEntityId; - entities_by_id_c.borrow_mut().insert(id, Arc::new(sink_input.into())); - } - })) - .unwrap(); - - PaThread::set_state( - &self.current_state, - &self.state_tx, - Arc::new(PaVolumeState { - timestamp: Instant::now(), - entities_by_id: Rc::into_inner(entities_by_id).unwrap().into_inner(), - default_sink_id: 0, - default_source_id: 0, - }), - ); - }; - - self.context - .borrow_mut() - .subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| { - if !success { - panic!("Context.subscribe failed") - } + ListResult::Item(sink_input) => commands_tx + .send(PaThreadMessage::UpsertEntity { + entity_state: Arc::new(sink_input.into()), + }) + .unwrap(), }); - } + } - fn run(mut self) { - let introspector = Rc::clone(&self.introspector); - let current_state = Arc::clone(&self.current_state); - let state_tx = self.state_tx.clone(); + { + let commands_tx = self.commands_tx.clone(); + self.introspector.get_sink_info_list(move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_info_list failed"), + ListResult::End => {} + ListResult::Item(sink) => commands_tx + .send(PaThreadMessage::UpsertEntity { + entity_state: Arc::new(sink.into()), + }) + .unwrap(), + }); + } - self.context - .borrow_mut() - .set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| { + { + let commands_tx = self.commands_tx.clone(); + self.introspector.get_source_info_list(move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_source_info_list failed"), + ListResult::End => {} + ListResult::Item(source) => commands_tx + .send(PaThreadMessage::UpsertEntity { + entity_state: Arc::new(source.into()), + }) + .unwrap(), + }); + } + + { + let commands_tx = self.commands_tx.clone(); + self.context.set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| { let entity_id = entity_id as PaEntityId; let facility = facility.unwrap(); - let timestamp = Instant::now(); - let current_state = Arc::clone(¤t_state); - match operation.unwrap() { subscribe::Operation::Removed => { - let state = PaThread::unwrap_state(¤t_state); - - let entities_by_id = state.entities_by_id.without(&entity_id); - let default_source_id = if entity_id == state.default_source_id { 0 } else { state.default_source_id }; - let default_sink_id = if entity_id == state.default_sink_id { 0 } else { state.default_sink_id }; - - PaThread::set_state( - ¤t_state, - &state_tx, - Arc::new(PaVolumeState { - timestamp, - entities_by_id, - default_source_id, - default_sink_id, - }), - ); + commands_tx.send(PaThreadMessage::RemoveEntity { entity_id }).unwrap(); } subscribe::Operation::New | subscribe::Operation::Changed => { - let state_tx = state_tx.clone(); - match facility { - Facility::SinkInput => { - introspector.borrow().get_sink_input_info(entity_id, move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_input_info failed"), - ListResult::End => {} - ListResult::Item(sink_input) => { - let state = PaThread::unwrap_state(¤t_state); - let id = sink_input.index as PaEntityId; - - let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into())); - PaThread::set_state( - ¤t_state, - &state_tx, - Arc::new(PaVolumeState { - timestamp, - entities_by_id, - ..*state - }), - ); - } - }); - } - Facility::Sink => { - introspector.borrow().get_sink_info_by_index(entity_id, move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"), - ListResult::End => {} - ListResult::Item(sink_input) => { - let state = PaThread::unwrap_state(¤t_state); - let id = sink_input.index as PaEntityId; - - let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into())); - PaThread::set_state( - ¤t_state, - &state_tx, - Arc::new(PaVolumeState { - timestamp, - entities_by_id, - ..*state - }), - ); - } - }); - } - Facility::Source => { - introspector.borrow().get_source_info_by_index(entity_id, move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_source_info_by_index failed"), - ListResult::End => {} - ListResult::Item(sink_input) => { - let state = PaThread::unwrap_state(¤t_state); - let id = sink_input.index as PaEntityId; - - let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into())); - PaThread::set_state( - ¤t_state, - &state_tx, - Arc::new(PaVolumeState { - timestamp, - entities_by_id, - ..*state - }), - ); - } - }); - } + Facility::SinkInput => commands_tx.send(PaThreadMessage::LoadSinkInput { entity_id }).unwrap(), + Facility::Sink => commands_tx.send(PaThreadMessage::LoadSink { entity_id }).unwrap(), + Facility::Source => commands_tx.send(PaThreadMessage::LoadSource { entity_id }).unwrap(), _ => {} }; } }; }))); + } - let current_state = Arc::clone(&self.current_state); - let mainloop = Rc::clone(&self.mainloop); + self.context + .subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| { + if !success { + panic!("Context.subscribe failed") + } + }); + + self.mainloop.unlock(); + } + + fn run(mut self) { + debug!("Waiting for commands…"); 'outer: loop { - self.run_single_mainloop_iteration(false); + while let Ok(command) = self.commands_rx.recv() { + self.mainloop.lock(); + let commands_tx = self.commands_tx.clone(); - while let Ok(command) = self.commands_rx.try_recv() { match command { - PaCommand::SetIsMuted { id, value } => { - if let Some(state) = PaThread::unwrap_state(¤t_state).entities_by_id.get(&id) { + PaThreadMessage::Terminate => { + break 'outer; + } + PaThreadMessage::SetIsMuted { id, value } => { + if let Some(state) = self.current_state.load().entities_by_id.get(&id) { match state.kind() { - PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_mute_by_index(id, value, None), - PaEntityKind::Source => self.introspector.borrow_mut().set_source_mute_by_index(id, value, None), - PaEntityKind::SinkInput => self.introspector.borrow_mut().set_sink_input_mute(id, value, None), + PaEntityKind::Sink => self.introspector.set_sink_mute_by_index(id, value, None), + PaEntityKind::Source => self.introspector.set_source_mute_by_index(id, value, None), + PaEntityKind::SinkInput => self.introspector.set_sink_input_mute(id, value, None), }; } } - PaCommand::SetChannelVolumes { id, channel_volumes } => { - if let Some(state) = PaThread::unwrap_state(¤t_state).entities_by_id.get(&id) { + PaThreadMessage::SetChannelVolumes { id, channel_volumes } => { + if let Some(state) = self.current_state.load().entities_by_id.get(&id) { let mut value = state.channel_volumes; for (i, v) in channel_volumes.iter().enumerate() { value.set(i as u8, Volume((Volume::NORMAL.0 as f32 * v).floor() as u32)); } match state.kind() { - PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_volume_by_index(id, &value, None), - PaEntityKind::Source => self.introspector.borrow_mut().set_source_volume_by_index(id, &value, None), - PaEntityKind::SinkInput => self.introspector.borrow_mut().set_sink_input_volume(id, &value, None), + PaEntityKind::Sink => self.introspector.set_sink_volume_by_index(id, &value, None), + PaEntityKind::Source => self.introspector.set_source_volume_by_index(id, &value, None), + PaEntityKind::SinkInput => self.introspector.set_sink_input_volume(id, &value, None), }; } } - PaCommand::Terminate => { - break 'outer; + + PaThreadMessage::LoadSinkInput { entity_id } => { + self.introspector.get_sink_input_info(entity_id, move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_input_info failed"), + ListResult::End => {} + ListResult::Item(sink_input) => commands_tx + .send(PaThreadMessage::UpsertEntity { + entity_state: Arc::new(sink_input.into()), + }) + .unwrap(), + }); + } + PaThreadMessage::LoadSink { entity_id } => { + self.introspector.get_sink_info_by_index(entity_id, move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"), + ListResult::End => {} + ListResult::Item(sink) => commands_tx + .send(PaThreadMessage::UpsertEntity { + entity_state: Arc::new(sink.into()), + }) + .unwrap(), + }); + } + PaThreadMessage::LoadSource { entity_id } => { + self.introspector.get_source_info_by_index(entity_id, move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_source_info_by_index failed"), + ListResult::End => {} + ListResult::Item(source) => commands_tx + .send(PaThreadMessage::UpsertEntity { + entity_state: Arc::new(source.into()), + }) + .unwrap(), + }); + } + PaThreadMessage::UpsertEntity { entity_state } => { + self.set_state(Arc::new(PaVolumeState { + timestamp: Instant::now(), + entities_by_id: self.current_state.load().entities_by_id.update(entity_state.id, entity_state), + })); + } + PaThreadMessage::RemoveEntity { entity_id } => { + self.set_state(Arc::new(PaVolumeState { + timestamp: Instant::now(), + entities_by_id: self.current_state.load().entities_by_id.without(&entity_id), + })); } } + + self.mainloop.unlock(); } } - mainloop.borrow_mut().quit(Retval(0)); + self.mainloop.quit(Retval(0)); + self.mainloop.unlock(); } - fn unwrap_state(state: &Arc>>>) -> Arc { - state - .read() - .unwrap() - .as_ref() - .cloned() - .expect("this function is only called after the initial state was set") - } - - fn set_state(current_state: &RwLock>>, state_tx: &broadcast::Sender>, value: Arc) { - let mut s = current_state.write().unwrap(); - *s = Some(Arc::clone(&value)); + fn set_state(&self, value: Arc) { + self.current_state.store(Arc::clone(&value)); // If there are no subscribers, that’s ok. - _ = state_tx.send(value); - } - - fn run_single_mainloop_iteration(&mut self, block: bool) { - match self.mainloop.borrow_mut().iterate(block) { - IterateResult::Quit(_) => { - panic!("Mainloop quit.") - } - IterateResult::Err(e) => { - panic!("Mainloop error: {}", e) - } - IterateResult::Success(_) => {} - } - } - - fn wait_for(&mut self, predicate: impl Fn(&mut Self) -> bool) { - loop { - self.run_single_mainloop_iteration(true); - - if predicate(self) { - break; - } - } - } - - fn wait_for_operation(&mut self, operation: Operation) -> Result<(), ()> { - loop { - self.run_single_mainloop_iteration(true); - - match operation.get_state() { - libpulse_binding::operation::State::Running => {} - libpulse_binding::operation::State::Done => return Ok(()), - libpulse_binding::operation::State::Cancelled => return Err(()), - } - } + _ = self.state_tx.send(value); } } #[derive(Debug)] struct PaWorker { - commands_tx: flume::Sender, + commands_tx: flume::Sender, } impl Drop for PaWorker { fn drop(&mut self) { - self.commands_tx.send(PaCommand::Terminate).ok(); + self.commands_tx.send(PaThreadMessage::Terminate).ok(); } } @@ -479,18 +416,21 @@ impl Drop for PaWorker { pub struct PaVolumeInterface { #[allow(unused)] worker: Arc, - current_state: Arc>>>, + current_state: Arc>, state_tx: broadcast::Sender>, - commands_tx: flume::Sender, + commands_tx: flume::Sender, } impl PaVolumeInterface { pub fn spawn_thread(client_name: String) -> PaVolumeInterface { let (commands_tx, commands_rx) = flume::unbounded(); let state_tx = broadcast::Sender::new(5); - let current_state = Arc::new(RwLock::new(None)); + let current_state = Arc::new(ArcSwap::new(Arc::new(PaVolumeState { + timestamp: Instant::now(), + entities_by_id: HashMap::new(), + }))); - PaThread::spawn(client_name, commands_rx, state_tx.clone(), Arc::clone(¤t_state)); + PaThread::spawn(client_name, commands_tx.clone(), commands_rx, state_tx.clone(), Arc::clone(¤t_state)); let worker = PaWorker { commands_tx: commands_tx.clone(), @@ -504,23 +444,23 @@ impl PaVolumeInterface { } } - pub fn subscribe_to_state(&self) -> (Option>, Receiver>) { + pub fn subscribe_to_state(&self) -> (Arc, Receiver>) { let rx = self.state_tx.subscribe(); let state = self.current_state(); (state, rx) } - pub fn current_state(&self) -> Option> { - self.current_state.read().unwrap().clone() + pub fn current_state(&self) -> Arc { + Arc::clone(&self.current_state.load()) } pub fn set_is_muted(&self, id: PaEntityId, value: bool) { - self.commands_tx.send(PaCommand::SetIsMuted { id, value }).unwrap() + self.commands_tx.send(PaThreadMessage::SetIsMuted { id, value }).unwrap() } pub fn set_channel_volumes(&self, id: PaEntityId, channel_volumes: impl Into>) { self.commands_tx - .send(PaCommand::SetChannelVolumes { + .send(PaThreadMessage::SetChannelVolumes { id, channel_volumes: channel_volumes.into(), }) diff --git a/handlers/home_assistant/Cargo.toml b/handlers/home_assistant/Cargo.toml index e83a205..3046c63 100644 --- a/handlers/home_assistant/Cargo.toml +++ b/handlers/home_assistant/Cargo.toml @@ -18,8 +18,4 @@ tokio-tungstenite = { version = "0.21.0", features = ["rustls-tls-native-roots"] tokio-stream = "0.1.14" futures-util = "0.3.30" parse-display = "0.9.0" -serde_with = "3.6.1" - -# same as tokio-tungstenite -rustls = "0.22.0" -rustls-native-certs = "0.7.0" \ No newline at end of file +serde_with = "3.6.1" \ No newline at end of file diff --git a/handlers/home_assistant/src/config.rs b/handlers/home_assistant/src/config.rs index bdc0060..02c5c4c 100644 --- a/handlers/home_assistant/src/config.rs +++ b/handlers/home_assistant/src/config.rs @@ -7,8 +7,6 @@ use url::Url; pub struct GlobalConfig { pub base_url: Url, pub token: Box, - #[serde(default)] - pub accept_invalid_certs: bool, } #[derive(Debug, Clone, Deserialize)] diff --git a/handlers/home_assistant/src/ha_client.rs b/handlers/home_assistant/src/ha_client.rs index 28406e7..bbf2ba4 100644 --- a/handlers/home_assistant/src/ha_client.rs +++ b/handlers/home_assistant/src/ha_client.rs @@ -1,5 +1,5 @@ use futures_util::SinkExt; -use parse_display::{Display, FromStr, IntoResult}; +use parse_display::{Display, FromStr}; use reqwest::header::{HeaderMap, HeaderValue}; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -10,7 +10,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::{broadcast, RwLock}; use tokio_stream::StreamExt; -use tokio_tungstenite::{tungstenite, Connector}; +use tokio_tungstenite::tungstenite; use url::Url; #[derive(Debug, Clone, FromStr, Display, SerializeDisplay, DeserializeFromStr, Eq, PartialEq, Hash)] @@ -42,7 +42,7 @@ pub struct HaClient { } impl HaClient { - pub async fn new(base_url: Url, token: Box, accept_invalid_certs: bool, subscribed_entity_ids: Vec) -> Self { + pub async fn new(base_url: Url, token: Box, subscribed_entity_ids: Vec) -> Self { let http_client = reqwest::ClientBuilder::new() .connect_timeout(Duration::from_secs(10)) .default_headers({ @@ -53,7 +53,6 @@ impl HaClient { ); map }) - .danger_accept_invalid_certs(accept_invalid_certs) .user_agent(format!("home_assistant deckster handler (v{})", env!("CARGO_PKG_VERSION"))) .build() .unwrap(); // The HTTP client being available is essential. @@ -61,12 +60,9 @@ impl HaClient { let state_updates_sender = broadcast::Sender::::new(min(subscribed_entity_ids.len(), 16)); let state_timestamp_by_entity_id = subscribed_entity_ids.iter().map(|i| (i.clone(), "".to_owned().into_boxed_str())).collect(); - let rustls_config = rustls::ClientConfig::builder().with_root_certificates(); - tokio::spawn(do_work( base_url.clone(), token, - Arc::new(rustls_config), state_updates_sender.clone(), http_client.clone(), state_timestamp_by_entity_id, @@ -123,7 +119,6 @@ impl HaClient { async fn do_work( base_url: Url, token: Box, - rustls_config: Arc, state_updates_sender: broadcast::Sender, http_client: reqwest::Client, state_timestamp_by_entity_id: HashMap>, @@ -140,8 +135,7 @@ async fn do_work( let state_timestamp_by_entity_id = Arc::new(RwLock::new(state_timestamp_by_entity_id)); loop { - let connection_result = - tokio_tungstenite::connect_async_tls_with_config(&websocket_url, None, false, Some(Connector::Rustls(Arc::clone(&rustls_config)))).await; + let connection_result = tokio_tungstenite::connect_async(&websocket_url).await; match connection_result { Err(tungstenite::Error::Io(error)) => { diff --git a/handlers/home_assistant/src/handler.rs b/handlers/home_assistant/src/handler.rs index ccc65fe..4d89a35 100644 --- a/handlers/home_assistant/src/handler.rs +++ b/handlers/home_assistant/src/handler.rs @@ -40,12 +40,7 @@ impl Handler { let ha_client = task_set.block_on( &runtime, - HaClient::new( - data.global_config.base_url, - data.global_config.token, - data.global_config.accept_invalid_certs, - subscribed_entity_ids, - ), + HaClient::new(data.global_config.base_url, data.global_config.token, subscribed_entity_ids), ); for (path, config) in data.key_configs { diff --git a/handlers/home_assistant/src/main.rs b/handlers/home_assistant/src/main.rs index 6ef67d1..7c86e09 100644 --- a/handlers/home_assistant/src/main.rs +++ b/handlers/home_assistant/src/main.rs @@ -6,7 +6,6 @@ use crate::handler::Handler; mod config; mod ha_client; mod handler; -mod tls; mod util; #[derive(Debug, Parser)] diff --git a/handlers/home_assistant/src/tls.rs b/handlers/home_assistant/src/tls.rs deleted file mode 100644 index 6902ca1..0000000 --- a/handlers/home_assistant/src/tls.rs +++ /dev/null @@ -1,44 +0,0 @@ -use rustls::client::danger::ServerCertVerifier; -use rustls::{ClientConfig, RootCertStore, SignatureScheme}; -use std::sync::Arc; - -// tokio-tungstenite does not provide a way to allow invalid certs. -// Because of that, we need to build our own rustls config. -pub fn get_rustls_client_config() -> Arc { - let mut root_store = RootCertStore::empty(); - let native_certs = rustls_native_certs::load_native_certs().unwrap(); - _ = root_store.add_parsable_certificates(native_certs); - - let mut config = ClientConfig::builder().with_root_certificates(root_store).with_no_client_auth(); - - config.dangerous().set_certificate_verifier(Arc::new(NoVerifier)); - - Arc::new(config) -} - -pub struct NoVerifier; - -impl ServerCertVerifier for NoVerifier { - fn supported_verify_schemes(&self) -> Vec { - todo!() - } - fn verify_server_cert( - &self, - _end_entity: &rustls::Certificate, - _intermediates: &[rustls::Certificate], - _server_name: &ServerName, - _scts: &mut dyn Iterator, - _ocsp_response: &[u8], - _now: std::time::SystemTime, - ) -> Result { - Ok(ServerCertVerified::assertion()) - } - - fn verify_tls12_signature(&self, _message: &[u8], _cert: &rustls::Certificate, _dss: &DigitallySignedStruct) -> Result { - Ok(HandshakeSignatureValid::assertion()) - } - - fn verify_tls13_signature(&self, _message: &[u8], _cert: &rustls::Certificate, _dss: &DigitallySignedStruct) -> Result { - Ok(HandshakeSignatureValid::assertion()) - } -} diff --git a/handlers/pa_volume/src/handler.rs b/handlers/pa_volume/src/handler.rs index 9c628c5..0c7143a 100644 --- a/handlers/pa_volume/src/handler.rs +++ b/handlers/pa_volume/src/handler.rs @@ -145,10 +145,14 @@ fn state_matches(target: &Target, state: &PaEntityState) -> bool { } async fn manage_knob(path: KnobPath, config: KnobConfig, mut events: broadcast::Receiver<(KnobPath, KnobEvent)>, pa_volume_interface: Arc) { - let mut entity_state: Option> = None; - let (initial_state, mut volume_states) = pa_volume_interface.subscribe_to_state(); + let mut entity_state: Option> = initial_state + .entities_by_id() + .values() + .find(|entity| state_matches(&config.target, entity)) + .map(Arc::clone); + let update_knob_value = { let config = &config; let path = path.clone(); @@ -198,14 +202,6 @@ async fn manage_knob(path: KnobPath, config: KnobConfig, mut events: broadcast:: } }; - if let Some(state) = initial_state { - entity_state = state - .entities_by_id() - .values() - .find(|entity| state_matches(&config.target, entity)) - .map(Arc::clone); - } - loop { tokio::select! { Ok(volume_state) = volume_states.recv() => {