|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import json |
| 4 | +import subprocess # nosec |
| 5 | +import tempfile |
| 6 | +from dataclasses import dataclass |
| 7 | +from pathlib import Path |
| 8 | +from re import search |
| 9 | +from typing import ( |
| 10 | + Any, |
| 11 | + Union, |
| 12 | +) |
| 13 | + |
| 14 | +from pydantic import BaseModel |
| 15 | + |
| 16 | +from exasol.toolbox.util.dependencies.shared_models import Package |
| 17 | + |
# Regex for the summary line that is looked for in pip-audit's stderr to tell
# "vulnerabilities were found" apart from other failures, e.g.
# "Found 2 known vulnerabilities in 1 package".
# `vulnerabilit\w{1,3}` covers "vulnerability"/"vulnerabilities" and
# `package\w?` covers "package"/"packages".
PIP_AUDIT_VULNERABILITY_PATTERN = (
    r"^Found \d+ known vulnerabilit\w{1,3}"
    r" in \d+ package\w?$"
)
| 21 | + |
| 22 | + |
@dataclass
class PipAuditException(Exception):
    """
    Error carrying the result of a failed subprocess call

    The return code and the captured stdout/stderr of the subprocess are kept
    as attributes and are also folded into the exception message, so that an
    uncaught exception shows why the command failed.
    """

    return_code: int
    stdout: str
    stderr: str

    def __init__(self, subprocess_output: subprocess.CompletedProcess) -> None:
        """
        Copy return code, stdout and stderr from `subprocess_output`
        """
        self.return_code = subprocess_output.returncode
        self.stdout = subprocess_output.stdout
        self.stderr = subprocess_output.stderr
        # Forward a message to `Exception.__init__`; otherwise `str(exc)` is
        # empty and a traceback shows nothing but the bare class name.
        super().__init__(
            f"Subprocess failed with return code {self.return_code}.\n"
            f"stdout: {self.stdout}\n"
            f"stderr: {self.stderr}"
        )
| 33 | + |
| 34 | + |
class Vulnerability(Package):
    # One vulnerability that pip-audit reported for a single package.
    # `name` and `version` are not declared here; presumably they are fields
    # of `Package` (defined outside this module) -- verify there.
    id: str  # value of the entry's "id" key, e.g. "GHSA-79v4-65xg-pq4g"
    aliases: list[str]  # further identifiers, e.g. ["CVE-2024-12797"]
    fix_versions: list[str]
    description: str

    @classmethod
    def from_audit_entry(
        cls, package_name: str, version: str, vuln_entry: dict[str, Any]
    ) -> Vulnerability:
        """
        Build a Vulnerability from one pip-audit vulnerability entry

        `vuln_entry` must provide the keys "id", "aliases", "fix_versions" and
        "description"; a missing key raises a KeyError.
        """
        vuln_fields = {
            key: vuln_entry[key]
            for key in ("id", "aliases", "fix_versions", "description")
        }
        return cls(name=package_name, version=version, **vuln_fields)

    @property
    def security_issue_entry(self) -> dict[str, Union[str, list[str]]]:
        """
        Dictionary with the keys "name", "version", "refs" and "description"

        "refs" holds `id` followed by all `aliases`; "version" is converted
        with `str`.
        """
        references = [self.id, *self.aliases]
        return {
            "name": self.name,
            "version": str(self.version),
            "refs": references,
            "description": self.description,
        }
| 65 | + |
| 66 | + |
def audit_poetry_files(working_directory: Path) -> str:
    """
    Audit the `pyproject.toml` and `poetry.lock` files with pip-audit

    The locked packages are first exported with `poetry export` (executed in
    `working_directory`) and written to a `requirements.txt` inside a
    temporary directory. pip-audit is then run against that file, so the
    audit neither depends on nor alters a locally sourced poetry environment.

    Args:
        working_directory: directory in which `poetry export` is executed.

    Returns:
        The stdout of `pip-audit -r requirements.txt -f json`.

    Raises:
        PipAuditException: if `poetry export` exits with a non-zero code, or
            if pip-audit exits with a non-zero code and no line of its stderr
            matches PIP_AUDIT_VULNERABILITY_PATTERN.
    """
    requirements_txt = "requirements.txt"
    export_result = subprocess.run(
        ["poetry", "export", "--format=requirements.txt"],
        capture_output=True,
        text=True,
        cwd=working_directory,
    )  # nosec
    if export_result.returncode != 0:
        raise PipAuditException(subprocess_output=export_result)

    with tempfile.TemporaryDirectory() as path:
        tmpdir = Path(path)
        (tmpdir / requirements_txt).write_text(export_result.stdout)

        # The file is addressed by its relative name because pip-audit is
        # started with the temporary directory as its working directory.
        audit_result = subprocess.run(
            ["pip-audit", "-r", requirements_txt, "-f", "json"],
            capture_output=True,
            text=True,
            cwd=tmpdir,
        )  # nosec

    if audit_result.returncode != 0:
        # pip-audit does not distinguish between 1) finding vulnerabilities
        # and 2) other errors performing the pip-audit (i.e. malformed file);
        # they both map to returncode = 1, so we have our own logic to raise
        # errors for the case of 2) and not 1).
        # The pattern is anchored with ^...$, so it is tested line by line:
        # searching the whole stderr at once would miss the summary line as
        # soon as stderr holds any additional line (e.g. a warning).
        found_vulnerabilities = any(
            search(PIP_AUDIT_VULNERABILITY_PATTERN, line.strip())
            for line in audit_result.stderr.splitlines()
        )
        if not found_vulnerabilities:
            raise PipAuditException(subprocess_output=audit_result)
    return audit_result.stdout
| 110 | + |
| 111 | + |
class Vulnerabilities(BaseModel):
    # Collection of every Vulnerability obtained from one pip-audit run.
    vulnerabilities: list[Vulnerability]

    @classmethod
    def load_from_pip_audit(cls, working_directory: Path) -> Vulnerabilities:
        """
        Run the audit for `working_directory` and collect its findings

        The JSON text returned by `audit_poetry_files` is parsed and
        flattened: every item under "vulns" of every item under
        "dependencies" becomes one Vulnerability that carries the name and
        version of its dependency. Dependencies with an empty "vulns" list
        contribute nothing.

        The parsed JSON is expected to look like:

            {"dependencies": [
                {"name": "alabaster", "version": "0.7.16", "vulns": []},
                {"name": "cryptography", "version": "43.0.3", "vulns": [
                    {"id": "GHSA-79v4-65xg-pq4g",
                     "fix_versions": ["44.0.1"],
                     "aliases": ["CVE-2024-12797"],
                     "description": "pyca/cryptography's wheels..."}]}]}
        """
        report = json.loads(audit_poetry_files(working_directory))
        findings = [
            Vulnerability.from_audit_entry(
                package_name=dependency["name"],
                version=dependency["version"],
                vuln_entry=vuln_entry,
            )
            for dependency in report["dependencies"]
            for vuln_entry in dependency["vulns"]
        ]
        return Vulnerabilities(vulnerabilities=findings)

    @property
    def security_issue_dict(self) -> list[dict[str, Union[str, list[str]]]]:
        """
        The `security_issue_entry` of each vulnerability, in stored order
        """
        return [finding.security_issue_entry for finding in self.vulnerabilities]
0 commit comments