From aee74cb528c4d0937c4269378b84bf7c2e99a067 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Mon, 15 Jan 2024 02:21:10 +0100 Subject: [PATCH] commit --- Cargo.lock | 39 ++ .../examples/full/knob-pages/default.toml | 40 +- deckster/src/modes/knob/audio_volume.rs | 80 ++- pa-volume-interface/Cargo.toml | 3 +- pa-volume-interface/src/lib.rs | 521 +++++++++++++----- 5 files changed, 498 insertions(+), 185 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8eb1ee..96238c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -831,6 +831,33 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +[[package]] +name = "libpulse-binding" +version = "2.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3557a2dfc380c8f061189a01c6ae7348354e0c9886038dc6c171219c08eaff" +dependencies = [ + "bitflags 1.3.2", + "libc", + "libpulse-sys", + "num-derive", + "num-traits", + "winapi", +] + +[[package]] +name = "libpulse-sys" +version = "1.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc19e110fbf42c17260d30f6d3dc545f58491c7830d38ecb9aaca96e26067a9b" +dependencies = [ + "libc", + "num-derive", + "num-traits", + "pkg-config", + "winapi", +] + [[package]] name = "libudev" version = "0.3.0" @@ -950,6 +977,17 @@ dependencies = [ "libc", ] +[[package]] +name = "num-derive" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -996,6 +1034,7 @@ version = "0.1.0" dependencies = [ "flume", "im", + "libpulse-binding", "tokio", ] diff --git a/deckster/examples/full/knob-pages/default.toml b/deckster/examples/full/knob-pages/default.toml index 5f37174..f6e21ca 100644 --- a/deckster/examples/full/knob-pages/default.toml +++ b/deckster/examples/full/knob-pages/default.toml @@ -1,23 +1,23 @@ -[knobs.right-top] -icon = "@ph/microphone-light[scale=0.9]" -indicators.bar.color = "#ffffff50" - -mode.audio_volume.direction = "input" -mode.audio_volume.regex = "^(SC425 USB Microphone Analog Stereo)$" -mode.audio_volume.disable_press_to_unmute = true -mode.audio_volume.muted_turn_action = "unmute-at-zero" -mode.audio_volume.style.muted.label = "Muted" -mode.audio_volume.style.inactive.icon = "@ph/microphone-slash-light[alpha=0.9|color=#fc4646]" - -[knobs.right-middle] -icon = "@apps/discord[scale=0.25]" -indicators.bar.color = "#ffffff50" - -mode.audio_volume.regex = "Discord" -mode.audio_volume.style.active.label = "{percentage}%" -mode.audio_volume.style.muted.label = "Muted" -mode.audio_volume.style.inactive.label = "" -mode.audio_volume.style.inactive.icon = "@apps/discord[grayscale|alpha=0.9]" +#[knobs.right-top] +#icon = "@ph/microphone-light[scale=0.9]" +#indicators.bar.color = "#ffffff50" +# +#mode.audio_volume.direction = "input" +#mode.audio_volume.regex = "^(SC425 USB Microphone Analog Stereo)$" +#mode.audio_volume.disable_press_to_unmute = true +#mode.audio_volume.muted_turn_action = "unmute-at-zero" +#mode.audio_volume.style.muted.label = "Muted" +#mode.audio_volume.style.inactive.icon = "@ph/microphone-slash-light[alpha=0.9|color=#fc4646]" +# +#[knobs.right-middle] +#icon = "@apps/discord[scale=0.25]" +#indicators.bar.color = "#ffffff50" +# +#mode.audio_volume.regex = "Discord" +#mode.audio_volume.style.active.label = "{percentage}%" +#mode.audio_volume.style.muted.label = "Muted" +#mode.audio_volume.style.inactive.label = "" +#mode.audio_volume.style.inactive.icon = "@apps/discord[grayscale|alpha=0.9]" [knobs.right-bottom] icon = "@apps/spotify[scale=1.1]" diff --git a/deckster/src/modes/knob/audio_volume.rs b/deckster/src/modes/knob/audio_volume.rs index b5ed2b8..069b024 100644 --- a/deckster/src/modes/knob/audio_volume.rs +++ b/deckster/src/modes/knob/audio_volume.rs @@ -1,6 +1,6 @@ +use std::borrow::ToOwned; use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; use once_cell::sync::Lazy; use regex::Regex; @@ -64,29 +64,73 @@ pub enum State { Muted, } -static PA_VOLUME_INTERFACE: Lazy = Lazy::new(|| { - let interface = PaVolumeInterface::spawn_thread(Duration::from_millis(500)); - interface.query_state(); - interface -}); +static PA_VOLUME_INTERFACE: Lazy = Lazy::new(|| PaVolumeInterface::spawn_thread("deckster".to_owned())); + +fn get_volume_cv(channel_volumes: &Vec) -> f32 { + *channel_volumes.first().unwrap() +} + +fn get_volume_es(entity_state: &Option>) -> f32 { + entity_state.as_ref().map(|s| *s.channel_volumes().first().unwrap()).unwrap_or(0.0) +} pub async fn handle(path: KnobPath, config: Arc, mut events: broadcast::Receiver<(KnobPath, KnobEvent)>, commands: flume::Sender) { - let mut volume_update_timestamp = Instant::now(); - let mut value: f32 = 0.0; let mut entity_state: Option> = None; let pa_volume_interface = &PA_VOLUME_INTERFACE; - let mut volume_states = pa_volume_interface.subscribe_to_state(); + let (initial_state, mut volume_states) = pa_volume_interface.subscribe_to_state(); + + if let Some(state) = initial_state { + entity_state = state + .entities_by_id() + .values() + .find(|entity| config.regex.is_match(entity.display_name())) + .map(Arc::clone); + commands + .send(IoWorkerCommand::SetKnobValue { + path: path.clone(), + value: get_volume_es(&entity_state), + }) + .unwrap(); + } + + let update_style = { + let commands = commands.clone(); + let config = Arc::clone(&config); + let path = path.clone(); + + move |entity_state: &Option>| { + let state = match entity_state { + None => State::Inactive, + Some(s) if s.is_muted() => State::Muted, + Some(_) => State::Active, + }; + + let mut style = config.style.get(&state).cloned(); + + if let Some(ref mut s) = &mut style { + let v = get_volume_es(entity_state); + if let Some(ref mut label) = &mut s.label { + *label = label.replace("{percentage}", &((v * 100.0).round() as u32).to_string()); + } + } + + commands + .send(IoWorkerCommand::SetKnobStyle { + path: path.clone(), + value: style, + }) + .unwrap(); + } + }; loop { select! { Ok(volume_state) = volume_states.recv() => { - entity_state = volume_state.entities_by_id().values().find(|entity| config.regex.is_match(entity.name())).map(Arc::clone); + entity_state = volume_state.entities_by_id().values().find(|entity| config.regex.is_match(entity.display_name())).map(Arc::clone); + update_style(&entity_state); - if volume_state.timestamp() > &volume_update_timestamp { - value = entity_state.as_ref().map(|s| *s.channel_volumes().first().unwrap()).unwrap_or(0.0); - commands.send(IoWorkerCommand::SetKnobValue { path: path.clone(), value }).unwrap(); - } + commands.send(IoWorkerCommand::SetKnobValue { path: path.clone(), value: get_volume_es(&entity_state) }).unwrap() } Ok((event_path, event)) = events.recv() => { @@ -102,11 +146,9 @@ pub async fn handle(path: KnobPath, config: Arc, mut events: broadcast:: RotationDirection::Counterclockwise => -1.0, }; - volume_update_timestamp = Instant::now(); - value = (value + (factor * config.delta)).clamp(0.0, 1.0); - pa_volume_interface.set_channel_volumes(*entity_state.id(), vec![value; entity_state.channel_volumes().len()]); - - commands.send(IoWorkerCommand::SetKnobValue { path: path.clone(), value }).unwrap(); + let current_v = get_volume_cv(&entity_state.channel_volumes()); + let v = (current_v + (factor * config.delta)).clamp(0.0, 1.0); + pa_volume_interface.set_channel_volumes(*entity_state.id(), vec![v; entity_state.channel_volumes().len()]); } KnobEvent::Press => { pa_volume_interface.set_is_muted(*entity_state.id(), !entity_state.is_muted()) diff --git a/pa-volume-interface/Cargo.toml b/pa-volume-interface/Cargo.toml index d98cf47..10b2a0b 100644 --- a/pa-volume-interface/Cargo.toml +++ b/pa-volume-interface/Cargo.toml @@ -6,4 +6,5 @@ edition = "2021" [dependencies] flume = "0.11.0" im = "15.1.0" -tokio = { version = "1.35.1", default-features = false, features = ["sync"] } \ No newline at end of file +tokio = { version = "1.35.1", default-features = false, features = ["sync"] } +libpulse-binding = "2.28.1" \ No newline at end of file diff --git a/pa-volume-interface/src/lib.rs b/pa-volume-interface/src/lib.rs index dac788a..d1688d9 100644 --- a/pa-volume-interface/src/lib.rs +++ b/pa-volume-interface/src/lib.rs @@ -1,12 +1,23 @@ -use std::process::Command; -use std::sync::Arc; +use std::cell::RefCell; +use std::rc::Rc; +use std::sync::{Arc, RwLock}; use std::thread; -use std::time::{Duration, Instant}; +use std::time::Instant; use im::HashMap; +use libpulse_binding::callbacks::ListResult; +use libpulse_binding::context::introspect::{Introspector, SinkInfo, SinkInputInfo, SourceInfo}; +use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; +use libpulse_binding::context::{subscribe, Context, FlagSet}; +use libpulse_binding::def::Retval; +use libpulse_binding::mainloop::api::Mainloop as MainloopTrait; +use libpulse_binding::mainloop::standard::{IterateResult, Mainloop}; +use libpulse_binding::operation::Operation; +use libpulse_binding::volume::{ChannelVolumes, Volume}; use tokio::sync::broadcast; +use tokio::sync::broadcast::Receiver; -pub type PaEntityId = usize; +pub type PaEntityId = u32; #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum PaEntityKind { @@ -20,7 +31,8 @@ pub struct PaEntityState { id: PaEntityId, kind: PaEntityKind, name: String, - channel_volumes: Box<[f32]>, + display_name: String, + channel_volumes: ChannelVolumes, is_muted: bool, } @@ -37,8 +49,12 @@ impl PaEntityState { &self.name } - pub fn channel_volumes(&self) -> &[f32] { - &self.channel_volumes + pub fn display_name(&self) -> &String { + &self.display_name + } + + pub fn channel_volumes(&self) -> Vec { + self.channel_volumes.get().iter().map(|v| v.0 as f32 / Volume::NORMAL.0 as f32).collect() } pub fn is_muted(&self) -> bool { @@ -46,6 +62,45 @@ impl PaEntityState { } } +impl From<&SourceInfo<'_>> for PaEntityState { + fn from(value: &SourceInfo) -> Self { + PaEntityState { + id: value.index as PaEntityId, + is_muted: value.mute, + name: value.name.clone().unwrap_or_default().into_owned(), + display_name: value.description.clone().unwrap_or_default().into_owned(), + kind: PaEntityKind::Source, + channel_volumes: value.volume, + } + } +} + +impl From<&SinkInfo<'_>> for PaEntityState { + fn from(value: &SinkInfo) -> Self { + PaEntityState { + id: value.index as PaEntityId, + is_muted: value.mute, + name: value.name.clone().unwrap_or_default().into_owned(), + display_name: value.description.clone().unwrap_or_default().into_owned(), + kind: PaEntityKind::Sink, + channel_volumes: value.volume, + } + } +} + +impl From<&SinkInputInfo<'_>> for PaEntityState { + fn from(value: &SinkInputInfo) -> Self { + PaEntityState { + id: value.index as PaEntityId, + is_muted: value.mute, + name: value.name.clone().unwrap_or_default().into_owned(), + display_name: value.name.clone().unwrap_or_default().into_owned(), + kind: PaEntityKind::Application, + channel_volumes: value.volume, + } + } +} + #[derive(Debug, Clone)] pub struct PaVolumeState { timestamp: Instant, @@ -72,154 +127,321 @@ impl PaVolumeState { } } +impl Default for PaVolumeState { + fn default() -> Self { + PaVolumeState { + timestamp: Instant::now(), + entities_by_id: HashMap::new(), + default_sink_id: 0, + default_source_id: 0, + } + } +} + +#[derive(Debug)] enum PaCommand { - QueryState, SetIsMuted { id: PaEntityId, value: bool }, SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> }, Terminate, } -#[derive(Debug)] struct PaThread { - commands_tx: flume::Sender, -} - -fn query_and_publish_state(state_tx: &broadcast::Sender) { - let output = Command::new("pulsemixer").arg("-l").output().unwrap(); - - if !output.status.success() { - panic!("`pulsemixer -l` failed"); - } - - let raw = String::from_utf8_lossy(&output.stdout); - - let mut default_sink_id = 0; - let mut default_source_id = 0; - - let entities_by_id: HashMap<_, _> = raw - .lines() - .map(|line| { - let (prefix, rest) = line.split_once(':').unwrap(); - let kind = match prefix { - "Sink" => PaEntityKind::Sink, - "Source" => PaEntityKind::Source, - "Sink input" => PaEntityKind::Application, - x => panic!("Unknown entity kind: {}", x), - }; - - let segments: Vec<&str> = rest.split(", ").collect(); - - let id = segments[0] - .trim_start() - .strip_prefix("ID: ") - .unwrap() - .split('-') - .last() - .unwrap() - .parse::() - .unwrap(); - - let name = segments[1].strip_prefix("Name: ").unwrap(); - let is_muted = segments[2].strip_prefix("Mute: ").unwrap().parse::().unwrap() == 1; - - let channel_count = segments[3].strip_prefix("Channels: ").unwrap().parse::().unwrap(); - - let channel_volumes: Box<[f32]> = segments[4..(4 + channel_count)] - .iter() - .map(|s| { - let s = s.strip_prefix("Volumes: [").unwrap_or(s); - s.strip_suffix(']').unwrap_or(s) - }) - .map(|s| &s[1..(s.len() - 2)]) - .map(|s| s.parse::().unwrap() / 100.0) - .collect(); - - if segments.last().unwrap() == &"Default" { - match kind { - PaEntityKind::Source => { - default_source_id = id; - } - PaEntityKind::Sink => { - default_sink_id = id; - } - PaEntityKind::Application => { - panic!("Sink sources cannot be a default"); - } - } - } - - ( - id, - Arc::new(PaEntityState { - id, - kind, - name: name.to_owned(), - is_muted, - channel_volumes, - }), - ) - }) - .collect(); - - state_tx - .send(PaVolumeState { - timestamp: Instant::now(), - entities_by_id, - default_sink_id, - default_source_id, - }) - .unwrap(); + mainloop: Rc>, + context: Rc>, + introspector: Rc>, + commands_rx: flume::Receiver, + state_tx: broadcast::Sender>, + current_state: Arc>>>, } impl PaThread { fn spawn( - max_time_between_queries: Duration, - commands_tx: flume::Sender, + client_name: String, commands_rx: flume::Receiver, - state_tx: broadcast::Sender, - ) -> PaThread { - thread::spawn(move || loop { - let command = commands_rx.recv_timeout(max_time_between_queries).unwrap_or(PaCommand::QueryState); + state_tx: broadcast::Sender>, + current_state: Arc>>>, + ) { + thread::spawn(move || { + let mainloop = Mainloop::new().unwrap(); + let context = Context::new(&mainloop, &client_name).unwrap(); + let introspector = context.introspect(); - match command { - PaCommand::SetIsMuted { id, value } => { - let action = if value { "--mute" } else { "--unmute" }; + let mut t = PaThread { + mainloop: Rc::new(RefCell::new(mainloop)), + context: Rc::new(RefCell::new(context)), + introspector: Rc::new(RefCell::new(introspector)), + commands_rx, + state_tx, + current_state, + }; - let status = Command::new("pulsemixer").args(["--id", &id.to_string(), action]).status().unwrap(); - - if !status.success() { - panic!("(Un-)muting failed with status code {:?}", status.code()); - } - } - PaCommand::SetChannelVolumes { id, channel_volumes } => { - let volumes = channel_volumes - .iter() - .map(|v| (v * 100.0).round() as u32) - .map(|v| v.to_string()) - .collect::>() - .join(":"); - - let status = Command::new("pulsemixer") - .args(["--id", &id.to_string(), "--set-volume-all", &volumes]) - .status() - .unwrap(); - - if !status.success() { - panic!("Setting the channel values failed with status code {:?}", status.code()); - } - } - PaCommand::Terminate => break, - PaCommand::QueryState => {} - } - - query_and_publish_state(&state_tx) + t.init(); + t.run(); }); + } - PaThread { commands_tx } + fn init(&mut self) { + self.context.borrow_mut().connect(None, FlagSet::NOFLAGS, None).unwrap(); + self.wait_for(|s| s.context.borrow().get_state() == libpulse_binding::context::State::Ready); + + let introspector = Rc::clone(&self.introspector); + + { + let entities_by_id = Rc::new(RefCell::new(HashMap::>::new())); + let entities_by_id_c = Rc::clone(&entities_by_id); + + self.wait_for_operation(introspector.borrow().get_sink_info_list(move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_info_list failed"), + ListResult::End => {} + ListResult::Item(sink) => { + entities_by_id_c.borrow_mut().insert(sink.index as PaEntityId, Arc::new(sink.into())); + } + })) + .unwrap(); + + let entities_by_id_c = Rc::clone(&entities_by_id); + self.wait_for_operation(introspector.borrow().get_source_info_list(move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_source_info_list failed"), + ListResult::End => {} + ListResult::Item(source) => { + entities_by_id_c.borrow_mut().insert(source.index as PaEntityId, Arc::new(source.into())); + } + })) + .unwrap(); + + let entities_by_id_c = Rc::clone(&entities_by_id); + self.wait_for_operation(introspector.borrow().get_sink_input_info_list(move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_input_info_list failed"), + ListResult::End => {} + ListResult::Item(sink_input) => { + let id = sink_input.index as PaEntityId; + entities_by_id_c.borrow_mut().insert(id, Arc::new(sink_input.into())); + } + })) + .unwrap(); + + PaThread::set_state( + &self.current_state, + &self.state_tx, + Arc::new(PaVolumeState { + timestamp: Instant::now(), + entities_by_id: Rc::into_inner(entities_by_id).unwrap().into_inner(), + default_sink_id: 0, + default_source_id: 0, + }), + ); + }; + + self.context + .borrow_mut() + .subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| { + if !success { + panic!("Context.subscribe failed") + } + }); + } + + fn run(mut self) { + let introspector = Rc::clone(&self.introspector); + let current_state = Arc::clone(&self.current_state); + let state_tx = self.state_tx.clone(); + + self.context + .borrow_mut() + .set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| { + let entity_id = entity_id as PaEntityId; + let facility = facility.unwrap(); + + let timestamp = Instant::now(); + let current_state = Arc::clone(¤t_state); + + match operation.unwrap() { + subscribe::Operation::Removed => { + let state = PaThread::unwrap_state(¤t_state); + + let entities_by_id = state.entities_by_id.without(&entity_id); + let default_source_id = if entity_id == state.default_source_id { 0 } else { state.default_source_id }; + let default_sink_id = if entity_id == state.default_sink_id { 0 } else { state.default_sink_id }; + + PaThread::set_state( + ¤t_state, + &state_tx, + Arc::new(PaVolumeState { + timestamp, + entities_by_id, + default_source_id, + default_sink_id, + }), + ); + } + subscribe::Operation::New | subscribe::Operation::Changed => { + let state_tx = state_tx.clone(); + + match facility { + Facility::SinkInput => { + introspector.borrow().get_sink_input_info(entity_id, move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_input_info failed"), + ListResult::End => {} + ListResult::Item(sink_input) => { + let state = PaThread::unwrap_state(¤t_state); + let id = sink_input.index as PaEntityId; + + let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into())); + PaThread::set_state( + ¤t_state, + &state_tx, + Arc::new(PaVolumeState { + timestamp, + entities_by_id, + ..*state + }), + ); + } + }); + } + Facility::Sink => { + introspector.borrow().get_sink_info_by_index(entity_id, move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"), + ListResult::End => {} + ListResult::Item(sink_input) => { + let state = PaThread::unwrap_state(¤t_state); + let id = sink_input.index as PaEntityId; + + let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into())); + PaThread::set_state( + ¤t_state, + &state_tx, + Arc::new(PaVolumeState { + timestamp, + entities_by_id, + ..*state + }), + ); + } + }); + } + Facility::Source => { + introspector.borrow().get_source_info_by_index(entity_id, move |list_result| match list_result { + ListResult::Error => panic!("Introspector.get_source_info_by_index failed"), + ListResult::End => {} + ListResult::Item(sink_input) => { + let state = PaThread::unwrap_state(¤t_state); + let id = sink_input.index as PaEntityId; + + let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into())); + PaThread::set_state( + ¤t_state, + &state_tx, + Arc::new(PaVolumeState { + timestamp, + entities_by_id, + ..*state + }), + ); + } + }); + } + _ => {} + }; + } + }; + }))); + + let current_state = Arc::clone(&self.current_state); + let mainloop = Rc::clone(&self.mainloop); + + loop { + self.run_single_mainloop_iteration(false); + + if let Ok(command) = self.commands_rx.try_recv() { + match command { + PaCommand::SetIsMuted { id, value } => { + if let Some(state) = PaThread::unwrap_state(¤t_state).entities_by_id.get(&id) { + match state.kind { + PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_mute_by_index(id, value, None), + PaEntityKind::Source => self.introspector.borrow_mut().set_source_mute_by_index(id, value, None), + PaEntityKind::Application => self.introspector.borrow_mut().set_sink_input_mute(id, value, None), + }; + } + } + PaCommand::SetChannelVolumes { id, channel_volumes } => { + if let Some(state) = PaThread::unwrap_state(¤t_state).entities_by_id.get(&id) { + let mut value = state.channel_volumes; + for (i, v) in channel_volumes.iter().enumerate() { + value.set(i as u8, Volume((Volume::NORMAL.0 as f32 * v).floor() as u32)); + } + + match state.kind { + PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_volume_by_index(id, &value, None), + PaEntityKind::Source => self.introspector.borrow_mut().set_source_volume_by_index(id, &value, None), + PaEntityKind::Application => self.introspector.borrow_mut().set_sink_input_volume(id, &value, None), + }; + } + } + PaCommand::Terminate => { + mainloop.borrow_mut().quit(Retval(0)); + } + } + } + } + } + + fn unwrap_state(state: &Arc>>>) -> Arc { + state + .read() + .unwrap() + .as_ref() + .cloned() + .expect("this function is only called after the initial state was set") + } + + fn set_state(current_state: &RwLock>>, state_tx: &broadcast::Sender>, value: Arc) { + let mut s = current_state.write().unwrap(); + *s = Some(Arc::clone(&value)); + state_tx.send(value).unwrap(); + } + + fn run_single_mainloop_iteration(&mut self, block: bool) { + match self.mainloop.borrow_mut().iterate(block) { + IterateResult::Quit(_) => { + panic!("Mainloop quit.") + } + IterateResult::Err(e) => { + panic!("Mainloop error: {}", e) + } + IterateResult::Success(_) => {} + } + } + + fn wait_for(&mut self, predicate: impl Fn(&mut Self) -> bool) { + loop { + self.run_single_mainloop_iteration(true); + + if predicate(self) { + break; + } + } + } + + fn wait_for_operation(&mut self, operation: Operation) -> Result<(), ()> { + loop { + self.run_single_mainloop_iteration(true); + + match operation.get_state() { + libpulse_binding::operation::State::Running => {} + libpulse_binding::operation::State::Done => return Ok(()), + libpulse_binding::operation::State::Cancelled => return Err(()), + } + } } } -impl Drop for PaThread { +#[derive(Debug)] +struct PaWorker { + commands_tx: flume::Sender, +} + +impl Drop for PaWorker { fn drop(&mut self) { self.commands_tx.send(PaCommand::Terminate).ok(); } @@ -228,31 +450,40 @@ impl Drop for PaThread { #[derive(Debug, Clone)] pub struct PaVolumeInterface { #[allow(unused)] - thread: Arc, - state_tx: broadcast::Sender, + worker: Arc, + current_state: Arc>>>, + state_tx: broadcast::Sender>, commands_tx: flume::Sender, } impl PaVolumeInterface { - pub fn spawn_thread(max_time_between_queries: Duration) -> PaVolumeInterface { + pub fn spawn_thread(client_name: String) -> PaVolumeInterface { let (commands_tx, commands_rx) = flume::unbounded(); let state_tx = broadcast::Sender::new(5); + let current_state = Arc::new(RwLock::new(None)); - let thread = PaThread::spawn(max_time_between_queries, commands_tx.clone(), commands_rx, state_tx.clone()); + PaThread::spawn(client_name, commands_rx, state_tx.clone(), Arc::clone(¤t_state)); + + let worker = PaWorker { + commands_tx: commands_tx.clone(), + }; PaVolumeInterface { - thread: Arc::new(thread), + worker: Arc::new(worker), + current_state, commands_tx, state_tx, } } - pub fn subscribe_to_state(&self) -> broadcast::Receiver { - self.state_tx.subscribe() + pub fn subscribe_to_state(&self) -> (Option>, Receiver>) { + let rx = self.state_tx.subscribe(); + let state = self.current_state(); + (state, rx) } - pub fn query_state(&self) { - self.commands_tx.send(PaCommand::QueryState).unwrap() + pub fn current_state(&self) -> Option> { + self.current_state.read().unwrap().clone() } pub fn set_is_muted(&self, id: PaEntityId, value: bool) {