Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
from vulnerabilities.pipelines.v2_importers import mattermost_importer as mattermost_importer_v2
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2
from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2
Expand Down Expand Up @@ -87,6 +88,7 @@
aosp_importer_v2.AospImporterPipeline,
ruby_importer_v2.RubyImporterPipeline,
epss_importer_v2.EPSSImporterPipeline,
mattermost_importer_v2.MattermostImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
124 changes: 124 additions & 0 deletions vulnerabilities/pipelines/v2_importers/mattermost_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from typing import Iterable

from packageurl import PackageURL
from univers.version_range import GitHubVersionRange

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import fetch_response

MM_REPO = {
"Mattermost Mobile Apps": "mattermost-mobile",
"Mattermost Server": "mattermost-server",
"Mattermost Desktop App": "desktop",
"Mattermost Boards": "mattermost-plugin-boards",
"Mattermost Plugins": "mattermost-plugin-github",
}


class MattermostImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""
Importer for Xen Security Advisories from xsa.json.
"""

pipeline_id = "mattermost_importer_v2"
url = "https://securityupdates.mattermost.com/security_updates.json"
spdx_license_expression = "LicenseRef-scancode-other-permissive"

_cached_data = None # Class-level cache

@classmethod
def steps(cls):
return (cls.collect_and_store_advisories,)

def get_mattermost_data(self):
if self._cached_data is None:
self._cached_data = fetch_response(self.url).json()
return self._cached_data

def advisories_count(self) -> int:
data = self.get_mattermost_data()
return len(data) if data else 0

def collect_advisories(self) -> Iterable[AdvisoryData]:
data = self.get_mattermost_data()
if not data:
return

for advisory in data:
vuln_id = advisory.get("issue_id")
if not vuln_id or not vuln_id.startswith("MMSA-"):
self.log(f"Skipping advisory with missing issue_id. {vuln_id}")
continue
cve_id = advisory.get("cve_id")
details = advisory.get("details")

platform = advisory.get("platform")

fixed_versions = advisory.get("fix_versions", [])

package_name = MM_REPO.get(platform)

affected_packages = []
severity = advisory.get("severity")
if not package_name:
self.log(f"Unknown platform '{platform}' in advisory '{vuln_id}'.")

else:
package = PackageURL(
type="github",
namespace="mattermost",
name=MM_REPO.get(platform),
)

if isinstance(fixed_versions, list):
fixed_versions = [v for v in fixed_versions if v and v.strip()]
fixed_versions = [v.lstrip("v") for v in fixed_versions]
if isinstance(fixed_versions, str):
fixed_versions = [fixed_versions.lstrip("v")]

fixed_versions = [v.replace("and ", "") for v in fixed_versions]
fixed_versions = [v.strip() for v in fixed_versions]

try:
affected_packages.append(
AffectedPackageV2(
package=package,
fixed_version_range=GitHubVersionRange.from_versions(fixed_versions),
)
)
except Exception as e:
self.log(
f"Error processing fixed versions '{fixed_versions}' for advisory '{vuln_id}': {e}"
)

severities = []
severities.append(
VulnerabilitySeverity(system=severity_systems.CVSS31_QUALITY, value=severity)
)

reference = ReferenceV2(
url="https://mattermost.com/security-updates/",
)

yield AdvisoryData(
advisory_id=vuln_id,
aliases=[cve_id],
summary=details,
references_v2=[reference],
affected_packages=affected_packages,
url=self.url,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import pytest
from packageurl import PackageURL
from univers.version_range import GitHubVersionRange

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.mattermost_importer import MattermostImporterPipeline


@pytest.fixture
def sample_mattermost_data():
return [
{
"issue_id": "MMSA-2024-001",
"cve_id": "CVE-2024-1234",
"details": "Test vulnerability in Mattermost Server",
"platform": "Mattermost Server",
"severity": "HIGH",
"fix_versions": ["v9.0.1", "v8.1.5"],
}
]


@pytest.fixture
def importer(monkeypatch, sample_mattermost_data):
"""
Create an importer with fetch_response mocked.
"""

def mock_fetch_response(url):
class MockResponse:
def json(self_inner):
return sample_mattermost_data

return MockResponse()

monkeypatch.setattr(
"vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response",
mock_fetch_response,
)

return MattermostImporterPipeline()


def test_advisories_count(importer):
assert importer.advisories_count() == 1


def test_collect_advisories_happy_path(importer):
advisories = list(importer.collect_advisories())

assert len(advisories) == 1
advisory = advisories[0]

assert isinstance(advisory, AdvisoryData)
assert advisory.advisory_id == "MMSA-2024-001"
assert advisory.aliases == ["CVE-2024-1234"]
assert "Test vulnerability" in advisory.summary

assert advisory.affected_packages
affected = advisory.affected_packages[0]

assert affected.package == PackageURL(
type="github",
namespace="mattermost",
name="mattermost-server",
)

assert isinstance(affected.fixed_version_range, GitHubVersionRange)
assert str(affected.fixed_version_range) == "vers:github/8.1.5|9.0.1"


def test_skip_invalid_issue_id(monkeypatch):
data = [
{
"issue_id": "INVALID-001",
"platform": "Mattermost Server",
}
]

def mock_fetch_response(url):
class MockResponse:
def json(self):
return data

return MockResponse()

monkeypatch.setattr(
"vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response",
mock_fetch_response,
)

importer = MattermostImporterPipeline()
advisories = list(importer.collect_advisories())

assert advisories == []


def test_unknown_platform(monkeypatch):
data = [
{
"issue_id": "MMSA-2024-002",
"platform": "Unknown Product",
"fix_versions": ["1.0.0"],
}
]

def mock_fetch_response(url):
class MockResponse:
def json(self):
return data

return MockResponse()

monkeypatch.setattr(
"vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response",
mock_fetch_response,
)

importer = MattermostImporterPipeline()
advisories = list(importer.collect_advisories())

assert len(advisories) == 1
assert advisories[0].affected_packages == []


def test_fixed_version_string_normalization(monkeypatch):
data = [
{
"issue_id": "MMSA-2024-003",
"platform": "Mattermost Desktop App",
"fix_versions": "v2.0.0",
}
]

def mock_fetch_response(url):
class MockResponse:
def json(self):
return data

return MockResponse()

monkeypatch.setattr(
"vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response",
mock_fetch_response,
)

importer = MattermostImporterPipeline()
advisories = list(importer.collect_advisories())

affected = advisories[0].affected_packages[0]
assert "2.0.0" in str(affected.fixed_version_range)


def test_bad_version_does_not_crash(monkeypatch):
data = [
{
"issue_id": "MMSA-2024-004",
"platform": "Mattermost Server",
"fix_versions": ["not-a-version"],
}
]

def mock_fetch_response(url):
class MockResponse:
def json(self):
return data

return MockResponse()

monkeypatch.setattr(
"vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response",
mock_fetch_response,
)

importer = MattermostImporterPipeline()
advisories = list(importer.collect_advisories())

# Advisory should still be yielded, but without affected packages
assert len(advisories) == 1
assert advisories[0].affected_packages == []


def test_fetch_is_cached(monkeypatch):
call_count = {"count": 0}

def mock_fetch_response(url):
call_count["count"] += 1

class MockResponse:
def json(self):
return []

return MockResponse()

monkeypatch.setattr(
"vulnerabilities.pipelines.v2_importers.mattermost_importer.fetch_response",
mock_fetch_response,
)

importer = MattermostImporterPipeline()
importer.advisories_count()
importer.collect_advisories()

assert call_count["count"] == 1
14 changes: 13 additions & 1 deletion vulnerabilities/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from django.contrib.auth.views import LoginView
from django.core.exceptions import ValidationError
from django.core.mail import send_mail
from django.db.models import F
from django.db.models import Prefetch
from django.http.response import Http404
from django.shortcuts import get_object_or_404
Expand Down Expand Up @@ -691,7 +692,18 @@ class AdvisoryPackagesDetails(DetailView):
model = models.AdvisoryV2
template_name = "advisory_package_details.html"
slug_url_kwarg = "avid"
slug_field = "avid"

def get_object(self, queryset=None):
avid = self.kwargs.get(self.slug_url_kwarg)
if not avid:
raise Http404("Missing advisory identifier")

advisory = models.AdvisoryV2.objects.latest_for_avid(avid)

if not advisory:
raise Http404(f"No advisory found for avid: {avid}")

return advisory

def get_queryset(self):
"""
Expand Down
Loading