From 7df8ed5c795aa71b6b5ec870027e8a961dd8324e Mon Sep 17 00:00:00 2001 From: John Cavanaugh <59479+cavanaug@users.noreply.github.com> Date: Thu, 12 Mar 2026 22:24:45 -0700 Subject: [PATCH 1/2] fix: respond to DSR cursor queries in PTY sessions --- crates/pilotty-cli/src/daemon/pty.rs | 173 ++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 1 deletion(-) diff --git a/crates/pilotty-cli/src/daemon/pty.rs b/crates/pilotty-cli/src/daemon/pty.rs index 1da0f97..e59be6f 100644 --- a/crates/pilotty-cli/src/daemon/pty.rs +++ b/crates/pilotty-cli/src/daemon/pty.rs @@ -8,6 +8,71 @@ use portable_pty::{native_pty_system, Child, CommandBuilder, MasterPty, PtySize} use tokio::sync::mpsc; use tracing::{debug, error, warn}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum DsrParseState { + Ground, + Esc, + Csi, + Csi6, +} + +/// Tracks terminal cursor position and detects DSR cursor-position requests. +struct CursorTracker { + parser: vt100::Parser, + dsr_state: DsrParseState, +} + +impl CursorTracker { + fn new(size: TermSize) -> Self { + Self { + parser: vt100::Parser::new(size.rows, size.cols, 0), + dsr_state: DsrParseState::Ground, + } + } + + fn resize(&mut self, size: TermSize) { + self.parser.screen_mut().set_size(size.rows, size.cols); + } + + fn process_output(&mut self, bytes: &[u8]) -> usize { + self.parser.process(bytes); + self.count_dsr_queries(bytes) + } + + fn cursor_position_one_indexed(&self) -> (u16, u16) { + let (row, col) = self.parser.screen().cursor_position(); + (row.saturating_add(1), col.saturating_add(1)) + } + + fn count_dsr_queries(&mut self, bytes: &[u8]) -> usize { + let mut count = 0; + + for &b in bytes { + self.dsr_state = match (self.dsr_state, b) { + (DsrParseState::Ground, 0x1b) => DsrParseState::Esc, + (DsrParseState::Ground, _) => DsrParseState::Ground, + + (DsrParseState::Esc, b'[') => DsrParseState::Csi, + (DsrParseState::Esc, 0x1b) => DsrParseState::Esc, + (DsrParseState::Esc, _) => DsrParseState::Ground, + + (DsrParseState::Csi, b'6') => DsrParseState::Csi6, + (DsrParseState::Csi, 0x1b) => DsrParseState::Esc, + (DsrParseState::Csi, _) => DsrParseState::Ground, + + (DsrParseState::Csi6, b'n') => { + count += 1; + DsrParseState::Ground + } + (DsrParseState::Csi6, 0x1b) => DsrParseState::Esc, + (DsrParseState::Csi6, _) => DsrParseState::Ground, + }; + } + + count + } +} + /// Terminal size in columns and rows. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TermSize { @@ -130,6 +195,8 @@ pub struct AsyncPtyHandle { reader_thread: Option>, /// Handle to the writer thread for cleanup. writer_thread: Option>, + /// Tracks cursor position from PTY output for DSR responses. + cursor_tracker: Arc>, } impl AsyncPtyHandle { @@ -148,10 +215,14 @@ impl AsyncPtyHandle { let (write_tx, write_rx) = mpsc::channel::>(64); let (read_tx, read_rx) = mpsc::channel::>(64); + let cursor_tracker = Arc::new(std::sync::Mutex::new(CursorTracker::new(initial_size))); + // Spawn reader thread let reader_shutdown = shutdown.clone(); + let reader_write_tx = write_tx.clone(); + let reader_tracker = cursor_tracker.clone(); let reader_thread = std::thread::spawn(move || { - Self::reader_loop(reader, read_tx, reader_shutdown); + Self::reader_loop(reader, read_tx, reader_write_tx, reader_tracker, reader_shutdown); }); // Spawn writer thread @@ -168,6 +239,7 @@ impl AsyncPtyHandle { size: std::sync::Mutex::new(initial_size), reader_thread: Some(reader_thread), writer_thread: Some(writer_thread), + cursor_tracker, }) } @@ -185,6 +257,11 @@ impl AsyncPtyHandle { .size .lock() .map_err(|_| anyhow::anyhow!("Size mutex poisoned"))? = size; + + self.cursor_tracker + .lock() + .map_err(|_| anyhow::anyhow!("Cursor tracker mutex poisoned"))? + .resize(size); Ok(()) } /// Send bytes to the PTY stdin. @@ -245,6 +322,8 @@ impl AsyncPtyHandle { fn reader_loop( mut reader: Box, read_tx: mpsc::Sender>, + write_tx: mpsc::Sender>, + cursor_tracker: Arc>, shutdown: Arc, ) { let mut buf = vec![0u8; READ_BUFFER_SIZE]; @@ -261,6 +340,32 @@ impl AsyncPtyHandle { break; } Ok(n) => { + let dsr_queries = match cursor_tracker.lock() { + Ok(mut tracker) => tracker.process_output(&buf[..n]), + Err(_) => { + warn!("Cursor tracker mutex poisoned; skipping DSR detection"); + 0 + } + }; + + if dsr_queries > 0 { + let (row, col) = match cursor_tracker.lock() { + Ok(tracker) => tracker.cursor_position_one_indexed(), + Err(_) => { + warn!("Cursor tracker mutex poisoned; using fallback cursor response"); + (1, 1) + } + }; + let response = format!("\x1b[{};{}R", row, col).into_bytes(); + + for _ in 0..dsr_queries { + if write_tx.blocking_send(response.clone()).is_err() { + debug!("PTY write channel closed while sending DSR response"); + break; + } + } + } + // Use blocking send since we're in a thread if read_tx.blocking_send(buf[..n].to_vec()).is_err() { debug!("PTY read channel closed"); @@ -350,6 +455,33 @@ mod tests { use std::io::Read; use std::time::Duration; + #[test] + fn test_cursor_tracker_detects_dsr_queries() { + let mut tracker = CursorTracker::new(TermSize { cols: 80, rows: 24 }); + + assert_eq!(tracker.process_output(b"hello"), 0); + assert_eq!(tracker.process_output(b"\x1b[6n"), 1); + assert_eq!(tracker.process_output(b"x\x1b[6n\x1b[6n"), 2); + } + + #[test] + fn test_cursor_tracker_detects_dsr_across_chunk_boundary() { + let mut tracker = CursorTracker::new(TermSize { cols: 80, rows: 24 }); + + assert_eq!(tracker.process_output(b"\x1b["), 0); + assert_eq!(tracker.process_output(b"6"), 0); + assert_eq!(tracker.process_output(b"n"), 1); + } + + #[test] + fn test_cursor_tracker_reports_one_indexed_position() { + let mut tracker = CursorTracker::new(TermSize { cols: 80, rows: 24 }); + tracker.process_output(b"abc"); + + // Cursor is after 3 chars on first row => 1-indexed (1, 4) + assert_eq!(tracker.cursor_position_one_indexed(), (1, 4)); + } + #[test] fn test_spawn_echo_and_read_output() { let session = PtySession::spawn( @@ -509,6 +641,45 @@ mod tests { .expect("resize to smaller should succeed"); } + #[tokio::test] + async fn test_async_pty_responds_to_dsr_cursor_query() { + let session = PtySession::spawn( + &[ + "bash".to_string(), + "-lc".to_string(), + "printf '\\033[6n'; IFS=';' read -r -d R row col; printf 'ok:%s;%s\\n' \"$row\" \"$col\"" + .to_string(), + ], + TermSize::default(), + None, + ) + .expect("Failed to spawn bash DSR test"); + + let handle = AsyncPtyHandle::new(session).expect("Failed to create async handle"); + + let result = tokio::time::timeout(Duration::from_secs(2), async { + let mut collected = Vec::new(); + while let Some(chunk) = handle.read().await { + collected.extend_from_slice(&chunk); + if String::from_utf8_lossy(&collected).contains("ok:") { + break; + } + } + collected + }) + .await; + + let bytes = result.expect("Timed out waiting for DSR response"); + let output = String::from_utf8_lossy(&bytes); + assert!( + output.contains("ok:"), + "Expected shell to receive DSR response, got: {:?}", + output + ); + + handle.shutdown().await; + } + #[test] fn test_spawn_with_cwd() { // Spawn pwd in /tmp and verify it outputs a path containing "tmp" From b22327c0420fc297b4aa0ebb96966e4e17f734c2 Mon Sep 17 00:00:00 2001 From: John Cavanaugh <59479+cavanaug@users.noreply.github.com> Date: Fri, 13 Mar 2026 00:36:30 -0700 Subject: [PATCH 2/2] fix: capture DSR positions at query boundaries --- crates/pilotty-cli/src/daemon/pty.rs | 145 +++++++++++++++++---------- 1 file changed, 92 insertions(+), 53 deletions(-) diff --git a/crates/pilotty-cli/src/daemon/pty.rs b/crates/pilotty-cli/src/daemon/pty.rs index e59be6f..357e6d2 100644 --- a/crates/pilotty-cli/src/daemon/pty.rs +++ b/crates/pilotty-cli/src/daemon/pty.rs @@ -34,9 +34,23 @@ impl CursorTracker { self.parser.screen_mut().set_size(size.rows, size.cols); } - fn process_output(&mut self, bytes: &[u8]) -> usize { - self.parser.process(bytes); - self.count_dsr_queries(bytes) + fn process_output(&mut self, bytes: &[u8]) -> Vec<(u16, u16)> { + let mut positions = Vec::new(); + let mut segment_start = 0; + + for (idx, &b) in bytes.iter().enumerate() { + if self.advance_dsr_state(b) { + self.parser.process(&bytes[segment_start..=idx]); + positions.push(self.cursor_position_one_indexed()); + segment_start = idx + 1; + } + } + + if segment_start < bytes.len() { + self.parser.process(&bytes[segment_start..]); + } + + positions } fn cursor_position_one_indexed(&self) -> (u16, u16) { @@ -44,32 +58,30 @@ impl CursorTracker { (row.saturating_add(1), col.saturating_add(1)) } - fn count_dsr_queries(&mut self, bytes: &[u8]) -> usize { - let mut count = 0; + fn advance_dsr_state(&mut self, b: u8) -> bool { + let mut detected = false; - for &b in bytes { - self.dsr_state = match (self.dsr_state, b) { - (DsrParseState::Ground, 0x1b) => DsrParseState::Esc, - (DsrParseState::Ground, _) => DsrParseState::Ground, + self.dsr_state = match (self.dsr_state, b) { + (DsrParseState::Ground, 0x1b) => DsrParseState::Esc, + (DsrParseState::Ground, _) => DsrParseState::Ground, - (DsrParseState::Esc, b'[') => DsrParseState::Csi, - (DsrParseState::Esc, 0x1b) => DsrParseState::Esc, - (DsrParseState::Esc, _) => DsrParseState::Ground, + (DsrParseState::Esc, b'[') => DsrParseState::Csi, + (DsrParseState::Esc, 0x1b) => DsrParseState::Esc, + (DsrParseState::Esc, _) => DsrParseState::Ground, - (DsrParseState::Csi, b'6') => DsrParseState::Csi6, - (DsrParseState::Csi, 0x1b) => DsrParseState::Esc, - (DsrParseState::Csi, _) => DsrParseState::Ground, + (DsrParseState::Csi, b'6') => DsrParseState::Csi6, + (DsrParseState::Csi, 0x1b) => DsrParseState::Esc, + (DsrParseState::Csi, _) => DsrParseState::Ground, - (DsrParseState::Csi6, b'n') => { - count += 1; - DsrParseState::Ground - } - (DsrParseState::Csi6, 0x1b) => DsrParseState::Esc, - (DsrParseState::Csi6, _) => DsrParseState::Ground, - }; - } + (DsrParseState::Csi6, b'n') => { + detected = true; + DsrParseState::Ground + } + (DsrParseState::Csi6, 0x1b) => DsrParseState::Esc, + (DsrParseState::Csi6, _) => DsrParseState::Ground, + }; - count + detected } } @@ -222,7 +234,13 @@ impl AsyncPtyHandle { let reader_write_tx = write_tx.clone(); let reader_tracker = cursor_tracker.clone(); let reader_thread = std::thread::spawn(move || { - Self::reader_loop(reader, read_tx, reader_write_tx, reader_tracker, reader_shutdown); + Self::reader_loop( + reader, + read_tx, + reader_write_tx, + reader_tracker, + reader_shutdown, + ); }); // Spawn writer thread @@ -340,28 +358,28 @@ impl AsyncPtyHandle { break; } Ok(n) => { - let dsr_queries = match cursor_tracker.lock() { + let dsr_positions = match cursor_tracker.lock() { Ok(mut tracker) => tracker.process_output(&buf[..n]), Err(_) => { warn!("Cursor tracker mutex poisoned; skipping DSR detection"); - 0 + Vec::new() } }; - if dsr_queries > 0 { - let (row, col) = match cursor_tracker.lock() { - Ok(tracker) => tracker.cursor_position_one_indexed(), - Err(_) => { - warn!("Cursor tracker mutex poisoned; using fallback cursor response"); - (1, 1) - } - }; - let response = format!("\x1b[{};{}R", row, col).into_bytes(); - - for _ in 0..dsr_queries { - if write_tx.blocking_send(response.clone()).is_err() { - debug!("PTY write channel closed while sending DSR response"); - break; + if !dsr_positions.is_empty() { + for (row, col) in dsr_positions { + let response = format!("\x1b[{};{}R", row, col).into_bytes(); + match write_tx.try_send(response) { + Ok(()) => {} + Err(mpsc::error::TrySendError::Full(_)) => { + warn!( + "PTY write channel full; dropping synthetic DSR response" + ); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + debug!("PTY write channel closed while sending DSR response"); + break; + } } } } @@ -452,25 +470,39 @@ impl Drop for AsyncPtyHandle { #[cfg(test)] mod tests { use super::*; + use regex::Regex; use std::io::Read; use std::time::Duration; #[test] - fn test_cursor_tracker_detects_dsr_queries() { + fn test_cursor_tracker_collects_cursor_positions_for_dsr_queries() { let mut tracker = CursorTracker::new(TermSize { cols: 80, rows: 24 }); - assert_eq!(tracker.process_output(b"hello"), 0); - assert_eq!(tracker.process_output(b"\x1b[6n"), 1); - assert_eq!(tracker.process_output(b"x\x1b[6n\x1b[6n"), 2); + assert_eq!(tracker.process_output(b"hello"), Vec::<(u16, u16)>::new()); + assert_eq!(tracker.process_output(b"\x1b[6n"), vec![(1, 6)]); + assert_eq!( + tracker.process_output(b"x\x1b[6n\x1b[6n"), + vec![(1, 7), (1, 7)] + ); } #[test] fn test_cursor_tracker_detects_dsr_across_chunk_boundary() { let mut tracker = CursorTracker::new(TermSize { cols: 80, rows: 24 }); - assert_eq!(tracker.process_output(b"\x1b["), 0); - assert_eq!(tracker.process_output(b"6"), 0); - assert_eq!(tracker.process_output(b"n"), 1); + assert_eq!(tracker.process_output(b"\x1b["), Vec::<(u16, u16)>::new()); + assert_eq!(tracker.process_output(b"6"), Vec::<(u16, u16)>::new()); + assert_eq!(tracker.process_output(b"n"), vec![(1, 1)]); + } + + #[test] + fn test_cursor_tracker_captures_position_at_dsr_boundary_with_trailing_output() { + let mut tracker = CursorTracker::new(TermSize { cols: 80, rows: 24 }); + + let positions = tracker.process_output(b"abc\x1b[6nxyz"); + + assert_eq!(positions, vec![(1, 4)]); + assert_eq!(tracker.cursor_position_one_indexed(), (1, 7)); } #[test] @@ -647,7 +679,7 @@ mod tests { &[ "bash".to_string(), "-lc".to_string(), - "printf '\\033[6n'; IFS=';' read -r -d R row col; printf 'ok:%s;%s\\n' \"$row\" \"$col\"" + "printf 'abc\\033[6nxyz'; IFS=';' read -r -d R row col; printf 'ok:%s;%s\\n' \"$row\" \"$col\"" .to_string(), ], TermSize::default(), @@ -671,10 +703,17 @@ mod tests { let bytes = result.expect("Timed out waiting for DSR response"); let output = String::from_utf8_lossy(&bytes); - assert!( - output.contains("ok:"), - "Expected shell to receive DSR response, got: {:?}", - output + let captures = Regex::new(r"ok:\x1b\[(\d+);(\d+)") + .expect("valid regex") + .captures(&output) + .expect("Expected shell to receive DSR response with cursor coordinates"); + let row: u16 = captures[1].parse().expect("row should parse"); + let col: u16 = captures[2].parse().expect("col should parse"); + + assert_eq!( + (row, col), + (1, 4), + "Unexpected DSR response in output: {output:?}" ); handle.shutdown().await;