Skip to content

Commit e37d144

Browse files
committed
Expose poller autoscaling options
1 parent 4933dc5 commit e37d144

File tree

6 files changed

+166
-22
lines changed

6 files changed

+166
-22
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ use temporal_sdk_core::api::errors::PollError;
1515
use temporal_sdk_core::replay::{HistoryForReplay, ReplayWorkerInput};
1616
use temporal_sdk_core_api::errors::WorkflowErrorType;
1717
use temporal_sdk_core_api::worker::{
18-
PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
19-
SlotReleaseContext, SlotReservationContext, SlotSupplier as SlotSupplierTrait,
20-
SlotSupplierPermit,
18+
SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, SlotReleaseContext,
19+
SlotReservationContext, SlotSupplier as SlotSupplierTrait, SlotSupplierPermit,
2120
};
2221
use temporal_sdk_core_api::Worker;
2322
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
@@ -49,9 +48,9 @@ pub struct WorkerConfig {
4948
identity_override: Option<String>,
5049
max_cached_workflows: usize,
5150
tuner: TunerHolder,
52-
max_concurrent_workflow_task_polls: usize,
51+
workflow_task_poller_behavior: PollerBehavior,
5352
nonsticky_to_sticky_poll_ratio: f32,
54-
max_concurrent_activity_task_polls: usize,
53+
activity_task_poller_behavior: PollerBehavior,
5554
no_remote_activities: bool,
5655
sticky_queue_schedule_to_start_timeout_millis: u64,
5756
max_heartbeat_throttle_interval_millis: u64,
@@ -63,6 +62,42 @@ pub struct WorkerConfig {
6362
nondeterminism_as_workflow_fail_for_types: HashSet<String>,
6463
}
6564

65+
#[derive(FromPyObject)]
66+
pub struct PollerBehaviorSimpleMaximum {
67+
pub maximum: usize,
68+
}
69+
70+
#[derive(FromPyObject)]
71+
pub struct PollerBehaviorAutoscaling {
72+
pub minimum: usize,
73+
pub maximum: usize,
74+
pub initial: usize,
75+
}
76+
77+
/// Recreates [temporal_sdk_core_api::worker::PollerBehavior]
78+
#[derive(FromPyObject)]
79+
pub enum PollerBehavior {
80+
SimpleMaximum(PollerBehaviorSimpleMaximum),
81+
Autoscaling(PollerBehaviorAutoscaling),
82+
}
83+
84+
impl From<PollerBehavior> for temporal_sdk_core_api::worker::PollerBehavior {
85+
fn from(value: PollerBehavior) -> Self {
86+
match value {
87+
PollerBehavior::SimpleMaximum(simple) => {
88+
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.maximum)
89+
}
90+
PollerBehavior::Autoscaling(auto) => {
91+
temporal_sdk_core_api::worker::PollerBehavior::Autoscaling {
92+
minimum: auto.minimum,
93+
maximum: auto.maximum,
94+
initial: auto.initial,
95+
}
96+
}
97+
}
98+
}
99+
}
100+
66101
/// Recreates [temporal_sdk_core_api::worker::WorkerVersioningStrategy]
67102
#[derive(FromPyObject)]
68103
pub enum WorkerVersioningStrategy {
@@ -626,14 +661,10 @@ fn convert_worker_config(
626661
.versioning_strategy(converted_versioning_strategy)
627662
.client_identity_override(conf.identity_override)
628663
.max_cached_workflows(conf.max_cached_workflows)
629-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
630-
conf.max_concurrent_workflow_task_polls,
631-
))
664+
.workflow_task_poller_behavior(conf.workflow_task_poller_behavior)
632665
.tuner(Arc::new(converted_tuner))
633666
.nonsticky_to_sticky_poll_ratio(conf.nonsticky_to_sticky_poll_ratio)
634-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
635-
conf.max_concurrent_activity_task_polls,
636-
))
667+
.activity_task_poller_behavior(conf.activity_task_poller_behavior)
637668
.no_remote_activities(conf.no_remote_activities)
638669
.sticky_queue_schedule_to_start_timeout(Duration::from_millis(
639670
conf.sticky_queue_schedule_to_start_timeout_millis,

temporalio/bridge/worker.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class WorkerConfig:
4848
identity_override: Optional[str]
4949
max_cached_workflows: int
5050
tuner: TunerHolder
51-
max_concurrent_workflow_task_polls: int
51+
workflow_task_poller_behavior: PollerBehavior
5252
nonsticky_to_sticky_poll_ratio: float
53-
max_concurrent_activity_task_polls: int
53+
activity_task_poller_behavior: PollerBehavior
5454
no_remote_activities: bool
5555
sticky_queue_schedule_to_start_timeout_millis: int
5656
max_heartbeat_throttle_interval_millis: int
@@ -62,6 +62,28 @@ class WorkerConfig:
6262
nondeterminism_as_workflow_fail_for_types: Set[str]
6363

6464

65+
@dataclass
66+
class PollerBehaviorSimpleMaximum:
67+
"""Python representation of the Rust struct for simple poller behavior."""
68+
69+
maximum: int
70+
71+
72+
@dataclass
73+
class PollerBehaviorAutoscaling:
74+
"""Python representation of the Rust struct for autoscaling poller behavior."""
75+
76+
minimum: int
77+
maximum: int
78+
initial: int
79+
80+
81+
PollerBehavior: TypeAlias = Union[
82+
PollerBehaviorSimpleMaximum,
83+
PollerBehaviorAutoscaling,
84+
]
85+
86+
6587
@dataclass
6688
class WorkerDeploymentVersion:
6789
"""Python representation of the Rust struct for configuring a worker deployment version."""

temporalio/worker/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@
4343
WorkflowSlotInfo,
4444
)
4545
from ._worker import (
46+
PollerBehavior,
47+
PollerBehaviorAutoscaling,
48+
PollerBehaviorSimpleMaximum,
4649
Worker,
4750
WorkerConfig,
4851
WorkerDeploymentConfig,
@@ -65,6 +68,9 @@
6568
"ReplayerConfig",
6669
"WorkflowReplayResult",
6770
"WorkflowReplayResults",
71+
"PollerBehavior",
72+
"PollerBehaviorSimpleMaximum",
73+
"PollerBehaviorAutoscaling",
6874
# Interceptor base classes
6975
"Interceptor",
7076
"ActivityInboundInterceptor",

temporalio/worker/_replayer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,7 @@ def on_eviction_hook(
242242
1
243243
),
244244
),
245-
max_concurrent_workflow_task_polls=1,
246245
nonsticky_to_sticky_poll_ratio=1,
247-
max_concurrent_activity_task_polls=1,
248246
no_remote_activities=True,
249247
sticky_queue_schedule_to_start_timeout_millis=1000,
250248
max_heartbeat_throttle_interval_millis=1000,
@@ -255,6 +253,12 @@ def on_eviction_hook(
255253
versioning_strategy=temporalio.bridge.worker.WorkerVersioningStrategyNone(
256254
build_id=self._config["build_id"] or load_default_build_id(),
257255
),
256+
workflow_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
257+
1
258+
),
259+
activity_task_poller_behavior=temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
260+
1
261+
),
258262
),
259263
)
260264
# Start worker

