Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2
from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2
from vulnerabilities.pipelines.v2_importers import (
openssf_malicious_importer as openssf_malicious_importer_v2,
)
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import pypa_importer as pypa_importer_v2
Expand Down Expand Up @@ -89,6 +92,7 @@
ruby_importer_v2.RubyImporterPipeline,
epss_importer_v2.EPSSImporterPipeline,
mattermost_importer_v2.MattermostImporterPipeline,
openssf_malicious_importer_v2.OpenSSFMaliciousImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
105 changes: 105 additions & 0 deletions vulnerabilities/pipelines/v2_importers/openssf_malicious_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
from pathlib import Path
from typing import Iterable

from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import get_advisory_url

logger = logging.getLogger(__name__)


class OpenSSFMaliciousImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    OpenSSF Malicious Packages Importer Pipeline

    Collect advisories for malicious packages from the OpenSSF malicious-packages
    repository. This includes typosquatting, dependency confusion, and other
    malicious packages discovered in npm, PyPI, RubyGems, and other ecosystems.

    The repository is cloned, every ``*.json`` file below ``osv/malicious`` is
    parsed with ``parse_advisory_data_v2`` and the clone is deleted afterwards.

    See: https://github.com/ossf/malicious-packages
    """

    pipeline_id = "openssf_malicious_importer"
    spdx_license_expression = "Apache-2.0"
    license_url = "https://github.com/ossf/malicious-packages/blob/main/LICENSE"
    repo_url = "git+https://github.com/ossf/malicious-packages/"

    @classmethod
    def steps(cls):
        """Return the pipeline steps in execution order."""
        return (
            cls.clone,
            # NOTE(review): not defined in this class; presumably inherited from
            # the base pipeline and driving ``collect_advisories`` -- confirm.
            cls.collect_and_store_advisories,
            cls.clean_downloads,
        )

    def clone(self):
        """Fetch the repository at ``repo_url`` and keep the response on ``self``."""
        self.log(f"Cloning `{self.repo_url}`")
        self.vcs_response = fetch_via_vcs(self.repo_url)

    def advisories_count(self):
        """Return the number of ``*.json`` files below ``osv/malicious`` in the clone."""
        advisory_dir = Path(self.vcs_response.dest_dir) / "osv" / "malicious"
        return sum(1 for _ in advisory_dir.rglob("*.json"))

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield an AdvisoryData for each advisory file found in the clone.

        Files that cannot be decoded as UTF-8 or parsed as JSON are logged and
        skipped. Falsy results from ``parse_advisory_data_v2`` are not yielded.
        """
        from vulnerabilities.importers.osv import parse_advisory_data_v2

        # Ecosystems handled by this importer, passed to parse_advisory_data_v2
        # as ``supported_ecosystems``.
        # NOTE(review): these look like purl types rather than OSV ecosystem
        # names -- confirm against parse_advisory_data_v2.
        supported_ecosystems = [
            "pypi",  # Python packages
            "npm",  # JavaScript/Node.js packages
            "cargo",  # Rust packages (crates.io)
            "gem",  # Ruby packages (rubygems)
            "maven",  # Java packages
            "nuget",  # .NET packages
            "golang",  # Go packages
        ]

        base_path = Path(self.vcs_response.dest_dir)
        advisory_dir = base_path / "osv" / "malicious"

        for file in advisory_dir.rglob("*.json"):
            try:
                # Read the file once, explicitly as UTF-8, and parse that same
                # text so the stored advisory text always matches the parsed data.
                advisory_text = file.read_text(encoding="utf-8")
                raw_data = json.loads(advisory_text)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.error(f"Failed to parse JSON from {file}: {e}")
                continue

            advisory_url = get_advisory_url(
                file=file,
                base_path=base_path,
                url="https://github.com/ossf/malicious-packages/blob/main/",
            )

            advisory = parse_advisory_data_v2(
                raw_data=raw_data,
                supported_ecosystems=supported_ecosystems,
                advisory_url=advisory_url,
                advisory_text=advisory_text,
            )

            if advisory:
                yield advisory

    def clean_downloads(self):
        """Delete the cloned repository, if a clone was made."""
        # ``vcs_response`` is only assigned by ``clone``; use getattr so cleanup
        # does not raise AttributeError when ``clone`` failed before assigning it.
        vcs_response = getattr(self, "vcs_response", None)
        if vcs_response:
            self.log("Removing cloned repository")
            vcs_response.delete()

    def on_failure(self):
        """
        Remove the clone.

        NOTE(review): presumably invoked by the pipeline framework when a step
        fails -- confirm against the base class.
        """
        self.clean_downloads()
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
from pathlib import Path

