diff --git a/deckster/src/main.rs b/deckster/src/main.rs index 46cba02..2c0e905 100644 --- a/deckster/src/main.rs +++ b/deckster/src/main.rs @@ -8,7 +8,7 @@ use loupedeck_serial::device::LoupedeckDevice; use loupedeck_serial::events::LoupedeckEvent; fn main() -> Result<()> { - let available_devices = loupedeck_serial::device::LoupedeckDevice::discover()?; + let available_devices = LoupedeckDevice::discover()?; let device = available_devices.first() .wrap_err("at least one device should be connected")? @@ -30,9 +30,21 @@ fn run_rainbow(device: &LoupedeckDevice) -> Result<()> { let buttons = device.characteristics().available_buttons.iter().filter(|b| b.supports_color()).collect::>(); - loop { + 'outer: loop { let ms = start.elapsed().as_millis() as u64; + if !device.events_channel().is_empty() { + for event in device.events_channel() { + if let LoupedeckEvent::Disconnected = event { + break 'outer; + } + + if device.events_channel().is_empty() { + break; + } + } + } + for (index, button) in buttons.iter().enumerate() { let t = (ms + (index * 100) as u64) as f32; @@ -45,7 +57,9 @@ fn run_rainbow(device: &LoupedeckDevice) -> Result<()> { sleep((iteration + 1) * interval - start.elapsed()); iteration += 1; - } + }; + + Ok(()) } fn run_vibrations(device: &LoupedeckDevice) -> Result<()> { diff --git a/loupedeck_serial/src/device.rs b/loupedeck_serial/src/device.rs index f564df7..1eb5637 100644 --- a/loupedeck_serial/src/device.rs +++ b/loupedeck_serial/src/device.rs @@ -1,7 +1,7 @@ -use std::io; +use std::{io, thread}; use std::io::{Read, Write}; use std::sync::mpsc; -use std::thread::{sleep, spawn}; +use std::thread::{sleep, spawn, Thread}; use std::time::Duration; use bytes::Bytes; use rgb::{ComponentSlice, RGB8}; @@ -120,7 +120,9 @@ impl LoupedeckDevice { return Err(SetButtonColorError::ColorNotSupported); } - self.commands_sender.send(LoupedeckCommand::SetButtonColor { button, color }).unwrap(); + // The write worker thread not running means the device was disconnected. + // In that case, the read worker thread sends a LoupedeckEvent::Disconnected. + self.commands_sender.send(LoupedeckCommand::SetButtonColor { button, color }).ok(); Ok(()) } @@ -244,16 +246,18 @@ impl LoupedeckDevice { port.clear(ClearBuffer::Input)?; let cloned_port = port.try_clone().expect("port must be cloneable"); + let thread_name_base = format!("loupedeck_serial ({})", port.name().unwrap_or("".to_owned())); + let (public_events_sender, public_events_receiver) = crossbeam_channel::unbounded::(); let (internal_events_sender, internal_events_receiver) = mpsc::sync_channel(2); - spawn(move || { + thread::Builder::new().name(thread_name_base.to_owned() + " read worker").spawn(move || { read_messages_worker(port, public_events_sender, internal_events_sender); - }); + })?; let (commands_sender, commands_receiver) = crossbeam_channel::unbounded::(); - spawn(move || { + thread::Builder::new().name(thread_name_base.to_owned() + " write worker").spawn(move || { write_messages_worker(cloned_port, commands_receiver); - }); + })?; commands_sender.send(LoupedeckCommand::RequestSerialNumber).unwrap(); let serial_number = match internal_events_receiver.recv_timeout(Duration::from_secs(1)) { diff --git a/loupedeck_serial/src/messages.rs b/loupedeck_serial/src/messages.rs index 203f7ed..e37082a 100644 --- a/loupedeck_serial/src/messages.rs +++ b/loupedeck_serial/src/messages.rs @@ -1,6 +1,6 @@ use std::cmp::min; -use std::io::ErrorKind::TimedOut; -use std::io::{Read, Write}; +use std::io; +use std::io::{ErrorKind, Read, Write}; use std::sync::mpsc; use bytes::{Buf, BufMut, Bytes, BytesMut}; use enum_ordinalize::Ordinalize; @@ -51,27 +51,24 @@ pub(crate) fn read_messages_worker( ) { let mut internal_sender = Some(internal_sender); + let mut should_stop = false; let mut buffer = BytesMut::new(); - loop { + while !should_stop { let mut chunk = BytesMut::zeroed(MAX_MESSAGE_LENGTH); - let read_result = port.read(&mut chunk); + let read_length = port.read(&mut chunk).unwrap_or(0); - if let Err(err) = &read_result { - if err.kind() == TimedOut { - continue; - } + if read_length == 0 { + // This fails only if the other side is disconnected. + // In that case, this thread should terminate anyway and we can ignore the error. + public_sender.send(LoupedeckEvent::Disconnected).ok(); + break; } - let len = read_result.unwrap(); - if len == 0 { - panic!("read 0 bytes"); - } - - chunk.truncate(len); + chunk.truncate(read_length); buffer.put(chunk); - loop { + while !should_stop { let start_index = buffer.iter().position(|b| *b == MESSAGE_START_BYTE); if let Some(start_index) = start_index { @@ -105,7 +102,9 @@ pub(crate) fn read_messages_worker( } } ParseMessageResult::PublicEvent(event) => { - public_sender.send(event).unwrap() + if public_sender.send(event).is_err() { + should_stop = false + } } ParseMessageResult::Nothing => {} } @@ -169,7 +168,7 @@ fn parse_message(command: u8, mut message: Bytes) -> ParseMessageResult { pub(crate) fn write_messages_worker(mut port: Box, receiver: crossbeam_channel::Receiver) { let mut next_transaction_id = 0; - let mut send = |command_id: u8, data: Bytes| { + let mut send = |command_id: u8, data: Bytes| -> Result<(), io::Error> { if next_transaction_id == 0 { next_transaction_id += 1; } @@ -189,37 +188,39 @@ pub(crate) fn write_messages_worker(mut port: Box, receiver: cro prep.put_u32(length as u32); prep.put_bytes(0x00, 4); - port.write_all(&prep).unwrap(); - port.flush().unwrap(); + port.write_all(&prep)?; + port.flush()?; } else { let mut prep = BytesMut::zeroed(6); prep[0] = 0x82; prep[1] = (0x80 + length) as u8; - port.write_all(&prep).unwrap(); - port.flush().unwrap(); + port.write_all(&prep)?; + port.flush()?; } - port.write_all(&data_with_header).unwrap(); - port.flush().unwrap(); + port.write_all(&data_with_header)?; + port.flush()?; next_transaction_id = next_transaction_id.wrapping_add(1); + + Ok(()) }; for command in receiver { - match command { + let result = match command { LoupedeckCommand::RequestSerialNumber => { - send(0x03, Bytes::new()); + send(0x03, Bytes::new()) } LoupedeckCommand::RequestFirmwareVersion => { - send(0x07, Bytes::new()); + send(0x07, Bytes::new()) } LoupedeckCommand::SetBrightness(value) => { let raw_value = (value.clamp(0f32, 1f32) * 10.0) as u8; - send(0x09, Bytes::copy_from_slice(&[raw_value])); + send(0x09, Bytes::copy_from_slice(&[raw_value])) } LoupedeckCommand::SetButtonColor { button, color } => { - send(0x02, Bytes::copy_from_slice(&[button.ordinal(), color.r, color.g, color.b])); + send(0x02, Bytes::copy_from_slice(&[button.ordinal(), color.r, color.g, color.b])) } LoupedeckCommand::ReplaceFramebufferArea { display_id, @@ -238,13 +239,24 @@ pub(crate) fn write_messages_worker(mut port: Box, receiver: cro data.put_u16(height); data.put(buffer); - send(0x10, data.freeze()); + send(0x10, data.freeze()) } LoupedeckCommand::RefreshDisplay { display_id } => { - send(0x0f, Bytes::copy_from_slice(&[0, display_id])); + send(0x0f, Bytes::copy_from_slice(&[0, display_id])) } LoupedeckCommand::Vibrate { pattern } => { - send(0x1b, Bytes::copy_from_slice(&[pattern.ordinal()])); + send(0x1b, Bytes::copy_from_slice(&[pattern.ordinal()])) + } + }; + + if let Err(error) = result { + match error.kind() { + ErrorKind::TimedOut | ErrorKind::BrokenPipe => { + break + } + _ => { + panic!("IO error during write: {}", error); + } } } }