From 2ada438e35a7085dec52ad4fbeca17d978bd6a00 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Tue, 12 Nov 2024 14:54:33 +0530 Subject: [PATCH 1/6] Add bulk search in V2 API Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 142 +++++++++++++++++++++++++++++++++++++- 1 file changed, 140 insertions(+), 2 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index c02a84a02..27ba46f57 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -18,7 +18,8 @@ from vulnerabilities.models import VulnerabilityReference from vulnerabilities.models import VulnerabilitySeverity from vulnerabilities.models import Weakness - +from drf_spectacular.utils import extend_schema_view, extend_schema, OpenApiParameter +from rest_framework.decorators import action class WeaknessV2Serializer(serializers.ModelSerializer): cwe_id = serializers.CharField() @@ -89,7 +90,26 @@ def get_url(self, obj): request=request, ) - +@extend_schema_view( + list=extend_schema( + parameters=[ + OpenApiParameter( + name="vulnerability_id", + description="Filter by one or more vulnerability IDs", + required=False, + type={"type": "array", "items": {"type": "string"}}, + location=OpenApiParameter.QUERY, + ), + OpenApiParameter( + name="alias", + description="Filter by alias (CVE or other unique identifier)", + required=False, + type={"type": "array", "items": {"type": "string"}}, + location=OpenApiParameter.QUERY, + ), + ] + ) +) class VulnerabilityV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Vulnerability.objects.all() serializer_class = VulnerabilityV2Serializer @@ -164,6 +184,19 @@ def get_fixing_vulnerabilities(self, obj): return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()] +class PackageurlListSerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + allow_empty=False, + help_text="List of PackageURL strings in canonical form.", + ) + + +class PackageBulkSearchRequestSerializer(PackageurlListSerializer): + purl_only = serializers.BooleanField(required=False, default=False) + plain_purl = serializers.BooleanField(required=False, default=False) + + class PackageV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Package.objects.all() serializer_class = PackageV2Serializer @@ -200,3 +233,108 @@ def list(self, request, *args, **kwargs): serializer = self.get_serializer(queryset, many=True) data = serializer.data return Response({"packages": data}) + + @extend_schema( + request=PackageurlListSerializer, + responses={200: PackageV2Serializer(many=True)}, + ) + @action( + detail=False, + methods=["post"], + serializer_class=PackageurlListSerializer, + filter_backends=[], + pagination_class=None, + ) + def bulk_lookup(self, request): + """ + Return the response for exact PackageURLs requested for. + """ + serializer = self.serializer_class(data=request.data) + if not serializer.is_valid(): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={ + "error": serializer.errors, + "message": "A non-empty 'purls' list of PURLs is required.", + }, + ) + validated_data = serializer.validated_data + purls = validated_data.get("purls") + + return Response( + PackageV2Serializer( + Package.objects.for_purls(purls).with_is_vulnerable(), + many=True, + context={"request": request}, + ).data + ) + + + @extend_schema( + request=PackageBulkSearchRequestSerializer, + responses={200: PackageV2Serializer(many=True)}, + ) + @action( + detail=False, + methods=["post"], + serializer_class=PackageBulkSearchRequestSerializer, + filter_backends=[], + pagination_class=None, + ) + def bulk_search(self, request): + """ + Lookup for vulnerable packages using many Package URLs at once. + """ + serializer = self.serializer_class(data=request.data) + if not serializer.is_valid(): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={ + "error": serializer.errors, + "message": "A non-empty 'purls' list of PURLs is required.", + }, + ) + validated_data = serializer.validated_data + purls = validated_data.get("purls") + purl_only = validated_data.get("purl_only", False) + plain_purl = validated_data.get("plain_purl", False) + + if plain_purl: + purl_objects = [PackageURL.from_string(purl) for purl in purls] + plain_purl_objects = [ + PackageURL( + type=purl.type, + namespace=purl.namespace, + name=purl.name, + version=purl.version, + ) + for purl in purl_objects + ] + plain_purls = [str(purl) for purl in plain_purl_objects] + + query = ( + Package.objects.filter(plain_package_url__in=plain_purls) + .order_by("plain_package_url") + .distinct("plain_package_url") + .with_is_vulnerable() + ) + + if not purl_only: + return Response( + PackageV2Serializer(query, many=True, context={"request": request}).data + ) + + # using order by and distinct because there will be + # many fully qualified purl for a single plain purl + vulnerable_purls = query.vulnerable().only("plain_package_url") + vulnerable_purls = [str(package.plain_package_url) for package in vulnerable_purls] + return Response(data=vulnerable_purls) + + query = Package.objects.filter(package_url__in=purls).distinct().with_is_vulnerable() + + if not purl_only: + return Response(PackageV2Serializer(query, many=True, context={"request": request}).data) + + vulnerable_purls = query.vulnerable().only("package_url") + vulnerable_purls = [str(package.package_url) for package in vulnerable_purls] + return Response(data=vulnerable_purls) From e458518bdce0d20704e8d660e7f0411baddc6072 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Tue, 12 Nov 2024 15:08:24 +0530 Subject: [PATCH 2/6] Add all packages endpoint functionality in V2 Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 68 +++++++++- vulnerabilities/tests/test_api_v2.py | 179 +++++++++++++++++++++++++++ 2 files changed, 242 insertions(+), 5 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 27ba46f57..1e6111d32 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -8,8 +8,14 @@ # +from drf_spectacular.utils import OpenApiParameter +from drf_spectacular.utils import extend_schema +from drf_spectacular.utils import extend_schema_view +from packageurl import PackageURL from rest_framework import serializers +from rest_framework import status from rest_framework import viewsets +from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.reverse import reverse @@ -18,8 +24,7 @@ from vulnerabilities.models import VulnerabilityReference from vulnerabilities.models import VulnerabilitySeverity from vulnerabilities.models import Weakness -from drf_spectacular.utils import extend_schema_view, extend_schema, OpenApiParameter -from rest_framework.decorators import action + class WeaknessV2Serializer(serializers.ModelSerializer): cwe_id = serializers.CharField() @@ -90,6 +95,7 @@ def get_url(self, obj): request=request, ) + @extend_schema_view( list=extend_schema( parameters=[ @@ -197,6 +203,13 @@ class PackageBulkSearchRequestSerializer(PackageurlListSerializer): plain_purl = serializers.BooleanField(required=False, default=False) +class LookupRequestSerializer(serializers.Serializer): + purl = serializers.CharField( + required=True, + help_text="PackageURL strings in canonical form.", + ) + + class PackageV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Package.objects.all() serializer_class = PackageV2Serializer @@ -233,7 +246,7 @@ def list(self, request, *args, **kwargs): serializer = self.get_serializer(queryset, many=True) data = serializer.data return Response({"packages": data}) - + @extend_schema( request=PackageurlListSerializer, responses={200: PackageV2Serializer(many=True)}, @@ -269,7 +282,6 @@ def bulk_lookup(self, request): ).data ) - @extend_schema( request=PackageBulkSearchRequestSerializer, responses={200: PackageV2Serializer(many=True)}, @@ -333,8 +345,54 @@ def bulk_search(self, request): query = Package.objects.filter(package_url__in=purls).distinct().with_is_vulnerable() if not purl_only: - return Response(PackageV2Serializer(query, many=True, context={"request": request}).data) + return Response( + PackageV2Serializer(query, many=True, context={"request": request}).data + ) vulnerable_purls = query.vulnerable().only("package_url") vulnerable_purls = [str(package.package_url) for package in vulnerable_purls] return Response(data=vulnerable_purls) + + @action(detail=False, methods=["get"]) + def all(self, request): + """ + Return a list of Package URLs of vulnerable packages. + """ + vulnerable_purls = ( + Package.objects.vulnerable() + .only("package_url") + .order_by("package_url") + .distinct() + .values_list("package_url", flat=True) + ) + return Response(vulnerable_purls) + + @extend_schema( + request=LookupRequestSerializer, + responses={200: PackageV2Serializer(many=True)}, + ) + @action( + detail=False, + methods=["post"], + serializer_class=LookupRequestSerializer, + filter_backends=[], + pagination_class=None, + ) + def lookup(self, request): + """ + Return the response for exact PackageURL requested for. + """ + serializer = self.serializer_class(data=request.data) + if not serializer.is_valid(): + return Response( + status=status.HTTP_400_BAD_REQUEST, + data={ + "error": serializer.errors, + "message": "A 'purl' is required.", + }, + ) + validated_data = serializer.validated_data + purl = validated_data.get("purl") + + qs = self.get_queryset().for_purls([purl]).with_is_vulnerable() + return Response(PackageV2Serializer(qs, many=True, context={"request": request}).data) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index eeaa18776..9d0ba54d7 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -305,3 +305,182 @@ def test_get_fixing_vulnerabilities(self): serializer = PackageV2Serializer() vulnerabilities = serializer.get_fixing_vulnerabilities(package) self.assertEqual(vulnerabilities, ["VCID-5678"]) + + def test_bulk_lookup_with_valid_purls(self): + """ + Test bulk lookup with valid PURLs. + """ + url = reverse("package-v2-bulk-lookup") + data = {"purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"]} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data), 2) + # Verify that the returned data matches the packages + purls = [package["purl"] for package in response.data] + self.assertIn("pkg:pypi/django@3.2", purls) + self.assertIn("pkg:npm/lodash@4.17.20", purls) + + def test_bulk_lookup_with_invalid_purls(self): + """ + Test bulk lookup with invalid PURLs. + """ + url = reverse("package-v2-bulk-lookup") + data = {"purls": ["pkg:pypi/nonexistent@1.0.0", "pkg:npm/unknown@0.0.1"]} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Since the packages don't exist, the response should be empty + self.assertEqual(len(response.data), 0) + + def test_bulk_lookup_with_empty_purls(self): + """ + Test bulk lookup with empty purls list. + Should return 400 Bad Request. + """ + url = reverse("package-v2-bulk-lookup") + data = {"purls": []} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("error", response.data) + self.assertIn("message", response.data) + self.assertEqual(response.data["message"], "A non-empty 'purls' list of PURLs is required.") + + def test_bulk_search_with_valid_purls(self): + """ + Test bulk search with valid PURLs. + """ + url = reverse("package-v2-bulk-search") + data = {"purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"]} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data), 2) + purls = [package["purl"] for package in response.data] + self.assertIn("pkg:pypi/django@3.2", purls) + self.assertIn("pkg:npm/lodash@4.17.20", purls) + + def test_bulk_search_with_purl_only_true(self): + """ + Test bulk search with purl_only set to True. + Should return only the PURLs of vulnerable packages. + """ + url = reverse("package-v2-bulk-search") + data = {"purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"], "purl_only": True} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Since purl_only=True, response should be a list of PURLs + self.assertIsInstance(response.data, list) + # Only vulnerable packages should be included + self.assertEqual(len(response.data), 1) + self.assertEqual(response.data, ["pkg:pypi/django@3.2"]) + + def test_bulk_search_with_plain_purl_true(self): + """ + Test bulk search with plain_purl set to True. + """ + url = reverse("package-v2-bulk-search") + data = {"purls": ["pkg:pypi/django@3.2", "pkg:pypi/django@3.1"], "plain_purl": True} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Since plain_purl=True, packages with the same name and version are grouped + self.assertEqual(len(response.data), 1) + purls = [package["purl"] for package in response.data] + self.assertIn("pkg:pypi/django@3.2", purls[0] or "pkg:pypi/django@3.1" in purls[0]) + + def test_bulk_search_with_purl_only_and_plain_purl_true(self): + """ + Test bulk search with purl_only and plain_purl both set to True. + Should return only the plain PURLs of vulnerable packages. + """ + url = reverse("package-v2-bulk-search") + data = { + "purls": ["pkg:pypi/django@3.2", "pkg:pypi/django@3.1"], + "purl_only": True, + "plain_purl": True, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Response should be a list of plain PURLs + self.assertIsInstance(response.data, list) + # Only one plain PURL should be returned for vulnerable packages + self.assertEqual(len(response.data), 1) + self.assertEqual(response.data, ["pkg:pypi/django@3.2"]) + + def test_bulk_search_with_invalid_purls(self): + """ + Test bulk search with invalid PURLs. + """ + url = reverse("package-v2-bulk-search") + data = {"purls": ["pkg:pypi/nonexistent@1.0.0", "pkg:npm/unknown@0.0.1"]} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data), 0) + + def test_bulk_search_with_empty_purls(self): + """ + Test bulk search with empty purls list. + Should return 400 Bad Request. + """ + url = reverse("package-v2-bulk-search") + data = {"purls": []} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("error", response.data) + self.assertIn("message", response.data) + self.assertEqual(response.data["message"], "A non-empty 'purls' list of PURLs is required.") + + def test_all_vulnerable_packages(self): + """ + Test the 'all' endpoint that returns all vulnerable package URLs. + """ + url = reverse("package-v2-all") + response = self.client.get(url, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + # Since package1 and package3 are vulnerable, they should be returned + expected_purls = ["pkg:pypi/django@3.2"] + self.assertEqual(sorted(response.data), sorted(expected_purls)) + + def test_lookup_with_valid_purl(self): + """ + Test the 'lookup' endpoint with a valid PURL. + """ + url = reverse("package-v2-lookup") + data = {"purl": "pkg:pypi/django@3.2"} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(len(response.data), 1) + self.assertEqual(response.data[0]["purl"], "pkg:pypi/django@3.2") + self.assertEqual(response.data[0]["affected_by_vulnerabilities"], ["VCID-1234"]) + + def test_lookup_with_invalid_purl(self): + """ + Test the 'lookup' endpoint with a PURL that does not exist. + Should return an empty list. + """ + url = reverse("package-v2-lookup") + data = {"purl": "pkg:pypi/nonexistent@1.0.0"} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) + # No packages should be returned + self.assertEqual(len(response.data), 0) + + def test_lookup_with_missing_purl(self): + """ + Test the 'lookup' endpoint without providing a 'purl'. + Should return 400 Bad Request. + """ + url = reverse("package-v2-lookup") + data = {} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("error", response.data) + self.assertIn("message", response.data) + self.assertEqual(response.data["message"], "A 'purl' is required.") + + def test_lookup_with_invalid_purl_format(self): + """ + Test the 'lookup' endpoint with an invalid PURL format. + Should return 400 Bad Request. + """ + url = reverse("package-v2-lookup") + data = {"purl": "invalid_purl_format"} + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_200_OK) From 823081baa3b67773ef0ece634456f2a12d8481c8 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Tue, 12 Nov 2024 18:42:53 +0530 Subject: [PATCH 3/6] Change response of API Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 118 ++++++++++++++++++++++++++++++++++---- 1 file changed, 107 insertions(+), 11 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 1e6111d32..a01bc89ec 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -8,6 +8,7 @@ # +from django_filters import rest_framework as filters from drf_spectacular.utils import OpenApiParameter from drf_spectacular.utils import extend_schema from drf_spectacular.utils import extend_schema_view @@ -19,6 +20,8 @@ from rest_framework.response import Response from rest_framework.reverse import reverse +from vulnerabilities.api import PackageFilterSet +from vulnerabilities.api import VulnerabilitySeveritySerializer from vulnerabilities.models import Package from vulnerabilities.models import Vulnerability from vulnerabilities.models import VulnerabilityReference @@ -210,9 +213,19 @@ class LookupRequestSerializer(serializers.Serializer): ) +class PackageV2FilterSet(filters.FilterSet): + affected_by_vulnerability = filters.CharFilter( + field_name="affected_by_vulnerabilities__vulnerability_id" + ) + fixing_vulnerability = filters.CharFilter(field_name="fixing_vulnerabilities__vulnerability_id") + purl = filters.CharFilter(field_name="package_url") + + class PackageV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Package.objects.all() serializer_class = PackageV2Serializer + filter_backends = (filters.DjangoFilterBackend,) + filterset_class = PackageV2FilterSet def get_queryset(self): queryset = super().get_queryset() @@ -234,18 +247,45 @@ def get_queryset(self): def list(self, request, *args, **kwargs): queryset = self.get_queryset() + # Apply pagination page = self.paginate_queryset(queryset) if page is not None: + # Collect only vulnerabilities for packages in the current page + vulnerabilities = set() + for package in page: + vulnerabilities.update(package.affected_by_vulnerabilities.all()) + vulnerabilities.update(package.fixing_vulnerabilities.all()) + + # Serialize the vulnerabilities with vulnerability_id as keys + vulnerability_data = { + vuln.vulnerability_id: VulnerabilityV2Serializer(vuln).data + for vuln in vulnerabilities + } + + # Serialize the current page of packages serializer = self.get_serializer(page, many=True) data = serializer.data + # Use 'self.get_paginated_response' to include pagination data - return self.get_paginated_response({"packages": data}) + return self.get_paginated_response( + {"vulnerabilities": vulnerability_data, "packages": data} + ) + + # If pagination is not applied, collect vulnerabilities for all packages + vulnerabilities = set() + for package in queryset: + vulnerabilities.update(package.affected_by_vulnerabilities.all()) + vulnerabilities.update(package.fixing_vulnerabilities.all()) + + vulnerability_data = { + vuln.vulnerability_id: VulnerabilityV2Serializer(vuln).data for vuln in vulnerabilities + } - # If pagination is not applied + # Serialize all packages when pagination is not applied serializer = self.get_serializer(queryset, many=True) data = serializer.data - return Response({"packages": data}) + return Response({"vulnerabilities": vulnerability_data, "packages": data}) @extend_schema( request=PackageurlListSerializer, @@ -274,12 +314,32 @@ def bulk_lookup(self, request): validated_data = serializer.validated_data purls = validated_data.get("purls") + # Fetch packages matching the provided purls + packages = Package.objects.for_purls(purls).with_is_vulnerable() + + # Collect vulnerabilities associated with these packages + vulnerabilities = set() + for package in packages: + vulnerabilities.update(package.affected_by_vulnerabilities.all()) + vulnerabilities.update(package.fixing_vulnerabilities.all()) + + # Serialize vulnerabilities with vulnerability_id as keys + vulnerability_data = { + vuln.vulnerability_id: VulnerabilityV2Serializer(vuln).data for vuln in vulnerabilities + } + + # Serialize packages + package_data = PackageV2Serializer( + packages, + many=True, + context={"request": request}, + ).data + return Response( - PackageV2Serializer( - Package.objects.for_purls(purls).with_is_vulnerable(), - many=True, - context={"request": request}, - ).data + { + "vulnerabilities": vulnerability_data, + "packages": package_data, + } ) @extend_schema( @@ -331,22 +391,58 @@ def bulk_search(self, request): .with_is_vulnerable() ) + packages = query + + # Collect vulnerabilities associated with these packages + vulnerabilities = set() + for package in packages: + vulnerabilities.update(package.affected_by_vulnerabilities.all()) + vulnerabilities.update(package.fixing_vulnerabilities.all()) + + vulnerability_data = { + vuln.vulnerability_id: VulnerabilityV2Serializer(vuln).data + for vuln in vulnerabilities + } + if not purl_only: + package_data = PackageV2Serializer( + packages, many=True, context={"request": request} + ).data return Response( - PackageV2Serializer(query, many=True, context={"request": request}).data + { + "vulnerabilities": vulnerability_data, + "packages": package_data, + } ) - # using order by and distinct because there will be + # Using order by and distinct because there will be # many fully qualified purl for a single plain purl vulnerable_purls = query.vulnerable().only("plain_package_url") vulnerable_purls = [str(package.plain_package_url) for package in vulnerable_purls] return Response(data=vulnerable_purls) query = Package.objects.filter(package_url__in=purls).distinct().with_is_vulnerable() + packages = query + + # Collect vulnerabilities associated with these packages + vulnerabilities = set() + for package in packages: + vulnerabilities.update(package.affected_by_vulnerabilities.all()) + vulnerabilities.update(package.fixing_vulnerabilities.all()) + + vulnerability_data = { + vuln.vulnerability_id: VulnerabilityV2Serializer(vuln).data for vuln in vulnerabilities + } if not purl_only: + package_data = PackageV2Serializer( + packages, many=True, context={"request": request} + ).data return Response( - PackageV2Serializer(query, many=True, context={"request": request}).data + { + "vulnerabilities": vulnerability_data, + "packages": package_data, + } ) vulnerable_purls = query.vulnerable().only("package_url") From 54b5dad661eabcfa53f7710e729ccc4e8b013d27 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Tue, 12 Nov 2024 18:55:27 +0530 Subject: [PATCH 4/6] Update tests Signed-off-by: Tushar Goel --- vulnerabilities/tests/test_api_v2.py | 122 ++++++++++++++++++++++----- 1 file changed, 100 insertions(+), 22 deletions(-) diff --git a/vulnerabilities/tests/test_api_v2.py b/vulnerabilities/tests/test_api_v2.py index 9d0ba54d7..fa3b7773c 100644 --- a/vulnerabilities/tests/test_api_v2.py +++ b/vulnerabilities/tests/test_api_v2.py @@ -54,6 +54,7 @@ def setUp(self): url="https://example.com/ref2", reference_type="exploit", reference_id="REF-2" ) self.reference2.vulnerabilities.add(self.vuln2) + self.user = ApiUser.objects.create_api_user(username="e@mail.com") self.auth = f"Token {self.user.auth_token.key}" self.client = APIClient(enforce_csrf_checks=True) @@ -62,13 +63,16 @@ def setUp(self): def test_list_vulnerabilities(self): """ Test listing vulnerabilities without filters. - Should return a list of vulnerabilities with IDs and URLs. + Should return a paginated response with vulnerabilities dictionary. """ url = reverse("vulnerability-v2-list") response = self.client.get(url, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("results", response.data) self.assertIn("vulnerabilities", response.data["results"]) self.assertEqual(len(response.data["results"]["vulnerabilities"]), 2) + self.assertIn("VCID-1234", response.data["results"]["vulnerabilities"]) + self.assertIn("VCID-5678", response.data["results"]["vulnerabilities"]) self.assertTrue("url" in response.data["results"]["vulnerabilities"]["VCID-1234"]) def test_retrieve_vulnerability_detail(self): @@ -100,6 +104,8 @@ def test_filter_vulnerability_by_alias(self): url = reverse("vulnerability-v2-list") response = self.client.get(url, {"alias": "CVE-2021-5678"}, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("results", response.data) + self.assertIn("vulnerabilities", response.data["results"]) self.assertEqual( response.data["results"]["vulnerabilities"]["VCID-5678"]["vulnerability_id"], "VCID-5678", @@ -159,9 +165,13 @@ def test_list_vulnerabilities_pagination(self): response = self.client.get(url, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("results", response.data) + self.assertIn("vulnerabilities", response.data["results"]) self.assertIn("next", response.data) self.assertIn("previous", response.data) - self.assertEqual(len(response.data["results"]), 1) # Assuming default page size is 10 + # The 'vulnerabilities' dictionary should contain vulnerabilities up to the page limit + self.assertEqual( + len(response.data["results"]["vulnerabilities"]), 10 + ) # Assuming default page size is 10 class PackageV2ViewSetTest(APITestCase): @@ -185,6 +195,7 @@ def setUp(self): # Associate packages with vulnerabilities self.package1.affected_by_vulnerabilities.add(self.vuln1) self.package2.fixing_vulnerabilities.add(self.vuln2) + self.user = ApiUser.objects.create_api_user(username="e@mail.com") self.auth = f"Token {self.user.auth_token.key}" self.client = APIClient(enforce_csrf_checks=True) @@ -193,13 +204,24 @@ def setUp(self): def test_list_packages(self): """ Test listing packages without filters. - Should return a list of packages with their details. + Should return a list of packages with their details and associated vulnerabilities. """ url = reverse("package-v2-list") response = self.client.get(url, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("results", response.data) self.assertIn("packages", response.data["results"]) + self.assertIn("vulnerabilities", response.data["results"]) self.assertEqual(len(response.data["results"]["packages"]), 2) + # Verify that vulnerabilities are included + self.assertIsInstance(response.data["results"]["vulnerabilities"], dict) + package_vulns = set() + for package in response.data["results"]["packages"]: + package_vulns.update(package["affected_by_vulnerabilities"]) + package_vulns.update(package["fixing_vulnerabilities"]) + self.assertTrue( + all(vuln_id in response.data["results"]["vulnerabilities"] for vuln_id in package_vulns) + ) def test_filter_packages_by_purl(self): """ @@ -264,9 +286,13 @@ def test_list_packages_pagination(self): response = self.client.get(url, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn("results", response.data) + self.assertIn("packages", response.data["results"]) + self.assertIn("vulnerabilities", response.data["results"]) self.assertIn("next", response.data) self.assertIn("previous", response.data) - self.assertEqual(len(response.data["results"]), 1) # Assuming default page size is 10 + self.assertEqual( + len(response.data["results"]["packages"]), 10 + ) # Assuming default page size is 10 def test_invalid_vulnerability_filter(self): """ @@ -309,16 +335,27 @@ def test_get_fixing_vulnerabilities(self): def test_bulk_lookup_with_valid_purls(self): """ Test bulk lookup with valid PURLs. + Should return packages and their associated vulnerabilities. """ url = reverse("package-v2-bulk-lookup") data = {"purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"]} response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(len(response.data), 2) + self.assertIn("packages", response.data) + self.assertIn("vulnerabilities", response.data) + self.assertEqual(len(response.data["packages"]), 2) # Verify that the returned data matches the packages - purls = [package["purl"] for package in response.data] + purls = [package["purl"] for package in response.data["packages"]] self.assertIn("pkg:pypi/django@3.2", purls) self.assertIn("pkg:npm/lodash@4.17.20", purls) + # Verify that vulnerabilities are included + package_vulns = set() + for package in response.data["packages"]: + package_vulns.update(package["affected_by_vulnerabilities"]) + package_vulns.update(package["fixing_vulnerabilities"]) + self.assertTrue( + all(vuln_id in response.data["vulnerabilities"] for vuln_id in package_vulns) + ) def test_bulk_lookup_with_invalid_purls(self): """ @@ -329,7 +366,8 @@ def test_bulk_lookup_with_invalid_purls(self): response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) # Since the packages don't exist, the response should be empty - self.assertEqual(len(response.data), 0) + self.assertEqual(len(response.data["packages"]), 0) + self.assertEqual(len(response.data["vulnerabilities"]), 0) def test_bulk_lookup_with_empty_purls(self): """ @@ -347,15 +385,26 @@ def test_bulk_lookup_with_empty_purls(self): def test_bulk_search_with_valid_purls(self): """ Test bulk search with valid PURLs. + Should return packages and their associated vulnerabilities. """ url = reverse("package-v2-bulk-search") data = {"purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"]} response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(len(response.data), 2) - purls = [package["purl"] for package in response.data] + self.assertIn("packages", response.data) + self.assertIn("vulnerabilities", response.data) + self.assertEqual(len(response.data["packages"]), 2) + purls = [package["purl"] for package in response.data["packages"]] self.assertIn("pkg:pypi/django@3.2", purls) self.assertIn("pkg:npm/lodash@4.17.20", purls) + # Verify that vulnerabilities are included + package_vulns = set() + for package in response.data["packages"]: + package_vulns.update(package["affected_by_vulnerabilities"]) + package_vulns.update(package["fixing_vulnerabilities"]) + self.assertTrue( + all(vuln_id in response.data["vulnerabilities"] for vuln_id in package_vulns) + ) def test_bulk_search_with_purl_only_true(self): """ @@ -363,7 +412,10 @@ def test_bulk_search_with_purl_only_true(self): Should return only the PURLs of vulnerable packages. """ url = reverse("package-v2-bulk-search") - data = {"purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"], "purl_only": True} + data = { + "purls": ["pkg:pypi/django@3.2", "pkg:npm/lodash@4.17.20"], + "purl_only": True, + } response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) # Since purl_only=True, response should be a list of PURLs @@ -375,15 +427,29 @@ def test_bulk_search_with_purl_only_true(self): def test_bulk_search_with_plain_purl_true(self): """ Test bulk search with plain_purl set to True. - """ + Should return packages grouped by plain PURLs. + """ + # Create another package with the same name and version but different qualifiers + Package.objects.create( + name="django", + version="3.2", + type="pypi", + qualifiers={"extension": "tar.gz"}, + ) + url = reverse("package-v2-bulk-search") - data = {"purls": ["pkg:pypi/django@3.2", "pkg:pypi/django@3.1"], "plain_purl": True} + data = { + "purls": ["pkg:pypi/django@3.2", "pkg:pypi/django@3.2?extension=tar.gz"], + "plain_purl": True, + } response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - # Since plain_purl=True, packages with the same name and version are grouped - self.assertEqual(len(response.data), 1) - purls = [package["purl"] for package in response.data] - self.assertIn("pkg:pypi/django@3.2", purls[0] or "pkg:pypi/django@3.1" in purls[0]) + self.assertIn("packages", response.data) + self.assertIn("vulnerabilities", response.data) + # Since plain_purl=True, packages with the same type, namespace, name, version are grouped + self.assertEqual(len(response.data["packages"]), 1) + purl = response.data["packages"][0]["purl"] + self.assertTrue(purl.startswith("pkg:pypi/django@3.2")) def test_bulk_search_with_purl_only_and_plain_purl_true(self): """ @@ -407,12 +473,15 @@ def test_bulk_search_with_purl_only_and_plain_purl_true(self): def test_bulk_search_with_invalid_purls(self): """ Test bulk search with invalid PURLs. + Should return an empty response. """ url = reverse("package-v2-bulk-search") data = {"purls": ["pkg:pypi/nonexistent@1.0.0", "pkg:npm/unknown@0.0.1"]} response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(len(response.data), 0) + # Since the packages don't exist, the response should be empty + self.assertEqual(len(response.data["packages"]), 0) + self.assertEqual(len(response.data["vulnerabilities"]), 0) def test_bulk_search_with_empty_purls(self): """ @@ -434,32 +503,39 @@ def test_all_vulnerable_packages(self): url = reverse("package-v2-all") response = self.client.get(url, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - # Since package1 and package3 are vulnerable, they should be returned + # Since package1 is vulnerable, it should be returned expected_purls = ["pkg:pypi/django@3.2"] self.assertEqual(sorted(response.data), sorted(expected_purls)) def test_lookup_with_valid_purl(self): """ Test the 'lookup' endpoint with a valid PURL. + Should return the package and its associated vulnerabilities. """ url = reverse("package-v2-lookup") data = {"purl": "pkg:pypi/django@3.2"} response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(len(response.data), 1) + self.assertEqual(1, len(response.data)) + self.assertIn("purl", response.data[0]) + self.assertIn("affected_by_vulnerabilities", response.data[0]) + self.assertIn("fixing_vulnerabilities", response.data[0]) + self.assertIn("next_non_vulnerable_version", response.data[0]) + self.assertIn("latest_non_vulnerable_version", response.data[0]) self.assertEqual(response.data[0]["purl"], "pkg:pypi/django@3.2") self.assertEqual(response.data[0]["affected_by_vulnerabilities"], ["VCID-1234"]) + self.assertEqual(response.data[0]["fixing_vulnerabilities"], []) def test_lookup_with_invalid_purl(self): """ Test the 'lookup' endpoint with a PURL that does not exist. - Should return an empty list. + Should return empty packages and vulnerabilities. """ url = reverse("package-v2-lookup") data = {"purl": "pkg:pypi/nonexistent@1.0.0"} response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) - # No packages should be returned + # No packages or vulnerabilities should be returned self.assertEqual(len(response.data), 0) def test_lookup_with_missing_purl(self): @@ -478,9 +554,11 @@ def test_lookup_with_missing_purl(self): def test_lookup_with_invalid_purl_format(self): """ Test the 'lookup' endpoint with an invalid PURL format. - Should return 400 Bad Request. + Should return empty packages and vulnerabilities. """ url = reverse("package-v2-lookup") data = {"purl": "invalid_purl_format"} response = self.client.post(url, data, format="json") self.assertEqual(response.status_code, status.HTTP_200_OK) + # No packages or vulnerabilities should be returned + self.assertEqual(len(response.data), 0) From 8a49a389eb452616b0beb6ff81eb4a13726a5e4b Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Wed, 13 Nov 2024 19:23:10 +0530 Subject: [PATCH 5/6] Rearrage URLs Signed-off-by: Tushar Goel --- vulnerablecode/urls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index d92d6ce40..a4da0a7c0 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -46,8 +46,8 @@ def __init__(self, *args, **kwargs): api_router.register("aliases", AliasViewSet, basename="alias") api_v2_router = OptionalSlashRouter() -api_v2_router.register("vulnerabilities", VulnerabilityV2ViewSet, basename="vulnerability-v2") api_v2_router.register("packages", PackageV2ViewSet, basename="package-v2") +api_v2_router.register("vulnerabilities", VulnerabilityV2ViewSet, basename="vulnerability-v2") urlpatterns = [ path( From b071a5137551886102e595f0a361e478c6f330a3 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Thu, 14 Nov 2024 12:47:35 +0530 Subject: [PATCH 6/6] Add risk score Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index a01bc89ec..b0a3fa125 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -171,6 +171,7 @@ def list(self, request, *args, **kwargs): class PackageV2Serializer(serializers.ModelSerializer): purl = serializers.CharField(source="package_url") + risk_score = serializers.FloatField(read_only=True) affected_by_vulnerabilities = serializers.SerializerMethodField() fixing_vulnerabilities = serializers.SerializerMethodField() next_non_vulnerable_version = serializers.CharField(read_only=True) @@ -184,6 +185,7 @@ class Meta: "fixing_vulnerabilities", "next_non_vulnerable_version", "latest_non_vulnerable_version", + "risk_score", ] def get_affected_by_vulnerabilities(self, obj):