diff --git a/.changeset/rust-modular-utilities.md b/.changeset/rust-modular-utilities.md new file mode 100644 index 0000000..8ccafc7 --- /dev/null +++ b/.changeset/rust-modular-utilities.md @@ -0,0 +1,14 @@ +--- +'command-stream': patch +--- + +Reorganize Rust code with modular utilities (matching JS pattern) + +- Extract trace.rs (152 lines) - Logging and tracing utilities +- Extract ansi.rs (194 lines) - ANSI escape code handling +- Extract quote.rs (161 lines) - Shell quoting utilities +- Update utils.rs to re-export from new modules and focus on CommandResult/VirtualUtils +- Update lib.rs with new module declarations and re-exports + +The Rust structure now mirrors the JavaScript modular organization for consistency. +All modules remain well under the 1500-line limit guideline. diff --git a/rust/src/ansi.rs b/rust/src/ansi.rs new file mode 100644 index 0000000..834c212 --- /dev/null +++ b/rust/src/ansi.rs @@ -0,0 +1,194 @@ +//! ANSI control character utilities for command-stream +//! +//! This module handles stripping and processing of ANSI escape codes +//! and control characters from text output. + +/// ANSI control character utilities +pub struct AnsiUtils; + +impl AnsiUtils { + /// Strip ANSI escape sequences from text + /// + /// Removes color codes, cursor movement, and other ANSI escape sequences + /// while preserving the actual text content. + /// + /// # Examples + /// + /// ``` + /// use command_stream::ansi::AnsiUtils; + /// + /// let text = "\x1b[31mRed text\x1b[0m"; + /// assert_eq!(AnsiUtils::strip_ansi(text), "Red text"); + /// ``` + pub fn strip_ansi(text: &str) -> String { + let re = regex::Regex::new(r"\x1b\[[0-9;]*[mGKHFJ]").unwrap(); + re.replace_all(text, "").to_string() + } + + /// Strip control characters from text, preserving newlines, carriage returns, and tabs + /// + /// Removes control characters (ASCII 0x00-0x1F and 0x7F) except: + /// - Newlines (\n = 0x0A) + /// - Carriage returns (\r = 0x0D) + /// - Tabs (\t = 0x09) + /// + /// # Examples + /// + /// ``` + /// use command_stream::ansi::AnsiUtils; + /// + /// let text = "Hello\x00World\nNew line\tTab"; + /// assert_eq!(AnsiUtils::strip_control_chars(text), "HelloWorld\nNew line\tTab"); + /// ``` + pub fn strip_control_chars(text: &str) -> String { + text.chars() + .filter(|c| { + // Preserve newlines (\n = \x0A), carriage returns (\r = \x0D), and tabs (\t = \x09) + !matches!(*c as u32, + 0x00..=0x08 | 0x0B | 0x0C | 0x0E..=0x1F | 0x7F + ) + }) + .collect() + } + + /// Strip both ANSI sequences and control characters + /// + /// Combines `strip_ansi` and `strip_control_chars` for complete text cleaning. + pub fn strip_all(text: &str) -> String { + Self::strip_control_chars(&Self::strip_ansi(text)) + } + + /// Clean data for processing (strips ANSI and control chars) + /// + /// Alias for `strip_all` - provides semantic clarity when processing + /// data that needs to be cleaned for further processing. + pub fn clean_for_processing(data: &str) -> String { + Self::strip_all(data) + } +} + +/// Configuration for ANSI handling +/// +/// Controls how ANSI escape codes and control characters are processed +/// in command output. +#[derive(Debug, Clone)] +pub struct AnsiConfig { + /// Whether to preserve ANSI escape sequences in output + pub preserve_ansi: bool, + /// Whether to preserve control characters in output + pub preserve_control_chars: bool, +} + +impl Default for AnsiConfig { + fn default() -> Self { + AnsiConfig { + preserve_ansi: true, + preserve_control_chars: true, + } + } +} + +impl AnsiConfig { + /// Create a new AnsiConfig that preserves everything (default) + pub fn new() -> Self { + Self::default() + } + + /// Create a config that strips all ANSI and control characters + pub fn strip_all() -> Self { + AnsiConfig { + preserve_ansi: false, + preserve_control_chars: false, + } + } + + /// Process output according to config settings + /// + /// Applies the configured stripping rules to the input data. + pub fn process_output(&self, data: &str) -> String { + if !self.preserve_ansi && !self.preserve_control_chars { + AnsiUtils::clean_for_processing(data) + } else if !self.preserve_ansi { + AnsiUtils::strip_ansi(data) + } else if !self.preserve_control_chars { + AnsiUtils::strip_control_chars(data) + } else { + data.to_string() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_strip_ansi() { + let text = "\x1b[31mRed text\x1b[0m"; + assert_eq!(AnsiUtils::strip_ansi(text), "Red text"); + } + + #[test] + fn test_strip_ansi_multiple_codes() { + let text = "\x1b[1m\x1b[32mBold Green\x1b[0m Normal"; + assert_eq!(AnsiUtils::strip_ansi(text), "Bold Green Normal"); + } + + #[test] + fn test_strip_control_chars() { + let text = "Hello\x00World\nNew line\tTab"; + assert_eq!( + AnsiUtils::strip_control_chars(text), + "HelloWorld\nNew line\tTab" + ); + } + + #[test] + fn test_strip_control_chars_preserves_whitespace() { + let text = "Line1\nLine2\r\nLine3\tTabbed"; + assert_eq!( + AnsiUtils::strip_control_chars(text), + "Line1\nLine2\r\nLine3\tTabbed" + ); + } + + #[test] + fn test_strip_all() { + let text = "\x1b[31mRed\x00text\x1b[0m"; + assert_eq!(AnsiUtils::strip_all(text), "Redtext"); + } + + #[test] + fn test_ansi_config_default() { + let config = AnsiConfig::default(); + let text = "\x1b[31mRed\x00text\x1b[0m"; + assert_eq!(config.process_output(text), text); + } + + #[test] + fn test_ansi_config_strip_all() { + let config = AnsiConfig::strip_all(); + let text = "\x1b[31mRed\x00text\x1b[0m"; + assert_eq!(config.process_output(text), "Redtext"); + } + + #[test] + fn test_ansi_config_strip_ansi_only() { + let config = AnsiConfig { + preserve_ansi: false, + preserve_control_chars: true, + }; + let text = "\x1b[31mRed text\x1b[0m"; + assert_eq!(config.process_output(text), "Red text"); + } + + #[test] + fn test_ansi_config_strip_control_only() { + let config = AnsiConfig { + preserve_ansi: true, + preserve_control_chars: false, + }; + let text = "Hello\x00World"; + assert_eq!(config.process_output(text), "HelloWorld"); + } +} diff --git a/rust/src/events.rs b/rust/src/events.rs new file mode 100644 index 0000000..3bf6421 --- /dev/null +++ b/rust/src/events.rs @@ -0,0 +1,305 @@ +//! Event emitter for stream events +//! +//! This module provides an EventEmitter-like implementation for ProcessRunner +//! events, similar to the JavaScript StreamEmitter class. + +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::trace::trace_lazy; + +/// Event types that can be emitted by ProcessRunner +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub enum EventType { + /// Stdout data received + Stdout, + /// Stderr data received + Stderr, + /// Combined data event (contains type and data) + Data, + /// Process ended + End, + /// Process exited with code + Exit, + /// Error occurred + Error, + /// Process spawned + Spawn, +} + +impl std::fmt::Display for EventType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EventType::Stdout => write!(f, "stdout"), + EventType::Stderr => write!(f, "stderr"), + EventType::Data => write!(f, "data"), + EventType::End => write!(f, "end"), + EventType::Exit => write!(f, "exit"), + EventType::Error => write!(f, "error"), + EventType::Spawn => write!(f, "spawn"), + } + } +} + +/// Event data variants +#[derive(Debug, Clone)] +pub enum EventData { + /// String data (for stdout, stderr) + String(String), + /// Exit code + ExitCode(i32), + /// Data event with type and content + TypedData { data_type: String, data: String }, + /// Command result + Result(crate::CommandResult), + /// Error message + Error(String), + /// No data + None, +} + +/// Type alias for event listeners +type Listener = Arc; + +/// Event emitter for ProcessRunner events +/// +/// Provides on(), once(), off(), and emit() methods similar to Node.js EventEmitter. +pub struct StreamEmitter { + listeners: RwLock>>, +} + +impl Default for StreamEmitter { + fn default() -> Self { + Self::new() + } +} + +impl StreamEmitter { + /// Create a new event emitter + pub fn new() -> Self { + StreamEmitter { + listeners: RwLock::new(HashMap::new()), + } + } + + /// Register a listener for an event + /// + /// # Arguments + /// * `event` - The event type to listen for + /// * `listener` - The callback function to invoke + /// + /// # Example + /// ```ignore + /// emitter.on(EventType::Stdout, |data| { + /// if let EventData::String(s) = data { + /// println!("Got stdout: {}", s); + /// } + /// }); + /// ``` + pub async fn on(&self, event: EventType, listener: F) + where + F: Fn(EventData) + Send + Sync + 'static, + { + trace_lazy("StreamEmitter", || { + format!("on() called for event: {}", event) + }); + + let mut listeners = self.listeners.write().await; + listeners + .entry(event) + .or_default() + .push(Arc::new(listener)); + } + + /// Register a one-time listener for an event + /// + /// The listener will be removed after it is invoked once. + pub async fn once(&self, event: EventType, listener: F) + where + F: Fn(EventData) + Send + Sync + 'static, + { + trace_lazy("StreamEmitter", || { + format!("once() called for event: {}", event) + }); + + // Wrap the listener to track if it's been called + let called = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let called_clone = called.clone(); + + let once_listener = move |data: EventData| { + if !called_clone.swap(true, std::sync::atomic::Ordering::SeqCst) { + listener(data); + } + }; + + self.on(event, once_listener).await; + } + + /// Emit an event to all registered listeners + /// + /// # Arguments + /// * `event` - The event type to emit + /// * `data` - The event data to pass to listeners + pub async fn emit(&self, event: EventType, data: EventData) { + let listeners = self.listeners.read().await; + + if let Some(event_listeners) = listeners.get(&event) { + trace_lazy("StreamEmitter", || { + format!( + "Emitting event {} to {} listeners", + event, + event_listeners.len() + ) + }); + + for listener in event_listeners { + listener(data.clone()); + } + } + } + + /// Remove all listeners for an event + /// + /// # Arguments + /// * `event` - The event type to clear listeners for + pub async fn off(&self, event: EventType) { + trace_lazy("StreamEmitter", || { + format!("off() called for event: {}", event) + }); + + let mut listeners = self.listeners.write().await; + listeners.remove(&event); + } + + /// Get the number of listeners for an event + pub async fn listener_count(&self, event: &EventType) -> usize { + let listeners = self.listeners.read().await; + listeners.get(event).map(|v| v.len()).unwrap_or(0) + } + + /// Remove all listeners for all events + pub async fn remove_all_listeners(&self) { + trace_lazy("StreamEmitter", || "Removing all listeners".to_string()); + let mut listeners = self.listeners.write().await; + listeners.clear(); + } +} + +impl std::fmt::Debug for StreamEmitter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StreamEmitter") + .field("listeners", &">>") + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[tokio::test] + async fn test_emit_basic() { + let emitter = StreamEmitter::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + emitter + .on(EventType::Stdout, move |_| { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .emit(EventType::Stdout, EventData::String("test".to_string())) + .await; + + assert_eq!(counter.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_once() { + let emitter = StreamEmitter::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + emitter + .once(EventType::Exit, move |_| { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + // Emit twice + emitter.emit(EventType::Exit, EventData::ExitCode(0)).await; + emitter.emit(EventType::Exit, EventData::ExitCode(0)).await; + + // Should only be called once + assert_eq!(counter.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn test_off() { + let emitter = StreamEmitter::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + emitter + .on(EventType::Stdout, move |_| { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter.off(EventType::Stdout).await; + emitter + .emit(EventType::Stdout, EventData::String("test".to_string())) + .await; + + assert_eq!(counter.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn test_listener_count() { + let emitter = StreamEmitter::new(); + + assert_eq!(emitter.listener_count(&EventType::Stdout).await, 0); + + emitter.on(EventType::Stdout, |_| {}).await; + assert_eq!(emitter.listener_count(&EventType::Stdout).await, 1); + + emitter.on(EventType::Stdout, |_| {}).await; + assert_eq!(emitter.listener_count(&EventType::Stdout).await, 2); + } + + #[tokio::test] + async fn test_multiple_events() { + let emitter = StreamEmitter::new(); + let stdout_counter = Arc::new(AtomicUsize::new(0)); + let stderr_counter = Arc::new(AtomicUsize::new(0)); + + let stdout_clone = stdout_counter.clone(); + let stderr_clone = stderr_counter.clone(); + + emitter + .on(EventType::Stdout, move |_| { + stdout_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .on(EventType::Stderr, move |_| { + stderr_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .emit(EventType::Stdout, EventData::String("out".to_string())) + .await; + emitter + .emit(EventType::Stderr, EventData::String("err".to_string())) + .await; + + assert_eq!(stdout_counter.load(Ordering::SeqCst), 1); + assert_eq!(stderr_counter.load(Ordering::SeqCst), 1); + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 311230f..0671f5e 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -9,40 +9,93 @@ //! //! - Async command execution with tokio //! - Streaming output via async iterators +//! - Event-based output handling (on, once, emit) //! - Virtual commands for common operations (cat, ls, mkdir, etc.) //! - Shell operator support (&&, ||, ;, |) +//! - Pipeline support with `.pipe()` method and `Pipeline` builder +//! - Global state management for shell settings +//! - `cmd!` macro for ergonomic command creation (similar to JS `$` tagged template literals) //! - Cross-platform support //! +//! ## Module Organization +//! +//! The codebase follows a modular architecture similar to the JavaScript implementation: +//! +//! - `ansi` - ANSI escape code handling utilities +//! - `commands` - Virtual command implementations +//! - `events` - Event emitter for stream events +//! - `macros` - The `cmd!` macro for ergonomic command creation +//! - `pipeline` - Pipeline execution support +//! - `quote` - Shell quoting utilities +//! - `shell_parser` - Shell command parsing +//! - `state` - Global state management +//! - `stream` - Async streaming and iteration support +//! - `trace` - Logging and tracing utilities +//! - `utils` - Command results and virtual command helpers +//! //! ## Quick Start //! //! ```rust,no_run -//! use command_stream::run; +//! use command_stream::{run, cmd}; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { //! // Execute a simple command //! let result = run("echo hello world").await?; //! println!("{}", result.stdout); +//! +//! // Using the cmd! macro (similar to JS $ tagged template) +//! let name = "world"; +//! let result = cmd!("echo hello {}", name).await?; +//! println!("{}", result.stdout); +//! +//! // Using pipelines +//! use command_stream::Pipeline; +//! let result = Pipeline::new() +//! .add("echo hello world") +//! .add("grep world") +//! .run() +//! .await?; +//! //! Ok(()) //! } //! ``` +// Modular utility modules (following JavaScript modular pattern) +pub mod ansi; +pub mod events; +#[doc(hidden)] +pub mod macros; +pub mod pipeline; +pub mod quote; +pub mod state; +pub mod stream; +pub mod trace; + +// Core modules pub mod commands; pub mod shell_parser; pub mod utils; use std::collections::HashMap; -use std::env; use std::path::PathBuf; use std::process::Stdio; -use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::process::{Child, Command}; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::mpsc; pub use commands::{CommandContext, StreamChunk}; pub use shell_parser::{parse_shell_command, needs_real_shell, ParsedCommand}; -pub use utils::{AnsiConfig, AnsiUtils, CommandResult, VirtualUtils, quote, trace}; +pub use utils::{CommandResult, VirtualUtils}; + +// Re-export modular utilities at crate root for convenient access +pub use ansi::{AnsiConfig, AnsiUtils}; +pub use events::{EventData, EventType, StreamEmitter}; +pub use pipeline::{Pipeline, PipelineExt, PipelineBuilder}; +pub use quote::quote; +pub use state::{global_state, reset_global_state, get_shell_settings, set_shell_option, unset_shell_option, GlobalState, ShellSettings}; +pub use stream::{StreamingRunner, OutputStream, OutputChunk, AsyncIterator, IntoStream}; +pub use trace::trace; /// Error type for command-stream operations #[derive(Debug, thiserror::Error)] @@ -66,21 +119,6 @@ pub enum Error { /// Result type for command-stream operations pub type Result = std::result::Result; -/// Shell settings for controlling execution behavior -#[derive(Debug, Clone, Default)] -pub struct ShellSettings { - /// Exit immediately if a command exits with non-zero status (set -e) - pub errexit: bool, - /// Print commands as they are executed (set -v) - pub verbose: bool, - /// Print trace of commands (set -x) - pub xtrace: bool, - /// Return value of a pipeline is the status of the last command to exit with non-zero (set -o pipefail) - pub pipefail: bool, - /// Treat unset variables as an error (set -u) - pub nounset: bool, -} - /// Options for command execution #[derive(Debug, Clone)] pub struct RunOptions { @@ -179,8 +217,8 @@ impl ProcessRunner { return Ok(()); } - // Parse command for shell operators - let parsed = if self.options.shell_operators && !needs_real_shell(&self.command) { + // Parse command for shell operators (for future use with virtual command pipelines) + let _parsed = if self.options.shell_operators && !needs_real_shell(&self.command) { parse_shell_command(&self.command) } else { None @@ -364,6 +402,16 @@ impl ProcessRunner { pub fn result(&self) -> Option<&CommandResult> { self.result.as_ref() } + + /// Get the command string + pub fn command(&self) -> &str { + &self.command + } + + /// Get the options + pub fn options(&self) -> &RunOptions { + &self.options + } } /// Shell configuration @@ -452,41 +500,4 @@ pub fn run_sync(command: impl Into) -> Result { rt.block_on(run(command)) } -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_simple_echo() { - let result = run("echo hello").await.unwrap(); - assert!(result.is_success()); - assert!(result.stdout.contains("hello")); - } - - #[tokio::test] - async fn test_virtual_echo() { - let mut runner = ProcessRunner::new("echo test virtual", RunOptions::default()); - let result = runner.run().await.unwrap(); - assert!(result.is_success()); - assert!(result.stdout.contains("test virtual")); - } - - #[tokio::test] - async fn test_process_runner() { - let mut runner = ProcessRunner::new("echo hello world", RunOptions { - mirror: false, - ..Default::default() - }); - - let result = runner.run().await.unwrap(); - assert!(result.is_success()); - } - - #[tokio::test] - async fn test_virtual_pwd() { - let mut runner = ProcessRunner::new("pwd", RunOptions::default()); - let result = runner.run().await.unwrap(); - assert!(result.is_success()); - assert!(!result.stdout.is_empty()); - } -} +// Tests are located in tests/ directory for better organization diff --git a/rust/src/macros.rs b/rust/src/macros.rs new file mode 100644 index 0000000..90fdfc0 --- /dev/null +++ b/rust/src/macros.rs @@ -0,0 +1,165 @@ +//! Macros for ergonomic command execution +//! +//! This module provides command execution macros that offer a similar experience +//! to JavaScript's `$` tagged template literal for shell command execution. +//! +//! ## Available Macros +//! +//! - `s!` - Short, concise macro (recommended for most use cases) +//! - `sh!` - Shell macro (alternative short form) +//! - `cmd!` - Command macro (explicit name) +//! - `cs!` - Command-stream macro (another alternative) +//! +//! All macros are aliases and provide identical functionality. +//! +//! ## Usage +//! +//! ```rust,no_run +//! use command_stream::s; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! // Simple command +//! let result = s!("echo hello world").await?; +//! +//! // With interpolation (values are automatically quoted for safety) +//! let name = "John Doe"; +//! let result = s!("echo Hello, {}", name).await?; +//! +//! // Multiple arguments +//! let file = "test.txt"; +//! let dir = "/tmp"; +//! let result = s!("cp {} {}", file, dir).await?; +//! +//! Ok(()) +//! } +//! ``` + +/// Build a shell command with interpolated values safely quoted +/// +/// This function is used internally by the `cmd!` macro to build +/// shell commands with properly quoted interpolated values. +pub fn build_shell_command(parts: &[&str], values: &[&str]) -> String { + let mut result = String::new(); + + for (i, part) in parts.iter().enumerate() { + result.push_str(part); + if i < values.len() { + result.push_str(&crate::quote::quote(values[i])); + } + } + + result +} + +/// Helper function to create a ProcessRunner from a command string +pub fn create_runner(command: String) -> crate::ProcessRunner { + crate::ProcessRunner::new(command, crate::RunOptions { + mirror: true, + capture: true, + ..Default::default() + }) +} + +/// Helper function to create a ProcessRunner with custom options +pub fn create_runner_with_options(command: String, options: crate::RunOptions) -> crate::ProcessRunner { + crate::ProcessRunner::new(command, options) +} + +/// The `cmd!` macro for ergonomic shell command execution +/// +/// This macro provides a similar experience to JavaScript's `$` tagged template literal. +/// Values interpolated into the command are automatically quoted for shell safety. +/// +/// Note: Consider using the shorter `s!` or `sh!` aliases for more concise code. +/// +/// # Examples +/// +/// ```rust,no_run +/// use command_stream::s; +/// +/// # async fn example() -> Result<(), command_stream::Error> { +/// // Simple command (returns a future that can be awaited) +/// let result = s!("echo hello").await?; +/// +/// // With string interpolation +/// let name = "world"; +/// let result = s!("echo hello {}", name).await?; +/// +/// // With multiple values +/// let src = "source.txt"; +/// let dst = "dest.txt"; +/// let result = s!("cp {} {}", src, dst).await?; +/// +/// // Values with special characters are automatically quoted +/// let filename = "file with spaces.txt"; +/// let result = s!("cat {}", filename).await?; // Safely handles spaces +/// # Ok(()) +/// # } +/// ``` +/// +/// # Safety +/// +/// All interpolated values are automatically quoted using shell-safe quoting, +/// preventing command injection attacks. +#[macro_export] +macro_rules! cmd { + // No interpolation - just a plain command string + ($cmd:expr) => {{ + async { + $crate::run($cmd).await + } + }}; + + // With format-style interpolation + ($fmt:expr, $($arg:expr),+ $(,)?) => {{ + // Build command with quoted values + let mut result = String::new(); + let values: Vec = vec![$(format!("{}", $arg)),+]; + let values_ref: Vec<&str> = values.iter().map(|s| s.as_str()).collect(); + let fmt_parts: Vec<&str> = $fmt.split("{}").collect(); + for (i, part) in fmt_parts.iter().enumerate() { + result.push_str(part); + if i < values_ref.len() { + result.push_str(&$crate::quote::quote(values_ref[i])); + } + } + + async move { + $crate::run(result).await + } + }}; +} + +/// The `sh!` macro - alias for `cmd!` +/// +/// This is an alternative name for `cmd!` that some users may find +/// more intuitive for shell command execution. +#[macro_export] +macro_rules! sh { + ($($args:tt)*) => { + $crate::cmd!($($args)*) + }; +} + +/// The `s!` macro - short alias for `cmd!` +/// +/// This is a concise alternative to `cmd!` for quick shell command execution. +/// Recommended for use in documentation and examples. +#[macro_export] +macro_rules! s { + ($($args:tt)*) => { + $crate::cmd!($($args)*) + }; +} + +/// The `cs!` macro - alias for `cmd!` +/// +/// Short for "command-stream", this provides another alternative +/// for shell command execution. +#[macro_export] +macro_rules! cs { + ($($args:tt)*) => { + $crate::cmd!($($args)*) + }; +} diff --git a/rust/src/pipeline.rs b/rust/src/pipeline.rs new file mode 100644 index 0000000..1c19100 --- /dev/null +++ b/rust/src/pipeline.rs @@ -0,0 +1,411 @@ +//! Pipeline execution support +//! +//! This module provides pipeline functionality similar to the JavaScript +//! `$.process-runner-pipeline.mjs` module. It allows chaining commands +//! together with the output of one command becoming the input of the next. +//! +//! ## Usage +//! +//! ```rust,no_run +//! use command_stream::{Pipeline, run}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! // Create a pipeline +//! let result = Pipeline::new() +//! .add("echo hello world") +//! .add("grep world") +//! .add("wc -l") +//! .run() +//! .await?; +//! +//! println!("Output: {}", result.stdout); +//! Ok(()) +//! } +//! ``` + +use std::collections::HashMap; +use std::path::PathBuf; +use std::process::Stdio; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::process::Command; + +use crate::trace::trace_lazy; +use crate::{CommandResult, Result, RunOptions, StdinOption}; + +/// A pipeline of commands to be executed sequentially +/// +/// Each command's stdout is piped to the next command's stdin. +#[derive(Debug, Clone)] +pub struct Pipeline { + /// Commands in the pipeline + commands: Vec, + /// Initial stdin content (optional) + stdin: Option, + /// Working directory + cwd: Option, + /// Environment variables + env: Option>, + /// Whether to mirror output to parent stdout/stderr + mirror: bool, + /// Whether to capture output + capture: bool, +} + +impl Default for Pipeline { + fn default() -> Self { + Self::new() + } +} + +impl Pipeline { + /// Create a new empty pipeline + pub fn new() -> Self { + Pipeline { + commands: Vec::new(), + stdin: None, + cwd: None, + env: None, + mirror: true, + capture: true, + } + } + + /// Add a command to the pipeline + pub fn add(mut self, command: impl Into) -> Self { + self.commands.push(command.into()); + self + } + + /// Set the initial stdin content for the first command + pub fn stdin(mut self, content: impl Into) -> Self { + self.stdin = Some(content.into()); + self + } + + /// Set the working directory for all commands + pub fn cwd(mut self, path: impl Into) -> Self { + self.cwd = Some(path.into()); + self + } + + /// Set environment variables for all commands + pub fn env(mut self, env: HashMap) -> Self { + self.env = Some(env); + self + } + + /// Set whether to mirror output to stdout/stderr + pub fn mirror_output(mut self, mirror: bool) -> Self { + self.mirror = mirror; + self + } + + /// Set whether to capture output + pub fn capture_output(mut self, capture: bool) -> Self { + self.capture = capture; + self + } + + /// Execute the pipeline and return the result + pub async fn run(self) -> Result { + if self.commands.is_empty() { + return Ok(CommandResult { + stdout: String::new(), + stderr: "No commands in pipeline".to_string(), + code: 1, + }); + } + + trace_lazy("Pipeline", || { + format!("Running pipeline with {} commands", self.commands.len()) + }); + + let mut current_stdin = self.stdin.clone(); + let mut last_result = CommandResult { + stdout: String::new(), + stderr: String::new(), + code: 0, + }; + let mut accumulated_stderr = String::new(); + + for (i, cmd_str) in self.commands.iter().enumerate() { + let is_last = i == self.commands.len() - 1; + + trace_lazy("Pipeline", || { + format!("Executing command {}/{}: {}", i + 1, self.commands.len(), cmd_str) + }); + + // Check if this is a virtual command + let first_word = cmd_str.split_whitespace().next().unwrap_or(""); + if crate::commands::are_virtual_commands_enabled() { + if let Some(result) = self.try_virtual_command(first_word, cmd_str, ¤t_stdin).await { + if result.code != 0 { + return Ok(CommandResult { + stdout: result.stdout, + stderr: accumulated_stderr + &result.stderr, + code: result.code, + }); + } + current_stdin = Some(result.stdout.clone()); + accumulated_stderr.push_str(&result.stderr); + last_result = result; + continue; + } + } + + // Execute via shell + let shell = find_available_shell(); + let mut cmd = Command::new(&shell.cmd); + for arg in &shell.args { + cmd.arg(arg); + } + cmd.arg(cmd_str); + + // Configure stdio + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + + // Set working directory + if let Some(ref cwd) = self.cwd { + cmd.current_dir(cwd); + } + + // Set environment + if let Some(ref env_vars) = self.env { + for (key, value) in env_vars { + cmd.env(key, value); + } + } + + // Spawn the process + let mut child = cmd.spawn()?; + + // Write stdin if available + if let Some(ref stdin_content) = current_stdin { + if let Some(mut stdin) = child.stdin.take() { + let content = stdin_content.clone(); + tokio::spawn(async move { + let _ = stdin.write_all(content.as_bytes()).await; + let _ = stdin.shutdown().await; + }); + } + } + + // Read stdout + let mut stdout_content = String::new(); + if let Some(mut stdout) = child.stdout.take() { + stdout.read_to_string(&mut stdout_content).await?; + } + + // Read stderr + let mut stderr_content = String::new(); + if let Some(mut stderr) = child.stderr.take() { + stderr.read_to_string(&mut stderr_content).await?; + } + + // Mirror output if enabled and this is the last command + if is_last && self.mirror { + if !stdout_content.is_empty() { + print!("{}", stdout_content); + } + if !stderr_content.is_empty() { + eprint!("{}", stderr_content); + } + } + + // Wait for the process + let status = child.wait().await?; + let code = status.code().unwrap_or(-1); + + accumulated_stderr.push_str(&stderr_content); + + if code != 0 { + return Ok(CommandResult { + stdout: stdout_content, + stderr: accumulated_stderr, + code, + }); + } + + // Set up stdin for next command + current_stdin = Some(stdout_content.clone()); + last_result = CommandResult { + stdout: stdout_content, + stderr: String::new(), + code, + }; + } + + Ok(CommandResult { + stdout: last_result.stdout, + stderr: accumulated_stderr, + code: last_result.code, + }) + } + + /// Try to execute a virtual command + async fn try_virtual_command( + &self, + cmd_name: &str, + full_cmd: &str, + stdin: &Option, + ) -> Option { + let parts: Vec<&str> = full_cmd.split_whitespace().collect(); + let args: Vec = parts.iter().skip(1).map(|s| s.to_string()).collect(); + + let ctx = crate::commands::CommandContext { + args, + stdin: stdin.clone(), + cwd: self.cwd.clone(), + env: self.env.clone(), + output_tx: None, + is_cancelled: None, + }; + + match cmd_name { + "echo" => Some(crate::commands::echo(ctx).await), + "pwd" => Some(crate::commands::pwd(ctx).await), + "cd" => Some(crate::commands::cd(ctx).await), + "true" => Some(crate::commands::r#true(ctx).await), + "false" => Some(crate::commands::r#false(ctx).await), + "sleep" => Some(crate::commands::sleep(ctx).await), + "cat" => Some(crate::commands::cat(ctx).await), + "ls" => Some(crate::commands::ls(ctx).await), + "mkdir" => Some(crate::commands::mkdir(ctx).await), + "rm" => Some(crate::commands::rm(ctx).await), + "touch" => Some(crate::commands::touch(ctx).await), + "cp" => Some(crate::commands::cp(ctx).await), + "mv" => Some(crate::commands::mv(ctx).await), + "basename" => Some(crate::commands::basename(ctx).await), + "dirname" => Some(crate::commands::dirname(ctx).await), + "env" => Some(crate::commands::env(ctx).await), + "exit" => Some(crate::commands::exit(ctx).await), + "which" => Some(crate::commands::which(ctx).await), + "yes" => Some(crate::commands::yes(ctx).await), + "seq" => Some(crate::commands::seq(ctx).await), + "test" => Some(crate::commands::test(ctx).await), + _ => None, + } + } +} + +/// Shell configuration +#[derive(Debug, Clone)] +struct ShellConfig { + cmd: String, + args: Vec, +} + +/// Find an available shell +fn find_available_shell() -> ShellConfig { + let is_windows = cfg!(windows); + + if is_windows { + ShellConfig { + cmd: "cmd.exe".to_string(), + args: vec!["/c".to_string()], + } + } else { + let shells = [ + ("/bin/sh", "-c"), + ("/usr/bin/sh", "-c"), + ("/bin/bash", "-c"), + ]; + + for (cmd, arg) in shells { + if std::path::Path::new(cmd).exists() { + return ShellConfig { + cmd: cmd.to_string(), + args: vec![arg.to_string()], + }; + } + } + + ShellConfig { + cmd: "/bin/sh".to_string(), + args: vec!["-c".to_string()], + } + } +} + +/// Extension trait to add `.pipe()` method to ProcessRunner +pub trait PipelineExt { + /// Pipe the output of this command to another command + fn pipe(self, command: impl Into) -> PipelineBuilder; +} + +impl PipelineExt for crate::ProcessRunner { + fn pipe(self, command: impl Into) -> PipelineBuilder { + PipelineBuilder { + first: self, + additional: vec![command.into()], + } + } +} + +/// Builder for piping commands together +pub struct PipelineBuilder { + first: crate::ProcessRunner, + additional: Vec, +} + +impl PipelineBuilder { + /// Add another command to the pipeline + pub fn pipe(mut self, command: impl Into) -> Self { + self.additional.push(command.into()); + self + } + + /// Execute the pipeline + pub async fn run(mut self) -> Result { + // First, run the initial command + let first_result = self.first.run().await?; + + if first_result.code != 0 { + return Ok(first_result); + } + + // Then run the rest as a pipeline + let mut current_stdin = Some(first_result.stdout); + let mut accumulated_stderr = first_result.stderr; + let mut last_result = CommandResult { + stdout: String::new(), + stderr: String::new(), + code: 0, + }; + + for cmd_str in &self.additional { + let mut runner = crate::ProcessRunner::new( + cmd_str.clone(), + RunOptions { + stdin: StdinOption::Content(current_stdin.take().unwrap_or_default()), + mirror: false, + capture: true, + ..Default::default() + }, + ); + + let result = runner.run().await?; + accumulated_stderr.push_str(&result.stderr); + + if result.code != 0 { + return Ok(CommandResult { + stdout: result.stdout, + stderr: accumulated_stderr, + code: result.code, + }); + } + + current_stdin = Some(result.stdout.clone()); + last_result = result; + } + + Ok(CommandResult { + stdout: last_result.stdout, + stderr: accumulated_stderr, + code: last_result.code, + }) + } +} diff --git a/rust/src/quote.rs b/rust/src/quote.rs new file mode 100644 index 0000000..adddc4f --- /dev/null +++ b/rust/src/quote.rs @@ -0,0 +1,161 @@ +//! Shell quoting utilities for command-stream +//! +//! This module provides functions for safely quoting values for shell usage, +//! preventing command injection and ensuring proper argument handling. + +/// Quote a value for safe shell usage +/// +/// This function quotes strings appropriately for use in shell commands, +/// handling special characters and edge cases. +/// +/// # Examples +/// +/// ``` +/// use command_stream::quote::quote; +/// +/// // Safe characters are passed through unchanged +/// assert_eq!(quote("hello"), "hello"); +/// assert_eq!(quote("/path/to/file"), "/path/to/file"); +/// +/// // Special characters are quoted +/// assert_eq!(quote("hello world"), "'hello world'"); +/// +/// // Single quotes in strings are escaped +/// assert_eq!(quote("it's"), "'it'\\''s'"); +/// +/// // Empty strings are quoted +/// assert_eq!(quote(""), "''"); +/// ``` +pub fn quote(value: &str) -> String { + if value.is_empty() { + return "''".to_string(); + } + + // If already properly quoted with single quotes, check if we can use as-is + if value.starts_with('\'') && value.ends_with('\'') && value.len() >= 2 { + let inner = &value[1..value.len() - 1]; + if !inner.contains('\'') { + return value.to_string(); + } + } + + // If already double-quoted, wrap in single quotes + if value.starts_with('"') && value.ends_with('"') && value.len() > 2 { + return format!("'{}'", value); + } + + // Check if the string needs quoting at all + // Safe characters: alphanumeric, dash, underscore, dot, slash, colon, equals, comma, plus, at + let safe_pattern = regex::Regex::new(r"^[a-zA-Z0-9_\-./=,+@:]+$").unwrap(); + + if safe_pattern.is_match(value) { + return value.to_string(); + } + + // Default behavior: wrap in single quotes and escape any internal single quotes + // The shell escape sequence for a single quote inside single quotes is: '\'' + // This ends the single quote, adds an escaped single quote, and starts single quotes again + format!("'{}'", value.replace('\'', "'\\''")) +} + +/// Quote multiple values and join them with spaces +/// +/// Convenience function for quoting a list of arguments. +/// +/// # Examples +/// +/// ``` +/// use command_stream::quote::quote_all; +/// +/// let args = vec!["echo", "hello world", "test"]; +/// assert_eq!(quote_all(&args), "echo 'hello world' test"); +/// ``` +pub fn quote_all(values: &[&str]) -> String { + values + .iter() + .map(|v| quote(v)) + .collect::>() + .join(" ") +} + +/// Check if a string needs quoting for shell usage +/// +/// Returns true if the string contains characters that would be interpreted +/// specially by the shell. +/// +/// # Examples +/// +/// ``` +/// use command_stream::quote::needs_quoting; +/// +/// assert!(!needs_quoting("hello")); +/// assert!(needs_quoting("hello world")); +/// assert!(needs_quoting("$PATH")); +/// ``` +pub fn needs_quoting(value: &str) -> bool { + if value.is_empty() { + return true; + } + + let safe_pattern = regex::Regex::new(r"^[a-zA-Z0-9_\-./=,+@:]+$").unwrap(); + !safe_pattern.is_match(value) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_quote_empty() { + assert_eq!(quote(""), "''"); + } + + #[test] + fn test_quote_safe_chars() { + assert_eq!(quote("hello"), "hello"); + assert_eq!(quote("/path/to/file"), "/path/to/file"); + assert_eq!(quote("file.txt"), "file.txt"); + assert_eq!(quote("key=value"), "key=value"); + assert_eq!(quote("user@host"), "user@host"); + } + + #[test] + fn test_quote_special_chars() { + assert_eq!(quote("hello world"), "'hello world'"); + assert_eq!(quote("it's"), "'it'\\''s'"); + assert_eq!(quote("$var"), "'$var'"); + assert_eq!(quote("test*"), "'test*'"); + } + + #[test] + fn test_quote_already_quoted() { + assert_eq!(quote("'already quoted'"), "'already quoted'"); + assert_eq!(quote("\"double quoted\""), "'\"double quoted\"'"); + } + + #[test] + fn test_quote_all() { + let args = vec!["echo", "hello world", "test"]; + assert_eq!(quote_all(&args), "echo 'hello world' test"); + } + + #[test] + fn test_needs_quoting() { + assert!(!needs_quoting("hello")); + assert!(!needs_quoting("/path/to/file")); + assert!(needs_quoting("hello world")); + assert!(needs_quoting("$PATH")); + assert!(needs_quoting("")); + assert!(needs_quoting("test*")); + } + + #[test] + fn test_quote_with_newlines() { + assert_eq!(quote("line1\nline2"), "'line1\nline2'"); + } + + #[test] + fn test_quote_with_tabs() { + assert_eq!(quote("col1\tcol2"), "'col1\tcol2'"); + } +} diff --git a/rust/src/state.rs b/rust/src/state.rs new file mode 100644 index 0000000..1a498e3 --- /dev/null +++ b/rust/src/state.rs @@ -0,0 +1,333 @@ +//! Global state management for command-stream +//! +//! This module handles signal handlers, process tracking, and cleanup, +//! similar to the JavaScript $.state.mjs module. + +use std::collections::HashSet; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::trace::trace_lazy; + +/// Shell settings for controlling execution behavior +#[derive(Debug, Clone, Default)] +pub struct ShellSettings { + /// Exit immediately if a command exits with non-zero status (set -e) + pub errexit: bool, + /// Print commands as they are executed (set -v) + pub verbose: bool, + /// Print trace of commands (set -x) + pub xtrace: bool, + /// Return value of a pipeline is the status of the last command to exit with non-zero (set -o pipefail) + pub pipefail: bool, + /// Treat unset variables as an error (set -u) + pub nounset: bool, + /// Disable filename globbing (set -f) + pub noglob: bool, + /// Export all variables (set -a) + pub allexport: bool, +} + +impl ShellSettings { + /// Create new shell settings with defaults + pub fn new() -> Self { + Self::default() + } + + /// Reset all settings to their defaults + pub fn reset(&mut self) { + *self = Self::default(); + } + + /// Set a shell option by name + /// + /// Supports both short flags (e, v, x, u, f, a) and long names + pub fn set(&mut self, option: &str, value: bool) { + match option { + "e" | "errexit" => self.errexit = value, + "v" | "verbose" => self.verbose = value, + "x" | "xtrace" => self.xtrace = value, + "u" | "nounset" => self.nounset = value, + "f" | "noglob" => self.noglob = value, + "a" | "allexport" => self.allexport = value, + "o pipefail" | "pipefail" => self.pipefail = value, + _ => { + trace_lazy("ShellSettings", || { + format!("Unknown shell option: {}", option) + }); + } + } + } + + /// Enable a shell option + pub fn enable(&mut self, option: &str) { + self.set(option, true); + } + + /// Disable a shell option + pub fn disable(&mut self, option: &str) { + self.set(option, false); + } +} + +/// Global state for the command-stream library +pub struct GlobalState { + /// Current shell settings + shell_settings: RwLock, + /// Set of active process runner IDs + active_runners: RwLock>, + /// Counter for generating runner IDs + next_runner_id: std::sync::atomic::AtomicU64, + /// Whether signal handlers are installed + signal_handlers_installed: AtomicBool, + /// Whether virtual commands are enabled + virtual_commands_enabled: AtomicBool, + /// Initial working directory + initial_cwd: RwLock>, +} + +impl Default for GlobalState { + fn default() -> Self { + Self::new() + } +} + +impl GlobalState { + /// Create a new global state + pub fn new() -> Self { + let initial_cwd = std::env::current_dir().ok(); + + GlobalState { + shell_settings: RwLock::new(ShellSettings::new()), + active_runners: RwLock::new(HashSet::new()), + next_runner_id: std::sync::atomic::AtomicU64::new(1), + signal_handlers_installed: AtomicBool::new(false), + virtual_commands_enabled: AtomicBool::new(true), + initial_cwd: RwLock::new(initial_cwd), + } + } + + /// Get the current shell settings + pub async fn get_shell_settings(&self) -> ShellSettings { + self.shell_settings.read().await.clone() + } + + /// Set shell settings + pub async fn set_shell_settings(&self, settings: ShellSettings) { + *self.shell_settings.write().await = settings; + } + + /// Modify shell settings with a closure + pub async fn with_shell_settings(&self, f: F) + where + F: FnOnce(&mut ShellSettings), + { + let mut settings = self.shell_settings.write().await; + f(&mut settings); + } + + /// Enable a shell option + pub async fn enable_shell_option(&self, option: &str) { + self.shell_settings.write().await.enable(option); + } + + /// Disable a shell option + pub async fn disable_shell_option(&self, option: &str) { + self.shell_settings.write().await.disable(option); + } + + /// Register a new active runner and return its ID + pub async fn register_runner(&self) -> u64 { + let id = self + .next_runner_id + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + self.active_runners.write().await.insert(id); + + trace_lazy("GlobalState", || { + format!("Registered runner {}", id) + }); + + id + } + + /// Unregister an active runner + pub async fn unregister_runner(&self, id: u64) { + self.active_runners.write().await.remove(&id); + + trace_lazy("GlobalState", || { + format!("Unregistered runner {}", id) + }); + } + + /// Get the count of active runners + pub async fn active_runner_count(&self) -> usize { + self.active_runners.read().await.len() + } + + /// Check if signal handlers are installed + pub fn are_signal_handlers_installed(&self) -> bool { + self.signal_handlers_installed.load(Ordering::SeqCst) + } + + /// Mark signal handlers as installed + pub fn set_signal_handlers_installed(&self, installed: bool) { + self.signal_handlers_installed.store(installed, Ordering::SeqCst); + } + + /// Check if virtual commands are enabled + pub fn are_virtual_commands_enabled(&self) -> bool { + self.virtual_commands_enabled.load(Ordering::SeqCst) + } + + /// Enable virtual commands + pub fn enable_virtual_commands(&self) { + self.virtual_commands_enabled.store(true, Ordering::SeqCst); + trace_lazy("GlobalState", || "Virtual commands enabled".to_string()); + } + + /// Disable virtual commands + pub fn disable_virtual_commands(&self) { + self.virtual_commands_enabled.store(false, Ordering::SeqCst); + trace_lazy("GlobalState", || "Virtual commands disabled".to_string()); + } + + /// Get the initial working directory + pub async fn get_initial_cwd(&self) -> Option { + self.initial_cwd.read().await.clone() + } + + /// Reset global state to defaults + pub async fn reset(&self) { + // Reset shell settings + *self.shell_settings.write().await = ShellSettings::new(); + + // Clear active runners + self.active_runners.write().await.clear(); + + // Reset virtual commands flag + self.virtual_commands_enabled.store(true, Ordering::SeqCst); + + // Don't reset signal handlers installed flag - that's managed separately + + trace_lazy("GlobalState", || "Global state reset completed".to_string()); + } + + /// Restore working directory to initial + pub async fn restore_cwd(&self) -> std::io::Result<()> { + if let Some(ref initial) = *self.initial_cwd.read().await { + if initial.exists() { + std::env::set_current_dir(initial)?; + } + } + Ok(()) + } +} + +/// Global state singleton +static GLOBAL_STATE: std::sync::OnceLock> = std::sync::OnceLock::new(); + +/// Get the global state instance +pub fn global_state() -> Arc { + GLOBAL_STATE + .get_or_init(|| Arc::new(GlobalState::new())) + .clone() +} + +/// Reset the global state (for testing) +pub async fn reset_global_state() { + global_state().reset().await; +} + +/// Get current shell settings +pub async fn get_shell_settings() -> ShellSettings { + global_state().get_shell_settings().await +} + +/// Enable a shell option globally +pub async fn set_shell_option(option: &str) { + global_state().enable_shell_option(option).await; +} + +/// Disable a shell option globally +pub async fn unset_shell_option(option: &str) { + global_state().disable_shell_option(option).await; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_shell_settings_default() { + let settings = ShellSettings::new(); + assert!(!settings.errexit); + assert!(!settings.verbose); + assert!(!settings.xtrace); + assert!(!settings.pipefail); + assert!(!settings.nounset); + } + + #[test] + fn test_shell_settings_set() { + let mut settings = ShellSettings::new(); + + settings.set("e", true); + assert!(settings.errexit); + + settings.set("errexit", false); + assert!(!settings.errexit); + + settings.set("o pipefail", true); + assert!(settings.pipefail); + } + + #[tokio::test] + async fn test_global_state_runners() { + let state = GlobalState::new(); + + let id1 = state.register_runner().await; + let id2 = state.register_runner().await; + + assert_eq!(state.active_runner_count().await, 2); + assert!(id1 != id2); + + state.unregister_runner(id1).await; + assert_eq!(state.active_runner_count().await, 1); + + state.unregister_runner(id2).await; + assert_eq!(state.active_runner_count().await, 0); + } + + #[tokio::test] + async fn test_global_state_virtual_commands() { + let state = GlobalState::new(); + + assert!(state.are_virtual_commands_enabled()); + + state.disable_virtual_commands(); + assert!(!state.are_virtual_commands_enabled()); + + state.enable_virtual_commands(); + assert!(state.are_virtual_commands_enabled()); + } + + #[tokio::test] + async fn test_global_state_reset() { + let state = GlobalState::new(); + + // Modify state + state.enable_shell_option("errexit").await; + state.register_runner().await; + state.disable_virtual_commands(); + + // Reset + state.reset().await; + + // Verify reset + let settings = state.get_shell_settings().await; + assert!(!settings.errexit); + assert_eq!(state.active_runner_count().await, 0); + assert!(state.are_virtual_commands_enabled()); + } +} diff --git a/rust/src/stream.rs b/rust/src/stream.rs new file mode 100644 index 0000000..b5b7c18 --- /dev/null +++ b/rust/src/stream.rs @@ -0,0 +1,369 @@ +//! Streaming and async iteration support +//! +//! This module provides async streaming capabilities similar to JavaScript's +//! async iterators and stream handling in `$.stream-utils.mjs`. +//! +//! ## Usage +//! +//! ```rust,no_run +//! use command_stream::{StreamingRunner, OutputChunk}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let runner = StreamingRunner::new("yes hello"); +//! +//! // Stream output as it arrives +//! let mut stream = runner.stream(); +//! let mut count = 0; +//! while let Some(chunk) = stream.next().await { +//! match chunk { +//! OutputChunk::Stdout(data) => { +//! print!("{}", String::from_utf8_lossy(&data)); +//! count += 1; +//! if count >= 5 { +//! break; +//! } +//! } +//! OutputChunk::Stderr(data) => { +//! eprint!("{}", String::from_utf8_lossy(&data)); +//! } +//! OutputChunk::Exit(code) => { +//! println!("Process exited with code: {}", code); +//! break; +//! } +//! } +//! } +//! +//! Ok(()) +//! } +//! ``` + +use std::collections::HashMap; +use std::path::PathBuf; +use std::process::Stdio; +use tokio::io::BufReader; +use tokio::process::Command; +use tokio::sync::mpsc; + +use crate::trace::trace_lazy; +use crate::{CommandResult, Result}; + +/// A chunk of output from a streaming process +#[derive(Debug, Clone)] +pub enum OutputChunk { + /// Stdout data + Stdout(Vec), + /// Stderr data + Stderr(Vec), + /// Process exit code + Exit(i32), +} + +/// A streaming process runner that allows async iteration over output +pub struct StreamingRunner { + command: String, + cwd: Option, + env: Option>, + stdin_content: Option, +} + +impl StreamingRunner { + /// Create a new streaming runner + pub fn new(command: impl Into) -> Self { + StreamingRunner { + command: command.into(), + cwd: None, + env: None, + stdin_content: None, + } + } + + /// Set the working directory + pub fn cwd(mut self, path: impl Into) -> Self { + self.cwd = Some(path.into()); + self + } + + /// Set environment variables + pub fn env(mut self, env: HashMap) -> Self { + self.env = Some(env); + self + } + + /// Set stdin content + pub fn stdin(mut self, content: impl Into) -> Self { + self.stdin_content = Some(content.into()); + self + } + + /// Start the process and return a stream of output chunks + pub fn stream(mut self) -> OutputStream { + let (tx, rx) = mpsc::channel(1024); + + // Spawn the process handling task + let command = self.command.clone(); + let cwd = self.cwd.take(); + let env = self.env.take(); + let stdin_content = self.stdin_content.take(); + + tokio::spawn(async move { + if let Err(e) = run_streaming_process(command, cwd, env, stdin_content, tx.clone()).await { + trace_lazy("StreamingRunner", || format!("Error: {}", e)); + } + }); + + OutputStream { rx } + } + + /// Run to completion and collect all output + pub async fn collect(self) -> Result { + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + let mut exit_code = 0; + + let mut stream = self.stream(); + while let Some(chunk) = stream.rx.recv().await { + match chunk { + OutputChunk::Stdout(data) => stdout.extend(data), + OutputChunk::Stderr(data) => stderr.extend(data), + OutputChunk::Exit(code) => exit_code = code, + } + } + + Ok(CommandResult { + stdout: String::from_utf8_lossy(&stdout).to_string(), + stderr: String::from_utf8_lossy(&stderr).to_string(), + code: exit_code, + }) + } +} + +/// Stream of output chunks from a process +pub struct OutputStream { + rx: mpsc::Receiver, +} + +impl OutputStream { + /// Receive the next chunk + pub async fn next(&mut self) -> Option { + self.rx.recv().await + } + + /// Collect all remaining output into vectors + pub async fn collect(mut self) -> (Vec, Vec, i32) { + let mut stdout = Vec::new(); + let mut stderr = Vec::new(); + let mut exit_code = 0; + + while let Some(chunk) = self.rx.recv().await { + match chunk { + OutputChunk::Stdout(data) => stdout.extend(data), + OutputChunk::Stderr(data) => stderr.extend(data), + OutputChunk::Exit(code) => exit_code = code, + } + } + + (stdout, stderr, exit_code) + } + + /// Collect stdout only, discarding stderr + pub async fn collect_stdout(mut self) -> Vec { + let mut stdout = Vec::new(); + + while let Some(chunk) = self.rx.recv().await { + if let OutputChunk::Stdout(data) = chunk { + stdout.extend(data); + } + } + + stdout + } +} + +/// Run a streaming process and send output to the channel +async fn run_streaming_process( + command: String, + cwd: Option, + env: Option>, + stdin_content: Option, + tx: mpsc::Sender, +) -> Result<()> { + trace_lazy("StreamingRunner", || format!("Starting: {}", command)); + + let shell = find_available_shell(); + let mut cmd = Command::new(&shell.cmd); + for arg in &shell.args { + cmd.arg(arg); + } + cmd.arg(&command); + + // Configure stdio + if stdin_content.is_some() { + cmd.stdin(Stdio::piped()); + } else { + cmd.stdin(Stdio::null()); + } + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + + // Set working directory + if let Some(ref cwd) = cwd { + cmd.current_dir(cwd); + } + + // Set environment + if let Some(ref env_vars) = env { + for (key, value) in env_vars { + cmd.env(key, value); + } + } + + // Spawn the process + let mut child = cmd.spawn()?; + + // Write stdin if needed + if let Some(content) = stdin_content { + if let Some(mut stdin) = child.stdin.take() { + use tokio::io::AsyncWriteExt; + let _ = stdin.write_all(content.as_bytes()).await; + let _ = stdin.shutdown().await; + } + } + + // Spawn stdout reader + let stdout = child.stdout.take(); + let tx_stdout = tx.clone(); + let stdout_handle = if let Some(stdout) = stdout { + Some(tokio::spawn(async move { + let mut reader = BufReader::new(stdout); + let mut buf = vec![0u8; 8192]; + loop { + use tokio::io::AsyncReadExt; + match reader.read(&mut buf).await { + Ok(0) => break, + Ok(n) => { + if tx_stdout.send(OutputChunk::Stdout(buf[..n].to_vec())).await.is_err() { + break; + } + } + Err(_) => break, + } + } + })) + } else { + None + }; + + // Spawn stderr reader + let stderr = child.stderr.take(); + let tx_stderr = tx.clone(); + let stderr_handle = if let Some(stderr) = stderr { + Some(tokio::spawn(async move { + let mut reader = BufReader::new(stderr); + let mut buf = vec![0u8; 8192]; + loop { + use tokio::io::AsyncReadExt; + match reader.read(&mut buf).await { + Ok(0) => break, + Ok(n) => { + if tx_stderr.send(OutputChunk::Stderr(buf[..n].to_vec())).await.is_err() { + break; + } + } + Err(_) => break, + } + } + })) + } else { + None + }; + + // Wait for readers to complete + if let Some(handle) = stdout_handle { + let _ = handle.await; + } + if let Some(handle) = stderr_handle { + let _ = handle.await; + } + + // Wait for process to exit + let status = child.wait().await?; + let code = status.code().unwrap_or(-1); + + // Send exit code + let _ = tx.send(OutputChunk::Exit(code)).await; + + trace_lazy("StreamingRunner", || format!("Exited with code: {}", code)); + + Ok(()) +} + +/// Shell configuration +#[derive(Debug, Clone)] +struct ShellConfig { + cmd: String, + args: Vec, +} + +/// Find an available shell +fn find_available_shell() -> ShellConfig { + let is_windows = cfg!(windows); + + if is_windows { + ShellConfig { + cmd: "cmd.exe".to_string(), + args: vec!["/c".to_string()], + } + } else { + let shells = [ + ("/bin/sh", "-c"), + ("/usr/bin/sh", "-c"), + ("/bin/bash", "-c"), + ]; + + for (cmd, arg) in shells { + if std::path::Path::new(cmd).exists() { + return ShellConfig { + cmd: cmd.to_string(), + args: vec![arg.to_string()], + }; + } + } + + ShellConfig { + cmd: "/bin/sh".to_string(), + args: vec!["-c".to_string()], + } + } +} + +/// Async iterator trait for output streams +#[async_trait::async_trait] +pub trait AsyncIterator { + type Item; + + /// Get the next item from the iterator + async fn next(&mut self) -> Option; +} + +#[async_trait::async_trait] +impl AsyncIterator for OutputStream { + type Item = OutputChunk; + + async fn next(&mut self) -> Option { + self.rx.recv().await + } +} + +/// Extension trait to convert ProcessRunner into a stream +pub trait IntoStream { + /// Convert into an output stream + fn into_stream(self) -> OutputStream; +} + +impl IntoStream for crate::ProcessRunner { + fn into_stream(self) -> OutputStream { + let streaming = StreamingRunner::new(self.command().to_string()); + streaming.stream() + } +} diff --git a/rust/src/trace.rs b/rust/src/trace.rs new file mode 100644 index 0000000..7c78c30 --- /dev/null +++ b/rust/src/trace.rs @@ -0,0 +1,152 @@ +//! Trace/logging utilities for command-stream +//! +//! This module provides verbose logging functionality that can be controlled +//! via environment variables for debugging and development purposes. + +use std::env; + +/// Check if tracing is enabled via environment variables +/// +/// Tracing can be controlled via: +/// - COMMAND_STREAM_TRACE=true/false (explicit control) +/// - COMMAND_STREAM_VERBOSE=true (enables tracing unless TRACE=false) +pub fn is_trace_enabled() -> bool { + let trace_env = env::var("COMMAND_STREAM_TRACE").ok(); + let verbose_env = env::var("COMMAND_STREAM_VERBOSE") + .map(|v| v == "true") + .unwrap_or(false); + + match trace_env.as_deref() { + Some("false") => false, + Some("true") => true, + _ => verbose_env, + } +} + +/// Trace function for verbose logging +/// +/// Outputs trace messages to stderr when tracing is enabled. +/// Messages are prefixed with timestamp and category. +/// +/// # Examples +/// +/// ``` +/// use command_stream::trace::trace; +/// +/// trace("ProcessRunner", "Starting command execution"); +/// ``` +pub fn trace(category: &str, message: &str) { + if !is_trace_enabled() { + return; + } + + let timestamp = chrono::Utc::now().to_rfc3339(); + eprintln!("[TRACE {}] [{}] {}", timestamp, category, message); +} + +/// Trace function with lazy message evaluation +/// +/// Only evaluates the message function if tracing is enabled. +/// This is useful for expensive message formatting that should +/// be avoided when tracing is disabled. +/// +/// # Examples +/// +/// ``` +/// use command_stream::trace::trace_lazy; +/// +/// trace_lazy("ProcessRunner", || { +/// format!("Expensive computation result: {}", 42) +/// }); +/// ``` +pub fn trace_lazy(category: &str, message_fn: F) +where + F: FnOnce() -> String, +{ + if !is_trace_enabled() { + return; + } + + trace(category, &message_fn()); +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::sync::Mutex; + + // Use a mutex to serialize tests that modify environment variables + // This prevents race conditions when tests run in parallel + static ENV_MUTEX: Mutex<()> = Mutex::new(()); + + /// Helper to save and restore environment variables during tests + struct EnvGuard { + trace_value: Option, + verbose_value: Option, + } + + impl EnvGuard { + fn new() -> Self { + EnvGuard { + trace_value: env::var("COMMAND_STREAM_TRACE").ok(), + verbose_value: env::var("COMMAND_STREAM_VERBOSE").ok(), + } + } + } + + impl Drop for EnvGuard { + fn drop(&mut self) { + // Restore original values + match &self.trace_value { + Some(v) => env::set_var("COMMAND_STREAM_TRACE", v), + None => env::remove_var("COMMAND_STREAM_TRACE"), + } + match &self.verbose_value { + Some(v) => env::set_var("COMMAND_STREAM_VERBOSE", v), + None => env::remove_var("COMMAND_STREAM_VERBOSE"), + } + } + } + + #[test] + fn test_trace_disabled_by_default() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _guard = EnvGuard::new(); + + // Clear env vars to test default behavior + env::remove_var("COMMAND_STREAM_TRACE"); + env::remove_var("COMMAND_STREAM_VERBOSE"); + assert!(!is_trace_enabled()); + } + + #[test] + fn test_trace_enabled_by_verbose() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _guard = EnvGuard::new(); + + env::remove_var("COMMAND_STREAM_TRACE"); + env::set_var("COMMAND_STREAM_VERBOSE", "true"); + assert!(is_trace_enabled()); + } + + #[test] + fn test_trace_explicit_true() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _guard = EnvGuard::new(); + + env::remove_var("COMMAND_STREAM_VERBOSE"); + env::set_var("COMMAND_STREAM_TRACE", "true"); + assert!(is_trace_enabled()); + } + + #[test] + fn test_trace_explicit_false_overrides_verbose() { + let _lock = ENV_MUTEX.lock().unwrap(); + let _guard = EnvGuard::new(); + + env::set_var("COMMAND_STREAM_TRACE", "false"); + env::set_var("COMMAND_STREAM_VERBOSE", "true"); + assert!(!is_trace_enabled()); + } +} diff --git a/rust/src/utils.rs b/rust/src/utils.rs index 080f1a8..94e5dbb 100644 --- a/rust/src/utils.rs +++ b/rust/src/utils.rs @@ -1,55 +1,25 @@ //! Utility functions and types for command-stream //! -//! This module provides helper functions for tracing, error handling, -//! and common file system operations used by virtual commands. +//! This module provides helper functions for command results, virtual command +//! utilities, and re-exports from specialized utility modules. +//! +//! ## Module Organization +//! +//! The utilities are organized into focused modules following the same +//! modular pattern as the JavaScript implementation: +//! +//! - `trace` - Logging and tracing utilities +//! - `ansi` - ANSI escape code handling +//! - `quote` - Shell quoting utilities +//! - `utils` (this module) - Command results and virtual command helpers use std::env; use std::path::{Path, PathBuf}; -/// Check if tracing is enabled via environment variables -/// -/// Tracing can be controlled via: -/// - COMMAND_STREAM_TRACE=true/false (explicit control) -/// - COMMAND_STREAM_VERBOSE=true (enables tracing unless TRACE=false) -pub fn is_trace_enabled() -> bool { - let trace_env = env::var("COMMAND_STREAM_TRACE").ok(); - let verbose_env = env::var("COMMAND_STREAM_VERBOSE") - .map(|v| v == "true") - .unwrap_or(false); - - match trace_env.as_deref() { - Some("false") => false, - Some("true") => true, - _ => verbose_env, - } -} - -/// Trace function for verbose logging -/// -/// Outputs trace messages to stderr when tracing is enabled. -/// Messages are prefixed with timestamp and category. -pub fn trace(category: &str, message: &str) { - if !is_trace_enabled() { - return; - } - - let timestamp = chrono::Utc::now().to_rfc3339(); - eprintln!("[TRACE {}] [{}] {}", timestamp, category, message); -} - -/// Trace function with lazy message evaluation -/// -/// Only evaluates the message function if tracing is enabled. -pub fn trace_lazy(category: &str, message_fn: F) -where - F: FnOnce() -> String, -{ - if !is_trace_enabled() { - return; - } - - trace(category, &message_fn()); -} +// Re-export from specialized modules for backwards compatibility +pub use crate::trace::{is_trace_enabled, trace, trace_lazy}; +pub use crate::ansi::{AnsiConfig, AnsiUtils}; +pub use crate::quote::quote; /// Result type for virtual command operations #[derive(Debug, Clone)] @@ -167,101 +137,6 @@ impl VirtualUtils { } } -/// ANSI control character utilities -pub struct AnsiUtils; - -impl AnsiUtils { - /// Strip ANSI escape sequences from text - pub fn strip_ansi(text: &str) -> String { - let re = regex::Regex::new(r"\x1b\[[0-9;]*[mGKHFJ]").unwrap(); - re.replace_all(text, "").to_string() - } - - /// Strip control characters from text, preserving newlines, carriage returns, and tabs - pub fn strip_control_chars(text: &str) -> String { - text.chars() - .filter(|c| { - // Preserve newlines (\n = \x0A), carriage returns (\r = \x0D), and tabs (\t = \x09) - !matches!(*c as u32, - 0x00..=0x08 | 0x0B | 0x0C | 0x0E..=0x1F | 0x7F - ) - }) - .collect() - } - - /// Strip both ANSI sequences and control characters - pub fn strip_all(text: &str) -> String { - Self::strip_control_chars(&Self::strip_ansi(text)) - } - - /// Clean data for processing (strips ANSI and control chars) - pub fn clean_for_processing(data: &str) -> String { - Self::strip_all(data) - } -} - -/// Configuration for ANSI handling -#[derive(Debug, Clone)] -pub struct AnsiConfig { - pub preserve_ansi: bool, - pub preserve_control_chars: bool, -} - -impl Default for AnsiConfig { - fn default() -> Self { - AnsiConfig { - preserve_ansi: true, - preserve_control_chars: true, - } - } -} - -impl AnsiConfig { - /// Process output according to config settings - pub fn process_output(&self, data: &str) -> String { - if !self.preserve_ansi && !self.preserve_control_chars { - AnsiUtils::clean_for_processing(data) - } else if !self.preserve_ansi { - AnsiUtils::strip_ansi(data) - } else if !self.preserve_control_chars { - AnsiUtils::strip_control_chars(data) - } else { - data.to_string() - } - } -} - -/// Quote a value for safe shell usage -pub fn quote(value: &str) -> String { - if value.is_empty() { - return "''".to_string(); - } - - // If already properly quoted, check if we can use as-is - if value.starts_with('\'') && value.ends_with('\'') && value.len() >= 2 { - let inner = &value[1..value.len() - 1]; - if !inner.contains('\'') { - return value.to_string(); - } - } - - if value.starts_with('"') && value.ends_with('"') && value.len() > 2 { - // If already double-quoted, wrap in single quotes - return format!("'{}'", value); - } - - // Check if the string needs quoting at all - // Safe characters: alphanumeric, dash, underscore, dot, slash, colon, equals, comma, plus - let safe_pattern = regex::Regex::new(r"^[a-zA-Z0-9_\-./=,+@:]+$").unwrap(); - - if safe_pattern.is_match(value) { - return value.to_string(); - } - - // Default behavior: wrap in single quotes and escape any internal single quotes - format!("'{}'", value.replace('\'', "'\\''")) -} - #[cfg(test)] mod tests { use super::*; @@ -284,6 +159,13 @@ mod tests { assert_eq!(result.code, 1); } + #[test] + fn test_command_result_error_with_code() { + let result = CommandResult::error_with_code("permission denied", 126); + assert!(!result.is_success()); + assert_eq!(result.code, 126); + } + #[test] fn test_resolve_path_absolute() { let path = VirtualUtils::resolve_path("/absolute/path", None); @@ -298,38 +180,51 @@ mod tests { } #[test] - fn test_strip_ansi() { - let text = "\x1b[31mRed text\x1b[0m"; - assert_eq!(AnsiUtils::strip_ansi(text), "Red text"); + fn test_validate_args_success() { + let args = vec!["arg1".to_string()]; + assert!(VirtualUtils::validate_args(&args, 1, "cmd").is_none()); } #[test] - fn test_strip_control_chars() { - let text = "Hello\x00World\nNew line\tTab"; - assert_eq!(AnsiUtils::strip_control_chars(text), "HelloWorld\nNew line\tTab"); + fn test_validate_args_missing() { + let args = vec!["arg1".to_string()]; + let result = VirtualUtils::validate_args(&args, 2, "cmd"); + assert!(result.is_some()); } #[test] - fn test_quote_empty() { - assert_eq!(quote(""), "''"); + fn test_missing_operand_error() { + let result = VirtualUtils::missing_operand_error("cat"); + assert!(!result.is_success()); + assert!(result.stderr.contains("missing operand")); } #[test] - fn test_quote_safe_chars() { - assert_eq!(quote("hello"), "hello"); - assert_eq!(quote("/path/to/file"), "/path/to/file"); + fn test_invalid_argument_error() { + let result = VirtualUtils::invalid_argument_error("ls", "invalid option"); + assert!(!result.is_success()); + assert!(result.stderr.contains("invalid option")); } + // Re-exported module tests are in their respective modules + // These tests verify the re-exports work correctly + #[test] - fn test_quote_special_chars() { + fn test_reexported_quote() { + assert_eq!(quote("hello"), "hello"); assert_eq!(quote("hello world"), "'hello world'"); - assert_eq!(quote("it's"), "'it'\\''s'"); } #[test] - fn test_validate_args() { - let args = vec!["arg1".to_string()]; - assert!(VirtualUtils::validate_args(&args, 1, "cmd").is_none()); - assert!(VirtualUtils::validate_args(&args, 2, "cmd").is_some()); + fn test_reexported_ansi_utils() { + let text = "\x1b[31mRed text\x1b[0m"; + assert_eq!(AnsiUtils::strip_ansi(text), "Red text"); + } + + #[test] + fn test_reexported_ansi_config() { + let config = AnsiConfig::default(); + assert!(config.preserve_ansi); + assert!(config.preserve_control_chars); } } diff --git a/rust/tests/events.rs b/rust/tests/events.rs new file mode 100644 index 0000000..bef68c3 --- /dev/null +++ b/rust/tests/events.rs @@ -0,0 +1,207 @@ +//! Integration tests for the events module + +use command_stream::{EventData, EventType, StreamEmitter}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +#[tokio::test] +async fn test_stream_emitter_creation() { + let emitter = StreamEmitter::new(); + assert_eq!(emitter.listener_count(&EventType::Stdout).await, 0); +} + +#[tokio::test] +async fn test_stream_emitter_on_emit() { + let emitter = StreamEmitter::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + emitter + .on(EventType::Stdout, move |data| { + if let EventData::String(s) = data { + assert_eq!(s, "hello"); + counter_clone.fetch_add(1, Ordering::SeqCst); + } + }) + .await; + + emitter + .emit(EventType::Stdout, EventData::String("hello".to_string())) + .await; + + assert_eq!(counter.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn test_stream_emitter_multiple_listeners() { + let emitter = StreamEmitter::new(); + let counter1 = Arc::new(AtomicUsize::new(0)); + let counter2 = Arc::new(AtomicUsize::new(0)); + let c1 = counter1.clone(); + let c2 = counter2.clone(); + + emitter + .on(EventType::Stdout, move |_| { + c1.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .on(EventType::Stdout, move |_| { + c2.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .emit(EventType::Stdout, EventData::String("test".to_string())) + .await; + + assert_eq!(counter1.load(Ordering::SeqCst), 1); + assert_eq!(counter2.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn test_stream_emitter_once() { + let emitter = StreamEmitter::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + emitter + .once(EventType::Exit, move |_| { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + // Emit multiple times + emitter.emit(EventType::Exit, EventData::ExitCode(0)).await; + emitter.emit(EventType::Exit, EventData::ExitCode(0)).await; + emitter.emit(EventType::Exit, EventData::ExitCode(0)).await; + + // Should only count once + assert_eq!(counter.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn test_stream_emitter_off() { + let emitter = StreamEmitter::new(); + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + emitter + .on(EventType::Stderr, move |_| { + counter_clone.fetch_add(1, Ordering::SeqCst); + }) + .await; + + assert_eq!(emitter.listener_count(&EventType::Stderr).await, 1); + + emitter.off(EventType::Stderr).await; + + assert_eq!(emitter.listener_count(&EventType::Stderr).await, 0); + + // Emit after off - should not trigger listener + emitter + .emit(EventType::Stderr, EventData::String("error".to_string())) + .await; + + assert_eq!(counter.load(Ordering::SeqCst), 0); +} + +#[tokio::test] +async fn test_stream_emitter_different_events() { + let emitter = StreamEmitter::new(); + let stdout_counter = Arc::new(AtomicUsize::new(0)); + let stderr_counter = Arc::new(AtomicUsize::new(0)); + let exit_counter = Arc::new(AtomicUsize::new(0)); + + let out = stdout_counter.clone(); + let err = stderr_counter.clone(); + let exit = exit_counter.clone(); + + emitter + .on(EventType::Stdout, move |_| { + out.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .on(EventType::Stderr, move |_| { + err.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .on(EventType::Exit, move |_| { + exit.fetch_add(1, Ordering::SeqCst); + }) + .await; + + emitter + .emit(EventType::Stdout, EventData::String("out1".to_string())) + .await; + emitter + .emit(EventType::Stdout, EventData::String("out2".to_string())) + .await; + emitter + .emit(EventType::Stderr, EventData::String("err".to_string())) + .await; + emitter.emit(EventType::Exit, EventData::ExitCode(0)).await; + + assert_eq!(stdout_counter.load(Ordering::SeqCst), 2); + assert_eq!(stderr_counter.load(Ordering::SeqCst), 1); + assert_eq!(exit_counter.load(Ordering::SeqCst), 1); +} + +#[tokio::test] +async fn test_stream_emitter_remove_all_listeners() { + let emitter = StreamEmitter::new(); + + emitter.on(EventType::Stdout, |_| {}).await; + emitter.on(EventType::Stderr, |_| {}).await; + emitter.on(EventType::Exit, |_| {}).await; + + assert!(emitter.listener_count(&EventType::Stdout).await > 0); + assert!(emitter.listener_count(&EventType::Stderr).await > 0); + assert!(emitter.listener_count(&EventType::Exit).await > 0); + + emitter.remove_all_listeners().await; + + assert_eq!(emitter.listener_count(&EventType::Stdout).await, 0); + assert_eq!(emitter.listener_count(&EventType::Stderr).await, 0); + assert_eq!(emitter.listener_count(&EventType::Exit).await, 0); +} + +#[tokio::test] +async fn test_event_data_variants() { + let emitter = StreamEmitter::new(); + + let string_received = Arc::new(std::sync::Mutex::new(None)); + let code_received = Arc::new(std::sync::Mutex::new(None)); + + let str_clone = string_received.clone(); + let code_clone = code_received.clone(); + + emitter + .on(EventType::Stdout, move |data| { + if let EventData::String(s) = data { + *str_clone.lock().unwrap() = Some(s); + } + }) + .await; + + emitter + .on(EventType::Exit, move |data| { + if let EventData::ExitCode(code) = data { + *code_clone.lock().unwrap() = Some(code); + } + }) + .await; + + emitter + .emit(EventType::Stdout, EventData::String("test data".to_string())) + .await; + emitter.emit(EventType::Exit, EventData::ExitCode(42)).await; + + assert_eq!(*string_received.lock().unwrap(), Some("test data".to_string())); + assert_eq!(*code_received.lock().unwrap(), Some(42)); +} diff --git a/rust/tests/macros.rs b/rust/tests/macros.rs new file mode 100644 index 0000000..93071c2 --- /dev/null +++ b/rust/tests/macros.rs @@ -0,0 +1,77 @@ +//! Tests for the cmd! macro and its aliases (s!, sh!, cs!) + +use command_stream::{cmd, sh, s, cs}; + +#[tokio::test] +async fn test_cmd_macro_simple() { + let result = cmd!("echo hello").await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("hello")); +} + +#[tokio::test] +async fn test_cmd_macro_with_interpolation() { + let name = "world"; + let result = cmd!("echo hello {}", name).await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("hello")); + assert!(result.stdout.contains("world")); +} + +#[tokio::test] +async fn test_cmd_macro_with_multiple_interpolations() { + let greeting = "Hello"; + let name = "World"; + let result = cmd!("echo {} {}", greeting, name).await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("Hello")); + assert!(result.stdout.contains("World")); +} + +#[tokio::test] +async fn test_cmd_macro_with_special_chars() { + // Test that special characters are properly quoted + let filename = "test file with spaces.txt"; + let result = cmd!("echo {}", filename).await.unwrap(); + assert!(result.is_success()); + // The output should contain the filename (quoted in the command) + assert!(result.stdout.contains("test file with spaces.txt")); +} + +#[tokio::test] +async fn test_sh_macro_alias() { + let result = sh!("echo hello from sh").await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("hello from sh")); +} + +#[tokio::test] +async fn test_s_macro_alias() { + let result = s!("echo hello from s").await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("hello from s")); +} + +#[tokio::test] +async fn test_cs_macro_alias() { + let result = cs!("echo hello from cs").await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("hello from cs")); +} + +#[tokio::test] +async fn test_s_macro_with_interpolation() { + let name = "world"; + let result = s!("echo hello {}", name).await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("hello")); + assert!(result.stdout.contains("world")); +} + +#[tokio::test] +async fn test_cmd_macro_with_numbers() { + let count = 42; + let result = cmd!("echo The answer is {}", count).await.unwrap(); + assert!(result.is_success()); + assert!(result.stdout.contains("42")); +} diff --git a/rust/tests/pipeline.rs b/rust/tests/pipeline.rs new file mode 100644 index 0000000..f338004 --- /dev/null +++ b/rust/tests/pipeline.rs @@ -0,0 +1,93 @@ +//! Tests for the Pipeline module + +use command_stream::{Pipeline, PipelineExt, ProcessRunner, RunOptions}; + +#[tokio::test] +async fn test_pipeline_simple() { + let result = Pipeline::new() + .add("echo hello world") + .run() + .await + .unwrap(); + + assert!(result.is_success()); + assert!(result.stdout.contains("hello world")); +} + +#[tokio::test] +async fn test_pipeline_two_commands() { + let result = Pipeline::new() + .add("echo 'hello\nworld\nhello again'") + .add("grep hello") + .run() + .await + .unwrap(); + + assert!(result.is_success()); + // The grep should filter to only lines containing "hello" + assert!(result.stdout.contains("hello")); +} + +#[tokio::test] +async fn test_pipeline_with_stdin() { + let result = Pipeline::new() + .stdin("line1\nline2\nline3") + .add("cat") + .run() + .await + .unwrap(); + + assert!(result.is_success()); + assert!(result.stdout.contains("line1")); + assert!(result.stdout.contains("line2")); + assert!(result.stdout.contains("line3")); +} + +#[tokio::test] +async fn test_pipeline_three_commands() { + let result = Pipeline::new() + .add("echo 'apple\nbanana\napricot\nblueberry'") + .add("grep a") + .add("wc -l") + .run() + .await + .unwrap(); + + assert!(result.is_success()); + // Should count lines containing 'a': apple, banana, apricot = 3 lines +} + +#[tokio::test] +async fn test_pipeline_empty() { + let result = Pipeline::new().run().await.unwrap(); + + // Empty pipeline should return error + assert!(!result.is_success()); + assert!(result.stderr.contains("No commands")); +} + +#[tokio::test] +async fn test_pipeline_failure_propagation() { + let result = Pipeline::new() + .add("echo hello") + .add("false") // This command always fails + .add("echo should not reach here") + .run() + .await + .unwrap(); + + // Pipeline should fail because 'false' returns non-zero + assert!(!result.is_success()); +} + +#[tokio::test] +async fn test_pipeline_builder_pattern() { + // Test the fluent API + let pipeline = Pipeline::new() + .add("echo test") + .mirror_output(false) + .capture_output(true); + + let result = pipeline.run().await.unwrap(); + assert!(result.is_success()); +} diff --git a/rust/tests/state.rs b/rust/tests/state.rs new file mode 100644 index 0000000..86a61c0 --- /dev/null +++ b/rust/tests/state.rs @@ -0,0 +1,207 @@ +//! Integration tests for the state module + +use command_stream::state::{GlobalState, ShellSettings}; + +#[test] +fn test_shell_settings_default() { + let settings = ShellSettings::new(); + assert!(!settings.errexit); + assert!(!settings.verbose); + assert!(!settings.xtrace); + assert!(!settings.pipefail); + assert!(!settings.nounset); + assert!(!settings.noglob); + assert!(!settings.allexport); +} + +#[test] +fn test_shell_settings_set_short_flags() { + let mut settings = ShellSettings::new(); + + settings.set("e", true); + assert!(settings.errexit); + + settings.set("v", true); + assert!(settings.verbose); + + settings.set("x", true); + assert!(settings.xtrace); + + settings.set("u", true); + assert!(settings.nounset); + + settings.set("f", true); + assert!(settings.noglob); + + settings.set("a", true); + assert!(settings.allexport); +} + +#[test] +fn test_shell_settings_set_long_names() { + let mut settings = ShellSettings::new(); + + settings.set("errexit", true); + assert!(settings.errexit); + + settings.set("verbose", true); + assert!(settings.verbose); + + settings.set("xtrace", true); + assert!(settings.xtrace); + + settings.set("nounset", true); + assert!(settings.nounset); + + settings.set("pipefail", true); + assert!(settings.pipefail); +} + +#[test] +fn test_shell_settings_enable_disable() { + let mut settings = ShellSettings::new(); + + settings.enable("errexit"); + assert!(settings.errexit); + + settings.disable("errexit"); + assert!(!settings.errexit); +} + +#[test] +fn test_shell_settings_reset() { + let mut settings = ShellSettings::new(); + + settings.enable("errexit"); + settings.enable("verbose"); + settings.enable("pipefail"); + + settings.reset(); + + assert!(!settings.errexit); + assert!(!settings.verbose); + assert!(!settings.pipefail); +} + +#[tokio::test] +async fn test_global_state_shell_settings() { + let state = GlobalState::new(); + + // Default settings + let settings = state.get_shell_settings().await; + assert!(!settings.errexit); + + // Enable option + state.enable_shell_option("errexit").await; + let settings = state.get_shell_settings().await; + assert!(settings.errexit); + + // Disable option + state.disable_shell_option("errexit").await; + let settings = state.get_shell_settings().await; + assert!(!settings.errexit); +} + +#[tokio::test] +async fn test_global_state_runner_registration() { + let state = GlobalState::new(); + + assert_eq!(state.active_runner_count().await, 0); + + let id1 = state.register_runner().await; + assert_eq!(state.active_runner_count().await, 1); + + let id2 = state.register_runner().await; + assert_eq!(state.active_runner_count().await, 2); + + // IDs should be unique + assert!(id1 != id2); + + state.unregister_runner(id1).await; + assert_eq!(state.active_runner_count().await, 1); + + state.unregister_runner(id2).await; + assert_eq!(state.active_runner_count().await, 0); +} + +#[tokio::test] +async fn test_global_state_virtual_commands() { + let state = GlobalState::new(); + + // Enabled by default + assert!(state.are_virtual_commands_enabled()); + + state.disable_virtual_commands(); + assert!(!state.are_virtual_commands_enabled()); + + state.enable_virtual_commands(); + assert!(state.are_virtual_commands_enabled()); +} + +#[tokio::test] +async fn test_global_state_signal_handlers() { + let state = GlobalState::new(); + + // Not installed by default + assert!(!state.are_signal_handlers_installed()); + + state.set_signal_handlers_installed(true); + assert!(state.are_signal_handlers_installed()); + + state.set_signal_handlers_installed(false); + assert!(!state.are_signal_handlers_installed()); +} + +#[tokio::test] +async fn test_global_state_reset() { + let state = GlobalState::new(); + + // Modify state + state.enable_shell_option("errexit").await; + state.enable_shell_option("pipefail").await; + state.register_runner().await; + state.register_runner().await; + state.disable_virtual_commands(); + + // Reset + state.reset().await; + + // Verify reset + let settings = state.get_shell_settings().await; + assert!(!settings.errexit); + assert!(!settings.pipefail); + assert_eq!(state.active_runner_count().await, 0); + assert!(state.are_virtual_commands_enabled()); +} + +#[tokio::test] +async fn test_global_state_with_shell_settings() { + let state = GlobalState::new(); + + state + .with_shell_settings(|settings| { + settings.errexit = true; + settings.verbose = true; + }) + .await; + + let settings = state.get_shell_settings().await; + assert!(settings.errexit); + assert!(settings.verbose); +} + +#[tokio::test] +async fn test_global_state_initial_cwd() { + let state = GlobalState::new(); + + let cwd = state.get_initial_cwd().await; + // Should have an initial cwd + assert!(cwd.is_some()); +} + +#[test] +fn test_global_state_default() { + let state = GlobalState::default(); + assert!(state.are_virtual_commands_enabled()); + assert!(!state.are_signal_handlers_installed()); +} diff --git a/rust/tests/stream.rs b/rust/tests/stream.rs new file mode 100644 index 0000000..4a19c00 --- /dev/null +++ b/rust/tests/stream.rs @@ -0,0 +1,102 @@ +//! Tests for the streaming module + +use command_stream::{StreamingRunner, OutputChunk, AsyncIterator}; + +#[tokio::test] +async fn test_streaming_runner_basic() { + let runner = StreamingRunner::new("echo hello world"); + let result = runner.collect().await.unwrap(); + + assert!(result.is_success()); + assert!(result.stdout.contains("hello world")); +} + +#[tokio::test] +async fn test_streaming_runner_with_stdin() { + let runner = StreamingRunner::new("cat") + .stdin("test input"); + let result = runner.collect().await.unwrap(); + + assert!(result.is_success()); + assert!(result.stdout.contains("test input")); +} + +#[tokio::test] +async fn test_output_stream_chunks() { + let runner = StreamingRunner::new("echo chunk1 && echo chunk2"); + let mut stream = runner.stream(); + + let mut stdout_chunks = Vec::new(); + let mut exit_code = None; + + while let Some(chunk) = stream.next().await { + match chunk { + OutputChunk::Stdout(data) => { + stdout_chunks.push(String::from_utf8_lossy(&data).to_string()); + } + OutputChunk::Stderr(_) => {} + OutputChunk::Exit(code) => { + exit_code = Some(code); + } + } + } + + assert!(exit_code.is_some()); + assert_eq!(exit_code.unwrap(), 0); + let combined: String = stdout_chunks.join(""); + assert!(combined.contains("chunk1")); + assert!(combined.contains("chunk2")); +} + +#[tokio::test] +async fn test_streaming_collect_stdout() { + let runner = StreamingRunner::new("echo stdout only"); + let stream = runner.stream(); + + let stdout = stream.collect_stdout().await; + let stdout_str = String::from_utf8_lossy(&stdout); + + assert!(stdout_str.contains("stdout only")); +} + +#[tokio::test] +async fn test_streaming_stderr() { + // Using sh -c to redirect to stderr + let runner = StreamingRunner::new("sh -c 'echo error message >&2'"); + let result = runner.collect().await.unwrap(); + + assert!(result.stderr.contains("error message")); +} + +#[tokio::test] +async fn test_streaming_exit_code() { + let runner = StreamingRunner::new("exit 42"); + let result = runner.collect().await.unwrap(); + + assert_eq!(result.code, 42); +} + +#[tokio::test] +async fn test_streaming_runner_cwd() { + let runner = StreamingRunner::new("pwd") + .cwd("/tmp"); + let result = runner.collect().await.unwrap(); + + assert!(result.is_success()); + assert!(result.stdout.contains("/tmp")); +} + +#[tokio::test] +async fn test_streaming_runner_env() { + use std::collections::HashMap; + + let mut env = HashMap::new(); + env.insert("TEST_VAR".to_string(), "test_value".to_string()); + + let runner = StreamingRunner::new("sh -c 'echo $TEST_VAR'") + .env(env); + let result = runner.collect().await.unwrap(); + + assert!(result.is_success()); + assert!(result.stdout.contains("test_value")); +}