♻️ Remove async import #12042

Open
wants to merge 8 commits into base: dev
@@ -88,21 +88,5 @@ You can execute the following command to see the configuration:
`docker compose exec celerybeat bash -c "celery -A dojo inspect stats"`
and see what is in effect.

### Asynchronous Import

<span style="background-color:rgba(242, 86, 29, 0.3)">This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.</span>

Import and Re-Import can also be configured to handle uploads asynchronously to aid in
processing especially large scans. It works by batching Findings and Endpoints by a
configurable amount. Each batch will be processed in a separate celery task.

The following variables impact async imports; a short sketch of the batching follows the list.

- `DD_ASYNC_FINDING_IMPORT` defaults to False
- `DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE` defaults to 100
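
The batching itself is plain list slicing; the following is a minimal illustrative sketch that mirrors the `chunk_findings` helper removed later in this PR (the numbers are made up for the example, and task dispatch is omitted):

```python
# Illustrative sketch of the removed batching logic: split the parsed findings
# into chunks of DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE; each chunk was then handed
# to its own celery task.
def chunk(items, chunk_size=100):
    """Split items into consecutive lists of at most chunk_size elements."""
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]

batches = chunk(list(range(250)), chunk_size=100)
print([len(b) for b in batches])  # [100, 100, 50] -> three separate celery tasks
```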

When using asynchronous imports with dynamic scanners, Endpoints will continue to "trickle" in
even after the import has returned a successful response. This is because processing continues
to occur after the Findings have already been imported.

To determine if an import has been fully completed, please see the progress bar in the appropriate test.
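
To check completion from a script rather than the UI, a polling sketch like the one below can be used. It assumes the v2 test serializer exposes the model's `percent_complete` field (the value behind the progress bar); the host, API key, and test id are placeholders, not real values.

```python
# Hypothetical polling sketch: wait until the test created by the import
# reports 100 percent complete. All values below are placeholders.
import time

import requests

DD_HOST = "https://defectdojo.example.com"
DD_API_KEY = "replace-me"
TEST_ID = 42

headers = {"Authorization": f"Token {DD_API_KEY}"}
while True:
    resp = requests.get(f"{DD_HOST}/api/v2/tests/{TEST_ID}/", headers=headers, timeout=30)
    resp.raise_for_status()
    if resp.json().get("percent_complete", 100) >= 100:
        break
    time.sleep(5)
print("Import fully processed.")
```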
### Asynchronous Import: Deprecated
This feature has been removed in 2.47.0.
4 changes: 4 additions & 0 deletions docs/content/en/open_source/upgrading/2.47.md
@@ -5,3 +5,7 @@ weight: -20250505
description: No special instructions.
---
There are no special instructions for upgrading to 2.47.x. Check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.47.0) for the contents of the release.

## Removal of Asynchronous Import

Please note that asynchronous import has been removed, as announced in 2.46. If you have not yet migrated away from this feature, we recommend doing so before upgrading to 2.47.0.
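
As a pre-upgrade sanity check, something like the following illustrative script (not part of DefectDojo) can flag leftover async-import variables in the environment of the host being upgraded:

```python
# Illustrative pre-upgrade check: report removed async-import settings that are
# still present in the environment. The variable names come from the settings
# removed in this PR; the script itself is not part of DefectDojo.
import os

REMOVED = ("DD_ASYNC_FINDING_IMPORT", "DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE")
leftover = [name for name in REMOVED if name in os.environ]
if leftover:
    print("Remove before upgrading to 2.47.0:", ", ".join(leftover))
else:
    print("No async-import settings found.")
```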
41 changes: 0 additions & 41 deletions dojo/importers/base_importer.py
@@ -1,6 +1,5 @@
import base64
import logging
from warnings import warn

from django.conf import settings
from django.core.exceptions import ValidationError
@@ -255,33 +254,11 @@ def sync_process_findings(
"""
return self.process_findings(parsed_findings, sync=True, **kwargs)

def async_process_findings(
self,
parsed_findings: list[Finding],
**kwargs: dict,
) -> list[Finding]:
"""
Processes findings in chunks across multiple celery workers. The
ASYNC_FINDING_IMPORT_CHUNK_SIZE setting determines how many
findings will be processed in a given worker/process/thread
"""
warn("This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.", stacklevel=2)
return self.process_findings(parsed_findings, sync=False, **kwargs)

def determine_process_method(
self,
parsed_findings: list[Finding],
**kwargs: dict,
) -> list[Finding]:
"""
Determines whether to process the scan iteratively, or in chunks,
based upon the ASYNC_FINDING_IMPORT setting
"""
if settings.ASYNC_FINDING_IMPORT:
return self.async_process_findings(
parsed_findings,
**kwargs,
)
return self.sync_process_findings(
parsed_findings,
**kwargs,
@@ -513,24 +490,6 @@ def construct_imported_message(

return message

def chunk_findings(
self,
finding_list: list[Finding],
chunk_size: int = settings.ASYNC_FINDING_IMPORT_CHUNK_SIZE,
) -> list[list[Finding]]:
"""
Split a single large list into a list of lists of size `chunk_size`.
For example:
```
>>> chunk_findings([A, B, C, D, E], 2)
[[A, B], [C, D], [E]]
```
"""
# Break the list of parsed findings into lists of at most chunk_size items
chunk_list = [finding_list[i:i + chunk_size] for i in range(0, len(finding_list), chunk_size)]
logger.debug(f"Split endpoints/findings into {len(chunk_list)} chunks of {chunk_size}")
return chunk_list

def update_test_progress(
self,
percentage_value: int = 100,
38 changes: 1 addition & 37 deletions dojo/importers/default_importer.py
@@ -1,8 +1,7 @@
import logging
from warnings import warn

from django.core.files.uploadedfile import TemporaryUploadedFile
from django.core.serializers import deserialize, serialize
from django.core.serializers import serialize
from django.db.models.query_utils import Q
from django.urls import reverse

@@ -350,38 +349,3 @@ def parse_findings_dynamic_test_type(
"""
logger.debug("IMPORT_SCAN parser v2: Create Test and parse findings")
return super().parse_findings_dynamic_test_type(scan, parser)

def async_process_findings(
self,
parsed_findings: list[Finding],
**kwargs: dict,
) -> list[Finding]:
"""
Processes findings in chunks across multiple celery workers. The
ASYNC_FINDING_IMPORT_CHUNK_SIZE setting determines how many
findings will be processed in a given worker/process/thread
"""
warn("This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.", stacklevel=2)
chunk_list = self.chunk_findings(parsed_findings)
results_list = []
new_findings = []
# First kick off all the workers
for findings_list in chunk_list:
result = self.process_findings(
findings_list,
sync=False,
**kwargs,
)
# Since I don't want to wait until the task is done right now, save the id
# So I can check on the task later
results_list += [result]
# After all tasks have been started, time to pull the results
logger.info("IMPORT_SCAN: Collecting Findings")
for results in results_list:
serial_new_findings = results
new_findings += [next(deserialize("json", finding)).object for finding in serial_new_findings]
logger.info("IMPORT_SCAN: All Findings Collected")
# Indicate that the test is not complete yet as endpoints will still be rolling in.
self.test.percent_complete = 50
self.test.save()
return new_findings
60 changes: 1 addition & 59 deletions dojo/importers/default_reimporter.py
@@ -1,8 +1,7 @@
import logging
from warnings import warn

from django.core.files.uploadedfile import TemporaryUploadedFile
from django.core.serializers import deserialize, serialize
from django.core.serializers import serialize
from django.db.models.query_utils import Q

import dojo.finding.helper as finding_helper
@@ -314,63 +313,6 @@ def parse_findings_dynamic_test_type(
logger.debug("REIMPORT_SCAN parser v2: Create parse findings")
return super().parse_findings_dynamic_test_type(scan, parser)

def async_process_findings(
self,
parsed_findings: list[Finding],
**kwargs: dict,
) -> tuple[list[Finding], list[Finding], list[Finding], list[Finding]]:
"""
Processes findings in chunks across multiple celery workers. The
ASYNC_FINDING_IMPORT_CHUNK_SIZE setting determines how many
findings will be processed in a given worker/process/thread
"""
warn("This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.", stacklevel=2)
# Indicate that the test is not complete yet as endpoints will still be rolling in.
self.update_test_progress(percentage_value=50)
chunk_list = self.chunk_findings(parsed_findings)
results_list = []
new_findings = []
reactivated_findings = []
findings_to_mitigate = []
untouched_findings = []
# First kick off all the workers
for findings_list in chunk_list:
result = self.process_findings(
findings_list,
sync=False,
**kwargs,
)
# Since I don't want to wait until the task is done right now, save the id
# So I can check on the task later
results_list += [result]
# After all tasks have been started, time to pull the results
logger.debug("REIMPORT_SCAN: Collecting Findings")
for results in results_list:
(
serial_new_findings,
serial_reactivated_findings,
serial_findings_to_mitigate,
serial_untouched_findings,
) = results
new_findings += [
next(deserialize("json", finding)).object
for finding in serial_new_findings
]
reactivated_findings += [
next(deserialize("json", finding)).object
for finding in serial_reactivated_findings
]
findings_to_mitigate += [
next(deserialize("json", finding)).object
for finding in serial_findings_to_mitigate
]
untouched_findings += [
next(deserialize("json", finding)).object
for finding in serial_untouched_findings
]
logger.debug("REIMPORT_SCAN: All Findings Collected")
return new_findings, reactivated_findings, findings_to_mitigate, untouched_findings

def match_new_finding_to_existing_finding(
self,
unsaved_finding: Finding,
75 changes: 4 additions & 71 deletions dojo/importers/endpoint_manager.py
@@ -1,6 +1,5 @@
import logging

from django.conf import settings
from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.urls import reverse
from django.utils import timezone
@@ -95,48 +94,14 @@ def reactivate_endpoint_status(
endpoint_status.save()
return

def chunk_endpoints(
self,
endpoint_list: list[Endpoint],
chunk_size: int = settings.ASYNC_FINDING_IMPORT_CHUNK_SIZE,
) -> list[list[Endpoint]]:
"""
Split a single large list into a list of lists of size `chunk_size`.
For example:
```
>>> chunk_endpoints([A, B, C, D, E], 2)
[[A, B], [C, D], [E]]
```
"""
# Break the list of endpoints into lists of at most chunk_size items
chunk_list = [endpoint_list[i:i + chunk_size] for i in range(0, len(endpoint_list), chunk_size)]
logger.debug(f"Split endpoints into {len(chunk_list)} chunks of {chunk_size}")
return chunk_list

def chunk_endpoints_and_disperse(
self,
finding: Finding,
endpoints: list[Endpoint],
**kwargs: dict,
) -> None:
"""
Determines whether to asynchronously process endpoints on a finding or not. If so,
chunk up the endpoints to be dispersed across individual celery workers. Otherwise,
only use one worker
"""
if settings.ASYNC_FINDING_IMPORT:
chunked_list = self.chunk_endpoints(endpoints)
# If there is only one chunk, then do not bother with async
if len(chunked_list) < 2:
self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True)
return []
# First kick off all the workers
for endpoints_list in chunked_list:
self.add_endpoints_to_unsaved_finding(finding, endpoints_list, sync=False)
else:
# Do not run this asynchronously or chunk the endpoints
self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True)
return None
self.add_endpoints_to_unsaved_finding(finding, endpoints, sync=True)
return

def clean_unsaved_endpoints(
self,
@@ -158,23 +123,7 @@ def chunk_endpoints_and_reactivate(
endpoint_status_list: list[Endpoint_Status],
**kwargs: dict,
) -> None:
"""
Reactivates all endpoint status objects. Whether this function runs asynchronously or not
depends on the ASYNC_FINDING_IMPORT setting. If it is set to true, endpoint statuses will be
chunked and dispersed over celery workers.
"""
# Determine if this can be run async
if settings.ASYNC_FINDING_IMPORT:
chunked_list = self.chunk_endpoints(endpoint_status_list)
# If there is only one chunk, then do not bother with async
if len(chunked_list) < 2:
self.reactivate_endpoint_status(endpoint_status_list, sync=True)
logger.debug(f"Split endpoints into {len(chunked_list)} chunks of {len(chunked_list[0])}")
# First kick off all the workers
for endpoint_status_list in chunked_list:
self.reactivate_endpoint_status(endpoint_status_list, sync=False)
else:
self.reactivate_endpoint_status(endpoint_status_list, sync=True)
self.reactivate_endpoint_status(endpoint_status_list, sync=True)
return

def chunk_endpoints_and_mitigate(
@@ -183,23 +132,7 @@ def chunk_endpoints_and_mitigate(
user: Dojo_User,
**kwargs: dict,
) -> None:
"""
Mitigates all endpoint status objects. Whether this function runs asynchronously or not
depends on the ASYNC_FINDING_IMPORT setting. If it is set to true, endpoint statuses will be
chunked and dispersed over celery workers.
"""
# Determine if this can be run async
if settings.ASYNC_FINDING_IMPORT:
chunked_list = self.chunk_endpoints(endpoint_status_list)
# If there is only one chunk, then do not bother with async
if len(chunked_list) < 2:
self.mitigate_endpoint_status(endpoint_status_list, user, sync=True)
logger.debug(f"Split endpoints into {len(chunked_list)} chunks of {len(chunked_list[0])}")
# First kick off all the workers
for endpoint_status_list in chunked_list:
self.mitigate_endpoint_status(endpoint_status_list, user, sync=False)
else:
self.mitigate_endpoint_status(endpoint_status_list, user, sync=True)
self.mitigate_endpoint_status(endpoint_status_list, user, sync=True)
return

def update_endpoint_status(
10 changes: 0 additions & 10 deletions dojo/settings/settings.dist.py
@@ -273,12 +273,6 @@
DD_RATE_LIMITER_ACCOUNT_LOCKOUT=(bool, False),
# when enabled SonarQube API parser will download the security hotspots
DD_SONARQUBE_API_PARSER_HOTSPOTS=(bool, True),
# when enabled, finding importing will occur asynchronously, default False
# This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.
DD_ASYNC_FINDING_IMPORT=(bool, False),
Member:
If True, would it make sense to create an announcement banner or other alert to notify the users / admins that they are using a feature that is no longer present?

Contributor (Author):
I don't have the time right now to implement this.
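
A minimal sketch of what the suggested alert could look like, using Django's checks framework; it is not implemented in this PR, and the check id is a placeholder.

```python
# Hypothetical sketch of the reviewer's suggestion, not part of this PR: warn
# admins at startup if the removed setting is still set in the environment.
import os

from django.core.checks import Warning, register


@register()
def async_import_removed_check(app_configs, **kwargs):
    if os.environ.get("DD_ASYNC_FINDING_IMPORT", "").lower() in {"1", "true", "yes"}:
        return [
            Warning(
                "DD_ASYNC_FINDING_IMPORT is set, but asynchronous import was removed in DefectDojo 2.47.0.",
                hint="Unset DD_ASYNC_FINDING_IMPORT and DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE in your deployment.",
                id="dojo.W999",  # placeholder check id
            ),
        ]
    return []
```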

# The number of findings to be processed per celery worker
# This experimental feature has been deprecated as of DefectDojo 2.44.0 (March release). Please exercise caution if using this feature with an older version of DefectDojo, as results may be inconsistent.
DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE=(int, 100),
# When enabled, deleting objects will occur from the bottom up. In the example of deleting an engagement,
# the objects will be deleted as follows: Endpoints -> Findings -> Tests -> Engagement
DD_ASYNC_OBJECT_DELETE=(bool, False),
@@ -1790,10 +1784,6 @@ def saml2_attrib_map_format(din):
# Decide if the SonarQube API parser should download the security hotspots
SONARQUBE_API_PARSER_HOTSPOTS = env("DD_SONARQUBE_API_PARSER_HOTSPOTS")

# when enabled, finding importing will occur asynchronously, default False
ASYNC_FINDING_IMPORT = env("DD_ASYNC_FINDING_IMPORT")
# The number of findings to be processed per celery worker
ASYNC_FINDING_IMPORT_CHUNK_SIZE = env("DD_ASYNC_FINDING_IMPORT_CHUNK_SIZE")
# When enabled, deleting objects will occur from the bottom up. In the example of deleting an engagement,
# the objects will be deleted as follows: Endpoints -> Findings -> Tests -> Engagement
ASYNC_OBJECT_DELETE = env("DD_ASYNC_OBJECT_DELETE")