This commit is contained in:
Moritz Ruth 2024-01-15 02:21:10 +01:00
parent 846a061063
commit aee74cb528
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
5 changed files with 498 additions and 185 deletions

39
Cargo.lock generated
View file

@ -831,6 +831,33 @@ version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058"
[[package]]
name = "libpulse-binding"
version = "2.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed3557a2dfc380c8f061189a01c6ae7348354e0c9886038dc6c171219c08eaff"
dependencies = [
"bitflags 1.3.2",
"libc",
"libpulse-sys",
"num-derive",
"num-traits",
"winapi",
]
[[package]]
name = "libpulse-sys"
version = "1.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc19e110fbf42c17260d30f6d3dc545f58491c7830d38ecb9aaca96e26067a9b"
dependencies = [
"libc",
"num-derive",
"num-traits",
"pkg-config",
"winapi",
]
[[package]]
name = "libudev"
version = "0.3.0"
@ -950,6 +977,17 @@ dependencies = [
"libc",
]
[[package]]
name = "num-derive"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "num-traits"
version = "0.2.17"
@ -996,6 +1034,7 @@ version = "0.1.0"
dependencies = [
"flume",
"im",
"libpulse-binding",
"tokio",
]

View file

@ -1,23 +1,23 @@
[knobs.right-top]
icon = "@ph/microphone-light[scale=0.9]"
indicators.bar.color = "#ffffff50"
mode.audio_volume.direction = "input"
mode.audio_volume.regex = "^(SC425 USB Microphone Analog Stereo)$"
mode.audio_volume.disable_press_to_unmute = true
mode.audio_volume.muted_turn_action = "unmute-at-zero"
mode.audio_volume.style.muted.label = "Muted"
mode.audio_volume.style.inactive.icon = "@ph/microphone-slash-light[alpha=0.9|color=#fc4646]"
[knobs.right-middle]
icon = "@apps/discord[scale=0.25]"
indicators.bar.color = "#ffffff50"
mode.audio_volume.regex = "Discord"
mode.audio_volume.style.active.label = "{percentage}%"
mode.audio_volume.style.muted.label = "Muted"
mode.audio_volume.style.inactive.label = ""
mode.audio_volume.style.inactive.icon = "@apps/discord[grayscale|alpha=0.9]"
#[knobs.right-top]
#icon = "@ph/microphone-light[scale=0.9]"
#indicators.bar.color = "#ffffff50"
#
#mode.audio_volume.direction = "input"
#mode.audio_volume.regex = "^(SC425 USB Microphone Analog Stereo)$"
#mode.audio_volume.disable_press_to_unmute = true
#mode.audio_volume.muted_turn_action = "unmute-at-zero"
#mode.audio_volume.style.muted.label = "Muted"
#mode.audio_volume.style.inactive.icon = "@ph/microphone-slash-light[alpha=0.9|color=#fc4646]"
#
#[knobs.right-middle]
#icon = "@apps/discord[scale=0.25]"
#indicators.bar.color = "#ffffff50"
#
#mode.audio_volume.regex = "Discord"
#mode.audio_volume.style.active.label = "{percentage}%"
#mode.audio_volume.style.muted.label = "Muted"
#mode.audio_volume.style.inactive.label = ""
#mode.audio_volume.style.inactive.icon = "@apps/discord[grayscale|alpha=0.9]"
[knobs.right-bottom]
icon = "@apps/spotify[scale=1.1]"

View file

