-
Notifications
You must be signed in to change notification settings - Fork 98
Add dynamic config function #842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
import asyncio | ||
import contextvars | ||
import dataclasses | ||
import inspect | ||
import logging | ||
import threading | ||
|
@@ -415,6 +416,56 @@ def decorator( | |
return decorator(fn.__name__, description, fn, bypass_async_check=True) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class DynamicWorkflowConfig: | ||
"""Returned by functions using the :py:func:`dynamic_config` decorator, see it for more.""" | ||
|
||
failure_exception_types: Sequence[Type[BaseException]] = dataclasses.field( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clarify on this field and the next that if these are set, they override anything in the definition, they are not additive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kinda too bad this is dynamic only, this is a nice feature regardless heh (as would be a callback where they can choose programmatically whether an exception is task fail or workflow fail) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I mean we can shift it to be like that if people ask I suppose - but since it only applies on first WFT for now it's not much different from the defn version |
||
default_factory=list | ||
) | ||
"""The types of exceptions that, if a workflow-thrown exception extends, will cause the | ||
workflow/update to fail instead of suspending the workflow via task failure. These are applied | ||
in addition to ones set on the worker constructor. If ``Exception`` is set, it effectively will | ||
fail a workflow/update in all user exception cases. WARNING: This setting is experimental. | ||
""" | ||
versioning_behavior: temporalio.common.VersioningBehavior = ( | ||
temporalio.common.VersioningBehavior.UNSPECIFIED | ||
) | ||
"""Specifies the versioning behavior to use for this workflow. | ||
WARNING: This setting is experimental. | ||
""" | ||
|
||
|
||
def dynamic_config( | ||
fn: MethodSyncNoParam[SelfType, DynamicWorkflowConfig], | ||
) -> MethodSyncNoParam[SelfType, DynamicWorkflowConfig]: | ||
"""Decorator to allow configuring a dynamic workflow's behavior. | ||
|
||
Because dynamic workflows may conceptually represent more than one workflow type, it may be | ||
desirable to have different settings for fields that would normally be passed to | ||
:py:func:`defn`, but vary based on the workflow type name or other information available in | ||
the workflow's context. This function will be called after the workflow's :py:func:`init`, | ||
if it has one, but before the workflow's :py:func:`run` method. | ||
|
||
The method must only take self as a parameter, and any values set in the class it returns will | ||
override those provided to :py:func:`defn`. | ||
|
||
Cannot be specified on non-dynamic workflows. | ||
|
||
Args: | ||
fn: The function to decorate. | ||
""" | ||
if inspect.iscoroutinefunction(fn): | ||
raise ValueError("Workflow dynamic_config method must be synchronous") | ||
params = list(inspect.signature(fn).parameters.values()) | ||
if len(params) != 1: | ||
raise ValueError("Workflow dynamic_config method must only take self parameter") | ||
|
||
# Add marker attribute | ||
setattr(fn, "__temporal_workflow_dynamic_config", True) | ||
return fn | ||
|
||
|
||
@dataclass(frozen=True) | ||
class Info: | ||
"""Information about the running workflow. | ||
|
@@ -1449,6 +1500,7 @@ class _Definition: | |
arg_types: Optional[List[Type]] = None | ||
ret_type: Optional[Type] = None | ||
versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None | ||
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None | ||
|
||
@staticmethod | ||
def from_class(cls: Type) -> Optional[_Definition]: | ||
|
@@ -1513,6 +1565,7 @@ def _apply_to_class( | |
# Collect run fn and all signal/query/update fns | ||
init_fn: Optional[Callable[..., None]] = None | ||
run_fn: Optional[Callable[..., Awaitable[Any]]] = None | ||
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None | ||
seen_run_attr = False | ||
signals: Dict[Optional[str], _SignalDefinition] = {} | ||
queries: Dict[Optional[str], _QueryDefinition] = {} | ||
|
@@ -1560,6 +1613,13 @@ def _apply_to_class( | |
queries[query_defn.name] = query_defn | ||
elif name == "__init__" and hasattr(member, "__temporal_workflow_init"): | ||
init_fn = member | ||
elif hasattr(member, "__temporal_workflow_dynamic_config"): | ||
if workflow_name: | ||
issues.append( | ||
"@workflow.dynamic_config can only be used in dynamic workflows, but " | ||
f"workflow class {workflow_name} ({cls.__name__}) is not dynamic" | ||
) | ||
dynamic_config_fn = member | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pedantic, but we should error if this is already set |
||
elif isinstance(member, UpdateMethodMultiParam): | ||
update_defn = member._defn | ||
if update_defn.name in updates: | ||
|
@@ -1643,6 +1703,7 @@ def _apply_to_class( | |
sandboxed=sandboxed, | ||
failure_exception_types=failure_exception_types, | ||
versioning_behavior=versioning_behavior, | ||
dynamic_config_fn=dynamic_config_fn, | ||
) | ||
setattr(cls, "__temporal_workflow_definition", defn) | ||
setattr(run_fn, "__temporal_workflow_definition", defn) | ||
|
Uh oh!
There was an error while loading. Please reload this page.