Skip to content

Commit 30e00ae

Browse files
committed
Review comments
1 parent d7625e1 commit 30e00ae

File tree

5 files changed

+66
-12
lines changed

5 files changed

+66
-12
lines changed

temporalio/bridge/src/worker.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub struct WorkerConfig {
6464

6565
#[derive(FromPyObject)]
6666
pub struct PollerBehaviorSimpleMaximum {
67-
pub maximum: usize,
67+
pub simple_maximum: usize,
6868
}
6969

7070
#[derive(FromPyObject)]
@@ -85,7 +85,7 @@ impl From<PollerBehavior> for temporal_sdk_core_api::worker::PollerBehavior {
8585
fn from(value: PollerBehavior) -> Self {
8686
match value {
8787
PollerBehavior::SimpleMaximum(simple) => {
88-
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.maximum)
88+
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum)
8989
}
9090
PollerBehavior::Autoscaling(auto) => {
9191
temporal_sdk_core_api::worker::PollerBehavior::Autoscaling {

temporalio/bridge/worker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class WorkerConfig:
6666
class PollerBehaviorSimpleMaximum:
6767
"""Python representation of the Rust struct for simple poller behavior."""
6868

69-
maximum: int
69+
simple_maximum: int
7070

7171

7272
@dataclass

temporalio/worker/_worker.py

+23-5
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
logger = logging.getLogger(__name__)
5050

5151

52-
@dataclass
52+
@dataclass(frozen=True)
5353
class PollerBehaviorSimpleMaximum:
5454
"""A poller behavior that will attempt to poll as long as a slot is available, up to the
5555
provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
@@ -59,11 +59,11 @@ class PollerBehaviorSimpleMaximum:
5959

6060
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
6161
return temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
62-
maximum=self.maximum
62+
simple_maximum=self.maximum
6363
)
6464

6565

66-
@dataclass
66+
@dataclass(frozen=True)
6767
class PollerBehaviorAutoscaling:
6868
"""A poller behavior that will automatically scale the number of pollers based on feedback
6969
from the server. A slot must be available before beginning polling.
@@ -201,11 +201,16 @@ def __init__(
201201
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
202202
``max_concurrent_local_activities`` arguments.
203203
204+
Defaults to fixed-size 100 slots for each slot kind if unset and none of the
205+
max_* arguments are provided.
206+
204207
WARNING: This argument is experimental
205208
max_concurrent_workflow_task_polls: Maximum number of concurrent
206209
poll workflow task requests we will perform at a time on this
207210
worker's task queue.
208211
212+
If set, will override any value passed to ``workflow_task_poller_behavior``.
213+
209214
WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead
210215
nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
211216
this number = the number of max pollers that will be allowed for
@@ -218,6 +223,8 @@ def __init__(
218223
poll activity task requests we will perform at a time on this
219224
worker's task queue.
220225
226+
If set, will override any value passed to ``activity_task_poller_behavior``.
227+
221228
WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
222229
no_remote_activities: If true, this worker will only handle workflow
223230
tasks and local activities, it will not poll for activity tasks.
@@ -284,8 +291,10 @@ def __init__(
284291
deployment_config: Deployment config for the worker. Exclusive with `build_id` and
285292
`use_worker_versioning`.
286293
WARNING: This is an experimental feature and may change in the future.
287-
workflow_task_poller_behavior: Specify the behavior of workflow task polling
288-
activity_task_poller_behavior: Specify the behavior of activity task polling
294+
workflow_task_poller_behavior: Specify the behavior of workflow task polling.
295+
Defaults to a 5-poller maximum.
296+
activity_task_poller_behavior: Specify the behavior of activity task polling.
297+
Defaults to a 5-poller maximum.
289298
"""
290299
if not activities and not workflows:
291300
raise ValueError("At least one activity or workflow must be specified")
@@ -448,6 +457,15 @@ def __init__(
448457
build_id=build_id
449458
)
450459

460+
if max_concurrent_workflow_task_polls:
461+
workflow_task_poller_behavior = PollerBehaviorSimpleMaximum(
462+
maximum=max_concurrent_workflow_task_polls
463+
)
464+
if max_concurrent_activity_task_polls:
465+
activity_task_poller_behavior = PollerBehaviorSimpleMaximum(
466+
maximum=max_concurrent_activity_task_polls
467+
)
468+
451469
# Create bridge worker last. We have empirically observed that if it is
452470
# created before an error is raised from the activity worker
453471
# constructor, a deadlock/hang will occur presumably while trying to

tests/worker/test_worker.py

+39-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import uuid
77
from datetime import timedelta
88
from typing import Any, Awaitable, Callable, Optional, Sequence
9+
from urllib.request import urlopen
910

1011
import pytest
1112

@@ -22,6 +23,7 @@
2223
)
2324
from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType
2425
from temporalio.common import RawValue, VersioningBehavior
26+
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
2527
from temporalio.service import RPCError
2628
from temporalio.testing import WorkflowEnvironment
2729
from temporalio.worker import (
@@ -44,7 +46,12 @@
4446
WorkflowSlotInfo,
4547
)
4648
from temporalio.workflow import VersioningIntent
47-
from tests.helpers import assert_eventually, new_worker, worker_versioning_enabled
49+
from tests.helpers import (
50+
assert_eventually,
51+
find_free_port,
52+
new_worker,
53+
worker_versioning_enabled,
54+
)
4855

4956

5057
def test_load_default_worker_binary_id():
@@ -920,13 +927,42 @@ async def test_workflows_can_use_default_versioning_behavior(
920927
async def test_can_run_autoscaling_polling_worker(
921928
client: Client, env: WorkflowEnvironment
922929
):
930+
# Create new runtime with Prom server
931+
prom_addr = f"127.0.0.1:{find_free_port()}"
932+
runtime = Runtime(
933+
telemetry=TelemetryConfig(
934+
metrics=PrometheusConfig(bind_address=prom_addr),
935+
)
936+
)
937+
client = await Client.connect(
938+
client.service_client.config.target_host,
939+
namespace=client.namespace,
940+
runtime=runtime,
941+
)
942+
923943
async with new_worker(
924944
client,
925945
WaitOnSignalWorkflow,
926946
activities=[say_hello],
927-
workflow_task_poller_behavior=PollerBehaviorAutoscaling(),
928-
activity_task_poller_behavior=PollerBehaviorAutoscaling(),
947+
workflow_task_poller_behavior=PollerBehaviorAutoscaling(initial=2),
948+
activity_task_poller_behavior=PollerBehaviorAutoscaling(initial=2),
929949
) as w:
950+
# Give pollers a beat to start
951+
await asyncio.sleep(0.3)
952+
953+
with urlopen(url=f"http://{prom_addr}/metrics") as f:
954+
prom_str: str = f.read().decode("utf-8")
955+
prom_lines = prom_str.splitlines()
956+
matches = [line for line in prom_lines if "temporal_num_pollers" in line]
957+
activity_pollers = [l for l in matches if "activity_task" in l]
958+
assert len(activity_pollers) == 1
959+
assert activity_pollers[0].endswith("2")
960+
workflow_pollers = [l for l in matches if "workflow_task" in l]
961+
assert len(workflow_pollers) == 2
962+
# There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on
963+
# initialization timing.
964+
assert workflow_pollers[0].endswith("2") or workflow_pollers[0].endswith("1")
965+
assert workflow_pollers[1].endswith("2") or workflow_pollers[1].endswith("1")
930966

931967
async def do_workflow():
932968
wf = await client.start_workflow(

0 commit comments

Comments
 (0)