Skip to content

Commit 3b133f2

Browse files
committed
Review comments
1 parent e37d144 commit 3b133f2

File tree

5 files changed

+66
-12
lines changed

5 files changed

+66
-12
lines changed

temporalio/bridge/src/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pub struct WorkerConfig {
6464

6565
#[derive(FromPyObject)]
6666
pub struct PollerBehaviorSimpleMaximum {
67-
pub maximum: usize,
67+
pub simple_maximum: usize,
6868
}
6969

7070
#[derive(FromPyObject)]
@@ -85,7 +85,7 @@ impl From<PollerBehavior> for temporal_sdk_core_api::worker::PollerBehavior {
8585
fn from(value: PollerBehavior) -> Self {
8686
match value {
8787
PollerBehavior::SimpleMaximum(simple) => {
88-
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.maximum)
88+
temporal_sdk_core_api::worker::PollerBehavior::SimpleMaximum(simple.simple_maximum)
8989
}
9090
PollerBehavior::Autoscaling(auto) => {
9191
temporal_sdk_core_api::worker::PollerBehavior::Autoscaling {

temporalio/bridge/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class WorkerConfig:
6666
class PollerBehaviorSimpleMaximum:
6767
"""Python representation of the Rust struct for simple poller behavior."""
6868

69-
maximum: int
69+
simple_maximum: int
7070

7171

7272
@dataclass

temporalio/worker/_worker.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
logger = logging.getLogger(__name__)
5050

5151

52-
@dataclass
52+
@dataclass(frozen=True)
5353
class PollerBehaviorSimpleMaximum:
5454
"""A poller behavior that will attempt to poll as long as a slot is available, up to the
5555
provided maximum. Cannot be less than two for workflow tasks, or one for other tasks.
@@ -59,11 +59,11 @@ class PollerBehaviorSimpleMaximum:
5959

6060
def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior:
6161
return temporalio.bridge.worker.PollerBehaviorSimpleMaximum(
62-
maximum=self.maximum
62+
simple_maximum=self.maximum
6363
)
6464

6565

66-
@dataclass
66+
@dataclass(frozen=True)
6767
class PollerBehaviorAutoscaling:
6868
"""A poller behavior that will automatically scale the number of pollers based on feedback
6969
from the server. A slot must be available before beginning polling.
@@ -201,11 +201,16 @@ def __init__(
201201
``max_concurrent_workflow_tasks``, ``max_concurrent_activities``, and
202202
``max_concurrent_local_activities`` arguments.
203203
204+
Defaults to fixed-size 100 slots for each slot kind if unset and none of the
205+
max_* arguments are provided.
206+
204207
WARNING: This argument is experimental
205208
max_concurrent_workflow_task_polls: Maximum number of concurrent
206209
poll workflow task requests we will perform at a time on this
207210
worker's task queue.
208211
212+
If set, will override any value passed to ``workflow_task_poller_behavior``.
213+
209214
WARNING: Deprecated, use ``workflow_task_poller_behavior`` instead
210215
nonsticky_to_sticky_poll_ratio: max_concurrent_workflow_task_polls *
211216
this number = the number of max pollers that will be allowed for
@@ -218,6 +223,8 @@ def __init__(
218223
poll activity task requests we will perform at a time on this
219224
worker's task queue.
220225
226+
If set, will override any value passed to ``activity_task_poller_behavior``.
227+
221228
WARNING: Deprecated, use ``activity_task_poller_behavior`` instead
222229
no_remote_activities: If true, this worker will only handle workflow
223230
tasks and local activities, it will not poll for activity tasks.
@@ -284,8 +291,10 @@ def __init__(
284291
deployment_config: Deployment config for the worker. Exclusive with `build_id` and
285292
`use_worker_versioning`.
286293
WARNING: This is an experimental feature and may change in the future.
287-
workflow_task_poller_behavior: Specify the behavior of workflow task polling
288-
activity_task_poller_behavior: Specify the behavior of activity task polling
294+
workflow_task_poller_behavior: Specify the behavior of workflow task polling.
295+
Defaults to a 5-poller maximum.
296+
activity_task_poller_behavior: Specify the behavior of activity task polling.
297+
Defaults to a 5-poller maximum.
289298
"""
290299
if not activities and not workflows:
291300
raise ValueError("At least one activity or workflow must be specified")
@@ -448,6 +457,15 @@ def __init__(
448457
build_id=build_id
449458
)
450459

460+
if max_concurrent_workflow_task_polls:
461+
workflow_task_poller_behavior = PollerBehaviorSimpleMaximum(
462+
maximum=max_concurrent_workflow_task_polls
463+
)
464+
if max_concurrent_activity_task_polls:
465+
activity_task_poller_behavior = PollerBehaviorSimpleMaximum(
466+
maximum=max_concurrent_activity_task_polls
467+
)
468+
451469
# Create bridge worker last. We have empirically observed that if it is
452470
# created before an error is raised from the activity worker
453471
# constructor, a deadlock/hang will occur presumably while trying to

tests/worker/test_worker.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import uuid
77
from datetime import timedelta
88
from typing import Any, Awaitable, Callable, Optional, Sequence
9+
from urllib.request import urlopen
910

1011
import temporalio.api.enums.v1
1112
import temporalio.worker._worker
@@ -20,6 +21,7 @@
2021
)
2122
from temporalio.client import BuildIdOpAddNewDefault, Client, TaskReachabilityType
2223
from temporalio.common import RawValue, VersioningBehavior
24+
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig
2325
from temporalio.service import RPCError
2426
from temporalio.testing import WorkflowEnvironment
2527
from temporalio.worker import (
@@ -42,7 +44,12 @@
4244
WorkflowSlotInfo,
4345
)
4446
from temporalio.workflow import VersioningIntent
45-
from tests.helpers import assert_eventually, new_worker, worker_versioning_enabled
47+
from tests.helpers import (
48+
assert_eventually,
49+
find_free_port,
50+
new_worker,
51+
worker_versioning_enabled,
52+
)
4653

4754
# Passing through because Python 3.9 has an import bug at
4855
# https://github.com/python/cpython/issues/91351
@@ -923,13 +930,42 @@ async def test_workflows_can_use_default_versioning_behavior(
923930
async def test_can_run_autoscaling_polling_worker(
924931
client: Client, env: WorkflowEnvironment
925932
):
933+
# Create new runtime with Prom server
934+
prom_addr = f"127.0.0.1:{find_free_port()}"
935+
runtime = Runtime(
936+
telemetry=TelemetryConfig(
937+
metrics=PrometheusConfig(bind_address=prom_addr),
938+
)
939+
)
940+
client = await Client.connect(
941+
client.service_client.config.target_host,
942+
namespace=client.namespace,
943+
runtime=runtime,
944+
)
945+
926946
async with new_worker(
927947
client,
928948
WaitOnSignalWorkflow,
929949
activities=[say_hello],
930-
workflow_task_poller_behavior=PollerBehaviorAutoscaling(),
931-
activity_task_poller_behavior=PollerBehaviorAutoscaling(),
950+
workflow_task_poller_behavior=PollerBehaviorAutoscaling(initial=2),
951+
activity_task_poller_behavior=PollerBehaviorAutoscaling(initial=2),
932952
) as w:
953+
# Give pollers a beat to start
954+
await asyncio.sleep(0.3)
955+
956+
with urlopen(url=f"http://{prom_addr}/metrics") as f:
957+
prom_str: str = f.read().decode("utf-8")
958+
prom_lines = prom_str.splitlines()
959+
matches = [line for line in prom_lines if "temporal_num_pollers" in line]
960+
activity_pollers = [l for l in matches if "activity_task" in l]
961+
assert len(activity_pollers) == 1
962+
assert activity_pollers[0].endswith("2")
963+
workflow_pollers = [l for l in matches if "workflow_task" in l]
964+
assert len(workflow_pollers) == 2
965+
# There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on
966+
# initialization timing.
967+
assert workflow_pollers[0].endswith("2") or workflow_pollers[0].endswith("1")
968+
assert workflow_pollers[1].endswith("2") or workflow_pollers[1].endswith("1")
933969

934970
async def do_workflow():
935971
wf = await client.start_workflow(

0 commit comments

Comments
 (0)