-
-
Notifications
You must be signed in to change notification settings - Fork 4.4k
ref(aci): dual write workflow group action status #92522
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fa02abf
3e38f69
63ec4a6
de60178
b24816d
ffcbecb
441484a
0f7e7a7
96a25f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,9 @@ | |
ActionGroupStatus, | ||
DataCondition, | ||
DataConditionGroup, | ||
DataConditionGroupAction, | ||
Workflow, | ||
WorkflowActionGroupStatus, | ||
WorkflowDataConditionGroup, | ||
WorkflowFireHistory, | ||
) | ||
|
@@ -94,7 +97,7 @@ def create_workflow_fire_histories( | |
|
||
|
||
# TODO(cathy): only reinforce workflow frequency for certain issue types | ||
def filter_recently_fired_workflow_actions( | ||
def filter_recently_fired_actions( | ||
filtered_action_groups: set[DataConditionGroup], event_data: WorkflowEventData | ||
) -> BaseQuerySet[Action]: | ||
# get the actions for any of the triggered data condition groups | ||
|
@@ -127,11 +130,143 @@ def filter_recently_fired_workflow_actions( | |
actions_without_statuses_ids = {action.id for action in actions_without_statuses} | ||
filtered_actions = actions.filter(id__in=actions_to_include | actions_without_statuses_ids) | ||
|
||
# dual write to WorkflowActionGroupStatus, ignoring results for now until they are canonical | ||
_ = filter_recently_fired_workflow_actions(filtered_action_groups, event_data) | ||
|
||
create_workflow_fire_histories(filtered_actions, event_data) | ||
|
||
return filtered_actions | ||
|
||
|
||
def get_workflow_action_group_statuses( | ||
action_to_workflows_ids: dict[int, set[int]], group: Group, workflow_ids: set[int] | ||
) -> dict[int, list[WorkflowActionGroupStatus]]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Docstring.. "returns them grouped by Action ID" as I'm not sure the key is obvious enough from name or type. |
||
""" | ||
Returns a mapping of action IDs to their corresponding WorkflowActionGroupStatus objects | ||
given the provided action_to_workflows_ids and group. | ||
""" | ||
|
||
all_statuses = WorkflowActionGroupStatus.objects.filter( | ||
group=group, action_id__in=action_to_workflows_ids.keys(), workflow_id__in=workflow_ids | ||
) | ||
|
||
actions_with_statuses: dict[int, list[WorkflowActionGroupStatus]] = defaultdict(list) | ||
|
||
for status in all_statuses: | ||
workflow_id = status.workflow_id | ||
action_id = status.action_id | ||
if workflow_id not in action_to_workflows_ids[action_id]: | ||
# if the (workflow, action) combination shouldn't be processed, skip it | ||
# more difficult to query than to iterate | ||
continue | ||
|
||
actions_with_statuses[action_id].append(status) | ||
|
||
return actions_with_statuses | ||
|
||
|
||
def process_workflow_action_group_statuses( | ||
action_to_workflows_ids: dict[int, set[int]], | ||
action_to_statuses: dict[int, list[WorkflowActionGroupStatus]], | ||
workflows: BaseQuerySet[Workflow], | ||
group: Group, | ||
now: datetime, | ||
) -> tuple[dict[int, int], set[int], list[WorkflowActionGroupStatus]]: | ||
""" | ||
Determine which workflow actions should be fired based on their statuses. | ||
Prepare the statuses to update and create. | ||
""" | ||
|
||
action_to_workflow_ids: dict[int, int] = {} # will dedupe because there can be only 1 | ||
workflow_frequencies = { | ||
workflow.id: workflow.config.get("frequency", 0) * timedelta(minutes=1) | ||
for workflow in workflows | ||
} | ||
statuses_to_update: set[int] = set() | ||
|
||
for action_id, statuses in action_to_statuses.items(): | ||
for status in statuses: | ||
if (now - status.date_updated) > workflow_frequencies.get(status.workflow_id, 0): | ||
# we should fire the workflow for this action | ||
action_to_workflow_ids[action_id] = status.workflow_id | ||
statuses_to_update.add(status.id) | ||
|
||
missing_statuses: list[WorkflowActionGroupStatus] = [] | ||
for action_id, expected_workflows in action_to_workflows_ids.items(): | ||
wags = action_to_statuses.get(action_id, []) | ||
actual_workflows = {status.workflow_id for status in wags} | ||
missing_workflows = expected_workflows - actual_workflows | ||
|
||
for workflow_id in missing_workflows: | ||
# create a new status for the missing workflow | ||
missing_statuses.append( | ||
WorkflowActionGroupStatus( | ||
workflow_id=workflow_id, action_id=action_id, group=group, date_updated=now | ||
) | ||
) | ||
action_to_workflow_ids[action_id] = workflow_id | ||
|
||
return action_to_workflow_ids, statuses_to_update, missing_statuses | ||
|
||
|
||
def update_workflow_action_group_statuses( | ||
now: datetime, statuses_to_update: set[int], missing_statuses: list[WorkflowActionGroupStatus] | ||
) -> None: | ||
WorkflowActionGroupStatus.objects.filter( | ||
id__in=statuses_to_update, date_updated__lt=now | ||
).update(date_updated=now) | ||
|
||
WorkflowActionGroupStatus.objects.bulk_create( | ||
missing_statuses, | ||
batch_size=1000, | ||
ignore_conflicts=True, | ||
) | ||
|
||
|
||
def filter_recently_fired_workflow_actions( | ||
filtered_action_groups: set[DataConditionGroup], event_data: WorkflowEventData | ||
) -> BaseQuerySet[Action]: | ||
""" | ||
Returns actions associated with the provided DataConditionsGroups, excluding those that have been recently fired. Also updates associated WorkflowActionGroupStatus objects. | ||
""" | ||
|
||
data_condition_group_actions = DataConditionGroupAction.objects.filter( | ||
condition_group__in=filtered_action_groups | ||
).values_list("action_id", "condition_group__workflowdataconditiongroup__workflow_id") | ||
|
||
action_to_workflows_ids: dict[int, set[int]] = defaultdict(set) | ||
workflow_ids: set[int] = set() | ||
|
||
for action_id, workflow_id in data_condition_group_actions: | ||
action_to_workflows_ids[action_id].add(workflow_id) | ||
workflow_ids.add(workflow_id) | ||
|
||
workflows = Workflow.objects.filter(id__in=workflow_ids) | ||
|
||
action_to_statuses = get_workflow_action_group_statuses( | ||
action_to_workflows_ids=action_to_workflows_ids, | ||
group=event_data.event.group, | ||
workflow_ids=workflow_ids, | ||
) | ||
now = timezone.now() | ||
action_to_workflow_ids, statuses_to_update, missing_statuses = ( | ||
process_workflow_action_group_statuses( | ||
action_to_workflows_ids=action_to_workflows_ids, | ||
action_to_statuses=action_to_statuses, | ||
workflows=workflows, | ||
group=event_data.event.group, | ||
now=now, | ||
) | ||
) | ||
update_workflow_action_group_statuses(now, statuses_to_update, missing_statuses) | ||
|
||
# TODO: write this in a single spot | ||
# create_workflow_fire_histories | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. kinda think this should be lifted out of here instead; noting which workflows fire isn't really a job for action filtering; the higher level stuff needs to know "we have our list of workflow actions to fire". If we bump this to a higher level, this TODO doesn't need to exist and I think the correctness of it all is clearer. |
||
|
||
# TODO: somehow attach workflows so we can fire actions with the appropriate workflow env | ||
return Action.objects.filter(id__in=list(action_to_workflow_ids.keys())) | ||
|
||
|
||
def get_available_action_integrations_for_org(organization: Organization) -> list[RpcIntegration]: | ||
providers = [ | ||
handler.provider_slug | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By name, this just filters what we pass in, but it also does status and history updating.
I think a docstring with "Returns actions associated with the provided DataConditionGroups, excluding those that ... whatever. Also updates ...".
I do wonder if some of the book-keeping could be separated from the filtering to simplify that.