Skip to content

Add dynamic config function #842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions temporalio/worker/_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,18 @@ def __init__(
if defn.name in self._workflows:
raise ValueError(f"More than one workflow named {defn.name}")
if should_enforce_versioning_behavior:
if defn.versioning_behavior in [
None,
temporalio.common.VersioningBehavior.UNSPECIFIED,
]:
if (
defn.versioning_behavior
in [
None,
temporalio.common.VersioningBehavior.UNSPECIFIED,
]
and not defn.dynamic_config_fn
):
raise ValueError(
f"Workflow {defn.name} must specify a versioning behavior using "
"the `versioning_behavior` argument to `@workflow.defn`."
"the `versioning_behavior` argument to `@workflow.defn` or by "
"defining a function decorated with `@workflow.dynamic_config`."
)

# Prepare the workflow with the runner (this will error in the
Expand Down
62 changes: 53 additions & 9 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,15 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
# metadata query
self._current_details = ""

# The versioning behavior of this workflow, as established by annotation or by the dynamic
# config function. Is only set once upon initialization.
self._versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None

# Dynamic failure exception types as overridden by the dynamic config function
self._dynamic_failure_exception_types: Optional[
Sequence[type[BaseException]]
] = None

def get_thread_id(self) -> Optional[int]:
return self._current_thread_id

Expand All @@ -348,11 +357,7 @@ def activate(
temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion()
)
self._current_completion.successful.SetInParent()
self._current_completion.successful.versioning_behavior = (
self._defn.versioning_behavior.value
if self._defn.versioning_behavior
else temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
)

