diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 8df56b874..d5db57566 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -332,7 +332,7 @@ def get_fixing_vulnerabilities(self, obj): return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()] -class AdvisoryPackageV2Serializer(serializers.ModelSerializer): +class PackageV3Serializer(serializers.ModelSerializer): purl = serializers.CharField(source="package_url") risk_score = serializers.FloatField(read_only=True) affected_by_vulnerabilities = serializers.SerializerMethodField() @@ -353,26 +353,38 @@ class Meta: def get_affected_by_vulnerabilities(self, package): """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" + impacts = package.affected_in_impacts.select_related("advisory").prefetch_related( + "fixed_by_packages" + ) + + avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} + + latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) + advisory_by_avid = {adv.avid: adv for adv in latest_advisories} + result = {} - request = self.context.get("request") - for impact in package.affected_in_impacts.all(): - advisory = impact.advisory + + for impact in impacts: + avid = impact.advisory.avid + advisory = advisory_by_avid.get(avid) + if not advisory: + continue fixed_by_packages = [pkg.purl for pkg in impact.fixed_by_packages.all()] - code_fixes = CodeFixV2.objects.filter(advisory=advisory).distinct() - code_fix_urls = [ - reverse("advisory-codefix-detail", args=[code_fix.id], request=request) - for code_fix in code_fixes - ] result[advisory.avid] = { "advisory_id": advisory.avid, "fixed_by_packages": fixed_by_packages, - "code_fixes": code_fix_urls, } return result def get_fixing_vulnerabilities(self, package): - return [impact.advisory.avid for impact in package.fixed_in_impacts.all()] + impacts = package.fixed_in_impacts.select_related("advisory") + + avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} + + latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) + + return [adv.avid for adv in latest_advisories] def get_next_non_vulnerable_version(self, package): if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: @@ -1013,9 +1025,9 @@ def get_view_name(self): return "Pipeline Jobs" -class AdvisoriesPackageV2ViewSet(viewsets.ReadOnlyModelViewSet): +class PackageV3ViewSet(viewsets.ReadOnlyModelViewSet): queryset = PackageV2.objects.all() - serializer_class = AdvisoryPackageV2Serializer + serializer_class = PackageV3Serializer filter_backends = [filters.DjangoFilterBackend] filterset_class = AdvisoryPackageV2FilterSet @@ -1039,35 +1051,42 @@ def get_queryset(self): ) def list(self, request, *args, **kwargs): - filtered_queryset = self.filter_queryset(self.get_queryset()) - page = self.paginate_queryset(filtered_queryset) + queryset = self.filter_queryset(self.get_queryset()) + page = self.paginate_queryset(queryset) - advisories = set() - if page is not None: - for package in page: - advisories.update({impact.advisory for impact in package.affected_in_impacts.all()}) - advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()}) + packages = page if page is not None else queryset - # Serialize the vulnerabilities with advisory_id and advisory label as keys - advisory_data = {f"{adv.avid}": AdvisoryV2Serializer(adv).data for adv in advisories} + avids = set() - # Serialize the current page of packages - serializer = self.get_serializer(page, many=True) - data = serializer.data + for package in packages: + for impact in package.affected_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) - # Use 'self.get_paginated_response' to include pagination data - return self.get_paginated_response({"advisories": advisory_data, "packages": data}) + for impact in package.fixed_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) - # If pagination is not applied, collect vulnerabilities for all packages - for package in queryset: - advisories.update({impact.advisory for impact in package.affected_in_impacts.all()}) - advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()}) + latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = {f"{adv.avid}": AdvisoryV2Serializer(adv).data for adv in advisories} + advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in latest_advisories} - serializer = self.get_serializer(queryset, many=True) - data = serializer.data - return Response({"advisories": advisory_data, "packages": data}) + serializer = self.get_serializer(packages, many=True) + + if page is not None: + return self.get_paginated_response( + { + "packages": serializer.data, + "advisories": advisory_data, + } + ) + + return Response( + { + "packages": serializer.data, + "advisories": advisory_data, + } + ) @extend_schema( request=PackageurlListSerializer, @@ -1093,17 +1112,16 @@ def bulk_lookup(self, request): "message": "A non-empty 'purls' list of PURLs is required.", }, ) - validated_data = serializer.validated_data - purls = validated_data.get("purls") - # Fetch packages matching the provided purls + purls = serializer.validated_data.get("purls") + packages = ( PackageV2.objects.for_purls(purls) .prefetch_related( Prefetch( "affected_in_impacts", queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", + "fixed_by_packages" ), ), Prefetch( @@ -1114,17 +1132,25 @@ def bulk_lookup(self, request): .with_is_vulnerable() ) - # Collect vulnerabilities associated with these packages - advisories = set() + avids = set() + for package in packages: - advisories.update({impact.advisory for impact in package.affected_in_impacts.all()}) - advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()}) + for impact in package.affected_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) - # Serialize vulnerabilities with vulnerability_id as keys - advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in advisories} + for impact in package.fixed_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) - # Serialize packages - package_data = AdvisoryPackageV2Serializer( + latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) + + advisory_data = { + adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data + for adv in latest_advisories + } + + package_data = PackageV3Serializer( packages, many=True, context={"request": request}, @@ -1132,8 +1158,8 @@ def bulk_lookup(self, request): return Response( { - "advisories": advisory_data, "packages": package_data, + "advisories": advisory_data, } ) @@ -1161,6 +1187,7 @@ def bulk_search(self, request): "message": "A non-empty 'purls' list of PURLs is required.", }, ) + validated_data = serializer.validated_data purls = validated_data.get("purls") purl_only = validated_data.get("purl_only", False) @@ -1202,24 +1229,31 @@ def bulk_search(self, request): packages = query - # Collect vulnerabilities associated with these packages - advisories = set() + avids = set() for package in packages: - advisories.update({impact.advisory for impact in package.affected_in_impacts.all()}) - advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()}) - - advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in advisories} + for impact in package.affected_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) + for impact in package.fixed_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) + + latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) + advisory_data = { + adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data + for adv in latest_advisories + } if not purl_only: - package_data = AdvisoryPackageV2Serializer( + package_data = PackageV3Serializer( packages, many=True, context={"request": request}, ).data return Response( { - "advisories": advisory_data, "packages": package_data, + "advisories": advisory_data, } ) @@ -1249,24 +1283,31 @@ def bulk_search(self, request): ) packages = query - # Collect vulnerabilities associated with these packages - advisories = set() + avids = set() for package in packages: - advisories.update({impact.advisory for impact in package.affected_in_impacts.all()}) - advisories.update({impact.advisory for impact in package.fixed_in_impacts.all()}) - - advisory_data = {adv.advisory_id: AdvisoryV2Serializer(adv).data for adv in advisories} + for impact in package.affected_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) + for impact in package.fixed_in_impacts.all(): + if impact.advisory_id: + avids.add(impact.advisory.avid) + + latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) + advisory_data = { + adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data + for adv in latest_advisories + } if not purl_only: - package_data = AdvisoryPackageV2Serializer( + package_data = PackageV3Serializer( packages, many=True, context={"request": request}, ).data return Response( { - "advisories": advisory_data, "packages": package_data, + "advisories": advisory_data, } ) @@ -1316,6 +1357,4 @@ def lookup(self, request): purl = validated_data.get("purl") qs = self.get_queryset().for_purls([purl]).with_is_vulnerable() - return Response( - AdvisoryPackageV2Serializer(qs, many=True, context={"request": request}).data - ) + return Response(PackageV3Serializer(qs, many=True, context={"request": request}).data) diff --git a/vulnerabilities/migrations/0107_remove_advisoryv2_date_imported.py b/vulnerabilities/migrations/0107_remove_advisoryv2_date_imported.py new file mode 100644 index 000000000..9f392a38e --- /dev/null +++ b/vulnerabilities/migrations/0107_remove_advisoryv2_date_imported.py @@ -0,0 +1,17 @@ +# Generated by Django 4.2.25 on 2025-12-29 07:48 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0106_alter_advisoryreference_url_and_more"), + ] + + operations = [ + migrations.RemoveField( + model_name="advisoryv2", + name="date_imported", + ), + ] diff --git a/vulnerabilities/migrations/0108_advisoryv2_advisory_latest_by_avid_idx.py b/vulnerabilities/migrations/0108_advisoryv2_advisory_latest_by_avid_idx.py new file mode 100644 index 000000000..1e738d3b0 --- /dev/null +++ b/vulnerabilities/migrations/0108_advisoryv2_advisory_latest_by_avid_idx.py @@ -0,0 +1,19 @@ +# Generated by Django 4.2.25 on 2025-12-29 08:17 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0107_remove_advisoryv2_date_imported"), + ] + + operations = [ + migrations.AddIndex( + model_name="advisoryv2", + index=models.Index( + fields=["avid", "-date_collected", "-id"], name="advisory_latest_by_avid_idx" + ), + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index f4b21eb49..ed5c2d254 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -42,9 +42,11 @@ from django.db import transaction from django.db.models import Count from django.db.models import Exists +from django.db.models import F from django.db.models import OuterRef from django.db.models import Prefetch from django.db.models import Q +from django.db.models import Subquery from django.db.models.functions import Length from django.db.models.functions import Trim from django.urls import reverse @@ -1341,20 +1343,6 @@ def url(self): return f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/{id}.json" -class AdvisoryV2QuerySet(BaseQuerySet): - def search(query): - """ - This function will take a string as an input, the string could be an alias or an advisory ID or - something in the advisory description. - """ - return AdvisoryV2.objects.filter( - Q(advisory_id__icontains=query) - | Q(aliases__alias__icontains=query) - | Q(summary__icontains=query) - | Q(references__url__icontains=query) - ).distinct() - - class AdvisoryQuerySet(BaseQuerySet): def search(query): """ @@ -2829,6 +2817,33 @@ class Meta: unique_together = ["commit_hash", "vcs_url"] +class AdvisoryV2QuerySet(BaseQuerySet): + def latest_for_avid(self, avid: str): + return ( + self.filter(avid=avid) + .order_by( + F("date_collected").desc(nulls_last=True), + "-id", + ) + .first() + ) + + def latest_per_avid(self): + latest_ids = ( + self.filter(avid=OuterRef("avid")) + .order_by( + F("date_collected").desc(nulls_last=True), + "-id", + ) + .values("id")[:1] + ) + + return self.filter(id=Subquery(latest_ids)) + + def latest_for_avids(self, avids): + return self.filter(avid__in=avids).latest_per_avid() + + class AdvisoryV2(models.Model): """ An advisory represents data directly obtained from upstream transformed @@ -2916,9 +2931,6 @@ class AdvisoryV2(models.Model): blank=True, null=True, help_text="UTC Date of publication of the advisory" ) date_collected = models.DateTimeField(help_text="UTC Date on which the advisory was collected") - date_imported = models.DateTimeField( - blank=True, null=True, help_text="UTC Date on which the advisory was imported" - ) original_advisory_text = models.TextField( blank=True, @@ -2959,11 +2971,17 @@ def risk_score(self): risk_score = min(float(self.exploitability * self.weighted_severity), 10.0) return round(risk_score, 1) - objects = AdvisoryQuerySet.as_manager() + objects = AdvisoryV2QuerySet.as_manager() class Meta: unique_together = ["datasource_id", "advisory_id", "unique_content_id"] ordering = ["datasource_id", "advisory_id", "date_published", "unique_content_id"] + indexes = [ + models.Index( + fields=["avid", "-date_collected", "-id"], + name="advisory_latest_by_avid_idx", + ) + ] def save(self, *args, **kwargs): self.full_clean() diff --git a/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py b/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py index b0e7d06df..0c2eeb6ba 100644 --- a/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py +++ b/vulnerabilities/tests/pipelines/test_compute_advisory_todo_v2.py @@ -80,7 +80,6 @@ def test_advisory_todo_missing_summary(self): unique_content_id="test_id", url=self.advisory_data1.url, summary="", - date_imported=date, date_collected=date, advisory_id="test_id", avid="test_pipeline/test_id", @@ -107,7 +106,6 @@ def test_advisory_todo_missing_fixed(self): unique_content_id="test_id", url=self.advisory_data2.url, summary=self.advisory_data2.summary, - date_imported=date, date_collected=date, advisory_id="test_id", avid="test_pipeline/test_id", @@ -134,7 +132,6 @@ def test_advisory_todo_missing_affected(self): unique_content_id="test_id", url=self.advisory_data3.url, summary=self.advisory_data3.summary, - date_imported=date, date_collected=date, advisory_id="test_id", avid="test_pipeline/test_id", @@ -162,7 +159,6 @@ def test_advisory_todo_conflicting_fixed_affected(self): unique_content_id="test_id1", url=self.advisory_data1.url, summary=self.advisory_data1.summary, - date_imported=date, date_collected=date, advisory_id="test_id", avid="test_pipeline/test_id_2", @@ -180,7 +176,6 @@ def test_advisory_todo_conflicting_fixed_affected(self): unique_content_id="test_id2", url=self.advisory_data4.url, summary=self.advisory_data4.summary, - date_imported=date, date_collected=date, advisory_id="test_id", avid="test_pipeline/test_id_2", diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index fed02fc09..42df1c623 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -856,8 +856,8 @@ def setUp(self): self.client = APIClient(enforce_csrf_checks=True) def test_list_with_purl_filter(self): - url = reverse("advisories-package-v2-list") - with self.assertNumQueries(19): + url = reverse("package-v3-list") + with self.assertNumQueries(23): response = self.client.get(url, {"purl": "pkg:pypi/sample@1.0.0"}) assert response.status_code == 200 assert "packages" in response.data["results"] @@ -865,8 +865,8 @@ def test_list_with_purl_filter(self): assert self.advisory.avid in response.data["results"]["advisories"] def test_bulk_lookup(self): - url = reverse("advisories-package-v2-bulk-lookup") - with self.assertNumQueries(18): + url = reverse("package-v3-bulk-lookup") + with self.assertNumQueries(22): response = self.client.post(url, {"purls": ["pkg:pypi/sample@1.0.0"]}, format="json") assert response.status_code == 200 assert "packages" in response.data @@ -874,31 +874,31 @@ def test_bulk_lookup(self): assert self.advisory.avid in response.data["advisories"] def test_bulk_search_plain(self): - url = reverse("advisories-package-v2-bulk-search") + url = reverse("package-v3-bulk-search") payload = {"purls": ["pkg:pypi/sample@1.0.0"], "plain_purl": True, "purl_only": False} - with self.assertNumQueries(18): + with self.assertNumQueries(22): response = self.client.post(url, payload, format="json") assert response.status_code == 200 assert "packages" in response.data assert "advisories" in response.data def test_bulk_search_purl_only(self): - url = reverse("advisories-package-v2-bulk-search") + url = reverse("package-v3-bulk-search") payload = {"purls": ["pkg:pypi/sample@1.0.0"], "plain_purl": False, "purl_only": True} - with self.assertNumQueries(16): + with self.assertNumQueries(17): response = self.client.post(url, payload, format="json") assert response.status_code == 200 assert "pkg:pypi/sample@1.0.0" in response.data def test_lookup_single_package(self): - url = reverse("advisories-package-v2-lookup") - with self.assertNumQueries(12): + url = reverse("package-v3-lookup") + with self.assertNumQueries(15): response = self.client.post(url, {"purl": "pkg:pypi/sample@1.0.0"}, format="json") assert response.status_code == 200 assert any(pkg["purl"] == "pkg:pypi/sample@1.0.0" for pkg in response.data) def test_get_all_vulnerable_purls(self): - url = reverse("advisories-package-v2-all") + url = reverse("package-v3-all") with self.assertNumQueries(3): response = self.client.get(url) assert response.status_code == 200 diff --git a/vulnerabilities/tests/test_commit_code.py b/vulnerabilities/tests/test_commit_code.py index cdd093c8a..ea7f857cd 100644 --- a/vulnerabilities/tests/test_commit_code.py +++ b/vulnerabilities/tests/test_commit_code.py @@ -16,7 +16,6 @@ def setup_method(self): unique_content_id="test_id", url="https://example.com", summary="summary", - date_imported=date, date_collected=date, advisory_id="test_id", avid="test_pipeline/test_id", diff --git a/vulnerabilities/tests/test_data_migrations.py b/vulnerabilities/tests/test_data_migrations.py index 2ed8865a0..8303c4003 100644 --- a/vulnerabilities/tests/test_data_migrations.py +++ b/vulnerabilities/tests/test_data_migrations.py @@ -977,7 +977,6 @@ def setUpBeforeMigration(self, apps): unique_content_id="old_adv", url="https://old.example.com", summary="Old advisory", - date_imported=date, date_collected=date, advisory_id="old_adv", avid="test_pipeline/old_adv", diff --git a/vulnerabilities/tests/test_same_avid_different_content_id.py b/vulnerabilities/tests/test_same_avid_different_content_id.py new file mode 100644 index 000000000..a366d1872 --- /dev/null +++ b/vulnerabilities/tests/test_same_avid_different_content_id.py @@ -0,0 +1,185 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import uuid +from datetime import timedelta + +import pytest +from django.utils.timezone import now + +from vulnerabilities.models import AdvisoryV2 + + +@pytest.fixture +def advisory_factory(db): + """ + Factory to create AdvisoryV2 objects with minimal required fields. + """ + + def _create(*, avid, advisory_id, collected_at): + return AdvisoryV2.objects.create( + datasource_id="test_source", + advisory_id=advisory_id, + avid=avid, + unique_content_id=str(uuid.uuid4()), + url="https://example.com/advisory", + date_collected=collected_at, + ) + + return _create + + +@pytest.fixture +def timestamps(): + now_ts = now() + return { + "old": now_ts - timedelta(days=3), + "mid": now_ts - timedelta(days=1), + "new": now_ts, + } + + +@pytest.mark.django_db +def test_latest_for_avid_returns_latest_by_date_collected( + advisory_factory, timestamps, django_assert_num_queries +): + avid = "source/ADV-1" + + older = advisory_factory( + avid=avid, + advisory_id="ADV-1", + collected_at=timestamps["old"], + ) + newer = advisory_factory( + avid=avid, + advisory_id="ADV-1", + collected_at=timestamps["new"], + ) + + with django_assert_num_queries(1): + result = AdvisoryV2.objects.latest_for_avid(avid) + + assert result.id == newer.id + assert result.id != older.id + + +@pytest.mark.django_db +def test_latest_for_avid_tie_breaks_by_id(advisory_factory, timestamps, django_assert_num_queries): + avid = "source/ADV-2" + ts = timestamps["mid"] + + first = advisory_factory( + avid=avid, + advisory_id="ADV-2", + collected_at=ts, + ) + second = advisory_factory( + avid=avid, + advisory_id="ADV-2", + collected_at=ts, + ) + + with django_assert_num_queries(1): + result = AdvisoryV2.objects.latest_for_avid(avid) + + assert result.id == second.id + + +@pytest.mark.django_db +def test_latest_per_avid_returns_one_row_per_avid( + advisory_factory, timestamps, django_assert_num_queries +): + advisory_factory( + avid="source/A", + advisory_id="A", + collected_at=timestamps["old"], + ) + latest_a = advisory_factory( + avid="source/A", + advisory_id="A", + collected_at=timestamps["new"], + ) + + latest_b = advisory_factory( + avid="source/B", + advisory_id="B", + collected_at=timestamps["mid"], + ) + + with django_assert_num_queries(1): + qs = AdvisoryV2.objects.latest_per_avid() + results = list(qs) + + assert len(results) == 2 + ids = {obj.id for obj in results} + assert ids == {latest_a.id, latest_b.id} + + +@pytest.mark.django_db +def test_latest_per_avid_excludes_older_versions(advisory_factory, timestamps): + avid = "source/C" + + older = advisory_factory( + avid=avid, + advisory_id="C", + collected_at=timestamps["old"], + ) + latest = advisory_factory( + avid=avid, + advisory_id="C", + collected_at=timestamps["new"], + ) + + results = list(AdvisoryV2.objects.latest_per_avid()) + + assert latest in results + assert older not in results + + +@pytest.mark.django_db +def test_latest_for_avids_filters_and_collapses_correctly( + advisory_factory, timestamps, django_assert_num_queries +): + advisory_factory( + avid="source/A", + advisory_id="A", + collected_at=timestamps["old"], + ) + latest_a = advisory_factory( + avid="source/A", + advisory_id="A", + collected_at=timestamps["new"], + ) + + latest_b = advisory_factory( + avid="source/B", + advisory_id="B", + collected_at=timestamps["mid"], + ) + + advisory_factory( + avid="source/C", + advisory_id="C", + collected_at=timestamps["new"], + ) + + with django_assert_num_queries(1): + qs = AdvisoryV2.objects.latest_for_avids({"source/A", "source/B"}) + results = list(qs) + + assert len(results) == 2 + ids = {obj.id for obj in results} + assert ids == {latest_a.id, latest_b.id} + + +@pytest.mark.django_db +def test_latest_for_avids_with_empty_input_returns_empty_queryset(django_assert_num_queries): + with django_assert_num_queries(0): + qs = AdvisoryV2.objects.latest_for_avids(set()) + assert qs.count() == 0 diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index 8a77df84b..b0649d54d 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -188,32 +188,66 @@ def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) package = self.object next_non_vulnerable, latest_non_vulnerable = package.get_non_vulnerable_versions() - fixed_pkg_details = {} - for impact in package.affected_in_impacts.all(): - if impact.advisory.id not in fixed_pkg_details: - fixed_pkg_details[impact.advisory.id] = [] - fixed_pkg_details[impact.advisory.id].extend( - [ - {"pkg": pkg, "affected_count": pkg.affected_in_impacts.count()} - for pkg in impact.fixed_by_packages.all() - ] - ) + + ( + fixed_pkg_details, + affected_by_advisories, + fixing_advisories, + ) = self.get_fixed_package_details(package) + context["package"] = package context["next_non_vulnerable"] = next_non_vulnerable context["latest_non_vulnerable"] = latest_non_vulnerable - context["affected_by_advisories"] = { - impact.advisory for impact in package.affected_in_impacts.all() - } - context["fixing_advisories"] = { - impact.advisory for impact in package.fixed_in_impacts.all() - } + context["affected_by_advisories"] = affected_by_advisories + context["fixing_advisories"] = fixing_advisories context["package_search_form"] = PackageSearchForm(self.request.GET) context["fixed_package_details"] = fixed_pkg_details - # context["history"] = list(package.history) return context + def get_fixed_package_details(self, package): + affected_impacts = package.affected_in_impacts.select_related("advisory") + fixed_impacts = package.fixed_in_impacts.select_related("advisory") + + affected_avids = {impact.advisory.avid for impact in affected_impacts if impact.advisory_id} + + fixed_avids = {impact.advisory.avid for impact in fixed_impacts if impact.advisory_id} + + all_avids = affected_avids | fixed_avids + + latest_advisories = models.AdvisoryV2.objects.latest_for_avids(all_avids) + advisory_by_avid = {adv.avid: adv for adv in latest_advisories} + + fixed_pkg_details = {} + + for impact in affected_impacts: + avid = impact.advisory.avid + advisory = advisory_by_avid.get(avid) + if not advisory: + continue + if avid not in fixed_pkg_details: + fixed_pkg_details[avid] = [] + fixed_pkg_details[avid].extend( + [ + { + "pkg": pkg, + "affected_count": pkg.affected_in_impacts.count(), + } + for pkg in impact.fixed_by_packages.all() + ] + ) + + affected_by_advisories = { + advisory_by_avid[avid] for avid in affected_avids if avid in advisory_by_avid + } + + fixing_advisories = { + advisory_by_avid[avid] for avid in fixed_avids if avid in advisory_by_avid + } + + return fixed_pkg_details, affected_by_advisories, fixing_advisories + def get_queryset(self): return ( super() @@ -360,6 +394,15 @@ class AdvisoryDetails(DetailView): slug_url_kwarg = "avid" slug_field = "avid" + def get_object(self, queryset=None): + avid = self.kwargs.get(self.slug_url_kwarg) + obj = models.AdvisoryV2.objects.latest_for_avid(avid) + + if not obj: + raise Http404("Advisory not found") + + return obj + def get_queryset(self): return ( super() diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 8d170678a..49948a3b9 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -20,10 +20,10 @@ from vulnerabilities.api import CPEViewSet from vulnerabilities.api import PackageViewSet from vulnerabilities.api import VulnerabilityViewSet -from vulnerabilities.api_v2 import AdvisoriesPackageV2ViewSet from vulnerabilities.api_v2 import CodeFixV2ViewSet from vulnerabilities.api_v2 import CodeFixViewSet from vulnerabilities.api_v2 import PackageV2ViewSet +from vulnerabilities.api_v2 import PackageV3ViewSet from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet from vulnerabilities.api_v2 import VulnerabilityV2ViewSet from vulnerabilities.views import AdminLoginView @@ -62,18 +62,19 @@ def __init__(self, *args, **kwargs): api_v2_router = OptionalSlashRouter() api_v2_router.register("packages", PackageV2ViewSet, basename="package-v2") -api_v2_router.register( - "advisories-packages", AdvisoriesPackageV2ViewSet, basename="advisories-package-v2" -) api_v2_router.register("vulnerabilities", VulnerabilityV2ViewSet, basename="vulnerability-v2") api_v2_router.register("codefixes", CodeFixViewSet, basename="codefix") api_v2_router.register("pipelines", PipelineScheduleV2ViewSet, basename="pipelines") api_v2_router.register("advisory-codefixes", CodeFixV2ViewSet, basename="advisory-codefix") +api_v3_router = OptionalSlashRouter() + +api_v3_router.register("packages", PackageV3ViewSet, basename="package-v3") urlpatterns = [ path("admin/login/", AdminLoginView.as_view(), name="admin-login"), path("api/v2/", include(api_v2_router.urls)), + path("api/v3/", include(api_v3_router.urls)), path( "robots.txt", TemplateView.as_view(template_name="robots.txt", content_type="text/plain"),