-
Notifications
You must be signed in to change notification settings - Fork 98
Add dynamic config function #842
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 3 commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
import asyncio | ||
import contextvars | ||
import dataclasses | ||
import inspect | ||
import logging | ||
import threading | ||
|
@@ -415,6 +416,61 @@ def decorator( | |
return decorator(fn.__name__, description, fn, bypass_async_check=True) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class DynamicWorkflowConfig: | ||
"""Returned by functions using the :py:func:`dynamic_config` decorator, see it for more.""" | ||
|
||
failure_exception_types: Optional[Sequence[Type[BaseException]]] = None | ||
"""The types of exceptions that, if a workflow-thrown exception extends, will cause the | ||
workflow/update to fail instead of suspending the workflow via task failure. These are applied | ||
in addition to ones set on the worker constructor. If ``Exception`` is set, it effectively will | ||
fail a workflow/update in all user exception cases. | ||
|
||
Always overrides the equivalent parameter on :py:func:`defn` if set not-None. | ||
|
||
WARNING: This setting is experimental. | ||
""" | ||
versioning_behavior: temporalio.common.VersioningBehavior = ( | ||
temporalio.common.VersioningBehavior.UNSPECIFIED | ||
) | ||
"""Specifies the versioning behavior to use for this workflow. | ||
|
||
Always overrides the equivalent parameter on :py:func:`defn`. | ||
|
||
WARNING: This setting is experimental. | ||
""" | ||
|
||
|
||
def dynamic_config( | ||
fn: MethodSyncNoParam[SelfType, DynamicWorkflowConfig], | ||
) -> MethodSyncNoParam[SelfType, DynamicWorkflowConfig]: | ||
"""Decorator to allow configuring a dynamic workflow's behavior. | ||
|
||
Because dynamic workflows may conceptually represent more than one workflow type, it may be | ||
desirable to have different settings for fields that would normally be passed to | ||
:py:func:`defn`, but vary based on the workflow type name or other information available in | ||
the workflow's context. This function will be called after the workflow's :py:func:`init`, | ||
if it has one, but before the workflow's :py:func:`run` method. | ||
|
||
The method must only take self as a parameter, and any values set in the class it returns will | ||
override those provided to :py:func:`defn`. | ||
|
||
Cannot be specified on non-dynamic workflows. | ||
|
||
Args: | ||
fn: The function to decorate. | ||
""" | ||
if inspect.iscoroutinefunction(fn): | ||
raise ValueError("Workflow dynamic_config method must be synchronous") | ||
params = list(inspect.signature(fn).parameters.values()) | ||
if len(params) != 1: | ||
raise ValueError("Workflow dynamic_config method must only take self parameter") | ||
|
||
# Add marker attribute | ||
setattr(fn, "__temporal_workflow_dynamic_config", True) | ||
return fn | ||
|
||
|
||
@dataclass(frozen=True) | ||
class Info: | ||
"""Information about the running workflow. | ||
|
@@ -1449,6 +1505,7 @@ class _Definition: | |
arg_types: Optional[List[Type]] = None | ||
ret_type: Optional[Type] = None | ||
versioning_behavior: Optional[temporalio.common.VersioningBehavior] = None | ||
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None | ||
|
||
@staticmethod | ||
def from_class(cls: Type) -> Optional[_Definition]: | ||
|
@@ -1513,6 +1570,7 @@ def _apply_to_class( | |
# Collect run fn and all signal/query/update fns | ||
init_fn: Optional[Callable[..., None]] = None | ||
run_fn: Optional[Callable[..., Awaitable[Any]]] = None | ||
dynamic_config_fn: Optional[Callable[..., DynamicWorkflowConfig]] = None | ||
seen_run_attr = False | ||
signals: Dict[Optional[str], _SignalDefinition] = {} | ||
queries: Dict[Optional[str], _QueryDefinition] = {} | ||
|
@@ -1560,6 +1618,17 @@ def _apply_to_class( | |
queries[query_defn.name] = query_defn | ||
elif name == "__init__" and hasattr(member, "__temporal_workflow_init"): | ||
init_fn = member | ||
elif hasattr(member, "__temporal_workflow_dynamic_config"): | ||
if workflow_name: | ||
issues.append( | ||
"@workflow.dynamic_config can only be used in dynamic workflows, but " | ||
f"workflow class {workflow_name} ({cls.__name__}) is not dynamic" | ||
) | ||
if dynamic_config_fn: | ||
issues.append( | ||
"@workflow.dynamic_config can only be defined once per workflow" | ||
) | ||
dynamic_config_fn = member | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Pedantic, but we should error if this is already set |
||
elif isinstance(member, UpdateMethodMultiParam): | ||
update_defn = member._defn | ||
if update_defn.name in updates: | ||
|
@@ -1643,6 +1712,7 @@ def _apply_to_class( | |
sandboxed=sandboxed, | ||
failure_exception_types=failure_exception_types, | ||
versioning_behavior=versioning_behavior, | ||
dynamic_config_fn=dynamic_config_fn, | ||
) | ||
setattr(cls, "__temporal_workflow_definition", defn) | ||
setattr(run_fn, "__temporal_workflow_definition", defn) | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.