Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 182 additions & 22 deletions backend/services/input_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
Input Validation & Sanitization
Prevents malicious inputs and abuse
"""
from typing import Optional
from typing import Optional, Set
from urllib.parse import urlparse
from pathlib import Path, PurePosixPath
import re
import os
import ipaddress
import socket


class InputValidator:
Expand All @@ -15,6 +17,31 @@ class InputValidator:
# Allowed Git URL schemes
ALLOWED_SCHEMES = {'http', 'https', 'git', 'ssh'}

# Allowed Git hosts (configurable via ALLOWED_GIT_HOSTS env var)
# Default: major public Git hosting providers only
DEFAULT_ALLOWED_HOSTS = {
'github.com',
'gitlab.com',
'bitbucket.org',
'codeberg.org',
'sr.ht', # sourcehut
}

# Shell metacharacters that could enable command injection
# These should NEVER appear in a legitimate Git URL
COMMAND_INJECTION_CHARS = {
';', # Command separator
'&&', # AND operator
'||', # OR operator
'|', # Pipe
'`', # Backtick execution
'$(', # Subshell
'${', # Variable expansion
'\n', # Newline
'\r', # Carriage return
'\x00', # Null byte
}

# Dangerous path patterns
DANGEROUS_PATTERNS = [
'..', # Path traversal
Expand All @@ -30,43 +57,176 @@ class InputValidator:
MAX_QUERY_LENGTH = 500
MAX_FILE_PATH_LENGTH = 500
MAX_REPO_NAME_LENGTH = 100
MAX_GIT_URL_LENGTH = 500
MAX_FUNCTIONS_PER_REPO = 50000 # Prevent indexing chromium-sized repos
MAX_REPOS_PER_USER = 50

@staticmethod
def _get_allowed_hosts() -> Set[str]:
"""Get allowed Git hosts from environment or use defaults."""
env_hosts = os.environ.get('ALLOWED_GIT_HOSTS', '')
if env_hosts:
# Parse comma-separated list from env
return {h.strip().lower() for h in env_hosts.split(',') if h.strip()}
return InputValidator.DEFAULT_ALLOWED_HOSTS

@staticmethod
def _contains_injection_chars(url: str) -> Optional[str]:
"""Check if URL contains shell injection characters."""
for char in InputValidator.COMMAND_INJECTION_CHARS:
if char in url:
return f"URL contains forbidden character: {repr(char)}"
return None

@staticmethod
def _is_private_ip(hostname: str) -> bool:
"""
Check if hostname resolves to a private/reserved IP address.
Prevents SSRF attacks targeting internal networks.
"""
# Direct IP check first
try:
ip = ipaddress.ip_address(hostname)
return (
ip.is_private or
ip.is_loopback or
ip.is_link_local or
ip.is_multicast or
ip.is_reserved or
ip.is_unspecified
)
except ValueError:
pass # Not a direct IP, continue with hostname check

# Known dangerous hostnames
dangerous_hostnames = {
'localhost',
'localhost.localdomain',
'ip6-localhost',
'ip6-loopback',
}
if hostname.lower() in dangerous_hostnames:
return True

# Check for AWS/cloud metadata endpoints
metadata_hosts = {
'169.254.169.254', # AWS/GCP metadata
'metadata.google.internal',
'metadata.internal',
}
if hostname.lower() in metadata_hosts:
return True

# Try to resolve hostname and check resulting IP
# Only do this as a final check - don't want to slow down validation
try:
resolved_ip = socket.gethostbyname(hostname)
ip = ipaddress.ip_address(resolved_ip)
return (
ip.is_private or
ip.is_loopback or
ip.is_link_local or
ip.is_reserved
)
except (socket.gaierror, socket.herror, ValueError):
# Can't resolve - will fail at clone anyway
# Don't block, let git handle it
pass

return False

@staticmethod
def _extract_host_from_ssh_url(ssh_url: str) -> Optional[str]:
"""Extract hostname from SSH URL format (git@host:user/repo.git)."""
# Format: git@github.com:user/repo.git
if not ssh_url.startswith('git@'):
return None

# Split off 'git@' and get the host part before ':'
remainder = ssh_url[4:] # Remove 'git@'
if ':' not in remainder:
return None

host = remainder.split(':')[0].lower()
return host

@staticmethod
def validate_git_url(git_url: str) -> tuple[bool, Optional[str]]:
"""Validate Git URL is safe"""
if not git_url or len(git_url) > 500:
return False, "Git URL too long or empty"
"""
Validate Git URL is safe to clone.

Security checks:
1. Length limits
2. Command injection character detection
3. Scheme validation (https preferred)
4. Host allowlist (github, gitlab, bitbucket by default)
5. Private IP / SSRF prevention
6. URL format validation
"""
# Check length
if not git_url:
return False, "Git URL cannot be empty"
if len(git_url) > InputValidator.MAX_GIT_URL_LENGTH:
return False, f"Git URL too long (max {InputValidator.MAX_GIT_URL_LENGTH} characters)"

# CRITICAL: Check for command injection characters FIRST
# This must happen before any parsing
injection_error = InputValidator._contains_injection_chars(git_url)
if injection_error:
return False, f"Invalid Git URL: {injection_error}"

allowed_hosts = InputValidator._get_allowed_hosts()

try:
# Handle SSH URLs (git@github.com:user/repo.git)
if git_url.startswith('git@'):
# SSH URL format - basic validation
if ':' not in git_url or '/' not in git_url:
return False, "Invalid SSH URL format"
# Block localhost/private
if any(host in git_url.lower() for host in ['localhost', '127.0.0.1', '0.0.0.0']):
return False, "Private/local URLs are not allowed"
host = InputValidator._extract_host_from_ssh_url(git_url)
if not host:
return False, "Invalid SSH URL format. Expected: git@host:owner/repo.git"

# Check against allowlist
if host not in allowed_hosts:
return False, f"Host '{host}' not in allowed list. Allowed: {', '.join(sorted(allowed_hosts))}"

# Validate format: git@host:owner/repo[.git]
ssh_pattern = r'^git@[\w.-]+:[\w.-]+/[\w.-]+(?:\.git)?$'
if not re.match(ssh_pattern, git_url):
return False, "Invalid SSH URL format. Expected: git@host:owner/repo.git"

return True, None

# Parse HTTP(S) URLs
parsed = urlparse(git_url)

# Check scheme
if parsed.scheme not in InputValidator.ALLOWED_SCHEMES:
return False, f"Invalid URL scheme. Allowed: {InputValidator.ALLOWED_SCHEMES}"
# Check scheme - prefer HTTPS
if parsed.scheme not in {'http', 'https'}:
return False, f"Invalid URL scheme '{parsed.scheme}'. Only http and https are allowed for clone URLs"

# Must have a hostname
if not parsed.netloc:
return False, "Invalid URL: missing hostname"

# Extract hostname (remove port if present)
hostname = parsed.netloc.split(':')[0].lower()

# Check against allowlist
if hostname not in allowed_hosts:
return False, f"Host '{hostname}' not in allowed list. Allowed: {', '.join(sorted(allowed_hosts))}"

# Prevent file:// and other dangerous schemes
if parsed.scheme in {'file', 'ftp', 'data'}:
return False, "File and FTP URLs are not allowed"
# Check for private IP / SSRF
if InputValidator._is_private_ip(hostname):
return False, "Private/internal network URLs are not allowed"

# Must have a hostname for http/https
if parsed.scheme in {'http', 'https'} and not parsed.netloc:
return False, "Invalid URL format"
# Validate URL path format: /owner/repo[.git]
# Must have at least owner and repo
path = parsed.path
if not path or path == '/':
return False, "Invalid repository URL: missing owner/repo path"

# Prevent localhost/private IPs (basic check)
if any(host in parsed.netloc.lower() for host in ['localhost', '127.0.0.1', '0.0.0.0', '169.254']):
return False, "Private/local URLs are not allowed"
# Path should be /owner/repo or /owner/repo.git
path_pattern = r'^/[\w.-]+/[\w.-]+(?:\.git)?(?:/.*)?$'
if not re.match(path_pattern, path):
return False, "Invalid repository URL format. Expected: https://host/owner/repo"

return True, None

Expand Down
Loading