Skip to content

Commit e0cccf2

Browse files
authored
Improve project structure (#15)
- **Move `Dispatcher` to its own module** - **Expose all public symbols** - **Remove the `_DispatchEventBase` class** - **Move dispatch events to their own module** - **Fix `Dispatcher` example in docstring** - **Make `Dispatcher` return receiver fetchers** - **Rename `DispatchActor` to `DispatchingActor`** - **Use a timer for polling.** - **Improve `Dispatcher` examples** - **Rename `updated_dispatches` and `ready_dispatches`**
2 parents c0b38bd + ea1db3c commit e0cccf2

File tree

5 files changed

+203
-118
lines changed

5 files changed

+203
-118
lines changed

src/frequenz/dispatch/__init__.py

Lines changed: 11 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,76 +3,14 @@
33

44
"""A highlevel interface for the dispatch API."""
55

6-
import grpc.aio
7-
from frequenz.channels import Broadcast, Receiver
8-
from frequenz.client.dispatch.types import Dispatch
9-
10-
from frequenz.dispatch.actor import DispatchActor, DispatchEvent
11-
12-
__all__ = ["Dispatcher"]
13-
14-
15-
class Dispatcher:
16-
"""A highlevel interface for the dispatch API.
17-
18-
This class provides a highlevel interface to the dispatch API.
19-
It provides two channels:
20-
21-
One that sends a dispatch event message whenever a dispatch is created, updated or deleted.
22-
23-
The other sends a dispatch message whenever a dispatch is ready to be
24-
executed according to the schedule.
25-
26-
allows to receive new dispatches and ready dispatches.
27-
28-
Example:
29-
```python
30-
from frequenz.dispatch import Dispatcher
31-
32-
async def run():
33-
dispatcher = Dispatcher(API_CONNECTION_INFO)
34-
dispatcher.start() # this will start the actor
35-
dispatch_arrived = dispatcher.new_dispatches().new_receiver()
36-
dispatch_ready = dispatcher.ready_dispatches().new_receiver()
37-
```
38-
"""
39-
40-
def __init__(
41-
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
42-
):
43-
"""Initialize the dispatcher.
44-
45-
Args:
46-
microgrid_id: The microgrid id.
47-
grpc_channel: The gRPC channel.
48-
svc_addr: The service address.
49-
"""
50-
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
51-
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
52-
self._actor = DispatchActor(
53-
microgrid_id,
54-
grpc_channel,
55-
svc_addr,
56-
self._updated_channel.new_sender(),
57-
self._ready_channel.new_sender(),
58-
)
59-
60-
async def start(self) -> None:
61-
"""Start the actor."""
62-
self._actor.start()
63-
64-
def updated_dispatches(self) -> Receiver[DispatchEvent]:
65-
"""Return new, updated or deleted dispatches receiver.
66-
67-
Returns:
68-
A new receiver for new dispatches.
69-
"""
70-
return self._updated_channel.new_receiver()
71-
72-
def ready_dispatches(self) -> Receiver[Dispatch]:
73-
"""Return ready dispatches receiver.
74-
75-
Returns:
76-
A new receiver for ready dispatches.
77-
"""
78-
return self._ready_channel.new_receiver()
6+
from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
7+
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
8+
9+
__all__ = [
10+
"Created",
11+
"Deleted",
12+
"DispatchEvent",
13+
"Dispatcher",
14+
"ReceiverFetcher",
15+
"Updated",
16+
]

src/frequenz/dispatch/_dispatcher.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""A highlevel interface for the dispatch API."""
5+
6+
import abc
7+
from typing import Protocol, TypeVar
8+
9+
import grpc.aio
10+
from frequenz.channels import Broadcast, Receiver
11+
from frequenz.client.dispatch.types import Dispatch
12+
13+
from frequenz.dispatch._event import DispatchEvent
14+
from frequenz.dispatch.actor import DispatchingActor
15+
16+
ReceivedT = TypeVar("ReceivedT")
17+
"""The type being received."""
18+
19+
20+
class ReceiverFetcher(Protocol[ReceivedT]):
21+
"""An interface that just exposes a `new_receiver` method."""
22+
23+
@abc.abstractmethod
24+
def new_receiver(
25+
self, name: str | None = None, maxsize: int = 50
26+
) -> Receiver[ReceivedT]:
27+
"""Get a receiver from the channel.
28+
29+
Args:
30+
name: A name to identify the receiver in the logs.
31+
maxsize: The maximum size of the receiver.
32+
33+
Returns:
34+
A receiver instance.
35+
"""
36+
37+
38+
class Dispatcher:
39+
"""A highlevel interface for the dispatch API.
40+
41+
This class provides a highlevel interface to the dispatch API.
42+
It provides two channels:
43+
44+
One that sends a dispatch event message whenever a dispatch is created, updated or deleted.
45+
46+
The other sends a dispatch message whenever a dispatch is ready to be
47+
executed according to the schedule.
48+
49+
allows to receive new dispatches and ready dispatches.
50+
51+
Example: Processing ready-to-execute dispatches
52+
```python
53+
import grpc.aio
54+
55+
async def run():
56+
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
57+
microgrid_id = 1
58+
service_address = "localhost:50051"
59+
60+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
61+
dispatcher.start() # this will start the actor
62+
63+
ready_receiver = dispatcher.ready_to_execute.new_receiver()
64+
65+
async for dispatch in ready_receiver:
66+
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
67+
# execute the dispatch
68+
```
69+
70+
Example: Getting notification about dispatch lifecycle events
71+
```python
72+
from typing import assert_never
73+
74+
import grpc.aio
75+
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated
76+
77+
78+
async def run():
79+
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
80+
microgrid_id = 1
81+
service_address = "localhost:50051"
82+
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
83+
dispatcher.start() # this will start the actor
84+
85+
events_receiver = dispatcher.lifecycle_events.new_receiver()
86+
87+
async for event in events_receiver:
88+
match event:
89+
case Created(dispatch):
90+
print(f"A dispatch was created: {dispatch}")
91+
case Deleted(dispatch):
92+
print(f"A dispatch was deleted: {dispatch}")
93+
case Updated(dispatch):
94+
print(f"A dispatch was updated: {dispatch}")
95+
case _ as unhandled:
96+
assert_never(unhandled)
97+
```
98+
"""
99+
100+
def __init__(
101+
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
102+
):
103+
"""Initialize the dispatcher.
104+
105+
Args:
106+
microgrid_id: The microgrid id.
107+
grpc_channel: The gRPC channel.
108+
svc_addr: The service address.
109+
"""
110+
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
111+
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
112+
self._actor = DispatchingActor(
113+
microgrid_id,
114+
grpc_channel,
115+
svc_addr,
116+
self._updated_channel.new_sender(),
117+
self._ready_channel.new_sender(),
118+
)
119+
120+
async def start(self) -> None:
121+
"""Start the actor."""
122+
self._actor.start()
123+
124+
@property
125+
def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
126+
"""Return new, updated or deleted dispatches receiver.
127+
128+
Returns:
129+
A new receiver for new dispatches.
130+
"""
131+
return self._updated_channel
132+
133+
@property
134+
def ready_to_execute(self) -> ReceiverFetcher[Dispatch]:
135+
"""Return ready dispatches receiver.
136+
137+
Returns:
138+
A new receiver for ready dispatches.
139+
"""
140+
return self._ready_channel

src/frequenz/dispatch/_event.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# License: MIT
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Dispatch lifecycle events."""
5+
6+
from dataclasses import dataclass
7+
8+
from frequenz.client.dispatch.types import Dispatch
9+
10+
11+
@dataclass(frozen=True)
12+
class Created:
13+
"""A dispatch created event."""
14+
15+
dispatch: Dispatch
16+
"""The dispatch that was created."""
17+
18+
19+
@dataclass(frozen=True)
20+
class Updated:
21+
"""A dispatch updated event."""
22+
23+
dispatch: Dispatch
24+
"""The dispatch that was updated."""
25+
26+
27+
@dataclass(frozen=True)
28+
class Deleted:
29+
"""A dispatch deleted event."""
30+
31+
dispatch: Dispatch
32+
"""The dispatch that was deleted."""
33+
34+
35+
DispatchEvent = Created | Updated | Deleted
36+
"""Type that is sent over the channel for dispatch updates.
37+
38+
This type is used to send dispatches that were created, updated or deleted
39+
over the channel.
40+
"""

src/frequenz/dispatch/actor.py

Lines changed: 7 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,19 @@
55

66
import asyncio
77
import logging
8-
from dataclasses import dataclass
98
from datetime import datetime, timedelta, timezone
109
from typing import cast
1110

1211
import grpc.aio
1312
from dateutil import rrule
1413
from frequenz.channels import Sender
14+
from frequenz.channels.util import Timer
1515
from frequenz.client.dispatch import Client
1616
from frequenz.client.dispatch.types import Dispatch, Frequency, Weekday
1717
from frequenz.sdk.actor import Actor
1818

19+
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated
20+
1921
_MAX_AHEAD_SCHEDULE = timedelta(hours=5)
2022
"""The maximum time ahead to schedule a dispatch.
2123
@@ -55,37 +57,7 @@
5557
"""To map from our Weekday enum to the dateutil library enum."""
5658

5759

58-
@dataclass(frozen=True)
59-
class _DispatchEventBase:
60-
dispatch: Dispatch
61-
"""The dispatch that this event is about.
62-
63-
Objects of this base class are sent over the channel when a dispatch is
64-
created, updated or deleted.
65-
"""
66-
67-
68-
class Created(_DispatchEventBase):
69-
"""Wraps a dispatch that was created."""
70-
71-
72-
class Updated(_DispatchEventBase):
73-
"""Wraps a dispatch that was updated."""
74-
75-
76-
class Deleted(_DispatchEventBase):
77-
"""Wraps a dispatch that was deleted."""
78-
79-
80-
DispatchEvent = Created | Updated | Deleted
81-
"""Type that is sent over the channel for dispatch updates.
82-
83-
This type is used to send dispatches that were created, updated or deleted
84-
over the channel.
85-
"""
86-
87-
88-
class DispatchActor(Actor):
60+
class DispatchingActor(Actor):
8961
"""Dispatch actor.
9062
9163
This actor is responsible for handling dispatches for a microgrid.
@@ -122,14 +94,14 @@ def __init__(
12294
self._microgrid_id = microgrid_id
12395
self._updated_dispatch_sender = updated_dispatch_sender
12496
self._ready_dispatch_sender = ready_dispatch_sender
125-
self._poll_interval = poll_interval
97+
self._poll_timer = Timer.timeout(poll_interval)
12698

12799
async def _run(self) -> None:
128100
"""Run the actor."""
101+
self._poll_timer.reset()
129102
try:
130-
while True:
103+
async for _ in self._poll_timer:
131104
await self._fetch()
132-
await asyncio.sleep(self._poll_interval.total_seconds())
133105
except asyncio.CancelledError:
134106
for task in self._scheduled.values():
135107
task.cancel()

tests/test_frequenz_dispatch.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,8 @@
1919
from frequenz.client.dispatch.types import Dispatch, Frequency
2020
from pytest import fixture
2121

22-
from frequenz.dispatch.actor import (
23-
Created,
24-
Deleted,
25-
DispatchActor,
26-
DispatchEvent,
27-
Updated,
28-
)
22+
from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated
23+
from frequenz.dispatch.actor import DispatchingActor
2924

3025

3126
# This method replaces the event loop for all tests in the file.
@@ -55,7 +50,7 @@ def _now() -> datetime:
5550
class ActorTestEnv:
5651
"""Test environment for the actor."""
5752

58-
actor: DispatchActor
53+
actor: DispatchingActor
5954
"""The actor under test."""
6055
updated_dispatches: Receiver[DispatchEvent]
6156
"""The receiver for updated dispatches."""
@@ -90,7 +85,7 @@ async def send(self, msg: T) -> None:
9085
ready_dispatches = Broadcast[Dispatch]("ready_dispatches")
9186
microgrid_id = randint(1, 100)
9287

93-
actor = DispatchActor(
88+
actor = DispatchingActor(
9489
microgrid_id=microgrid_id,
9590
grpc_channel=MagicMock(),
9691
svc_addr="localhost",
@@ -228,7 +223,7 @@ async def test_dispatch_schedule(
228223
await actor_env.client.create(**to_create_params(sample))
229224
dispatch = actor_env.client.dispatches[0]
230225

231-
next_run = DispatchActor.calculate_next_run(dispatch, _now())
226+
next_run = DispatchingActor.calculate_next_run(dispatch, _now())
232227
assert next_run is not None
233228

234229
fake_time.shift(next_run - _now() - timedelta(seconds=1))

0 commit comments

Comments
 (0)