From 3ea3ef927714369dd1bb66db4d08ad85d9591d47 Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Fri, 13 Sep 2024 19:45:21 +0200 Subject: [PATCH] Remove double indirection at the cost of an unsafe lifetime transmutation --- Cargo.lock | 203 +++++++++++++++----------------------------------- Cargo.toml | 5 +- src/lib.rs | 50 ++++--------- src/worker.rs | 86 +++++++++++++++------ 4 files changed, 145 insertions(+), 199 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9857828..c125d4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,48 +2,30 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "addr2line" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375" -dependencies = [ - "gimli", -] - -[[package]] -name = "adler2" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" - [[package]] name = "arc-swap" version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "async-broadcast" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" +dependencies = [ + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "autocfg" version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" -[[package]] -name = "backtrace" -version = "0.3.74" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" -dependencies = [ - "addr2line", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", - "windows-targets", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -71,6 +53,42 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "flume" version = "0.11.0" @@ -108,12 +126,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "gimli" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" - [[package]] name = "im" version = "15.1.0" @@ -186,21 +198,6 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" -[[package]] -name = "memchr" -version = "2.7.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" - -[[package]] -name = "miniz_oxide" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" -dependencies = [ - "adler2", -] - [[package]] name = "nanorand" version = "0.7.0" @@ -230,21 +227,24 @@ dependencies = [ "autocfg", ] -[[package]] -name = "object" -version = "0.36.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a" -dependencies = [ - "memchr", -] - [[package]] name = "once_cell" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oneshot" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" + +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -271,11 +271,12 @@ name = "pulseaudio-volume-interface" version = "1.0.0" dependencies = [ "arc-swap", + "async-broadcast", "flume", "im", "libpulse-binding", "log", - "tokio", + "oneshot", ] [[package]] @@ -302,12 +303,6 @@ dependencies = [ "rand_core", ] -[[package]] -name = "rustc-demangle" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" - [[package]] name = "scopeguard" version = "1.2.0" @@ -355,16 +350,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "tokio" -version = "1.40.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" -dependencies = [ - "backtrace", - "pin-project-lite", -] - [[package]] name = "typenum" version = "1.17.0" @@ -465,67 +450,3 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - -[[package]] -name = "windows-targets" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" -dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" - -[[package]] -name = "windows_i686_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" - -[[package]] -name = "windows_i686_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" - -[[package]] -name = "windows_i686_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml index 1a01966..dd7d043 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" [dependencies] flume = "0.11.0" im = "15.1.0" -tokio = { version = "1.38.0", default-features = false, features = ["sync"] } libpulse-binding = "2.28.1" log = "0.4.21" -arc-swap = "1.7.1" \ No newline at end of file +arc-swap = "1.7.1" +oneshot = "0.1.8" +async-broadcast = "0.7.1" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 1cbb228..a4bda23 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,12 @@ mod worker; -use std::num::NonZeroU32; use crate::worker::*; -use arc_swap::ArcSwap; use im::HashMap; use libpulse_binding::context::introspect::{SinkInfo, SinkInputInfo, SourceInfo}; use libpulse_binding::volume::{ChannelVolumes, Volume}; +use std::num::NonZeroU32; use std::sync::Arc; use std::time::Instant; -use tokio::sync::broadcast; -use tokio::sync::broadcast::Receiver; #[allow(unused_imports)] use libpulse_binding::mainloop::api::Mainloop as _; @@ -136,9 +133,13 @@ impl State { &self.timestamp } - pub fn default_sink_id(&self) -> &Option { &self.default_sink_id } + pub fn default_sink_id(&self) -> &Option { + &self.default_sink_id + } - pub fn default_source_id(&self) -> &Option { &self.default_source_id } + pub fn default_source_id(&self) -> &Option { + &self.default_source_id + } pub fn entities_by_id(&self) -> &HashMap> { &self.entities_by_id @@ -147,54 +148,33 @@ impl State { #[derive(Debug, Clone)] pub struct PulseAudioVolumeInterface { - #[allow(unused)] worker: Arc, - current_state: Arc>, - state_tx: broadcast::Sender>, - commands_tx: flume::Sender, } impl PulseAudioVolumeInterface { pub fn new(client_name: String) -> PulseAudioVolumeInterface { - let (commands_tx, commands_rx) = flume::unbounded(); - let state_tx = broadcast::Sender::new(5); - let current_state = Arc::new(ArcSwap::new(Arc::new(State { - timestamp: Instant::now(), - default_sink_id: None, - default_source_id: None, - entities_by_id: HashMap::new(), - }))); - - PaThread::spawn(client_name, commands_tx.clone(), commands_rx, state_tx.clone(), Arc::clone(¤t_state)); - - let worker = PaWorker { - commands_tx: commands_tx.clone(), - }; - PulseAudioVolumeInterface { - worker: Arc::new(worker), - current_state, - commands_tx, - state_tx, + worker: Arc::new(PaWorker::spawn(client_name)), } } - pub fn subscribe_to_state(&self) -> (Arc, Receiver>) { - let rx = self.state_tx.subscribe(); + pub fn subscribe_to_state(&self) -> (Arc, async_broadcast::Receiver>) { + let rx = self.worker.state_tx.new_receiver(); let state = self.current_state(); (state, rx) } pub fn current_state(&self) -> Arc { - Arc::clone(&self.current_state.load()) + self.worker.current_state.load_full() } pub fn set_is_muted(&self, id: EntityId, value: bool) { - self.commands_tx.send(ThreadMessage::SetIsMuted { id, value }).unwrap() + self.worker.commands_tx.send(ThreadMessage::SetIsMuted { id, value }).unwrap() } pub fn set_channel_volumes(&self, id: EntityId, channel_volumes: impl Into>) { - self.commands_tx + self.worker + .commands_tx .send(ThreadMessage::SetChannelVolumes { id, channel_volumes: channel_volumes.into(), @@ -203,6 +183,6 @@ impl PulseAudioVolumeInterface { } pub fn set_default_entity(&self, id: EntityId) { - self.commands_tx.send(ThreadMessage::SetDefaultEntity { id }).unwrap() + self.worker.commands_tx.send(ThreadMessage::SetDefaultEntity { id }).unwrap() } } diff --git a/src/worker.rs b/src/worker.rs index e8d4154..48eba6c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,7 +1,8 @@ use crate::{EntityId, EntityKind, EntityMetadata, EntityState, State}; use arc_swap::ArcSwap; +use im::HashMap; use libpulse_binding::callbacks::ListResult; -use libpulse_binding::context::introspect::{Introspector, ServerInfo}; +use libpulse_binding::context::introspect::Introspector; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet}; use libpulse_binding::context::{subscribe, Context, FlagSet, State as ConnectionState}; use libpulse_binding::def::Retval; @@ -12,7 +13,6 @@ use std::num::NonZeroU32; use std::sync::Arc; use std::thread; use std::time::Instant; -use tokio::sync::broadcast; #[derive(Debug)] pub(super) enum ThreadMessage { @@ -28,7 +28,9 @@ pub(super) enum ThreadMessage { SetDefaultEntity { id: EntityId, }, - Terminate, + Terminate { + termination_signal_tx: oneshot::Sender<()>, + }, // emitted by libpulse LoadServerInfo, @@ -55,24 +57,30 @@ pub(super) enum ThreadMessage { }, } -pub(super) struct PaThread { +pub(super) struct PaThread<'worker> { mainloop: Mainloop, context: Context, introspector: Introspector, commands_tx: flume::Sender, commands_rx: flume::Receiver, - state_tx: broadcast::Sender>, - current_state: Arc>, + state_tx: async_broadcast::Sender>, + current_state: &'worker ArcSwap, } -impl PaThread { +impl<'worker> PaThread<'worker> { pub(super) fn spawn( client_name: String, commands_tx: flume::Sender, commands_rx: flume::Receiver, - state_tx: broadcast::Sender>, - current_state: Arc>, + state_tx: async_broadcast::Sender>, + current_state: &'worker ArcSwap, ) { + // SAFETY: current_state is owned by PaWorker. + // Before PaWorker is dropped, it sends a ThreadMessage::Terminate. + // When this thread receives this message, it stops further message processing. + // The drop handler of PaWorker blocks until it has received confirmation that this thread will stop. + let current_state: &'static ArcSwap = unsafe { core::mem::transmute(current_state) }; + thread::spawn(move || { let mut mainloop = Mainloop::new().unwrap(); let context = Context::new(&mainloop, &client_name).unwrap(); @@ -186,12 +194,14 @@ impl PaThread { }))); } - self.context - .subscribe(InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| { + self.context.subscribe( + InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, + |success| { if !success { panic!("Context.subscribe failed") } - }); + }, + ); self.mainloop.unlock(); } @@ -205,7 +215,8 @@ impl PaThread { let commands_tx = self.commands_tx.clone(); match command { - ThreadMessage::Terminate => { + ThreadMessage::Terminate { termination_signal_tx } => { + termination_signal_tx.send(()).unwrap(); break 'outer; } ThreadMessage::SetIsMuted { id, value } => { @@ -251,10 +262,12 @@ impl PaThread { ThreadMessage::LoadServerInfo => { self.introspector.get_server_info(move |server_info| { - commands_tx.send(ThreadMessage::HandleServerInfo { - default_sink_name: server_info.default_sink_name.as_ref().map(|n| n.to_string().into_boxed_str()), - default_source_name: server_info.default_source_name.as_ref().map(|n| n.to_string().into_boxed_str()), - }).unwrap(); + commands_tx + .send(ThreadMessage::HandleServerInfo { + default_sink_name: server_info.default_sink_name.as_ref().map(|n| n.to_string().into_boxed_str()), + default_source_name: server_info.default_source_name.as_ref().map(|n| n.to_string().into_boxed_str()), + }) + .unwrap(); }); } ThreadMessage::LoadSinkInput { entity_id } => { @@ -292,8 +305,11 @@ impl PaThread { .unwrap(), }); } - - ThreadMessage::HandleServerInfo { default_sink_name, default_source_name} => { + + ThreadMessage::HandleServerInfo { + default_sink_name, + default_source_name, + } => { let current_state = self.current_state.load(); self.set_state(Arc::new(State { timestamp: Instant::now(), @@ -346,6 +362,8 @@ impl PaThread { } } + // self.current_state is no longer valid beyond this point. + self.mainloop.quit(Retval(0)); self.mainloop.unlock(); } @@ -354,17 +372,43 @@ impl PaThread { self.current_state.store(Arc::clone(&value)); // If there are no subscribers, that’s ok. - _ = self.state_tx.send(value); + _ = self.state_tx.try_broadcast(value); } } #[derive(Debug)] pub(super) struct PaWorker { pub(super) commands_tx: flume::Sender, + pub(super) state_tx: async_broadcast::Sender>, + pub(super) current_state: ArcSwap, +} + +impl PaWorker { + pub(super) fn spawn(client_name: String) -> PaWorker { + let (commands_tx, commands_rx) = flume::unbounded(); + let (state_tx, _) = async_broadcast::broadcast(5); + + let worker = PaWorker { + commands_tx: commands_tx.clone(), + state_tx: state_tx.clone(), + current_state: ArcSwap::new(Arc::new(State { + timestamp: Instant::now(), + default_sink_id: None, + default_source_id: None, + entities_by_id: HashMap::new(), + })), + }; + + PaThread::spawn(client_name, commands_tx, commands_rx, state_tx, &worker.current_state); + + worker + } } impl Drop for PaWorker { fn drop(&mut self) { - self.commands_tx.send(ThreadMessage::Terminate).ok(); + let (termination_signal_tx, termination_signal_rx) = oneshot::channel(); + self.commands_tx.send(ThreadMessage::Terminate { termination_signal_tx }).ok(); + termination_signal_rx.recv().unwrap() } }