self._current_activation_error: Optional[Exception] = None
self._deployment_version_for_current_task = (
act.deployment_version_for_current_task
Expand Down Expand Up @@ -419,6 +424,12 @@ def activate(
)
activation_err = None

# Apply versioning behavior if one was established
if self._versioning_behavior:
self._current_completion.successful.versioning_behavior = (
self._versioning_behavior.value
)

# If we're deleting, there better be no more tasks. It is important for
# the integrity of the system that we check this. If there are tasks
# remaining, they and any associated coroutines will get garbage
Expand All @@ -439,7 +450,6 @@ def activate(
)
# Set completion failure
self._current_completion.failed.failure.SetInParent()
# TODO: Review - odd that we don't un-set success here?
try:
self._failure_converter.to_failure(
activation_err,
Expand Down Expand Up @@ -1728,19 +1738,53 @@ def _convert_payloads(
def _instantiate_workflow_object(self) -> Any:
if not self._workflow_input:
raise RuntimeError("Expected workflow input. This is a Python SDK bug.")

if hasattr(self._defn.cls.__init__, "__temporal_workflow_init"):
return self._defn.cls(*self._workflow_input.args)
workflow_instance = self._defn.cls(*self._workflow_input.args)
else:
return self._defn.cls()
workflow_instance = self._defn.cls()

if self._defn.versioning_behavior:
self._versioning_behavior = self._defn.versioning_behavior
# If there's a dynamic config function, call it now after we've instantiated the object
# but before we start executing the workflow
if self._defn.name is None and self._defn.dynamic_config_fn is not None:
dynamic_config = None
try:
with self._as_read_only():
dynamic_config = self._defn.dynamic_config_fn(workflow_instance)
except Exception as err:
logger.exception(
f"Failed to run dynamic config function in workflow {self._info.workflow_type}"
)
# Treat as a task failure
self._current_activation_error = err
raise self._current_activation_error

if dynamic_config:
if dynamic_config.failure_exception_types is not None:
self._dynamic_failure_exception_types = (
dynamic_config.failure_exception_types
)
if (
dynamic_config.versioning_behavior
!= temporalio.common.VersioningBehavior.UNSPECIFIED
):
self._versioning_behavior = dynamic_config.versioning_behavior

return workflow_instance

def _is_workflow_failure_exception(self, err: BaseException) -> bool:
# An exception is a failure instead of a task fail if it's already a
# failure error or if it is a timeout error or if it is an instance of
# any of the failure types in the worker or workflow-level setting
wf_failure_exception_types = self._defn.failure_exception_types
if self._dynamic_failure_exception_types is not None:
wf_failure_exception_types = self._dynamic_failure_exception_types
return (
isinstance(err, temporalio.exceptions.FailureError)
or isinstance(err, asyncio.TimeoutError)
or any(isinstance(err, typ) for typ in self._defn.failure_exception_types)
or any(isinstance(err, typ) for typ in wf_failure_exception_types)
or any(
isinstance(err, typ)
for typ in self._worker_level_failure_exception_types
Expand Down
70 changes: 70 additions & 0 deletions temporalio/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import asyncio
import contextvars
import dataclasses
import inspect
import logging
import threading
Expand Down Expand Up @@ -415,6 +416,61 @@ def decorator(
return decorator(fn.__name__, description, fn, bypass_async_check=True)


@dataclass(frozen=True)
class DynamicWorkflowConfig:
"""Returned by functions using the :py:func:`dynamic_config` decorator, see it for more."""

failure_exception_types: Optional[Sequence[Type[BaseException]]] = None
"""The types of exceptions that, if a workflow-thrown exception extends, will cause the
workflow/update to fail instead of suspending the workflow via task failure. These are applied
in addition to ones set on the worker constructor. If ``Exception`` is set, it effectively will
fail a workflow/update in all user exception cases.

Always overrides the equivalent parameter on :py:func:`defn` if set not-None.

WARNING: This setting is experimental.
"""
versioning_behavior: temporalio.common.VersioningBehavior = (
temporalio.common.VersioningBehavior.UNSPECIFIED
)
"""Specifies the versioning behavior to use for this workflow.

Always overrides the equivalent parameter on :py:func:`defn`.

WARNING: This setting is experimental.
"""


def dynamic_config(
fn: MethodSyncNoParam[SelfType, DynamicWorkflowConfig],
) -> MethodSyncNoParam[SelfType, DynamicWorkflowConfig]:
"""Decorator to allow configuring a dynamic workflow's behavior.

Because dynamic workflows may conceptually represent more than one workflow type, it may be
desirable to have different settings for fields that would normally be passed to
:py:func:`defn`, but vary based on the workflow type name or other information available in
the workflow's context. This function will be called after the workflow's :py:func:`init`,
if it has one, but before the workflow's :py:func:`run` method.

The method must only take self as a parameter, and any values set in the class it returns will
override those provided to :py:func:`defn`.

Cannot be specified on non-dynamic workflows.

Args:
fn: The function to decorate.
"""
if inspect.iscoroutinefunction(fn):
raise ValueError("Workflow dynamic_config method must be synchronous")
params = list(inspect.signature(fn).parameters.values())
if len(params) != 1:
raise ValueError("Workflow dynamic_config method must only take self parameter")

# Add marker attribute
setattr(fn, "__temporal_workflow_dynamic_config", True)
return fn


@dataclass(frozen=True)
class Info:
"""Information about the running workflow.
Expand Down Expand Up @@ -1449,6 +1505,7 @@ class _Definition:
arg_types: Optional[List[Type]] = None
ret_type: Optional[Type] = None
versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None

@staticmethod
def from_class(cls: Type) -> Optional[_Definition]:
Expand Down Expand Up @@ -1513,6 +1570,7 @@ def _apply_to_class(
# Collect run fn and all signal/query/update fns
init_fn: Optional[Callable[..., None]] = None
run_fn: Optional[Callable[..., Awaitable[Any]]] = None
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None
seen_run_attr = False
signals: Dict[Optional[str], _SignalDefinition] = {}
queries: Dict[Optional[str], _QueryDefinition] = {}
Expand Down Expand Up @@ -1560,6 +1618,17 @@ def _apply_to_class(
queries[query_defn.name] = query_defn
elif name == "__init__" and hasattr(member, "__temporal_workflow_init"):
init_fn = member
elif hasattr(member, "__temporal_workflow_dynamic_config"):
if workflow_name:
issues.append(
"@workflow.dynamic_config can only be used in dynamic workflows, but "
f"workflow class {workflow_name} ({cls.__name__}) is not dynamic"
)
if dynamic_config_fn:
issues.append(
"@workflow.dynamic_config can only be defined once per workflow"
)
dynamic_config_fn = member
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pedantic, but we should error if this is already set

elif isinstance(member, UpdateMethodMultiParam):
update_defn = member._defn
if update_defn.name in updates:
Expand Down Expand Up @@ -1643,6 +1712,7 @@ def _apply_to_class(
sandboxed=sandboxed,
failure_exception_types=failure_exception_types,
versioning_behavior=versioning_behavior,
dynamic_config_fn=dynamic_config_fn,
)
setattr(cls, "__temporal_workflow_definition", defn)
setattr(run_fn, "__temporal_workflow_definition", defn)
Expand Down
48 changes: 43 additions & 5 deletions tests/worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
WorkerTuner,
WorkflowSlotInfo,
)
from temporalio.workflow import VersioningIntent
from temporalio.workflow import DynamicWorkflowConfig, VersioningIntent
from tests.helpers import (
assert_eventually,
find_free_port,
Expand Down Expand Up @@ -795,8 +795,24 @@ async def run(self, args: Sequence[RawValue]) -> str:
return "dynamic"


async def test_worker_deployment_dynamic_workflow_on_run(
client: Client, env: WorkflowEnvironment
@workflow.defn(dynamic=True, versioning_behavior=VersioningBehavior.PINNED)
class DynamicWorkflowVersioningOnConfigMethod:
@workflow.dynamic_config
def dynamic_config(self) -> DynamicWorkflowConfig:
return DynamicWorkflowConfig(
versioning_behavior=VersioningBehavior.AUTO_UPGRADE
)

@workflow.run
async def run(self, args: Sequence[RawValue]) -> str:
return "dynamic"


async def _test_worker_deployment_dynamic_workflow(
client: Client,
env: WorkflowEnvironment,
workflow_class,
expected_versioning_behavior: temporalio.api.enums.v1.VersioningBehavior.ValueType,
):
if env.supports_time_skipping:
pytest.skip("Test Server doesn't support worker deployments")
Expand All @@ -806,7 +822,7 @@ async def test_worker_deployment_dynamic_workflow_on_run(

async with new_worker(
client,
DynamicWorkflowVersioningOnDefn,
workflow_class,
deployment_config=WorkerDeploymentConfig(
version=worker_v1,
use_worker_versioning=True,
Expand All @@ -832,11 +848,33 @@ async def test_worker_deployment_dynamic_workflow_on_run(
assert any(
event.HasField("workflow_task_completed_event_attributes")
and event.workflow_task_completed_event_attributes.versioning_behavior
== temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED
== expected_versioning_behavior
for event in history.events
)


async def test_worker_deployment_dynamic_workflow_with_pinned_versioning(
client: Client, env: WorkflowEnvironment
):
await _test_worker_deployment_dynamic_workflow(
client,
env,
DynamicWorkflowVersioningOnDefn,
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_PINNED,
)


async def test_worker_deployment_dynamic_workflow_with_auto_upgrade_versioning(
client: Client, env: WorkflowEnvironment
):
await _test_worker_deployment_dynamic_workflow(
client,
env,
DynamicWorkflowVersioningOnConfigMethod,
temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_AUTO_UPGRADE,
)


@workflow.defn
class NoVersioningAnnotationWorkflow:
@workflow.run
Expand Down
47 changes: 47 additions & 0 deletions tests/worker/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4878,6 +4878,18 @@ async def run(self, scenario: FailureTypesScenario) -> None:
await super().run(scenario)


class FailureTypesConfiguredDynamicConfig(FailureTypesWorkflowBase):
@workflow.dynamic_config
def dynamic_config(self) -> temporalio.workflow.DynamicWorkflowConfig:
return temporalio.workflow.DynamicWorkflowConfig(
failure_exception_types=[Exception]
)

@workflow.run
async def run(self, scenario: FailureTypesScenario) -> None:
await super().run(scenario)


async def test_workflow_failure_types_configured(client: Client):
# Asserter for a single scenario
async def assert_scenario(
Expand Down Expand Up @@ -5047,6 +5059,15 @@ async def run_scenario(
FailureTypesConfiguredInheritedWorkflow,
FailureTypesScenario.CAUSE_NON_DETERMINISM,
)
# When configured at the workflow level dynamically
await run_scenario(
FailureTypesConfiguredDynamicConfig,
FailureTypesScenario.THROW_CUSTOM_EXCEPTION,
)
await run_scenario(
FailureTypesConfiguredDynamicConfig,
FailureTypesScenario.CAUSE_NON_DETERMINISM,
)


@workflow.defn(failure_exception_types=[Exception])
Expand Down Expand Up @@ -7374,3 +7395,29 @@ async def test_expose_root_execution(client: Client, env: WorkflowEnvironment):
assert child_wf_info_root is not None
assert child_wf_info_root.workflow_id == parent_desc.id
assert child_wf_info_root.run_id == parent_desc.run_id


@workflow.defn(dynamic=True)
class WorkflowDynamicConfigFnFailure:
@workflow.dynamic_config
def dynamic_config(self) -> temporalio.workflow.DynamicWorkflowConfig:
raise Exception("Dynamic config failure")

@workflow.run
async def run(self, args: Sequence[RawValue]) -> None:
raise RuntimeError("Should never actually run")


async def test_workflow_dynamic_config_failure(client: Client):
async with new_worker(client, WorkflowDynamicConfigFnFailure) as worker:
handle = await client.start_workflow(
"verycooldynamicworkflow",
id=f"dynamic-config-failure-{uuid.uuid4()}",
task_queue=worker.task_queue,
execution_timeout=timedelta(seconds=5),
)

# Assert workflow task fails with our expected error message
await assert_task_fail_eventually(
handle, message_contains="Dynamic config failure"
)
Loading