Skip to content

Improve project structure #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 11 additions & 73 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,14 @@

"""A highlevel interface for the dispatch API."""

import grpc.aio
from frequenz.channels import Broadcast, Receiver
from frequenz.client.dispatch.types import Dispatch

from frequenz.dispatch.actor import DispatchActor, DispatchEvent

__all__ = ["Dispatcher"]


class Dispatcher:
"""A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API.
It provides two channels:

One that sends a dispatch event message whenever a dispatch is created, updated or deleted.

The other sends a dispatch message whenever a dispatch is ready to be
executed according to the schedule.

allows to receive new dispatches and ready dispatches.

Example:
```python
from frequenz.dispatch import Dispatcher

async def run():
dispatcher = Dispatcher(API_CONNECTION_INFO)
dispatcher.start() # this will start the actor
dispatch_arrived = dispatcher.new_dispatches().new_receiver()
dispatch_ready = dispatcher.ready_dispatches().new_receiver()
```
"""

def __init__(
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
):
"""Initialize the dispatcher.

Args:
microgrid_id: The microgrid id.
grpc_channel: The gRPC channel.
svc_addr: The service address.
"""
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
self._actor = DispatchActor(
microgrid_id,
grpc_channel,
svc_addr,
self._updated_channel.new_sender(),
self._ready_channel.new_sender(),
)

async def start(self) -> None:
"""Start the actor."""
self._actor.start()

def updated_dispatches(self) -> Receiver[DispatchEvent]:
"""Return new, updated or deleted dispatches receiver.

Returns:
A new receiver for new dispatches.
"""
return self._updated_channel.new_receiver()

def ready_dispatches(self) -> Receiver[Dispatch]:
"""Return ready dispatches receiver.

Returns:
A new receiver for ready dispatches.
"""
return self._ready_channel.new_receiver()
from frequenz.dispatch._dispatcher import Dispatcher, ReceiverFetcher
from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated

__all__ = [
"Created",
"Deleted",
"DispatchEvent",
"Dispatcher",
"ReceiverFetcher",
"Updated",
]
140 changes: 140 additions & 0 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""A highlevel interface for the dispatch API."""

import abc
from typing import Protocol, TypeVar

import grpc.aio
from frequenz.channels import Broadcast, Receiver
from frequenz.client.dispatch.types import Dispatch

from frequenz.dispatch._event import DispatchEvent
from frequenz.dispatch.actor import DispatchingActor

ReceivedT = TypeVar("ReceivedT")
"""The type being received."""


class ReceiverFetcher(Protocol[ReceivedT]):
"""An interface that just exposes a `new_receiver` method."""

@abc.abstractmethod
def new_receiver(
self, name: str | None = None, maxsize: int = 50
) -> Receiver[ReceivedT]:
"""Get a receiver from the channel.

Args:
name: A name to identify the receiver in the logs.
maxsize: The maximum size of the receiver.

Returns:
A receiver instance.
"""


class Dispatcher:
"""A highlevel interface for the dispatch API.

This class provides a highlevel interface to the dispatch API.
It provides two channels:

One that sends a dispatch event message whenever a dispatch is created, updated or deleted.

The other sends a dispatch message whenever a dispatch is ready to be
executed according to the schedule.

allows to receive new dispatches and ready dispatches.

Example: Processing ready-to-execute dispatches
```python
import grpc.aio

async def run():
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
microgrid_id = 1
service_address = "localhost:50051"

dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
dispatcher.start() # this will start the actor

ready_receiver = dispatcher.ready_to_execute.new_receiver()

async for dispatch in ready_receiver:
print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}")
# execute the dispatch
```

Example: Getting notification about dispatch lifecycle events
```python
from typing import assert_never

import grpc.aio
from frequenz.dispatch import Created, Deleted, Dispatcher, Updated


async def run():
grpc_channel = grpc.aio.insecure_channel("localhost:50051")
microgrid_id = 1
service_address = "localhost:50051"
dispatcher = Dispatcher(microgrid_id, grpc_channel, service_address)
dispatcher.start() # this will start the actor

events_receiver = dispatcher.lifecycle_events.new_receiver()

async for event in events_receiver:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think we should also expose the client as dispatcher.client so the user can react to the events in a meaningful way when they want to update or create a new event as a response

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this. Why do you need the client?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, anyone who wants to update, delete or create a request needs the client to do so, the high-level API
doesn't offer those things (and I don't see how it could do that better than the client)

match event:
case Created(dispatch):
print(f"A dispatch was created: {dispatch}")
case Deleted(dispatch):
print(f"A dispatch was deleted: {dispatch}")
case Updated(dispatch):
print(f"A dispatch was updated: {dispatch}")
case _ as unhandled:
assert_never(unhandled)
```
"""

def __init__(
self, microgrid_id: int, grpc_channel: grpc.aio.Channel, svc_addr: str
):
"""Initialize the dispatcher.

