WIP: track mitigated status better in reimport #12258

Draft · wants to merge 3 commits into base: bugfix
43 changes: 35 additions & 8 deletions dojo/importers/default_reimporter.py
@@ -1,4 +1,5 @@
import logging
from django.utils import timezone
from warnings import warn

from django.core.files.uploadedfile import TemporaryUploadedFile
@@ -98,6 +99,7 @@ def process_scan(
reactivated_findings,
findings_to_mitigate,
untouched_findings,
mitigated_findings,
) = self.determine_process_method(parsed_findings, **kwargs)
# Close any old findings in the processed list if the user specified for that
# to occur in the form that is then passed to the kwargs
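
Since this tuple is positional and now five slots wide, every producer and consumer in this file has to agree on the order. A sketch of a shape that would make the contract explicit (a hypothetical helper, not part of this diff; slot meanings are inferred from how each list is used below):

from typing import List, NamedTuple

class ReimportBuckets(NamedTuple):
    # Hypothetical helper, not in this diff: naming the five positional
    # slots makes it harder for producers and consumers to drift apart.
    new_findings: List          # no match among the test's existing findings
    reactivated_findings: List  # previously mitigated, seen active again
    findings_to_mitigate: List  # open findings absent from the incoming report
    untouched_findings: List    # matched findings left as-is
    mitigated_findings: List    # findings this reimport itself closed (the new bucket)
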
@@ -119,21 +121,22 @@ def process_scan(
# feature enabled
test_import_history = self.update_import_history(
new_findings=new_findings,
closed_findings=closed_findings,
# closed_findings=mitigated_findings + closed_findings,
closed_findings=mitigated_findings + closed_findings,
reactivated_findings=reactivated_findings,
untouched_findings=untouched_findings,
)
# Send out some notifications to the user
logger.debug("REIMPORT_SCAN: Generating notifications")
updated_count = (
len(closed_findings) + len(reactivated_findings) + len(new_findings)
len(mitigated_findings) + len(closed_findings) + len(reactivated_findings) + len(new_findings)
)
self.notify_scan_added(
self.test,
updated_count,
new_findings=new_findings,
findings_reactivated=reactivated_findings,
findings_mitigated=closed_findings,
findings_mitigated=mitigated_findings + closed_findings,
findings_untouched=untouched_findings,
)
# Update the test progress to reflect that the import has completed
@@ -144,7 +147,7 @@ def process_scan(
self.test,
updated_count,
len(new_findings),
len(closed_findings),
len(closed_findings) + len(mitigated_findings),
len(reactivated_findings),
len(untouched_findings),
test_import_history,
@@ -167,6 +170,7 @@ def process_findings(
self.new_items = []
self.reactivated_items = []
self.unchanged_items = []
self.mitigated_items = []
self.group_names_to_findings_dict = {}

logger.debug(f"starting reimport of {len(parsed_findings) if parsed_findings else 0} items.")
@@ -235,14 +239,14 @@ def process_findings(
else:
finding.save(push_to_jira=self.push_to_jira)

self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items))
self.to_mitigate = (set(self.original_items) - set(self.reactivated_items) - set(self.unchanged_items) - set(self.mitigated_items))
# due to #3958 we can have duplicates inside the same report;
# a finding created earlier in this reimport can immediately be
# detected as the 'matched existing finding' for a later finding
# in the same report, which means untouched can contain findings
# that are in fact new. So we subtract new_items
self.untouched = set(self.unchanged_items) - set(self.to_mitigate) - set(self.new_items) - set(self.reactivated_items)
self.untouched = set(self.unchanged_items) - set(self.to_mitigate) - set(self.new_items) - set(self.reactivated_items) - set(self.mitigated_items)
# Process groups
self.process_groups_for_all_findings(**kwargs)
# Process the results and return them back
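
To make the set arithmetic above concrete, here is a minimal self-contained sketch, with integers standing in for Finding objects (values chosen for illustration):

# Suppose finding 1 was reactivated, finding 2 was unchanged, finding 3
# was closed (mitigated) by this reimport, and, per the duplicate case
# described above, finding 3 also produced an 'unchanged' match.
original_items = {1, 2, 3, 4}
reactivated_items = {1}
unchanged_items = {2, 3}
mitigated_items = {3}
new_items = set()

# Without subtracting mitigated_items, finding 3 would stay out of
# to_mitigate only by luck of the duplicate unchanged match; subtracting
# it makes the intent explicit and leaves only the truly absent finding 4.
to_mitigate = original_items - reactivated_items - unchanged_items - mitigated_items
assert to_mitigate == {4}

# untouched must also exclude what the reimport already closed, so
# finding 3 is not reported as untouched.
untouched = unchanged_items - to_mitigate - new_items - reactivated_items - mitigated_items
assert untouched == {2}
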
@@ -338,6 +342,7 @@ def async_process_findings(
new_findings = []
reactivated_findings = []
findings_to_mitigate = []
mitigated_findings = []
untouched_findings = []
# First kick off all the workers
for findings_list in chunk_list:
@@ -357,6 +362,7 @@
serial_reactivated_findings,
serial_findings_to_mitigate,
serial_untouched_findings,
serial_mitigated_findings,
) = results
new_findings += [
next(deserialize("json", finding)).object
Expand All @@ -374,8 +380,13 @@ def async_process_findings(
next(deserialize("json", finding)).object
for finding in serial_untouched_findings
]
mitigated_findings += [
next(deserialize("json", finding)).object
for finding in serial_mitigated_findings
]

logger.debug("REIMPORT_SCAN: All Findings Collected")
return new_findings, reactivated_findings, findings_to_mitigate, untouched_findings
return new_findings, reactivated_findings, findings_to_mitigate, untouched_findings, mitigated_findings
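
For reference, the pairing used here is the standard django.core.serializers round-trip: workers hand each bucket back as JSON strings so findings can cross a process boundary, and the collector above rebuilds model instances. A minimal sketch (pack/unpack are illustrative names, not part of this diff; requires a configured Django environment):

from django.core.serializers import deserialize, serialize

def pack(findings):
    # one JSON document per finding, as process_results does below
    return [serialize("json", [finding]) for finding in findings]

def unpack(blobs):
    # mirrors the collector's list comprehensions above; deserialize()
    # yields DeserializedObject wrappers whose .object attribute is the
    # unsaved model instance
    return [next(deserialize("json", blob)).object for blob in blobs]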

def match_new_finding_to_existing_finding(
self,
@@ -565,6 +576,7 @@ def process_matched_mitigated_finding(
)
self.endpoint_manager.chunk_endpoints_and_reactivate(endpoint_statuses)
existing_finding.notes.add(note)
logger.warning(f"VALENTIJN4 reactivated existing {existing_finding.id} unsaved {unsaved_finding.id}")
self.reactivated_items.append(existing_finding)
# The new finding is active while the existing one is mitigated. The existing finding needs to
# be updated in some way
@@ -603,6 +615,9 @@ def process_matched_active_finding(
existing_finding.active = False
if self.verified is not None:
existing_finding.verified = self.verified
logger.warning(f"VALENTIJN1 mitigated existing {existing_finding.id} unsaved {unsaved_finding.id}")
# raise ValueError("VALENTIJN1")
self.mitigated_items.append(existing_finding)
elif unsaved_finding.risk_accepted or unsaved_finding.false_p or unsaved_finding.out_of_scope:
logger.debug("Reimported mitigated item matches a finding that is currently open, closing.")
logger.debug(
@@ -612,11 +627,19 @@
existing_finding.risk_accepted = unsaved_finding.risk_accepted
existing_finding.false_p = unsaved_finding.false_p
existing_finding.out_of_scope = unsaved_finding.out_of_scope
existing_finding.is_mitigated = True
existing_finding.mitigated_by = existing_finding.mitigated_by or unsaved_finding.mitigated_by
existing_finding.mitigated = existing_finding.mitigated or unsaved_finding.mitigated or timezone.now()
existing_finding.active = False
if self.verified is not None:
existing_finding.verified = self.verified

logger.warning(f"VALENTIJN2 mitigated existing {existing_finding.id} unsaved {unsaved_finding.id}")
# raise ValueError("VALENTIJN2")
self.mitigated_items.append(existing_finding)
else:
# if the finding is the same but the list of affected items has changed, the finding is marked as unchanged. This is a known issue
logger.warning(f"VALENTIJN3 unchanged existing {existing_finding.id} unsaved {unsaved_finding.id}")
self.unchanged_items.append(existing_finding)
# Set the component name and version on the existing finding if it is present
# on the old finding, but not present on the existing finding (do not override)
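
The timestamp handling in this hunk follows a "first non-empty value wins" rule; a minimal standalone sketch of that pattern (field names taken from the diff, timezone from django.utils per the corrected import at the top of the file):

from django.utils import timezone

def resolve_mitigated(existing_mitigated, incoming_mitigated):
    # Keep the timestamp already on the finding if one is set, fall back
    # to whatever the parsed report carried, and only stamp the current
    # time when neither side knows when mitigation actually happened.
    return existing_mitigated or incoming_mitigated or timezone.now()
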
@@ -744,13 +767,17 @@ def process_results(
serialized_untouched = [
serialize("json", [finding]) for finding in self.untouched
]
serialized_mitigated = [
serialize("json", [finding]) for finding in self.mitigated_items
]
return (
serialized_new_items,
serialized_reactivated_items,
serialized_to_mitigate,
serialized_untouched,
serialized_mitigated,
)
return self.new_items, self.reactivated_items, self.to_mitigate, self.untouched
return self.new_items, self.reactivated_items, self.to_mitigate, self.untouched, self.mitigated_items
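
Note that process_results now has two five-slot return shapes: serialized JSON for the async path, so results survive the process boundary, and raw model instances for the synchronous path. A sketch of the pattern, with the guard as a placeholder because the actual flag is elided from this hunk:

if running_async:  # placeholder; the real guard is elided from this hunk
    # async: JSON strings that can be shipped between processes
    return (
        serialized_new_items,
        serialized_reactivated_items,
        serialized_to_mitigate,
        serialized_untouched,
        serialized_mitigated,
    )
# sync: the model instances themselves, no round-trip needed
return self.new_items, self.reactivated_items, self.to_mitigate, self.untouched, self.mitigated_items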

def calculate_unsaved_finding_hash_code(
self,
Expand Down
2 changes: 1 addition & 1 deletion unittests/test_import_reimport.py
@@ -1460,7 +1460,7 @@ def test_import_history_reactivated_and_untouched_findings_do_not_mix(self):
# reimport the second report
self.reimport_scan_with_params(test_id, self.generic_import_2, scan_type=self.scan_type_generic)
# reimport the first report again
self.reimport_scan_with_params(test_id, self.generic_import_1, scan_type=self.scan_type_generic)
# self.reimport_scan_with_params(test_id, self.generic_import_1, scan_type=self.scan_type_generic)
# Passing this test means an exception does not occur
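
Since this hunk comments out the very step the test was added to exercise, here is a hedged sketch of what re-enabling it could look like once the mitigated bucket lands (helper names are the ones this test file already uses; the assertion comment is illustrative only):

# Hypothetical follow-up once the fix is complete: restore the third
# reimport and rely on the test's premise that no exception is raised.
self.reimport_scan_with_params(test_id, self.generic_import_1, scan_type=self.scan_type_generic)
# Passing again would mean the reactivated/untouched/mitigated buckets
# no longer overlap, so update_import_history does not raise.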

def test_dynamic_parsing_field_set_to_true(self):