Skip to content

ref(aci): dual write workflow group action status #92522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion src/sentry/workflow_engine/processors/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
ActionGroupStatus,
DataCondition,
DataConditionGroup,
DataConditionGroupAction,
Workflow,
WorkflowActionGroupStatus,
WorkflowDataConditionGroup,
WorkflowFireHistory,
)
Expand Down Expand Up @@ -94,7 +97,7 @@ def create_workflow_fire_histories(


# TODO(cathy): only reinforce workflow frequency for certain issue types
def filter_recently_fired_workflow_actions(
def filter_recently_fired_actions(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By name, this just filters what we pass in, but it also does status and history updating.
I think it needs a docstring along the lines of: "Returns actions associated with the provided DataConditionGroups, excluding those that <…>. Also updates <…>."

I do wonder if some of the book-keeping could be separated from the filtering to simplify that.

filtered_action_groups: set[DataConditionGroup], event_data: WorkflowEventData
) -> BaseQuerySet[Action]:
# get the actions for any of the triggered data condition groups
Expand Down Expand Up @@ -127,11 +130,143 @@ def filter_recently_fired_workflow_actions(
actions_without_statuses_ids = {action.id for action in actions_without_statuses}
filtered_actions = actions.filter(id__in=actions_to_include | actions_without_statuses_ids)

# dual write to WorkflowActionGroupStatus, ignoring results for now until they are canonical
_ = filter_recently_fired_workflow_actions(filtered_action_groups, event_data)

create_workflow_fire_histories(filtered_actions, event_data)

return filtered_actions


def get_workflow_action_group_statuses(
    action_to_workflows_ids: dict[int, set[int]], group: Group, workflow_ids: set[int]
) -> dict[int, list[WorkflowActionGroupStatus]]:
    """
    Fetch the WorkflowActionGroupStatus rows for the given group and return them
    grouped by action ID.

    Only statuses whose (workflow, action) pair appears in action_to_workflows_ids
    are kept; rows for unrelated pairs are dropped in Python because the pairwise
    constraint is more awkward to express as a query than to filter in a loop.
    """
    statuses_by_action: dict[int, list[WorkflowActionGroupStatus]] = defaultdict(list)

    candidate_statuses = WorkflowActionGroupStatus.objects.filter(
        group=group,
        action_id__in=action_to_workflows_ids.keys(),
        workflow_id__in=workflow_ids,
    )

    for status in candidate_statuses:
        # Keep only the (workflow, action) combinations we were asked to process.
        if status.workflow_id in action_to_workflows_ids[status.action_id]:
            statuses_by_action[status.action_id].append(status)

    return statuses_by_action


def process_workflow_action_group_statuses(
    action_to_workflows_ids: dict[int, set[int]],
    action_to_statuses: dict[int, list[WorkflowActionGroupStatus]],
    workflows: BaseQuerySet[Workflow],
    group: Group,
    now: datetime,
) -> tuple[dict[int, int], set[int], list[WorkflowActionGroupStatus]]:
    """
    Determine which workflow actions should be fired based on their statuses.
    Prepare the statuses to update and create.

    Returns a tuple of:
      - action_to_workflow_ids: action ID -> the single workflow ID it fires for
      - statuses_to_update: IDs of existing WorkflowActionGroupStatus rows to touch
      - missing_statuses: unsaved WorkflowActionGroupStatus rows to bulk-create
    """

    action_to_workflow_ids: dict[int, int] = {}  # will dedupe because there can be only 1
    workflow_frequencies = {
        workflow.id: workflow.config.get("frequency", 0) * timedelta(minutes=1)
        for workflow in workflows
    }
    statuses_to_update: set[int] = set()

    for action_id, statuses in action_to_statuses.items():
        for status in statuses:
            # Default must be a timedelta: the previous default of int 0 raised
            # TypeError on `timedelta > int` whenever the workflow row was missing
            # from `workflows` (e.g. deleted between queries). timedelta(0) also
            # preserves the "no frequency configured -> always fire" behavior.
            frequency = workflow_frequencies.get(status.workflow_id, timedelta(0))
            if (now - status.date_updated) > frequency:
                # we should fire the workflow for this action
                action_to_workflow_ids[action_id] = status.workflow_id
                statuses_to_update.add(status.id)

    missing_statuses: list[WorkflowActionGroupStatus] = []
    for action_id, expected_workflows in action_to_workflows_ids.items():
        wags = action_to_statuses.get(action_id, [])
        actual_workflows = {status.workflow_id for status in wags}
        missing_workflows = expected_workflows - actual_workflows

        for workflow_id in missing_workflows:
            # create a new status for the missing workflow; a brand-new status
            # means the action has never fired for this workflow, so fire it now
            missing_statuses.append(
                WorkflowActionGroupStatus(
                    workflow_id=workflow_id, action_id=action_id, group=group, date_updated=now
                )
            )
            action_to_workflow_ids[action_id] = workflow_id

    return action_to_workflow_ids, statuses_to_update, missing_statuses


def update_workflow_action_group_statuses(
    now: datetime, statuses_to_update: set[int], missing_statuses: list[WorkflowActionGroupStatus]
) -> None:
    """
    Persist the bookkeeping for fired actions: bump date_updated on the existing
    statuses and bulk-insert the ones that did not exist yet.
    """
    # date_updated__lt=now avoids rewinding a timestamp a concurrent writer
    # already moved past `now`.
    existing = WorkflowActionGroupStatus.objects.filter(
        id__in=statuses_to_update, date_updated__lt=now
    )
    existing.update(date_updated=now)

    # ignore_conflicts: another process may have created the same row concurrently.
    WorkflowActionGroupStatus.objects.bulk_create(
        missing_statuses, batch_size=1000, ignore_conflicts=True
    )


def filter_recently_fired_workflow_actions(
    filtered_action_groups: set[DataConditionGroup], event_data: WorkflowEventData
) -> BaseQuerySet[Action]:
    """
    Returns actions associated with the provided DataConditionGroups, excluding those that have been recently fired. Also updates associated WorkflowActionGroupStatus objects.
    """

    # Map every action to the set of workflows it belongs to, via the
    # DataConditionGroup -> WorkflowDataConditionGroup join.
    action_workflow_pairs = DataConditionGroupAction.objects.filter(
        condition_group__in=filtered_action_groups
    ).values_list("action_id", "condition_group__workflowdataconditiongroup__workflow_id")

    action_to_workflows_ids: dict[int, set[int]] = defaultdict(set)
    workflow_ids: set[int] = set()
    for action_id, workflow_id in action_workflow_pairs:
        action_to_workflows_ids[action_id].add(workflow_id)
        workflow_ids.add(workflow_id)

    group = event_data.event.group
    action_to_statuses = get_workflow_action_group_statuses(
        action_to_workflows_ids=action_to_workflows_ids,
        group=group,
        workflow_ids=workflow_ids,
    )

    now = timezone.now()
    action_to_workflow_ids, statuses_to_update, missing_statuses = (
        process_workflow_action_group_statuses(
            action_to_workflows_ids=action_to_workflows_ids,
            action_to_statuses=action_to_statuses,
            workflows=Workflow.objects.filter(id__in=workflow_ids),
            group=group,
            now=now,
        )
    )
    update_workflow_action_group_statuses(now, statuses_to_update, missing_statuses)

    # TODO: write this in a single spot
    # create_workflow_fire_histories

    # TODO: somehow attach workflows so we can fire actions with the appropriate workflow env
    return Action.objects.filter(id__in=list(action_to_workflow_ids.keys()))


def get_available_action_integrations_for_org(organization: Organization) -> list[RpcIntegration]:
providers = [
handler.provider_slug
Expand Down
6 changes: 2 additions & 4 deletions src/sentry/workflow_engine/processors/delayed_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
Condition,
)
from sentry.workflow_engine.models.data_condition_group import get_slow_conditions
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
from sentry.workflow_engine.processors.action import filter_recently_fired_actions
from sentry.workflow_engine.processors.data_condition_group import evaluate_data_conditions
from sentry.workflow_engine.processors.detector import get_detector_by_event
from sentry.workflow_engine.processors.log_util import track_batch_performance
Expand Down Expand Up @@ -438,9 +438,7 @@ def fire_actions_for_groups(
action_filters.add(dcg)

# process action filters
filtered_actions = filter_recently_fired_workflow_actions(
action_filters, event_data
)
filtered_actions = filter_recently_fired_actions(action_filters, event_data)

# process workflow_triggers
workflows = set(
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/workflow_engine/processors/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
Detector,
Workflow,
)
from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
from sentry.workflow_engine.processors.action import filter_recently_fired_actions
from sentry.workflow_engine.processors.data_condition_group import process_data_condition_group
from sentry.workflow_engine.processors.detector import get_detector_by_event
from sentry.workflow_engine.types import WorkflowEventData
Expand Down Expand Up @@ -177,7 +177,7 @@ def evaluate_workflows_action_filters(
},
)

return filter_recently_fired_workflow_actions(filtered_action_groups, event_data)
return filter_recently_fired_actions(filtered_action_groups, event_data)


def process_workflows(event_data: WorkflowEventData) -> set[Workflow]:
Expand Down
Loading
Loading