@ -1,6 +1,6 @@
use std::borrow::ToOwned;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use regex::Regex;
@ -64,29 +64,73 @@ pub enum State {
Muted,
}
static PA_VOLUME_INTERFACE: Lazy<PaVolumeInterface> = Lazy::new(|| {
let interface = PaVolumeInterface::spawn_thread(Duration::from_millis(500));
interface.query_state();
interface
});
static PA_VOLUME_INTERFACE: Lazy<PaVolumeInterface> = Lazy::new(|| PaVolumeInterface::spawn_thread("deckster".to_owned()));
fn get_volume_cv(channel_volumes: &Vec<f32>) -> f32 {
*channel_volumes.first().unwrap()
}
fn get_volume_es(entity_state: &Option<Arc<PaEntityState>>) -> f32 {
entity_state.as_ref().map(|s| *s.channel_volumes().first().unwrap()).unwrap_or(0.0)
}
pub async fn handle(path: KnobPath, config: Arc<Config>, mut events: broadcast::Receiver<(KnobPath, KnobEvent)>, commands: flume::Sender<IoWorkerCommand>) {
let mut volume_update_timestamp = Instant::now();
let mut value: f32 = 0.0;
let mut entity_state: Option<Arc<PaEntityState>> = None;
let pa_volume_interface = &PA_VOLUME_INTERFACE;
let mut volume_states = pa_volume_interface.subscribe_to_state();
let (initial_state, mut volume_states) = pa_volume_interface.subscribe_to_state();
if let Some(state) = initial_state {
entity_state = state
.entities_by_id()
.values()
.find(|entity| config.regex.is_match(entity.display_name()))
.map(Arc::clone);
commands
.send(IoWorkerCommand::SetKnobValue {
path: path.clone(),
value: get_volume_es(&entity_state),
})
.unwrap();
}
let update_style = {
let commands = commands.clone();
let config = Arc::clone(&config);
let path = path.clone();
move |entity_state: &Option<Arc<PaEntityState>>| {
let state = match entity_state {
None => State::Inactive,
Some(s) if s.is_muted() => State::Muted,
Some(_) => State::Active,
};
let mut style = config.style.get(&state).cloned();
if let Some(ref mut s) = &mut style {
let v = get_volume_es(entity_state);
if let Some(ref mut label) = &mut s.label {
*label = label.replace("{percentage}", &((v * 100.0).round() as u32).to_string());
}
}
commands
.send(IoWorkerCommand::SetKnobStyle {
path: path.clone(),
value: style,
})
.unwrap();
}
};
loop {
select! {
Ok(volume_state) = volume_states.recv() => {
entity_state = volume_state.entities_by_id().values().find(|entity| config.regex.is_match(entity.name())).map(Arc::clone);
entity_state = volume_state.entities_by_id().values().find(|entity| config.regex.is_match(entity.display_name())).map(Arc::clone);
update_style(&entity_state);
if volume_state.timestamp() > &volume_update_timestamp {
value = entity_state.as_ref().map(|s| *s.channel_volumes().first().unwrap()).unwrap_or(0.0);
commands.send(IoWorkerCommand::SetKnobValue { path: path.clone(), value }).unwrap();
}
commands.send(IoWorkerCommand::SetKnobValue { path: path.clone(), value: get_volume_es(&entity_state) }).unwrap()
}
Ok((event_path, event)) = events.recv() => {
@ -102,11 +146,9 @@ pub async fn handle(path: KnobPath, config: Arc<Config>, mut events: broadcast::
RotationDirection::Counterclockwise => -1.0,
};
volume_update_timestamp = Instant::now();
value = (value + (factor * config.delta)).clamp(0.0, 1.0);
pa_volume_interface.set_channel_volumes(*entity_state.id(), vec![value; entity_state.channel_volumes().len()]);
commands.send(IoWorkerCommand::SetKnobValue { path: path.clone(), value }).unwrap();
let current_v = get_volume_cv(&entity_state.channel_volumes());
let v = (current_v + (factor * config.delta)).clamp(0.0, 1.0);
pa_volume_interface.set_channel_volumes(*entity_state.id(), vec![v; entity_state.channel_volumes().len()]);
}
KnobEvent::Press => {
pa_volume_interface.set_is_muted(*entity_state.id(), !entity_state.is_muted())

View file

@ -7,3 +7,4 @@ edition = "2021"
flume = "0.11.0"
im = "15.1.0"
tokio = { version = "1.35.1", default-features = false, features = ["sync"] }
libpulse-binding = "2.28.1"

View file

@ -1,12 +1,23 @@
use std::process::Command;
use std::sync::Arc;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Instant;
use im::HashMap;
use libpulse_binding::callbacks::ListResult;
use libpulse_binding::context::introspect::{Introspector, SinkInfo, SinkInputInfo, SourceInfo};
use libpulse_binding::context::subscribe::{Facility, InterestMaskSet};
use libpulse_binding::context::{subscribe, Context, FlagSet};
use libpulse_binding::def::Retval;
use libpulse_binding::mainloop::api::Mainloop as MainloopTrait;
use libpulse_binding::mainloop::standard::{IterateResult, Mainloop};
use libpulse_binding::operation::Operation;
use libpulse_binding::volume::{ChannelVolumes, Volume};
use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver;
pub type PaEntityId = usize;
pub type PaEntityId = u32;
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub enum PaEntityKind {
@ -20,7 +31,8 @@ pub struct PaEntityState {
id: PaEntityId,
kind: PaEntityKind,
name: String,
channel_volumes: Box<[f32]>,
display_name: String,
channel_volumes: ChannelVolumes,
is_muted: bool,
}
@ -37,8 +49,12 @@ impl PaEntityState {
&self.name
}
pub fn channel_volumes(&self) -> &[f32] {
&self.channel_volumes
pub fn display_name(&self) -> &String {
&self.display_name
}
pub fn channel_volumes(&self) -> Vec<f32> {
self.channel_volumes.get().iter().map(|v| v.0 as f32 / Volume::NORMAL.0 as f32).collect()
}
pub fn is_muted(&self) -> bool {
@ -46,6 +62,45 @@ impl PaEntityState {
}
}
impl From<&SourceInfo<'_>> for PaEntityState {
fn from(value: &SourceInfo) -> Self {
PaEntityState {
id: value.index as PaEntityId,
is_muted: value.mute,
name: value.name.clone().unwrap_or_default().into_owned(),
display_name: value.description.clone().unwrap_or_default().into_owned(),
kind: PaEntityKind::Source,
channel_volumes: value.volume,
}
}
}
impl From<&SinkInfo<'_>> for PaEntityState {
fn from(value: &SinkInfo) -> Self {
PaEntityState {
id: value.index as PaEntityId,
is_muted: value.mute,
name: value.name.clone().unwrap_or_default().into_owned(),
display_name: value.description.clone().unwrap_or_default().into_owned(),
kind: PaEntityKind::Sink,
channel_volumes: value.volume,
}
}
}
impl From<&SinkInputInfo<'_>> for PaEntityState {
fn from(value: &SinkInputInfo) -> Self {
PaEntityState {
id: value.index as PaEntityId,
is_muted: value.mute,
name: value.name.clone().unwrap_or_default().into_owned(),
display_name: value.name.clone().unwrap_or_default().into_owned(),
kind: PaEntityKind::Application,
channel_volumes: value.volume,
}
}
}
#[derive(Debug, Clone)]
pub struct PaVolumeState {
timestamp: Instant,
@ -72,154 +127,321 @@ impl PaVolumeState {
}
}
impl Default for PaVolumeState {
fn default() -> Self {
PaVolumeState {
timestamp: Instant::now(),
entities_by_id: HashMap::new(),
default_sink_id: 0,
default_source_id: 0,
}
}
}
#[derive(Debug)]
enum PaCommand {
QueryState,
SetIsMuted { id: PaEntityId, value: bool },
SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> },
Terminate,
}
#[derive(Debug)]
struct PaThread {
commands_tx: flume::Sender<PaCommand>,
}
fn query_and_publish_state(state_tx: &broadcast::Sender<PaVolumeState>) {
let output = Command::new("pulsemixer").arg("-l").output().unwrap();
if !output.status.success() {
panic!("`pulsemixer -l` failed");
}
let raw = String::from_utf8_lossy(&output.stdout);
let mut default_sink_id = 0;
let mut default_source_id = 0;
let entities_by_id: HashMap<_, _> = raw
.lines()
.map(|line| {
let (prefix, rest) = line.split_once(':').unwrap();
let kind = match prefix {
"Sink" => PaEntityKind::Sink,
"Source" => PaEntityKind::Source,
"Sink input" => PaEntityKind::Application,
x => panic!("Unknown entity kind: {}", x),
};
let segments: Vec<&str> = rest.split(", ").collect();
let id = segments[0]
.trim_start()
.strip_prefix("ID: ")
.unwrap()
.split('-')
.last()
.unwrap()
.parse::<usize>()
.unwrap();
let name = segments[1].strip_prefix("Name: ").unwrap();
let is_muted = segments[2].strip_prefix("Mute: ").unwrap().parse::<u8>().unwrap() == 1;
let channel_count = segments[3].strip_prefix("Channels: ").unwrap().parse::<usize>().unwrap();
let channel_volumes: Box<[f32]> = segments[4..(4 + channel_count)]
.iter()
.map(|s| {
let s = s.strip_prefix("Volumes: [").unwrap_or(s);
s.strip_suffix(']').unwrap_or(s)
})
.map(|s| &s[1..(s.len() - 2)])
.map(|s| s.parse::<f32>().unwrap() / 100.0)
.collect();
if segments.last().unwrap() == &"Default" {
match kind {
PaEntityKind::Source => {
default_source_id = id;
}
PaEntityKind::Sink => {
default_sink_id = id;
}
PaEntityKind::Application => {
panic!("Sink sources cannot be a default");
}
}
}
(
id,
Arc::new(PaEntityState {
id,
kind,
name: name.to_owned(),
is_muted,
channel_volumes,
}),
)
})
.collect();
state_tx
.send(PaVolumeState {
timestamp: Instant::now(),
entities_by_id,
default_sink_id,
default_source_id,
})
.unwrap();
mainloop: Rc<RefCell<Mainloop>>,
context: Rc<RefCell<Context>>,
introspector: Rc<RefCell<Introspector>>,
commands_rx: flume::Receiver<PaCommand>,
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
current_state: Arc<RwLock<Option<Arc<PaVolumeState>>>>,
}
impl PaThread {
fn spawn(
max_time_between_queries: Duration,
commands_tx: flume::Sender<PaCommand>,
client_name: String,
commands_rx: flume::Receiver<PaCommand>,
state_tx: broadcast::Sender<PaVolumeState>,
) -> PaThread {
thread::spawn(move || loop {
let command = commands_rx.recv_timeout(max_time_between_queries).unwrap_or(PaCommand::QueryState);
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
current_state: Arc<RwLock<Option<Arc<PaVolumeState>>>>,
) {
thread::spawn(move || {
let mainloop = Mainloop::new().unwrap();
let context = Context::new(&mainloop, &client_name).unwrap();
let introspector = context.introspect();
let mut t = PaThread {
mainloop: Rc::new(RefCell::new(mainloop)),
context: Rc::new(RefCell::new(context)),
introspector: Rc::new(RefCell::new(introspector)),
commands_rx,
state_tx,
current_state,
};
t.init();
t.run();
});
}
fn init(&mut self) {
self.context.borrow_mut().connect(None, FlagSet::NOFLAGS, None).unwrap();
self.wait_for(|s| s.context.borrow().get_state() == libpulse_binding::context::State::Ready);
let introspector = Rc::clone(&self.introspector);
{
let entities_by_id = Rc::new(RefCell::new(HashMap::<PaEntityId, Arc<PaEntityState>>::new()));
let entities_by_id_c = Rc::clone(&entities_by_id);
self.wait_for_operation(introspector.borrow().get_sink_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_info_list failed"),
ListResult::End => {}
ListResult::Item(sink) => {
entities_by_id_c.borrow_mut().insert(sink.index as PaEntityId, Arc::new(sink.into()));
}
}))
.unwrap();
let entities_by_id_c = Rc::clone(&entities_by_id);
self.wait_for_operation(introspector.borrow().get_source_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_source_info_list failed"),
ListResult::End => {}
ListResult::Item(source) => {
entities_by_id_c.borrow_mut().insert(source.index as PaEntityId, Arc::new(source.into()));
}
}))
.unwrap();
let entities_by_id_c = Rc::clone(&entities_by_id);
self.wait_for_operation(introspector.borrow().get_sink_input_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_input_info_list failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let id = sink_input.index as PaEntityId;
entities_by_id_c.borrow_mut().insert(id, Arc::new(sink_input.into()));
}
}))
.unwrap();
PaThread::set_state(
&self.current_state,
&self.state_tx,
Arc::new(PaVolumeState {
timestamp: Instant::now(),
entities_by_id: Rc::into_inner(entities_by_id).unwrap().into_inner(),
default_sink_id: 0,
default_source_id: 0,
}),
);
};
self.context
.borrow_mut()
.subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| {
if !success {
panic!("Context.subscribe failed")
}
});
}
fn run(mut self) {
let introspector = Rc::clone(&self.introspector);
let current_state = Arc::clone(&self.current_state);
let state_tx = self.state_tx.clone();
self.context
.borrow_mut()
.set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| {
let entity_id = entity_id as PaEntityId;
let facility = facility.unwrap();
let timestamp = Instant::now();
let current_state = Arc::clone(&current_state);
match operation.unwrap() {
subscribe::Operation::Removed => {
let state = PaThread::unwrap_state(&current_state);
let entities_by_id = state.entities_by_id.without(&entity_id);
let default_source_id = if entity_id == state.default_source_id { 0 } else { state.default_source_id };
let default_sink_id = if entity_id == state.default_sink_id { 0 } else { state.default_sink_id };
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
default_source_id,
default_sink_id,
}),
);
}
subscribe::Operation::New | subscribe::Operation::Changed => {
let state_tx = state_tx.clone();
match facility {
Facility::SinkInput => {
introspector.borrow().get_sink_input_info(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_input_info failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let state = PaThread::unwrap_state(&current_state);
let id = sink_input.index as PaEntityId;
let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into()));
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
..*state
}),
);
}
});
}
Facility::Sink => {
introspector.borrow().get_sink_info_by_index(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let state = PaThread::unwrap_state(&current_state);
let id = sink_input.index as PaEntityId;
let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into()));
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
..*state
}),
);
}
});
}
Facility::Source => {
introspector.borrow().get_source_info_by_index(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_source_info_by_index failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let state = PaThread::unwrap_state(&current_state);
let id = sink_input.index as PaEntityId;
let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into()));
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
..*state
}),
);
}
});
}
_ => {}
};
}
};
})));
let current_state = Arc::clone(&self.current_state);
let mainloop = Rc::clone(&self.mainloop);
loop {
self.run_single_mainloop_iteration(false);
if let Ok(command) = self.commands_rx.try_recv() {
match command {
PaCommand::SetIsMuted { id, value } => {
let action = if value { "--mute" } else { "--unmute" };
let status = Command::new("pulsemixer").args(["--id", &id.to_string(), action]).status().unwrap();
if !status.success() {
panic!("(Un-)muting failed with status code {:?}", status.code());
if let Some(state) = PaThread::unwrap_state(&current_state).entities_by_id.get(&id) {
match state.kind {
PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_mute_by_index(id, value, None),
PaEntityKind::Source => self.introspector.borrow_mut().set_source_mute_by_index(id, value, None),
PaEntityKind::Application => self.introspector.borrow_mut().set_sink_input_mute(id, value, None),
};
}
}
PaCommand::SetChannelVolumes { id, channel_volumes } => {
let volumes = channel_volumes
.iter()
.map(|v| (v * 100.0).round() as u32)
.map(|v| v.to_string())
.collect::<Vec<_>>()
.join(":");
let status = Command::new("pulsemixer")
.args(["--id", &id.to_string(), "--set-volume-all", &volumes])
.status()
.unwrap();
if !status.success() {
panic!("Setting the channel values failed with status code {:?}", status.code());
}
}
PaCommand::Terminate => break,
PaCommand::QueryState => {}
if let Some(state) = PaThread::unwrap_state(&current_state).entities_by_id.get(&id) {
let mut value = state.channel_volumes;
for (i, v) in channel_volumes.iter().enumerate() {
value.set(i as u8, Volume((Volume::NORMAL.0 as f32 * v).floor() as u32));
}
query_and_publish_state(&state_tx)
});
PaThread { commands_tx }
match state.kind {
PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_volume_by_index(id, &value, None),
PaEntityKind::Source => self.introspector.borrow_mut().set_source_volume_by_index(id, &value, None),
PaEntityKind::Application => self.introspector.borrow_mut().set_sink_input_volume(id, &value, None),
};
}
}
PaCommand::Terminate => {
mainloop.borrow_mut().quit(Retval(0));
}
}
}
}
}
impl Drop for PaThread {
fn unwrap_state(state: &Arc<RwLock<Option<Arc<PaVolumeState>>>>) -> Arc<PaVolumeState> {
state
.read()
.unwrap()
.as_ref()
.cloned()
.expect("this function is only called after the initial state was set")
}
fn set_state(current_state: &RwLock<Option<Arc<PaVolumeState>>>, state_tx: &broadcast::Sender<Arc<PaVolumeState>>, value: Arc<PaVolumeState>) {
let mut s = current_state.write().unwrap();
*s = Some(Arc::clone(&value));
state_tx.send(value).unwrap();
}
fn run_single_mainloop_iteration(&mut self, block: bool) {
match self.mainloop.borrow_mut().iterate(block) {
IterateResult::Quit(_) => {
panic!("Mainloop quit.")
}
IterateResult::Err(e) => {
panic!("Mainloop error: {}", e)
}
IterateResult::Success(_) => {}
}
}
fn wait_for(&mut self, predicate: impl Fn(&mut Self) -> bool) {
loop {
self.run_single_mainloop_iteration(true);
if predicate(self) {
break;
}
}
}
fn wait_for_operation<T: ?Sized>(&mut self, operation: Operation<T>) -> Result<(), ()> {
loop {
self.run_single_mainloop_iteration(true);
match operation.get_state() {
libpulse_binding::operation::State::Running => {}
libpulse_binding::operation::State::Done => return Ok(()),
libpulse_binding::operation::State::Cancelled => return Err(()),
}
}
}
}
#[derive(Debug)]
struct PaWorker {
commands_tx: flume::Sender<PaCommand>,
}
impl Drop for PaWorker {
fn drop(&mut self) {
self.commands_tx.send(PaCommand::Terminate).ok();
}
@ -228,31 +450,40 @@ impl Drop for PaThread {
#[derive(Debug, Clone)]
pub struct PaVolumeInterface {
#[allow(unused)]
thread: Arc<PaThread>,
state_tx: broadcast::Sender<PaVolumeState>,
worker: Arc<PaWorker>,
current_state: Arc<RwLock<Option<Arc<PaVolumeState>>>>,
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
commands_tx: flume::Sender<PaCommand>,
}
impl PaVolumeInterface {
pub fn spawn_thread(max_time_between_queries: Duration) -> PaVolumeInterface {
pub fn spawn_thread(client_name: String) -> PaVolumeInterface {
let (commands_tx, commands_rx) = flume::unbounded();
let state_tx = broadcast::Sender::new(5);
let current_state = Arc::new(RwLock::new(None));
let thread = PaThread::spawn(max_time_between_queries, commands_tx.clone(), commands_rx, state_tx.clone());
PaThread::spawn(client_name, commands_rx, state_tx.clone(), Arc::clone(&current_state));
let worker = PaWorker {
commands_tx: commands_tx.clone(),
};
PaVolumeInterface {
thread: Arc::new(thread),
worker: Arc::new(worker),
current_state,
commands_tx,
state_tx,
}
}
pub fn subscribe_to_state(&self) -> broadcast::Receiver<PaVolumeState> {
self.state_tx.subscribe()
pub fn subscribe_to_state(&self) -> (Option<Arc<PaVolumeState>>, Receiver<Arc<PaVolumeState>>) {
let rx = self.state_tx.subscribe();
let state = self.current_state();
(state, rx)
}
pub fn query_state(&self) {
self.commands_tx.send(PaCommand::QueryState).unwrap()
pub fn current_state(&self) -> Option<Arc<PaVolumeState>> {
self.current_state.read().unwrap().clone()
}
pub fn set_is_muted(&self, id: PaEntityId, value: bool) {