|
11 | 11 | from typing import Iterable |
12 | 12 |
|
13 | 13 | from packageurl import PackageURL |
| 14 | +from univers.versions import InvalidVersion |
14 | 15 | from univers.versions import SemverVersion |
15 | 16 |
|
16 | | -from vulnerabilities.importer import AdvisoryData |
| 17 | +from vulnerabilities.importer import AdvisoryDataV2 |
17 | 18 | from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline |
18 | 19 | from vulnerabilities.utils import load_json |
19 | 20 |
|
@@ -58,31 +59,35 @@ def get_purl_inputs(self): |
58 | 59 |
|
59 | 60 | self.purl = purl |
60 | 61 |
|
61 | | - def collect_advisories(self) -> Iterable[AdvisoryData]: |
| 62 | + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: |
62 | 63 | vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" |
63 | | - advisory_files = list(vuln_directory.glob("*.json")) |
64 | | - |
65 | 64 | package_name = self.purl.name |
66 | 65 | filtered_files = [] |
67 | | - for advisory_file in advisory_files: |
68 | | - try: |
69 | | - data = load_json(advisory_file) |
70 | | - if data.get("module_name") == package_name: |
71 | | - affected_package = self.get_affected_package(data, package_name) |
72 | | - if not self.purl.version or self._version_is_affected(affected_package): |
73 | | - filtered_files.append(advisory_file) |
74 | | - except Exception as e: |
75 | | - self.log(f"Error processing advisory file {advisory_file}: {str(e)}") |
76 | | - advisory_files = filtered_files |
77 | | - |
78 | | - for advisory in list(advisory_files): |
| 66 | + for advisory_file in vuln_directory.glob("*.json"): |
| 67 | + data = load_json(advisory_file) |
| 68 | + if data.get("module_name") == package_name: |
| 69 | + affected_package = self.get_affected_package(data, package_name) |
| 70 | + if not self.purl.version or self._version_is_related(affected_package): |
| 71 | + filtered_files.append(advisory_file) |
| 72 | + |
| 73 | + for advisory in filtered_files: |
79 | 74 | result = self.to_advisory_data(advisory) |
80 | 75 | if result: |
81 | 76 | yield result |
82 | 77 |
|
83 | | - def _version_is_affected(self, affected_package): |
84 | | - if not self.purl.version or not affected_package.affected_version_range: |
| 78 | + def _version_is_related(self, affected_package): |
| 79 | + try: |
| 80 | + package_version = SemverVersion(self.purl.version) |
| 81 | + except InvalidVersion as e: |
| 82 | + self.log(f"Invalid PURL version: {self.purl.version!r}: {str(e)}") |
| 83 | + return False |
| 84 | + |
| 85 | + if ( |
| 86 | + affected_package.affected_version_range |
| 87 | + and package_version in affected_package.affected_version_range |
| 88 | + ) or ( |
| 89 | + affected_package.fixed_version_range |
| 90 | + and package_version in affected_package.fixed_version_range |
| 91 | + ): |
85 | 92 | return True |
86 | | - |
87 | | - purl_version = SemverVersion(self.purl.version) |
88 | | - return purl_version in affected_package.affected_version_range |
| 93 | + return False |
0 commit comments