-
Notifications
You must be signed in to change notification settings - Fork 5
Improve project structure #15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
e05790d
Move `Dispatcher` to its own module
llucax 2100294
Expose all public symbols
llucax faf43ad
Remove the `_DispatchEventBase` class
llucax e3798e9
Move dispatch events to their own module
llucax 8077a82
Fix `Dispatcher` example in docstring
llucax 4b62a78
Make `Dispatcher` return receiver fetchers
llucax d3b7932
Rename `DispatchActor` to `DispatchingActor`
llucax dfb8829
Use a timer for polling.
llucax 85a7ff3
Rename `updated_dispatches` and `ready_dispatches`
llucax ea1db3c
Improve `Dispatcher` examples
llucax File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
# License: MIT | ||
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH | ||
|
||
"""A highlevel interface for the dispatch API.""" | ||
|
||
import abc | ||
from typing import Protocol, TypeVar | ||
|
||
import grpc.aio | ||
from frequenz.channels import Broadcast, Receiver | ||
from frequenz.client.dispatch.types import Dispatch | ||
|
||
from frequenz.dispatch._event import DispatchEvent | ||
from frequenz.dispatch.actor import DispatchingActor | ||
|
||
ReceivedT = TypeVar("ReceivedT") | ||
"""The type being received.""" | ||
|
||
|
||
class ReceiverFetcher(Protocol[ReceivedT]): | ||
"""An interface that just exposes a `new_receiver` method.""" | ||
|
||
@abc.abstractmethod | ||
def new_receiver( | ||
self, name: str | None = None, maxsize: int = 50 | ||
) -> Receiver[ReceivedT]: | ||
"""Get a receiver from the channel. | ||
|
||
Args: | ||
name: A name to identify the receiver in the logs. | ||
maxsize: The maximum size of the receiver. | ||
|
||
Returns: | ||
A receiver instance. | ||
""" | ||
|
||
|
||
class Dispatcher: | ||
"""A highlevel interface for the dispatch API. | ||
|
||
This class provides a highlevel interface to the dispatch API. | ||
It provides two channels: | ||
|
||
One that sends a dispatch event message whenever a dispatch is created, updated or deleted. | ||
|
||
The other sends a dispatch message whenever a dispatch is ready to be | ||
executed according to the schedule. | ||
|
||
allows to receive new dispatches and ready dispatches. | ||
|
||
Example: Processing ready-to-execute dispatches | ||
```python | ||
import grpc.aio | ||
|
||
async def run(): | ||
grpc_channel = grpc.aio.insecure_channel("localhost:50051") | ||
microgrid_id = 1 | ||
service_address = "localhost:50051" | ||
|
||
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) | ||
dispatcher.start() # this will start the actor | ||
|
||
ready_receiver = dispatcher.ready_to_execute.new_receiver() | ||
|
||
async for dispatch in ready_receiver: | ||
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") | ||
# execute the dispatch | ||
``` | ||
|
||
Example: Getting notification about dispatch lifecycle events | ||
```python | ||
from typing import assert_never | ||
|
||
import grpc.aio | ||
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated | ||
|
||
|
||
async def run(): | ||
grpc_channel = grpc.aio.insecure_channel("localhost:50051") | ||
microgrid_id = 1 | ||
service_address = "localhost:50051" | ||
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address) | ||
dispatcher.start() # this will start the actor | ||
|
||
events_receiver = dispatcher.lifecycle_events.new_receiver() | ||
|
||
async for event in events_receiver: | ||
match event: | ||
case Created(dispatch): | ||
print(f"A dispatch was created: {dispatch}") | ||
case Deleted(dispatch): | ||
print(f"A dispatch was deleted: {dispatch}") | ||
case Updated(dispatch): | ||
print(f"A dispatch was updated: {dispatch}") | ||
case _ as unhandled: | ||
assert_never(unhandled) | ||
``` | ||
""" | ||
|
||
def __init__( | ||
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str | ||
): | ||
"""Initialize the dispatcher. | ||
|
||
Args: | ||
microgrid_id: The microgrid id. | ||
grpc_channel: The gRPC channel. | ||
svc_addr: The service address. | ||
""" | ||
self._ready_channel = Broadcast[Dispatch]("ready_dispatches") | ||
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches") | ||
self._actor = DispatchingActor( | ||
microgrid_id, | ||
grpc_channel, | ||
svc_addr, | ||
self._updated_channel.new_sender(), | ||
self._ready_channel.new_sender(), | ||
) | ||
|
||
async def start(self) -> None: | ||
"""Start the actor.""" | ||
self._actor.start() | ||
|
||
@property | ||
def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]: | ||
"""Return new, updated or deleted dispatches receiver. | ||
|
||
Returns: | ||
A new receiver for new dispatches. | ||
""" | ||
return self._updated_channel | ||
|
||
@property | ||
def ready_to_execute(self) -> ReceiverFetcher[Dispatch]: | ||
"""Return ready dispatches receiver. | ||
|
||
Returns: | ||
A new receiver for ready dispatches. | ||
""" | ||
return self._ready_channel |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
# License: MIT | ||
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH | ||
|
||
"""Dispatch lifecycle events.""" | ||
|
||
from dataclasses import dataclass | ||
|
||
from frequenz.client.dispatch.types import Dispatch | ||
|
||
|
||
@dataclass(frozen=True) | ||
class Created: | ||
"""A dispatch created event.""" | ||
|
||
dispatch: Dispatch | ||
"""The dispatch that was created.""" | ||
|
||
|
||
@dataclass(frozen=True) | ||
class Updated: | ||
"""A dispatch updated event.""" | ||
|
||
dispatch: Dispatch | ||
"""The dispatch that was updated.""" | ||
|
||
|
||
@dataclass(frozen=True) | ||
class Deleted: | ||
"""A dispatch deleted event.""" | ||
|
||
dispatch: Dispatch | ||
"""The dispatch that was deleted.""" | ||
|
||
|
||
DispatchEvent = Created | Updated | Deleted | ||
"""Type that is sent over the channel for dispatch updates. | ||
|
||
This type is used to send dispatches that were created, updated or deleted | ||
over the channel. | ||
""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes me think we should also expose the client as
dispatcher.client
so the user can react to the events in a meaningful way when they want to update or create a new event as a responseThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get this. Why do you need the client?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, anyone who wants to update, delete or create a request needs the client to do so, the high-level API
doesn't offer those things (and I don't see how it could do that better than the client)