import pytest

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.pipelines.v2_importers.openssf_malicious_importer import (
OpenSSFMaliciousImporterPipeline,
)


@pytest.fixture
def sample_malicious_advisory(tmp_path: Path):
    """
    Write one PyPI malicious-package advisory in OSV format below ``tmp_path``.

    Return a ``(tmp_path, advisory file text, advisory dict)`` tuple.
    """
    affected_entry = {
        "package": {
            "ecosystem": "PyPI",
            "name": "malicious-test-package",
            "purl": "pkg:pypi/malicious-test-package",
        },
        "versions": ["0.0.1", "0.0.2"],
    }
    finder_credit = {
        "name": "Security Researcher",
        "type": "FINDER",
        "contact": ["https://example.com"],
    }
    origin_record = {
        "id": "TEST-2025-01234",
        "import_time": "2025-03-31T07:07:04.129197674Z",
        "modified_time": "2025-03-28T13:05:11Z",
        "sha256": "abc123def456",
        "source": "test-source",
        "versions": ["0.0.1", "0.0.2"],
    }
    # Key order is kept stable because it determines the serialized text.
    advisory_data = {
        "modified": "2025-03-28T13:05:11Z",
        "published": "2025-03-28T13:05:11Z",
        "schema_version": "1.5.0",
        "id": "MAL-2025-1234",
        "summary": "Malicious code in malicious-test-package (PyPI)",
        "details": "This package contains malicious code that exfiltrates data.",
        "affected": [affected_entry],
        "credits": [finder_credit],
        "database_specific": {"malicious-packages-origins": [origin_record]},
    }

    # Same directory layout the importer walks: osv/malicious/<ecosystem>/<package>/
    package_dir = tmp_path.joinpath("osv", "malicious", "pypi", "malicious-test-package")
    package_dir.mkdir(parents=True)

    advisory_file = package_dir / "MAL-2025-1234.json"
    serialized = json.dumps(advisory_data, indent=2)
    advisory_file.write_text(serialized)

    return tmp_path, advisory_file.read_text(), advisory_data


@pytest.fixture
def sample_npm_malicious_advisory(tmp_path: Path):
    """
    Write one npm malicious-package advisory in OSV format below ``tmp_path``.

    Return a ``(tmp_path, advisory file text, advisory dict)`` tuple.
    """
    affected_entry = {
        "package": {
            "ecosystem": "npm",
            "name": "typosquat-package",
        },
        "versions": ["1.0.0"],
    }
    # Key order is kept stable because it determines the serialized text.
    advisory_data = {
        "modified": "2025-01-15T10:00:00Z",
        "published": "2025-01-15T10:00:00Z",
        "schema_version": "1.5.0",
        "id": "MAL-2025-5678",
        "summary": "Malicious code in typosquat-package (npm)",
        "details": "Typosquatting attack targeting popular package.",
        "affected": [affected_entry],
    }

    # Same directory layout the importer walks: osv/malicious/<ecosystem>/<package>/
    package_dir = tmp_path.joinpath("osv", "malicious", "npm", "typosquat-package")
    package_dir.mkdir(parents=True)

    advisory_file = package_dir / "MAL-2025-5678.json"
    serialized = json.dumps(advisory_data, indent=2)
    advisory_file.write_text(serialized)

    return tmp_path, advisory_file.read_text(), advisory_data


class DummyVCSResponse:
    """Stand-in for a VCS fetch response: exposes ``dest_dir`` and ``delete()``."""

    def __init__(self, dest_dir):
        # Directory that plays the role of the cloned repository.
        self.dest_dir = dest_dir

    def delete(self):
        # No-op: there is no real clone to remove.
        return None


def test_collect_advisories_from_openssf_malicious(sample_malicious_advisory):
    """Check that the single PyPI fixture advisory is collected with its package data."""
    repo_root, _, _ = sample_malicious_advisory

    pipeline = OpenSSFMaliciousImporterPipeline()
    # Point the pipeline at the fixture directory instead of a real clone.
    pipeline.vcs_response = DummyVCSResponse(str(repo_root))

    collected = list(pipeline.collect_advisories())
    assert len(collected) == 1

    (advisory,) = collected
    assert isinstance(advisory, AdvisoryData)
    assert advisory.advisory_id == "MAL-2025-1234"
    assert "Malicious code" in advisory.summary
    assert advisory.original_advisory_text.strip().startswith("{")
    assert advisory.affected_packages

    first_package = advisory.affected_packages[0].package
    assert first_package.type == "pypi"
    assert first_package.name == "malicious-test-package"


