1
0
Fork 0

Remove double indirection at the cost of an unsafe lifetime transmutation

This commit is contained in:
Moritz Ruth 2024-09-13 19:45:21 +02:00
parent 1b757150ea
commit 3ea3ef9277
Signed by: moritzruth
GPG key ID: C9BBAB79405EE56D
4 changed files with 145 additions and 199 deletions

203
Cargo.lock generated
View file

@ -2,48 +2,30 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f5fb1d8e4442bd405fdfd1dacb42792696b0cf9cb15882e5d097b742a676d375"
dependencies = [
"gimli",
]
[[package]]
name = "adler2"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627"
[[package]]
name = "arc-swap"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "async-broadcast"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e"
dependencies = [
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "autocfg"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "backtrace"
version = "0.3.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a"
dependencies = [
"addr2line",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
"windows-targets",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -71,6 +53,42 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "concurrent-queue"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "event-listener"
version = "5.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]]
name = "flume"
version = "0.11.0"
@ -108,12 +126,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "gimli"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
[[package]]
name = "im"
version = "15.1.0"
@ -186,21 +198,6 @@ version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "miniz_oxide"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1"
dependencies = [
"adler2",
]
[[package]]
name = "nanorand"
version = "0.7.0"
@ -230,21 +227,24 @@ dependencies = [
"autocfg",
]
[[package]]
name = "object"
version = "0.36.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
dependencies = [
"memchr",
]
[[package]]
name = "once_cell"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "oneshot"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29"
[[package]]
name = "parking"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba"
[[package]]
name = "pin-project-lite"
version = "0.2.14"
@ -271,11 +271,12 @@ name = "pulseaudio-volume-interface"
version = "1.0.0"
dependencies = [
"arc-swap",
"async-broadcast",
"flume",
"im",
"libpulse-binding",
"log",
"tokio",
"oneshot",
]
[[package]]
@ -302,12 +303,6 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -355,16 +350,6 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "tokio"
version = "1.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
dependencies = [
"backtrace",
"pin-project-lite",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -465,67 +450,3 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"

View file

@ -6,7 +6,8 @@ edition = "2021"
[dependencies]
flume = "0.11.0"
im = "15.1.0"
tokio = { version = "1.38.0", default-features = false, features = ["sync"] }
libpulse-binding = "2.28.1"
log = "0.4.21"
arc-swap = "1.7.1"
oneshot = "0.1.8"
async-broadcast = "0.7.1"

View file

@ -1,15 +1,12 @@
mod worker;
use std::num::NonZeroU32;
use crate::worker::*;
use arc_swap::ArcSwap;
use im::HashMap;
use libpulse_binding::context::introspect::{SinkInfo, SinkInputInfo, SourceInfo};
use libpulse_binding::volume::{ChannelVolumes, Volume};
use std::num::NonZeroU32;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Receiver;
#[allow(unused_imports)]
use libpulse_binding::mainloop::api::Mainloop as _;
@ -136,9 +133,13 @@ impl State {
&self.timestamp
}
pub fn default_sink_id(&self) -> &Option<EntityId> { &self.default_sink_id }
pub fn default_sink_id(&self) -> &Option<EntityId> {
&self.default_sink_id
}
pub fn default_source_id(&self) -> &Option<EntityId> { &self.default_source_id }
pub fn default_source_id(&self) -> &Option<EntityId> {
&self.default_source_id
}
pub fn entities_by_id(&self) -> &HashMap<EntityId, Arc<EntityState>> {
&self.entities_by_id
@ -147,54 +148,33 @@ impl State {
#[derive(Debug, Clone)]
pub struct PulseAudioVolumeInterface {
#[allow(unused)]
worker: Arc<PaWorker>,
current_state: Arc<ArcSwap<State>>,
state_tx: broadcast::Sender<Arc<State>>,
commands_tx: flume::Sender<ThreadMessage>,
}
impl PulseAudioVolumeInterface {
pub fn new(client_name: String) -> PulseAudioVolumeInterface {
let (commands_tx, commands_rx) = flume::unbounded();
let state_tx = broadcast::Sender::new(5);
let current_state = Arc::new(ArcSwap::new(Arc::new(State {
timestamp: Instant::now(),
default_sink_id: None,
default_source_id: None,
entities_by_id: HashMap::new(),
})));
PaThread::spawn(client_name, commands_tx.clone(), commands_rx, state_tx.clone(), Arc::clone(&current_state));
let worker = PaWorker {
commands_tx: commands_tx.clone(),
};
PulseAudioVolumeInterface {
worker: Arc::new(worker),
current_state,
commands_tx,
state_tx,
worker: Arc::new(PaWorker::spawn(client_name)),
}
}
pub fn subscribe_to_state(&self) -> (Arc<State>, Receiver<Arc<State>>) {
let rx = self.state_tx.subscribe();
pub fn subscribe_to_state(&self) -> (Arc<State>, async_broadcast::Receiver<Arc<State>>) {
let rx = self.worker.state_tx.new_receiver();
let state = self.current_state();
(state, rx)
}
pub fn current_state(&self) -> Arc<State> {
Arc::clone(&self.current_state.load())
self.worker.current_state.load_full()
}
pub fn set_is_muted(&self, id: EntityId, value: bool) {
self.commands_tx.send(ThreadMessage::SetIsMuted { id, value }).unwrap()
self.worker.commands_tx.send(ThreadMessage::SetIsMuted { id, value }).unwrap()
}
pub fn set_channel_volumes(&self, id: EntityId, channel_volumes: impl Into<Box<[f32]>>) {
self.commands_tx
self.worker
.commands_tx
.send(ThreadMessage::SetChannelVolumes {
id,
channel_volumes: channel_volumes.into(),
@ -203,6 +183,6 @@ impl PulseAudioVolumeInterface {
}
pub fn set_default_entity(&self, id: EntityId) {
self.commands_tx.send(ThreadMessage::SetDefaultEntity { id }).unwrap()
self.worker.commands_tx.send(ThreadMessage::SetDefaultEntity { id }).unwrap()
}
}

View file

@ -1,7 +1,8 @@
use crate::{EntityId, EntityKind, EntityMetadata, EntityState, State};
use arc_swap::ArcSwap;
use im::HashMap;
use libpulse_binding::callbacks::ListResult;
use libpulse_binding::context::introspect::{Introspector, ServerInfo};
use libpulse_binding::context::introspect::Introspector;
use libpulse_binding::context::subscribe::{Facility, InterestMaskSet};
use libpulse_binding::context::{subscribe, Context, FlagSet, State as ConnectionState};
use libpulse_binding::def::Retval;
@ -12,7 +13,6 @@ use std::num::NonZeroU32;
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use tokio::sync::broadcast;
#[derive(Debug)]
pub(super) enum ThreadMessage {
@ -28,7 +28,9 @@ pub(super) enum ThreadMessage {
SetDefaultEntity {
id: EntityId,
},
Terminate,
Terminate {
termination_signal_tx: oneshot::Sender<()>,
},
// emitted by libpulse
LoadServerInfo,
@ -55,24 +57,30 @@ pub(super) enum ThreadMessage {
},
}
pub(super) struct PaThread {
pub(super) struct PaThread<'worker> {
mainloop: Mainloop,
context: Context,
introspector: Introspector,
commands_tx: flume::Sender<ThreadMessage>,
commands_rx: flume::Receiver<ThreadMessage>,
state_tx: broadcast::Sender<Arc<State>>,
current_state: Arc<ArcSwap<State>>,
state_tx: async_broadcast::Sender<Arc<State>>,
current_state: &'worker ArcSwap<State>,
}
impl PaThread {
impl<'worker> PaThread<'worker> {
pub(super) fn spawn(
client_name: String,
commands_tx: flume::Sender<ThreadMessage>,
commands_rx: flume::Receiver<ThreadMessage>,
state_tx: broadcast::Sender<Arc<State>>,
current_state: Arc<ArcSwap<State>>,
state_tx: async_broadcast::Sender<Arc<State>>,
current_state: &'worker ArcSwap<State>,
) {
// SAFETY: current_state is owned by PaWorker.
// Before PaWorker is dropped, it sends a ThreadMessage::Terminate.
// When this thread receives this message, it stops further message processing.
// The drop handler of PaWorker blocks until it has received confirmation that this thread will stop.
let current_state: &'static ArcSwap<State> = unsafe { core::mem::transmute(current_state) };
thread::spawn(move || {
let mut mainloop = Mainloop::new().unwrap();
let context = Context::new(&mainloop, &client_name).unwrap();
@ -186,12 +194,14 @@ impl PaThread {
})));
}
self.context
.subscribe(InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT, |success| {
self.context.subscribe(
InterestMaskSet::SERVER | InterestMaskSet::SINK | InterestMaskSet::SOURCE | InterestMaskSet::SINK_INPUT,
|success| {
if !success {
panic!("Context.subscribe failed")
}
});
},
);
self.mainloop.unlock();
}
@ -205,7 +215,8 @@ impl PaThread {
let commands_tx = self.commands_tx.clone();
match command {
ThreadMessage::Terminate => {
ThreadMessage::Terminate { termination_signal_tx } => {
termination_signal_tx.send(()).unwrap();
break 'outer;
}
ThreadMessage::SetIsMuted { id, value } => {
@ -251,10 +262,12 @@ impl PaThread {
ThreadMessage::LoadServerInfo => {
self.introspector.get_server_info(move |server_info| {
commands_tx.send(ThreadMessage::HandleServerInfo {
default_sink_name: server_info.default_sink_name.as_ref().map(|n| n.to_string().into_boxed_str()),
default_source_name: server_info.default_source_name.as_ref().map(|n| n.to_string().into_boxed_str()),
}).unwrap();
commands_tx
.send(ThreadMessage::HandleServerInfo {
default_sink_name: server_info.default_sink_name.as_ref().map(|n| n.to_string().into_boxed_str()),
default_source_name: server_info.default_source_name.as_ref().map(|n| n.to_string().into_boxed_str()),
})
.unwrap();
});
}
ThreadMessage::LoadSinkInput { entity_id } => {
@ -293,7 +306,10 @@ impl PaThread {
});
}
ThreadMessage::HandleServerInfo { default_sink_name, default_source_name} => {
ThreadMessage::HandleServerInfo {
default_sink_name,
default_source_name,
} => {
let current_state = self.current_state.load();
self.set_state(Arc::new(State {
timestamp: Instant::now(),
@ -346,6 +362,8 @@ impl PaThread {
}
}
// self.current_state is no longer valid beyond this point.
self.mainloop.quit(Retval(0));
self.mainloop.unlock();
}
@ -354,17 +372,43 @@ impl PaThread {
self.current_state.store(Arc::clone(&value));
// If there are no subscribers, thats ok.
_ = self.state_tx.send(value);
_ = self.state_tx.try_broadcast(value);
}
}
#[derive(Debug)]
pub(super) struct PaWorker {
pub(super) commands_tx: flume::Sender<ThreadMessage>,
pub(super) state_tx: async_broadcast::Sender<Arc<State>>,
pub(super) current_state: ArcSwap<State>,
}
impl PaWorker {
pub(super) fn spawn(client_name: String) -> PaWorker {
let (commands_tx, commands_rx) = flume::unbounded();
let (state_tx, _) = async_broadcast::broadcast(5);
let worker = PaWorker {
commands_tx: commands_tx.clone(),
state_tx: state_tx.clone(),
current_state: ArcSwap::new(Arc::new(State {
timestamp: Instant::now(),
default_sink_id: None,
default_source_id: None,
entities_by_id: HashMap::new(),
})),
};
PaThread::spawn(client_name, commands_tx, commands_rx, state_tx, &worker.current_state);
worker
}
}
impl Drop for PaWorker {
fn drop(&mut self) {
self.commands_tx.send(ThreadMessage::Terminate).ok();
let (termination_signal_tx, termination_signal_rx) = oneshot::channel();
self.commands_tx.send(ThreadMessage::Terminate { termination_signal_tx }).ok();
termination_signal_rx.recv().unwrap()
}
}