-
Notifications
You must be signed in to change notification settings - Fork 94
Expose poller autoscaling options #830
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Submodule sdk-core
updated
2 files
+15 −12 | core/src/worker/workflow/machines/nexus_operation_state_machine.rs | |
+1 −0 | tests/integ_tests/workflow_tests/nexus.rs |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,11 @@ | |
Optional, | ||
Sequence, | ||
Type, | ||
Union, | ||
cast, | ||
) | ||
|
||
from typing_extensions import TypedDict | ||
from typing_extensions import TypeAlias, TypedDict | ||
|
||
import temporalio.activity | ||
import temporalio.api.common.v1 | ||
|
@@ -48,6 +49,48 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class PollerBehaviorSimpleMaximum: | ||
"""A poller behavior that will attempt to poll as long as a slot is available, up to the | ||
provided maximum. Cannot be less than two for workflow tasks, or one for other tasks. | ||
""" | ||
|
||
maximum: int = 5 | ||
|
||
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: | ||
return temporalio.bridge.worker.PollerBehaviorSimpleMaximum( | ||
simple_maximum=self.maximum | ||
) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class PollerBehaviorAutoscaling: | ||
"""A poller behavior that will automatically scale the number of pollers based on feedback | ||
from the server. A slot must be available before beginning polling. | ||
""" | ||
|
||
minimum: int = 1 | ||
"""At least this many poll calls will always be attempted (assuming slots are available).""" | ||
maximum: int = 100 | ||
"""At most this many poll calls will ever be open at once. Must be >= `minimum`.""" | ||
initial: int = 5 | ||
"""This many polls will be attempted initially before scaling kicks in. Must be between | ||
`minimum` and `maximum`.""" | ||
|
||
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: | ||
return temporalio.bridge.worker.PollerBehaviorAutoscaling( | ||
minimum=self.minimum, | ||
maximum=self.maximum, | ||
initial=self.initial, | ||
) | ||
|
||
|
||
PollerBehavior: TypeAlias = Union[ | ||
PollerBehaviorSimpleMaximum, | ||
PollerBehaviorAutoscaling, | ||
] | ||
|
||
|
||
class Worker: | ||
"""Worker to process workflows and/or activities. | ||
|
||
|
@@ -76,9 +119,9 @@ def __init__( | |
max_concurrent_activities: Optional[int] = None, | ||
max_concurrent_local_activities: Optional[int] = None, | ||
tuner: Optional[WorkerTuner] = None, | ||
max_concurrent_workflow_task_polls: int = 5, | ||
max_concurrent_workflow_task_polls: Optional[int] = None, | ||
nonsticky_to_sticky_poll_ratio: float = 0.2, | ||
max_concurrent_activity_task_polls: int = 5, | ||
max_concurrent_activity_task_polls: Optional[int] = None, | ||
no_remote_activities: bool = False, | ||
sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10), | ||
max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60), | ||
|
@@ -94,6 +137,12 @@ def __init__( | |
use_worker_versioning: bool = False, | ||
disable_safe_workflow_eviction: bool = False, | ||
deployment_config: Optional[WorkerDeploymentConfig] = None, | ||
workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( | ||
maximum=5 | ||
), | ||
activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum( | ||
maximum=5 | ||
), | ||
) -> None: | ||
"""Create a worker to process workflows and/or activities. | ||
|
||
|
@@ -152,10 +201,17 @@ def __init__( | |
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and | ||
``max_concurrent_local_activities`` arguments. | ||
|
||
Defaults to fixed-size 100 slots for each slot kind if unset and none of the | ||
max_* arguments are provided. | ||
|
||
WARNING: This argument is experimental | ||
max_concurrent_workflow_task_polls: Maximum number of concurrent | ||
poll workflow task requests we will perform at a time on this | ||
worker's task queue. | ||
|
||
If set, will override any value passed to ``workflow_task_poller_behavior``. | ||
|
||
WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't find where this or the activity one is used anymore |
||
nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls * | ||
this number = the number of max pollers that will be allowed for | ||
the nonsticky queue when sticky tasks are enabled. If both | ||
|
@@ -166,6 +222,10 @@ def __init__( | |
max_concurrent_activity_task_polls: Maximum number of concurrent | ||
poll activity task requests we will perform at a time on this | ||
worker's task queue. | ||
|
||
If set, will override any value passed to ``activity_task_poller_behavior``. | ||
|
||
WARNING: Deprecated, use ``activity_task_poller_behavior`` instead | ||
no_remote_activities: If true, this worker will only handle workflow | ||
tasks and local activities, it will not poll for activity tasks. | ||
sticky_queue_schedule_to_start_timeout: How long a workflow task is | ||
|
@@ -231,6 +291,10 @@ def __init__( | |
deployment_config: Deployment config for the worker. Exclusive with `build_id` and | ||
`use_worker_versioning`. | ||
WARNING: This is an experimental feature and may change in the future. | ||
workflow_task_poller_behavior: Specify the behavior of workflow task polling. | ||
Defaults to a 5-poller maximum. | ||
activity_task_poller_behavior: Specify the behavior of activity task polling. | ||
Defaults to a 5-poller maximum. | ||
""" | ||
if not activities and not workflows: | ||
raise ValueError("At least one activity or workflow must be specified") | ||
|
@@ -393,6 +457,15 @@ def __init__( | |
build_id=build_id | ||
) | ||
|
||
if max_concurrent_workflow_task_polls: | ||
workflow_task_poller_behavior = PollerBehaviorSimpleMaximum( | ||
maximum=max_concurrent_workflow_task_polls | ||
) | ||
if max_concurrent_activity_task_polls: | ||
activity_task_poller_behavior = PollerBehaviorSimpleMaximum( | ||
maximum=max_concurrent_activity_task_polls | ||
) | ||
|
||
# Create bridge worker last. We have empirically observed that if it is | ||
# created before an error is raised from the activity worker | ||
# constructor, a deadlock/hang will occur presumably while trying to | ||
|
@@ -408,9 +481,7 @@ def __init__( | |
identity_override=identity, | ||
max_cached_workflows=max_cached_workflows, | ||
tuner=bridge_tuner, | ||
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls, | ||
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio, | ||
max_concurrent_activity_task_polls=max_concurrent_activity_task_polls, | ||
# We have to disable remote activities if a user asks _or_ if we | ||
# are not running an activity worker at all. Otherwise shutdown | ||
# will not proceed properly. | ||
|
@@ -440,6 +511,8 @@ def __init__( | |
else set() | ||
), | ||
versioning_strategy=versioning_strategy, | ||
workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(), | ||
activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(), | ||
), | ||
) | ||
|
||
|
@@ -696,9 +769,9 @@ class WorkerConfig(TypedDict, total=False): | |
max_concurrent_activities: Optional[int] | ||
max_concurrent_local_activities: Optional[int] | ||
tuner: Optional[WorkerTuner] | ||
max_concurrent_workflow_task_polls: int | ||
max_concurrent_workflow_task_polls: Optional[int] | ||
nonsticky_to_sticky_poll_ratio: float | ||
max_concurrent_activity_task_polls: int | ||
max_concurrent_activity_task_polls: Optional[int] | ||
no_remote_activities: bool | ||
sticky_queue_schedule_to_start_timeout: timedelta | ||
max_heartbeat_throttle_interval: timedelta | ||
|
@@ -714,6 +787,8 @@ class WorkerConfig(TypedDict, total=False): | |
use_worker_versioning: bool | ||
disable_safe_workflow_eviction: bool | ||
deployment_config: Optional[WorkerDeploymentConfig] | ||
workflow_task_poller_behavior: PollerBehavior | ||
activity_task_poller_behavior: PollerBehavior | ||
|
||
|
||
@dataclass | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be clearer if there is a
default
ClassVar
on the dataclass that represents the default, e.g.PollerBehaviorSimpleMaximum.default
instead of inline code, but not requiredIn fact, looking at params here, if you fix the issue where you're accidentally ignoring the
max_concurrent_workflow_task_polls
andmax_concurrent_activity_task_polls
now, you're going to need to know whether one of the other is set, so you may need a arg-unset sentinel type of value here instead of a valuable default.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah I meant to go back and add comments saying
max_
will override if set and implement that behavior