def test_collect_npm_advisories(sample_npm_malicious_advisory):
    """Check that the single npm fixture advisory is collected with its package data."""
    repo_root, _, _ = sample_npm_malicious_advisory

    pipeline = OpenSSFMaliciousImporterPipeline()
    # Point the pipeline at the fixture directory instead of a real clone.
    pipeline.vcs_response = DummyVCSResponse(str(repo_root))

    collected = list(pipeline.collect_advisories())
    assert len(collected) == 1

    (advisory,) = collected
    assert advisory.advisory_id == "MAL-2025-5678"

    first_package = advisory.affected_packages[0].package
    assert first_package.type == "npm"
    assert first_package.name == "typosquat-package"


def test_advisories_count(sample_malicious_advisory):
    """Check that ``advisories_count`` reports the one fixture advisory file."""
    repo_root = sample_malicious_advisory[0]

    pipeline = OpenSSFMaliciousImporterPipeline()
    # Point the pipeline at the fixture directory instead of a real clone.
    pipeline.vcs_response = DummyVCSResponse(str(repo_root))

    assert pipeline.advisories_count() == 1


def test_multiple_advisories(tmp_path: Path):
    """Check that advisories from two ecosystems are both collected and counted."""
    malicious_root = tmp_path / "osv" / "malicious"

    # (directory, package name, advisory id, summary, OSV ecosystem, version)
    samples = (
        ("pypi", "bad-pkg", "MAL-2025-0001", "Bad PyPI package", "PyPI", "1.0"),
        ("npm", "bad-js", "MAL-2025-0002", "Bad npm package", "npm", "2.0"),
    )
    for folder, package_name, advisory_id, summary, ecosystem, version in samples:
        package_dir = malicious_root / folder / package_name
        package_dir.mkdir(parents=True)
        payload = {
            "id": advisory_id,
            "summary": summary,
            "affected": [
                {
                    "package": {"ecosystem": ecosystem, "name": package_name},
                    "versions": [version],
                }
            ],
        }
        (package_dir / f"{advisory_id}.json").write_text(json.dumps(payload))

    pipeline = OpenSSFMaliciousImporterPipeline()
    # Point the pipeline at the temporary directory instead of a real clone.
    pipeline.vcs_response = DummyVCSResponse(str(tmp_path))

    collected = list(pipeline.collect_advisories())
    assert len(collected) == 2
    assert pipeline.advisories_count() == 2

    seen_ids = set()
    for advisory in collected:
        seen_ids.add(advisory.advisory_id)
    assert seen_ids == {"MAL-2025-0001", "MAL-2025-0002"}


def test_pipeline_metadata():
    """Check the class-level id, license expression and repository URL."""
    pipeline_cls = OpenSSFMaliciousImporterPipeline

    assert pipeline_cls.pipeline_id == "openssf_malicious_importer"
    assert pipeline_cls.spdx_license_expression == "Apache-2.0"
    assert "ossf/malicious-packages" in pipeline_cls.repo_url


def test_unsupported_ecosystem_skipped(tmp_path: Path):
    """Check that an unsupported ecosystem yields an advisory without affected packages."""
    # Advisory whose only affected package is in an ecosystem the importer does not list.
    payload = {
        "id": "MAL-2025-9999",
        "summary": "Package in unsupported ecosystem",
        "affected": [
            {
                "package": {"ecosystem": "UnsupportedEcosystem", "name": "pkg"},
                "versions": ["1.0"],
            }
        ],
    }
    package_dir = tmp_path.joinpath("osv", "malicious", "unsupported", "pkg")
    package_dir.mkdir(parents=True)
    advisory_file = package_dir / "MAL-2025-9999.json"
    advisory_file.write_text(json.dumps(payload))

    pipeline = OpenSSFMaliciousImporterPipeline()
    # Point the pipeline at the temporary directory instead of a real clone.
    pipeline.vcs_response = DummyVCSResponse(str(tmp_path))

    collected = list(pipeline.collect_advisories())
    # The advisory itself is still yielded; only its affected packages are dropped.
    assert len(collected) == 1
    assert collected[0].affected_packages == []