Skip to content

Expose poller autoscaling options #830

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 42 additions & 11 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ use temporal_sdk_core::api::errors::PollError;
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
use temporal_sdk_core_api::errors::WorkflowErrorType;
use temporal_sdk_core_api::worker::{
PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait,
SlotSupplierPermit,
SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext,
SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit,
};
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
Expand Down Expand Up @@ -49,9 +48,9 @@ pub struct WorkerConfig {
identity_override: Option<String>,
max_cached_workflows: usize,
tuner: TunerHolder,
max_concurrent_workflow_task_polls: usize,
workflow_task_poller_behavior: PollerBehavior,
nonsticky_to_sticky_poll_ratio: f32,
max_concurrent_activity_task_polls: usize,
activity_task_poller_behavior: PollerBehavior,
no_remote_activities: bool,
sticky_queue_schedule_to_start_timeout_millis: u64,
max_heartbeat_throttle_interval_millis: u64,
Expand All @@ -63,6 +62,42 @@ pub struct WorkerConfig {
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
}

#[derive(FromPyObject)]
pub struct PollerBehaviorSimpleMaximum {
pub simple_maximum: usize,
}

#[derive(FromPyObject)]
pub struct PollerBehaviorAutoscaling {
pub minimum: usize,
pub maximum: usize,
pub initial: usize,
}

/// Recreates [temporal_sdk_core_api::worker::PollerBehavior]
#[derive(FromPyObject)]
pub enum PollerBehavior {
SimpleMaximum(PollerBehaviorSimpleMaximum),
Autoscaling(PollerBehaviorAutoscaling),
}

impl From<PollerBehavior> for temporal_sdk_core_api::worker::PollerBehavior {
fn from(value: PollerBehavior) -> Self {
match value {
PollerBehavior::SimpleMaximum(simple) => {
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum)
}
PollerBehavior::Autoscaling(auto) => {
temporal_sdk_core_api::worker::PollerBehavior::Autoscaling {
minimum: auto.minimum,
maximum: auto.maximum,
initial: auto.initial,
}
}
}
}
}

/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy]
#[derive(FromPyObject)]
pub enum WorkerVersioningStrategy {
Expand Down Expand Up @@ -626,14 +661,10 @@ fn convert_worker_config(
.versioning_strategy(converted_versioning_strategy)
.client_identity_override(conf.identity_override)
.max_cached_workflows(conf.max_cached_workflows)
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
conf.max_concurrent_workflow_task_polls,
))
.workflow_task_poller_behavior(conf.workflow_task_poller_behavior)
.tuner(Arc::new(converted_tuner))
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
conf.max_concurrent_activity_task_polls,
))
.activity_task_poller_behavior(conf.activity_task_poller_behavior)
.no_remote_activities(conf.no_remote_activities)
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
conf.sticky_queue_schedule_to_start_timeout_millis,
Expand Down
26 changes: 24 additions & 2 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class WorkerConfig:
identity_override: Optional[str]
max_cached_workflows: int
tuner: TunerHolder
max_concurrent_workflow_task_polls: int
workflow_task_poller_behavior: PollerBehavior
nonsticky_to_sticky_poll_ratio: float
max_concurrent_activity_task_polls: int
activity_task_poller_behavior: PollerBehavior
no_remote_activities: bool
sticky_queue_schedule_to_start_timeout_millis: int
max_heartbeat_throttle_interval_millis: int
Expand All @@ -62,6 +62,28 @@ class WorkerConfig:
nondeterminism_as_workflow_fail_for_types: Set[str]


@dataclass
class PollerBehaviorSimpleMaximum:
"""Python representation of the Rust struct for simple poller behavior."""

simple_maximum: int


@dataclass
class PollerBehaviorAutoscaling:
"""Python representation of the Rust struct for autoscaling poller behavior."""

minimum: int
maximum: int
initial: int


PollerBehavior: TypeAlias = Union[
PollerBehaviorSimpleMaximum,
PollerBehaviorAutoscaling,
]


