use std::process::Command; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; use im::HashMap; use tokio::sync::broadcast; pub type PaEntityId = usize; #[derive(Debug, Eq, PartialEq, Copy, Clone)] pub enum PaEntityKind { Source, Sink, Application, } #[derive(Debug, Clone)] pub struct PaEntityState { id: PaEntityId, kind: PaEntityKind, name: String, channel_volumes: Box<[f32]>, is_muted: bool, } impl PaEntityState { pub fn id(&self) -> &PaEntityId { &self.id } pub fn kind(&self) -> &PaEntityKind { &self.kind } pub fn name(&self) -> &String { &self.name } pub fn channel_volumes(&self) -> &[f32] { &self.channel_volumes } pub fn is_muted(&self) -> bool { self.is_muted } } #[derive(Debug, Clone)] pub struct PaVolumeState { timestamp: Instant, entities_by_id: HashMap>, default_sink_id: PaEntityId, default_source_id: PaEntityId, } impl PaVolumeState { pub fn timestamp(&self) -> &Instant { &self.timestamp } pub fn entities_by_id(&self) -> &HashMap> { &self.entities_by_id } pub fn default_sink_id(&self) -> &PaEntityId { &self.default_sink_id } pub fn default_source_id(&self) -> &PaEntityId { &self.default_source_id } } enum PaCommand { QueryState, SetIsMuted { id: PaEntityId, value: bool }, SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> }, Terminate, } #[derive(Debug)] struct PaThread { commands_tx: flume::Sender, } fn query_and_publish_state(state_tx: &broadcast::Sender) { let output = Command::new("pulsemixer").arg("-l").output().unwrap(); if !output.status.success() { panic!("`pulsemixer -l` failed"); } let raw = String::from_utf8_lossy(&output.stdout); let mut default_sink_id = 0; let mut default_source_id = 0; let entities_by_id: HashMap<_, _> = raw .lines() .map(|line| { let (prefix, rest) = line.split_once(':').unwrap(); let kind = match prefix { "Sink" => PaEntityKind::Sink, "Source" => PaEntityKind::Source, "Sink input" => PaEntityKind::Application, x => panic!("Unknown entity kind: {}", x), }; let segments: Vec<&str> = rest.split(", ").collect(); let id = segments[0] .trim_start() .strip_prefix("ID: ") .unwrap() .split('-') .last() .unwrap() .parse::() .unwrap(); let name = segments[1].strip_prefix("Name: ").unwrap(); let is_muted = segments[2].strip_prefix("Mute: ").unwrap().parse::().unwrap() == 1; let channel_count = segments[3].strip_prefix("Channels: ").unwrap().parse::().unwrap(); let channel_volumes: Box<[f32]> = segments[4..(4 + channel_count)] .iter() .map(|s| { let s = s.strip_prefix("Volumes: [").unwrap_or(s); s.strip_suffix(']').unwrap_or(s) }) .map(|s| &s[1..(s.len() - 2)]) .map(|s| s.parse::().unwrap() / 100.0) .collect(); if segments.last().unwrap() == &"Default" { match kind { PaEntityKind::Source => { default_source_id = id; } PaEntityKind::Sink => { default_sink_id = id; } PaEntityKind::Application => { panic!("Sink sources cannot be a default"); } } } ( id, Arc::new(PaEntityState { id, kind, name: name.to_owned(), is_muted, channel_volumes, }), ) }) .collect(); state_tx .send(PaVolumeState { timestamp: Instant::now(), entities_by_id, default_sink_id, default_source_id, }) .unwrap(); } impl PaThread { fn spawn( max_time_between_queries: Duration, commands_tx: flume::Sender, commands_rx: flume::Receiver, state_tx: broadcast::Sender, ) -> PaThread { thread::spawn(move || loop { let command = commands_rx.recv_timeout(max_time_between_queries).unwrap_or(PaCommand::QueryState); match command { PaCommand::SetIsMuted { id, value } => { let action = if value { "--mute" } else { "--unmute" }; let status = Command::new("pulsemixer").args(["--id", &id.to_string(), action]).status().unwrap(); if !status.success() { panic!("(Un-)muting failed with status code {:?}", status.code()); } } PaCommand::SetChannelVolumes { id, channel_volumes } => { let volumes = channel_volumes .iter() .map(|v| (v * 100.0).round() as u32) .map(|v| v.to_string()) .collect::>() .join(":"); let status = Command::new("pulsemixer") .args(["--id", &id.to_string(), "--set-volume-all", &volumes]) .status() .unwrap(); if !status.success() { panic!("Setting the channel values failed with status code {:?}", status.code()); } } PaCommand::Terminate => break, PaCommand::QueryState => {} } query_and_publish_state(&state_tx) }); PaThread { commands_tx } } } impl Drop for PaThread { fn drop(&mut self) { self.commands_tx.send(PaCommand::Terminate).ok(); } } #[derive(Debug, Clone)] pub struct PaVolumeInterface { #[allow(unused)] thread: Arc, state_tx: broadcast::Sender, commands_tx: flume::Sender, } impl PaVolumeInterface { pub fn spawn_thread(max_time_between_queries: Duration) -> PaVolumeInterface { let (commands_tx, commands_rx) = flume::unbounded(); let state_tx = broadcast::Sender::new(5); let thread = PaThread::spawn(max_time_between_queries, commands_tx.clone(), commands_rx, state_tx.clone()); PaVolumeInterface { thread: Arc::new(thread), commands_tx, state_tx, } } pub fn subscribe_to_state(&self) -> broadcast::Receiver { self.state_tx.subscribe() } pub fn query_state(&self) { self.commands_tx.send(PaCommand::QueryState).unwrap() } pub fn set_is_muted(&self, id: PaEntityId, value: bool) { self.commands_tx.send(PaCommand::SetIsMuted { id, value }).unwrap() } pub fn set_channel_volumes(&self, id: PaEntityId, channel_volumes: impl Into>) { self.commands_tx .send(PaCommand::SetChannelVolumes { id, channel_volumes: channel_volumes.into(), }) .unwrap() } }