Skip to content

Commit e5eda58

Browse files
dugshubclaude
andcommitted
feat(security): add command injection prevention to SubprocessExecutor (CLI-6 Priority 1)
Implements CLI-6 Priority 1 (CRITICAL): Command injection prevention. Changes to SubprocessExecutor: - Uses create_subprocess_exec() by default (safe mode) - Added allow_shell_features parameter (default: False) - Commands parsed with shlex.split() for safe execution - Security warning logged when shell features enabled - Invalid shell syntax caught and reported gracefully - Empty command detection with clear error messages Security Model: - Default: Shell disabled, commands executed directly - Opt-in: allow_shell_features=True enables shell interpretation - Shell metacharacters treated as literals in safe mode - Prevents all command injection attack vectors Breaking Change: Commands now execute without shell by default. Migration: # Before (VULNERABLE) await executor.run("echo test | grep foo") # After (safe - literal pipe character) await executor.run("echo test | grep foo") # | is literal # Or opt-in to shell features (trusted commands only) await executor.run("echo test | grep foo", allow_shell_features=True) Tests Added: - 15 command injection unit tests (test_command_injection.py) - 13 security integration tests (test_subprocess_security.py) - Updated 8 existing subprocess executor tests Test Coverage: - Command chaining blocked (;, &, &&) - Pipe operations blocked (|) - Command substitution blocked ($(), backticks) - Redirection blocked (<, >) - Quoted arguments handled safely - Invalid syntax handled gracefully All 782 tests passing. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 0092238 commit e5eda58

File tree

4 files changed

+394
-29
lines changed

4 files changed

+394
-29
lines changed

src/cli_patterns/execution/subprocess_executor.py

Lines changed: 62 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
from __future__ import annotations
1414

1515
import asyncio
16+
import logging
1617
import os
18+
import shlex
1719
from typing import Optional, Union
1820

1921
from rich.console import Console
@@ -23,6 +25,8 @@
2325
from ..ui.design.registry import theme_registry
2426
from ..ui.design.tokens import StatusToken
2527

28+
logger = logging.getLogger(__name__)
29+
2630

2731
class CommandResult:
2832
"""Result of a command execution."""
@@ -83,6 +87,7 @@ async def run(
8387
timeout: Optional[float] = None,
8488
cwd: Optional[str] = None,
8589
env: Optional[dict[str, str]] = None,
90+
allow_shell_features: bool = False,
8691
) -> CommandResult:
8792
"""Execute a command asynchronously with themed output streaming.
8893
@@ -91,22 +96,40 @@ async def run(
9196
timeout: Command timeout in seconds (uses default if None)
9297
cwd: Working directory for the command
9398
env: Environment variables for the command
99+
allow_shell_features: Allow shell features (pipes, redirects, etc.).
100+
SECURITY WARNING: Only enable for trusted commands. When False,
101+
command is executed without shell to prevent injection attacks.
94102
95103
Returns:
96104
CommandResult with exit code and captured output
97105
"""
98106
timeout = timeout or self.default_timeout
99107

100-
# Show running status
101-
if self.stream_output:
102-
running_style = theme_registry.resolve(StatusToken.RUNNING)
103-
self.console.print(Text(f"Running: {command}", style=running_style))
104-
105-
# Prepare command
108+
# Prepare command list for display and execution
106109
if isinstance(command, list):
110+
command_list = command
107111
command_str = " ".join(command)
108112
else:
109113
command_str = command
114+
# Parse string into list for safe execution
115+
try:
116+
command_list = shlex.split(command_str)
117+
except ValueError as e:
118+
# Invalid shell syntax
119+
stderr_msg = f"Invalid command syntax: {e}"
120+
if self.stream_output:
121+
error_style = theme_registry.resolve(StatusToken.ERROR)
122+
self.console.print(Text(stderr_msg, style=error_style))
123+
return CommandResult(
124+
exit_code=-1,
125+
stdout="",
126+
stderr=stderr_msg,
127+
)
128+
129+
# Show running status
130+
if self.stream_output:
131+
running_style = theme_registry.resolve(StatusToken.RUNNING)
132+
self.console.print(Text(f"Running: {command_str}", style=running_style))
110133