@dataclass
class WorkerDeploymentVersion:
"""Python representation of the Rust struct for configuring a worker deployment version."""
Expand Down
6 changes: 6 additions & 0 deletions temporalio/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
WorkflowSlotInfo,
)
from ._worker import (
PollerBehavior,
PollerBehaviorAutoscaling,
PollerBehaviorSimpleMaximum,
Worker,
WorkerConfig,
WorkerDeploymentConfig,
Expand All @@ -65,6 +68,9 @@
"ReplayerConfig",
"WorkflowReplayResult",
"WorkflowReplayResults",
"PollerBehavior",
"PollerBehaviorSimpleMaximum",
"PollerBehaviorAutoscaling",
# Interceptor base classes
"Interceptor",
"ActivityInboundInterceptor",
Expand Down
8 changes: 6 additions & 2 deletions temporalio/worker/_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,7 @@ def on_eviction_hook(
1
),
),
max_concurrent_workflow_task_polls=1,
nonsticky_to_sticky_poll_ratio=1,
max_concurrent_activity_task_polls=1,
no_remote_activities=True,
sticky_queue_schedule_to_start_timeout_millis=1000,
max_heartbeat_throttle_interval_millis=1000,
Expand All @@ -255,6 +253,12 @@ def on_eviction_hook(
versioning_strategy=temporalio.bridge.worker.WorkerVersioningStrategyNone(
build_id=self._config["build_id"] or load_default_build_id(),
),
workflow_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
1
),
activity_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
1
),
),
)
# Start worker
Expand Down
89 changes: 82 additions & 7 deletions temporalio/worker/_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
Optional,
Sequence,
Type,
Union,
cast,
)

from typing_extensions import TypedDict
from typing_extensions import TypeAlias, TypedDict

import temporalio.activity
import temporalio.api.common.v1
Expand All @@ -48,6 +49,48 @@
logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class PollerBehaviorSimpleMaximum:
"""A poller behavior that will attempt to poll as long as a slot is available, up to the
provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
"""

maximum: int = 5

def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
return temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
simple_maximum=self.maximum
)


@dataclass(frozen=True)
class PollerBehaviorAutoscaling:
"""A poller behavior that will automatically scale the number of pollers based on feedback
from the server. A slot must be available before beginning polling.
"""

minimum: int = 1
"""At least this many poll calls will always be attempted (assuming slots are available)."""
maximum: int = 100
"""At most this many poll calls will ever be open at once. Must be >= `minimum`."""
initial: int = 5
"""This many polls will be attempted initially before scaling kicks in. Must be between
`minimum` and `maximum`."""

def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
return temporalio.bridge.worker.PollerBehaviorAutoscaling(
minimum=self.minimum,
maximum=self.maximum,
initial=self.initial,
)


PollerBehavior: TypeAlias = Union[
PollerBehaviorSimpleMaximum,
PollerBehaviorAutoscaling,
]


class Worker:
"""Worker to process workflows and/or activities.

Expand Down Expand Up @@ -76,9 +119,9 @@ def __init__(
max_concurrent_activities: Optional[int] = None,
max_concurrent_local_activities: Optional[int] = None,
tuner: Optional[WorkerTuner] = None,
max_concurrent_workflow_task_polls: int = 5,
max_concurrent_workflow_task_polls: Optional[int] = None,
nonsticky_to_sticky_poll_ratio: float = 0.2,
max_concurrent_activity_task_polls: int = 5,
max_concurrent_activity_task_polls: Optional[int] = None,
no_remote_activities: bool = False,
sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
Expand All @@ -94,6 +137,12 @@ def __init__(
use_worker_versioning: bool = False,
disable_safe_workflow_eviction: bool = False,
deployment_config: Optional[WorkerDeploymentConfig] = None,
workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
maximum=5
),
Comment on lines +140 to +142
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be clearer if there is a default ClassVar on the dataclass that represents the default, e.g. PollerBehaviorSimpleMaximum.default instead of inline code, but not required

In fact, looking at params here, if you fix the issue where you're accidentally ignoring the max_concurrent_workflow_task_polls and max_concurrent_activity_task_polls now, you're going to need to know whether one of the other is set, so you may need a arg-unset sentinel type of value here instead of a valuable default.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah I meant to go back and add comments saying max_ will override if set and implement that behavior

activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
maximum=5
),
) -> None:
"""Create a worker to process workflows and/or activities.

Expand Down Expand Up @@ -152,10 +201,17 @@ def __init__(
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
``max_concurrent_local_activities`` arguments.

Defaults to fixed-size 100 slots for each slot kind if unset and none of the
max_* arguments are provided.

WARNING: This argument is experimental
max_concurrent_workflow_task_polls: Maximum number of concurrent
poll workflow task requests we will perform at a time on this
worker's task queue.

If set, will override any value passed to ``workflow_task_poller_behavior``.

WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find where this or the activity one is used anymore

nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
this number = the number of max pollers that will be allowed for
the nonsticky queue when sticky tasks are enabled. If both
Expand All @@ -166,6 +222,10 @@ def __init__(
max_concurrent_activity_task_polls: Maximum number of concurrent
poll activity task requests we will perform at a time on this
worker's task queue.

If set, will override any value passed to ``activity_task_poller_behavior``.

WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
no_remote_activities: If true, this worker will only handle workflow
tasks and local activities, it will not poll for activity tasks.
sticky_queue_schedule_to_start_timeout: How long a workflow task is
Expand Down Expand Up @@ -231,6 +291,10 @@ def __init__(
deployment_config: Deployment config for the worker. Exclusive with `build_id` and
`use_worker_versioning`.
WARNING: This is an experimental feature and may change in the future.
workflow_task_poller_behavior: Specify the behavior of workflow task polling.
Defaults to a 5-poller maximum.
activity_task_poller_behavior: Specify the behavior of activity task polling.
Defaults to a 5-poller maximum.
"""
if not activities and not workflows:
raise ValueError("At least one activity or workflow must be specified")
Expand Down Expand Up @@ -393,6 +457,15 @@ def __init__(
build_id=build_id
)

if max_concurrent_workflow_task_polls:
workflow_task_poller_behavior = PollerBehaviorSimpleMaximum(
maximum=max_concurrent_workflow_task_polls
)
if max_concurrent_activity_task_polls:
activity_task_poller_behavior = PollerBehaviorSimpleMaximum(
maximum=max_concurrent_activity_task_polls
)

# Create bridge worker last. We have empirically observed that if it is
# created before an error is raised from the activity worker
# constructor, a deadlock/hang will occur presumably while trying to
Expand All @@ -408,9 +481,7 @@ def __init__(
identity_override=identity,
max_cached_workflows=max_cached_workflows,
tuner=bridge_tuner,
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
max_concurrent_activity_task_polls=max_concurrent_activity_task_polls,
# We have to disable remote activities if a user asks _or_ if we
# are not running an activity worker at all. Otherwise shutdown
# will not proceed properly.
Expand Down Expand Up @@ -440,6 +511,8 @@ def __init__(
else set()
),
versioning_strategy=versioning_strategy,
workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(),
activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(),
),
)

Expand Down Expand Up @@ -696,9 +769,9 @@ class WorkerConfig(TypedDict, total=False):
max_concurrent_activities: Optional[int]
max_concurrent_local_activities: Optional[int]
tuner: Optional[WorkerTuner]
max_concurrent_workflow_task_polls: int
max_concurrent_workflow_task_polls: Optional[int]
nonsticky_to_sticky_poll_ratio: float
max_concurrent_activity_task_polls: int
max_concurrent_activity_task_polls: Optional[int]
no_remote_activities: bool
sticky_queue_schedule_to_start_timeout: timedelta
max_heartbeat_throttle_interval: timedelta
Expand All @@ -714,6 +787,8 @@ class WorkerConfig(TypedDict, total=False):
use_worker_versioning: bool
disable_safe_workflow_eviction: bool
deployment_config: Optional[WorkerDeploymentConfig]
workflow_task_poller_behavior: PollerBehavior
activity_task_poller_behavior: PollerBehavior


@dataclass
Expand Down
Loading
Loading