diff --git a/docker-compose.yml b/docker-compose.yml index 76b645560..45ee678cf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -57,6 +57,17 @@ services: - db - vulnerablecode + vulnerablecode_rqworker_high: + build: . + command: wait-for-it web:8000 -- python ./manage.py rqworker high + env_file: + - docker.env + volumes: + - /etc/vulnerablecode/:/etc/vulnerablecode/ + depends_on: + - vulnerablecode_redis + - db + - vulnerablecode nginx: image: nginx diff --git a/vulnerabilities/migrations/0120_impactedpackage_last_range_unfurl_at_and_more.py b/vulnerabilities/migrations/0120_impactedpackage_last_range_unfurl_at_and_more.py new file mode 100644 index 000000000..6e070bde4 --- /dev/null +++ b/vulnerabilities/migrations/0120_impactedpackage_last_range_unfurl_at_and_more.py @@ -0,0 +1,42 @@ +# Generated by Django 5.2.11 on 2026-04-08 10:48 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0119_remove_advisoryset_identifiers_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="impactedpackage", + name="last_range_unfurl_at", + field=models.DateTimeField( + blank=True, + db_index=True, + help_text="Timestamp of the last vers range unfurl.", + null=True, + ), + ), + migrations.AddField( + model_name="impactedpackage", + name="last_successful_range_unfurl_at", + field=models.DateTimeField( + blank=True, + db_index=True, + help_text="Timestamp of the last successful vers range unfurl.", + null=True, + ), + ), + migrations.AddField( + model_name="pipelineschedule", + name="run_priority", + field=models.IntegerField( + choices=[(1, "high"), (2, "default")], + default=2, + help_text="Select the pipeline execution priority", + ), + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 45d8acf55..a802f7011 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2262,6 +2262,10 @@ def requeue(self): class 
PipelineSchedule(models.Model): """The Database representation of a pipeline schedule.""" + class ExecutionPriority(models.IntegerChoices): + HIGH = 1, "high" + DEFAULT = 2, "default" + pipeline_id = models.CharField( max_length=600, help_text=("Identify a registered Pipeline class."), @@ -2306,6 +2310,14 @@ class PipelineSchedule(models.Model): help_text=("Number of hours to wait between run of this pipeline."), ) + run_priority = models.IntegerField( + null=False, + blank=False, + choices=ExecutionPriority.choices, + default=ExecutionPriority.DEFAULT, + help_text=("Select the pipeline execution priority"), + ) + schedule_work_id = models.CharField( max_length=255, unique=True, @@ -3240,6 +3252,20 @@ class ImpactedPackage(models.Model): help_text="Timestamp indicating when this impact was added.", ) + last_range_unfurl_at = models.DateTimeField( + blank=True, + null=True, + db_index=True, + help_text="Timestamp of the last vers range unfurl.", + ) + + last_successful_range_unfurl_at = models.DateTimeField( + blank=True, + null=True, + db_index=True, + help_text="Timestamp of the last successful vers range unfurl.", + ) + def to_dict(self): from vulnerabilities.utils import purl_to_dict diff --git a/vulnerabilities/pipelines/__init__.py b/vulnerabilities/pipelines/__init__.py index 632fd95f6..499f53331 100644 --- a/vulnerabilities/pipelines/__init__.py +++ b/vulnerabilities/pipelines/__init__.py @@ -24,6 +24,7 @@ from vulnerabilities.improver import MAX_CONFIDENCE from vulnerabilities.models import Advisory from vulnerabilities.models import PipelineRun +from vulnerabilities.models import PipelineSchedule from vulnerabilities.pipes.advisory import import_advisory from vulnerabilities.pipes.advisory import insert_advisory from vulnerabilities.pipes.advisory import insert_advisory_v2 @@ -144,6 +145,9 @@ class VulnerableCodePipeline(PipelineDefinition, BasePipelineRun): # When set to true pipeline is run only once. 
# To rerun onetime pipeline reset is_active field to True via migration. run_once = False + # Interval between runs in hour. + run_interval = 24 + run_priority = PipelineSchedule.ExecutionPriority.DEFAULT def on_failure(self): """ @@ -176,6 +180,9 @@ class VulnerableCodeBaseImporterPipeline(VulnerableCodePipeline): # When set to true pipeline is run only once. # To rerun onetime pipeline reset is_active field to True via migration. run_once = False + # Interval between runs in hour. + run_interval = 24 + run_priority = PipelineSchedule.ExecutionPriority.DEFAULT @classmethod def steps(cls): @@ -277,6 +284,9 @@ class VulnerableCodeBaseImporterPipelineV2(VulnerableCodePipeline): # When set to true pipeline is run only once. # To rerun onetime pipeline reset is_active field to True via migration. run_once = False + # Interval between runs in hour. + run_interval = 24 + run_priority = PipelineSchedule.ExecutionPriority.DEFAULT @classmethod def steps(cls): diff --git a/vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py b/vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py index f18f43fbf..48d691fe0 100644 --- a/vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py +++ b/vulnerabilities/pipelines/v2_improvers/unfurl_version_range.py @@ -8,9 +8,13 @@ # import logging +from datetime import timedelta from traceback import format_exc as traceback_format_exc from aboutcode.pipeline import LoopProgress +from django.db.models import F +from django.db.models import Q +from django.utils import timezone from fetchcode.package_versions import SUPPORTED_ECOSYSTEMS as FETCHCODE_SUPPORTED_ECOSYSTEMS from packageurl import PackageURL from univers.version_range import RANGE_CLASS_BY_SCHEMES @@ -19,44 +23,64 @@ from vulnerabilities.models import ImpactedPackage from vulnerabilities.models import ImpactedPackageAffecting from vulnerabilities.models import PackageV2 +from vulnerabilities.models import PipelineSchedule from vulnerabilities.pipelines import 
VulnerableCodePipeline from vulnerabilities.pipes.fetchcode_utils import get_versions from vulnerabilities.utils import update_purl_version class UnfurlVersionRangePipeline(VulnerableCodePipeline): + """ + Unfurl affected version ranges by first processing those that have + never been unfurled and then handling ranges that were last unfurled + two or more days ago. + """ pipeline_id = "unfurl_version_range_v2" + run_interval = 2 + run_priority = PipelineSchedule.ExecutionPriority.HIGH + + # Days elapsed before version range is re-unfurled + reunfurl_after_days = 2 + @classmethod def steps(cls): return (cls.unfurl_version_range,) def unfurl_version_range(self): - impacted_packages = ImpactedPackage.objects.all().order_by("-created_at") - impacted_packages_count = impacted_packages.count() - processed_impacted_packages_count = 0 processed_affected_packages_count = 0 cached_versions = {} + update_unfurl_date = [] + update_successful_unfurl_date = [] + update_batch_size = 5000 + chunk_size = 5000 + + impacted_packages = impacted_package_qs(cutoff_day=self.reunfurl_after_days) + impacted_packages_count = impacted_packages.count() self.log(f"Unfurl affected vers range for {impacted_packages_count:,d} ImpactedPackage.") - progress = LoopProgress(total_iterations=impacted_packages_count, logger=self.log) - for impact in progress.iter(impacted_packages): + + progress = LoopProgress( + total_iterations=impacted_packages_count, progress_step=5, logger=self.log + ) + for impact in progress.iter(impacted_packages.iterator(chunk_size=chunk_size)): + update_unfurl_date.append(impact.pk) purl = PackageURL.from_string(impact.base_purl) if not impact.affecting_vers or not any( c in impact.affecting_vers for c in ("<", ">", "!") ): + update_successful_unfurl_date.append(impact.pk) continue if purl.type not in FETCHCODE_SUPPORTED_ECOSYSTEMS: continue if purl.type not in RANGE_CLASS_BY_SCHEMES: continue - versions = get_purl_versions(purl, cached_versions) or [] + versions = 
get_purl_versions(purl, cached_versions, self.log) or [] affected_purls = get_affected_purls( versions=versions, - affecting_vers=impact.affecting_vers, - base_purl=purl, + impact=impact, logger=self.log, ) if not affected_purls: @@ -68,14 +92,31 @@ def unfurl_version_range(self): relation=ImpactedPackageAffecting, logger=self.log, ) + update_successful_unfurl_date.append(impact.pk) processed_impacted_packages_count += 1 + if len(update_unfurl_date) > update_batch_size: + ImpactedPackage.objects.filter(pk__in=update_unfurl_date).update( + last_range_unfurl_at=timezone.now() + ) + ImpactedPackage.objects.filter(pk__in=update_successful_unfurl_date).update( + last_successful_range_unfurl_at=timezone.now() + ) + update_unfurl_date.clear() + update_successful_unfurl_date.clear() + + ImpactedPackage.objects.filter(pk__in=update_unfurl_date).update( + last_range_unfurl_at=timezone.now() + ) + ImpactedPackage.objects.filter(pk__in=update_successful_unfurl_date).update( + last_successful_range_unfurl_at=timezone.now() + ) self.log(f"Successfully processed {processed_impacted_packages_count:,d} ImpactedPackage.") self.log(f"{processed_affected_packages_count:,d} new Impact-Package relation created.") -def get_affected_purls(versions, affecting_vers, base_purl, logger): - affecting_version_range = VersionRange.from_string(affecting_vers) +def get_affected_purls(versions, impact, logger): + affecting_version_range = VersionRange.from_string(impact.affecting_vers) version_class = affecting_version_range.version_class try: @@ -84,7 +125,7 @@ def get_affected_purls(versions, affecting_vers, base_purl, logger): versions = [version_class(v) for v in versions] except Exception as e: logger( - f"Error while parsing versions for {base_purl!s}: {e!r} \n {traceback_format_exc()}", + f"Error while parsing versions for {impact.base_purl!s}: {e!r} \n {traceback_format_exc()}", level=logging.ERROR, ) return @@ -95,21 +136,24 @@ def get_affected_purls(versions, affecting_vers, base_purl, 
logger): if version in affecting_version_range: affected_purls.append( update_purl_version( - purl=base_purl, + purl=impact.base_purl, version=str(version), ) ) except Exception as e: logger( - f"Error while checking {version!s} in {affecting_version_range!s}: {e!r} \n {traceback_format_exc()}", + ( + f"Error while checking {version!s} in {affecting_version_range!s} for " + f"advisory {impact.advisory.avid}: {e!r} \n {traceback_format_exc()}" + ), level=logging.ERROR, ) return affected_purls -def get_purl_versions(purl, cached_versions): +def get_purl_versions(purl, cached_versions, logger): if not purl in cached_versions: - purls = get_versions(purl) + purls = get_versions(purl, logger) if purls is not None: cached_versions[purl] = purls return cached_versions.get(purl) or [] @@ -135,3 +179,16 @@ def bulk_create_with_m2m(purls, impact, relation, logger): return 0 return len(relations) + + +def impacted_package_qs(cutoff_day=2): + cutoff = timezone.now() - timedelta(days=cutoff_day) + return ( + ImpactedPackage.objects.filter( + (Q(last_range_unfurl_at__isnull=True) | Q(last_range_unfurl_at__lte=cutoff)) + & Q(affecting_vers__isnull=False) + & ~Q(affecting_vers="") + ) + .order_by(F("last_range_unfurl_at").asc(nulls_first=True)) + .only("pk", "affecting_vers", "advisory", "base_purl") + ) diff --git a/vulnerabilities/schedules.py b/vulnerabilities/schedules.py index e6443e5ab..215129e10 100644 --- a/vulnerabilities/schedules.py +++ b/vulnerabilities/schedules.py @@ -95,10 +95,21 @@ def update_pipeline_schedule(): PipelineSchedule.objects.exclude(pipeline_id__in=pipelines.keys()).delete() for id, pipeline_class in pipelines.items(): run_once = getattr(pipeline_class, "run_once", False) + run_interval = getattr(pipeline_class, "run_interval", 24) + run_priority = getattr( + pipeline_class, "run_priority", PipelineSchedule.ExecutionPriority.DEFAULT + ) - PipelineSchedule.objects.get_or_create( + pipeline, created = PipelineSchedule.objects.get_or_create( 
pipeline_id=id, defaults={ "is_run_once": run_once, + "run_interval": run_interval, + "run_priority": run_priority, }, ) + + if not created: + pipeline.run_priority = run_priority + pipeline.run_interval = run_interval + pipeline.save() diff --git a/vulnerabilities/tasks.py b/vulnerabilities/tasks.py index 2e7ac2b10..6c2be3fac 100644 --- a/vulnerabilities/tasks.py +++ b/vulnerabilities/tasks.py @@ -20,7 +20,13 @@ logger = logging.getLogger(__name__) -queue = django_rq.get_queue("default") +default_queue = django_rq.get_queue("default") +high_queue = django_rq.get_queue("high") + +queues = { + "default": django_rq.get_queue("default"), + "high": django_rq.get_queue("high"), +} def execute_pipeline(pipeline_id, run_id): @@ -112,6 +118,8 @@ def set_run_failure(job, connection, type, value, traceback): def enqueue_pipeline(pipeline_id): pipeline_schedule = models.PipelineSchedule.objects.get(pipeline_id=pipeline_id) + queue = queues.get(pipeline_schedule.get_run_priority_display()) + if pipeline_schedule.status in [ models.PipelineRun.Status.RUNNING, models.PipelineRun.Status.QUEUED, @@ -139,5 +147,7 @@ def enqueue_pipeline(pipeline_id): def dequeue_job(job_id): """Remove a job from queue if it hasn't been executed yet.""" - if job_id in queue.jobs: - queue.remove(job_id) + + for queue in queues.values(): + if job_id in queue.jobs: + queue.remove(job_id) diff --git a/vulnerabilities/templates/pipeline_dashboard.html b/vulnerabilities/templates/pipeline_dashboard.html index a7f4139a4..fc474efe7 100644 --- a/vulnerabilities/templates/pipeline_dashboard.html +++ b/vulnerabilities/templates/pipeline_dashboard.html @@ -62,6 +62,7 @@

Pipeline Dashboard

Pipeline ID
Active
+
Priority
Interval
Status
Last Run End Time
@@ -79,6 +80,7 @@

Pipeline Dashboard

{{ schedule.pipeline_id }}
{{ schedule.is_active|yesno:"Yes,No" }}
+
{{ schedule.get_run_priority_display|capfirst }}
{% if schedule.is_run_once %} Once diff --git a/vulnerabilities/tests/pipelines/v2_improvers/test_unfurl_version_range.py b/vulnerabilities/tests/pipelines/v2_improvers/test_unfurl_version_range.py index a1927a426..3d73c6884 100644 --- a/vulnerabilities/tests/pipelines/v2_improvers/test_unfurl_version_range.py +++ b/vulnerabilities/tests/pipelines/v2_improvers/test_unfurl_version_range.py @@ -13,14 +13,18 @@ from unittest.mock import patch from django.test import TestCase +from django.utils import timezone from packageurl import PackageURL from univers.version_range import VersionRange from vulnerabilities.importer import AdvisoryDataV2 from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import PackageCommitPatchData from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import ImpactedPackage from vulnerabilities.models import PackageV2 from vulnerabilities.pipelines.v2_improvers.unfurl_version_range import UnfurlVersionRangePipeline +from vulnerabilities.pipelines.v2_improvers.unfurl_version_range import impacted_package_qs from vulnerabilities.pipes.advisory import insert_advisory_v2 from vulnerabilities.tests.pipelines import TestLogger @@ -28,7 +32,7 @@ class TestUnfurlVersionRangePipeline(TestCase): def setUp(self): self.logger = TestLogger() - advisory1 = AdvisoryDataV2( + self.advisory1 = AdvisoryDataV2( summary="Test advisory", aliases=["CVE-2025-0001"], references=[], @@ -48,14 +52,54 @@ def setUp(self): date_published=datetime.now() - timedelta(days=10), url="https://example.com/advisory", ) - insert_advisory_v2( - advisory=advisory1, - pipeline_id="test_pipeline_v2", - logger=self.logger.write, + + self.advisory2 = AdvisoryDataV2( + summary="Test advisory", + aliases=["CVE-2025-0001"], + references=[], + severities=[], + weaknesses=[], + affected_packages=[ + AffectedPackageV2( + package=PackageURL.from_string("pkg:npm/foobar"), + 
affected_version_range=VersionRange.from_string("vers:npm/>3.2.1|<4.0.0"), + fixed_version_range=VersionRange.from_string("vers:npm/4.0.0"), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL.from_string("pkg:npm/foobar"), + affected_version_range=VersionRange.from_string("vers:npm/>4.2.1|<5.0.0"), + fixed_version_range=VersionRange.from_string("vers:npm/5.0.0"), + introduced_by_commit_patches=[], + fixed_by_commit_patches=[], + ), + AffectedPackageV2( + package=PackageURL.from_string("pkg:npm/foobar"), + affected_version_range=None, + fixed_version_range=None, + introduced_by_commit_patches=[], + fixed_by_commit_patches=[ + PackageCommitPatchData( + vcs_url="https://foobar.vcs/", + commit_hash="982f801f", + ), + ], + ), + ], + patches=[], + advisory_id="GHSA-1234", + date_published=datetime.now() - timedelta(days=10), + url="https://example.com/advisory", ) @patch("vulnerabilities.pipelines.v2_improvers.unfurl_version_range.get_purl_versions") def test_affecting_version_range_unfurl(self, mock_fetch): + insert_advisory_v2( + advisory=self.advisory1, + pipeline_id="test_pipeline_v2", + logger=self.logger.write, + ) self.assertEqual(1, PackageV2.objects.count()) mock_fetch.return_value = {"3.4.1", "3.9.0", "2.1.0", "4.0.0", "4.1.0"} pipeline = UnfurlVersionRangePipeline() @@ -67,3 +111,53 @@ def test_affecting_version_range_unfurl(self, mock_fetch): self.assertEqual(3, PackageV2.objects.count()) self.assertEqual(1, impact.fixed_by_packages.count()) self.assertEqual(2, impact.affecting_packages.count()) + self.assertNotEqual(None, impact.last_range_unfurl_at) + self.assertNotEqual(None, impact.last_successful_range_unfurl_at) + + def test_impacted_package_qs_dont_process_empty_vers(self): + insert_advisory_v2( + advisory=self.advisory2, + pipeline_id="test_pipeline_v2", + logger=self.logger.write, + ) + + self.assertEqual(3, ImpactedPackage.objects.count()) + self.assertEqual(2, 
impacted_package_qs().count()) + + def test_impacted_package_qs_skip_recently_unfurled_vers(self): + insert_advisory_v2( + advisory=self.advisory2, + pipeline_id="test_pipeline_v2", + logger=self.logger.write, + ) + impact = ImpactedPackage.objects.filter(affecting_vers__isnull=False).first() + impact.last_range_unfurl_at = timezone.now() + impact.save() + + self.assertEqual(1, impacted_package_qs().count()) + + def test_impacted_package_qs_prioritize_never_unfurled_impact_first(self): + insert_advisory_v2( + advisory=self.advisory2, + pipeline_id="test_pipeline_v2", + logger=self.logger.write, + ) + impact = ImpactedPackage.objects.filter(affecting_vers__isnull=False).first() + impact.last_range_unfurl_at = timezone.now() - timedelta(days=4) + impact.save() + + self.assertEqual(2, impacted_package_qs().count()) + first_impact_to_process = impacted_package_qs().first() + self.assertEqual(None, first_impact_to_process.last_range_unfurl_at) + + def test_impacted_package_reunfurl_vers(self): + insert_advisory_v2( + advisory=self.advisory2, + pipeline_id="test_pipeline_v2", + logger=self.logger.write, + ) + impact = ImpactedPackage.objects.filter(affecting_vers__isnull=False).first() + impact.last_range_unfurl_at = timezone.now() + impact.save() + + self.assertEqual(1, impacted_package_qs().count()) diff --git a/vulnerablecode/settings.py b/vulnerablecode/settings.py index 4c480cbc8..8ec5f6e31 100644 --- a/vulnerablecode/settings.py +++ b/vulnerablecode/settings.py @@ -392,7 +392,13 @@ "PORT": env.str("VULNERABLECODE_REDIS_PORT", default="6379"), "PASSWORD": env.str("VULNERABLECODE_REDIS_PASSWORD", default=""), "DEFAULT_TIMEOUT": env.int("VULNERABLECODE_REDIS_DEFAULT_TIMEOUT", default=3600), - } + }, + "high": { + "HOST": env.str("VULNERABLECODE_REDIS_HOST", default="localhost"), + "PORT": env.str("VULNERABLECODE_REDIS_PORT", default="6379"), + "PASSWORD": env.str("VULNERABLECODE_REDIS_PASSWORD", default=""), + "DEFAULT_TIMEOUT": 
env.int("VULNERABLECODE_REDIS_DEFAULT_TIMEOUT", default=3600), + }, }