temporalio/worker/_worker.py

Lines changed: 64 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
Optional,
1919
Sequence,
2020
Type,
21+
Union,
2122
cast,
2223
)
2324

24-
from typing_extensions import TypedDict
25+
from typing_extensions import TypeAlias, TypedDict
2526

2627
import temporalio.activity
2728
import temporalio.api.common.v1
@@ -48,6 +49,48 @@
4849
logger = logging.getLogger(__name__)
4950

5051

52+
@dataclass
53+
class PollerBehaviorSimpleMaximum:
54+
"""A poller behavior that will attempt to poll as long as a slot is available, up to the
55+
provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
56+
"""
57+
58+
maximum: int = 5
59+
60+
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
61+
return temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
62+
maximum=self.maximum
63+
)
64+
65+
66+
@dataclass
67+
class PollerBehaviorAutoscaling:
68+
"""A poller behavior that will automatically scale the number of pollers based on feedback
69+
from the server. A slot must be available before beginning polling.
70+
"""
71+
72+
minimum: int = 1
73+
"""At least this many poll calls will always be attempted (assuming slots are available)."""
74+
maximum: int = 100
75+
"""At most this many poll calls will ever be open at once. Must be >= `minimum`."""
76+
initial: int = 5
77+
"""This many polls will be attempted initially before scaling kicks in. Must be between
78+
`minimum` and `maximum`."""
79+
80+
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
81+
return temporalio.bridge.worker.PollerBehaviorAutoscaling(
82+
minimum=self.minimum,
83+
maximum=self.maximum,
84+
initial=self.initial,
85+
)
86+
87+
88+
PollerBehavior: TypeAlias = Union[
89+
PollerBehaviorSimpleMaximum,
90+
PollerBehaviorAutoscaling,
91+
]
92+
93+
5194
class Worker:
5295
"""Worker to process workflows and/or activities.
5396
@@ -76,9 +119,9 @@ def __init__(
76119
max_concurrent_activities: Optional[int] = None,
77120
max_concurrent_local_activities: Optional[int] = None,
78121
tuner: Optional[WorkerTuner] = None,
79-
max_concurrent_workflow_task_polls: int = 5,
122+
max_concurrent_workflow_task_polls: Optional[int] = None,
80123
nonsticky_to_sticky_poll_ratio: float = 0.2,
81-
max_concurrent_activity_task_polls: int = 5,
124+
max_concurrent_activity_task_polls: Optional[int] = None,
82125
no_remote_activities: bool = False,
83126
sticky_queue_schedule_to_start_timeout: timedelta = timedelta(seconds=10),
84127
max_heartbeat_throttle_interval: timedelta = timedelta(seconds=60),
@@ -94,6 +137,12 @@ def __init__(
94137
use_worker_versioning: bool = False,
95138
disable_safe_workflow_eviction: bool = False,
96139
deployment_config: Optional[WorkerDeploymentConfig] = None,
140+
workflow_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
141+
maximum=5
142+
),
143+
activity_task_poller_behavior: PollerBehavior = PollerBehaviorSimpleMaximum(
144+
maximum=5
145+
),
97146
) -> None:
98147
"""Create a worker to process workflows and/or activities.
99148
@@ -156,6 +205,8 @@ def __init__(
156205
max_concurrent_workflow_task_polls: Maximum number of concurrent
157206
poll workflow task requests we will perform at a time on this
158207
worker's task queue.
208+
209+
WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead
159210
nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
160211
this number = the number of max pollers that will be allowed for
161212
the nonsticky queue when sticky tasks are enabled. If both
@@ -166,6 +217,8 @@ def __init__(
166217
max_concurrent_activity_task_polls: Maximum number of concurrent
167218
poll activity task requests we will perform at a time on this
168219
worker's task queue.
220+
221+
WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
169222
no_remote_activities: If true, this worker will only handle workflow
170223
tasks and local activities, it will not poll for activity tasks.
171224
sticky_queue_schedule_to_start_timeout: How long a workflow task is
@@ -231,6 +284,8 @@ def __init__(
231284
deployment_config: Deployment config for the worker. Exclusive with `build_id` and
232285
`use_worker_versioning`.
233286
WARNING: This is an experimental feature and may change in the future.
287+
workflow_task_poller_behavior: Specify the behavior of workflow task polling
288+
activity_task_poller_behavior: Specify the behavior of activity task polling
234289
"""
235290
if not activities and not workflows:
236291
raise ValueError("At least one activity or workflow must be specified")
@@ -408,9 +463,7 @@ def __init__(
408463
identity_override=identity,
409464
max_cached_workflows=max_cached_workflows,
410465
tuner=bridge_tuner,
411-
max_concurrent_workflow_task_polls=max_concurrent_workflow_task_polls,
412466
nonsticky_to_sticky_poll_ratio=nonsticky_to_sticky_poll_ratio,
413-
max_concurrent_activity_task_polls=max_concurrent_activity_task_polls,
414467
# We have to disable remote activities if a user asks _or_ if we
415468
# are not running an activity worker at all. Otherwise shutdown
416469
# will not proceed properly.
@@ -440,6 +493,8 @@ def __init__(
440493
else set()
441494
),
442495
versioning_strategy=versioning_strategy,
496+
workflow_task_poller_behavior=workflow_task_poller_behavior._to_bridge(),
497+
activity_task_poller_behavior=activity_task_poller_behavior._to_bridge(),
443498
),
444499
)
445500

@@ -696,9 +751,9 @@ class WorkerConfig(TypedDict, total=False):
696751
max_concurrent_activities: Optional[int]
697752
max_concurrent_local_activities: Optional[int]
698753
tuner: Optional[WorkerTuner]
699-
max_concurrent_workflow_task_polls: int
754+
max_concurrent_workflow_task_polls: Optional[int]
700755
nonsticky_to_sticky_poll_ratio: float
701-
max_concurrent_activity_task_polls: int
756+
max_concurrent_activity_task_polls: Optional[int]
702757
no_remote_activities: bool
703758
sticky_queue_schedule_to_start_timeout: timedelta
704759
max_heartbeat_throttle_interval: timedelta
@@ -714,6 +769,8 @@ class WorkerConfig(TypedDict, total=False):
714769
use_worker_versioning: bool
715770
disable_safe_workflow_eviction: bool
716771
deployment_config: Optional[WorkerDeploymentConfig]
772+
workflow_task_poller_behavior: PollerBehavior
773+
activity_task_poller_behavior: PollerBehavior
717774

718775

719776
@dataclass

tests/worker/test_worker.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
CustomSlotSupplier,
2828
FixedSizeSlotSupplier,
2929
LocalActivitySlotInfo,
30+
PollerBehaviorAutoscaling,
3031
ResourceBasedSlotConfig,
3132
ResourceBasedSlotSupplier,
3233
ResourceBasedTunerConfig,
@@ -919,6 +920,29 @@ async def test_workflows_can_use_default_versioning_behavior(
919920
)
920921

921922

923+
async def test_can_run_autoscaling_polling_worker(
924+
client: Client, env: WorkflowEnvironment
925+
):
926+
async with new_worker(
927+
client,
928+
WaitOnSignalWorkflow,
929+
activities=[say_hello],
930+
workflow_task_poller_behavior=PollerBehaviorAutoscaling(),
931+
activity_task_poller_behavior=PollerBehaviorAutoscaling(),
932+
) as w:
933+
934+
async def do_workflow():
935+
wf = await client.start_workflow(
936+
WaitOnSignalWorkflow.run,
937+
id=f"resource-based-{uuid.uuid4()}",
938+
task_queue=w.task_queue,
939+
)
940+
await wf.signal(WaitOnSignalWorkflow.my_signal, "finish")
941+
await wf.result()
942+
943+
await asyncio.gather(*[do_workflow() for _ in range(20)])
944+
945+
922946
async def wait_until_worker_deployment_visible(
923947
client: Client, version: WorkerDeploymentVersion
924948
) -> DescribeWorkerDeploymentResponse:

0 commit comments

Comments
 (0)