Use the extracted version of pa_volume_interface
This commit is contained in:
parent
659d937280
commit
fd2fe49d53
8 changed files with 149 additions and 611 deletions
|
@ -1,12 +0,0 @@
|
|||
[package]
|
||||
name = "pa_volume_interface"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
flume = "0.11.0"
|
||||
im = "15.1.0"
|
||||
tokio = { version = "1.38.0", default-features = false, features = ["sync"] }
|
||||
libpulse-binding = "2.28.1"
|
||||
log = "0.4.21"
|
||||
arc-swap = "1.7.1"
|
|
@ -1,469 +0,0 @@
|
|||
use arc_swap::ArcSwap;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
use im::HashMap;
|
||||
use libpulse_binding::callbacks::ListResult;
|
||||
use libpulse_binding::context::introspect::{Introspector, SinkInfo, SinkInputInfo, SourceInfo};
|
||||
use libpulse_binding::context::subscribe::{Facility, InterestMaskSet};
|
||||
use libpulse_binding::context::{subscribe, Context, FlagSet, State};
|
||||
use libpulse_binding::def::Retval;
|
||||
use libpulse_binding::mainloop::api::Mainloop as _;
|
||||
use libpulse_binding::mainloop::threaded::Mainloop;
|
||||
use libpulse_binding::volume::{ChannelVolumes, Volume};
|
||||
use log::debug;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::broadcast::Receiver;
|
||||
|
||||
pub type PaEntityId = u32;
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
|
||||
pub enum PaEntityKind {
|
||||
Source,
|
||||
Sink,
|
||||
SinkInput,
|
||||
}
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Clone)]
|
||||
pub enum PaEntityMetadata {
|
||||
Source {
|
||||
name: String,
|
||||
description: String,
|
||||
},
|
||||
Sink {
|
||||
name: String,
|
||||
description: String,
|
||||
},
|
||||
SinkInput {
|
||||
description: String,
|
||||
binary_name: Option<String>,
|
||||
application_name: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct PaEntityState {
|
||||
id: PaEntityId,
|
||||
channel_volumes: ChannelVolumes,
|
||||
is_muted: bool,
|
||||
metadata: PaEntityMetadata,
|
||||
}
|
||||
|
||||
impl PaEntityState {
|
||||
pub fn id(&self) -> &PaEntityId {
|
||||
&self.id
|
||||
}
|
||||
|
||||
pub fn kind(&self) -> PaEntityKind {
|
||||
match &self.metadata {
|
||||
PaEntityMetadata::Source { .. } => PaEntityKind::Source,
|
||||
PaEntityMetadata::Sink { .. } => PaEntityKind::Sink,
|
||||
PaEntityMetadata::SinkInput { .. } => PaEntityKind::SinkInput,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn metadata(&self) -> &PaEntityMetadata {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
pub fn channel_volumes(&self) -> Vec<f32> {
|
||||
self.channel_volumes.get().iter().map(|v| v.0 as f32 / Volume::NORMAL.0 as f32).collect()
|
||||
}
|
||||
|
||||
pub fn is_muted(&self) -> bool {
|
||||
self.is_muted
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&SourceInfo<'_>> for PaEntityState {
|
||||
fn from(value: &SourceInfo) -> Self {
|
||||
PaEntityState {
|
||||
id: value.index as PaEntityId,
|
||||
is_muted: value.mute,
|
||||
channel_volumes: value.volume,
|
||||
metadata: PaEntityMetadata::Source {
|
||||
name: value.name.clone().unwrap_or_default().into_owned(),
|
||||
description: value.description.clone().unwrap_or_default().into_owned(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&SinkInfo<'_>> for PaEntityState {
|
||||
fn from(value: &SinkInfo) -> Self {
|
||||
PaEntityState {
|
||||
id: value.index as PaEntityId,
|
||||
is_muted: value.mute,
|
||||
channel_volumes: value.volume,
|
||||
metadata: PaEntityMetadata::Sink {
|
||||
name: value.name.clone().unwrap_or_default().into_owned(),
|
||||
description: value.description.clone().unwrap_or_default().into_owned(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&SinkInputInfo<'_>> for PaEntityState {
|
||||
fn from(value: &SinkInputInfo) -> Self {
|
||||
PaEntityState {
|
||||
id: value.index as PaEntityId,
|
||||
is_muted: value.mute,
|
||||
channel_volumes: value.volume,
|
||||
metadata: PaEntityMetadata::SinkInput {
|
||||
description: value.name.clone().unwrap_or_default().into_owned(),
|
||||
application_name: value
|
||||
.proplist
|
||||
.get("application.name")
|
||||
.map(|v| String::from_utf8_lossy(v).trim_end_matches(char::from(0)).to_owned()),
|
||||
binary_name: value
|
||||
.proplist
|
||||
.get("application.process.binary")
|
||||
.map(|v| String::from_utf8_lossy(v).trim_end_matches(char::from(0)).to_owned()),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PaVolumeState {
|
||||
timestamp: Instant,
|
||||
entities_by_id: HashMap<PaEntityId, Arc<PaEntityState>>,
|
||||
}
|
||||
|
||||
impl PaVolumeState {
|
||||
pub fn timestamp(&self) -> &Instant {
|
||||
&self.timestamp
|
||||
}
|
||||
|
||||
pub fn entities_by_id(&self) -> &HashMap<PaEntityId, Arc<PaEntityState>> {
|
||||
&self.entities_by_id
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PaVolumeState {
|
||||
fn default() -> Self {
|
||||
PaVolumeState {
|
||||
timestamp: Instant::now(),
|
||||
entities_by_id: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum PaThreadMessage {
|
||||
// by the user
|
||||
SetIsMuted { id: PaEntityId, value: bool },
|
||||
SetChannelVolumes { id: PaEntityId, channel_volumes: Box<[f32]> },
|
||||
Terminate,
|
||||
|
||||
// internal
|
||||
LoadSinkInput { entity_id: PaEntityId },
|
||||
LoadSink { entity_id: PaEntityId },
|
||||
LoadSource { entity_id: PaEntityId },
|
||||
UpsertEntity { entity_state: Arc<PaEntityState> },
|
||||
RemoveEntity { entity_id: PaEntityId },
|
||||
}
|
||||
|
||||
struct PaThread {
|
||||
mainloop: Mainloop,
|
||||
context: Context,
|
||||
introspector: Introspector,
|
||||
commands_tx: flume::Sender<PaThreadMessage>,
|
||||
commands_rx: flume::Receiver<PaThreadMessage>,
|
||||
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
|
||||
current_state: Arc<ArcSwap<PaVolumeState>>,
|
||||
}
|
||||
|
||||
impl PaThread {
|
||||
fn spawn(
|
||||
client_name: String,
|
||||
commands_tx: flume::Sender<PaThreadMessage>,
|
||||
commands_rx: flume::Receiver<PaThreadMessage>,
|
||||
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
|
||||
current_state: Arc<ArcSwap<PaVolumeState>>,
|
||||
) {
|
||||
thread::spawn(move || {
|
||||
let mut mainloop = Mainloop::new().unwrap();
|
||||
let context = Context::new(&mainloop, &client_name).unwrap();
|
||||
let introspector = context.introspect();
|
||||
|
||||
debug!("Starting the mainloop thread…");
|
||||
mainloop.start().expect("starting the mainloop never fails");
|
||||
|
||||
let mut t = PaThread {
|
||||
mainloop,
|
||||
context,
|
||||
introspector,
|
||||
commands_tx,
|
||||
commands_rx,
|
||||
state_tx,
|
||||
current_state,
|
||||
};
|
||||
|
||||
t.init();
|
||||
t.run();
|
||||
});
|
||||
}
|
||||
|
||||
fn init(&mut self) {
|
||||
debug!("Initializing…");
|
||||
self.mainloop.lock();
|
||||
self.context.connect(None, FlagSet::NOFLAGS, None).unwrap();
|
||||
|
||||
{
|
||||
let (context_state_change_tx, context_state_change_rx) = flume::bounded(1);
|
||||
self.context.set_state_callback(Some(Box::new(move || {
|
||||
context_state_change_tx.send(()).unwrap();
|
||||
})));
|
||||
|
||||
self.mainloop.unlock();
|
||||
loop {
|
||||
context_state_change_rx.recv().unwrap();
|
||||
|
||||
self.mainloop.lock();
|
||||
if self.context.get_state() == State::Ready {
|
||||
break;
|
||||
}
|
||||
self.mainloop.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
// Mainloop is still locked
|
||||
|
||||
{
|
||||
let commands_tx = self.commands_tx.clone();
|
||||
self.introspector.get_sink_input_info_list(move |list_result| match list_result {
|
||||
ListResult::Error => panic!("Introspector.get_sink_input_info_list failed"),
|
||||
ListResult::End => {}
|
||||
ListResult::Item(sink_input) => commands_tx
|
||||
.send(PaThreadMessage::UpsertEntity {
|
||||
entity_state: Arc::new(sink_input.into()),
|
||||
})
|
||||
.unwrap(),
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let commands_tx = self.commands_tx.clone();
|
||||
self.introspector.get_sink_info_list(move |list_result| match list_result {
|
||||
ListResult::Error => panic!("Introspector.get_sink_info_list failed"),
|
||||
ListResult::End => {}
|
||||
ListResult::Item(sink) => commands_tx
|
||||
.send(PaThreadMessage::UpsertEntity {
|
||||
entity_state: Arc::new(sink.into()),
|
||||
})
|
||||
.unwrap(),
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let commands_tx = self.commands_tx.clone();
|
||||
self.introspector.get_source_info_list(move |list_result| match list_result {
|
||||
ListResult::Error => panic!("Introspector.get_source_info_list failed"),
|
||||
ListResult::End => {}
|
||||
ListResult::Item(source) => commands_tx
|
||||
.send(PaThreadMessage::UpsertEntity {
|
||||
entity_state: Arc::new(source.into()),
|
||||
})
|
||||
.unwrap(),
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
let commands_tx = self.commands_tx.clone();
|
||||
self.context.set_subscribe_callback(Some(Box::new(move |facility, operation, entity_id| {
|
||||
let entity_id = entity_id as PaEntityId;
|
||||
let facility = facility.unwrap();
|
||||
|
||||
match operation.unwrap() {
|
||||
subscribe::Operation::Removed => {
|
||||
commands_tx.send(PaThreadMessage::RemoveEntity { entity_id }).unwrap();
|
||||
}
|
||||
subscribe::Operation::New | subscribe::Operation::Changed => {
|
||||
match facility {
|
||||
Facility::SinkInput => commands_tx.send(PaThreadMessage::LoadSinkInput { entity_id }).unwrap(),
|
||||
Facility::Sink => commands_tx.send(PaThreadMessage::LoadSink { entity_id }).unwrap(),
|
||||
Facility::Source => commands_tx.send(PaThreadMessage::LoadSource { entity_id }).unwrap(),
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
};
|
||||
})));
|
||||
}
|
||||
|
||||
self.context
|
||||
.subscribe(InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| {
|
||||
if !success {
|
||||
panic!("Context.subscribe failed")
|
||||
}
|
||||
});
|
||||
|
||||
self.mainloop.unlock();
|
||||
}
|
||||
|
||||
fn run(mut self) {
|
||||
debug!("Waiting for commands…");
|
||||
|
||||
'outer: loop {
|
||||
while let Ok(command) = self.commands_rx.recv() {
|
||||
self.mainloop.lock();
|
||||
let commands_tx = self.commands_tx.clone();
|
||||
|
||||
match command {
|
||||
PaThreadMessage::Terminate => {
|
||||
break 'outer;
|
||||
}
|
||||
PaThreadMessage::SetIsMuted { id, value } => {
|
||||
if let Some(state) = self.current_state.load().entities_by_id.get(&id) {
|
||||
match state.kind() {
|
||||
PaEntityKind::Sink => self.introspector.set_sink_mute_by_index(id, value, None),
|
||||
PaEntityKind::Source => self.introspector.set_source_mute_by_index(id, value, None),
|
||||
PaEntityKind::SinkInput => self.introspector.set_sink_input_mute(id, value, None),
|
||||
};
|
||||
}
|
||||
}
|
||||
PaThreadMessage::SetChannelVolumes { id, channel_volumes } => {
|
||||
if let Some(state) = self.current_state.load().entities_by_id.get(&id) {
|
||||
let mut value = state.channel_volumes;
|
||||
for (i, v) in channel_volumes.iter().enumerate() {
|
||||
value.set(i as u8, Volume((Volume::NORMAL.0 as f32 * v).floor() as u32));
|
||||
}
|
||||
|
||||
match state.kind() {
|
||||
PaEntityKind::Sink => self.introspector.set_sink_volume_by_index(id, &value, None),
|
||||
PaEntityKind::Source => self.introspector.set_source_volume_by_index(id, &value, None),
|
||||
PaEntityKind::SinkInput => self.introspector.set_sink_input_volume(id, &value, None),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
PaThreadMessage::LoadSinkInput { entity_id } => {
|
||||
self.introspector.get_sink_input_info(entity_id, move |list_result| match list_result {
|
||||
ListResult::Error => panic!("Introspector.get_sink_input_info failed"),
|
||||
ListResult::End => {}
|
||||
ListResult::Item(sink_input) => commands_tx
|
||||
.send(PaThreadMessage::UpsertEntity {
|
||||
entity_state: Arc::new(sink_input.into()),
|
||||
})
|
||||
.unwrap(),
|
||||
});
|
||||
}
|
||||
PaThreadMessage::LoadSink { entity_id } => {
|
||||
self.introspector.get_sink_info_by_index(entity_id, move |list_result| match list_result {
|
||||
ListResult::Error => panic!("Introspector.get_sink_info_by_index failed"),
|
||||
ListResult::End => {}
|
||||
ListResult::Item(sink) => commands_tx
|
||||
.send(PaThreadMessage::UpsertEntity {
|
||||
entity_state: Arc::new(sink.into()),
|
||||
})
|
||||
.unwrap(),
|
||||
});
|
||||
}
|
||||
PaThreadMessage::LoadSource { entity_id } => {
|
||||
self.introspector.get_source_info_by_index(entity_id, move |list_result| match list_result {
|
||||
ListResult::Error => panic!("Introspector.get_source_info_by_index failed"),
|
||||
ListResult::End => {}
|
||||
ListResult::Item(source) => commands_tx
|
||||
.send(PaThreadMessage::UpsertEntity {
|
||||
entity_state: Arc::new(source.into()),
|
||||
})
|
||||
.unwrap(),
|
||||
});
|
||||
}
|
||||
PaThreadMessage::UpsertEntity { entity_state } => {
|
||||
self.set_state(Arc::new(PaVolumeState {
|
||||
timestamp: Instant::now(),
|
||||
entities_by_id: self.current_state.load().entities_by_id.update(entity_state.id, entity_state),
|
||||
}));
|
||||
}
|
||||
PaThreadMessage::RemoveEntity { entity_id } => {
|
||||
self.set_state(Arc::new(PaVolumeState {
|
||||
timestamp: Instant::now(),
|
||||
entities_by_id: self.current_state.load().entities_by_id.without(&entity_id),
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
self.mainloop.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
self.mainloop.quit(Retval(0));
|
||||
self.mainloop.unlock();
|
||||
}
|
||||
|
||||
fn set_state(&self, value: Arc<PaVolumeState>) {
|
||||
self.current_state.store(Arc::clone(&value));
|
||||
|
||||
// If there are no subscribers, that’s ok.
|
||||
_ = self.state_tx.send(value);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PaWorker {
|
||||
commands_tx: flume::Sender<PaThreadMessage>,
|
||||
}
|
||||
|
||||
impl Drop for PaWorker {
|
||||
fn drop(&mut self) {
|
||||
self.commands_tx.send(PaThreadMessage::Terminate).ok();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PaVolumeInterface {
|
||||
#[allow(unused)]
|
||||
worker: Arc<PaWorker>,
|
||||
current_state: Arc<ArcSwap<PaVolumeState>>,
|
||||
state_tx: broadcast::Sender<Arc<PaVolumeState>>,
|
||||
commands_tx: flume::Sender<PaThreadMessage>,
|
||||
}
|
||||
|
||||
impl PaVolumeInterface {
|
||||
pub fn spawn_thread(client_name: String) -> PaVolumeInterface {
|
||||
let (commands_tx, commands_rx) = flume::unbounded();
|
||||
let state_tx = broadcast::Sender::new(5);
|
||||
let current_state = Arc::new(ArcSwap::new(Arc::new(PaVolumeState {
|
||||
timestamp: Instant::now(),
|
||||
entities_by_id: HashMap::new(),
|
||||
})));
|
||||
|
||||
PaThread::spawn(client_name, commands_tx.clone(), commands_rx, state_tx.clone(), Arc::clone(¤t_state));
|
||||
|
||||
let worker = PaWorker {
|
||||
commands_tx: commands_tx.clone(),
|
||||
};
|
||||
|
||||
PaVolumeInterface {
|
||||
worker: Arc::new(worker),
|
||||
current_state,
|
||||
commands_tx,
|
||||
state_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscribe_to_state(&self) -> (Arc<PaVolumeState>, Receiver<Arc<PaVolumeState>>) {
|
||||
let rx = self.state_tx.subscribe();
|
||||
let state = self.current_state();
|
||||
(state, rx)
|
||||
}
|
||||
|
||||
pub fn current_state(&self) -> Arc<PaVolumeState> {
|
||||
Arc::clone(&self.current_state.load())
|
||||
}
|
||||
|
||||
pub fn set_is_muted(&self, id: PaEntityId, value: bool) {
|
||||
self.commands_tx.send(PaThreadMessage::SetIsMuted { id, value }).unwrap()
|
||||
}
|
||||
|
||||
pub fn set_channel_volumes(&self, id: PaEntityId, channel_volumes: impl Into<Box<[f32]>>) {
|
||||
self.commands_tx
|
||||
.send(PaThreadMessage::SetChannelVolumes {
|
||||
id,
|
||||
channel_volumes: channel_volumes.into(),
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue