diff --git a/src/specify_cli/extensions/__init__.py b/src/specify_cli/extensions/__init__.py index 8dd1b10554..e8d90ecf3a 100644 --- a/src/specify_cli/extensions/__init__.py +++ b/src/specify_cli/extensions/__init__.py @@ -49,6 +49,28 @@ } ) EXTENSION_COMMAND_NAME_PATTERN = re.compile(r"^speckit\.([a-z0-9-]+)\.([a-z0-9-]+)$") +EXTENSION_ID_PATTERN = re.compile(r"^[a-z0-9-]+$") + +# Characters allowed verbatim in a catalog-provided version when it is used to +# build a download filename. Anything else (path separators, "..", control +# chars, whitespace) is collapsed to "-" so an untrusted version cannot inject +# a nested path or an unwritable filename — we sanitize rather than rely on the +# post-hoc Path.resolve() containment check as the primary guard. +_UNSAFE_VERSION_TOKEN_PATTERN = re.compile(r"[^A-Za-z0-9._-]+") + + +def _safe_version_token(version: Any, fallback: str = "unknown") -> str: + """Reduce a catalog ``version`` to a filename-safe token. + + Non-strings (JSON null, numbers) and blank values degrade to ``fallback``. + Path separators and traversal tokens are stripped so the result can never + span directories or escape the download target. + """ + if not isinstance(version, str): + return fallback + token = _UNSAFE_VERSION_TOKEN_PATTERN.sub("-", version).strip("-.") + return token or fallback + VALID_EFFECTS = frozenset({"read-only", "read-write"}) @@ -217,7 +239,7 @@ def _validate(self): raise ValidationError(f"Missing extension.{field}") # Validate extension ID format - if not re.match(r"^[a-z0-9-]+$", ext["id"]): + if not EXTENSION_ID_PATTERN.match(ext["id"]): raise ValidationError( f"Invalid extension ID '{ext['id']}': " "must be lowercase alphanumeric with hyphens only" @@ -2576,6 +2598,14 @@ def download_extension( """ import urllib.error + if not isinstance(extension_id, str) or not EXTENSION_ID_PATTERN.match( + extension_id + ): + raise ExtensionError( + f"Invalid extension ID '{extension_id}': " + "must be lowercase alphanumeric with hyphens only" + ) + # Get extension info from catalog ext_info = self.get_extension_info(extension_id) if not ext_info: @@ -2608,9 +2638,13 @@ def download_extension( target_dir = self.cache_dir / "downloads" target_dir.mkdir(parents=True, exist_ok=True) - version = ext_info.get("version", "unknown") + version = _safe_version_token(ext_info.get("version")) zip_filename = f"{extension_id}-{version}.zip" zip_path = target_dir / zip_filename + try: + zip_path.resolve().relative_to(target_dir.resolve()) + except ValueError as e: + raise ExtensionError(f"Refusing unsafe extension ZIP path: {zip_filename}") from e extra_headers = None resolved_download_url = self._resolve_github_release_asset_api_url(download_url) diff --git a/src/specify_cli/extensions/_commands.py b/src/specify_cli/extensions/_commands.py index 6821419b30..a1168630aa 100644 --- a/src/specify_cli/extensions/_commands.py +++ b/src/specify_cli/extensions/_commands.py @@ -38,6 +38,30 @@ extension_app.add_typer(catalog_app, name="catalog") +def _catalog_str(ext: dict, key: str, fallback: str = "") -> str: + """Read a catalog string field, treating JSON null / non-strings / blanks + as missing so they fall back instead of rendering as the literal "None". + + Catalog entries are untrusted (remote/community catalogs); ``dict.get(key, + default)`` only substitutes the default when the key is absent, so an + explicit ``null`` value would otherwise reach ``str()`` and print "None". + """ + value = ext.get(key) + if isinstance(value, str): + stripped = value.strip() + if stripped: + return stripped + return fallback + + +def _catalog_id(ext: dict) -> str: + """Return a usable extension id, or "" when the catalog entry lacks a + valid one. An empty result means callers should skip the install hint (and + ideally the entry) rather than emit a command ``download_extension()`` will + refuse.""" + return _catalog_str(ext, "id") + + # Root helpers re-fetched at call time so test monkeypatching of # `specify_cli.` keeps working after the move. def _require_specify_project(*args, **kwargs): @@ -168,7 +192,7 @@ def _resolve_catalog_extension( # Try by display name - search using argument as query, then filter for exact match search_results = catalog.search(query=argument) - name_matches = [ext for ext in search_results if ext["name"].lower() == argument.lower()] + name_matches = [ext for ext in search_results if str(ext.get("name", "")).lower() == argument.lower()] if len(name_matches) == 1: return (name_matches[0], None) @@ -207,22 +231,30 @@ def extension_list( available: bool = typer.Option(False, "--available", help="Show available extensions from catalog"), all_extensions: bool = typer.Option(False, "--all", help="Show both installed and available"), ): - """List installed extensions.""" - from . import ExtensionManager + """List installed extensions, and available catalog extensions with --available/--all.""" + from . import ExtensionManager, ExtensionCatalog, ExtensionError project_root = _require_specify_project() manager = ExtensionManager(project_root) installed = manager.list_installed() - if not installed and not (available or all_extensions): + # Default (no flags) lists installed; --all also lists installed. + # --available alone lists only catalog extensions, not installed. + show_installed = all_extensions or not available + show_available = available or all_extensions + + if not installed and not show_available: console.print("[yellow]No extensions installed.[/yellow]") console.print("\nInstall an extension with:") console.print(" specify extension add ") return - if installed: + if show_installed: console.print("\n[bold cyan]Installed Extensions:[/bold cyan]\n") + if not installed: + console.print(" [dim]No extensions installed.[/dim]") + console.print() for ext in installed: status_icon = "✓" if ext["enabled"] else "✗" status_color = "green" if ext["enabled"] else "red" @@ -233,9 +265,46 @@ def extension_list( console.print(f" Commands: {ext['command_count']} | Hooks: {ext['hook_count']} | Priority: {ext['priority']} | Status: {'Enabled' if ext['enabled'] else 'Disabled'}") console.print() - if available or all_extensions: - console.print("\nInstall an extension:") - console.print(" [cyan]specify extension add [/cyan]") + if show_available: + # Query the catalog and show extensions that are not already installed. + catalog = ExtensionCatalog(project_root) + installed_ids = {ext["id"] for ext in installed} + + try: + results = catalog.search() + except ExtensionError as e: + console.print(f"\n[red]Error:[/red] Could not query extension catalog: {_escape_markup(str(e))}") + console.print("[dim]The catalog may be temporarily unavailable. Try again later.[/dim]") + raise typer.Exit(1) + + available_exts = [ext for ext in results if ext.get("id") not in installed_ids] + + console.print("\n[bold cyan]Available Extensions:[/bold cyan]\n") + if not available_exts: + console.print(" [dim]No additional extensions available in the catalog.[/dim]") + else: + for ext in available_exts: + # Catalog fields are untrusted (remote/community catalogs); escape + # before embedding in Rich markup to prevent markup injection. + # A missing/blank id means the entry cannot be installed, so skip + # it entirely rather than printing a bogus install hint. + extension_id = _catalog_id(ext) + if not extension_id: + continue + safe_id = _escape_markup(extension_id) + verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else "" + safe_name = _escape_markup(_catalog_str(ext, "name", "(unnamed)")) + safe_version = _escape_markup(_catalog_str(ext, "version", "?")) + console.print(f" [bold]{safe_name}[/bold] (v{safe_version}){verified_badge}") + console.print(f" [dim]{safe_id}[/dim]") + console.print(f" {_escape_markup(_catalog_str(ext, 'description'))}") + install_allowed = ext.get("_install_allowed", True) + if install_allowed: + console.print(f" [cyan]Install:[/cyan] specify extension add {safe_id}") + else: + catalog_name = _escape_markup(_catalog_str(ext, "_catalog_name")) + console.print(f" [yellow]Discovery only — not installable from '{catalog_name}'[/yellow]") + console.print() @catalog_app.command("list") @@ -601,7 +670,7 @@ def extension_add( # Download extension ZIP (use resolved ID, not original argument which may be display name) extension_id = ext_info['id'] - console.print(f"Downloading {_escape_markup(str(ext_info['name']))} v{_escape_markup(str(ext_info.get('version', 'unknown')))}...") + console.print(f"Downloading {_escape_markup(str(ext_info.get('name', extension_id)))} v{_escape_markup(str(ext_info.get('version', 'unknown')))}...") zip_path = catalog.download_extension(extension_id) try: @@ -751,13 +820,20 @@ def extension_search( console.print(f"\n[green]Found {len(results)} extension(s):[/green]\n") for ext in results: + # Catalog entries are untrusted; a missing/blank id cannot be + # installed or referenced, so skip it rather than emit a bogus + # install hint or crash on ext['id']. + extension_id = _catalog_id(ext) + if not extension_id: + continue + # Extension header verified_badge = " [green]✓ Verified[/green]" if ext.get("verified") else "" - console.print(f"[bold]{_escape_markup(str(ext['name']))}[/bold] (v{_escape_markup(str(ext['version']))}){verified_badge}") - console.print(f" {_escape_markup(str(ext['description']))}") + console.print(f"[bold]{_escape_markup(_catalog_str(ext, 'name', '(unnamed)'))}[/bold] (v{_escape_markup(_catalog_str(ext, 'version', '?'))}){verified_badge}") + console.print(f" {_escape_markup(_catalog_str(ext, 'description'))}") # Metadata - console.print(f"\n [dim]Author:[/dim] {_escape_markup(str(ext.get('author', 'Unknown')))}") + console.print(f"\n [dim]Author:[/dim] {_escape_markup(_catalog_str(ext, 'author', 'Unknown'))}") if ext.get('tags'): tags_str = ", ".join(str(t) for t in ext['tags']) console.print(f" [dim]Tags:[/dim] {_escape_markup(tags_str)}") @@ -776,7 +852,7 @@ def extension_search( if ext.get('downloads') is not None: stats.append(f"Downloads: {ext['downloads']:,}") if ext.get('stars') is not None: - stats.append(f"Stars: {ext['stars']}") + stats.append(f"Stars: {_escape_markup(str(ext['stars']))}") if stats: console.print(f" [dim]{' | '.join(stats)}[/dim]") @@ -785,7 +861,7 @@ def extension_search( console.print(f" [dim]Repository:[/dim] {_escape_markup(str(ext['repository']))}") # Install command (show warning if not installable) - safe_id = _escape_markup(str(ext['id'])) + safe_id = _escape_markup(extension_id) if install_allowed: console.print(f"\n [cyan]Install:[/cyan] specify extension add {safe_id}") else: @@ -897,17 +973,17 @@ def _print_extension_info(ext_info: dict, manager): # Header verified_badge = " [green]✓ Verified[/green]" if ext_info.get("verified") else "" - console.print(f"\n[bold]{_escape_markup(str(ext_info['name']))}[/bold] (v{_escape_markup(str(ext_info['version']))}){verified_badge}") + console.print(f"\n[bold]{_escape_markup(_catalog_str(ext_info, 'name', '(unnamed)'))}[/bold] (v{_escape_markup(_catalog_str(ext_info, 'version', '?'))}){verified_badge}") console.print(f"ID: {_escape_markup(str(ext_info['id']))}") console.print() # Description - console.print(f"{_escape_markup(str(ext_info['description']))}") + console.print(f"{_escape_markup(_catalog_str(ext_info, 'description'))}") console.print() # Author and License - console.print(f"[dim]Author:[/dim] {_escape_markup(str(ext_info.get('author', 'Unknown')))}") - console.print(f"[dim]License:[/dim] {_escape_markup(str(ext_info.get('license', 'Unknown')))}") + console.print(f"[dim]Author:[/dim] {_escape_markup(_catalog_str(ext_info, 'author', 'Unknown'))}") + console.print(f"[dim]License:[/dim] {_escape_markup(_catalog_str(ext_info, 'license', 'Unknown'))}") # Category and Effect if ext_info.get('category'): @@ -923,23 +999,26 @@ def _print_extension_info(ext_info: dict, manager): console.print() # Requirements - if ext_info.get('requires'): + reqs = ext_info.get('requires') + if isinstance(reqs, dict) and reqs: console.print("[bold]Requirements:[/bold]") - reqs = ext_info['requires'] if reqs.get('speckit_version'): console.print(f" • Spec Kit: {_escape_markup(str(reqs['speckit_version']))}") - if reqs.get('tools'): - for tool in reqs['tools']: - tool_name = _escape_markup(str(tool['name'])) + tools = reqs.get('tools') + if isinstance(tools, list): + for tool in tools: + if not isinstance(tool, dict): + continue + tool_name = _escape_markup(str(tool.get('name', '(unnamed)'))) tool_version = _escape_markup(str(tool.get('version', 'any'))) required = " (required)" if tool.get('required') else " (optional)" console.print(f" • {tool_name}: {tool_version}{required}") console.print() # Provides - if ext_info.get('provides'): + provides = ext_info.get('provides') + if isinstance(provides, dict) and provides: console.print("[bold]Provides:[/bold]") - provides = ext_info['provides'] if provides.get('commands'): console.print(f" • Commands: {_escape_markup(str(provides['commands']))}") if provides.get('hooks'): @@ -957,7 +1036,7 @@ def _print_extension_info(ext_info: dict, manager): if ext_info.get('downloads') is not None: stats.append(f"Downloads: {ext_info['downloads']:,}") if ext_info.get('stars') is not None: - stats.append(f"Stars: {ext_info['stars']}") + stats.append(f"Stars: {_escape_markup(str(ext_info['stars']))}") if stats: console.print(f"[bold]Statistics:[/bold] {' | '.join(stats)}") console.print() @@ -1064,8 +1143,8 @@ def extension_update( continue try: - catalog_version = pkg_version.Version(ext_info["version"]) - except pkg_version.InvalidVersion: + catalog_version = pkg_version.Version(str(ext_info["version"])) + except (pkg_version.InvalidVersion, KeyError): console.print( f"⚠ {safe_ext_id}: Invalid catalog version '{_escape_markup(str(ext_info.get('version')))}' (skipping)" ) diff --git a/tests/test_extension_add_path_traversal.py b/tests/test_extension_add_path_traversal.py new file mode 100644 index 0000000000..e8aad49a49 --- /dev/null +++ b/tests/test_extension_add_path_traversal.py @@ -0,0 +1,82 @@ +"""Security test for `specify extension add