111134
# Merge environment variables
112135
process_env = os.environ.copy()
@@ -122,14 +145,39 @@ async def run(
122145
process = None # Initialize process variable
123146

124147
try:
125-
# Create subprocess
126-
process = await asyncio.create_subprocess_shell(
127-
command_str,
128-
stdout=asyncio.subprocess.PIPE,
129-
stderr=asyncio.subprocess.PIPE,
130-
cwd=cwd,
131-
env=process_env,
132-
)
148+
# Create subprocess - use shell only if explicitly allowed
149+
if allow_shell_features:
150+
# SECURITY WARNING: Shell features enabled
151+
logger.warning(
152+
f"Executing command with shell features enabled: {command_str}"
153+
)
154+
process = await asyncio.create_subprocess_shell(
155+
command_str,
156+
stdout=asyncio.subprocess.PIPE,
157+
stderr=asyncio.subprocess.PIPE,
158+
cwd=cwd,
159+
env=process_env,
160+
)
161+
else:
162+
# Safe execution without shell (prevents injection)
163+
if not command_list:
164+
stderr_msg = "Empty command"
165+
if self.stream_output:
166+
error_style = theme_registry.resolve(StatusToken.ERROR)
167+
self.console.print(Text(stderr_msg, style=error_style))
168+
return CommandResult(
169+
exit_code=-1,
170+
stdout="",
171+
stderr=stderr_msg,
172+
)
173+
174+
process = await asyncio.create_subprocess_exec(
175+
*command_list,
176+
stdout=asyncio.subprocess.PIPE,
177+
stderr=asyncio.subprocess.PIPE,
178+
cwd=cwd,
179+
env=process_env,
180+
)
133181

134182
# Create tasks for reading streams
135183
stdout_task = asyncio.create_task(
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
"""Integration tests for subprocess security features.
2+
3+
This module tests the security features of SubprocessExecutor, including
4+
command injection prevention through the allow_shell_features flag.
5+
"""
6+
7+
import pytest
8+
9+
from cli_patterns.execution.subprocess_executor import SubprocessExecutor
10+
11+
12+
@pytest.mark.asyncio
13+
@pytest.mark.integration
14+
class TestSubprocessSecurity:
15+
"""Test security features of SubprocessExecutor."""
16+
17+
async def test_safe_command_without_shell(self) -> None:
18+
"""Should execute safe command without shell features."""
19+
executor = SubprocessExecutor(stream_output=False)
20+
result = await executor.run("echo hello", allow_shell_features=False)
21+
22+
assert result.success
23+
assert "hello" in result.stdout
24+
25+
async def test_blocks_command_injection_without_shell(self) -> None:
26+
"""Should prevent command injection when shell features disabled."""
27+
executor = SubprocessExecutor(stream_output=False)
28+
29+
# This should fail because semicolon will be treated as literal argument
30+
# The command "echo" will receive "test;whoami" as a single argument
31+
result = await executor.run("echo test;whoami", allow_shell_features=False)
32+
33+
# Should succeed (echo accepts the argument)
34+
assert result.success
35+
# But semicolon should be in the output as a literal character
36+
assert "test;whoami" in result.stdout or ";" in result.stdout
37+
38+
async def test_allows_shell_features_when_enabled(self) -> None:
39+
"""Should allow shell features when explicitly enabled."""
40+
executor = SubprocessExecutor(stream_output=False)
41+
42+
# This should work with shell features enabled
43+
result = await executor.run(
44+
"echo hello && echo world", allow_shell_features=True
45+
)
46+
47+
assert result.success
48+
assert "hello" in result.stdout
49+
assert "world" in result.stdout
50+
51+
async def test_pipe_fails_without_shell(self) -> None:
52+
"""Should not support pipes when shell features disabled."""
53+
executor = SubprocessExecutor(stream_output=False)
54+
55+
# Pipe should not work without shell
56+
result = await executor.run("echo test | cat", allow_shell_features=False)
57+
58+
# The command will fail because "|" will be treated as a literal argument
59+
# and echo doesn't accept that as a valid option
60+
assert not result.success or "|" in result.stdout
61+
62+
async def test_pipe_works_with_shell(self) -> None:
63+
"""Should support pipes when shell features enabled."""
64+
executor = SubprocessExecutor(stream_output=False)
65+
66+
# Pipe should work with shell enabled
67+
result = await executor.run("echo test | cat", allow_shell_features=True)
68+
69+
assert result.success
70+
assert "test" in result.stdout
71+
72+
async def test_command_substitution_fails_without_shell(self) -> None:
73+
"""Should not execute command substitution without shell."""
74+
executor = SubprocessExecutor(stream_output=False)
75+
76+
# Command substitution should not work
77+
result = await executor.run("echo $(whoami)", allow_shell_features=False)
78+
79+
# Should treat $() as literal text
80+
assert not result.success or "$" in result.stdout or "(" in result.stdout
81+
82+
async def test_command_substitution_works_with_shell(self) -> None:
83+
"""Should execute command substitution with shell features."""
84+
executor = SubprocessExecutor(stream_output=False)
85+
86+
# Command substitution should work with shell
87+
result = await executor.run("echo $(echo test)", allow_shell_features=True)
88+
89+
assert result.success
90+
assert "test" in result.stdout
91+
92+
async def test_redirection_fails_without_shell(self) -> None:
93+
"""Should not support redirection without shell features."""
94+
executor = SubprocessExecutor(stream_output=False)
95+
96+
# Redirection should not work
97+
result = await executor.run(
98+
"echo test > /tmp/test_output", allow_shell_features=False
99+
)
100+
101+
# Should treat > as literal argument
102+
assert not result.success or ">" in result.stdout
103+
104+
async def test_handles_commands_with_arguments(self) -> None:
105+
"""Should handle commands with normal arguments safely."""
106+
executor = SubprocessExecutor(stream_output=False)
107+
108+
result = await executor.run("echo -n hello world", allow_shell_features=False)
109+
110+
assert result.success
111+
assert "hello world" in result.stdout or "helloworld" in result.stdout
112+
113+
async def test_handles_quoted_arguments(self) -> None:
114+
"""Should handle quoted arguments correctly."""
115+
executor = SubprocessExecutor(stream_output=False)
116+
117+
result = await executor.run('echo "hello world"', allow_shell_features=False)
118+
119+
assert result.success
120+
assert "hello world" in result.stdout
121+
122+
async def test_default_is_safe_mode(self) -> None:
123+
"""Should default to safe mode (shell features disabled)."""
124+
executor = SubprocessExecutor(stream_output=False)
125+
126+
# Without specifying allow_shell_features, should default to False
127+
result = await executor.run("echo test")
128+
129+
assert result.success
130+
assert "test" in result.stdout
131+
132+
async def test_invalid_command_syntax(self) -> None:
133+
"""Should handle invalid shell syntax gracefully."""
134+
executor = SubprocessExecutor(stream_output=False)
135+
136+
# Unmatched quotes should fail in shlex.split
137+
result = await executor.run('echo "unterminated', allow_shell_features=False)
138+
139+
assert not result.success
140+
assert "Invalid command syntax" in result.stderr
141+
142+
async def test_command_not_found(self) -> None:
143+
"""Should handle command not found gracefully."""
144+
executor = SubprocessExecutor(stream_output=False)
145+
146+
result = await executor.run(
147+
"nonexistent_command_xyz", allow_shell_features=False
148+
)
149+
150+
assert not result.success
151+
assert result.exit_code != 0

0 commit comments

Comments
 (0)