diff --git a/Cargo.lock b/Cargo.lock index daa11e8..9e54e1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,18 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "async-broadcast" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -188,7 +200,7 @@ checksum = "1ee891b04274a59bd38b412188e24b849617b2e45a0fd8d057deb63e7403761b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -259,7 +271,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -307,6 +319,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -364,6 +385,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crypto-common" version = "0.1.6" @@ -395,7 +422,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -406,7 +433,7 @@ checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ "darling_core", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -443,7 +470,7 @@ dependencies = [ "loupedeck_serial", "nanoid", "once_cell", - "parse-display", + "parse-display 0.9.1", "regex", "resvg", "rgb", @@ -477,7 +504,7 @@ dependencies = [ "derive_more", "enum-map", "enum-ordinalize", - "parse-display", + "parse-display 0.9.1", "rgb", "serde", "serde_with", @@ -511,7 +538,7 @@ checksum = "2bba3e9872d7c58ce7ef0fcf1844fcc3e23ef2a58377b50df35dd98e42a5726e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -532,7 +559,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -564,7 +591,7 @@ checksum = "44600091ce205df4f8b661e98617d49c37b2dd609e449ec82b0fb5d7b33e2eeb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -584,7 +611,7 @@ checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -605,7 +632,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -637,6 +664,27 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "execute" version = "0.1.0" @@ -781,7 +829,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -890,7 +938,7 @@ dependencies = [ "env_logger", "futures-util", "log", - "parse-display", + "parse-display 0.9.1", "reqwest", "serde", "serde_json", @@ -1151,7 +1199,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -1522,6 +1570,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oneshot" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" + [[package]] name = "openssl-probe" version = "0.1.5" @@ -1543,8 +1597,8 @@ dependencies = [ "deckster_mode", "env_logger", "log", - "pa_volume_interface", - "parse-display", + "parse-display 0.10.0", + "pulseaudio-volume-interface", "regex", "serde", "serde_regex", @@ -1552,16 +1606,10 @@ dependencies = [ ] [[package]] -name = "pa_volume_interface" -version = "0.1.0" -dependencies = [ - "arc-swap", - "flume", - "im", - "libpulse-binding", - "log", - "tokio", -] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" [[package]] name = "parking_lot" @@ -1592,7 +1640,18 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" dependencies = [ - "parse-display-derive", + "parse-display-derive 0.9.1", + "regex", + "regex-syntax", +] + +[[package]] +name = "parse-display" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287d8d3ebdce117b8539f59411e4ed9ec226e0a4153c7f55495c6070d68e6f72" +dependencies = [ + "parse-display-derive 0.10.0", "regex", "regex-syntax", ] @@ -1608,7 +1667,21 @@ dependencies = [ "regex", "regex-syntax", "structmeta", - "syn 2.0.66", + "syn 2.0.77", +] + +[[package]] +name = "parse-display-derive" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fc048687be30d79502dea2f623d052f3a074012c6eac41726b7ab17213616b1" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax", + "structmeta", + "syn 2.0.77", ] [[package]] @@ -1640,7 +1713,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -1702,13 +1775,27 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" dependencies = [ "unicode-ident", ] +[[package]] +name = "pulseaudio-volume-interface" +version = "1.0.0" +source = "git+https://git.moritzruth.de/moritzruth/pulseaudio-volume-interface.git#9cb683ddb3c98c5ee0a2af32176f8ef2f5ce25d5" +dependencies = [ + "arc-swap", + "async-broadcast", + "flume", + "im", + "libpulse-binding", + "log", + "oneshot", +] + [[package]] name = "quote" version = "1.0.36" @@ -1784,9 +1871,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -2110,7 +2197,7 @@ checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2182,7 +2269,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2347,7 +2434,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2358,7 +2445,7 @@ checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2401,9 +2488,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.66" +version = "2.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42f3f41a2de00b01c0aaad383c5a45241efc8b2d1eda5661812fda5f3cdcff5" +checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" dependencies = [ "proc-macro2", "quote", @@ -2424,7 +2511,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2453,7 +2540,7 @@ checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2589,7 +2676,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] [[package]] @@ -2989,7 +3076,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", "wasm-bindgen-shared", ] @@ -3023,7 +3110,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3292,7 +3379,7 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", "synstructure", ] @@ -3319,7 +3406,7 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", "synstructure", ] @@ -3348,5 +3435,5 @@ checksum = "97cf56601ee5052b4417d90c8755c6683473c926039908196cf35d99f893ebe7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.66", + "syn 2.0.77", ] diff --git a/crates/pa_volume_interface/Cargo.toml b/crates/pa_volume_interface/Cargo.toml deleted file mode 100644 index 0bff545..0000000 --- a/crates/pa_volume_interface/Cargo.toml +++ /dev/null @@ -1,12 +0,0 @@ -[package] -name = "pa_volume_interface" -version = "0.1.0" -edition = "2021" - -[dependencies] -flume = "0.11.0" -im = "15.1.0" -tokio = { version = "1.38.0", default-features = false, features = ["sync"] } -libpulse-binding = "2.28.1" -log = "0.4.21" -arc-swap = "1.7.1" \ No newline at end of file diff --git a/crates/pa_volume_interface/src/lib.rs b/crates/pa_volume_interface/src/lib.rs deleted file mode 100644 index 2a83908..0000000 --- a/crates/pa_volume_interface/src/lib.rs +++ /dev/null @@ -1,469 +0,0 @@ -use arc_swap::ArcSwap; -use std::sync::Arc; -use std::thread; -use std::time::Instant; - -use im::HashMap; -use libpulse_binding::callbacks::ListResult; -use libpulse_binding::context::introspect::{Introspector, SinkInfo, SinkInputInfo, SourceInfo}; -use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; -use libpulse_binding::context::{subscribe, Context, FlagSet, State}; -use libpulse_binding::def::Retval; -use libpulse_binding::mainloop::api::Mainloop as _; -use libpulse_binding::mainloop::threaded::Mainloop; -use libpulse_binding::volume::{ChannelVolumes, Volume}; -use log::debug; -use tokio::sync::broadcast; -use tokio::sync::broadcast::Receiver; - -pub type PaEntityId = u32; - -#[derive(Debug, Eq, PartialEq, Copy, Clone)] -pub enum PaEntityKind { - Source, - Sink, - SinkInput, -} - -#[derive(Debug, Eq, PartialEq, Clone)] -pub enum PaEntityMetadata { - Source { - name: String, - description: String, - }, - Sink { - name: String, - description: String, - }, - SinkInput { - description: String, - binary_name: Option, - application_name: Option, - }, -} - -#[derive(Debug, Clone, PartialEq)] -pub struct PaEntityState { - id: PaEntityId, - channel_volumes: ChannelVolumes, - is_muted: bool, - metadata: PaEntityMetadata, -} - -impl PaEntityState { - pub fn id(&self) -> &PaEntityId { - &self.id - } - - pub fn kind(&self) -> PaEntityKind { - match &self.metadata { - PaEntityMetadata::Source { .. } => PaEntityKind::Source, - PaEntityMetadata::Sink { .. } => PaEntityKind::Sink, - PaEntityMetadata::SinkInput { .. } => PaEntityKind::SinkInput, - } - } - - pub fn metadata(&self) -> &PaEntityMetadata { - &self.metadata - } - - pub fn channel_volumes(&self) -> Vec { - self.channel_volumes.get().iter().map(|v| v.0 as f32 / Volume::NORMAL.0 as f32).collect() - } - - pub fn is_muted(&self) -> bool { - self.is_muted - } -} - -impl From<&SourceInfo<'_>> for PaEntityState { - fn from(value: &SourceInfo) -> Self { - PaEntityState { - id: value.index as PaEntityId, - is_muted: value.mute, - channel_volumes: value.volume, - metadata: PaEntityMetadata::Source { - name: value.name.clone().unwrap_or_default().into_owned(), - description: value.description.clone().unwrap_or_default().into_owned(), - }, - } - } -} - -impl From<&SinkInfo<'_>> for PaEntityState { - fn from(value: &SinkInfo) -> Self { - PaEntityState { - id: value.index as PaEntityId, - is_muted: value.mute, - channel_volumes: value.volume, - metadata: PaEntityMetadata::Sink { - name: value.name.clone().unwrap_or_default().into_owned(), - description: value.description.clone().unwrap_or_default().into_owned(), - }, - } - } -} - -impl From<&SinkInputInfo<'_>> for PaEntityState { - fn from(value: &SinkInputInfo) -> Self { - PaEntityState { - id: value.index as PaEntityId, - is_muted: value.mute, - channel_volumes: value.volume, - metadata: PaEntityMetadata::SinkInput { - description: value.name.clone().unwrap_or_default().into_owned(), - application_name: value - .proplist - .get("application.name") - .map(|v| String::from_utf8_lossy(v).trim_end_matches(char::from(0)).to_owned()), - binary_name: value - .proplist - .get("application.process.binary") - .map(|v| String::from_utf8_lossy(v).trim_end_matches(char::from(0)).to_owned()), - }, - } - } -} - -#[derive(Debug, Clone)] -pub struct PaVolumeState { - timestamp: Instant, - entities_by_id: HashMap>, -} - -impl PaVolumeState { - pub fn timestamp(&self) -> &Instant { - &self.timestamp - } - - pub fn entities_by_id(&self) -> &HashMap> { - &self.entities_by_id - } -} - -impl Default for PaVolumeState { - fn default() -> Self { - PaVolumeState { - timestamp: Instant::now(), - entities_by_id: HashMap::new(), - } - } -} - -#[derive(Debug)] -enum PaThreadMessage { - // by the user - SetIsMuted { id: PaEntityId, value: bool }, - SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> }, - Terminate, - - // internal - LoadSinkInput { entity_id: PaEntityId }, - LoadSink { entity_id: PaEntityId }, - LoadSource { entity_id: PaEntityId }, - UpsertEntity { entity_state: Arc }, - RemoveEntity { entity_id: PaEntityId }, -} - -struct PaThread { - mainloop: Mainloop, - context: Context, - introspector: Introspector, - commands_tx: flume::Sender, - commands_rx: flume::Receiver, - state_tx: broadcast::Sender>, - current_state: Arc>, -} - -impl PaThread { - fn spawn( - client_name: String, - commands_tx: flume::Sender, - commands_rx: flume::Receiver, - state_tx: broadcast::Sender>, - current_state: Arc>, - ) { - thread::spawn(move || { - let mut mainloop = Mainloop::new().unwrap(); - let context = Context::new(&mainloop, &client_name).unwrap(); - let introspector = context.introspect(); - - debug!("Starting the mainloop thread…"); - mainloop.start().expect("starting the mainloop never fails"); - - let mut t = PaThread { - mainloop, - context, - introspector, - commands_tx, - commands_rx, - state_tx, - current_state, - }; - - t.init(); - t.run(); - }); - } - - fn init(&mut self) { - debug!("Initializing…"); - self.mainloop.lock(); - self.context.connect(None, FlagSet::NOFLAGS, None).unwrap(); - - { - let (context_state_change_tx, context_state_change_rx) = flume::bounded(1); - self.context.set_state_callback(Some(Box::new(move || { - context_state_change_tx.send(()).unwrap(); - }))); - - self.mainloop.unlock(); - loop { - context_state_change_rx.recv().unwrap(); - - self.mainloop.lock(); - if self.context.get_state() == State::Ready { - break; - } - self.mainloop.unlock(); - } - } - - // Mainloop is still locked - - { - let commands_tx = self.commands_tx.clone(); - self.introspector.get_sink_input_info_list(move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_input_info_list failed"), - ListResult::End => {} - ListResult::Item(sink_input) => commands_tx - .send(PaThreadMessage::UpsertEntity { - entity_state: Arc::new(sink_input.into()), - }) - .unwrap(), - }); - } - - { - let commands_tx = self.commands_tx.clone(); - self.introspector.get_sink_info_list(move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_info_list failed"), - ListResult::End => {} - ListResult::Item(sink) => commands_tx - .send(PaThreadMessage::UpsertEntity { - entity_state: Arc::new(sink.into()), - }) - .unwrap(), - }); - } - - { - let commands_tx = self.commands_tx.clone(); - self.introspector.get_source_info_list(move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_source_info_list failed"), - ListResult::End => {} - ListResult::Item(source) => commands_tx - .send(PaThreadMessage::UpsertEntity { - entity_state: Arc::new(source.into()), - }) - .unwrap(), - }); - } - - { - let commands_tx = self.commands_tx.clone(); - self.context.set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| { - let entity_id = entity_id as PaEntityId; - let facility = facility.unwrap(); - - match operation.unwrap() { - subscribe::Operation::Removed => { - commands_tx.send(PaThreadMessage::RemoveEntity { entity_id }).unwrap(); - } - subscribe::Operation::New | subscribe::Operation::Changed => { - match facility { - Facility::SinkInput => commands_tx.send(PaThreadMessage::LoadSinkInput { entity_id }).unwrap(), - Facility::Sink => commands_tx.send(PaThreadMessage::LoadSink { entity_id }).unwrap(), - Facility::Source => commands_tx.send(PaThreadMessage::LoadSource { entity_id }).unwrap(), - _ => {} - }; - } - }; - }))); - } - - self.context - .subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| { - if !success { - panic!("Context.subscribe failed") - } - }); - - self.mainloop.unlock(); - } - - fn run(mut self) { - debug!("Waiting for commands…"); - - 'outer: loop { - while let Ok(command) = self.commands_rx.recv() { - self.mainloop.lock(); - let commands_tx = self.commands_tx.clone(); - - match command { - PaThreadMessage::Terminate => { - break 'outer; - } - PaThreadMessage::SetIsMuted { id, value } => { - if let Some(state) = self.current_state.load().entities_by_id.get(&id) { - match state.kind() { - PaEntityKind::Sink => self.introspector.set_sink_mute_by_index(id, value, None), - PaEntityKind::Source => self.introspector.set_source_mute_by_index(id, value, None), - PaEntityKind::SinkInput => self.introspector.set_sink_input_mute(id, value, None), - }; - } - } - PaThreadMessage::SetChannelVolumes { id, channel_volumes } => { - if let Some(state) = self.current_state.load().entities_by_id.get(&id) { - let mut value = state.channel_volumes; - for (i, v) in channel_volumes.iter().enumerate() { - value.set(i as u8, Volume((Volume::NORMAL.0 as f32 * v).floor() as u32)); - } - - match state.kind() { - PaEntityKind::Sink => self.introspector.set_sink_volume_by_index(id, &value, None), - PaEntityKind::Source => self.introspector.set_source_volume_by_index(id, &value, None), - PaEntityKind::SinkInput => self.introspector.set_sink_input_volume(id, &value, None), - }; - } - } - - PaThreadMessage::LoadSinkInput { entity_id } => { - self.introspector.get_sink_input_info(entity_id, move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_input_info failed"), - ListResult::End => {} - ListResult::Item(sink_input) => commands_tx - .send(PaThreadMessage::UpsertEntity { - entity_state: Arc::new(sink_input.into()), - }) - .unwrap(), - }); - } - PaThreadMessage::LoadSink { entity_id } => { - self.introspector.get_sink_info_by_index(entity_id, move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"), - ListResult::End => {} - ListResult::Item(sink) => commands_tx - .send(PaThreadMessage::UpsertEntity { - entity_state: Arc::new(sink.into()), - }) - .unwrap(), - }); - } - PaThreadMessage::LoadSource { entity_id } => { - self.introspector.get_source_info_by_index(entity_id, move |list_result| match list_result { - ListResult::Error => panic!("Introspector.get_source_info_by_index failed"), - ListResult::End => {} - ListResult::Item(source) => commands_tx - .send(PaThreadMessage::UpsertEntity { - entity_state: Arc::new(source.into()), - }) - .unwrap(), - }); - } - PaThreadMessage::UpsertEntity { entity_state } => { - self.set_state(Arc::new(PaVolumeState { - timestamp: Instant::now(), - entities_by_id: self.current_state.load().entities_by_id.update(entity_state.id, entity_state), - })); - } - PaThreadMessage::RemoveEntity { entity_id } => { - self.set_state(Arc::new(PaVolumeState { - timestamp: Instant::now(), - entities_by_id: self.current_state.load().entities_by_id.without(&entity_id), - })); - } - } - - self.mainloop.unlock(); - } - } - - self.mainloop.quit(Retval(0)); - self.mainloop.unlock(); - } - - fn set_state(&self, value: Arc) { - self.current_state.store(Arc::clone(&value)); - - // If there are no subscribers, that’s ok. - _ = self.state_tx.send(value); - } -} - -#[derive(Debug)] -struct PaWorker { - commands_tx: flume::Sender, -} - -impl Drop for PaWorker { - fn drop(&mut self) { - self.commands_tx.send(PaThreadMessage::Terminate).ok(); - } -} - -#[derive(Debug, Clone)] -pub struct PaVolumeInterface { - #[allow(unused)] - worker: Arc, - current_state: Arc>, - state_tx: broadcast::Sender>, - commands_tx: flume::Sender, -} - -impl PaVolumeInterface { - pub fn spawn_thread(client_name: String) -> PaVolumeInterface { - let (commands_tx, commands_rx) = flume::unbounded(); - let state_tx = broadcast::Sender::new(5); - let current_state = Arc::new(ArcSwap::new(Arc::new(PaVolumeState { - timestamp: Instant::now(), - entities_by_id: HashMap::new(), - }))); - - PaThread::spawn(client_name, commands_tx.clone(), commands_rx, state_tx.clone(), Arc::clone(¤t_state)); - - let worker = PaWorker { - commands_tx: commands_tx.clone(), - }; - - PaVolumeInterface { - worker: Arc::new(worker), - current_state, - commands_tx, - state_tx, - } - } - - pub fn subscribe_to_state(&self) -> (Arc, Receiver>) { - let rx = self.state_tx.subscribe(); - let state = self.current_state(); - (state, rx) - } - - pub fn current_state(&self) -> Arc { - Arc::clone(&self.current_state.load()) - } - - pub fn set_is_muted(&self, id: PaEntityId, value: bool) { - self.commands_tx.send(PaThreadMessage::SetIsMuted { id, value }).unwrap() - } - - pub fn set_channel_volumes(&self, id: PaEntityId, channel_volumes: impl Into>) { - self.commands_tx - .send(PaThreadMessage::SetChannelVolumes { - id, - channel_volumes: channel_volumes.into(), - }) - .unwrap() - } -} diff --git a/handlers/home_assistant/src/main.rs b/handlers/home_assistant/src/main.rs index 7c86e09..6476b0c 100644 --- a/handlers/home_assistant/src/main.rs +++ b/handlers/home_assistant/src/main.rs @@ -6,7 +6,6 @@ use crate::handler::Handler; mod config; mod ha_client; mod handler; -mod util; #[derive(Debug, Parser)] #[command(name = "home_assistant")] diff --git a/handlers/home_assistant/src/util.rs b/handlers/home_assistant/src/util.rs deleted file mode 100644 index c99fa64..0000000 --- a/handlers/home_assistant/src/util.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::time::Duration; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::time::{timeout, MissedTickBehavior}; - -/// Sends a message into the output channel after a message in the input channel was received, with a delay of `duration`. -/// The delay is reset when a new message is reset. -pub fn spawn_debouncer(duration: Duration) -> (Sender<()>, Receiver<()>) { - let (input_sender, mut input_receiver) = mpsc::channel::<()>(1); - let (output_sender, output_receiver) = mpsc::channel::<()>(1); - - tokio::spawn(async move { - 'outer: loop { - if input_receiver.recv().await.is_none() { - break 'outer; - } - - 'inner: loop { - match timeout(duration, input_receiver.recv()).await { - Ok(None) => break 'outer, - Ok(Some(_)) => continue 'inner, - Err(_) => { - if let Err(TrySendError::Closed(_)) = output_sender.try_send(()) { - break 'outer; - } else { - break 'inner; - } - } - } - } - } - }); - - (input_sender, output_receiver) -} - -/// Sends messages from the input channel into the output channel, but only if the time since the last message is greater than duration. -/// The last message that was not sent yet will be sent after duration. -pub fn spawn_throttler(duration: Duration) -> (Sender, Receiver) { - let (input_sender, mut input_receiver) = mpsc::channel::(25); - let (output_sender, output_receiver) = mpsc::channel::(25); - - tokio::spawn(async move { - let mut pending_value: Option = None; - let mut interval = tokio::time::interval(duration); - interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - 'outer: loop { - tokio::select! { - value = input_receiver.recv() => { - match value { - None => break 'outer, - Some(value) => { - pending_value = Some(value); - } - }; - } - _ = interval.tick() => { - if let Some(value) = pending_value.take() { - output_sender.send(value).await.unwrap(); - } - } - } - } - }); - - (input_sender, output_receiver) -} diff --git a/handlers/pa_volume/Cargo.toml b/handlers/pa_volume/Cargo.toml index cffdf5b..c91ec10 100644 --- a/handlers/pa_volume/Cargo.toml +++ b/handlers/pa_volume/Cargo.toml @@ -5,13 +5,13 @@ edition = "2021" [dependencies] deckster_mode = { path = "../../crates/deckster_mode" } -pa_volume_interface = { path = "../../crates/pa_volume_interface" } +pulseaudio-volume-interface = { git = "https://git.moritzruth.de/moritzruth/pulseaudio-volume-interface.git" } clap = { version = "4.5.7", features = ["derive"] } color-eyre = "0.6.3" serde = { version = "1.0.203", features = ["derive"] } serde_regex = "1.1.0" regex = "1.10.5" -parse-display = "0.9.1" +parse-display = "0.10.0" env_logger = "0.11.3" log = "0.4.21" -tokio = { version = "1.38.0", features = ["macros", "parking_lot", "rt-multi-thread", "sync"] } \ No newline at end of file +tokio = { version = "1.38.0", features = ["macros", "parking_lot", "rt-multi-thread"] } \ No newline at end of file diff --git a/handlers/pa_volume/src/handler.rs b/handlers/pa_volume/src/handler.rs index 8af42b1..0c7773e 100644 --- a/handlers/pa_volume/src/handler.rs +++ b/handlers/pa_volume/src/handler.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use parse_display::Display; +use pulseaudio_volume_interface::PulseAudioVolumeInterface; use regex::Regex; use serde::Deserialize; use tokio::sync::broadcast; @@ -11,7 +12,7 @@ use deckster_mode::shared::handler_communication::{ use deckster_mode::shared::path::KnobPath; use deckster_mode::shared::state::KnobStyleByStateMap; use deckster_mode::{send_command, DecksterHandler}; -use pa_volume_interface::{PaEntityKind, PaEntityMetadata, PaEntityState, PaVolumeInterface}; +use pulseaudio_volume_interface::{EntityKind as PaEntityKind, EntityMetadata as PaEntityMetadata, EntityState as PaEntityState}; #[derive(Debug, Clone, Deserialize)] pub struct KnobConfig { @@ -113,7 +114,7 @@ fn state_matches(target: &Target, state: &PaEntityState) -> bool { static EMPTY_STRING: String = String::new(); - return target.predicates.iter().all(|p| { + target.predicates.iter().all(|p| { let v = match (&p.property, state.metadata()) { (TargetPredicateProperty::InternalName, PaEntityMetadata::Sink { name, .. }) => Some(name), (TargetPredicateProperty::InternalName, PaEntityMetadata::Source { name, .. }) => Some(name), @@ -141,10 +142,10 @@ fn state_matches(target: &Target, state: &PaEntityState) -> bool { false } - }); + }) } -async fn manage_knob(path: KnobPath, config: KnobConfig, mut events: broadcast::Receiver<(KnobPath, KnobEvent)>, pa_volume_interface: Arc) { +async fn manage_knob(path: KnobPath, config: KnobConfig, mut events: broadcast::Receiver<(KnobPath, KnobEvent)>, pa_volume_interface: PulseAudioVolumeInterface) { let (initial_state, mut volume_states) = pa_volume_interface.subscribe_to_state(); let mut entity_state: Option> = initial_state @@ -273,13 +274,13 @@ pub struct Handler { impl Handler { pub fn new(data: InitialHandlerMessage<(), (), KnobConfig>) -> Result { let (events_sender, _) = broadcast::channel::<(KnobPath, KnobEvent)>(5); - let pa_volume_interface = Arc::new(PaVolumeInterface::spawn_thread("deckster handler".to_owned())); + let pa_volume_interface = PulseAudioVolumeInterface::new("deckster handler".to_owned()); let runtime = tokio::runtime::Builder::new_multi_thread().worker_threads(1).build().unwrap(); for (path, config) in data.knob_configs { let events_receiver = events_sender.subscribe(); - runtime.spawn(manage_knob(path, config, events_receiver, Arc::clone(&pa_volume_interface))); + runtime.spawn(manage_knob(path, config, events_receiver, pa_volume_interface.clone())); } Ok(Handler { events_sender, runtime }) diff --git a/handlers/pa_volume/src/main.rs b/handlers/pa_volume/src/main.rs index bee9a25..6f3f923 100644 --- a/handlers/pa_volume/src/main.rs +++ b/handlers/pa_volume/src/main.rs @@ -1,10 +1,9 @@ use clap::Parser; use color_eyre::Result; -use pa_volume_interface::PaVolumeInterface; use std::sync::Arc; use std::thread::sleep; use std::time::Duration; - +use pulseaudio_volume_interface::PulseAudioVolumeInterface; use crate::handler::Handler; mod handler; @@ -33,8 +32,10 @@ fn main() -> Result<()> { } fn print_entities() { - let pa_volume_interface = Arc::new(PaVolumeInterface::spawn_thread("deckster handler".to_owned())); - sleep(Duration::from_secs(1)); // wait for the data to load + let pa_volume_interface = Arc::new(PulseAudioVolumeInterface::new("deckster handler".to_owned())); + // Wait for the data to load. + // Because of PulseAudio, there is no way to know when everything is ready, so we have to just hope a second is enough. + sleep(Duration::from_secs(1)); let full_state = pa_volume_interface.current_state(); let entities_by_id = full_state.entities_by_id();