Refactor pa_volume_interface to not poll inefficiently

This commit is contained in:
Moritz Ruth 2024-06-17 00:27:10 +02:00
parent 4d476c5e42
commit e2f4aac438
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
11 changed files with 212 additions and 329 deletions

View file

@ -7,4 +7,6 @@ edition = "2021"
flume = "0.11.0"
im = "15.1.0"
tokio = { version = "1.35.1", default-features = false, features = ["sync"] }
libpulse-binding = "2.28.1"
libpulse-binding = "2.28.1"
log = "0.4.21"
arc-swap = "1.7.1"

View file

@ -1,5 +1,4 @@
use std::cell::RefCell;
use std::rc::Rc;
use arc_swap::ArcSwap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Instant;
@ -8,11 +7,12 @@ use im::HashMap;
use libpulse_binding::callbacks::ListResult;
use libpulse_binding::context::introspect::{Introspector, SinkInfo, SinkInputInfo, SourceInfo};
use libpulse_binding::context::subscribe::{Facility, InterestMaskSet};
use libpulse_binding::context::{subscribe, Context, FlagSet};
use libpulse_binding::context::{subscribe, Context, FlagSet, State};
use libpulse_binding::def::Retval;
use libpulse_binding::mainloop::standard::{IterateResult, Mainloop};
use libpulse_binding::operation::Operation;
use libpulse_binding::mainloop::api::Mainloop as _;
use libpulse_binding::mainloop::threaded::Mainloop;
use libpulse_binding::volume::{ChannelVolumes, Volume};
use log::debug;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver;
@ -129,8 +129,6 @@ impl From<&SinkInputInfo<'_>> for PaEntityState {
pub struct PaVolumeState {
timestamp: Instant,
entities_by_id: HashMap<PaEntityId, Arc<PaEntityState>>,
default_sink_id: PaEntityId,
default_source_id: PaEntityId,
}
impl PaVolumeState {
@ -141,14 +139,6 @@ impl PaVolumeState {
pub fn entities_by_id(&self) -> &HashMap<PaEntityId, Arc<PaEntityState>> {
&self.entities_by_id
}
pub fn default_sink_id(&self) -> &PaEntityId {
&self.default_sink_id
}
pub fn default_source_id(&self) -> &PaEntityId {
&self.default_source_id
}
}
impl Default for PaVolumeState {
@ -156,44 +146,56 @@ impl Default for PaVolumeState {
PaVolumeState {
timestamp: Instant::now(),
entities_by_id: HashMap::new(),
default_sink_id: 0,
default_source_id: 0,
}
}
}
#[derive(Debug)]
enum PaCommand {
enum PaThreadMessage {
// by the user
SetIsMuted { id: PaEntityId, value: bool },
SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> },
Terminate,
// internal
LoadSinkInput { entity_id: PaEntityId },
LoadSink { entity_id: PaEntityId },
LoadSource { entity_id: PaEntityId },
UpsertEntity { entity_state: Arc<PaEntityState> },
RemoveEntity { entity_id: PaEntityId },
}
struct PaThread {
mainloop: Rc<RefCell<Mainloop>>,
context: Rc<RefCell<Context>>,
introspector: Rc<RefCell<Introspector>>,
commands_rx: flume::Receiver<PaCommand>,
mainloop: Mainloop,
context: Context,
introspector: Introspector,
commands_tx: flume::Sender<PaThreadMessage>,
commands_rx: flume::Receiver<PaThreadMessage>,
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
current_state: Arc<RwLock<Option<Arc<PaVolumeState>>>>,
current_state: Arc<ArcSwap<PaVolumeState>>,
}
impl PaThread {
fn spawn(
client_name: String,
commands_rx: flume::Receiver<PaCommand>,
commands_tx: flume::Sender<PaThreadMessage>,
commands_rx: flume::Receiver<PaThreadMessage>,
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
current_state: Arc<RwLock<Option<Arc<PaVolumeState>>>>,
current_state: Arc<ArcSwap<PaVolumeState>>,
) {
thread::spawn(move || {
let mainloop = Mainloop::new().unwrap();
let mut mainloop = Mainloop::new().unwrap();
let context = Context::new(&mainloop, &client_name).unwrap();
let introspector = context.introspect();
debug!("Starting the mainloop thread…");
mainloop.start().expect("starting the mainloop never fails");
let mut t = PaThread {
mainloop: Rc::new(RefCell::new(mainloop)),
context: Rc::new(RefCell::new(context)),
introspector: Rc::new(RefCell::new(introspector)),
mainloop,
context,
introspector,
commands_tx,
commands_rx,
state_tx,
current_state,
@ -205,273 +207,208 @@ impl PaThread {
}
fn init(&mut self) {
self.context.borrow_mut().connect(None, FlagSet::NOFLAGS, None).unwrap();
self.wait_for(|s| s.context.borrow().get_state() == libpulse_binding::context::State::Ready);
let introspector = Rc::clone(&self.introspector);
debug!("Initializing…");
self.mainloop.lock();
self.context.connect(None, FlagSet::NOFLAGS, None).unwrap();
{
let entities_by_id = Rc::new(RefCell::new(HashMap::<PaEntityId, Arc<PaEntityState>>::new()));
let entities_by_id_c = Rc::clone(&entities_by_id);
let (context_state_change_tx, context_state_change_rx) = flume::bounded(1);
self.context.set_state_callback(Some(Box::new(move || {
context_state_change_tx.send(()).unwrap();
})));
self.wait_for_operation(introspector.borrow().get_sink_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_info_list failed"),
ListResult::End => {}
ListResult::Item(sink) => {
entities_by_id_c.borrow_mut().insert(sink.index as PaEntityId, Arc::new(sink.into()));
self.mainloop.unlock();
loop {
context_state_change_rx.recv().unwrap();
self.mainloop.lock();
if self.context.get_state() == State::Ready {
break;
}
}))
.unwrap();
self.mainloop.unlock();
}
}
let entities_by_id_c = Rc::clone(&entities_by_id);
self.wait_for_operation(introspector.borrow().get_source_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_source_info_list failed"),
ListResult::End => {}
ListResult::Item(source) => {
entities_by_id_c.borrow_mut().insert(source.index as PaEntityId, Arc::new(source.into()));
}
}))
.unwrap();
// Mainloop is still locked
let entities_by_id_c = Rc::clone(&entities_by_id);
self.wait_for_operation(introspector.borrow().get_sink_input_info_list(move |list_result| match list_result {
{
let commands_tx = self.commands_tx.clone();
self.introspector.get_sink_input_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_input_info_list failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let id = sink_input.index as PaEntityId;
entities_by_id_c.borrow_mut().insert(id, Arc::new(sink_input.into()));
}
}))
.unwrap();
PaThread::set_state(
&self.current_state,
&self.state_tx,
Arc::new(PaVolumeState {
timestamp: Instant::now(),
entities_by_id: Rc::into_inner(entities_by_id).unwrap().into_inner(),
default_sink_id: 0,
default_source_id: 0,
}),
);
};
self.context
.borrow_mut()
.subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| {
if !success {
panic!("Context.subscribe failed")
}
ListResult::Item(sink_input) => commands_tx
.send(PaThreadMessage::UpsertEntity {
entity_state: Arc::new(sink_input.into()),
})
.unwrap(),
});
}
}
fn run(mut self) {
let introspector = Rc::clone(&self.introspector);
let current_state = Arc::clone(&self.current_state);
let state_tx = self.state_tx.clone();
{
let commands_tx = self.commands_tx.clone();
self.introspector.get_sink_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_info_list failed"),
ListResult::End => {}
ListResult::Item(sink) => commands_tx
.send(PaThreadMessage::UpsertEntity {
entity_state: Arc::new(sink.into()),
})
.unwrap(),
});
}
self.context
.borrow_mut()
.set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| {
{
let commands_tx = self.commands_tx.clone();
self.introspector.get_source_info_list(move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_source_info_list failed"),
ListResult::End => {}
ListResult::Item(source) => commands_tx
.send(PaThreadMessage::UpsertEntity {
entity_state: Arc::new(source.into()),
})
.unwrap(),
});
}
{
let commands_tx = self.commands_tx.clone();
self.context.set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| {
let entity_id = entity_id as PaEntityId;
let facility = facility.unwrap();
let timestamp = Instant::now();
let current_state = Arc::clone(&current_state);
match operation.unwrap() {
subscribe::Operation::Removed => {
let state = PaThread::unwrap_state(&current_state);
let entities_by_id = state.entities_by_id.without(&entity_id);
let default_source_id = if entity_id == state.default_source_id { 0 } else { state.default_source_id };
let default_sink_id = if entity_id == state.default_sink_id { 0 } else { state.default_sink_id };
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
default_source_id,
default_sink_id,
}),
);
commands_tx.send(PaThreadMessage::RemoveEntity { entity_id }).unwrap();
}
subscribe::Operation::New | subscribe::Operation::Changed => {
let state_tx = state_tx.clone();
match facility {
Facility::SinkInput => {
introspector.borrow().get_sink_input_info(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_input_info failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let state = PaThread::unwrap_state(&current_state);
let id = sink_input.index as PaEntityId;
let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into()));
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
..*state
}),
);
}
});
}
Facility::Sink => {
introspector.borrow().get_sink_info_by_index(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let state = PaThread::unwrap_state(&current_state);
let id = sink_input.index as PaEntityId;
let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into()));
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
..*state
}),
);
}
});
}
Facility::Source => {
introspector.borrow().get_source_info_by_index(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_source_info_by_index failed"),
ListResult::End => {}
ListResult::Item(sink_input) => {
let state = PaThread::unwrap_state(&current_state);
let id = sink_input.index as PaEntityId;
let entities_by_id = state.entities_by_id.update(id, Arc::new(sink_input.into()));
PaThread::set_state(
&current_state,
&state_tx,
Arc::new(PaVolumeState {
timestamp,
entities_by_id,
..*state
}),
);
}
});
}
Facility::SinkInput => commands_tx.send(PaThreadMessage::LoadSinkInput { entity_id }).unwrap(),
Facility::Sink => commands_tx.send(PaThreadMessage::LoadSink { entity_id }).unwrap(),
Facility::Source => commands_tx.send(PaThreadMessage::LoadSource { entity_id }).unwrap(),
_ => {}
};
}
};
})));
}
let current_state = Arc::clone(&self.current_state);
let mainloop = Rc::clone(&self.mainloop);
self.context
.subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| {
if !success {
panic!("Context.subscribe failed")
}
});
self.mainloop.unlock();
}
fn run(mut self) {
debug!("Waiting for commands…");
'outer: loop {
self.run_single_mainloop_iteration(false);
while let Ok(command) = self.commands_rx.recv() {
self.mainloop.lock();
let commands_tx = self.commands_tx.clone();
while let Ok(command) = self.commands_rx.try_recv() {
match command {
PaCommand::SetIsMuted { id, value } => {
if let Some(state) = PaThread::unwrap_state(&current_state).entities_by_id.get(&id) {
PaThreadMessage::Terminate => {
break 'outer;
}
PaThreadMessage::SetIsMuted { id, value } => {
if let Some(state) = self.current_state.load().entities_by_id.get(&id) {
match state.kind() {
PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_mute_by_index(id, value, None),
PaEntityKind::Source => self.introspector.borrow_mut().set_source_mute_by_index(id, value, None),
PaEntityKind::SinkInput => self.introspector.borrow_mut().set_sink_input_mute(id, value, None),
PaEntityKind::Sink => self.introspector.set_sink_mute_by_index(id, value, None),
PaEntityKind::Source => self.introspector.set_source_mute_by_index(id, value, None),
PaEntityKind::SinkInput => self.introspector.set_sink_input_mute(id, value, None),
};
}
}
PaCommand::SetChannelVolumes { id, channel_volumes } => {
if let Some(state) = PaThread::unwrap_state(&current_state).entities_by_id.get(&id) {
PaThreadMessage::SetChannelVolumes { id, channel_volumes } => {
if let Some(state) = self.current_state.load().entities_by_id.get(&id) {
let mut value = state.channel_volumes;
for (i, v) in channel_volumes.iter().enumerate() {
value.set(i as u8, Volume((Volume::NORMAL.0 as f32 * v).floor() as u32));
}
match state.kind() {
PaEntityKind::Sink => self.introspector.borrow_mut().set_sink_volume_by_index(id, &value, None),
PaEntityKind::Source => self.introspector.borrow_mut().set_source_volume_by_index(id, &value, None),
PaEntityKind::SinkInput => self.introspector.borrow_mut().set_sink_input_volume(id, &value, None),
PaEntityKind::Sink => self.introspector.set_sink_volume_by_index(id, &value, None),
PaEntityKind::Source => self.introspector.set_source_volume_by_index(id, &value, None),
PaEntityKind::SinkInput => self.introspector.set_sink_input_volume(id, &value, None),
};
}
}
PaCommand::Terminate => {
break 'outer;
PaThreadMessage::LoadSinkInput { entity_id } => {
self.introspector.get_sink_input_info(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_input_info failed"),
ListResult::End => {}
ListResult::Item(sink_input) => commands_tx
.send(PaThreadMessage::UpsertEntity {
entity_state: Arc::new(sink_input.into()),
})
.unwrap(),
});
}
PaThreadMessage::LoadSink { entity_id } => {
self.introspector.get_sink_info_by_index(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"),
ListResult::End => {}
ListResult::Item(sink) => commands_tx
.send(PaThreadMessage::UpsertEntity {
entity_state: Arc::new(sink.into()),
})
.unwrap(),
});
}
PaThreadMessage::LoadSource { entity_id } => {
self.introspector.get_source_info_by_index(entity_id, move |list_result| match list_result {
ListResult::Error => panic!("Introspector.get_source_info_by_index failed"),
ListResult::End => {}
ListResult::Item(source) => commands_tx
.send(PaThreadMessage::UpsertEntity {
entity_state: Arc::new(source.into()),
})
.unwrap(),
});
}
PaThreadMessage::UpsertEntity { entity_state } => {
self.set_state(Arc::new(PaVolumeState {
timestamp: Instant::now(),
entities_by_id: self.current_state.load().entities_by_id.update(entity_state.id, entity_state),
}));
}
PaThreadMessage::RemoveEntity { entity_id } => {
self.set_state(Arc::new(PaVolumeState {
timestamp: Instant::now(),
entities_by_id: self.current_state.load().entities_by_id.without(&entity_id),
}));
}
}
self.mainloop.unlock();
}
}
mainloop.borrow_mut().quit(Retval(0));
self.mainloop.quit(Retval(0));
self.mainloop.unlock();
}
fn unwrap_state(state: &Arc<RwLock<Option<Arc<PaVolumeState>>>>) -> Arc<PaVolumeState> {
state
.read()
.unwrap()
.as_ref()
.cloned()
.expect("this function is only called after the initial state was set")
}
fn set_state(current_state: &RwLock<Option<Arc<PaVolumeState>>>, state_tx: &broadcast::Sender<Arc<PaVolumeState>>, value: Arc<PaVolumeState>) {
let mut s = current_state.write().unwrap();
*s = Some(Arc::clone(&value));
fn set_state(&self, value: Arc<PaVolumeState>) {
self.current_state.store(Arc::clone(&value));
// If there are no subscribers, thats ok.
_ = state_tx.send(value);
}
fn run_single_mainloop_iteration(&mut self, block: bool) {
match self.mainloop.borrow_mut().iterate(block) {
IterateResult::Quit(_) => {
panic!("Mainloop quit.")
}
IterateResult::Err(e) => {
panic!("Mainloop error: {}", e)
}
IterateResult::Success(_) => {}
}
}
fn wait_for(&mut self, predicate: impl Fn(&mut Self) -> bool) {
loop {
self.run_single_mainloop_iteration(true);
if predicate(self) {
break;
}
}
}
fn wait_for_operation<T: ?Sized>(&mut self, operation: Operation<T>) -> Result<(), ()> {
loop {
self.run_single_mainloop_iteration(true);
match operation.get_state() {
libpulse_binding::operation::State::Running => {}
libpulse_binding::operation::State::Done => return Ok(()),
libpulse_binding::operation::State::Cancelled => return Err(()),
}
}
_ = self.state_tx.send(value);
}
}
#[derive(Debug)]
struct PaWorker {
commands_tx: flume::Sender<PaCommand>,
commands_tx: flume::Sender<PaThreadMessage>,
}
impl Drop for PaWorker {
fn drop(&mut self) {
self.commands_tx.send(PaCommand::Terminate).ok();
self.commands_tx.send(PaThreadMessage::Terminate).ok();
}
}
@ -479,18 +416,21 @@ impl Drop for PaWorker {
pub struct PaVolumeInterface {
#[allow(unused)]
worker: Arc<PaWorker>,
current_state: Arc<RwLock<Option<Arc<PaVolumeState>>>>,
current_state: Arc<ArcSwap<PaVolumeState>>,
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
commands_tx: flume::Sender<PaCommand>,
commands_tx: flume::Sender<PaThreadMessage>,
}
impl PaVolumeInterface {
pub fn spawn_thread(client_name: String) -> PaVolumeInterface {
let (commands_tx, commands_rx) = flume::unbounded();
let state_tx = broadcast::Sender::new(5);
let current_state = Arc::new(RwLock::new(None));
let current_state = Arc::new(ArcSwap::new(Arc::new(PaVolumeState {
timestamp: Instant::now(),
entities_by_id: HashMap::new(),
})));
PaThread::spawn(client_name, commands_rx, state_tx.clone(), Arc::clone(&current_state));
PaThread::spawn(client_name, commands_tx.clone(), commands_rx, state_tx.clone(), Arc::clone(&current_state));
let worker = PaWorker {
commands_tx: commands_tx.clone(),
@ -504,23 +444,23 @@ impl PaVolumeInterface {
}
}
pub fn subscribe_to_state(&self) -> (Option<Arc<PaVolumeState>>, Receiver<Arc<PaVolumeState>>) {
pub fn subscribe_to_state(&self) -> (Arc<PaVolumeState>, Receiver<Arc<PaVolumeState>>) {
let rx = self.state_tx.subscribe();
let state = self.current_state();
(state, rx)
}
pub fn current_state(&self) -> Option<Arc<PaVolumeState>> {
self.current_state.read().unwrap().clone()
pub fn current_state(&self) -> Arc<PaVolumeState> {
Arc::clone(&self.current_state.load())
}
pub fn set_is_muted(&self, id: PaEntityId, value: bool) {
self.commands_tx.send(PaCommand::SetIsMuted { id, value }).unwrap()
self.commands_tx.send(PaThreadMessage::SetIsMuted { id, value }).unwrap()
}
pub fn set_channel_volumes(&self, id: PaEntityId, channel_volumes: impl Into<Box<[f32]>>) {
self.commands_tx
.send(PaCommand::SetChannelVolumes {
.send(PaThreadMessage::SetChannelVolumes {
id,
channel_volumes: channel_volumes.into(),
})