diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 304d84093..b0b1ccb52 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -52,6 +52,7 @@ from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2 from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2 from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2 +from vulnerabilities.pipelines.v2_importers import mattermost_importer as mattermost_importer_v2 from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2 from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2 from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2 @@ -87,6 +88,7 @@ aosp_importer_v2.AospImporterPipeline, ruby_importer_v2.RubyImporterPipeline, epss_importer_v2.EPSSImporterPipeline, + mattermost_importer_v2.MattermostImporterPipeline, nvd_importer.NVDImporterPipeline, github_importer.GitHubAPIImporterPipeline, gitlab_importer.GitLabImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/mattermost_importer.py b/vulnerabilities/pipelines/v2_importers/mattermost_importer.py new file mode 100644 index 000000000..d6a7b3001 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/mattermost_importer.py @@ -0,0 +1,124 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from typing import Iterable + +from packageurl import PackageURL +from univers.version_range import GitHubVersionRange + +from vulnerabilities import severity_systems +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.importer import VulnerabilitySeverity +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 +from vulnerabilities.utils import fetch_response + +MM_REPO = { + "Mattermost Mobile Apps": "mattermost-mobile", + "Mattermost Server": "mattermost-server", + "Mattermost Desktop App": "desktop", + "Mattermost Boards": "mattermost-plugin-boards", + "Mattermost Plugins": "mattermost-plugin-github", +} + + +class MattermostImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """ + Importer for Xen Security Advisories from xsa.json. + """ + + pipeline_id = "mattermost_importer_v2" + url = "https://securityupdates.mattermost.com/security_updates.json" + spdx_license_expression = "LicenseRef-scancode-other-permissive" + + _cached_data = None # Class-level cache + + @classmethod + def steps(cls): + return (cls.collect_and_store_advisories,) + + def get_mattermost_data(self): + if self._cached_data is None: + self._cached_data = fetch_response(self.url).json() + return self._cached_data + + def advisories_count(self) -> int: + data = self.get_mattermost_data() + return len(data) if data else 0 + + def collect_advisories(self) -> Iterable[AdvisoryData]: + data = self.get_mattermost_data() + if not data: + return + + for advisory in data: + vuln_id = advisory.get("issue_id") + if not vuln_id or not vuln_id.startswith("MMSA-"): + self.log(f"Skipping advisory with missing issue_id. {vuln_id}") + continue + cve_id = advisory.get("cve_id") + details = advisory.get("details") + + platform = advisory.get("platform") + + fixed_versions = advisory.get("fix_versions", []) + + package_name = MM_REPO.get(platform) + + affected_packages = [] + severity = advisory.get("severity") + if not package_name: + self.log(f"Unknown platform '{platform}' in advisory '{vuln_id}'.") + + else: + package = PackageURL( + type="github", + namespace="mattermost", + name=MM_REPO.get(platform), + ) + + if isinstance(fixed_versions, list): + fixed_versions = [v for v in fixed_versions if v and v.strip()] + fixed_versions = [v.lstrip("v") for v in fixed_versions] + if isinstance(fixed_versions, str): + fixed_versions = [fixed_versions.lstrip("v")] + + fixed_versions = [v.replace("and ", "") for v in fixed_versions] + fixed_versions = [v.strip() for v in fixed_versions] + + try: + affected_packages.append( + AffectedPackageV2( + package=package, + fixed_version_range=GitHubVersionRange.from_versions(fixed_versions), + ) + ) + except Exception as e: + self.log( + f"Error processing fixed versions '{fixed_versions}' for advisory '{vuln_id}': {e}" + ) + + severities = [] + severities.append( + VulnerabilitySeverity(system=severity_systems.CVSS31_QUALITY, value=severity) + ) + + reference = ReferenceV2( + url="https://mattermost.com/security-updates/", + ) + + yield AdvisoryData( + advisory_id=vuln_id, + aliases=[cve_id], + summary=details, + references_v2=[reference], + affected_packages=affected_packages, + url=self.url, + ) diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py new file mode 100644 index 000000000..a32383b82 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_mattermost_importer_v2.py @@ -0,0 +1,213 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import pytest +from packageurl import PackageURL +from univers.version_range import GitHubVersionRange + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.mattermost_importer import MattermostImporterPipeline + + +@pytest.fixture +def sample_mattermost_data(): + return [ + { + "issue_id": "MMSA-2024-001", + "cve_id": "CVE-2024-1234", + "details": "Test vulnerability in Mattermost Server", + "platform": "Mattermost Server", + "severity": "HIGH", + "fix_versions": ["v9.0.1", "v8.1.5"], + } + ] + + +@pytest.fixture +def importer(monkeypatch, sample_mattermost_data): + """ + Create an importer with fetch_response mocked. + """ + + def mock_fetch_response(url): + class MockResponse: + def json(self_inner): + return sample_mattermost_data + + return MockResponse() + + monkeypatch.setattr( + "vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response", + mock_fetch_response, + ) + + return MattermostImporterPipeline() + + +def test_advisories_count(importer): + assert importer.advisories_count() == 1 + + +def test_collect_advisories_happy_path(importer): + advisories = list(importer.collect_advisories()) + + assert len(advisories) == 1 + advisory = advisories[0] + + assert isinstance(advisory, AdvisoryData) + assert advisory.advisory_id == "MMSA-2024-001" + assert advisory.aliases == ["CVE-2024-1234"] + assert "Test vulnerability" in advisory.summary + + assert advisory.affected_packages + affected = advisory.affected_packages[0] + + assert affected.package == PackageURL( + type="github", + namespace="mattermost", + name="mattermost-server", + ) + + assert isinstance(affected.fixed_version_range, GitHubVersionRange) + assert str(affected.fixed_version_range) == "vers:github/8.1.5|9.0.1" + + +def test_skip_invalid_issue_id(monkeypatch): + data = [ + { + "issue_id": "INVALID-001", + "platform": "Mattermost Server", + } + ] + + def mock_fetch_response(url): + class MockResponse: + def json(self): + return data + + return MockResponse() + + monkeypatch.setattr( + "vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response", + mock_fetch_response, + ) + + importer = MattermostImporterPipeline() + advisories = list(importer.collect_advisories()) + + assert advisories == [] + + +def test_unknown_platform(monkeypatch): + data = [ + { + "issue_id": "MMSA-2024-002", + "platform": "Unknown Product", + "fix_versions": ["1.0.0"], + } + ] + + def mock_fetch_response(url): + class MockResponse: + def json(self): + return data + + return MockResponse() + + monkeypatch.setattr( + "vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response", + mock_fetch_response, + ) + + importer = MattermostImporterPipeline() + advisories = list(importer.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].affected_packages == [] + + +def test_fixed_version_string_normalization(monkeypatch): + data = [ + { + "issue_id": "MMSA-2024-003", + "platform": "Mattermost Desktop App", + "fix_versions": "v2.0.0", + } + ] + + def mock_fetch_response(url): + class MockResponse: + def json(self): + return data + + return MockResponse() + + monkeypatch.setattr( + "vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response", + mock_fetch_response, + ) + + importer = MattermostImporterPipeline() + advisories = list(importer.collect_advisories()) + + affected = advisories[0].affected_packages[0] + assert "2.0.0" in str(affected.fixed_version_range) + + +def test_bad_version_does_not_crash(monkeypatch): + data = [ + { + "issue_id": "MMSA-2024-004", + "platform": "Mattermost Server", + "fix_versions": ["not-a-version"], + } + ] + + def mock_fetch_response(url): + class MockResponse: + def json(self): + return data + + return MockResponse() + + monkeypatch.setattr( + "vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response", + mock_fetch_response, + ) + + importer = MattermostImporterPipeline() + advisories = list(importer.collect_advisories()) + + # Advisory should still be yielded, but without affected packages + assert len(advisories) == 1 + assert advisories[0].affected_packages == [] + + +def test_fetch_is_cached(monkeypatch): + call_count = {"count": 0} + + def mock_fetch_response(url): + call_count["count"] += 1 + + class MockResponse: + def json(self): + return [] + + return MockResponse() + + monkeypatch.setattr( + "vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response", + mock_fetch_response, + ) + + importer = MattermostImporterPipeline() + importer.advisories_count() + importer.collect_advisories() + + assert call_count["count"] == 1 diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index b0649d54d..8a867983e 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -15,6 +15,7 @@ from django.contrib.auth.views import LoginView from django.core.exceptions import ValidationError from django.core.mail import send_mail +from django.db.models import F from django.db.models import Prefetch from django.http.response import Http404 from django.shortcuts import get_object_or_404 @@ -691,7 +692,18 @@ class AdvisoryPackagesDetails(DetailView): model = models.AdvisoryV2 template_name = "advisory_package_details.html" slug_url_kwarg = "avid" - slug_field = "avid" + + def get_object(self, queryset=None): + avid = self.kwargs.get(self.slug_url_kwarg) + if not avid: + raise Http404("Missing advisory identifier") + + advisory = models.AdvisoryV2.objects.latest_for_avid(avid) + + if not advisory: + raise Http404(f"No advisory found for avid: {avid}") + + return advisory def get_queryset(self): """