Skip to content

Commit

Permalink
Implement simple support for delegates with `colour.utilities.Delegat…
Browse files Browse the repository at this point in the history
…e` class.
  • Loading branch information
KelSolaar committed Dec 31, 2024
1 parent 8aec135 commit 9925c3e
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 1 deletion.
4 changes: 4 additions & 0 deletions colour/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,12 @@
index_along_last_axis,
format_array_as_row,
)
from .delegate import Delegate
from .metrics import metric_mse, metric_psnr
from .network import (
TreeNode,
Port,
notify_process_state,
PortNode,
PortGraph,
ExecutionPort,
Expand Down Expand Up @@ -278,13 +280,15 @@
"index_along_last_axis",
"format_array_as_row",
]
__all__ += ["Delegate"]
__all__ += [
"metric_mse",
"metric_psnr",
]
__all__ += [
"TreeNode",
"Port",
"notify_process_state",
"PortNode",
"PortGraph",
"ExecutionPort",
Expand Down
73 changes: 73 additions & 0 deletions colour/utilities/delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""
Delegate - Event Notifications
==============================
Define a delegate class for event notifications:
- :class:`colour.utilities.Delegate`
"""

from __future__ import annotations

import typing

if typing.TYPE_CHECKING:
from colour.hints import Any, Callable, List

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
__license__ = "BSD-3-Clause - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "[email protected]"
__status__ = "Production"

__all__ = ["Delegate"]


class Delegate:
"""
Define a delegate allowing listeners to register and be notified of events.
Methods
-------
- :meth:`~colour.utilities.Delegate.add_listener`
- :meth:`~colour.utilities.Delegate.remove_listener`
- :meth:`~colour.utilities.Delegate.notify`
"""

def __init__(self) -> None:
self._listeners: List = []

def add_listener(self, listener: Callable) -> None:
"""
Add the given listener to the delegate.
Parameters
----------
listener
Callable listening to the delegate notifications.
"""

if listener not in self._listeners:
self._listeners.append(listener)

def remove_listener(self, listener: Callable) -> None:
"""
Remove the given listener from the delegate.
Parameters
----------
listener
Callable listening to the delegate notifications.
"""

if listener in self._listeners:
self._listeners.remove(listener)

def notify(self, *args: Any, **kwargs: Any) -> None:
"""
Notify the delegate listeners.
"""

for listener in self._listeners:
listener(*args, **kwargs)
56 changes: 55 additions & 1 deletion colour/utilities/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import atexit
import concurrent.futures
import functools
import multiprocessing
import os
import threading
Expand All @@ -38,6 +39,7 @@
if typing.TYPE_CHECKING:
from colour.hints import (
Any,
Callable,
Dict,
Generator,
List,
Expand All @@ -47,7 +49,7 @@
Type,
)

from colour.utilities import MixinLogging, attest, optional, required
from colour.utilities import Delegate, MixinLogging, attest, optional, required

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
Expand All @@ -59,6 +61,7 @@
__all__ = [
"TreeNode",
"Port",
"notify_process_state",
"PortNode",
"ControlFlowNode",
"PortGraph",
Expand Down Expand Up @@ -926,6 +929,33 @@ def to_graphviz(self) -> str:
return f"<{self._name}> {self.name}"


def notify_process_state(function: Callable) -> Any:
"""
Define a decorator to notify about the process state.
Parameters
----------
function
Function to decorate.
"""

@functools.wraps(function)
def wrapper(*args: Any, **kwargs: Any) -> Any:
self = next(iter(args)) if args else None

if self is not None and hasattr(self, "on_process_start"):
self.on_process_start.notify(self)

result = function(*args, **kwargs)

if self is not None and hasattr(self, "on_process_end"):
self.on_process_end.notify(self)

return result

return wrapper


class PortNode(TreeNode, MixinLogging):
"""
Define a node with support for input and output ports.
Expand Down Expand Up @@ -959,6 +989,13 @@ class PortNode(TreeNode, MixinLogging):
- :meth:`~colour.utilities.PortNode.process`
- :meth:`~colour.utilities.PortNode.to_graphviz`
Delegates
---------
- :attr:`~colour.utilities.PortNode.on_connected`
- :attr:`~colour.utilities.PortNode.on_disconnected`
- :attr:`~colour.utilities.PortNode.on_process_start`
- :attr:`~colour.utilities.PortNode.on_process_end`
Examples
--------
>>> class NodeAdd(PortNode):
Expand All @@ -971,6 +1008,7 @@ class PortNode(TreeNode, MixinLogging):
... self.add_input_port("b")
... self.add_output_port("output")
...
... @notify_process_state
... def process(self):
... a = self.get_input("a")
... b = self.get_input("b")
Expand All @@ -997,6 +1035,11 @@ def __init__(self, name: str | None = None, description: str = "") -> None:
self._output_ports = {}
self._dirty = True

self.on_connected: Delegate = Delegate()
self.on_disconnected: Delegate = Delegate()
self.on_process_start: Delegate = Delegate()
self.on_process_end: Delegate = Delegate()

@property
def input_ports(self) -> Dict[str, Port]:
"""
Expand Down Expand Up @@ -1435,6 +1478,8 @@ def connect(

port_source.connect(port_target)

self.on_connected.notify((self, source_port, target_node, target_port))

def disconnect(
self,
source_port: str,
Expand Down Expand Up @@ -1480,6 +1525,9 @@ def disconnect(

port_source.disconnect(port_target)

self.on_disconnected.notify((self, source_port, target_node, target_port))

@notify_process_state
def process(self) -> None:
"""
Process the node, must be reimplemented by sub-classes.
Expand All @@ -1501,6 +1549,7 @@ def process(self) -> None:
... self.add_input_port("b")
... self.add_output_port("output")
...
... @notify_process_state
... def process(self):
... a = self.get_input("a")
... b = self.get_input("b")
Expand Down Expand Up @@ -1587,6 +1636,7 @@ class PortGraph(PortNode):
... self.add_input_port("b")
... self.add_output_port("output")
...
... @notify_process_state
... def process(self):
... a = self.get_input("a")
... b = self.get_input("b")
Expand Down Expand Up @@ -1848,6 +1898,7 @@ def walk_ports(self) -> Generator:

raise error # noqa: TRY201

@notify_process_state
def process(self, **kwargs: Dict) -> None:
"""
Process the node-graph by walking it and calling the
Expand Down Expand Up @@ -2074,6 +2125,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_output_port("element", None, "Current element of the array")
self.add_output_port("loop_output", None, "Port for loop Output", ExecutionPort)

@notify_process_state
def process(self) -> None:
"""
Process the ``for`` loop node.
Expand Down Expand Up @@ -2240,6 +2292,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_output_port("results", [], "Results from the parallel loop")
self.add_output_port("loop_output", None, "Port for loop output", ExecutionPort)

@notify_process_state
def process(self) -> None:
"""
Process the ``for`` loop node.
Expand Down Expand Up @@ -2411,6 +2464,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.add_output_port("results", [], "Results from the parallel loop")
self.add_output_port("loop_output", None, "Port for loop output", ExecutionPort)

@notify_process_state
def process(self) -> None:
"""
Process the ``for`` loop node.
Expand Down
57 changes: 57 additions & 0 deletions colour/utilities/tests/test_delegate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""Define the unit tests for the :mod:`colour.utilities.delegate` module."""

from __future__ import annotations

from colour.utilities import (
Delegate,
)

__author__ = "Colour Developers"
__copyright__ = "Copyright 2013 Colour Developers"
__license__ = "BSD-3-Clause - https://opensource.org/licenses/BSD-3-Clause"
__maintainer__ = "Colour Developers"
__email__ = "[email protected]"
__status__ = "Production"

__all__ = [
"TestDelegate",
]


class TestDelegate:
"""
Define :class:`colour.utilities.structures.Delegate` class unit tests
methods.
"""

def test_required_methods(self) -> None:
"""Test the presence of required methods."""

required_methods = ("add_listener", "remove_listener", "notify")

for method in required_methods:
assert method in dir(Delegate)

def test_Delegate(self) -> None:
"""Test the :class:`colour.utilities.structures.Delegate` class."""

delegate = Delegate()

data = []

def _listener(a: int) -> None:
"""Define a unit tests listener."""

data.append(a)

delegate.add_listener(_listener)

delegate.notify("Foo")

assert data == ["Foo"]

delegate.remove_listener(_listener)

delegate.notify("Bar")

assert data == ["Foo"]
Loading

0 comments on commit 9925c3e

Please sign in to comment.