Args:
microgrid_id: The microgrid id.
grpc_channel: The gRPC channel.
svc_addr: The service address.
"""
self._ready_channel = Broadcast[Dispatch]("ready_dispatches")
self._updated_channel = Broadcast[DispatchEvent]("new_dispatches")
self._actor = DispatchingActor(
microgrid_id,
grpc_channel,
svc_addr,
self._updated_channel.new_sender(),
self._ready_channel.new_sender(),
)

async def start(self) -> None:
"""Start the actor."""
self._actor.start()

@property
def lifecycle_events(self) -> ReceiverFetcher[DispatchEvent]:
"""Return new, updated or deleted dispatches receiver.

Returns:
A new receiver for new dispatches.
"""
return self._updated_channel

@property
def ready_to_execute(self) -> ReceiverFetcher[Dispatch]:
"""Return ready dispatches receiver.

Returns:
A new receiver for ready dispatches.
"""
return self._ready_channel
40 changes: 40 additions & 0 deletions src/frequenz/dispatch/_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Dispatch lifecycle events."""

from dataclasses import dataclass

from frequenz.client.dispatch.types import Dispatch


@dataclass(frozen=True)
class Created:
"""A dispatch created event."""

dispatch: Dispatch
"""The dispatch that was created."""


@dataclass(frozen=True)
class Updated:
"""A dispatch updated event."""

dispatch: Dispatch
"""The dispatch that was updated."""


@dataclass(frozen=True)
class Deleted:
"""A dispatch deleted event."""

dispatch: Dispatch
"""The dispatch that was deleted."""


DispatchEvent = Created | Updated | Deleted
"""Type that is sent over the channel for dispatch updates.

This type is used to send dispatches that were created, updated or deleted
over the channel.
"""
42 changes: 7 additions & 35 deletions src/frequenz/dispatch/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@

import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import cast

import grpc.aio
from dateutil import rrule
from frequenz.channels import Sender
from frequenz.channels.util import Timer
from frequenz.client.dispatch import Client
from frequenz.client.dispatch.types import Dispatch, Frequency, Weekday
from frequenz.sdk.actor import Actor

from frequenz.dispatch._event import Created, Deleted, DispatchEvent, Updated

_MAX_AHEAD_SCHEDULE = timedelta(hours=5)
"""The maximum time ahead to schedule a dispatch.

Expand Down Expand Up @@ -55,37 +57,7 @@
"""To map from our Weekday enum to the dateutil library enum."""


@dataclass(frozen=True)
class _DispatchEventBase:
dispatch: Dispatch
"""The dispatch that this event is about.

Objects of this base class are sent over the channel when a dispatch is
created, updated or deleted.
"""


class Created(_DispatchEventBase):
"""Wraps a dispatch that was created."""


class Updated(_DispatchEventBase):
"""Wraps a dispatch that was updated."""


class Deleted(_DispatchEventBase):
"""Wraps a dispatch that was deleted."""


DispatchEvent = Created | Updated | Deleted
"""Type that is sent over the channel for dispatch updates.

This type is used to send dispatches that were created, updated or deleted
over the channel.
"""


class DispatchActor(Actor):
class DispatchingActor(Actor):
"""Dispatch actor.

This actor is responsible for handling dispatches for a microgrid.
Expand Down Expand Up @@ -122,14 +94,14 @@ def __init__(
self._microgrid_id = microgrid_id
self._updated_dispatch_sender = updated_dispatch_sender
self._ready_dispatch_sender = ready_dispatch_sender
self._poll_interval = poll_interval
self._poll_timer = Timer.timeout(poll_interval)

async def _run(self) -> None:
"""Run the actor."""
self._poll_timer.reset()
try:
while True:
async for _ in self._poll_timer:
await self._fetch()
await asyncio.sleep(self._poll_interval.total_seconds())
except asyncio.CancelledError:
for task in self._scheduled.values():
task.cancel()
Expand Down
15 changes: 5 additions & 10 deletions tests/test_frequenz_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@
from frequenz.client.dispatch.types import Dispatch, Frequency
from pytest import fixture

from frequenz.dispatch.actor import (
Created,
Deleted,
DispatchActor,
DispatchEvent,
Updated,
)
from frequenz.dispatch import Created, Deleted, DispatchEvent, Updated
from frequenz.dispatch.actor import DispatchingActor


# This method replaces the event loop for all tests in the file.
Expand Down Expand Up @@ -55,7 +50,7 @@ def _now() -> datetime:
class ActorTestEnv:
"""Test environment for the actor."""

actor: DispatchActor
actor: DispatchingActor
"""The actor under test."""
updated_dispatches: Receiver[DispatchEvent]
"""The receiver for updated dispatches."""
Expand Down Expand Up @@ -90,7 +85,7 @@ async def send(self, msg: T) -> None:
ready_dispatches = Broadcast[Dispatch]("ready_dispatches")
microgrid_id = randint(1, 100)

actor = DispatchActor(
actor = DispatchingActor(
microgrid_id=microgrid_id,
grpc_channel=MagicMock(),
svc_addr="localhost",
Expand Down Expand Up @@ -228,7 +223,7 @@ async def test_dispatch_schedule(
await actor_env.client.create(**to_create_params(sample))
dispatch = actor_env.client.dispatches[0]

next_run = DispatchActor.calculate_next_run(dispatch, _now())
next_run = DispatchingActor.calculate_next_run(dispatch, _now())
assert next_run is not None

fake_time.shift(next_run - _now() - timedelta(seconds=1))
Expand Down